本文为您介绍MaxCompute结果表的DDL定义、WITH参数、类型映射和代码示例。

实现原理

MaxCompute Sink可以分为以下两个阶段:
  1. 写入数据。调用MaxCompute SDK中的接口将数据写入缓冲区,在缓冲区大小超过64 MB或者每隔指定的时间间隔时,上传数据到MaxCompute的临时文件中。
  2. 提交会话。在任务进行Checkpoint时, MaxCompute Sink会调用Tunnel的Commit方法,提交会话,移动临时文件到MaxCompute表的数据目录,并修改元数据。
    说明 Commit方法不能提供原子性。因此,MaxCompute Sink提供的是At least Once方式,而不是Exactly Once方式。

DDL定义

Flink全托管支持将MaxCompute作为结果输出,示例代码如下。
create table odps_output(
    id INT,
    user_name VARCHAR,
    content VARCHAR
) with (
    'connector' = 'odps',
    'endpoint' = '<yourEndpoint>',
    'project' = '<yourProjectName>',
    'tablename' = '<yourTableName>',
    'accessid' = '<yourAccessKeyId>',
    'accesskey' = '<yourAccessKeySecret>',
    'partition' = 'ds=2018****'
);
说明
  • WITH参数需要全部小写。
  • DDL定义中的字段名称、顺序和类型必须与MaxCompute物理表保持一致,否则可能导致MaxCompute物理表中查询的数据为/n

WITH参数

参数 说明 是否必填 备注
connector 数据结果表类型 固定值为odps
endPoint MaxCompute服务地址 参见开通MaxCompute服务的Region和服务连接对照表
tunnelEndpoint MaxCompute Tunnel服务的连接地址 参见开通MaxCompute服务的Region和服务连接对照表
说明 VPC环境下必填。
project MaxCompute项目名称
tableName 表名
accessId AccessKey ID
accessKey AccessKey Secret
partition 分区名 如果存在分区表,则必填partition。填写partition需要注意以下两点:
  • 固定分区

    例如`partition` = 'ds=20180905'表示将数据写入分区ds= 20180905

  • 动态分区

    如果未明文显示分区值,则根据写入数据中分区列具体值,写入对应分区。例如`partition`='ds'表示根据ds字段值写入对应分区。

    如果要创建多级动态分区,Partition中多个字段顺序必须和MaxCompute物理表保持一致,各个分区字段之间使用逗号(,)分割。

    说明
    • 动态分区列需要显式写在建表语句中。
    • 在动态分区字段为空时,如果数据源中ds=null或者ds='',则输出结果为ds=NULL的分区。
flushIntervalMs Odps tunnel writer缓冲区Flush间隔。

MaxCompute Sink写入记录时,先放入数据到MaxCompute的缓冲区中,等缓冲区溢出或者每隔一段时间(flushIntervalMs)时,再把缓冲区里的数据写到目标 MaxCompute表。

单位为毫秒,默认值为30000(30秒)。
partitionBy 写入Sink节点前,系统会根据该值做Hash Shuffle,数据就会流向对应的Sink节点。 系统按照多个列进行Hash Shuffle,各个列名之间使用逗号(,)分割。默认为空。
dynamicPartitionLimit 分区数目最大值 默认值为100,系统会把已写入的分区和TunnelBufferedWriter的映射关系维护到一个Map里,如果该Map大小超过了dynamicPartitionLimit设定值,则会出现Too many dynamic partitions: 100, which exceeds the size limit: 100报错。

类型映射

MaxCompute字段类型 Flink全托管字段类型
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
BOOLEAN BOOLEAN
DATETIME TIMESTAMP
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR
STRING VARCHAR
DECIMAL DECIMAL
BINARY VARBINARY

代码示例

  • 写入固定分区
    CREATE TABLE source (
        id INT,
        len INT,
        content VARCHAR
    ) with (
        'connector' = 'random'
    );
    
    create table odps_sink (
        id INT,
        len INT,
        content VARCHAR
    ) with (
        'connector' = 'odps',
        'endpoint' = 'your_end_point_name',
        'project' = 'your_project_name',
        'tablename' = 'your_table_name',
        'accessid' = 'your_access_id',
        'accesskey' = 'your_access_key',
        'partition' = 'ds=20180905'
    );
    
    INSERT INTO odps_sink 
    SELECT 
         id, len, content 
    FROM source;
  • 写入动态分区
    CREATE TABLE source (
         id INT,
         len INT,
         content VARCHAR,
         c TIMESTAMP 
    ) with (
         'connector' = 'random'
    );
    
    create table odps_sink (
         id  INT,
         len INT,
         content VARCHAR,
         ds VARCHAR --动态分区列需要显式声明。
    ) with (
         'connector' = 'odps',
         'endpoint' = 'your_end_point_name',
         'project' = 'your_project_name',
         'tablename' = 'your_table_name',
         'accessid' = 'your_access_id',
         'accesskey' = 'your_access_key',
         'partition' ='ds' --不写分区的值,表示根据ds字段的值写入不同分区。
    );
    
    INSERT INTO odps_sink 
    SELECT 
          id, 
          len, 
          content,
          DATE_FORMAT(c, 'yyMMdd') as ds
    FROM source;