文档

RDS MySQL同步至阿里云消息队列Kafka版

更新时间:
重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

数据传输服务DTS(Data Transmission Service)支持RDS MySQL实例同步至阿里云消息队列Kafka版

前提条件

  • 已创建源实例RDS MySQL和目标实例阿里云消息队列Kafka版

    说明
  • 目标实例阿里云消息队列Kafka版中已创建用于接收同步数据的Topic,请参见步骤一:创建Topic

  • 目标实例阿里云消息队列Kafka版的存储空间须大于源实例RDS MySQL占用的存储空间。

注意事项

说明

DTS不会将源数据库中的外键同步到目标数据库,因此源数据库的级联、删除等操作不会同步到目标数据库。

类型

说明

源库的环境要求

  • 待同步的表需具备主键或唯一约束,且字段具有唯一性,否则可能会导致目标数据库中出现重复数据。

  • 如同步对象为表级别,且需进行编辑(如表列名映射),则单次同步任务仅支持同步至多1000张表。当超出数量限制,任务提交后会显示请求报错,此时建议您拆分待同步的表,分批配置多个任务,或者配置整库的同步任务。

  • Binlog日志:

    • RDS MySQL默认已开启Binlog,您需确保binlog_row_image的值为full,否则预检查阶段提示报错,且无法成功启动数据同步任务。参数设置方法,请参见设置实例参数

      重要
      • 若源实例为自建MySQL,则需开启Binlog,并且设置binlog_format为row且binlog_row_image为full。

      • 若源实例自建MySQL是双主集群(两者互为主从),为保障DTS能获取全部的Binlog日志,则您需开启参数log_slave_updates。具体操作请参见为自建MySQL创建账号并设置binlog

    • 如为增量同步任务,DTS要求源数据库的本地Binlog日志保存24小时以上,如为全量同步和增量同步任务,DTS要求源数据库的本地Binlog日志至少保留7天以上(您可在全量同步完成后将Binlog保存时间设置为24小时以上),否则DTS可能因无法获取Binlog而导致任务失败,极端情况下甚至可能会导致数据不一致或丢失。由于您所设置的Binlog日志保存时间低于DTS要求的时间进而导致的问题,不在DTS的SLA保障范围内。如源为RDS MySQL,具体操作请参见管理本地日志(Binlog)

注意事项

  • 执行数据同步前需评估源库和目标库的性能,同时建议业务低峰期执行数据同步。否则全量数据初始化时将占用源库和目标库一定的读写资源,可能会导致数据库的负载上升。

  • 全量初始化会并发执行INSERT操作,导致目标数据库的表产生碎片,因此全量初始化完成后目标实例的表空间比源实例的表空间大。

  • 如同步对象为单个或多个表(非整库),那么在数据同步时,勿对源库的同步对象使用pt-online-schema-change等类似工具执行在线DDL变更,否则会导致同步失败。

    您可以使用数据管理DMS(Data Management)来执行在线DDL变更,请参见不锁表结构变更

  • 在DTS同步期间,不允许有除DTS外的数据写入目标库,否则会导致源库与目标库数据不一致。例如,有除DTS外的数据写入目标库时,使用DMS执行在线DDL变更,可能引起目标库数据丢失。

  • 在同步期间,若目标Kafka发生了扩容或缩容,您需要重启实例。

特殊情况

当源库为自建MySQL时

  • 在同步时,如果源库进行主备切换,将会导致同步任务失败。

  • 由于DTS的延迟时间是根据同步到目标库最后一条数据的时间戳和当前时间戳对比得出,源库长时间未执行DML操作可能导致延迟信息不准确。如果任务显示的延迟时间过大,您可以在源库执行一个DML操作来更新延迟信息。

    说明

    如果同步对象选择为整库,您还可以创建心跳表,心跳表每秒定期更新或者写入数据。

  • DTS会在源库定时执CREATE DATABASE IF NOT EXISTS `test`命令以推进Binlog位点。

费用说明

同步类型链路配置费用
库表结构同步和全量数据同步不收费。
增量数据同步收费,详情请参见计费概述

单条记录大小限制

由于写入Kafka单条记录的大小是10MB,因此当源端一行数据超过10MB时,DTS由于无法成功写入Kafka会导致任务中断。在该场景下建议您不要同步该表,如果一定要同步,也只能同步部分列,即配置DTS任务时,过滤掉这些大字段的记录。如果已经是在同步中的任务,则需要修改同步对象,将该表移出,再次点击修改同步对象,加入该表,并将该表的大字段列过滤,不做同步。

支持的同步架构

  • 一对一单向同步。

  • 一对多单向同步。

  • 多对一单向同步。

关于各类同步架构的介绍及注意事项,请参见数据同步拓扑介绍

