当您通过函数计算新建自定义规则时,如果规则被触发,配置审计会运行对应的规则函数对资源进行检测,并提供资源合规评估结果。本文通过JSON示例为您介绍自定义规则函数的代码和入参。

函数代码

规则的本质是一段逻辑判断代码,这段代码存放在新建的函数中。在配置审计对资源的持续审计中,通过触发该函数的执行来实现对资源的评估。本函数代码主要有两个函数,具体如下:
  • handler

    handler为入口函数,即自定义规则触发时调用的函数。handler在新建函数时进行设置。

  • put_evaluations

    put_evaluationshandler中调用,返回合规结果。

Python示例如下:
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import logging
import json
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.auth.credentials import StsTokenCredential
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.request import CommonRequest
logger = logging.getLogger()

# 合规类型
COMPLIACE_TYPE_COMPLIANT = 'COMPLIANT'
COMPLIACE_TYPE_NON_COMPLIANT = 'NON_COMPLIANT'
COMPLIACE_TYPE_NOT_APPLICABLE = 'NOT_APPLICABLE'
COMPLIACE_TYPE_INSUFFICIENT_DATA = 'INSUFFICIENT_DATA'

# 入口函数,完成业务逻辑编排和处理。
def handler(event, context):
    """
    处理函数
    :param event:事件
    :param context:上下文
    :return:评估结果
    """
    # 校验Event,代码可直接复制。
    evt = validate_event(event)
    if not evt:
        return None

    rule_parameters = evt.get('ruleParameters')
    result_token = evt.get('resultToken')
    invoking_event = evt.get('invokingEvent')
    ordering_timestamp = evt.get('orderingTimestamp')
    annotation = None

    # 初始化返回值,根据业务场景设置规则的默认合规结果。
    compliance_type = COMPLIACE_TYPE_NOT_APPLICABLE

    # 资源配置信息。当规则触发机制设置为配置变更时,该入参有效。当您新建规则或手动执行规则时,配置审计调用函数逐个评估资源,如果资源配置变更,配置审计根据变更的资源信息自动调用函数触发一次资源评估。
    # 当规则触发机制设置为周期执行时,该入参为空。请您根据API自行实现待评估资源的查询逻辑。
    configuration_item = invoking_event.get('configurationItem')
    account_id = configuration_item.get('accountId')
    resource_id = configuration_item.get('resourceId')
    resource_type = configuration_item.get('resourceType')
    region_id = configuration_item.get('regionId')

    # 对资源进行评估,需要根据实际业务自行实现评估逻辑,以下代码仅供参考。
    compliance_type, annotation = evaluate_configuration_item(
        rule_parameters, configuration_item)

    # 设置评估结果。格式符合以下示例即可。
    evaluations = [
        {
            'accountId': account_id,
            'complianceResourceId': resource_id,
            'complianceResourceType': resource_type,
            'complianceRegionId': region_id,
            'orderingTimestamp': ordering_timestamp,
            'complianceType': compliance_type,
            'annotation': annotation
        }
    ]

# 将评估结果返回并写入配置审计,代码可直接复制。
    put_evaluations(context, result_token, evaluations)

    return evaluations

# 根据入参和资源进行评估。需要根据实际业务自行实现评估逻辑,以下代码仅供参考。
def evaluate_configuration_item(rule_parameters, configuration_item):
    """
    评估逻辑
    :param rule_parameters:规则参数
    :param configuration_item:配置项
    :return:评估类型
    """
    # 初始化返回值
    compliance_type = COMPLIACE_TYPE_NOT_APPLICABLE
    annotation = None

    # 获取资源类型和资源ID
    resource_type = configuration_item['resourceType']
    full_configuration = configuration_item['configuration']

    # 判断配置信息
    if not full_configuration:
        annotation = 'Configuration is empty.'
        return compliance_type, annotation

    # 转换为JSON
    configuration = parse_json(full_configuration)
    if not configuration:
        annotation = 'Configuration:{} in invald.'.format(full_configuration)
        return compliance_type, annotation

    return compliance_type, annotation


def validate_event(event):
    """
    校验Event
    :param event:Event
    :return:JSON对象
    """
    if not event:
        logger.error('Event is empty.')

    evt = parse_json(event)
    logger.info('Loading event: %s .' % evt)

    if 'resultToken' not in evt:
        logger.error('ResultToken is empty.')
        return None

    if 'ruleParameters' not in evt:
        logger.error('RuleParameters is empty.')
        return None

    if 'invokingEvent' not in evt:
        logger.error('InvokingEvent is empty.')
        return None

    return evt


def parse_json(content):
    """
    JSON类型转换
    :param content:JSON字符串
    :return:JSON对象
    """
    try:
        return json.loads(content)
    except Exception as e:
        logger.error('Parse content:{} to json error:{}.'.format(content, e))
        return None


# 评估结果返回,并写入配置审计,代码可直接复制。
def put_evaluations(context, result_token, evaluations):
    """
    调用API返回并写入评估结果
    :param context:函数计算上下文
    :param result_token:回调令牌
    :param evaluations:评估结果
    :return: None
    """
    # 输入当前阿里云账号的AccessKey ID和AccessKey Secret,同时具备权限AliyunConfigFullAccess。
    client = AcsClient(
        'LTAI4FxbrTniVtqg1FZW****',
        'C0GXsd6UU6ECUmrsCgPXA6OEvy****',
        'cn-shanghai',
    )

    # 新建Request,并设置参数,Domain为config.cn-shanghai.aliyuncs.com。
    request = CommonRequest()
    request.set_domain('config.cn-shanghai.aliyuncs.com')
    request.set_version('2019-01-08')
    request.set_action_name('PutEvaluations')
    request.add_body_params('ResultToken', result_token)
    request.add_body_params('Evaluations', evaluations)
    request.set_method('POST')

    try:
        response = client.do_action_with_exception(request)
        logger.info('PutEvaluations with request: {}, response: {}.'.format(request, response))
    except Exception as e:
        logger.error('PutEvaluations error: %s' % e)

                

函数入参

在函数中设置的入参信息保存在ruleParameters中,其他内容在规则触发时自动生成事件信息。JSON示例如下:
{
    "orderingTimestamp": "命令执行开始时间戳",
    "invokingEvent": {
        "messageType": "消息类型",
        "configurationItem": {
            "accountId": "阿里云账号ID",
            "arn": "资源ARN",
            "availabilityZone": "可用区",
            "regionId": "地域ID",
            "configuration": "资源的详细配置",
            "configurationDiff": "资源配置变更的具体变更项及变更前后信息",
            "relationship": "关联资源",
            "relationshipDiff": "关系内容变更",
            "captureTime": "配置审计发现资源变更事件并生成日志的时间戳",
            "resourceCreationTime": "新建资源的时间戳",
            "resourceStatus": "资源状态",
            "resourceId": "资源ID",
            "resourceName": "资源名称",
            "resourceType": "资源类型",
            "supplementaryConfiguration": "资源补充配置",
            "tags": "标签"
        }
    },
    "ruleParameters": {
        "key": "value"
    },
    "resultToken": "用户在函数计算中的回调信息"
}
Context参数是上下文信息,规则触发时自动带入。
  • context.credentials.access_key_id:"accessKey"
  • context.credentials.access_key_secret:"accessSecret"
  • context.region:"地域"