PAI Pipeline Service中可以使用PAI提供的组件(Operator)处理数据和训练模型,也可以使用自定义组件处理具体任务。本文介绍如何在PAI Pipeline Service中,通过Python SDK构建并使用自定义组件。

前提条件

背景信息

在PAI Pipeline Service中,您可以使用PAI提供的组件进行数据处理、特征工程、模型训练及离线推理。如果需要自己定义工作流中的节点功能,则需要通过构建自定义组件的方式实现相关功能。例如,训练完成后通过调用API通知用户的后端服务,或使用PyODPS处理MaxCompute上的数据表。

构建自定义组件,实际上就是定义组件在PAI Pipeline Service中的运行行为,它主要包括如下两部分信息:
  • 组件作为节点输入输出参数或数据的定义,用于引导PAI Pipeline Service完成输入输出数据搬运,同时也作为节点在PAI Pipeline Service中拼接的签名信息 。
  • 容器镜像运行定义,包括使用的镜像、镜像仓库访问凭证、镜像的环境变量定义及镜像运行命令。PAI Pipeline Service根据这些信息运行对应的镜像。
Python SDK提供了ScriptOperator(推荐)和ContainerOperator两种方式,帮助您快速构建自定义组件,详情请参见构建自定义组件:ScriptOperator(推荐)构建自定义组件:ContainerOperator
使用自定义组件构建工作流时,您需要先获取组件运行的上下文信息,再构建工作流:
  1. 获取上下文信息(pai_running_utils)
  2. 构建工作流

准备工作

