新版数据订阅支持使用0.11版本至1.1版本的Kafka客户端消费订阅数据,DTS为您提供了Kafka客户端Demo,本文将介绍该客户端的使用说明。

前提条件

注意事项

  • 使用本文提供的Demo消费数据时,如果采用auto commit(自动提交),可能会因为数据还没被消费完就执行了提交操作,从而丢失部分数据,建议采用手动提交的方式以避免该问题。
    说明 如果发生故障没有提交成功,重启客户端后会从上一个记录的位点进行数据消费,期间会有部分重复数据,您需要手动过滤。
  • 数据以Avro序列化存储,详细格式请参见Record.avsc文档。
    警告 如果您使用的不是本文提供的Kafka客户端,在进行反序列化解析时,可能出现解析的数据有误,您需要自行验证数据的正确性。
  • 关于offsetFotTimes接口,DTS的搜索单位为秒,原生Kafka的搜索单位为毫秒。

Kafka客户端Demo下载及运行流程说明

请下载Kafka客户端Demo代码。更多关于代码使用的详细介绍,请参见Demo中的Readme文档。

表 1. 运行流程说明
步骤 相关目录或文件
1、使用原生的Kafka consumer从数据订阅通道中获取增量数据。 subscribe_example-master/javaimpl/src/main/java/recordgenerator/
2、将获取的增量数据镜像执行反序列化,并从中获取 前镜像后镜像 和其他属性。 subscribe_example-master/javaimpl/src/main/java/boot/MysqlRecordPrinter.java
3、将反序列化后的数据中的dataTypeNumber字段转换为对应的MySQL/Oracle字段类型。 subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/

操作步骤

本文以IntelliJ IDEA软件(Community Edition 2018.1.4 Windows版本)为例,介绍如何运行该客户端进行数据消费。

  1. 下载Kafka客户端Demo代码,然后解压该文件。
  2. 打开IntelliJ IDEA软件,然后单击Open
    打开项目
  3. 在弹出的对话框中,定位至Kafka客户端Demo代码下载的目录,参照下图依次展开文件夹,找到项目对象模型文件:pom.xml
    打开项目文件
  4. 在弹出对话框中,选择Open as Project
  5. 在IntelliJ IDEA软件界面,依次展开文件夹,找到并双击打开Kafka客户端Demo文件:NotifyDemo.java
    打开kafka客户端demo文件
  6. 设置NotifyDemo.java文件中的各参数对应的值。
    设置参数值
    参数 说明 获取方式
    USER_NAME 消费组的账号。
    警告 如果您没有使用本文提供的客户端,请按照<消费组的账号>-<消费组ID>的格式设置用户名(例如:dtstest-dtsae******bpv),否则无法正常连接。
    在DTS控制台单击目标订阅实例ID,然后单击数据消费,您可以获取到消费组ID和消费组的账号信息。
    说明 消费组账号的密码已在您新建消费组时指定。
    查看消费组和账号
    PASSWORD_NAME 该账号的密码。
    SID_NAME 消费组ID。
    GROUP_NAME 消费组名称,需保持和消费组ID相同(即本参数也填入消费组ID)。
    KAFKA_TOPIC 数据订阅通道的订阅Topic。 在DTS控制台单击目标订阅实例ID,在订阅配置页面,您可以获取到订阅Topic、网络地址及端口号信息。获取topic和网络信息
    KAFKA_BROKER_URL_NAME 数据订阅通道的网络地址及端口号信息。
    说明 如果您部署Kafka客户端所属的ECS实例与数据订阅通道属于经典网络或同一专有网络,建议通过内网地址进行数据订阅,网络延迟最小。
    INITIAL_CHECKPOINT_NAME 消费的数据时间点,格式为Unix时间戳。
    说明 您需要自行保存时间点信息,当业务程序中断后,您可以通过订阅客户端传入已消费的数据时间点来继续消费数据,防止数据丢失。同时,您还可以在订阅客户端启动时,传入所需的消费位点,对订阅位点进行调整,实现按需消费数据。
    首次订阅时,您可以查看订阅实例的数据范围,将业务所需的时间点转化为Unix时间戳,详情请参见常见问题
    USE_CONFIG_CHECKPOINT_NAME 默认取值为true,即强制使用指定的数据时间点来消费数据,避免丢失已接收到的但未处理的数据。
    SUBSCRIBE_MODE_NAME 一个消费组下支持同时启动两个Kafka客户端实现灾备,如需实现该功能,请部署两个客户端并将该参数的值设置为subscribe

    默认值为assign,即不使用该功能,只部署一个客户端。

  7. 在IntelliJ IDEA软件界面的顶部,选择Run > Run运行该客户端。
    说明 首次运行时,软件将自动加载相关依赖包并完成安装。

