当配置审计提供的托管规则不能满足您的需求时,您可以使用函数计算新建自定义规则。当您的自定义规则被触发时,配置审计会运行对应的规则函数,并给出规则合规结果。
前提条件
背景信息
自定义规则与预设规则差异的差异如下:
- 托管规则
托管规则是配置审计已在函数计算中构建的规则函数,使用时您可以直接在配置审计控制台选择所需函数。配置审计支持的托管规则,请参见托管规则列表。
- 自定义规则
自定义规则需要您提前在函数计算中定义规则函数,使用时在配置审计控制台选择规则函数的ARN。
新建自定义规则
规则函数的代码构建
规则的本质是一段逻辑判断代码,这段代码存放在新建的函数中。在实际持续审计中,通过触发该函数的执行来实现资源评估。本函数代码主要有两个函数,具体如下:
- handler
handler为入口函数,即自定义规则触发时调用的函数。handler在新建函数时进行设置。
- put_evaluations
put_evaluations在handler中调用,返回合规结果。JSON样例如下:
#!/usr/bin/env python # -*- encoding: utf-8 -*- import logging import json from aliyunsdkcore.client import AcsClient from aliyunsdkcore.auth.credentials import StsTokenCredential from aliyunsdkcore.acs_exception.exceptions import ClientException from aliyunsdkcore.acs_exception.exceptions import ServerException from aliyunsdkcore.request import CommonRequest logger = logging.getLogger() # 用于筛选资源类型,以英文逗号(,)分隔多个资源类型。 MATCH_RESOURCE_TYPES = '' # 合规类型 COMPLIACE_TYPE_COMPLIANT = 'COMPLIANT' COMPLIACE_TYPE_NON_COMPLIANT = 'NON_COMPLIANT' COMPLIACE_TYPE_NOT_APPLICABLE = 'NOT_APPLICABLE' COMPLIACE_TYPE_INSUFFICIENT_DATA = 'INSUFFICIENT_DATA' def handler(event, context): """ 处理函数 :param event: 事件 :param context: 上下文 :return: 评估结果 """ # 校验Event evt = validate_event(event) if not evt: return None rule_parameters = evt.get('ruleParameters') result_token = evt.get('resultToken') invoking_event = evt.get('invokingEvent') # 初始化返回值 compliance_type = COMPLIACE_TYPE_NOT_APPLICABLE annotation = None # 获取ConfigurationItem configuration_item = invoking_event.get('configurationItem') if not configuration_item: logger.error('Configuration item is empty.') return None ordering_timestamp = configuration_item.get('captureTime') resource_id = configuration_item.get('resourceId') resource_type = configuration_item.get('resourceType') # 获取评估结果 compliance_type, annotation = evaluate_configuration_item( rule_parameters, configuration_item) # 评估结果 evaluations = [ { 'complianceResourceId': resource_id, 'complianceResourceType': resource_type, 'orderingTimestamp': ordering_timestamp, 'complianceType': compliance_type, 'annotation': annotation } ] # 回写评估结果 put_evaluations(context, result_token, evaluations) return evaluations def evaluate_configuration_item(rule_parameters, configuration_item): """ 评估逻辑 :param rule_parameters: 规则参数 :param configuration_item: 配置项 :return: 评估类型,注解 """ # 初始化返回值 compliance_type = COMPLIACE_TYPE_NOT_APPLICABLE annotation = None # 获取资源类型和资源ID resource_type = configuration_item['resourceType'] full_configuration = configuration_item['configuration'] # 判断资源类型 if MATCH_RESOURCE_TYPES and resource_type not in MATCH_RESOURCE_TYPES.split(','): annotation = 'Resource type is {}, not in {}.'.format( resource_type, MATCH_RESOURCE_TYPES) return compliance_type, annotation # 判断配置信息 if not full_configuration: annotation = 'Configuration is empty.' return compliance_type, annotation # 转换为JSON configuration = parse_json(full_configuration) if not configuration: annotation = 'Configuration:{} in invald.'.format(full_configuration) return compliance_type, annotation # =========== 用户代码 start =========== # # =========== 用户代码 end =========== # return compliance_type, annotation def validate_event(event): """ 校验Event :param event: Event :return: JSON对象 """ if not event: logger.error('Event is empty.') evt = parse_json(event) logger.info('Loading event: %s .' % evt) if 'resultToken' not in evt: logger.error('ResultToken is empty.') return None if 'ruleParameters' not in evt: logger.error('RuleParameters is empty.') return None if 'invokingEvent' not in evt: logger.error('InvokingEvent is empty.') return None return evt def parse_json(content): """ JSON类型转换 :param content: JSON字符串 :return: JSON对象 """ try: return json.loads(content) except Exception as e: logger.error('Parse content:{} to json error:{}.'.format(content, e)) return None def put_evaluations(context, result_token, evaluations): """ 回调配置审计OpenAPI回写评估结果 :param context: 函数计算上下文 :param result_token: 回调令牌 :param evaluations: 评估结果 :return: None """ # 需要输入当前阿里云账号的AccessKey ID和AccessKey Secret,需要具备AliyunConfigFullAccess权限。 client = AcsClient( 'XXXX', 'XXXXXXX', 'cn-shanghai', ) # 新建request,并设置参数,domain为config.cn-shanghai.aliyuncs.com。 request = CommonRequest() request.set_domain('config.cn-shanghai.aliyuncs.com') request.set_version('2019-01-08') request.set_action_name('PutEvaluations') request.add_body_params('ResultToken', result_token) request.add_body_params('Evaluations', evaluations) request.set_method('POST') try: response = client.do_action_with_exception(request) logger.info('PutEvaluations with request: {}, response: {}.'.format(request, response)) except Exception as e: logger.error('PutEvaluations error: %s' % e)
规则函数的入参
在函数中设置的入参信息保存在ruleParameters中,其他内容在规则触发时自动生成事件信息。JSON格式如下:
{
version:"版本号",
orderingTimestamp:"命令执行开始时间",
invokingEvent:{
messageType:"消息类型",
configurationItem:{
"accountId":"阿里云账号ID",
"arn":"资源ARN",
"availabilityZone":"可用区",
"regionId":"地域ID",
"configuration":"字符串形式的资源本身配置信息,各类资源有所不同",
"configurationDiff":"配置变更内容",
"relationship":"关系",
"relationshipDiff":"关系内容变更",
"captureTime":"捕获时间",
"resourceCreationTime":"资源创建时间",
"resourceStatus":"资源状态",
"resourceId":"资源ID",
"resourceName":"资源名称",
"resourceType":"资源类型",
"supplementaryConfiguration":"补充配置",
"tags":"标签"
},
notificationCreationTimestamp:"事件消息生成时间"
},
ruleParameters:{
"key":"value"
},
resultToken:"用户在函数计算中的回调信息"
}
context参数是上下文信息,规则触发时自动带入。
context.credentials.access_key_id:"accessKey"
context.credentials.access_key_secret:"accessSecret"
context.region:"地域信息"
在文档使用中是否遇到以下问题
更多建议
匿名提交