文档

创建和运行工作流

更新时间:

PAI支持通过Designer拖拉拽的形式构建数据处理、数据验证及模型训练的机器学习工作流,也支持使用Python SDK构建相同的工作流,提交到PAI的工作流服务执行。本文介绍如何使用Python SDK在PAI创建和执行一个机器学习工作流。

背景信息

为了获得合适的机器学习模型,将其应用于在线推理或离线推理,通常机器学习工程师需要进行数据提取、数据校验、特征工程、模型训练及模型评估全流程,且以上流程并非只进行一次。为了获得更符合业务需求的模型,机器学习工程师需要尝试不同的特征和参数,或使用增量的数据训练模型,以更好地拟合最新的输入特征。为了复用这些流程,通常的解决方案是先将这些流程以工作流的形式组织起来,再提交至一个自动化工作流服务中运行。

PAI提供机器学习工作流服务,您可以通过Designer拖拉拽的方式或Python SDK的方式构建一个机器学习工作流,提交到PAI的工作流服务运行。

说明

Designer是通过拖拉拽的方式构建的图形化工作流,和通过Python SDK构建的工作流不同,因此不能直接使用本地Python SDK调用Designer上的算法组件。

前提条件

准备工作

首先需要安装PAI Python SDK以运行本示例。

pip install "alipai>=0.4.0"

SDK需要配置访问阿里云服务需要的AccessKey,以及当前使用的工作空间和OSS Bucket。在PAI SDK安装之后,通过在命令行终端中执行以下命令进行配置,详细的安装和配置介绍见文档:安装和配置

python -m pai.toolkit.config

获取算法组件

Component是PAI工作流中定义的算法组件,包含了组件的输入输出信息、运行参数及具体执行的实现方式(执行一个DAG或执行一个单独的镜像)。您可以通过Python SDK从PAI获取PAI内置的公共算法组件(RegisteredComponent),也可以将本地构造的工作流对象保存为一个Component进行复用(参见下文的构建工作流)。

通过Python SDK从PAI工作流服务获取PAI内置的公共算法组件(RegisteredComponent)的详细步骤如下。

  1. 通过RegisteredComponent.list方法获取组件列表。

    RegisteredComponent.list方法中,PAI提供了一些公共的算法组件,通过指定providerProviderAlibabaPAI,即可获取PAI提供的算法组件列表。此外,您可以通过inputsoutputs属性,查看对应组件的输入输出信息,代码示例如下所示。

    from pai.pipeline.component import RegisteredComponent
    from pai.common import ProviderAlibabaPAI
    
    for item in RegisteredComponent.list(provider=ProviderAlibabaPAI, page_size=50, page_number=1):
        print(item)
  2. 获取指定的算法组件。

    通过上一步的RegisteredComponent.list方法,可以获取算法组件的identifier-provider-versionpipeline_id信息,您可以使用二者中的任意一个从PAI获取一个唯一的算法组件。这两种方式的区别在于identifier-provider-version是由组件开发者在保存组件时指定的,而pipeline_id是由PAI生成的组件唯一标识ID。以PAI提供的split组件为例,两种方式的代码示例分别如下:

    • 通过identifier-provider-version获取对应的组件

      component = RegisteredComponent.get_by_identifier(identifier="split", provider=ProviderAlibabaPAI, version="v1")
      
      print(component.inputs)

      目前PAI提供的组件主要基于MaxCompute算法,底层依赖通过PAI命令实现。这些组件的identifier默认为 PAI命令的nameversionv1。您可以在PAI的常规机器学习组件帮助文档中查看组件的输入参数定义,详情请参见常规机器学习组件

    • 通过pipeline_id获取对应的组件

      component = RegisteredComponent.get(component.id)
      print(component.identifier, component.provider, component.version)
      print(component.inputs)
  3. 提交运行任务。

    通过指定组件的必须输入参数,您可以提交一个单独的组件运行任务,以下以PAI提供的split算法组件为例,介绍如何使用SDK将输入数据表odps://pai_online_project/tables/wumai_data(该输入表为公开表,您可以直接使用)按照给定比例拆分到两张新表中。

    # split运行在MaxCompute中,需要指定运行的MaxCompute项目和执行环境。
    # maxc_execution作为算法组件的一个输入,标识算法组件的执行MaxCompute Project项目信息。
    maxc_execution = {
        "endpoint": "<YourMaxComputeProjectEndpoint>",
        "odpsProject": "<yourMaxComputeProjectName>",
    }
    #提交运行任务。
    pipeline_run = component.run(
        job_name="example-split-job",
        arguments={
            "execution":maxc_execution,
            "inputTable": "odps://pai_online_project/tables/wumai_data",
            "fraction": "0.7",  #split算法组件的输入参数,可以通过op.inputs获取输入信息。
        }
    )
    
    print(pipeline_run.get_outputs())

    需要根据实际情况,替换以下参数值:

    • <YourMaxComputeProjectEndpoint>:项目所在地域的Endpoint,详情请参见Endpoint

    • <YourMaxComputeProjectName>:MaxCompute项目名称。

    上述代码中通过run接口提交任务后,SDK会在Console中输出任务在PAI控制台中的URL,您也可以直接在PAI控制台的任务管理页面,通过返回的运行任务ID或任务名称查找对应的任务实例。找到该任务后,进入任务详情页面,您可以查看任务执行的DAG、日志及输出,并且可以将模型直接部署到EAS

    组件可以单独运行,也可以将多个组件拼接为一个工作流运行,详情请参见下文的构建工作流

