Flink SQL是ETL为了简化计算模型、降低使用门槛而设计的一套符合标准SQL语义的开发语言。相对于DAG模式(可视化拖拽方式),Flink SQL的功能更为强大,您可在Flink SQL的命令窗口中输入DAG模式暂不支持的语法。本文将介绍如何通过Flink SQL模式配置ETL任务。
背景信息
ETL功能处于灰度公测阶段,仅部分用户可以免费体验。如在体验过程中遇到问题,请加钉钉群沟通(钉钉群号:32326646)。
在配置ETL任务前,请您了解以下信息:
输入/维表指ETL的源库。
输出指经过ETL处理后写入的目标库。
数据库传输服务DTS为数据同步过程提供了流式的ETL功能,您可以在源库和目标库之间添加各种转换组件,实现丰富的转换操作,并将处理后的数据实时写入目标库。例如将两张流表做JOIN操作后形成一张大表,写入目标库;或者给源表新增一个字段,并为该字段配置函数进行赋值,源表该字段经过赋值转换后写入目标库。
前提条件
当前仅支持在华东1(杭州)、华东2(上海)、华北1(青岛)、华北2(北京)、华北3(张家口)、华南1(深圳)、华南3(广州)和中国香港创建ETL任务。
当前源库支持MySQL、PolarDB MySQL、Oracle、PostgreSQL、DB2 iSeries(AS/400)、DB2 LUW、DRDS(PolarDB-X 1.0)、PolarDB PostgreSQL、MariaDB、PolarDB Oracle、SQLServer、PolarDB-X 2.0。
当前目标库支持MySQL、PolarDB MySQL、Oracle、AnalyticDB MySQL 3.0、PolarDB PostgreSQL、PostgreSQL、DB2 LUW、DB2 iSeries(AS/400)、AnalyticDB PostgreSQL、SQLServer、MariaDB、DRDS(PolarDB-X 1.0)、PolarDB Oracle、Tablestore。
由于ETL功能暂不支持结构迁移,所以您需要根据转换条件在目标库侧完成对应表结构的创建。例如A表中包含字段1、字段2和字段3,B表中包含字段2、字段3和字段4,对两张表通过做JOIN操作后,需要输出字段2和字段3,则需要在目标库侧创建做JOIN操作后的C表,C表中包含字段2和字段3。
由于ETL功能暂不支持全量数据同步,所以您只能对增量数据进行实时转换。
注意事项
所有的源库和目标库属于同一地域。
所有流表均来源于同一实例。
数据库的库名和表名唯一。
- 当前暂不支持配置跨账号的任务。
操作步骤
进入ETL任务的列表页面。
登录DMS数据管理服务。
在顶部菜单栏中,单击集成与开发(DTS)。
在左侧导航栏,选择 。
说明您也可以在DTS控制台的ETL页面,单击去DMS创建流式ETL。
- 单击左上角的,在新增数据流对话框中,您需在数据流名称配置ETL任务名称,选择开发方式为FlinkSQL。
单击确认。
- 在数据加工页面的数据流信息部分,添加源库和目标库。
参数 说明 地区 选择数据源所在地域。 类型 选择库表类型。 - 配置源表信息时,如源表为 流表 ,则需选择流表;如源表为 维表 ,则需选择维表。
- 配置目标表信息时,则需选择输出。
数据库类型 选择源库或目标库的数据库类型。 实例 输入实例名称或实例ID,搜索并选择源和目标实例。 重要 您需要先在DMS中录入源实例和目标实例。录入方式,请参见实例管理。数据库 选择数据加工对象所属的源库或目标库。 物理表 选择数据加工对象所属的源表或目标表。 物理表别名 为源表或目标表设置精简易读的别名,便于ETL在运行SQL语句时定位至具体的表。 - 在数据加工页面的SQL命令窗口,添加用于配置ETL任务的SQL语句。本案例以如下SQL语句为例,配置ETL任务,将流表test_orders与维表product结合至目标表test_orders_new中。重要 SQL语句间需以英文分号(;)分割。
CREATE TABLE `etltest_test_orders` ( `order_id` BIGINT, `user_id` BIGINT, `product_id` BIGINT, `total_price` DECIMAL(15,2), `order_date` TIMESTAMP(6), `dts_etl_schema_db_table` STRING, `dts_etl_db_log_time` BIGINT, `pt` AS PROCTIME(), WATERMARK FOR `order_date` AS `order_date` - INTERVAL '5' SECOND ) WITH ( 'streamType'= 'append', 'alias'= 'test_orders', 'vertexType'= 'stream' ); CREATE TABLE `etltest_product` ( `product_id` BIGINT, `product_name` STRING, `product_price` DECIMAL(15,2) ) WITH ( 'alias'= 'product', 'vertexType'= 'lookup' ); CREATE VIEW `etltest_test_orders_JOIN_etltest_product` AS SELECT `etltest_test_orders`.`order_id` AS `order_id`, `etltest_test_orders`.`user_id` AS `user_id`, `etltest_test_orders`.`product_id` AS `product_id`, `etltest_test_orders`.`total_price` AS `total_price`, `etltest_test_orders`.`order_date` AS `order_date`, `etltest_test_orders`.`dts_etl_schema_db_table` AS `dts_etl_schema_db_table`, `etltest_test_orders`.`dts_etl_db_log_time` AS `dts_etl_db_log_time`, `etltest_product`.`product_id` AS `product_id_0001011101`, `etltest_product`.`product_name` AS `product_name`, `etltest_product`.`product_price` AS `product_price` FROM `etltest_test_orders` LEFT JOIN `etltest_product` FOR SYSTEM_TIME AS OF `etltest_test_orders`.`pt` ON etltest_test_orders.product_id = etltest_product.product_id ; CREATE TABLE `test_orders_new` ( `order_id` BIGINT, `user_id` BIGINT, `product_id` BIGINT, `total_price` DECIMAL(15,2), `order_date` TIMESTAMP(6), `product_name` STRING, `product_price` DECIMAL(15,2) ) WITH ( 'alias'= 'test_orders_new', 'vertexType'= 'sink' ); INSERT INTO `test_orders_new` ( `order_id`, `user_id`, `product_id`, `total_price`, `order_date`, `product_name`, `product_price` ) SELECT `etltest_test_orders_JOIN_etltest_product`.`order_id`, `etltest_test_orders_JOIN_etltest_product`.`user_id`, `etltest_test_orders_JOIN_etltest_product`.`product_id`, `etltest_test_orders_JOIN_etltest_product`.`total_price`, `etltest_test_orders_JOIN_etltest_product`.`order_date`, `etltest_test_orders_JOIN_etltest_product`.`product_name`, `etltest_test_orders_JOIN_etltest_product`.`product_price` FROM `etltest_test_orders_JOIN_etltest_product`;
类型 说明 源表和目标表信息 - 您需使用CREATE TABLE语句定义源表和目标表信息。
- SQL语句的WITH从句中可设置三个参数:streamType 、alias、vertexType 。其中流表必须设置以上三个参数,维表和输出仅需设置alias和vertexType 。
- streamType :
流类型。
- Upsert:Upsert流。动态表中的数据支持通过INSERT、UPDATE和DELETE操作修改,当转换为流时,会将INSERT和UPDATE操作编码为upsert message,将DELETE操作编码为delete message。说明 该编码方式要求动态表具有唯一键(可能是复合的)。
- append: Append-only流。动态表中的数据仅支持INSERT操作修改,当转换为流时仅需发送INSERT的数据。
- Upsert:Upsert流。动态表中的数据支持通过INSERT、UPDATE和DELETE操作修改,当转换为流时,会将INSERT和UPDATE操作编码为upsert message,将DELETE操作编码为delete message。
- alias:在步骤3配置源库和目标库时设置的物理表别名。
- streamType :
流类型。
- vertexType :表类型。
- stream:流表。
- lookup:维表。
- sink:目标表。
数据加工的计算逻辑 您需使用CREATE VIEW语句描述数据加工的计算逻辑。 加工后的目标表信息 您需使用INSERT INTO语句定义加工后的目标表信息。 - 配置完成源库和目标库信息,以及SQL语句后,单击生成 Flink SQL校验。说明
- 您也可以单击发布,直接执行校验和预检查。
- 如Flink SQL校验成功,您可单击,查看Flink SQL校验详情。
- 如Flink SQL校验失败,您可单击,根据提示信息修复SQL语句,并重新进行生成Flink SQL校验。
- Flink SQL校验成功后,单击发布进入预检查阶段。
- 预检查通过率显示为100%时,单击下一步购买(免费)。说明 如果预检查失败,请单击检查失败项后的查看详情,根据提示信息修复后,重新进行预检查。
- 在购买页面,选择链路规格并确认计算资源(CU)(公测期间,固定为2)。阅读并勾选数据传输(按量付费)服务条款和公测协议条款。说明 ETL功能公测中,每个用户可以免费创建并使用两个ETL实例。
- 单击购买并启动,ETL任务正式开始。
- 本页导读 (1)