文档

创建投递任务

更新时间:

使用CreateDeliveryTask接口创建一个投递任务。通过创建投递任务,您可以将表格存储数据表中的数据投递到OSS Bucket中存储。

注意

请确认已安装支持数据湖投递功能的表格存储Go SDK。

前提条件

  • 已开通OSS服务且在表格存储实例所在地域创建Bucket。具体操作,请参见开通OSS服务

  • 已通过控制台创建表格存储服务关联角色并记录角色的ARN。具体操作,请参见创建投递任务

    服务关联角色的ARN请通过RAM控制台获取,具体操作如下:

    RAM 角色管理界面,搜索AliyunServiceRoleForOTSDataDelivery后,单击角色名称,在角色详情界面,可以查看和复制角色的ARN信息。

    figrole

  • 已初始化TableStoreClient。具体操作,请参见初始化

  • 已创建数据表并写入数据。

参数

参数

说明

TableName

数据表名称。

TaskName

投递任务名称。

名称只能包含英文小写字母(a~z)、数字和短横线(-),开头和结尾必须为英文小写字母或数字,且长度为3~16字符。

TaskConfig

投递任务配置,包括如下选项:

  • OssPrefix:OSS Bucket中的目录前缀,将表格存储的数据投递到该OSS Bucket目录中。投递路径中支持引用$yyyy、$MM、$dd、$HH、$mm五种时间变量。

    • 当投递路径中引用时间变量时,可以按数据的写入时间动态生成OSS目录,实现hive partition naming style的数据时间分区,从而按照时间分区组织OSS中的文件分布。

    • 当投递路径中不引用时间变量时,所有文件会被投递到固定的OSS前缀目录中。

  • OssBucket:OSS Bucket名称。

  • OssEndpoint:OSS Bucket所在地域的服务地址。

  • OssRoleName:表格存储服务关联角色的ARN信息。

  • Format:投递的数据的存储以Parquet列存格式存储,数据湖投递默认使用PLAIN编码方式,PLAIN编码方式支持任意类型数据。

  • EventTimeColumn:事件时间列,用于指定按某一列数据的时间进行分区。如果不设置此参数,则按数据写入表格存储的时间进行分区。

  • Schema:指定需要投递的数据列,必须手动配置投递字段的源表字段、目标字段和目标字段类型。

    您可以选择任意字段以任意顺序、名称写入列存文件,OSS的列存数据会按Schema数组中的数据列先后顺序分布。

    注意

    投递数据的字段类型必须与数据源的字段类型匹配,否则会作为脏数据丢弃。字段类型映射详情请参见数据格式映射

TaskType

投递任务的类型,包括如下选项:

  • IncTask:表示增量数据投递模式,只同步增量数据。

  • BaseTask:表示全量数据投递模式,一次性全表扫描数据同步。

  • BaseIncTask(默认):表示全量&增量数据投递模式,全量数据同步完成后,再同步增量数据。

    其中增量数据同步时可以获取最新投递时间和了解当前投递状态。

示例

func CreateTaskSample(client *tablestore.TableStoreClient) {
 createTask := &tablestore.CreateDeliveryTaskRequest{
  TableName: "sampleTable",
  TaskName: "sampledeliverytask",
  TaskType: tablestore.BaseIncTask,
  TaskConfig: &tablestore.OSSTaskConfig{
   OssPrefix:   "sample/year=$yyyy/month=$MM",
   OssBucket:      "datadeliverytest",
   OssEndpoint:    "oss-cn-hangzhou.aliyuncs.com",
   OssRoleName:    "acs:ram::17************45:role/aliyunserviceroleforotsdatadelivery",
   Schema: []*tablestore.TaskSchema{
    {
     ColumnName: "PK1",
     OssColumnName: "PK1",
     Type: tablestore.ParquetInt64,
    },
    {
     ColumnName: "PK2",
     OssColumnName: "PK2",
     Type: tablestore.ParquetUtf8,
    },
    {
     ColumnName: "Col1",
     OssColumnName: "Col1",
     Type: tablestore.ParquetDouble,
    },


   },
  },
 }
 createResp, err := client.CreateDeliveryTask(createTask)
 if err != nil {
  log.Fatal("create delivery task failed ", err)
 }
 fmt.Println("create delivery task success ", createResp.RequestId)
}

  • 本页导读 (1)
文档反馈