通过DataWorks控制台将表格存储中的增量数据导出到MaxCompute中。

步骤一:新增表格存储数据源

如果已新增表格存储数据源,请跳过此步骤。

新增表格存储数据源的操作请参见步骤一:新增表格存储数据源

步骤二:新增MaxCompute数据源

如果已新增MaxCompute数据源,请跳过此步骤。

新增MaxCompute数据源的操作请参见步骤二:新增MaxCompute数据源

步骤三:配置同步任务

新建并配置表格存储到MaxCompute的增量数据同步任务,具体操作步骤如下:

  1. 进入数据开发。
    1. 以项目管理员身份登录DataWorks控制台
    2. 选择地域,在左侧导航栏,单击工作空间列表
    3. 工作空间列表页面,单击工作空间操作区域的进入数据开发
  2. 在DataStudio控制台的数据开发页面,单击业务流程节点下的目标业务流程。

    如果需要新建业务流程,请参见创建业务流程

  3. 新建同步任务节点。
    每个同步任务都需创建一个相应的节点。
    1. 数据集成节点上右键选择新建 > 离线同步
      您也可以将鼠标悬停在fig_addnode图标,选择数据集成 > 离线同步来新建节点。
    2. 新建节点对话框,输入节点名称,选择一个目标文件夹。
      fig_stream_ots2odps
    3. 单击提交
  4. 配置数据源。
    1. 数据集成节点下,双击同步任务节点。
    2. 在同步任务节点的编辑页面的选择数据源区域,配置数据来源和数据去向。
      • 配置数据来源。

        选择数据来源数据源OTS Stream,选择数据源和数据表,可根据需要配置任务开始时间、结束时间、状态表的名称、最大重试次数等。

      • 配置数据去向。

        选择数据去向数据源ODPS,选择数据源和表,可根据需要配置清理规则、是否把空字符串作为null等。

      fig_ots2stream_001
    3. 单击script图标,进行脚本配置。

      使用过程时涉及OTSStream Reader和OSS Writer插件的配置。具体操作,请参见配置OTSStream Reader配置MaxCompute Writer

      在脚本配置页面,请根据如下示例完成配置。

      {
          "type": "job",
          "steps": [
              {
                  "stepType": "otsstream",    # Reader插件的名称。
                  "parameter": {
                      "mode": "single_version_and_update_only",    # 配置导出模式,默认不设置,为列模式。
                      "statusTable": "TableStoreStreamReaderStatusTable",    # 存储TableStore Stream状态的表,一般无需修改。
                      "maxRetries": 30,    # 从TableStore中读取增量数据时,每次请求的最大重试次数,默认为30。重试之间有间隔,重试30次的总时间约为5分钟,一般无需修改。
                      "isExportSequenceInfo": false,    # 是否导出时序信息,时序信息包含了数据的写入时间等。默认该值为false,即不导出。single_version_and_update_only模式下只能为false。
                      "datasource": "",    # Tablestore的数据源名称,如果有此项则无需配置endpoint、accessId、accessKey和instanceName。
                      "column": [    # 按照需求添加需要导出TableStore中的列,您可以自定义个数。
                          {
                              "name": "h"    # 列名示例,可以是主键或属性列。
                          },
                          {
                              "name": "n"
                          },
                          {
                              "name": "s"
                          }
                      ],
                      "startTimestampMillis": "${startTime}",    # 增量数据的时间范围(左闭右开)的左边界,格式为yyyymmddhh24miss,单位毫秒。输入$[yyyymmddhh24miss-10/24/60],表示调度时间减去10分钟。
                      "table": "",    # TableStore中的表名。
                      "endTimestampMillis": "${endTime}"    # 增量数据的时间范围(左闭右开)的右边界,格式为yyyymmddhh24miss,单位为毫秒。输入$[yyyymmddhh24miss-5/24/60],表示调度时间减去5分钟。注意,endTime需要比调度时间提前5分钟及以上。
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "odps",    # Writer插件的名称。
                  "parameter": {
                      "partition": "",    # 需要写入数据表的分区信息。
                      "truncate": false,    # 清理规则,写入前是否清理已有数据。
                      "compress": false,    # 是否压缩。
                      "datasource": "",    # 数据源名。
                      "column": [    # 需要导入的字段列表。
                          "h",
                          "n",
                          "s"
                      ],
                      "emptyAsNull": false,    # 空字符串是否作为null,默认是。
                      "table": ""    # 表名。
                  },
                  "name": "Writer",
                  "category": "writer"
              }
          ],
          "version": "2.0",
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          },
          "setting": {
              "errorLimit": {
                  "record": "0"    # 允许出错的个数,当错误超过这个数目的时候同步任务会失败。
              },
              "speed": {
                  "throttle": false,    #同步速率不限流。
                  "concurrent": 2    # 每次同步任务的并发度。
              }
          }
      }
    4. 单击save图标,保存数据源配置。
  5. 运行同步任务。
    1. 单击start图标。
    2. 参数对话框,选择调度的资源组。
    3. 单击确定,开始运行任务。
      运行结束后,在运行日志页签中可以查看任务是否成功和导出的数据行数。

      表格存储的增量数据可以在延迟5~10分钟的基础上自动同步到MaxCompute中。

  6. 配置调度参数。
    通过调度配置,可以配置同步任务的执行时间、重跑属性、调度依赖等。
    1. 数据集成节点下,双击同步任务节点。
    2. 在同步任务节点的编辑页面的右侧单击调度配置,进行调度参数配置,详情请参见配置调度和依赖属性
  7. 提交同步任务。
    1. 在同步任务节点的编辑页面,单击submit图标。
    2. 提交新版本对话框,输入备注信息。
    3. 单击确认
      将同步任务提交到调度系统后,调度系统会根据配置的调度参数,自动定时执行同步任务。

步骤四:查看同步任务

  1. 进入运维中心。
    说明 您也可以在DataStudio控制台的右上角单击运维中心,快速进入运维中心。
    1. 以项目管理员身份登录DataWorks控制台
    2. 选择地域,在左侧导航栏,单击工作空间列表
    3. 工作空间列表页面,单击工作空间操作区域的进入运维中心
  2. 在运维中心控制台,选择周期任务运维 > 周期任务
  3. 周期任务页面,查看提交的同步任务详情。
    • 在左侧导航栏中,选择周期任务运维 > 周期实例,可以查看当天需要运行的周期任务。单击实例名称,可以查看任务运行详情。
    • 当单个任务在运行中或运行结束后,可以查看日志。

步骤五:查看导入到MaxCompute中的数据

  1. 进入数据地图。
    1. 以项目管理员身份登录DataWorks控制台
    2. 选择地域,在左侧导航栏,单击工作空间列表
    3. 工作空间列表页面,单击工作空间操作区域的进入数据地图
  2. 在数据地图控制台的导航栏,选择我的数据 > 我管理的数据
  3. 我管理的数据页签,单击导入数据的表名称。
  4. 在表详情页面,单击数据预览页签,查看导入的数据。