本文为您介绍云原生数据仓库AnalyticDB MySQL版3.0结果表的DDL定义、WITH参数和类型映射。

什么是云原生数据仓库AnalyticDB MySQL版

云原生数据仓库AnalyticDB MySQL版是融合数据库、大数据技术于一体的云原生企业级数据仓库服务。AnalyticDB MySQL版支持高吞吐的数据实时增删改、低延时的实时分析和复杂ETL,兼容上下游生态工具,可用于构建企业级报表系统、数据仓库和数据服务引擎。

前提条件

使用限制

仅Flink计算引擎VVR 2.0.0及以上版本支持云原生数据仓库AnalyticDB MySQL版3.0 Connector。

DDL定义

CREATE TABLE adb_sink (
  id INT,
  num BIGINT
) WITH (
  'connector' = 'adb3.0',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = 'jdbc:mysql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>',
  'userName' = '<yourUsername>'
);

WITH参数

参数 注释说明 是否必选 备注
connector 结果表类型。 固定值为adb3.0
password 密码。 无。
tableName 表名。 无。
url JDBC连接地址。 云原生数据仓库AnalyticDB MySQL版数据库地址。示例:url='jdbc:mysql://databaseName****-cn-shenzhen-a.ads.aliyuncs.com:10014/databaseName'
说明
  • 云原生数据仓库AnalyticDB MySQL版数据库连接信息,请参见URL地址查询
  • databaseName为云原生数据仓库AnalyticDB MySQL版数据库名称。
username 用户名。 无。
maxRetryTimes 写入数据失败后,重试写入的次数。 默认值为3。
batchSize 一次批量写入的条数。 默认值为5000。
flushIntervalMs 清空缓存的时间间隔。 单位为毫秒,默认值为0,即默认不开启缓存flush。如果设置值为 2000,表示如果缓存中的数据在等待2秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。
replaceMode 是否采用replace into语法插入数据。 参数取值如下:
  • true(默认值):采用replace into语法插入数据。
  • false:采用insert into on duplicate key语法插入数据。
说明
  • 仅Flink计算引擎VVR 4.x及以上版本支持该参数。
  • 仅AnalyticDB MySQL 3.1.3.5及以上版本支持该参数。

类型映射

云原生数据仓库AnalyticDB MySQL版3.0字段类型 Flink字段类型
BOOLEAN BOOLEAN
TINYINT INT
SMALLINT INT
INT INT
BIGINT BIGINT
DOUBLE DOUBLE
VARCHAR VARCHAR
DATETIME TIMESTAMP
DATE DATE
说明 因为MySQL的JDBC Driver在获取数据时,由于精度问题,Flink可能会采用不同的数据类型进行数据承接。

代码示例

CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE adb_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'adb3.0',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>'
);

INSERT INTO adb_sink
SELECT  * FROM datagen_source;

常见问题

为什么MySQL物理表(包含RDS MySQL和ADB)的INT UNSIGNED字段类型,在Flink SQL中要被声明为其他类型?