支持同步的SQL操作

操作类型

SQL操作语句

DML

INSERT、UPDATE、DELETE

DDL

  • CREATE TABLE、ALTER TABLE、DROP TABLE、RENAME TABLE、TRUNCATE TABLE

  • CREATE VIEW、ALTER VIEW、DROP VIEW

  • CREATE PROCEDURE、ALTER PROCEDURE、DROP PROCEDURE

  • CREATE FUNCTION、DROP FUNCTION、CREATE TRIGGER、DROP TRIGGER

  • CREATE INDEX、DROP INDEX

操作步骤

说明

本文以新版DTS操作为例,与DMS操作有一些差异,具体操作请以DMS的实际界面为准。

  1. 进入同步任务的列表页面。

    1. 登录DMS数据管理服务

    2. 在顶部菜单栏中,单击集成与开发(DTS)

    3. 在左侧导航栏,选择数据传输(DTS) > 数据同步

    说明
  2. 同步任务右侧,选择同步实例所属地域。

    说明

    新版DTS同步任务列表页面,需要在页面左上角选择同步实例所属地域。

  3. 单击创建任务,配置源库及目标库信息。

    警告

    选择源和目标实例后,建议您仔细阅读页面上方显示的使用限制,否则可能会导致任务失败或数据不一致。

    类别

    配置

    说明

    任务名称

    DTS会自动生成一个任务名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。

    源库信息

    选择已有的DMS数据库实例(可选,如未创建可忽略此处选择,直接在下方配置数据库信息即可)

    您可以按实际需求,选择是否使用已有实例。

    • 如使用已有实例,下方数据库信息将自动填入,您无需重复输入。

    • 如不使用已有实例,您需要输入下方的数据库信息。

    数据库类型

    选择MySQL

    接入方式

    选择云实例

    实例地区

    选择源RDS MySQL实例所属地域。

    是否跨阿里云账号

    本示例为同一阿里云账号间的同步,选择不跨账号

    RDS实例ID

    选择源RDS MySQL实例ID。

    数据库账号

    填入源RDS MySQL实例的数据库账号,需具备待同步对象的读权限。

    数据库密码

    填入该数据库账号对应的密码。

    连接方式

    根据需求选择非加密连接SSL安全连接。如果设置为SSL安全连接,您需要提前开启RDS MySQL实例的SSL加密功能,详情请参见设置SSL加密

    目标库信息

    选择已有的DMS数据库实例(可选,如未创建可忽略此处选择,直接在下方配置数据库信息即可)

    填入该数据库账号对应的密码。

    数据库类型

    选择Kafka

    接入方式

    选择专线/VPN网关/智能网关

    说明

    由于DTS暂时不支持直接选择阿里云消息队列Kafka版,此处将其作为自建Kafka来配置数据同步。

    实例地区

    选择阿里云消息队列Kafka版实例所属地域。

    已和目标端数据库联通的VPC

    选择目标阿里云消息队列Kafka版实例所属的专有网络ID。

    说明

    您可以在阿里云消息队列Kafka版实例信息页签的配置信息区域,查看VPC ID

    IP地址 (不支持域名)

    填入阿里云消息队列Kafka版实例默认接入点中的任意一个IP地址。

    说明

    您可以在阿里云消息队列Kafka版实例信息页签的接入点信息区域,找到类型默认接入点域名接入点。将鼠标光标悬停在接入点信息上,在弹出的气泡中查看IP接入点

    端口

    填入阿里云消息队列Kafka版实例的服务端口,默认为9092

    说明

    填入任意一组IP地址和对应的端口即可。

    数据库账号

    仅开启ACL的阿里云消息队列Kafka版实例才需要填写数据库账号数据库密码

    说明
    • 关于开启ACL的信息,请参见SASL用户授权

    • 数据库账号:请在SASL用户管理页签,查看用户名

    • 数据库密码:请在SASL用户管理页签,单击目标数据库账号行的复制密码

    数据库密码

    Kafka版本

    根据Kafka实例版本,选择对应的版本信息。

    • 若您部署的阿里云消息队列Kafka版版本0.10.2,请选择0.10
    • 若您部署的阿里云消息队列Kafka版版本2.6.22.2.0,请选择1.0以上

    连接方式

    根据业务及安全需求,选择非加密连接SCRAM-SHA-256

    Topic

    在下拉框中选择具体的Topic。

    存储DDL的Topic

    在下拉框中选择具体的Topic,用于存储DDL信息。如果未指定,DDL信息默认存储在Topic选择的Topic中。

    是否使用Kafka Schema Registry

    Kafka Schema Registry是元数据提供服务层,提供了一个RESTful接口,用于存储和检索Avro Schema。

    • :不使用Kafka Schema Registry。

    • :使用Kafka Schema Registry。您需要输入Avro Schema在Kafka Schema Registry注册的URL或IP。

  4. 如果您的自建数据库具备白名单安全设置,您需要复制弹跳框中的DTS服务器IP地址,并加入自建数据库的白名单安全设置中。然后单击测试连接以进行下一步

    说明

    DTS服务器IP地址的更多说明,请参见迁移、同步或订阅本地数据库时需添加的IP白名单

  5. 配置任务对象及高级配置。

    配置

    说明

    同步类型

    固定选中增量同步。默认情况下,您还需要同时选中库表结构同步全量同步。预检查完成后,DTS会将源实例中待同步对象的全量数据在目标集群中初始化,作为后续增量同步数据的基线数据。

    目标已存在表的处理模式

    • 预检查并报错拦截:检查目标数据库中是否有同名的表。如果目标数据库中没有同名的表,则通过该检查项目;如果目标数据库中有同名的表,则在预检查阶段提示错误,数据同步任务不会被启动。

      说明

      如果目标库中同名的表不方便删除或重命名,您可以更改该表在目标库中的名称,请参见库表列名映射

    • 忽略报错并继续执行:跳过目标数据库中是否有同名表的检查项。

      警告

      选择为忽略报错并继续执行,可能导致数据不一致,给业务带来风险,例如:

      • 表结构一致的情况下,如在目标库遇到与源库主键或唯一键的值相同的记录:

        • 全量期间,DTS会保留目标集群中的该条记录,即源库中的该条记录不会同步至目标数据库中。

        • 增量期间,DTS不会保留目标集群中的该条记录,即源库中的该条记录会覆盖至目标数据库中。

      • 表结构不一致的情况下,可能会导致无法初始化数据、只能同步部分列的数据或同步失败,请谨慎操作。

    投递到Kafka的数据格式

    根据需求选择同步到Kafka实例中的数据存储格式。

    • 如果您选择DTS Avro,根据DTS Avro的schema定义进行数据解析,schema定义详情请参见DTS Avro的schema定义

    • 如果您选择Canal Json,Canal Json的参数说明和示例请参见Canal Json说明

    投递到Kafka Partition策略

    根据业务需求选择同步的策略,详细介绍请参见Kafka Partition迁移策略说明

    目标库对象名称大小写策略

    您可以配置目标实例中同步对象的库名、表名和列名的英文大小写策略。默认情况下选择DTS默认策略,您也可以选择与源库、目标库默认策略保持一致。更多信息,请参见目标库对象名称大小写策略

    源库对象

    源库对象框中单击待同步对象,然后单击向右将其移动至已选择对象框。

    说明

    同步对象的选择粒度为表。

    已选择对象

    • 如需更改单个同步对象在目标实例中的名称,请右击已选择对象中的同步对象,设置方式,请参见库表列名单个映射

    • 如需批量更改同步对象在目标实例中的名称,请单击已选择对象方框右上方的批量编辑,设置方式,请参见库表列名批量映射

    说明
    • 如需按库或表级别选择同步的SQL操作,请在已选择对象中右击待同步对象,并在弹出的对话框中选择所需同步的SQL操作。支持的操作,请参见支持同步的SQL操作

    • 如需设置WHERE条件过滤数据,请在已选择对象中右击待同步的表,在弹出的对话框中设置过滤条件。设置方法请参见通过SQL条件过滤任务数据

    • 如果使用了对象名映射功能,可能会导致依赖这个对象的其他对象同步失败。

  6. 可选:在已选择对象区域框中,将鼠标指针放置在目标Topic名上,然后右击Topic名后出现编辑,在弹出的对话框中设置源表在目标Kafka实例中的Topic名称、Topic的Partition数量、Partition Key等信息。

    设置新Topic,Partition的数量及Partition Key

    配置

    说明

    表名称

    设置源表同步到的目标Topic名称。

    警告

    设置的Topic名称必须在目标Kafka实例中真实存在,否则将导致数据同步失败。

    过滤条件

    • 过滤条件支持标准的SQL WHERE语句(仅支持=!=<>操作符),只有满足WHERE条件的数据才会被同步到目标Topic。本案例填入book_id>1

    • 过滤条件中如需使用引号,请使用单引号('),例如address in('hangzhou','shanghai')

    设置新建Topic的Partition数量

    本场景中,目标Kafka为消息队列Kafka实例,暂不支持该功能,无需配置本参数。

    Partition Key

    当您在上一步中选择投递到Kafka Partition策略按主键的hash值投递到不同Partition时,您可以配置本参数,指定单个或多个列作为Partition Key来计算Hash值,DTS将根据计算得到的Hash值将不同的行投递到目标Topic的各Partition中。

    说明

    取消选中全表同步后,才可以勾选Partition Key

  7. 单击下一步高级配置,进行高级配置。

    配置

    说明

    选择调度该任务的专属集群

    DTS默认将任务调度到共享集群上,您无需选择。您可以购买指定规格的专属集群来运行DTS同步任务,详情请参见什么是DTS专属集群

    设置告警

    是否设置告警,当同步失败或延迟超过阈值后,将通知告警联系人。

    源库、目标库无法连接后的重试时间

    在同步任务启动后,若源库或目标库连接失败则DTS会报错,并会立即进行持续的重试连接,默认持续重试时间为720分钟,您也可以在取值范围(10~1440分钟)内自定义重试时间,建议设置30分钟以上。如果DTS在设置的重试时间内重新连接上源库、目标库,同步任务将自动恢复。否则,同步任务将会失败。

    说明
    • 针对同源或者同目标的多个DTS实例,如DTS实例A和DTS实例B,设置网络重试时间时A设置30分钟,B设置60分钟,则重试时间以低的30分钟为准。

    • 由于连接重试期间,DTS将收取任务运行费用,建议您根据业务需要自定义重试时间,或者在源和目标库实例释放后尽快释放DTS实例。

    源库、目标库出现其他问题后的重试时间

    在同步任务启动后,若源库或目标库出现非连接性的其他问题(如DDL或DML执行异常),则DTS会报错并会立即进行持续的重试操作,默认持续重试时间为10分钟,您也可以在取值范围(1~1440分钟)内自定义重试时间,建议设置10分钟以上。如果DTS在设置的重试时间内相关操作执行成功,同步任务将自动恢复。否则,同步任务将会失败。

    重要

    源库、目标库出现其他问题后的重试时间的值需要小于源库、目标库无法连接后的重试时间的值。

    是否限制全量迁移速率

    在全量同步阶段,DTS将占用源库和目标库一定的读写资源,可能会导致数据库的负载上升。您可以根据实际情况,选择是否对全量同步任务进行限速设置(设置每秒查询源库的速率QPS每秒全量迁移的行数RPS每秒全量迁移的数据量(MB)BPS),以缓解目标库的压力。

    说明

    仅当同步类型选择了全量同步时才可以配置。

    是否限制增量同步速率

    您也可以根据实际情况,选择是否对增量同步任务进行限速设置(设置每秒增量同步的行数RPS每秒增量同步的数据量(MB)BPS),以缓解目标库的压力。

    环境标签

    您可以根据实际情况,选择用于标识实例的环境标签。本示例无需选择。

    配置ETL功能

    选择是否配置ETL功能。关于ETL的更多信息,请参见什么是ETL
  8. 保存任务并进行预检查。

    • 若您需要查看调用API接口配置该实例时的参数信息,请将鼠标光标移动至下一步保存任务并预检查按钮上,然后单击气泡中的预览OpenAPI参数

    • 若您无需查看或已完成查看API参数,请单击页面下方的下一步保存任务并预检查

    说明
    • 在同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动同步作业。

    • 如果预检查失败,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。

    • 如果预检查产生警告:

      • 对于不可以忽略的检查项,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。

      • 对于可以忽略无需修复的检查项,您可以依次单击点击确认告警详情确认屏蔽确定重新进行预检查,跳过告警检查项重新进行预检查。如果选择屏蔽告警检查项,可能会导致数据不一致等问题,给业务带来风险。

  9. 预检查通过率显示为100%时,单击下一步购买

  10. 购买页面,选择数据同步实例的计费方式、链路规格,详细说明请参见下表。

    类别

    参数

    说明

    信息配置

    计费方式

    • 预付费(包年包月):在新建实例时支付费用。适合长期需求,价格比按量付费更实惠,且购买时长越长,折扣越多。

    • 后付费(按量付费):按小时扣费。适合短期需求,用完可立即释放实例,节省费用。

    资源组配置

    实例所属的资源组,默认为default resource group。更多信息,请参见什么是资源管理

    链路规格

    DTS为您提供了不同性能的同步规格,同步链路规格的不同会影响同步速率,您可以根据业务场景进行选择。更多信息,请参见数据同步链路规格说明

    订购时长

    在预付费模式下,选择包年包月实例的时长和数量,包月可选择1~9个月,包年可选择1年、2年、3年和5年。

    说明

    该选项仅在付费类型为预付费时出现。

  11. 配置完成后,阅读并勾选《数据传输(按量付费)服务条款》

  12. 单击购买并启动,同步任务正式开始,您可在数据同步界面查看具体任务进度。

  • 本页导读 (1)
文档反馈