全部产品
云市场

一键归档 RDS &POLARDB数据到 Spark

更新时间:2019-09-27 16:55:51

批量归档目前支持从 Mysql、POLARDB 等数据库把业务数据批量归档到 Spark。本文将以 MySQL 为例详细介绍如何使用本工具。

一、前置条件

购买 MySQL/POLARDB 数据库

如已经购买了 MySQL 数据库则本小结可以忽略,如未购买可以到 https://www.aliyun.com/product/rds/mysql 页面购买,比如本文购买好的 MySQL 数据库如下:1注意:购买的 MySQL 实例的 VPC 需要和 Spark 集群一致。

购买 X-Pack Spark 集群

X-Pack Spark 集群主要用于合并 WAL 数据,把 HBase 列数据转换成行数据,归档后的数据就存储在 X-Pack Spark 集群中。本文购买好的 X-Pack Spark 集群如下:1注意图中的 VPC ID 和 VSwitch ID,和上面 MySQL 需要一样,这样才可以连接。

创建测试表

在创建归档任务之前,我们先到 MySQL 里面创建好相关的测试库和表,如下:

  1. MySQL [(none)]> create database spark_archives_db;
  2. Query OK, 1 row affected (0.00 sec)
  3. MySQL [(none)]> use spark_archives_db;
  4. Database changed
  5. MySQL [spark_archives_db]> create table orders(
  6. -> id INT UNSIGNED AUTO_INCREMENT,
  7. -> name VARCHAR(100) NOT NULL,
  8. -> tele VARCHAR(11) NOT NULL,
  9. -> email VARCHAR(30) NOT NULL,
  10. -> order_no VARCHAR(19) NOT NULL,
  11. -> create_time TIMESTAMP,
  12. -> PRIMARY KEY (id)
  13. -> );
  14. Query OK, 0 rows affected (0.00 sec)

上面我们创建了一个 spark_archives_db 库以及 orders 测试表。我们现在往这张表里面插入一些测试数据:

  1. insert into orders values (1, 'zhangsan', '13188888888', 'zhangsan@aliyun.com', '2019091712234223242', '2019-09-17 12:23:42');
  2. insert into orders values (2, 'lisi', '13212345678', 'lisi@aliyun.com', '2019091714122244232', '2019-09-17 14:12:22');
  3. insert into orders values (3, 'wangwu', '13166666666', 'wangwu@aliyun.com', '2019091720113243213', '2019-09-17 20:11:32');
  4. insert into orders values (4, 'zhaoliu', '13288888888', 'zhaoliu@aliyun.com', '2019091809234223242', '2019-09-18 09:23:42');
  5. insert into orders values (5, 'liqi', '13121233453', 'liqi@aliyun.com', '2019091819113412345', '2019-09-18 19:11:34');
  6. insert into orders values (6, 'wuba', '18888888888', 'wuba@aliyun.com', '2019091822430167854', '2019-09-18 22:43:01');
  7. insert into orders values (7, 'spark', '16666666666', 'spark@aliyun.com', '2019091822554298765', '2019-09-18 22:55:42');
  8. insert into orders values (8, 'hbase', '15555555555', 'hbase@aliyun.com', '2019091823123423242', '2019-09-18 23:12:34');
  9. insert into orders values (9, 'cassandra', '13333333333', 'cassandra@aliyun.com', '2019091901024566666', '2019-09-19 01:02:45');
  10. insert into orders values (10, 'hadoop', '13434343434', 'hadoop@aliyun.com', '2019091917552788888', '2019-09-19 17:55:27');
  11. MySQL [spark_archives_db]> select * from orders;
  12. +----+-----------+-------------+----------------------+---------------------+---------------------+
  13. | id | name | tele | email | order_no | create_time |
  14. +----+-----------+-------------+----------------------+---------------------+---------------------+
  15. | 1 | zhangsan | 13188888888 | zhangsan@aliyun.com | 2019091712234223242 | 2019-09-17 12:23:42 |
  16. | 2 | lisi | 13212345678 | lisi@aliyun.com | 2019091714122244232 | 2019-09-17 14:12:22 |
  17. | 3 | wangwu | 13166666666 | wangwu@aliyun.com | 2019091720113243213 | 2019-09-17 20:11:32 |
  18. | 4 | zhaoliu | 13288888888 | zhaoliu@aliyun.com | 2019091809234223242 | 2019-09-18 09:23:42 |
  19. | 5 | liqi | 13121233453 | liqi@aliyun.com | 2019091819113412345 | 2019-09-18 19:11:34 |
  20. | 6 | wuba | 18888888888 | wuba@aliyun.com | 2019091822430167854 | 2019-09-18 22:43:01 |
  21. | 7 | spark | 16666666666 | spark@aliyun.com | 2019091822554298765 | 2019-09-18 22:55:42 |
  22. | 8 | hbase | 15555555555 | hbase@aliyun.com | 2019091823123423242 | 2019-09-18 23:12:34 |
  23. | 9 | cassandra | 13333333333 | cassandra@aliyun.com | 2019091901024566666 | 2019-09-19 01:02:45 |
  24. | 10 | hadoop | 13434343434 | hadoop@aliyun.com | 2019091917552788888 | 2019-09-19 17:55:27 |
  25. +----+-----------+-------------+----------------------+---------------------+---------------------+
  26. 10 rows in set (0.00 sec)

