全部产品
云市场

Spark对接RDS快速入门

更新时间:2019-07-04 20:55:06

简介

阿里云关系型数据库(Relational Database Service,简称RDS)是一种稳定可靠、可弹性伸缩的在线数据库服务。基于阿里云分布式文件系统和SSD盘高性能存储,RDS支持MySQL、SQL Server、PostgreSQL、PPAS(Postgre Plus Advanced Server,高度兼容Oracle数据库)和MariaDB引擎,并且提供了容灾、备份、恢复、监控、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。
本文主要介绍通过“数据工作台”使用Spark对接RDS MySQL版、RDS SQL Server版和RDS PostgreSQL版的方法。

前置条件

  1. Spark集群RDS在同一个VPC下。
    进入Spark分析集群页面,选择“数据库连接”>“连接信息”,查看Spark集群的VPC ID信息。如下图:
  2. Spark集群关联RDS集群

    • 对于RDS for MySQL可以直接spark的集群管理页面一键关联a

    • 对于其他RDS数据库需要手动把Spark集群ip端配置到RDS的白名单。
      首先在阿里云控制台进入“专有网络VPC”,找到Spark集群的VPC ID对应的IPv4网段。如下图:

      然后进入RDS对应数据库集群配置白名单,添加上述IPv4网段。如下图:
  3. RDS已创建表,本实例使用的RDS表名为:test_table,数据库名为:testdb。建表语句和内容如下(SQL语句可直接拷贝到DMS上执行):

  1. #MySQL 建表语句:
  2. CREATE TABLE `testdb`.`test_table` (
  3. `name` varchar(32) NULL,
  4. `age` INT NULL,
  5. `score` DOUBLE NULL
  6. )
  7. #MySQL 插入数据语句:
  8. INSERT INTO `testdb`.`test_table` VALUES('aliyun01', 1001, 10.1);
  9. INSERT INTO `testdb`.`test_table` VALUES('aliyun02', 1002, 10.2);
  10. INSERT INTO `testdb`.`test_table` VALUES('aliyun03', 1003, 10.3);
  11. INSERT INTO `testdb`.`test_table` VALUES('aliyun04', 1004, 10.4);
  12. INSERT INTO `testdb`.`test_table` VALUES('aliyun05', 1005, 10.5);
  13. #SQL Server 建表语句:
  14. CREATE TABLE [testdb].[dbo].[test_table] (
  15. [name] varchar(32) NULL,
  16. [age] int NULL,
  17. [score] float NULL
  18. )
  19. #SQL Server 插入数据语句:
  20. INSERT INTO [testdb].[dbo].[test_table] VALUES('aliyun01', 1001, 10.1);
  21. INSERT INTO [testdb].[dbo].[test_table] VALUES('aliyun02', 1002, 10.2);
  22. INSERT INTO [testdb].[dbo].[test_table] VALUES('aliyun03', 1003, 10.3);
  23. INSERT INTO [testdb].[dbo].[test_table] VALUES('aliyun04', 1004, 10.4);
  24. INSERT INTO [testdb].[dbo].[test_table] VALUES('aliyun05', 1005, 10.5);
  25. #PostgreSQL 建表语句:
  26. CREATE TABLE "testdb"."public"."test_table" (
  27. "name" varchar(32) NULL,
  28. "age" int NULL,
  29. "score" double precision NULL
  30. )
  31. #PostgreSQL 插入数据语句:
  32. INSERT INTO "testdb"."public"."test_table" VALUES('aliyun01', 1001, 10.1);
  33. INSERT INTO "testdb"."public"."test_table" VALUES('aliyun02', 1002, 10.2);
  34. INSERT INTO "testdb"."public"."test_table" VALUES('aliyun03', 1003, 10.3);
  35. INSERT INTO "testdb"."public"."test_table" VALUES('aliyun04', 1004, 10.4);
  36. INSERT INTO "testdb"."public"."test_table" VALUES('aliyun05', 1005, 10.5);

使用“数据工作台”>“作业管理”运行样例-RDS MySQL版

步骤 1:通过“资源管理”上传样例代码Jar包

下载样例代码jar包“spark-examples-0.0.1-SNAPSHOT.jar”以及依赖jar包到本地目录。

  1. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_example/spark-examples-0.0.1-SNAPSHOT.jar
  2. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_connectors/mysql-connector-java-5.1.34.jar
  3. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_connectors/mysql-connector-java-8.0.16.jar

在“数据工作台”>“资源管理”中添加文件夹“spark_on_rds”。
上传jar包“spark-examples-0.0.1-SNAPSHOT.jar”,“mysql-connector-java-5.1.34.jar”,”mysql-connector-java-8.0.16.jar”到/spark_on_rds/文件夹。如下图:

步骤 2:通过“作业管理”创建并编辑作业内容

注意目前云上RDS MySQL主要由5.x和8.x两个系列,这两个系列需要使用的mysql-connector不同

  • 如果访问的MySQL为8.x系列,在“数据工作台”>“作业管理”中创建Spark作业,作业内容如下:

    1. --class com.aliyun.spark.rds.SparkOnRDSMySQLSparkSession
    2. --jars /spark_on_rds/mysql-connector-java-8.0.16.jar
    3. --driver-memory 1G
    4. --driver-cores 1
    5. --executor-cores 1
    6. --executor-memory 2G
    7. --num-executors 1
    8. --name spark_on_rds
    9. /spark_on_rds/spark-examples-0.0.1-SNAPSHOT.jar
    10. rm-xxx.mysql.rds.aliyuncs.com:3306 testdb test_table xxx1 xxx2 spark_on_rds01
  • 如果访问的MySQL为8.x系列,在“数据工作台”>“作业管理”中创建Spark作业,作业内容如下:

    1. --class com.aliyun.spark.rds.SparkOnRDSMySQLSparkSession
    2. --jars /spark_on_rds/mysql-connector-java-5.1.34.jar
    3. --driver-memory 1G
    4. --driver-cores 1
    5. --executor-cores 1
    6. --executor-memory 2G
    7. --num-executors 1
    8. --name spark_on_rds
    9. /spark_on_rds/spark-examples-0.0.1-SNAPSHOT.jar
    10. rm-xxx.mysql.rds.aliyuncs.com:3306 testdb test_table xxx1 xxx2 spark_on_rds01

作业内容参数说明:

参数 说明
rm-xxx.mysql.rds.aliyuncs.com:3306 RDS MySQL版的数据库连接的“内网地址”和“内网端口”。
testdb test_table 分别为RDS MySQL版的数据库名和表名。
xxx1 xxx2 分别为RDS MySQL版的数据库登陆的用户名和密码。
spark_on_rds01 Spark中创建映射RDS MySQL版数据库表的表名。

如下图:

步骤 3:通过“作业管理”运行作业并查看结果

作业编辑完成后点击“运行”,选择Spark集群。运行状态会在下侧显示,如图:

运行成功后点击“YarnUI”,翻到最后看到如下结果表明Spark读取RDS MySQL成功。如下:

  1. +--------+----+-----+
  2. | name| age|score|
  3. +--------+----+-----+
  4. |aliyun01|1001| 10.1|
  5. +--------+----+-----+

使用“交互式查询”运行样例

步骤 1:通过“会话管理”创建会话

在“数据工作台”>“会话管理”中点击“创建会话”。填写“会话名称”:spark_on_rds, 选择需要执行的Spark集群。这里同样需要确认是MySQL 8.x还是MySQL 5.x系列,选择不同版本的mysql-connector。

点击“确认”后,编辑“会话内容”,内容如下:

  1. --name spark_on_rds
  2. --driver-memory 1G
  3. --driver-cores 1
  4. --executor-cores 1
  5. --executor-memory 2G
  6. --num-executors 1
  7. --jars /spark_on_rds/mysql-connector-java-5.1.34.jar

内容编辑完成后,点击“运行”(此处点击运行是为了下一步骤中可以选择会话“spark_on_rds”)。

步骤 2:通过“交互式查询”创建查询

进入“数据工作台”>“交互式查询”,在“会话列表”下拉框中选择“spark_on_rds”,然后点击“新建查询”。填写“查询名称”:spark_on_rds,“查询类型”选择“sql”。如下图:

步骤 3:通过“交互式查询”编辑查询

查询输入如下内容:

  1. CREATE TABLE spark_on_rds101 USING org.apache.spark.sql.jdbc
  2. options (
  3. driver 'com.mysql.jdbc.Driver',
  4. url 'jdbc:mysql://rm-xxx.mysql.rds.aliyuncs.com:3306',
  5. dbtable 'testdb.test_table',
  6. user 'xxx1',
  7. password 'xxx2'
  8. )

如下图:

建表语句关键字说明:

关键字 说明
rm-xxx.mysql.rds.aliyuncs.com:3306 RDS MySQL版的数据库连接的“内网地址”和“内网端口”。
testdb test_table 分别为RDS MySQL版的数据库名和表名。
xxx1 xxx2 分别为RDS MySQL版的数据库登陆的用户名和密码。
spark_on_rds101 Spark中创建映射RDS MySQL版数据库表的表名。

