实时计算Flink版内置插件支持通过批量数据通道写入MaxCompute,受到批量数据通道并发数及存储文件数影响,内置版本插件会有性能瓶颈。MaxCompute提供了使用流式数据通道的Flink插件,支持使用Flink在高并发、高QPS场景下写入MaxCompute。

前提条件

背景信息

实时计算Flink版可以调用MaxCompute SDK中的接口将数据写入缓冲区,当缓冲区的大小超过指定的大小(默认为1 MB)或每隔指定的时间间隔时,将数据上传至MaxCompute结果表中。

说明 建议Flink同步MaxCompute并发数大于32或Flush间隔小于60秒的场景下,使用MaxCompute自定义插件。其他场景可以随意选择Flink内置插件和MaxCompute自定义插件。
MaxCompute与实时计算Flink版的字段类型对照关系如下。
MaxCompute字段类型 实时计算Flink版字段类型
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
BOOLEAN BOOLEAN
DATETIME TIMESTAMP
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR
STRING VARCHAR
DECIMAL DECIMAL
BINARY VARBINARY

使用限制

该功能的使用限制如下:
  • 本插件仅支持Blink 3.2.1及以上版本。
  • MaxCompute中的聚簇表不支持作为MaxCompute结果表。

语法示例

您需要在Flink控制台新建作业,创建MaxCompute结果表。新建作业操作请参见开发

说明 DDL语句中定义的字段需要与MaxCompute物理表中的字段名称、顺序以及类型保持一致,否则可能导致在MaxCompute物理表中查询的数据为 /n
命令示例如下:
create table odps_output(
    id INT,
    user_name VARCHAR,
    content VARCHAR
) with (
    type ='custom',
    class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink',
    endpoint = '<YourEndPoint>',
    project = '<YourProjectName>',
    `table` = '<YourtableName>',
    access_id = '<yourAccessKeyId>',
    access_key = '<yourAccessKeySecret>',
    `partition` = 'ds=2018****'
);

WITH参数

参数 说明 是否必填 备注
type 结果表的类型。 固定值为custom
class 插件入口类。 固定值为com.alibaba.blink.customersink.MaxComputeStreamTunnelSink
endpoint MaxCompute服务地址。 参见开通MaxCompute服务的Region和服务连接对照表
tunnel_endpoint MaxCompute Tunnel服务的连接地址。 参见开通MaxCompute服务的Region和服务连接对照表
说明 VPC环境下必填。
project MaxCompute项目名称。
table MaxCompute物理表名称。
access_id 可以访问MaxCompute项目的AccessKey ID。
access_key AccessKey ID对应的AccessKey Secret。
partition 分区表的分区名称。 如果表为分区表则必填:
  • 固定分区

    例如`partition` = 'ds=20180905'表示将数据写入分区ds= 20180905

  • 动态分区

    如果不明文显示分区的值,则会根据写入数据中的分区列具体的值,写入到不同的分区中。例如`partition`='ds'表示根据ds字段的值写入分区。

    如果要创建多级动态分区,With参数中Partition的字段顺序和结果表的DDL中的分区字段顺序,必须与物理表一致,各个分区字段之间使用英文逗号(,)分隔。

    说明
    • 动态分区列需要显式写在建表语句中。
    • 对于动态分区字段为空的情况,如果数据源中ds=nullds='',则会创建ds=NULL的分区。
enable_dynamic_partition 设置是否开启动态分区机制。 默认值为False。
dynamic_partition_limit 设置最大并发分区数。动态分区模式会为每个分区分配一个缓冲区,缓冲区大小通过flush_batch_size参数控制,所以动态分区模式最大会占用分区数量×缓冲区大小的内存。例如100个分区,每个分区1 MB,则最大占用内存为100 MB。 默认值为100。系统内存中会维护一个分区到Writer的Map,如果这个Map的大小超过了dynamicPartitionLimit的值,系统会通过LRU(Least Recently Used)的规则尝试淘汰没有数据写入的分区。如果所有分区都有数据写入,则会出现dynamic partition limit exceeded: 100报错。
flush_batch_size 数据缓冲区大小,单位字节。缓冲区数据写满后会触发Flush操作,将数据发送到MaxCompute。 默认值为1048576,即1 MB。
flush_interval_ms 缓冲区Flush间隔,单位毫秒。

MaxCompute Sink写入数据时,先将数据放到MaxCompute的缓冲区中,等缓冲区溢出或每隔一段时间(flush_interval_ms)时,再把缓冲区中的数据写到目标 MaxCompute表。

默认值为-1,即不设置主动Flush间隔。
flush_retry_count 数据Flush失败重试次数,在缓冲区Flush失败的场景下自动重试。 默认值为10,即重试10次。
flush_retry_interval_sec Flush失败重试的时间间隔,单位秒。 默认值为1,即1秒。
flush_retry_strategy Flush失败重试策略,多次重试的时间间隔增长策略,配合flush_retry_interval_sec使用。包含如下三种策略:
  • constant:常数时间,即每次重试间隔使用固定时间间隔。
  • linear:线性增长,即每次重试间隔时间线性增长,例如flush_retry_interval_sec设置为1,flush_retry_count设置为5,多次重试时间间隔为1、2、3、4、5秒。
  • exponential:指数增长。例如flush_retry_interval_sec设置为1,flush_retry_count设置为5,多次重试中间间隔为1、2、4、8、16秒。
默认值为constant,即常数时间间隔。

类型映射

MaxCompute字段类型 实时计算Flink版字段类型
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
BOOLEAN BOOLEAN
DATETIME TIMESTAMP
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR
STRING VARCHAR
DECIMAL DECIMAL
BINARY VARBINARY

代码示例

包含MaxCompute结果表的实时计算Flink版作业代码示例如下:
  • 写入固定分区
    create table source (
       id INT,
       len INT,
       content VARCHAR
    ) with (
       type = 'random'
    );
    create table odps_sink (
       id INT,
       len INT,
       content VARCHAR
    ) with (
       typ='custom',
       class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink',
       endpoint = '<yourEndpoint>', 
       project = '<yourProjectName>',
       `table` = '<yourTableName>',
       accessId = '<yourAccessId>',
       accessKey = '<yourAccessPassword>',
       `partition` = 'ds=20180418'
    );
    insert into odps_sink 
    select 
       id, len, content 
    from source;
  • 写入动态分区
    create table source (
       id INT,
       len INT,
       content VARCHAR,
       c TIMESTAMP 
    ) with (
       type = 'random'
    );
    create table odps_sink (
       id INT,
       len INT,
       content VARCHAR,
       ds VARCHAR                        --动态分区列需要显式写在建表语句中。
    ) with (
       type = 'odps',
       endpoint = '<yourEndpoint>', 
       project = '<yourProjectName>',
       `table` = '<yourTableName>',
       accessId = '<yourAccessId>',
       accessKey = '<yourAccessPassword>',
       `partition`='ds'                 --不写分区的值,表示根据ds字段的值写入不同分区。
       ,enable_dynamic_partition = 'true' --启用动态分区。
       ,dynamic_partition_limit='50' --最大并发分区数50。
       ,flush_batch_size = '524288' --缓冲区512 KB。
       ,flush_interval_ms = '60000' --Flush间隔60秒。
       ,flush_retry_count = '5' --Flush失败重试5次。
       ,flush_retry_interval_sec = '2' --失败重试间隔单位2秒。
       ,flush_retry_strategy = 'linear' --连续失败重试时间间隔线性增长。
    );
    insert into odps_sink 
    select 
       id, 
       len, 
       content,
       date_dormat(c, 'yyMMdd') as ds
    from source;