本文为您介绍开源Flink 1.11如何实时写入数据至Hologres。

前提条件

  • 开通Hologres实例,并连接开发工具,详情请参见连接HoloWeb
  • 搭建Flink集群(本次示例使用的是1.13版本),可以前往Flink官网下载二进制包,启动一个Standalone集群,详情请参见文档集群搭建

背景信息

从开源Flink1.11版本开始,Hologres代码已开源,相应版本的Connector已经在中央仓库发布Release包(1.0.1),可在项目中参照如下pom文件进行配置。详细内容请参见Hologres GitHub官方库
<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>hologres-connector-flink-1.13</artifactId>
    <version>1.0.1</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>

Flink SQL写入数据至Hologres代码示例

您可以参照如下代码示例,通过将Flink SQL将数据写入Hologres。其中,更多详细的代码示例请参见Hologres GitHub官方库
        String createHologresTable =
                String.format(
                        "create table sink("
                                + "  user_id bigint,"
                                + "  user_name string,"
                                + "  price decimal(38,2),"
                                + "  sale_timestamp timestamp"
                                + ") with ("
                                + "  'connector'='hologres',"
                                + "  'dbname' = '%s',"
                                + "  'tablename' = '%s',"
                                + "  'username' = '%s',"
                                + "  'password' = '%s',"
                                + "  'endpoint' = '%s'"
                                + ")",
                        database, tableName, userName, password, endPoint);
        tEnv.executeSql(createHologresTable);

        createScanTable(tEnv);

        tEnv.executeSql("insert into sink select * from source");
更多详尽的代码示例请参见hologres-connector-flink-examples,包括如下示例。
  • FlinkSQLToHoloExample:一个使用纯Flink SQL接口实现的应用,将数据写入至Hologres。
  • FlinkDSAndSQLToHoloExample:一个混合Flink DataStream以及SQL 接口实现的应用,写入Hologres前,将DataStream转换成Table,之后再用Flink SQL写入Hologres。
  • FlinkDataStreamToHoloExample:一个使用纯Flink DataStream接口实现的应用,将数据写入至Hologres。
  • FlinkRoaringBitmapAggJob:一个使用Flink及RoaringBitmap,结合Hologres维表,实现实时去重统计UV的应用,并将统计结果写入Hologres。

Hologres Flink Connector参数说明

您可以将Flink数据写入Hologres,Hologres Flink Connector相关参数具体内容如下:
参数是否必填说明
connector结果表类型,固定值为hologres。
dbnameHologres的数据库名称。
tablenameHologres接收数据的表名称。
username当前阿里云账号的AccessKey ID。

您可以单击AccessKey 管理,获取AccessKey ID。

password当前阿里云账号的AccessKey Secret。

您可以单击AccessKey 管理,获取AccessKey Secret。

endpointHologres的VPC网络地址。进入Hologres管理控制台的实例详情页,从实例配置获取Endpoint。
说明 endpoint需包含端口号,格式为ip:port同一个区域使用VPC网络地址,跨区域请使用公共网络。
  • 连接参数
    参数是否必填说明
    connectionSize单个Flink Hologres Task所创建的JDBC连接池大小。

    默认值:3,和吞吐成正比。

    connectionPoolName连接池名称,同一个TaskManager中,表配置同名的连接池名称可以共享连接池。

    无默认值,每个表默认使用自己的连接池。如果设置连接池名称,则所有表的connectionSize需要相同

    fixedConnectionMode写入和点查不占用连接数(beta功能,需要connector版本>=1.2.0,hologres引擎版本>=1.3)

    默认值:false

    jdbcRetryCount当连接故障时,写入和查询的重试次数。

    默认值:10。

    jdbcRetrySleepInitMs每次重试的等待时间=retrySleepInitMs+retry*retrySleepStepMs。

    默认值:1000ms。

    jdbcRetrySleepStepMs每次重试的等待时间=retrySleepInitMs+retry*retrySleepStepMs。

    默认值:5000ms。

    jdbcConnectionMaxIdleMs写入线程和点查线程数据库连接的最大Idle时间,超过连接将被释放。

    默认值:60000ms。

    jdbcMetaCacheTTLTableSchema信息的本地缓存时间。

    默认值:60000ms。

    jdbcMetaAutoRefreshFactor当TableSchema cache剩余存活时间短于metaCacheTTL/metaAutoRefreshFactor 将自动刷新cache。

    默认值:-1,表示不自动刷新。

  • 写入参数
    参数是否必填说明
    mutatetype数据写入模式,详情请参见流式语义

    默认值:insertorignore。

    ignoredelete是否忽略撤回消息。通常Flink的Group By会产生回撤消息,回撤消息到Hologres connector会产生Delete请求。

    默认值:true,仅在使用流式语义时生效。

    createparttable当写入分区表时,是否自动根据分区值自动创建分区表。Flink 全托管2.1.x及以上版本支持自动创建分区表。
    • false(默认值):不会自动创建。
    • true:自动创建。
    建议慎用该功能,确保分区值不会出现脏数据导致创建错误的分区表。
    ignoreNullWhenUpdatemutatetype='insertOrUpdate'时,是否忽略更新写入数据中的Null值。

    默认值:false。

    jdbcWriteBatchSizeHologres Sink节点数据攒批的最大批大小。

    默认值:256

    jdbcWriteBatchByteSizeHologres Sink节点单个线程数据攒批的最大字节大小。

    默认值:2097152(2 * 1024 * 1024),2MB。

    jdbcWriteBatchTotalByteSizeHologres Sink节点所有数据攒批的最大字节大小。

    默认值:20971520(20 * 1024 * 1024),20MB。

    jdbcWriteFlushIntervalHologres Sink节点数据攒批的最长Flush等待时间。

    默认值:10000,即10秒。

    jdbcEnableDefaultForNotNullColumn设置为true时,not null且未在表上设置default的字段传入null时,将以默认值写入。String类型默认为"",Number类型默认为0,Date、timestamp、timestamptz 类型默认为1970-01-01 00:00:00。

    默认值:true。

    jdbcCopyWriteMode是否使用fixed copy方式写入。fixed copy是hologres1.3新增的能力,相比insert方法,fixed copy方式可以更高的吞吐(因为是流模式),更低的数据延时,更低的客户端内存消耗(因为不攒批),但不支持回撤。

    默认值:false。

    jdbcCopyWriteFormat底层是否走二进制协议。
    • binary(默认值):表示使用二进制模式,二进制会更快。
    • text:为文本模式。
    jdbcCopyWriteDirectConnectcopy模式是否直连。copy的瓶颈往往是VIP endpoint的网络吞吐,因此copy写入模式下会测试当前环境能否直连holo fe,支持的话默认使用直连。此参数设置为false则不进行直连。

    默认值:true。

  • 点查参数
    参数是否必填说明
    jdbcReadBatchSize维表点查最大批次大小。

    默认值:128。

    jdbcReadBatchQueueSize维表点查请求缓冲队列大小。

    默认值:256。

    async是否采用异步方式同步数据。

    默认值:false。异步模式可以并发地处理多个请求和响应,从而连续的请求之间不需要阻塞等待,提高查询的吞吐。但在异步模式下,无法保证请求的绝对顺序。

    cache缓存策略。

    默认值:None。Hologres仅支持以下两种缓存策略:None:无缓存。LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果未找到,则去物理维表中查找。

    cachesize缓存大小。

    默认值:10000。选择LRU缓存策略后,可以设置缓存大小。

    cachettlms更新缓存的时间间隔,单位为毫秒。

    当选择LRU缓存策略后,可以设置缓存失效的超时时间,默认不过期。

    cacheempty是否缓存join结果为空的数据。

    默认值:true,表示缓存join结果为空的数据。false:表示不缓存join结果为空的数据。

  • 数据类型映射

    当前Flink全托管与Hologres的数据类型映射请参见Blink/Flink与Hologres的数据类型映射