步骤 4:通过“交互式查询”运行查询

编辑查询后,点击“运行”;运行成功后,在“查询内容”中输入“select * from spark_on_rds101”,然后点击“运行”验证结果,出现如下结果表示运行成功。如下图:

使用“数据工作台”>“作业管理”运行样例-RDS SQL Server版

步骤 1:通过“资源管理”上传样例代码Jar包

下载样例代码jar包“spark-examples-0.0.1-SNAPSHOT.jar”以及依赖jar包到本地目录。

  1. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_example/spark-examples-0.0.1-SNAPSHOT.jar
  2. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_connectors/sqljdbc42.jar

在“数据工作台”>“资源管理”中添加文件夹“spark_on_rds”。
上传jar包“spark-examples-0.0.1-SNAPSHOT.jar”以及“sqljdbc42.jar”到此文件夹。如下图:

步骤 2:通过“作业管理”创建并编辑作业内容

在“数据工作台”>“作业管理”中创建Spark作业,作业内容如下:

  1. --class com.aliyun.spark.rds.SparkOnRDSSqlServerSparkSession
  2. --jars /spark_on_rds/sqljdbc42.jar
  3. --driver-memory 1G
  4. --driver-cores 1
  5. --executor-cores 1
  6. --executor-memory 2G
  7. --num-executors 1
  8. --name spark_on_rds
  9. /spark_on_rds/spark-examples-0.0.1-SNAPSHOT.jar
  10. rm-xxx.sqlserver.rds.aliyuncs.com:1433 testdb dbo test_table xxx1 xxx2 spark_on_rds02

作业内容参数说明:

参数 说明
rm-xxx.sqlserver.rds.aliyuncs.com:1433 RDS SQL Server版的数据库连接的“内网地址”和“内网端口”。
testdb dbo test_table 分别为RDS SQL Server版的数据库名、schema(架构)名和表名。
xxx1 xxx2 分别为RDS SQL Server版的数据库登陆的用户名和密码。
spark_on_rds02 Spark中创建映射RDS SQL Server版数据库表的表名。

如下图:

步骤 3:通过“作业管理”运行作业并查看结果

作业编辑完成后点击“运行”,选择Spark集群。运行状态会在下侧显示,如图:

运行成功后点击“YarnUI”,翻到最后看到如下结果表明Spark读取RDS SQL Server成功。如下:

  1. +--------+----+-----+
  2. | name| age|score|
  3. +--------+----+-----+
  4. |aliyun01|1001| 10.1|
  5. +--------+----+-----+

使用“交互式查询”运行样例

步骤 1:通过“会话管理”创建会话

在“数据工作台”>“会话管理”中点击“创建会话”。填写“会话名称”:spark_on_rds, 选择需要执行的Spark集群。

点击“确认”后,编辑“会话内容”,内容如下:

  1. --name spark_on_rds
  2. --driver-memory 1G
  3. --driver-cores 1
  4. --executor-cores 1
  5. --executor-memory 2G
  6. --num-executors 1
  7. --jars /spark_on_rds/sqljdbc42.jar

内容编辑完成后,点击“运行”(此处点击运行是为了下一步骤中可以选择会话“spark_on_rds”)。

步骤 2:通过“交互式查询”创建查询

进入“数据工作台”>“交互式查询”,在“会话列表”下拉框中选择“spark_on_rds”,然后点击“新建查询”。填写“查询名称”:spark_on_rds,“查询类型”选择“sql”。如下图:

步骤 3:通过“交互式查询”编辑查询

查询输入如下内容:

  1. CREATE TABLE spark_on_rds101 USING org.apache.spark.sql.jdbc
  2. options (
  3. driver 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
  4. url 'jdbc:sqlserver://rm-xxx.sqlserver.rds.aliyuncs.com:1433;DatabaseName=testdb',
  5. dbtable 'dbo.test_table',
  6. user 'xxx1',
  7. password 'xxx2'
  8. )

如下图:

建表语句关键字说明:

关键字 说明
rm-xxx.sqlserver.rds.aliyuncs.com:1433 RDS SQL Server版的数据库连接的“内网地址”和“内网端口”。
testdb dbo test_table 分别为RDS SQL Server版的数据库名、schema(架构)名和表名。
xxx1 xxx2 分别为RDS SQL Server版的数据库登陆的用户名和密码。
spark_on_rds101 Spark中创建映射RDS SQL Server版数据库表的表名。

步骤 4:通过“交互式查询”运行查询

