本文为您介绍MySQL的CDC(Change Data Capture)源表DDL定义、WITH参数、类型映射和代码示例。

注意 MySQL的CDC源表正处于公测中,如果您对作业稳定性要求较高时,建议不要使用MySQL的CDC源表。

什么是MySQL的CDC源表

MySQL的CDC源表,即MySQL的流式源表,会先读取数据库的历史全量数据,并平滑切换到Binlog读取上,保证不多读一条也不少读一条数据。即使发生故障,也能保证通过Exactly Once语义处理数据。MySQL CDC Connector支持并发地读取全量数据,通过增量快照算法实现了全程无锁和断点续传。

支持以下核心特性:
  • 流批一体,支持读取全量和增量数据,无需维护两套流程。
  • 支持并发读取全量数据,性能水平扩展。
  • 全量读取无缝切换增量读取,自动缩容,节省计算资源。
  • 全量阶段读取支持断点续传,更稳定。
  • 无锁读取全量数据,不影响在线业务。

实现原理

Source在启动时会扫描全表,将表按照主键分成多个chunk。并使用增量快照算法逐个读取每个chunk的数据。作业会周期性执行Checkpoint,记录下已经完成的chunk。当发生Failover时,只需要继续读取未完成的chunk。当chunk全部读取完后,会从之前获取的Binlog位点读取增量的变更记录。Flink作业会继续周期性执行Checkpoint,记录下Binlog位点,当作业发生Failover,便会从之前记录的Binlog位点继续处理,从而实现Exactly Once语义。更详细的增量快照算法,请参见MySQL CDC Connector

前提条件

  • MySQL和VVP的网络连通。
  • MySQL服务器配置如下:
    • MySQL版本为5.7和8.0.X。
    • 已开启了Binlog。
    • Binlog格式已设置为ROW。
    • binlog_row_image已设置为FULL。
  • 已在MySQL配置文件中配置了交互超时或等待超时参数。
  • 已创建MySQL用户,并授予了SELECT、 SHOW DATABASES 、REPLICATION SLAVE和REPLICATION CLIENT权限。
说明 以上配置需要在RDS MySQL、PolarDB MySQL或者自建MySQL上的操作,详情请参见准备工作

使用限制

  • 仅在vvr-4.0.8-flink-1.13及以上引擎版本支持无锁读取和并发读取功能。
  • MySQL CDC Connector支持读取的MySQL版本为5.7和8.0.X。
  • MySQL CDC 源表暂不支持定义Watermark。如果您需要进行窗口聚合,您可以采用非窗口聚合的方式,详情请参见不支持定义Watermark,那如何进行窗口聚合?
  • MySQL的CDC源表需要一个有特定权限(包括SELECT、SHOW DATABASES、REPLICATION SLAVE和REPLICATION CLIENT)的MySQL用户,才能读取全量和增量数据。

注意事项

  • 每个作业需显式配置不同的server-id。

    每个同步数据库数据的客户端,都会有一个唯一ID,即server-id。MySQL SERVER会根据该ID来维护网络连接以及Binlog位点。因此如果有大量不同的SERVER ID的客户端一起连接MySQL SERVER,可能导致MySQL SERVER的CPU陡增,影响线上业务稳定性。

    此外,多个作业共享相同的server-id,会导致Binlog位点错乱,多读或少读数据。因此建议每个CDC作业都配置不同的SERVER ID。建议通过动态Hints来配置server-id,而不是在DDL参数中配置server-id。配置不同的server-id代码示例如下。
    SELECT * FROM source_table  /*+ OPTIONS('server-id'='123456') */ ;
    动态Hints详情请参见动态Hints
  • 仅VVR 4.0.8 及以上版本支持全量阶段的无锁读取、并发读取、断点续传等功能。

    如果您使用的是VVR 4.0.8以下版本,需要对MySQL用户授予RELOAD权限用来获取全局读锁,保证数据读取的一致性。全局读锁会阻塞写入操作,持锁时间可能达到秒级,因此可能对线上业务造成影响。

    此外,VVR 4.0.8以下版本在全量读取阶段无法执行Checkpoint,全量阶段的作业失败会导致作业重新读取全量数据,稳定性不佳。因此建议您将作业升级到VVR 4.0.8及以上版本。

DDL定义

CREATE TABLE mysqlcdc_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

WITH参数

参数 说明 是否必填 数据类型 备注
connector 源表类型。 STRING 固定值为mysql-cdc
hostname MySQL数据库的IP地址或者Hostname。 STRING 无。
username MySQL数据库服务的用户名。 STRING 无。
password MySQL数据库服务的密码。 STRING 无。
database-name MySQL数据库名称。 STRING 数据库名称支持正则表达式以读取多个数据库的数据。
table-name MySQL表名。 STRING 表名支持正则表达式以读取多个表的数据。
port MySQL数据库服务的端口号。 INTEGER 默认值为3306。
server-id 数据库客户端的一个数字 ID。 STRING 该ID必须是MySQL集群中全局唯一的。建议针对同一个数据库的每个作业都设置一个不同的ID。默认会随机生成一个5400~6400的值。

该参数也支持ID范围的格式,例如5400-5408。在开启增量读取模式时支持多并发读取,此时推荐设定为ID范围,使得每个并发使用不同的ID。

scan.incremental.snapshot.enabled 是否开启增量快照。 BOOLEAN 默认开启增量快照。增量快照是一种读取全量数据快照的新机制。与旧的快照读取相比,增量快照有很多优点,包括:
  • 读取全量数据时,Source可以是并行读取。
  • 读取全量数据时,Source支持chunk粒度的检查点。
  • 读取全量数据时,Source不需要获取全局读锁(FLUSH TABLES WITH read lock)。

如果您希望Source支持并发读取,每个并发的Reader需要有一个唯一的服务器ID,因此server-id必须是5400-6400这样的范围,并且范围必须大于等于并发数。

scan.incremental.snapshot.chunk.size 表的chunk的大小(行数)。 Integer 默认值为8096。当开启增量快照读取时,表会被切分成多个chunk读取。在读完chunk的数据之前,chunk的数据会先缓存在内存中,因此chunk 太大,可能导致内存OOM。chunk越小,故障恢复的粒度也越小,但也会降低吞吐。
scan.snapshot.fetch.size 当读取表的全量数据时,每次最多拉取的记录数。 Integer 默认值为1024。
scan.startup.mode 消费数据时的启动模式。 STRING 参数取值如下:
  • initial(默认):在第一次启动时,会先扫描历史全量数据,然后读取最新的Binlog数据。
  • latest-offset:在第一次启动时,不会扫描历史全量数据,直接从Binlog的末尾(最新的Binlog处)开始读取,即只读取该Connector启动以后的最新变更。
server-time-zone 数据库在使用的会话时区。 STRING 例如Asia/Shanghai,该参数控制了MySQL中的TIMESTAMP类型如何转成STRING类型。更多信息请参见Debezium时间类型
debezium.min.row.count.to.stream.results 当表的条数大于该值时,会使用分批读取模式。 INTEGER 默认值为1000。Flink采用以下方式读取MySQL源表数据:
  • 全量读取:直接将整个表的数据读取到内存里。优点是速度快,缺点是会消耗对应大小的内存,如果源表数据量非常大,可能会有OOM风险。
  • 分批读取:分多次读取,每次读取一定数量的行数,直到读取完所有数据。优点是读取数据量比较大的表没有OOM风险,缺点是读取速度相对较慢。
connect.timeout 在尝试连接MySQL数据库服务器之后,连接器在超时之前应该等待的最大时间。 Duration 默认值为30秒。

并发控制

MySQL-CDC connector 支持多并发读取全量数据,能够提高数据加载效率。同时配合Flink VVP平台的Autopilot自动调优功能,在多并发读取完成后增量阶段,能够自动缩容,节约计算资源。

在Flink全托管VVP平台,您可以在资源配置页面的基础模式或专家模式中设置作业的并发数。设置并发的区别如下:
  • 基础模式设置的并发数为整个作业的全局并发数。基础模式
  • 专家模式支持按需为某个VERTEX设置并发数。vertex并发
资源配置详情请参见配置细粒度资源
注意 无论是基础模式还是专家模式,在设置的并发时,表中声明的server-id范围必须大于等于作业的并发数。例如server-id的范围为5404-5412,则共有8个唯一的server-id,因此作业最多可以设置8个并发,且不同的作业对于同一个MySQL实例的server-id范围不能有重叠,即每个作业需显式配置不同的server-id。

Autopilot自动缩容

全量阶段积累了大量历史数据,为了提高读取效率,通常采用并发的方式读取历史数据。而在Binlog增量阶段,因为Binlog数据量少且为了保证全局有序,通常只需要单并发读取。全量阶段和增量阶段对资源的不同需求,可以通过自动调优功能自动帮您实现性能和资源的平衡。

自动调优会监控MySQL CDC Source的每个task的流量。当进入Binlog阶段,如果只有一个task在负责Binlog读取,其他task均空闲时,自动调优便会自动缩小Source的CU数和并发。开启自动调优只需要在作业运维页面,将自动调优的模式设置为Active模式。
说明 默认调低并发度的最小触发时间间隔为24小时。更多自动调优的参数和细节,请参见配置自动调优

类型映射

MySQL的CDC和Flink字段类型对应关系如下。
MySQL CDC字段类型 Flink字段类型
TINYINT TINYINT
SMALLINT SMALLINT
TINYINT UNSIGNED
INT INT
MEDIUMINT
SMALLINT UNSIGNED
BIGINT BIGINT
INT UNSIGNED
BIGINT UNSIGNED DECIMAL(20, 0)
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DOUBLE PRECISION
NUMERIC(p, s) DECIMAL(p, s)
DECIMAL(p, s)
BOOLEAN BOOLEAN
TINYINT(1)
DATE DATE
TIME [(p)] TIME [(p)] [WITHOUT TIMEZONE]
DATETIME [(p)] TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)] TIMESTAMP [(p)]
TIMESTAMP [(p)] WITH LOCAL TIME ZONE
CHAR(n) STRING
VARCHAR(n)
TEXT
BINARY BYTES
VARBINARY
BLOB

常见问题