构建工作流

PAI工作流服务支持将多个算法组件拼接为一个新的工作流,通过提供输入参数可以提交并运行该工作流,或将其保存为一个复合工作流组件。保存的工作流组件可以作为一个普通组件直接运行(参见上文的提交任务)或作为新创建的工作流的一个节点。

创建一个新的工作流主要包括以下流程:

  1. 定义工作流的输入信息。

    输入信息包括用户输入参数PipelineParameter或数据输入PipelineArtifact。对于数据输入,目前PAI的工作流服务支持OSS数据、MaxCompute表数据及MaxCompute OfflineModel。

  2. 创建工作流中的Step及其输入。

    Step的输入可能来自于其他Step,也可能来源于当前创建的工作流的输入。

  3. 指定工作流的输出信息。

    工作流可以使用引用的方式将Step节点的输出作为新的工作流的输出。

下面的示例代码中,构建了一个简单的工作流,它先使用类型转换组件将输入表的部分字段转换为浮点类型,再通过数据拆分组件将表拆分为两张MaxCompute表。

from pai.pipeline.types import PipelineParameter, PipelineArtifact, ArtifactMetadataUtils
from pai.pipeline import PipelineStep, Pipeline, RegisteredComponent

def create_composite_pipeline():
    # 定义当前工作流的输入信息。
    
    # 由于组件底层的处理是运行在MaxCompute上,execution参数传递执行的MaxCompute Project信息。
    execution_input = PipelineParameter(name="execution", typ=dict)
    cols_to_double_input = PipelineParameter(name="cols_to_double")
    table_input = PipelineArtifact(name="input_table", metadata=ArtifactMetadataUtils.maxc_table())

    # 构建类型转换Step。
    # 指定identifier-provider-version, 使用一个已经保存的组件,作为工作流的一个Step。
    type_transform_step = PipelineStep.from_registered_component(
        identifier="type_transform", provider=ProviderAlibabaPAI,
        version="v1", name="typeTransform", inputs={
            "inputTable": table_input,
            "execution": execution_input,
            # "outputTable": gen_temp_table(), 
            "cols_to_double": cols_to_double_input,
        }
    )
    
    # 构建拆分表Step。
    # SavedOperator也可以作为一个Step构建工作流。
    split_operator = RegisteredComponent.get_by_identifier(identifier="split",
     provider=ProviderAlibabaPAI, version="v1")
    split_step = split_operator.as_step(inputs={"inputTable": type_transform_step.outputs[0],
            "execution": execution_input,
            # "output1TableName": gen_temp_table(),
            "fraction": 0.5,
            # "output2TableName": gen_temp_table(),
        })

    # Pipeline构造函数中的steps和inputs信息并不要求完整输入,Pipeline graph时,是通过Pipeline的outputs和steps推导他们的依赖,从而构造对应的执行DAG。
    p = Pipeline(
        steps=[split_step],
        outputs=split_step.outputs[:2],
    )
    return p

