本文为您介绍数据总线DataHub结果表的DDL定义、WITH参数和类型映射。

什么是数据总线DataHub

阿里云流数据处理平台DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布(Publish)、订阅(Subscribe)和分发功能,让您可以轻松构建基于流式数据的分析和应用。Flink支持将DataHub作为流式数据的输出。

DDL定义

create table datahub_sink(
  name varchar,
  age BIGINT,
  birthday BIGINT
)with(
  'connector'='datahub',
  'endpoint'='<endPoint>',
  'project'='<yourProjectName>',
  'topic'='<yourTopicName>',
  'accessId'='<yourAccessId>',
  'accessKey'='<yourAccessKey>',
  'batchCount'='500',
  'batchSize'='512000',
  'flushInterval'='5000'
);

WITH参数

参数 注释说明 是否必填 备注
connector 结果表类型 固定值为datahub
endPoint 消费端点信息 详情请参见DataHub域名列表
accessId AccessKey ID
accessKey AccessKey Secret
project 读取的项目
topic Project下的具体的Topic名称
retryTimeout 最大持续重试时间 单位为毫秒,默认值为180000(半小时)。
retryInterval 重试间隔 单位为毫秒,默认值为1000。
batchCount 每次批量写入数据的最大数据条数 默认值为500。
batchSize 每次批量写入数据的大小 默认值为512000 Byte。
flushInterval 缓存数据的最大超时时间 单位毫秒,默认值为5000。
partitionBy 写入Sink节点前会根据partitionBy值进行hash,数据会流向对应的Sink节点。 默认值为空,随机发送。
hashFields 指定列名后,相同列的值会写入到同一个Shard。 默认值为Null,即随机写可以指明多个列值,用逗号(,)分隔。例如 hashFields=a,b

类型映射

DataHub和Flink字段类型对应关系如下,建议使用该对应关系时进行DDL声明。
DataHub字段类型 Flink字段类型
STRING VARCHAR
TIMESTAMP BIGINT
TINYINT TINYINT
SMALLINT SMALLINT
INTEGER INTEGER
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
BOOLEAN BOOLEAN
DECIMAL DECIMAL

代码示例

CREATE TEMPORARY table datahub_source(
  name VARCHAR
) with (
  'connector'='datahub',
  'endpoint'='<endPoint>',
  'project'='<yourProjectName>',
  'topic'='<yourTopicName>',
  'subId'='<yourSubId>',
  'accessId'='<yourAccessId>',
  'accessKey'='<yourAccessKey>',
  'startTime'='2018-06-01 00:00:00'
);

CREATE TEMPORARY table datahub_sink(
  name varchar
)with(
  'connector'='datahub',
  'endpoint'='<endPoint>',
  'project'='<yourProjectName>',
  'topic'='<yourTopicName>',
  'accessId'='<yourAccessId>',
  'accessKey'='<yourAccessKey>',
  'batchSize'='512000',
  'batchCount'='500'
);

INSERT INTO datahub_sink
SELECT 
  LOWER(name)
from datahub_source;