保存或运行自定义组件时,SDK需要与PAI的后端服务进行交互,且交互依赖于您初始化的全局会话Session。因此,您需要安装SDK、初始化全局Session和AI工作空间。

  1. 安装SDK。
    如果您通过本地的Python开发环境使用SDK,则需要安装SDK。如果您通过PAI-DSW环境使用SDK,该环境中已经预装了SDK,您可以跳过该步骤。通过pip命令安装SDK的命令如下。
    pip install https://pai-sdk.oss-cn-shanghai.aliyuncs.com/alipai/dist/alipai-0.3.0-py2.py3-none-any.whl
    其中https://pai-sdk.oss-cn-shanghai.aliyuncs.com/alipai/dist/alipai-0.3.0-py2.py3-none-any.whl表示SDK的下载地址,您无需修改。
    说明 虽然SDK支持Python 2和Python 3版本,但是Python社区已经停止维护Python 2版本,因此推荐您使用Python 3环境运行SDK。
  2. 初始化默认的SDK Session和AI工作空间。
    PAI Pipeline Service SDK依赖于阿里云机器学习PAI提供的服务,SDK的Session负责与PAI的后端服务和依赖的其他阿里云服务进行交互。Session封装了鉴权凭证AccessKey、使用PAI服务的地域及当前使用的AI工作空间。

    您可以通过pai.core.session.setup_default_session方法初始化一个全局默认的Session对象。当调用的API(例如SaveOperator.listWorkspace.list)需要与PAI后端服务进行通讯时,默认使用该Session进行通讯。

    初始化默认的SDK Session和AI工作空间时,您可以通过以下两种方式指定AI工作空间:
    • 方式一:首先在PAI控制台上查看AI工作空间的名称或ID,然后初始化默认Session的同时指定AI工作空间。
      Python SDK提供了ScriptOperator(推荐)和ContainerOperator两种方式构建自定义组件,这两种方式在初始化默认的SDK Session和AI工作空间时,您需要注意以下差异:
      • 如果使用ScriptOperator构建自定义组件,则需要使用OSS存储用户代码,因此初始化Session时,需要额外提供OSS信息。代码示例如下。
        from pai.core.session import setup_default_session
        from pai.common import ProviderAlibabaPAI
        
        # 初始化默认的全局session和AI工作空间,同时提供OSS信息。
        session = setup_default_session(access_key_id="<your_access_key_id>", 
                                        access_key_secret="<your_access_key_secret>",
                                        region_id="<region_id>", 
                                        oss_bucket_name="<your_oss_bucket_name>", 
                                        oss_bucket_endpoint="<your_oss_bucket_endpoint>", 
                                        # workspace_id="<your_workspace_id>",         # AI工作空间名称和ID二选一。
                                        workspace_name="<your_workspace_name>")
        参数详情请参见下文表 1,您需要将参数值替换为实际值。
      • 如果使用ContainerOperator构建自定义组件,则无需使用OSS。示例代码如下。
        from pai.core.session import setup_default_session
        from pai.core.workspace import Workspace
        
        setup_default_session(access_key_id="<your_access_key_id>", 
                              access_key_secret="<your_access_key_secret>", 
                              region_id="<region_id>",
                              # workspace_id="<your_workspace_id>",         # AI工作空间名称和ID二选一。
                              workspace_name="<your_workspace_name>")
        参数详情请参见下文表 1,您需要将参数值替换为实际值。
    • 方式二:首先设置默认的Session,然后获取阿里云账号下可访问的AI工作空间列表,再指定使用的AI工作空间。
      Python SDK提供了ScriptOperator(推荐)和ContainerOperator两种方式构建自定义组件,这两种方式在初始化默认的SDK Session和AI工作空间时,您需要注意以下差异:
      • 如果使用ScriptOperator构建自定义组件,则在初始化Session时,需要额外提供OSS信息。代码示例如下。
        from pai.core.session import setup_default_session
        from pai.core.workspace import Workspace
        session = setup_default_session(access_key_id="<your_access_key_id>", 
                                        access_key_secret="<your_access_key_secret>",
                                        region_id="<region_id>", 
                                        oss_bucket_name="<your_oss_bucket_name>", 
                                        oss_bucket_endpoint="<your_oss_bucket_endpoint>")
        for ws in Workspace.list():
            print(ws.name, ws.id)
        
        session.set_workspace(workspace=Workspace.get_by_name("<your_workspace_name>"))
        参数详情请参见下文表 1,您需要将参数值替换为实际值。
      • 如果使用ContainerOperator构建自定义组件,则无需使用OSS。示例代码如下。
        from pai.core.session import setup_default_session
        from pai.core.workspace import Workspace
        session = setup_default_session(access_key_id="<your_access_key_id>", access_key_secret="<your_access_key_secret>",
                                        region_id="<region_id>")
        for ws in Workspace.list():
            print(ws.name, ws.id)
        
        session.set_workspace(workspace=Workspace.get_by_name("<your_workspace_name>"))
        参数详情请参见下文表 1,您需要将参数值替换为实际值。
    表 1. 初始化的相关参数
    参数 描述
    <your_access_key_id> 阿里云账号的AccessKey ID。
    <your_access_key_secret> 阿里云账号的AccessKey Secret。
    <region_id> PAI Pipeline Service的地域,后续提交的任务将运行在该地域。该参数支持以下取值:
    • cn-shanghai:华东2(上海)
    • cn-hangzhou:华东1(杭州)
    • cn-beijing:华北2(北京)
    • cn-shenzhen:华南1(深圳)
    <your_oss_bucket_name> 仅使用ScriptOperator构建自定义组件时提供,表示OSS Bucket的名称。
    <your_oss_bucket_endpoint> 仅使用ScriptOperator构建自定义组件时提供,表示OSS Bucket所在的地域,详情请参见访问域名和数据中心
    <your_workspace_name><your_workspace_id> AI工作空间名称或ID。
    对于上面两种指定AI工作空间的方式,分别根据以下方法获取该参数值:
    • 如果使用方式一,则<your_workspace_name><your_workspace_id>二选一。您可以登录PAI控制台,在AI工作空间列表页面查看参数值。您也可以创建新的AI工作空间,详情请参见AI工作空间
      说明 如果指定AI工作空间时,同时指定了AI工作空间的名称和ID,则接口报错。
    • 如果使用方式二,则将<your_workspace_name>指定为Workspace.list()接口返回的一个ws.name

构建自定义组件:ScriptOperator(推荐)

