PolarDB-X(原DRDS升级版)是由阿里巴巴自主研发的云原生分布式数据库,融合分布式SQL引擎DRDS与分布式自研存储X-DB,基于云原生一体化架构设计,可支撑千万级并发规模及百PB级海量存储。本文主要介绍如何通过DLA
Serverless Spark访问云数据库PolarDB-X。
前提条件
- 已经开通对象存储OSS(Object Storage Service)服务。具体操作请参考开通OSS服务。
- 已经创建PolarDB-X数据库。具体请参考创建数据库。
- 在PolarDB-X数据库中已创建数据表,并插入数据。参考命令样例如下:
#建表语句:
CREATE TABLE `testdb_drds`.`test_table` (
`name` varchar(32) NULL,
`age` INT NULL,
`score` DOUBLE NULL
)
#插入数据语句:
INSERT INTO `testdb_drds`.`test_table` VALUES('aliyun01', 1001, 10.1);
INSERT INTO `testdb_drds`.`test_table` VALUES('aliyun02', 1002, 10.2);
INSERT INTO `testdb_drds`.`test_table` VALUES('aliyun03', 1003, 10.3);
INSERT INTO `testdb_drds`.`test_table` VALUES('aliyun04', 1004, 10.4);
INSERT INTO `testdb_drds`.`test_table` VALUES('aliyun05', 1005, 10.5);
- 准备DLA Spark访问PolarDB-X数据库所需的安全组ID和交换机ID。具体操作请参见配置数据源网络。
- DLA Spark访问PolarDB-X数据库所需的交换机IP,已添加到PolarDB-X数据库的白名单中。具体操作请参见设置白名单。
操作步骤
- 准备以下测试代码和依赖包来访问PolarDB-X,并将此测试代码和依赖包分别编译打包生成jar包上传至您的OSS。
测试代码示例:
package com.aliyun.spark
import java.util.Properties
import org.apache.spark.sql.SparkSession
object SparkOnPOLARDB {
def main(args: Array[String]): Unit = {
//获取POLARDB的url、database、tableName、登录POLARDB数据库的user和password。
val url = args(0)
val jdbcConnURL = s"jdbc:mysql://$url"
val database = args(1)
val tableName = args(2)
val user = args(3)
val password = args(4)
//Spark侧的表名。
var sparkTableName = args(5)
val sparkSession = SparkSession
.builder()
.appName("scala spark on POLARDB test")
.getOrCreate()
val driver = "com.mysql.cj.jdbc.Driver"
//如果存在的话就删除表。
sparkSession.sql(s"drop table if exists $sparkTableName")
//Sql方式,Spark会映射POLARDB中表的Schema。
val createCmd =
s"""CREATE TABLE ${sparkTableName} USING org.apache.spark.sql.jdbc
| options (
| driver '$driver',
| url '$jdbcConnURL',
| dbtable '$database.$tableName',
| user '$user',
| password '$password'
| )""".stripMargin
println(s"createCmd: \n $createCmd")
sparkSession.sql(createCmd)
val querySql = "select * from " + sparkTableName + " limit 1"
sparkSession.sql(querySql).show
//使用dataset API接口。
val connectionProperties = new Properties()
connectionProperties.put("driver", driver)
connectionProperties.put("user", user)
connectionProperties.put("password", password)
//读取数据。
var jdbcDf = sparkSession.read.jdbc(jdbcConnURL,
s"$database.$tableName",
connectionProperties)
jdbcDf.select("name", "age", "score").show()
val data =
Seq(
PersonPolardb("bill", 30, 170.5),
PersonPolardb("gate", 29, 200.3)
)
val dfWrite = sparkSession.createDataFrame(data)
//写入数据。
dfWrite
.write
.mode("append")
.jdbc(jdbcConnURL, s"$database.$tableName", connectionProperties)
jdbcDf.select("name", "age").show()
sparkSession.stop()
}
}
case class PersonPolardb(name: String, age: Int, score: Double)
PolarDB-X依赖的pom文件:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.22</version>
</dependency>
- 登录Data Lake Analytics管理控制台。
- 在页面左上角,选择PolarDB-X实例所在的地域。
- 单击左侧导航栏中的。
- 在作业编辑页面,单击创建作业。
- 在创建作业模板页面,按照页面提示进行参数配置后,单击确定创建Spark作业。
- 单击Spark作业名,在Spark作业编辑框中输入以下作业内容,并按照以下参数说明进行参数值替换。保存并提交Spark作业。
{
"args": [
"xxx.drds.aliyuncs.com:3306", #POLARDB-X的内网地址和端口。
"testdb_drds", #POLARDB-X中的数据库名称。
"test_table", #POLARDB-X中的数据库表名。
"xxx1", #登录POLARDB-X数据库的用户名。
"xxx2", #登录POLARDB-X数据库的密码。
"spark_on_polardbx_table" #Spark中创建映射POLARDB-X表的表名。
],
"file": "oss://spark_test/jars/polardbx/spark-examples-0.0.1-SNAPSHOT.jar", #存放测试代码的OSS路径。
"name": "polardbx-test",
"jars": [
"oss://spark_test/jars/polardbx/mysql-connector-java-8.0.22.jar" #存放测试代码依赖包的OSS路径。
],
"className": "com.aliyun.spark.SparkOnPolarDBX",
"conf": {
"spark.driver.resourceSpec": "small", #表示driver的规格,有small、medium、large、xlarge之分。
"spark.executor.instances": 2, #表示executor的个数。
"spark.executor.resourceSpec": "small", #表示executor的规格,有small、medium、large、xlarge之分。
"spark.dla.eni.enable": "true", #开启访问用户VPC网络的权限。当您需要访问用户VPC网络内的数据时,需要开启此选项。
"spark.dla.eni.vswitch.id": "vsw-xxx", #可访问PolarDB-X的交换机id。
"spark.dla.eni.security.group.id": "sg-xxx" #可访问PolarDB-X的安全组id。
}
}
执行结果
作业运行成功后,在任务列表中单击,查看作业日志。出现如下日志说明作业运行成功: