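Enabling multi-factor authentication (MFA) for RAM users improves their account security. The Cloud Config managed rule (high-privilege RAM users have MFA enabled) checks whether all RAM users have MFA enabled. To check only specific RAM users, for example high-risk users, use the custom rule method described in this topic.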

Prerequisites

Data planning

The following table describes the data plan.
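Cloud service            Parameter                   Example
Cloud Config             Rule name                   RAMUserMFA
                         Trigger type                Configuration change, periodic execution
                         Frequency                   1 hour
                         Rule parameter name         dangerousActions
                         Expected value              ecs:*,oss:*,log:*
Message Service (MNS)    Topic name                  MNSTestConfig
                         Topic region                China (Shanghai)
RAM                      RAM user name               Alice
                         RAM user ID                 25849250231246****
                         Default permission policy   AliyunECSFullAccess
Function Compute         Service                     Ram_User
                         Function                    RamDangerousPolicyUserBindMFA

Note: Cloud Config is deployed in China (Shanghai). To reduce network overhead, we recommend that you select China (Shanghai) as the region of the MNS topic.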
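The expected value of dangerousActions in the plan above is a single comma-separated string. The following is a minimal sketch, not the function deployed later in this topic, of how such a value could be split and compared with the actions that a user's policies grant. The helper name `find_dangerous_actions` and the sample granted actions are hypothetical. Unlike a plain `split(',')`, the sketch also trims whitespace around each item, which is an assumption about how tolerant the parsing should be.

```python
def find_dangerous_actions(expected_value, policy_actions):
    """Return the sorted actions present in both the rule parameter and the policies."""
    # Split the comma-separated parameter; drop whitespace and empty items.
    dangerous = {item.strip() for item in expected_value.split(',') if item.strip()}
    # Exact string comparison: wildcards such as 'ecs:*' are not expanded.
    return sorted(dangerous & set(policy_actions))


# Hypothetical actions granted to a RAM user (assumption for illustration).
granted = ['ecs:*', 'vpc:DescribeVpcs']
print(find_dangerous_actions('ecs:*,oss:*,log:*', granted))
```

Because the comparison is an exact string match, `ecs:*` matches only a policy that literally grants `ecs:*`. A policy that grants a single action such as `ecs:StartInstance` would not be matched by this approach.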

Workflow

The following figure shows the workflow.

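Procedure

  1. Create a service.
    1. Log on to the Function Compute console.
    2. In the top navigation bar, select a region, for example, China (Shanghai).
    3. In the left-side navigation pane, click Services & Functions.
    4. In the service list section, click Create Service.
    5. On the Create Service page, enter Ram_User for Service Name and clear the Bind Log check box.
    6. Click Submit.
  2. Create a function.
    1. On the Ram_User service page, click Create Function.
    2. On the Create Function page, hover over the Event Function section and click Configure and Deploy.
    3. On the Create Function page, set Service to Ram_User, Function Name to RamDangerousPolicyUserBindMFA, and Runtime to Python 3. Keep the default values for the other parameters.
    4. Click Create.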
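  3. Configure the environment variables of the function.
    1. On the Code tab of the RamDangerousPolicyUserBindMFA function, click the Overview tab.
    2. In the Function Properties section of the Overview tab, click Modify Configurations.
    3. On the Modify Configurations page, select Key-value for Environment Variables, and enter the key and value of each environment variable.
      Key             Example value                    Description
      AK              LTAI4G6JZSANb8MZMkm1****         AccessKey ID of the current Alibaba Cloud account. For how to obtain an AccessKey ID, see Obtain an AccessKey pair.
      SK              EMLHThhpD2UJqH1DXuAKii2sI****    AccessKey Secret of the current Alibaba Cloud account. For how to obtain an AccessKey Secret, see Obtain an AccessKey pair.
      ResourceTypes   ACS::RAM::User                   Resource type.
    4. Click Submit.
  4. Configure the code that checks whether a RAM user has MFA enabled.
    1. On the online editing page of the Code tab of the RamDangerousPolicyUserBindMFA function, click the index.py file.
    2. Copy and paste the following code into index.py.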
      #!/usr/bin/env python
      # -*- encoding: utf-8 -*-
      
      import logging
      import json
      import os
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkram.request.v20150501 import ListPoliciesForUserRequest
      from aliyunsdkram.request.v20150501 import GetPolicyRequest
      from aliyunsdkram.request.v20150501 import GetUserMFAInfoRequest
      from aliyunsdkcore.auth.credentials import StsTokenCredential
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkcore.request import CommonRequest
      
      logger = logging.getLogger()
      
      
      # AccessKey ID and AccessKey Secret of an account that has the AliyunConfigFullAccess policy
      AK = 'LTAI4FgrMeKLB7NqDmPe****'
      SK = 'dylEiakiwLFB1CufDyxyCwlCxZ****'
      
      # Valid compliance types
      COMPLIACE_TYPE_COMPLIANT = 'COMPLIANT'
      COMPLIACE_TYPE_NON_COMPLIANT = 'NON_COMPLIANT'
      COMPLIACE_TYPE_NOT_APPLICABLE = 'NOT_APPLICABLE'
      COMPLIACE_TYPE_INSUFFICIENT_DATA = 'INSUFFICIENT_DATA'
      
      
      def handler(event, context):
          # Validate event
          evt = validate_event(event)
          if not evt:
              return None
      
          rule_parameters = evt.get('ruleParameters')
          result_token = evt.get('resultToken')
          ordering_timestamp = evt.get('orderingTimestamp')
          invoking_event = evt.get('invokingEvent')
      
          # Initialize
          compliance_type = COMPLIACE_TYPE_NOT_APPLICABLE
          annotation = None
      
      
          configuration_item = invoking_event.get('configurationItem')
          if not configuration_item:
              logger.error('Configuration item is empty.')
              return None
      
          resource_id = configuration_item.get('resourceId')
          resource_type = configuration_item.get('resourceType')
          region_id = configuration_item.get('regionId')
      
          creds = context.credentials
          # Get the compliance result
          compliance_type, annotation = evaluate_configuration_item(rule_parameters, configuration_item, creds)
      
          # Compliance result
          evaluations = [
              {
                  'complianceResourceId': resource_id,
                  'complianceResourceType': resource_type,
                  'complianceRegionId': region_id,
                  'orderingTimestamp': ordering_timestamp,
                  'complianceType': compliance_type,
                  'annotation': annotation
              }
          ]
      
          put_evaluations(context, result_token, evaluations)
          return evaluations
      
      
      def evaluate_configuration_item(rule_parameters, configuration_item, creds):
          # Initialize
          compliance_type = COMPLIACE_TYPE_NOT_APPLICABLE
          annotation = None
      
          # Get resource type and configuration
          resource_type = configuration_item['resourceType']
          full_configuration = configuration_item['configuration']
      
      
          # Check configuration
          if not full_configuration:
              annotation = 'Configuration is empty.'
              return compliance_type, annotation
      
          # Parse to json object
          configuration = parse_json(full_configuration)
          if not configuration:
              annotation = 'Configuration:{} is invalid.'.format(full_configuration)
              return compliance_type, annotation
      
          # =========== Customer code start =========== #
          if 'UserPrincipalName' in configuration and configuration['UserPrincipalName']:
              user_name = configuration_item['resourceName']
              user_principal_name = configuration['UserPrincipalName']
              if rule_parameters and 'dangerousActions' in rule_parameters and rule_parameters['dangerousActions']:
                  actions = rule_parameters['dangerousActions'].split(',')
                  isvalid, reason = validate_user_bind_MFADevice(user_name, user_principal_name, actions)
                  if isvalid:
                      compliance_type, annotation = COMPLIACE_TYPE_COMPLIANT, None
                  else:
                      compliance_type, annotation = COMPLIACE_TYPE_NON_COMPLIANT, reason
              else:
                  compliance_type, annotation = COMPLIACE_TYPE_COMPLIANT, 'rule parameter:{dangerousActions} not specified'
          else:
              compliance_type, annotation = COMPLIACE_TYPE_INSUFFICIENT_DATA, 'ram user not named'
      
          # =========== Customer code end =========== #
          return compliance_type, annotation
      
      
      def validate_user_bind_MFADevice(user_name, user_principal_name, input_actions):
          client = AcsClient(AK, SK)
          # Get the list of policies attached to the user
          list_user_policy_req = ListPoliciesForUserRequest.ListPoliciesForUserRequest()
          list_user_policy_req.set_UserName(user_name)
          list_user_policy_res = None
          try:
              list_user_policy_res = client.do_action_with_exception(list_user_policy_req)
          except ServerException as se:
              if se.get_http_status() == 404 and se.get_error_code() == 'EntityNotExist.User':
                  return True, None
              else:
                  return False, se.get_error_msg()
          if list_user_policy_res is None:
              return True, None
      
          list_user_policy_json_res = json.loads(list_user_policy_res)
          user_policies = list_user_policy_json_res['Policies']['Policy']
          if user_policies is None or len(user_policies) == 0:
              return True, None
      
          # Collect the actions allowed by the user's policies, to compare with input_actions
          policy_action_list = list()
          for policy in user_policies:
              get_policy_req = GetPolicyRequest.GetPolicyRequest()
              get_policy_req.set_PolicyName(policy['PolicyName'])
              get_policy_req.set_PolicyType(policy['PolicyType'])
              get_policy_res = None
              try:
                  get_policy_res = client.do_action_with_exception(get_policy_req)
                  policy_document = json.loads(json.loads(get_policy_res)['DefaultPolicyVersion']['PolicyDocument'])
                  for action in list(map(lambda x: x['Action'],
                                         list(filter(lambda x: x['Effect'] == 'Allow', policy_document['Statement'])))):
                      if type(action) is list:
                          policy_action_list.extend(action)
                      else:
                          policy_action_list.append(action)
              except ServerException as se:
                  logger.error(se)
              except Exception as ex:
                  logger.error(ex)
          policy_action_set = set(policy_action_list)
          logger.info('Policy actions: {}, input: {} .'.format(str(policy_action_set), str(input_actions)))
      
          # Verify policy actions contains input_actions
          if policy_action_set.intersection(set(input_actions)):
              get_user_mfa_req = GetUserMFAInfoRequest.GetUserMFAInfoRequest()
              get_user_mfa_req.set_UserName(user_principal_name)
              get_user_mfa_res = None
              try:
                  get_user_mfa_res = client.do_action_with_exception(get_user_mfa_req)
                  logger.info('GetUserMFAInfo user_name: {}, result: {} .'.format(user_name, str(get_user_mfa_res)))
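                  # The response body is JSON (bytes in Python 3); parse it before checking for a bound MFA device
                  mfa_device = (json.loads(get_user_mfa_res) or {}).get('MFADevice') or {}
                  if not mfa_device.get('SerialNumber'):
                      return False, 'user MFADevice empty'
                  else:
                      return True, None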
              except ServerException as se:
                  return False, se.get_error_msg()
          else:
              return True, None
      
      
      def validate_event(event):
          if not event:
              logger.error('Event is empty.')
              return None
      
          evt = parse_json(event)
          if not evt:
              logger.error('Event is not valid JSON.')
              return None
          logger.info('Loading event: %s .' % evt)
      
          if 'resultToken' not in evt:
              logger.error('ResultToken is empty.')
              return None
      
          if 'ruleParameters' not in evt:
              logger.error('RuleParameters is empty.')
              return None
      
          if 'invokingEvent' not in evt:
              logger.error('InvokingEvent is empty.')
              return None
          return evt
      
      
      def parse_json(content):
          try:
              return json.loads(content)
          except Exception as e:
              logger.error('Parse content:{} to json error:{}.'.format(content, e))
              return None
      
      
      def put_evaluations(context, result_token, evaluations):
          # AK/SK of an account that has the AliyunConfigFullAccess policy
          client = AcsClient(AK, SK, 'cn-shanghai')
      
          # Build the OpenAPI request
          request = CommonRequest()
          request.set_domain('config.cn-shanghai.aliyuncs.com')
          request.set_version('2019-01-08')
          request.set_action_name('PutEvaluations')
          request.add_body_params('ResultToken', result_token)
          request.add_body_params('Evaluations', evaluations)
          request.set_method('POST')
      
          try:
              response = client.do_action_with_exception(request)
              logger.info('PutEvaluations with request: {}, response: {}.'.format(request, response))
          except Exception as e:
              logger.error('PutEvaluations error: %s' % e)
      If you use the ap-southeast-1 endpoint of Cloud Config instead, change only put_evaluations: create the client with AcsClient(AK, SK, 'ap-southeast-1') and set the domain to config.ap-southeast-1.aliyuncs.com. The rest of the code is the same.
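      This code checks whether a RAM user has MFA enabled. The following table describes the main parameters in the code.
      Parameter            Example                                   Description
      AK                   LTAI4FgrMeKLB7NqDmPe****                  AccessKey ID of the current Alibaba Cloud account. This value must be the same as AK in step 3.
      SK                   dylEiakiwLFB1CufDyxyCwlCxZ****            AccessKey Secret of the current Alibaba Cloud account. This value must be the same as SK in step 3.
      user_name            None                                      RAM user.
      rule_parameters      dangerousActions                          Rule parameters.
      input_actions        ecs:*,oss:*,log:*                         The specified dangerous actions.
      configuration_item   See the data structure of custom rules.   Configuration details of the resource.
      Note: This code uses the MFA check for RAM users as an example. To check RAM users by other parameters, see What is the data structure of custom rules?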
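    3. On the online editing page, click Deploy in the upper-right corner.
  5. Create a custom rule.
    1. Log on to the Cloud Config console.
    2. In the left-side navigation pane, click Rules.
    3. On the Rules page, click Create Rule.
    4. On the Create Rule page, click Create Custom Rule.
    5. On the Basic Properties page, set the region of the function ARN to China (Shanghai), Service to Ram_User, Function to RamDangerousPolicyUserBindMFA, Rule Name to RamUserMFA, Trigger Type to Configuration Change and Periodic Execution, and Frequency to 1 hour. Then click Next.
    6. On the Evaluate Resource Scope page, click Custom Resource Type, select RAM user as the resource type associated with the rule, and then click Next.
    7. On the Parameter Settings page, click Add Rule Parameter, enter dangerousActions for the rule parameter name and ecs:*,oss:*,log:* for the expected value, and then click Next.
      Note: The rule parameter name and the expected value must match the rule_parameters and input_actions parameters in step 4.
    8. On the Remediation Settings page, click Next.
    9. On the Preview and Save page, click Submit.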
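  6. View the evaluation result for the RAM user Alice.
    1. On the page that indicates the rule was created, click View Rule Details.
    2. Click the Evaluation Results tab.
    3. In the Compliance Results of Associated Resources section, click the ID link of the target resource to view its compliance result.
  7. Configure the delivery of resource compliance events.
    After you configure resource compliance events to be delivered to the MNS topic MNSTestConfig, you receive non-compliance notifications from MNS. For details, see Send resource events to Message Service (MNS).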
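
To test the logic in step 4 outside the console, it helps to know which event fields the handler reads. The sketch below builds a hypothetical event from the field names used in that code (resultToken, orderingTimestamp, ruleParameters, and invokingEvent.configurationItem) and extracts the values that the MFA check depends on. All field values, including the token, the timestamp, the regionId, and the UserPrincipalName, are placeholders assumed for illustration. The helper `summarize_event` is not part of the original function.

```python
import json

# Hypothetical sample event; every value below is a placeholder (assumption).
sample_event = json.dumps({
    'resultToken': 'example-token',
    'orderingTimestamp': 1600000000000,
    'ruleParameters': {'dangerousActions': 'ecs:*,oss:*,log:*'},
    'invokingEvent': {
        'configurationItem': {
            'resourceId': '25849250231246****',
            'resourceType': 'ACS::RAM::User',
            'resourceName': 'Alice',
            'regionId': 'global',
            # The handler expects the configuration as a JSON string.
            'configuration': json.dumps(
                {'UserPrincipalName': 'Alice@example.onaliyun.com'}),
        }
    },
})


def summarize_event(event):
    """Extract the fields that the step 4 code reads for the MFA check."""
    evt = json.loads(event)
    item = evt['invokingEvent']['configurationItem']
    configuration = json.loads(item['configuration'])
    return {
        'user_name': item['resourceName'],
        'user_principal_name': configuration.get('UserPrincipalName'),
        'actions': evt['ruleParameters']['dangerousActions'].split(','),
    }


print(summarize_event(sample_event))
```

A payload shaped like this could be passed to the function's test invocation to exercise the handler before the rule is attached, provided the real event sent by Cloud Config uses the same field names.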