通过SDK的ScriptOperator,您只需要定义组件的输入输出信息和镜像内执行的Python脚本,既可完成组件的定义,极大简化了自定义一个组件的成本。以下示例使用ScriptOperator构建了一个自定义组件,该组件对应的容器内会运行定义的entry_point

  1. 定义组件的输入和输出。
    注意 调用以下代码之前,需要先设置存储代码的OSS Bucket,详情请参见准备工作
    import yaml
    from pai.operator import ScriptOperator
    from pai.pipeline.types import PipelineParameter
    
    op = ScriptOperator(
        entry_point="main.py",
        # script_dir目录下的文件会被一起打包上传至OSS。
        script_dir="scripts",
        inputs=[
            PipelineParameter(name="foo", default=10),
            PipelineParameter(name="bar", default=10),
        ],
        outputs=[],
    )
    
    # 直接运行对应的组件。
    op.run(
        job_name="exampleScript",
        arguments={
            "foo": "ThisIsFoo",
            "bar": "BAR",
        }
    )
    
    # 保存组件(组件的identifier和version不能冲突)。
    op.save(identifier="simpleExample", version="v1")
    
    # 查看组件的定义信息。
    print(yaml.dump(op.to_dict()))
  2. 定义镜像内执行的Python脚本entry_point
    该示例中entry_point的取值为main.py,即组件对应的容器中运行main.py文件。main.py文件的内容如下所示,在容器内通过python -m main --foo ThisIsFoo --bar BAR命令调用。
    import argparse
    
    def main():
        parser = argparse.ArgumentParser("ScriptOperator arguments parser")
        parser.add_argument("--foo")
        parser.add_argument("--bar")
    
        args, _ = parser.parse_known_args()
    
        print("Arguments foo is ", args.foo)
        print("Arguments bar is ", args.bar)
    
    
    if __name__ == "__main__":
        main()
    ScriptOperator将script_dir目录下的文件打包上传到OSS中,将对应的OSS URL和运行脚本entry_point作为对应容器的环境变量,并在组件的描述文件中进行定义,默认使用launch作为镜像的启动命令。
    容器内的launch命令是预先安装在默认镜像中的(安装pai_running_utils时默认安装的命令行脚本),它主要完成以下工作(如下图所示):
    1. 使用环境变量PAI_SOURCE_CODE_URL获取代码,并将其解压到/work/code目录。如果相关的脚本已经在对应的镜像内,则无需使用文件下载机制准备相关的脚本,即可以跳过该步骤。
    2. 如果对应的代码包中包括requirements.txt文件,则使用pip命令安装依赖的三方库。
    3. 根据环境变量PAI_PROGRAM_ENTRY_POINT运行对应的代码。
    launch命令工作范围

构建自定义组件:ContainerOperator

使用SDK中的ContainerOperator,您可以构建一个基于容器的自定义组件。以下示例构建了一个自定义组件,该组件实现了打印运行容器环境变量的功能。

  1. 构造ContainerOperator对象,并定义组件的输入和输出信息。
    import yaml
    from pai.operator.container import ContainerOperator
    from pai.pipeline.types import PipelineParameter
    
    container_op = ContainerOperator(
        image_uri="python:3",
        inputs=[
            PipelineParameter(name="foo"),
            PipelineParameter(name="bar"),
        ],
        outputs=[],
        command=[
            "python",
            "-c",
            "import os; print('\\n'.join(['%s=%s' % (k, v) for k, v in os.environ.items()]));",
        ],
        env={"CustomEnvKey": "CustomEnvValue"},
    )                      
    上述代码中ContainerOperator构造函数涉及的参数详情如下:
    • inputs:用于定义组件的输入信息。
    • outputs:用于定义组件的输出信息。
    • image_uri:使用的容器。
    • command:运行命令。
    • env:注入容器的环境变量信息。
  2. 运行组件。
    为构造的ContainerOperator对象指定输入参数,即可运行该组件,示例代码如下。
    container_op.run(
        job_name="containerTemplExample",
        arguments={
            "foo": "this is foo",
            "bar": "this is bar",
        },
    )
  3. 如果需要将该组件共享给该阿里云账号下的所有RAM用户或使用它构造新工作流的一个节点,则可以通过save方法将其保存到PAI Pipeline Service后端中。
    container_op.save(identifier="containerTemplExample", version="v1")
    # 打印YAML文件。
    print(yaml.dump(container_op.to_dict()))  