二、创建归档任务

到这里,我们已经准备好所有的测试环境,现在我们可以到数据工作台创建相关的归档任务了,具体步骤如下:

  1. 到 HBase X-Pack 控制台
  2. 选择左边菜单里面的一键归档
  3. 切换到新的页面之后选择创建归档
  4. 在弹出的对话框里面选择批量归档
  5. 填写归档名称,这里填的是 mysql2spark_archive_test
  6. 点击确认

2

点击 确认 之后我们就进入到归档编辑页面中,里面我们可以填写相关的信息,具体如下:3

图中序号说明如下:

  1. 归档的 Spark 集群,对应的就是上面我们购买的 Spark 集群 ID;
  2. 数据源类型选择 MYSQL
  3. 数据源实例ID填写我们需要归档的表所在 MySQL 实例,也就是我们上面创建好的 MySQL 实例;
  4. 账号对应的是 MySQL 实例的用户名;
  5. 密码对应的是 MySQL 实例的密码;
  6. 归档配置里面对应的是我们归档类型的相关配置,目前批量归档的归档类型支持 一次全量、每天全量、每天增量。分别的含义为:

    • 一次全量:一次就将对应 MySQL 里面的全量数据归档到 Spark 相关表,这个一般是在初始化数据的时候会用到;
    • 每天全量:逻辑处理简单,适用于数据量不大的场景。即每天都会拉取全量数据,每个 Spark 分区表数据都是全量的;
    • 每天增量:根据增量字段,每天只拉取有更新或新增的数据。增量字段一般使用创建时间或者修改时间。一次全量 + 每天增量 模式可以把 MySQL 的数据按照 T+1 的形式同步到 Spark 分区表中。

    归档配置里面的分区字段 对应的就是每天增量数据存放到 Spark 对应表的分区名字;增量字段代表的是按照 MySQL 对应表中的那个字段来增量抽取数据,一般都会选择表的创建时间或者修改时间来作为增量字段;高级属性选项可以参照 https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html 文档里的说明;

  7. 源库名代表的是需要归档的 MYSQL 表所在的数据库名字;

  8. 源表名代表的是需要归档的 MYSQL 表的名字;
  9. Spark库名代表我们归档的数据存放到 Spark 数据仓库的数据库名字;
  10. Spark表名代表我们归档的数据存放到 Spark 数据仓库的表名字;
  11. 这里对应的就是需要归档的 MySQL 表的列和 Spark 归档表列的对应关系
  12. 当我们上面信息填写完之后就可以点击提交按钮,这时候我们就创建好一个归档任务。

点击完提交按钮之后,会在数据工作台里面的工作流创建相关的工作流,并且会在数据工作台里面的作业管理创建相关作业,具体如下:45

同时也会在 X-Pack Spark 里面创建 orders 表,如下:6

三、测试及数据查看

执行归档任务(这里一步主要可以用来首次的测试)

归档的正常逻辑是按照指定定时调度时间周期性的把数据ETL处理后写入到spark表,但是在测试阶段,可以写入数据,然后手动的运行一下作业看看效果。我们在上一步骤往 orders 表插入了一些测试数据,我们可以手动执行上面创建好的 mysql2spark_archive_test 归档任务,如下:7

我们进行测试的日期是 2019-09-19,当我们按照上面的配置来手动运行归档任务的时候,实际上是执行了 T+1 形式的归档,也就是归档了 2019-09-18 这天的数据。我们可以到运行记录里面查看归档的进度。

在 X-Pack Spark 里面查询归档的数据

等作业运行完成之后,我们就可以查到刚刚归档的数据(创建sql类型的交互式查询):

可以先执行

  1. show partitions orders

查看下生成的分区情况:11正如图中显示的,X-Pack Spark 正确的归档了 2019-09-18 这一天 MySQL 新增的数据。我们在查看一下里面的数据情况:12可以看到我们正确的将 2019-09-18 这天 MySQL 的数据归档到 Spark 数据仓库的 orders 表里面了。

mysql -> spark 类型映射

目前 Spark 支持以下 MySQL 的类型:

MySQL Spark
TINYINT(1), BOOL, BOOLEAN, BIT(1) BOOLEAN
TINYINT TINYINT
SMALLINT SMALLINT
MEDIUMINT, INT, INTEGER INT
BIGINT BIGINT
DECIMAL DECIMAL
FLOAT FLOAT
DOUBLE DOUBLE
DATE DATE
DATETIME, TIMESTAMP TIMESTAMP
CHAR CHAR
VARCHAR VARCHAR
TEXT TEXT
BINARY BINARY