本文介绍如何使用AnalyticDB MySQL湖仓版(3.0)Spark通过ENI网络访问消息队列Kafka版。
前提条件
- 已创建AnalyticDB MySQL湖仓版(3.0)集群。具体操作,请参见创建湖仓版(3.0)集群。
- 已创建Job型资源组。具体操作,请参见新建资源组。
- 已创建数据库账号。
- 如果您是通过阿里云账号访问,只需创建高权限账号。具体操作,请参见创建高权限账号。
- 如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。具体操作,请参见创建数据库账号和绑定或解绑RAM用户与数据库账号。
- 已创建与AnalyticDB MySQL湖仓版(3.0)集群位于同一地域的Kafka实例。具体操作,请参见公网和VPC接入。
- 已在Kafka实例中创建Topic和Group。具体操作,请参见创建资源。
- 已开通OSS服务,并创建与AnalyticDB MySQL湖仓版(3.0)集群位于相同地域的存储空间。具体操作,请参见开通OSS服务和创建存储空间。
准备工作
- 在消息队列Kafka版控制台的实例详情页面,获取Kafka实例的交换机ID。
- 在ECS管理控制台的安全组页面,搜索Kafka实例ID来获取安全组ID。
- 在消息队列Kafka版控制台的白名单管理页面,查看Kafka实例的白名单是否为交换机ID的网段。
操作步骤
- 分别下载与Kafka和AnalyticDB MySQL Spark实例版本对应的JAR包。下载链接,请参见Kafka-clients和Spark-sql-kafka-0-10。
- 在pom.xml文件的dependencies中添加依赖项。
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.12</artifactId> <version>3.2.2</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.1</version> </dependency>
- 编写
Spark Streaming
示例程序来读取Kafka中的消息,并进行编译打包。本文生成的JAR包名称为spark-example.jar
。package com.aliyun.spark.streaming import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object SparkKafka { def main(args: Array[String]): Unit = { if(args.length < 3){ System.err.println( """ |args0: groupId |args1: topicName |args2: bootstrapServers |""".stripMargin) System.exit(1) } val groupId = args(0) val topicName = args(1) val bootstrapServers = args(2) val sparkConf: SparkConf = new SparkConf() .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .setAppName("SparkKafkaSub") sparkConf.registerKryoClasses(Array(classOf[ConsumerRecord[_,_]])) val sparkSession = SparkSession .builder() .config(sparkConf) .getOrCreate() val df = sparkSession .readStream .format("kafka") //Kafka实例的域名接入点。 .option("kafka.bootstrap.servers", alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092) //Kafka实例的Topic名称。 .option("subscribe", kafka_test) //Kafka实例的Group ID。 .option("group.id", kafka_groupId) .load() val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .writeStream .outputMode("append") .format("console") .start() query.awaitTermination() } }
- 将下载的JAR包和
Spark Streaming
示例程序上传至OSS。具体操作,请参见上传文件。 - 进入Spark开发编辑器。
- 登录云原生数据仓库AnalyticDB MySQL控制台。
- 在页面左上角,选择集群所在地域。
- 在左侧导航栏,单击集群列表。
- 在湖仓版(3.0)页签下,单击目标集群ID。
- 在左侧导航栏,单击 。
- 在编辑器窗口上方,选择Job型资源组和Spark作业类型。本文以Batch类型为例。
- 在编辑器中执行以下作业内容。
{ "args": [ "kafka_groupId", "kafka_test", "alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092" ], "file": "oss://<bucket_name>/spark-example.jar", "jars": "oss://<bucket_name>/kafka-clients-2.8.1.jar,oss://<bucket_name>/spark-sql-kafka-0-10_2.12-3.2.0.jar", "name": "Kafka Example", "className": "com.aliyun.spark.streaming.SparkKafka", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small", "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****" } }
参数说明如下。参数名称 参数说明 args
Spark作业传入的参数,多个参数之间以英文逗号(,)分隔。 file
Spark作业主文件的存储位置。主文件是入口类所在的JAR包或者Python的入口执行文件。 说明 Spark作业主文件目前只支持存储在OSS中。jars
Spark作业依赖的JAR包,多个JAR包之间以英文逗号(,)分隔。 name
Spark作业名称。 className
Java或者Scala程序入口类。Python不需要指定入口类。 spark.adb.eni.enabled
是否开启ENI访问。使用湖仓版(3.0)Spark访问kafka数据源时,需要开启ENI访问。 spark.adb.eni.vswitchId
准备工作中获取的交换机ID。 spark.adb.eni.securityGroupId
准备工作中获取的安全组ID。 conf
其他参数与开源Spark中的配置项基本一致,参数格式为 key:value
形式,多个参数之间以英文逗号(,)分隔。与开源Spark用法不一致的配置参数及AnalyticDB MySQL特有的配置参数,请参见Conf配置参数。 - 单击立即执行。