p = create_composite_pipeline()
# 输入工作流运行所需参数(arguments)后,提交到PAI Pipeline Service运行。
pipeline_run = p.run(job_name="demo-composite-pipeline-run", arguments={
            "execution": maxc_execution,
            "cols_to_double": "time,hour,pm2,pm10,so2,co,no2",
            "input_table": "odps://pai_online_project/tables/wumai_data",
        }, wait=True)


pipeline_run.get_outputs()

对于上述构建好的工作流,您可以为其指定组件名称和版本,将该工作流保存到服务端成为一个可复用组件。保存的组件默认共享给阿里云账号下的所有RAM用户,保存组件的示例代码如下。

# 指定identifier和版本,保存工作流。保存的组件的provider默认为对应的阿里云账号的UID。
p = p.save(identifier="demo-composite-pipeline", version="v1")
print(p.pipeline_id, p.identifier, p.version, p.provider)

心脏病预测案例

以下示例代码通过Python SDK构建了一个心脏病预测工作流,并提交任务使其运行在PAI工作流服务。有关心脏病预测案例的详细信息,请参见心脏病预测

您可以在本地的Python环境或DSW中,完成SDK初始化(参见安装和配置)后,运行以下的示例代码训练心脏病预测模型。为了保存输出的PMML模型文件,工作流在运行时,需要提供您的OSS信息。

import random
from pai.pipeline.types import (
    ArtifactMetadataUtils,
    ParameterType,
    PipelineArtifact,
    PipelineParameter,
)
from pai.pipeline import Pipeline, PipelineStep

feature_cols = "sex,cp,fbs,restecg,exang,slop,thal,age,trestbps,chol,thalach,oldpeak,ca,ifhealth"
label_col = "ifhealth"

def create_pipeline():
    pmml_oss_bucket = PipelineParameter("pmml_oss_bucket")
    pmml_oss_rolearn = PipelineParameter("pmml_oss_rolearn")
    pmml_oss_path = PipelineParameter("pmml_oss_path")
    pmml_oss_endpoint = PipelineParameter("pmml_oss_endpoint")
    execution = PipelineParameter("execution", ParameterType.Map)
    dataset_input = PipelineArtifact(
        "dataset-table",
        metadata=ArtifactMetadataUtils.maxc_table(),
        required=True,
    )

    sql = (
        "select age, (case sex when 'male' then 1 else 0 end) as sex,(case cp when"
        " 'angina' then 0  when 'notang' then 1 else 2 end) as cp, trestbps, chol,"
        " (case fbs when 'true' then 1 else 0 end) as fbs, (case restecg when 'norm'"
        " then 0  when 'abn' then 1 else 2 end) as restecg, thalach, (case exang when"
        " 'true' then 1 else 0 end) as exang, oldpeak, (case slop when 'up' then 0  "
        "when 'flat' then 1 else 2 end) as slop, ca, (case thal when 'norm' then 0 "
        " when 'fix' then 1 else 2 end) as thal, (case status when 'sick' then 1 else "
        "0 end) as ifHealth from ${t1};"
    )
    sql_step = PipelineStep.from_registered_component(
        "sql",
        name="sql-1",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "inputTable1": dataset_input,
            "execution": execution,
            "sql": sql,
        },
    )

    type_transform_step = PipelineStep.from_registered_component(
        "type_transform",
        name="type-transform-1",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "execution": execution,
            "inputTable": sql_step.outputs["outputTable"],
            "cols_to_double": feature_cols,
            # "outputTable": gen_run_node_scoped_placeholder(suffix="outputTable"),
        },
    )

    normalize_step = PipelineStep.from_registered_component(
        "normalize_1",
        name="normalize-1",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "execution": execution,
            "inputTable": type_transform_step.outputs["outputTable"],
            "selectedColNames": feature_cols,
            "lifecycle": 1,
            # "outputTableName": gen_run_node_scoped_placeholder(suffix="outputTable"),
            # "outputParaTableName": gen_run_node_scoped_placeholder(
            #     suffix="outputParaTable"
            # ),
        },
    )

    split_step = PipelineStep.from_registered_component(
        identifier="split",
        name="split-1",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "inputTable": normalize_step.outputs["outputTable"],
            "execution": execution,
            "fraction": 0.8,
        },
    )

    model_name = "test_health_prediction_by_pipeline_%s" % (random.randint(0, 999999))

    lr_step = PipelineStep.from_registered_component(
        identifier="logisticregression_binary",
        name="logisticregression-1",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "inputTable": split_step.outputs["output1Table"],
            "execution": execution,
            "generatePmml": True,
            "pmmlOssEndpoint": pmml_oss_endpoint,
            "pmmlOssBucket": pmml_oss_bucket,
            "pmmlOssPath": pmml_oss_path,
            "pmmlOverwrite": True,
            "roleArn": pmml_oss_rolearn,
            "regularizedLevel": 1.0,
            "regularizedType": "l2",
            "modelName": model_name,
            "goodValue": 1,
            "featureColNames": ",".join(feature_cols),
            "labelColName": label_col,
        },
    )

    offline_model_pred_step = PipelineStep.from_registered_component(
        identifier="Prediction_1",
        name="offlinemodel-pred",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "model": lr_step.outputs["model"],
            "inputTable": split_step.outputs["output2Table"],
            "execution": execution,
            "featureColNames": feature_cols,
            "appendColNames": label_col,
        },
    )

    evaluate_step = PipelineStep.from_registered_component(
        identifier="evaluate_1",
        name="evaluate-1",
        provider=ProviderAlibabaPAI,
        version="v1",
        inputs={
            "execution": execution,
            "inputTable": offline_model_pred_step.outputs["outputTable"],
            "scoreColName": "prediction_score",
            "labelColName": label_col,
        },
    )

    p = Pipeline(
        steps=[evaluate_step, offline_model_pred_step],
        outputs={
            "pmmlModel": lr_step.outputs["PMMLOutput"],
            "evaluateResult": evaluate_step.outputs["outputMetricTable"],
        },
    )
    return p

p = create_pipeline()

# 输出Pipeline的图片。
p.dot().render()

pmml_oss_endpoint = "<YourOssBucketEndpoint>"
pmml_oss_path = "<PathForThePmmlFile>"
pmml_oss_bucket = "<YourOssBucketName>"

pmml_oss_rolearn = "<RoleArnForPaiToVisitOssBucket>"
maxc_execution = {
    "endpoint": "<your_Project_Endpoint>",
    "odpsProject": "<your_MaxCompute_project>",
}

run_instance = p.run(
    job_name="test_heart_disease_pred",
    arguments={
        "execution": maxc_execution,
        "pmml_oss_rolearn": pmml_oss_rolearn,
        "pmml_oss_path": pmml_oss_path,
        "pmml_oss_bucket": pmml_oss_bucket,
        "pmml_oss_endpoint": pmml_oss_endpoint,
        "inputTable": "odps://pai_online_project/tables/heart_disease_prediction"
    },
    wait=True,
)
print(run_instance.get_outputs())

需要根据实际情况,替换以下参数值。

参数

描述

<YourOssBucketEndpoint>

OSS Bucket所在地域的Endpoint,详情请参见访问域名和数据中心

<PathForThePmmlFile>

模型文件保存的OSS Bucket路径。

<YourOssBucketName>

OSS Bucket名称。

<RoleArnForPaiToVisitOssBucket>

OSS的roleArn。您可以登录PAI控制台,在资源管理 > 全部产品依赖页面的Designer区域,单击操作列下的查看授权信息,获取role_Arn,具体操作请参见PAI访问云产品授权:OSS

<your_Project_Endpoint>

项目所在地域的Endpoint,详情请参见Endpoint

<your_MaxCompute_project>

MaxCompute项目名称。

上述代码构建的工作流,通过pipeline.dot().render()渲染后,得到的Pipeline DAG如下图所示。心脏病Pipeline DAG

  • 本页导读 (1)
文档反馈