编辑查询后,点击“运行”;运行成功后,在“查询内容”中输入“select * from spark_on_rds101”,然后点击“运行”验证结果,出现如下结果表示运行成功。如下图:

使用“数据工作台”>“作业管理”运行样例-RDS PostgreSQL版

步骤 1:通过“资源管理”上传样例代码Jar包

下载样例代码jar包“spark-examples-0.0.1-SNAPSHOT.jar”以及依赖jar包到本地目录。

  1. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_example/spark-examples-0.0.1-SNAPSHOT.jar
  2. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_connectors/postgresql-42.2.5.jar

在“数据工作台”>“资源管理”中添加文件夹“spark_on_rds”。
上传jar包“spark-examples-0.0.1-SNAPSHOT.jar”以及“postgresql-42.2.5.jar”到此文件夹。如下图:

步骤 2:通过“作业管理”创建并编辑作业内容

在“数据工作台”>“作业管理”中创建Spark作业,作业内容如下:

  1. --class com.aliyun.spark.rds.SparkOnRDSPostgreSQLSparkSession
  2. --jars /spark_on_rds/postgresql-42.2.5.jar
  3. --driver-memory 1G
  4. --driver-cores 1
  5. --executor-cores 1
  6. --executor-memory 2G
  7. --num-executors 1
  8. --name spark_on_rds
  9. /spark_on_rds/spark-examples-0.0.1-SNAPSHOT.jar
  10. rm-xxx.pg.rds.aliyuncs.com:3433 testdb public test_table xxx1 xxx2 spark_on_rds03

作业内容参数说明:

参数 说明
rm-xxx.pg.rds.aliyuncs.com:3433 RDS PostgreSQL版的数据库连接的“内网地址”和“内网端口”。
testdb public test_table 分别为RDS PostgreSQL版的据库名、schema(模式)名和表名。
xxx1 xxx2 分别为RDS PostgreSQL版的数据库登陆的用户名和密码。
spark_on_rds03 Spark中创建映射RDS PostgreSQL版数据库表的表名。

如下图:

步骤 3:通过“作业管理”运行作业并查看结果

作业编辑完成后点击“运行”,选择Spark集群。运行状态会在下侧显示,如图:

运行成功后点击“YarnUI”,翻到最后看到如下结果表明Spark读取RDS PostgreSQL成功。如下:

  1. +--------+----+-----+
  2. | name| age|score|
  3. +--------+----+-----+
  4. |aliyun01|1001| 10.1|
  5. +--------+----+-----+

使用“交互式查询”运行样例

步骤 1:通过“会话管理”创建会话

在“数据工作台”>“会话管理”中点击“创建会话”。填写“会话名称”:spark_on_rds, 选择需要执行的Spark集群。

点击“确认”后,编辑“会话内容”,内容如下:

  1. --name spark_on_rds
  2. --driver-memory 1G
  3. --driver-cores 1
  4. --executor-cores 1
  5. --executor-memory 2G
  6. --num-executors 1
  7. --jars /spark_on_rds/postgresql-42.2.5.jar

内容编辑完成后,点击“运行”(此处点击运行是为了下一步骤中可以选择会话“spark_on_rds”)。

步骤 2:通过“交互式查询”创建查询

进入“数据工作台”>“交互式查询”,在“会话列表”下拉框中选择“spark_on_rds”,然后点击“新建查询”。填写“查询名称”:spark_on_rds,“查询类型”选择“sql”。如下图:

步骤 3:通过“交互式查询”编辑查询

查询输入如下内容:

  1. CREATE TABLE spark_on_rds101 USING org.apache.spark.sql.jdbc
  2. options (
  3. driver 'org.postgresql.Driver',
  4. url 'jdbc:postgresql://rm-xxx.pg.rds.aliyuncs.com:3433/testdb',
  5. dbtable 'public.test_table',
  6. user 'xxx1',
  7. password 'xxx2'
  8. )

如下图:

建表语句关键字说明:

关键字 说明
rm-xxx.pg.rds.aliyuncs.com:3433 RDS PostgreSQL版的数据库连接的“内网地址”和“内网端口”。
testdb public test_table 分别为RDS PostgreSQL版的据库名、schema(模式)名和表名。
xxx1 xxx2 分别为RDS PostgreSQL版的数据库登陆的用户名和密码。
spark_on_rds03 Spark中创建映射RDS PostgreSQL版数据库表的表名。

步骤 4:通过“交互式查询”运行查询

编辑查询后,点击“运行”;运行成功后,在“查询内容”中输入“select * from spark_on_rds101”,然后点击“运行”验证结果,出现如下结果表示运行成功。如下图:

小结