本文介绍如何将开源Flink中的数据写入AnalyticDB MySQL版集群。

前提条件

  • 下载Flink驱动,并将其部署到Flink所有节点的${flink部署目录}/lib目录下。您可以根据Flink版本下载对应的驱动:

    如需其他版本的驱动,请前往JDBC SQL Connector页面下载。

  • 下载MySQL驱动,并将其部署到Flink所有节点的${flink部署目录}/lib目录下。
    说明 MySQL驱动版本需为5.1.40或以上,请前往MySQL驱动下载页面下载。
  • 部署所有的JAR包后请重启Flink集群。启动方式,请参见Start a Cluster
  • 已在目标AnalyticDB MySQL版集群中创建数据库和数据表,用于保存需要写入的数据。数据库和数据表的创建方法,请参见CREATE DATABASECREATE TABLE
    说明
    • 本文示例中创建的数据库名称为tpch,建库语句如下:
      CREATE DATABASE IF NOT EXISTS tpch;
    • 本文示例中创建的数据表名为person,建表语句如下:
      CREATE TABLE IF NOT EXISTS person(user_id string, user_name string, age int);
  • 如果您的AnalyticDB MySQL集群是弹性模式,您需要在集群信息页面的网络信息区域,打开启用ENI网络的开关。启用ENI网络

注意事项

流程介绍

说明 本文示例以CSV格式的文件作为输入源介绍数据写入流程。
步骤 说明
步骤一:数据准备 创建一个新的CSV文件并在文件中写入源数据,然后将新文件部署至Flink所有节点的/root下。
步骤二:数据写入 通过SQL语句在Flink中创建源表和结果表,并通过源表和结果表将数据写入AnalyticDB MySQL中。
步骤三:数据验证 登录AnalyticDB MySQL目标数据库,来查看并验证源数据是否成功导入。

步骤一:数据准备

  1. 在其中一个Flink节点的root目录下,执行vim /root/data.csv命令来创建一个名为data.csv的CSV文件。
    文件中包含的数据如下(您可以多复制几行相同的数据来增加写入的数据量):
    0,json00,20
    1,json01,21
    2,json02,22
    3,json03,23
    4,json04,24
    5,json05,25
    6,json06,26
    7,json07,27
    8,json08,28
    9,json09,29
  2. 文件创建完成后,将其部署至Flink其他节点的/root目录下。

步骤二:数据写入

  1. 启动并运行Flink SQL程序。详细操作步骤,请参见Starting the SQL Client CLI
  2. 创建一张名为csv_person的源表,语句如下:
    CREATE TABLE if not exists csv_person (
      `user_id` STRING,
      `user_name` STRING,
      `age` INT
    ) WITH (
      'connector' = 'filesystem',
      'path' = 'file:///root/data.csv',
      'format' = 'csv',
      'csv.ignore-parse-errors' = 'true',
      'csv.allow-comments' = 'true'
    );
    说明
    • 源表中的列名和数据类型需与AnalyticDB MySQL版中目标表的列名和数据类型保持一致。
    • 建表语句中填写的pathdata.csv的本地路径(Flink各个节点的路径均需一致)。如果您的data.csv文件不在本地,请根据实际情况填写正确的路径。

      关于建表语句中的其他参数说明,请参见FileSystem SQL Connector

  3. 创建一张名为mysql_person的结果表,语句如下:
    CREATE TABLE mysql_person (
      user_id String,
      user_name String,
      age INT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true',
      'table-name' = '<table_name>',
      'username' = '<username>',
      'password' = '<password>',
      'sink.buffer-flush.max-rows' = '10',
      'sink.buffer-flush.interval' = '1s'
      );
    说明
    • 结果表中的列名和数据类型需与AnalyticDB MySQL版中目标表的列名和数据类型保持一致。
    • 下表仅列举了连接AnalyticDB MySQL版集群时的必填配置项,关于选填配置项的信息,请参见Connector Options
    必填配置项 说明
    connector 指定Flink使用的连接器类型,选择jdbc
    url AnalyticDB MySQL版集群的JDBC URL。

    格式:jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true',其中:

    • endpoint:目标AnalyticDB MySQL版集群的连接地址。
      说明 如果需要使用公网地址连接集群,您需要先申请公网地址,申请方法,请参见申请公网地址
    • db_nameAnalyticDB MySQL版中的目标数据库名。
    • useServerPrepStmts=false&rewriteBatchedStatements=true:批量写入数据至AnalyticDB MySQL版的必填配置,用于提高写入性能,以及降低对AnalyticDB MySQL版集群的压力。

    示例:jdbc:mysql://am-**********.ads.aliyuncs.com:3306/tpch?useServerPrepStmts=false&rewriteBatchedStatements=true

    table-name AnalyticDB MySQL版中的目标表名,用于存储写入的数据。本文示例中目标表名为person
    username AnalyticDB MySQL版中具有写入权限的数据库账号名。
    说明
    • 您可以通过SHOW GRANTS查看当前账号所拥有的权限。
    • 您可以通过GRANT语句为目标账号授予权限。
    password AnalyticDB MySQL版中具有写入权限的数据库账号密码。
    sink.buffer-flush.max-rows 从Flink写入数据至AnalyticDB MySQL版时,一次批量写入的最大行数。Flink会接收实时数据,当接收到的数据行数达到最大写入行数后,再将数据批量写入AnalyticDB MySQL版集群。可选取值如下:
    • 0:最大行数为0时,批量写入数据功能仅考虑sink.buffer-flush.interval配置,即只要满足最大间隔时间就会开始批量写入。
    • 具体的行数,例如10002000等。
    说明 不建议将该参数设置为0。取值为0不仅会导致写入性能变差,也会导致AnalyticDB MySQL版集群执行并发查询时的压力变大。
    sink.buffer-flush.max-rowssink.buffer-flush.interval配置均不为0时,批量写入功能生效规则如下:
    • 若Flink接收到的数据量已达到sink.buffer-flush.max-rows所设的值,但最大时间间隔还未到达sink.buffer-flush.interval所设的值,那么Flink无需等待间隔期满,即可直接触发批量写入数据至AnalyticDB MySQL版
    • 若Flink接收到的数据量未达到sink.buffer-flush.max-rows所设的值,但间隔时间已达到sink.buffer-flush.interval所设的值,那么无论Flink接收了多少数据量,都直接触发批量写入数据至AnalyticDB MySQL版
    sink.buffer-flush.interval Flink批量写入数据至AnalyticDB MySQL版的最大间隔时间,即执行下一次批量写入数据前的最大等待时间,可选取值如下:
    • 0:时间间隔为0时,批量写入数据功能仅考虑sink.buffer-flush.max-rows配置,即只要Flink接收到的数据行数达到最大写入行数后就会开始批量写入。
    • 具体的时间间隔,例如1d1h1min1s1ms等。
    说明 不建议将该参数设置为0,避免在业务低谷期产生源数据较少的场景下,影响数据导入的及时性。
  4. 通过源表和结果表将数据写入AnalyticDB MySQL版集群中,语句如下:
    INSERT INTO mysql_person SELECT user_id, user_name, age FROM csv_person;

步骤三:数据验证

导入完成后,您可以登录AnalyticDB MySQL集群的目标库tpch,执行如下语句查看并验证源数据是否成功导入至目标表person中:
SELECT * FROM person;