构建工作流

  1. 获取上下文信息(pai_running_utils)。

    如果您通过自定义组件构建工作流,则需要获取该组件的输入信息(包括输入的数据类型和表名等)、读取环境变量和本地文件、并理解对应的输入输出格式,该过程比较复杂。为了帮助您快速地获取组件的上下文信息,PAI Pipeline Service提供了一个简单的Runtime SDK pai_running_utils。您可以使用它获取节点的输入参数和输入Artifact,也可以使用它写出当前节点的输出Artifact。关于Artifact的详细信息,请参见附录:工作流中的输入输出数据(Artifact)

    Python脚本可以通过pai_running_utils.context.Context实例获取对应的运行环境上下文信息,主要接口如下:
    • 获取运行节点的输入参数input_parameters,示例如下。
      from pai_running.context import Context
      # 构建context实例,获取当前的上下文。
      context = Context()
      
      # 可以通过name作为key在input_parameters中查找输入参数的值。例如获取key为featureColNames的输入参数值。
      feature_cols = context.input_parameters["featureColNames"]
      # 打印所有输入参数的名称和参数值。
      for name, value in context.input_parameters.items():
          print("parameters is", name, value)
    • 获取输入和输出数据,即input_artifactsoutput_artifacts,示例如下。
      # 遍历组件输入的Artifacts定义。如果有输入信息,则打印Artifact的名称和路径。
      for artifact in context.input_artifacts:
          if artifact:
              print(artifact.name, artifact.path)
      
      # 可以通过名称或index定位Artifact。
      context.output_artifacts["outputModel"].write_output(
          {
              "location": {
                  "bucket": "test",
                  "endpoint": "oss-cn-hangzhou.aliyuncs.com",
                  "key": "/pipeline/output/lr_model.xml",
              }
          }
      )
    • 获取节点在PAI Pipeline Service中运行的环境信息,示例如下。
      # 节点对应组件的inputs和outputs定义信息。
      print(context.inputs_spec)
      print(context.outputs_spec)
      
      # 节点的运行用户信息。
      print(context.user_id)
      # 节点运行的工作空间信息。
      print(context.workspace_id)
  2. 构建工作流。
    关于如何构建工作流,请参见构建工作流

示例:使用ScriptOperator构建自定义组件并获取上下文信息

以下示例,使用ScriptOperator定义了一个组件,它可以从MaxCompute输入表odps://pai_online_project/tables/wumai_data(PAI 提供的公共数据表,您可以直接使用)中选择部分列,输出到一张新的MaxCompute表中。具体的实现方法如下。

  1. 定义组件的输入输出信息。
    from pai.operator import ScriptOperator
    from pai.pipeline.types import PipelineParameter, PipelineArtifact, MetadataBuilder
    
    op = ScriptOperator(
        entry_point="main.py",
        script_dir="scripts",
        inputs=[
            PipelineParameter(name="destTable", desc="输出的目的表"),
            PipelineParameter(name="execution", typ=dict, desc="max_compute config"),
            PipelineParameter(name="selectColNames", desc="输出的目的表"),
            PipelineArtifact(name="inputTable", metadata=MetadataBuilder.maxc_table()),
        ],
        outputs=[
            PipelineArtifact(name="outputTable", metadata=MetadataBuilder.maxc_table()),
        ])
    
    op.run(
        job_name="example",
        arguments={
            "destTable": "sql_script_dest_table",
            "selectColName": "time,hour,pm2,pm10",
            "execution": {
                "project": "{{test_project_name}}",
                "endpoint": "{{max_compute_project_endpoint}}",
            },
            "inputTable":  "odps://pai_online_project/tables/wumai_data",
        }
    )
    上述代码定义的组件共三个输入参数(PipelineParameter)和一个输入的MaxCompute表(PipelineArtifact)。三个输入参数分别为输出的目标MaxCompute表名(destTable)、选择的列信息(selectColNames)及执行任务的MaxCompute引擎配置信息(execution)。
  2. 定义镜像内执行的Python脚本。
    import json
    from pai_running.context import Context
    from odps import ODPS
    
    
    def main():
        # 获取当前节点的运行相关信息。
        context = Context()
        # 节点的运行输入参数(parameters)。
        parameters = context.input_parameters
        maxc_config = json.loads(parameters["execution"])
    
        odps = ODPS(
            access_id=maxc_config["accessKeyId"],
            secret_access_key=maxc_config["accessKeySecret"],
            project=maxc_config["project"],
            endpoint=maxc_config["endpoint"],
        )
    
        # 节点的输入Artifacts。
        input_table = context.input_artifacts[0].get_table()
        col_names = parameters["selectColNames"]
        output_table = parameters["outputTableName"]
    
        sql = "create table {0} select {1} from {2}".format(
            output_table,
            col_names,
            input_table,
        )
    
        run_instance = odps.run_sql(sql)
        run_instance.wait_for_success()
        
        context.output_artifacts["outputTable"].write_output(
            {
                "location":{
                    "project": maxc_config["project"],
                    "endpoint": maxc_config["endpoint"],
                    "table": "exampleOutputTable",
                }
            }
        )
    
    if __name__ == "__main__":
        main()
    该脚本首先通过pai_running_utilsContext()接口获取输入参数和输入数据信息,包括MaxCompute的运行配置、输入表及选择的列等信息,然后调用PyODPS运行构造的SQL。任务运行成功后,输出组件的Artifact信息(MaxComputeLocationArtifact)。关于如何快速获取输入参数和输入数据,请参见上文的获取上下文信息(pai_running_utils)

附录:工作流中的输入输出数据(Artifact)

Artifact是组件的输入输出数据。组件在工作流中运行时,PAI Pipeline Service会在镜像启动前将上游组件输出的Artifact搬运到当前组件对应的容器文件系统中。默认组件的工作目录示例如下所示。
/work/
|── code                        # 下载的依赖代码所在目录。
|── inputs                      # 输入的相关信息所在的文件夹。
|   |── artifacts
|   |     └── inputDataSet
|   |            └── data
|   └── parameters
├── outputs                     # 输出信息所在目录。
|     └── artifacts
|        └── outputDataSet
|               └── data
PAI Pipeline Service默认将输入的数据Artifact加载到/work/inputs/artifacts/<artifactName>/data路径下,组件运行结束后(容器退出后),从/work/outputs/artifacts/<artifactName>/data路径下读取保存组件的输出数据。在上述工作目录示例中,其输入数据<artifactName>为inputDataSet,输出数据<artifactName>为outputDataSet
目前PAI Pipeline Service支持的Artifact包括OssArtifact和MaxComputeTableArtifact,格式分别如下:
  • OssArtifact
    OssArtifact表示在OSS上存储的数据,格式如下所示。
    {
      "location": {
        "bucket": "pai-test",
        "endpoint": "oss-cn-shanghai-internal.aliyuncs.com",
        "key": "/paiflow/model_transfer2oss_test/test_health_prediction_by_pipeline_500940.xml"
      },
    }
    各字段的含义如下:
  • MaxComputeTableArtifact
    MaxComputeTableArtifact的格式如下所示。
    {
      "location": {
        "project": "myProject",
        "table": "myTable",
        "partition": "myPartition",
        "endpoint": "myEndpoint",
      }
    }
    各字段的含义如下:
    • project:MaxCompute的项目名。
    • table:MaxCompute的表名。
    • partition:分区信息,例如name1=value。如果存在多级分区,则使用正斜线(/)分隔,例如name1=value1/name2=value2
    • endpoint:MaxCompute项目的Endpoint,详情请参见Endpoint