新版数据订阅支持使用0.11版本至2.0版本的Kafka客户端消费订阅数据,DTS为您提供了Kafka客户端Demo,本文将介绍该客户端的使用说明。

注意事项

  • 使用本文提供的Demo消费数据时,如果采用auto commit(自动提交),可能会因为数据还没被消费完就执行了提交操作,从而丢失部分数据,建议采用手动提交的方式以避免该问题。
    说明 如果发生故障没有提交成功,重启客户端后会从上一个记录的位点进行数据消费,期间会有部分重复数据,您需要手动过滤。
  • 数据以Avro序列化存储,详细格式请参见Record.avsc文档。
    警告 如果您使用的不是本文提供的Kafka客户端,在进行反序列化解析时,可能出现解析的数据有误,您需要自行验证数据的正确性。
  • 关于offsetForTimes接口,DTS的搜索单位为秒,原生Kafka的搜索单位为毫秒。

Kafka客户端运行流程说明

请下载Kafka客户端Demo代码。更多关于代码使用的详细介绍,请参见Demo中的Readme文档。

说明 如需使用Kafka客户端2.0版本,您需要修改 subscribe_example-master/javaimpl/pom.xml文件,将kafka客户端的版本号修改成2.0.0。
kafka2.0
表 1. 运行流程说明
步骤 相关目录或文件
1、使用原生的Kafka consumer从数据订阅通道中获取增量数据。 subscribe_example-master/javaimpl/src/main/java/recordgenerator/
2、将获取的增量数据镜像执行反序列化,并从中获取 前镜像后镜像 和其他属性。
警告
  • 如源实例为自建Oracle数据库,则为确保客户端成功消费订阅数据,并保证前后镜像完整性,您需要开启全列补偿日志,请提交工单进行开通。
  • 如源实例不为自建Oracle数据库,则DTS暂时不能保证前镜像的完整性,建议您对所获得的前镜像进行校验。
subscribe_example-master/javaimpl/src/main/java/boot/MysqlRecordPrinter.java
3、将反序列化后的数据中的dataTypeNumber字段转换为对应的MySQL/Oracle字段类型。 subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/

操作步骤

本文以IntelliJ IDEA软件(Community Edition 2018.1.4 Windows版本)为例,介绍如何运行该客户端消费订阅通道中的数据。

  1. 创建新版数据订阅通道,详情请参见创建RDS MySQL数据订阅通道(新版)创建PolarDB MySQL数据订阅通道创建Oracle数据订阅通道
  2. 创建一个或多个消费组,详情请参见新增消费组
  3. 下载Kafka客户端Demo代码,然后解压该文件。
  4. 打开IntelliJ IDEA软件,然后单击Open
    打开项目
  5. 在弹出的对话框中,定位至Kafka客户端Demo代码下载的目录,参照下图依次展开文件夹,找到项目对象模型文件:pom.xml
    打开项目文件
  6. 在弹出对话框中,选择Open as Project
  7. 在IntelliJ IDEA软件界面,依次展开文件夹,找到并双击打开Kafka客户端Demo文件:NotifyDemo.java
    打开kafka客户端demo文件
  8. 设置NotifyDemo.java文件中的各参数对应的值。
    设置参数值
    参数 说明 获取方式
    USER_NAME 消费组的账号。
    警告 如您未使用本文提供的客户端,请按照 <消费组的账号>-<消费组ID>的格式设置用户名(例如: dtstest-dtsae******bpv),否则无法正常连接。
    在DTS控制台单击目标订阅实例ID,然后单击数据消费,您可以获取到消费组ID和消费组的账号信息。
    说明 消费组账号的密码已在您新建消费组时指定。
    查看消费组和账号
    PASSWORD_NAME 该账号的密码。
    SID_NAME 消费组ID。
    GROUP_NAME 消费组名称,需保持和消费组ID相同(即本参数也填入消费组ID)。
    KAFKA_TOPIC 数据订阅通道的订阅Topic。 在DTS控制台单击目标订阅实例ID,在订阅配置页面,您可以获取到订阅Topic、网络地址及端口号信息。获取topic和网络信息
    KAFKA_BROKER_URL_NAME 数据订阅通道的网络地址及端口号信息。
    说明 如果您部署Kafka客户端所属的ECS实例与数据订阅通道属于经典网络或同一专有网络,建议通过内网地址进行数据订阅,网络延迟最小。
    INITIAL_CHECKPOINT_NAME 消费的数据时间点,格式为Unix时间戳,例如1592269238。
    说明 您需要自行保存时间点信息,以便:
    • 当业务程序中断后,传入已消费的数据时间点继续消费数据,防止数据丢失。
    • 在订阅客户端启动时,传入所需的消费位点,调整订阅位点,实现按需消费数据。
    消费的数据时间点必须在订阅实例的数据范围(如图示)之内,并需转化为Unix时间戳。数据范围
    说明
    • 数据范围查看方式,请参见查看订阅数据
    • Unix时间戳转换工具可用搜索引擎获取。
    USE_CONFIG_CHECKPOINT_NAME 默认取值为true,即强制使用指定的数据时间点来消费数据,避免丢失已接收到的但未处理的数据。
    SUBSCRIBE_MODE_NAME 一个消费组下支持同时启动两个Kafka客户端实现灾备,如需实现该功能,请部署两个客户端并将该参数的值设置为subscribe

    默认值为assign,即不使用该功能,只部署一个客户端。

  9. 在IntelliJ IDEA软件界面的顶部,选择Run > Run运行该客户端。
    说明 首次运行时,软件需要一定时间自动加载相关依赖包并完成安装。

执行结果

运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。

Kafka客户端订阅结果

您也可以去除NotifyDemo.java文件中的打印日志详情的注释(即删除第25行//log.info(ret);中的//),然后再次运行该客户端即可查看详细的数据变更信息。

kafka客户端运行的详细结果

常见问题

  • Q:为什么需要自行记录客户端的消费位点?

    A:由于DTS记录的消费位点是接收到Kafka消费客户端执行commit操作的时间点,可能与当前实际消费到的时间点存在一定的时间差。当业务程序或Kafka消费客户端异常中断后,您可以传入自行记录的消费位点以继续消费,避免消费到重复的数据或缺失部分数据。

MySQL字段类型与dataTypeNumber数值的对应关系

MySQL字段类型 对应dataTypeNumber数值
MYSQL_TYPE_DECIMAL 0
MYSQL_TYPE_INT8 1
MYSQL_TYPE_INT16 2
MYSQL_TYPE_INT32 3
MYSQL_TYPE_FLOAT 4
MYSQL_TYPE_DOUBLE 5
MYSQL_TYPE_NULL 6
MYSQL_TYPE_TIMESTAMP 7
MYSQL_TYPE_INT64 8
MYSQL_TYPE_INT24 9
MYSQL_TYPE_DATE 10
MYSQL_TYPE_TIME 11
MYSQL_TYPE_DATETIME 12
MYSQL_TYPE_YEAR 13
MYSQL_TYPE_DATE_NEW 14
MYSQL_TYPE_VARCHAR 15
MYSQL_TYPE_BIT 16
MYSQL_TYPE_TIMESTAMP_NEW 17
MYSQL_TYPE_DATETIME_NEW 18
MYSQL_TYPE_TIME_NEW 19
MYSQL_TYPE_JSON 245
MYSQL_TYPE_DECIMAL_NEW 246
MYSQL_TYPE_ENUM 247
MYSQL_TYPE_SET 248
MYSQL_TYPE_TINY_BLOB 249
MYSQL_TYPE_MEDIUM_BLOB 250
MYSQL_TYPE_LONG_BLOB 251
MYSQL_TYPE_BLOB 252
MYSQL_TYPE_VAR_STRING 253
MYSQL_TYPE_STRING 254
MYSQL_TYPE_GEOMETRY 255

Oracle字段类型与dataTypeNumber数值的对应关系

Oracle字段类型 对应dataTypeNumber数值
VARCHAR2/NVARCHAR2 1
NUMBER/FLOAT 2
LONG 8
DATE 12
RAW 23
LONG_RAW 24
UNDEFINED 29
XMLTYPE 58
ROWID 69
CHAR、NCHAR 96
BINARY_FLOAT 100
BINARY_DOUBLE 101
CLOB/NCLOB 112
BLOB 113
BFILE 114
TIMESTAMP 180
TIMESTAMP_WITH_TIME_ZONE 181
INTERVAL_YEAR_TO_MONTH 182
INTERVAL_DAY_TO_SECOND 183
UROWID 208
TIMESTAMP_WITH_LOCAL_TIME_ZONE 231