执行结果

运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。

Kafka客户端订阅结果

您也可以去除NotifyDemo.java文件中的打印日志详情的注释(即删除第25行//log.info(ret);中的//),然后再次运行该客户端即可查看详细的数据变更信息。

kafka客户端运行的详细结果

常见问题

  • Q:首次使用kafka客户端时,代码中INITIAL_CHECKPOINT_NAME的值应该填多少?
    A:您可以查看订阅实例的数据范围,将业务所需的时间点转化为Unix时间戳。如下图所示,您可以将数据范围的起始时间(2020年6月16日 09:00:38)转换为Unix时间戳,将其作为INITIAL_CHECKPOINT_NAME的值,即1592269238数据范围
  • Q:为什么需要自行记录客户端的消费位点?

    A:由于DTS记录的消费位点是接收到Kafka消费客户端执行commit操作的时间点,可能与当前实际消费到的时间点存在一定的时间差。当业务程序或Kafka消费客户端异常中断后,您可以传入自行记录的消费位点以继续消费,避免消费到重复的数据或缺失部分数据。

MySQL字段类型与dataTypeNumber数值的对应关系

MySQL字段类型 对应dataTypeNumber数值
MYSQL_TYPE_DECIMAL 0
MYSQL_TYPE_INT8 1
MYSQL_TYPE_INT16 2
MYSQL_TYPE_INT32 3
MYSQL_TYPE_FLOAT 4
MYSQL_TYPE_DOUBLE 5
MYSQL_TYPE_NULL 6
MYSQL_TYPE_TIMESTAMP 7
MYSQL_TYPE_INT64 8
MYSQL_TYPE_INT24 9
MYSQL_TYPE_DATE 10
MYSQL_TYPE_TIME 11
MYSQL_TYPE_DATETIME 12
MYSQL_TYPE_YEAR 13
MYSQL_TYPE_DATE_NEW 14
MYSQL_TYPE_VARCHAR 15
MYSQL_TYPE_BIT 16
MYSQL_TYPE_TIMESTAMP_NEW 17
MYSQL_TYPE_DATETIME_NEW 18
MYSQL_TYPE_TIME_NEW 19
MYSQL_TYPE_JSON 245
MYSQL_TYPE_DECIMAL_NEW 246
MYSQL_TYPE_ENUM 247
MYSQL_TYPE_SET 248
MYSQL_TYPE_TINY_BLOB 249
MYSQL_TYPE_MEDIUM_BLOB 250
MYSQL_TYPE_LONG_BLOB 251
MYSQL_TYPE_BLOB 252
MYSQL_TYPE_VAR_STRING 253
MYSQL_TYPE_STRING 254
MYSQL_TYPE_GEOMETRY 255

Oracle字段类型与dataTypeNumber数值的对应关系

Oracle字段类型 对应dataTypeNumber数值
VARCHAR2/NVARCHAR2 1
NUMBER/FLOAT 2
LONG 8
DATE 12
RAW 23
LONG_RAW 24
UNDEFINED 29
XMLTYPE 58
ROWID 69
CHAR、NCHAR 96
BINARY_FLOAT 100
BINARY_DOUBLE 101
CLOB/NCLOB 112
BLOB 113
BFILE 114
TIMESTAMP 180
TIMESTAMP_WITH_TIME_ZONE 181
INTERVAL_YEAR_TO_MONTH 182
INTERVAL_DAY_TO_SECOND 183
UROWID 208
TIMESTAMP_WITH_LOCAL_TIME_ZONE 231