阿里云数据集成(Data Integration)提供增量数据同步服务,包括脚本模式和向导模式两种方式,本文主要为您介绍如何通过脚本模式将表格存储的增量数据同步至MaxCompute。

关于如何使用向导模式将表格存储的增量数据同步至MaxCompute,请参考增量同步(向导模式)

方式

数据集成脚本模式。
  • Reader:OTSStreamReader
  • Writer:MaxComputeWriter

配置表格存储

无需配置。

配置MaxCompute

无需配置。

配置数据集成

  1. 创建表格存储数据源。
    • 如果已经创建了表格存储的数据源,可以跳过这一步。
    • 如果您不希望创建数据源,也可以在后续的配置页面中配置相应的endpoint、instanceName、AccessKeyID和AccessKeySecret。

    创建数据源的具体步骤,请参见步骤二 创建MaxCompute数据源

  2. 创建MaxCompute数据源。本操作与上一个步骤类似,只是选择MaxCompute作为数据源。
  3. 创建同步任务。
    1. 登录数据集成控制台
    2. 新建同步任务页面的左侧菜单栏,选择数据开发,新建数据集成-离线任务
    3. 默认任务是向导模式,来源类型选择OTS Stream目标类型选择ODPS
    4. 任务管理页面,单击转换脚本,从向导模式变成脚本模式。
  4. 完善配置项。
    1. 在配置界面,已经提前嵌入了OTSStreamReader和MaxComputeWrite的模板,请参考以下示例完成配置。
      {
          "type": "job",
          "steps": [
              {
                  "stepType": "otsstream",    # Reader插件的名称。
                  "parameter": {
                      "mode": "single_version_and_update_only",    # 配置导出模式,默认不设置,为列模式。
                      "statusTable": "TableStoreStreamReaderStatusTable",    # 存储TableStore Stream状态的表,一般不需要修改。
                      "maxRetries": 30,    # 从TableStore中读增量数据时,每次请求的最大重试次数,默认为30。重试之间有间隔,重试30次的总时间约为5分钟,通常无需更改。
                      "isExportSequenceInfo": false,    # 是否导出时序信息,时序信息包含了数据的写入时间等。默认该值为false,即不导出。single_version_and_update_only模式下只能是false。
                      "datasource": "",    # Tablestore的数据源名称,如果有此项则不再需要配置endpoint,accessId,accessKey和instanceName。
                      "column": [    # 按照需求添加需要导出TableStore中的列,您可以自定义设置配置个数。
                          {
                              "name": "h"    # 列名示例,可以是主键或属性列。
                          },
                          {
                              "name": "n"
                          },
                          {
                              "name": "s"
                          }
                      ],
                      "startTimeString": "${startTime}",    # 增量数据的时间范围(左闭右开)的左边界,格式为yyyymmddhh24miss,单位毫秒。
                      "table": "",    # TableStore中的表名。
                      "endTimeString": "${endTime}"    # 增量数据的时间范围(左闭右开)的右边界,格式为yyyymmddhh24miss,单位为毫秒。
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "odps",    # Writer插件的名称。
                  "parameter": {
                      "partition": "",    # 需要写入数据表的分区信息。
                      "truncate": false,    # 清理规则,写入前是否清理已有数据。
                      "compress": false,    # 是否压缩。
                      "datasource": "",    # 数据源名。
                      "column": [    # 需要导入的字段列表。
                          "h",
                          "n",
                          "s"
                      ],
                      "emptyAsNull": false,    # 空字符串是否作为null,默认是。
                      "table": ""    # 表名。
                  },
                  "name": "Writer",
                  "category": "writer"
              }
          ],
          "version": "2.0",
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          },
          "setting": {
              "errorLimit": {
                  "record": "0"    # 允许出错的个数,当错误超过这个数目的时候同步任务会失败。
              },
              "speed": {
                  "throttle": false,    #同步速率不限流。
                  "concurrent": 2    # 每次同步任务的并发度。
              }
          }
      }
      说明 详细的配置说明参见配置OTSStream Reader配置MaxCompute Writer
    2. 单击保存
  5. 运行任务。
    1. 在页面上方,单击运行
    2. 在弹出的配置框中,配置变量参数。
    3. 单击确认,开始运行任务。
    4. 运行结束后可以通过临时查询来查询导入的数据。
  6. 配置调度。
    1. 单击提交
    2. 在弹出的对话框中,配置各项参数。
      参数说明如下:
      参数 描述
      startTime 输入$[yyyymmddhh24miss-10/24/60],表示调度时间减去10分钟。
      endTime 输入$[yyyymmddhh24miss-5/24/60],表示调度时间减去5分钟。注意,endTime需要比调度时间提前5分钟及以上。
      date 输入${bdp.system.bizdate},表示调度日期。
      生成实例方式 选择T+1次日生成。
      出错自动重跑 如果勾选,则当失败的时候会默认重试3次,每次间隔2分钟。
      生效日期 使用默认值。
      调度周期 选择分钟。
      开始时间 选择00:00至23:59,表示全天24小时都需要调度。
      时间间隔 选择5分钟。
      依赖属性 如果有依赖则填写,没有则不用填。
      依赖上一周期 如果勾上,选择对应的依赖项。
    3. 单击确认。周期性的同步任务配置完成,当前配置文件显示为只读状态。
  7. 查看任务。
    1. 单击页面上方的运维
    2. 在左侧导航栏,选择周期任务运维 > 周期任务> ,可以查看新创建的同步任务。
      • 新建的任务会从第二天零点开始执行。
      • 在左侧导航栏,选择周期任务运维 > 周期实例,查看每一个预创建的当天同步任务。每个任务相隔5分钟,每个任务处理过去10~5分钟的数据。单击实例名称,可以查看任务详情。
      • 单个任务在运行中或运行结束后,可以查看日志。
  8. 登录控制台查看MaxCompute表导入的数据。

至此,表格存储的增量数据可以在延迟5~10分钟的基础上自动同步到MaxCompute中了。