本文为您介绍如何搭建MaxCompute Spark开发环境。
前提条件
搭建开发环境之前,请确保您已经安装如下软件:
- JDK 1.8
- Python 2.7
- Maven
- Git
下载MaxCompute Spark客户端
MaxCompute Spark发布包集成了MaxCompute认证功能。作为客户端工具,它通过Spark-Submit方式提交作业到MaxCompute项目中运行。目前提供了面向Spark1.x和Spark2.x的2个发布包:
- Spark-1.6.3:适用于Spark1.x应用的开发。
- Spark-2.3.0:适用于Spark2.x应用的开发。
设置环境变量
您需要在操作系统的命令行执行窗口配置环境变量,配置信息如下:
- JAVA_HOME设置。
# 推荐使用JDK 1.8,以linux系统为例。 export JAVA_HOME=/path/to/jdk export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$JAVA_HOME/bin:$PATH
- SPARK_HOME设置。
将SPARK_HOME参数替换为MaxCompute Spark客户端的解压路径。
# 以linux系统为例。 export SPARK_HOME=/path/to/spark-x.x.x-public export PATH=$SPARK_HOME/bin:$PATH
- PySpark的用户请安装Python 2.7版本,并设置PATH。
# 以linux系统为例。 export PATH=/path/to/python/bin/:$PATH
配置spark-defaults.conf
第一次使用MaxCompute Spark客户端时,需要在解压路径下配置spark-defaults.conf文件。
$SPARK_HOME/conf路径下存在spark-defaults.conf.template文件,您可以将其作为spark-defaults.conf的模版进行相关配置。
配置信息如下。
注意 请将spark-defaults.conf.template重命名为spark-defaults.conf后再进行相关配置。如果没有对文件进行重命名,将会导致配置无法生效。
# spark-defaults.conf
# 填写MaxCompute的项目空间名称以及账号信息。
spark.hadoop.odps.project.name = XXX
spark.hadoop.odps.access.id = XXX
spark.hadoop.odps.access.key = XXX
# 以下配置保持不变。
spark.hadoop.odps.end.point = http://service.<regionid>.maxcompute.aliyun.com/api # Spark客户端连接访问MaxCompute项目的Endpoint,您可以根据自己情况进行修改。详情请参见配置Endpoint。
spark.hadoop.odps.runtime.end.point = http://service.<regionid>.maxcompute.aliyun-inc.com/api # Spark运行环境Endpoint,所在Region的MaxCompute VPC网络的Endpoint。您可以根据自己情况进行修改。
spark.sql.catalogImplementation=odps
spark.hadoop.odps.task.major.version = cupid_v2
spark.hadoop.odps.cupid.container.image.enable = true
spark.hadoop.odps.cupid.container.vm.engine.type = hyper
spark.hadoop.odps.cupid.webproxy.endpoint = http://service.cn.maxcompute.aliyun-inc.com/api
spark.hadoop.odps.moye.trackurl.host = http://jobview.odps.aliyun.com
特殊场景和功能,需要开启一些其他的配置参数,详情请参见Spark配置详解。
准备项目工程
MaxCompute Spark提供了项目工程模版,建议您下载模版复制后直接在模版里开发。
说明 模版工程里的关于spark依赖的scope为provided,请不要更改,否则提交的作业无法正常运行。
- Spark-1.x 模板及编译
git clone https://github.com/aliyun/MaxCompute-Spark.git cd spark-1.x mvn clean package
- Spark-2.x 模板及编译
git clone https://github.com/aliyun/MaxCompute-Spark.git cd spark-2.x mvn clean package
配置依赖说明
- 配置访问MaxCompute表所需的依赖。
Spark作业访问MaxCompute表,需要依赖odps-spark-datasource模块。Maven配置举例如下。
<!-- Spark-2.x请依赖此模块 --> <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>odps-spark-datasource_2.11</artifactId> <version>3.3.8-public</version> </dependency> <!-- Spark-1.x请依赖此模块 --> <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>odps-spark-datasource_2.10</artifactId> <version>3.3.8-public</version> </dependency>
- 配置访问OSS所需的依赖。
如果作业需要访问OSS,直接添加以下依赖即可。
<dependency> <groupId>com.aliyun.odps</groupId> <artifactId>hadoop-fs-oss</artifactId> <version>3.3.8-public</version> </dependency>
- Spark-2.x依赖配置
pom文件请参见Spark-2.x pom文件。
- Spark-1.x依赖配置
pom文件请参见Spark-1.x pom文件。
引用外部文件
用户在开发过程中涉及到如下场景时,需要引用外部文件:
- 作业需要读取一些配置文件。
- 作业需要额外的资源包或第三方库。例如JAR包、Python库。
在实际操作中,您需要先上传文件后才可以引用文件,上传文件方式有以下两种,任选其中一种即可:
- 方式一:通过Spark参数上传文件
MaxCompute Spark支持Spark社区版原生的
--jars
、--py-files
、--files
、--archives
参数,您可以在提交作业时,通过这些参数上传文件,这些文件在作业运行时会被上传到用户的工作目录下。- 通过MaxCompute Spark客户端,使用Spark-Submit方式上传文件。
说明
--jars
:会将配置的JAR包上传至Driver和Executor的当前工作目录,多个文件用英文逗号(,)分隔。这些JAR包都会加入Driver和Executor的Classpath。在Spark作业中直接通过"./your_jar_name"
即可引用,与社区版Spark行为相同。--files
、--py-files
:会将配置的普通文件或Python文件上传至Driver和Executor的当前工作目录,多个文件用英文逗号(,)分隔。在Spark作业中直接通过"./your_file_name"
即可引用,与社区版Spark行为相同。--archives
:与社区版Spark行为略有不同,多个文件用英文逗号(,)分隔,配置方式为xxx#yyy
,会将配置的归档文件(例如.zip)解压到Driver和Executor的当前工作目录的子目录中。例如当配置方式为xx.zip#yy
时,应以"./yy/xx/"
引用到归档文件中的内容;当配置方式为xx.zip
时,应以"./xx.zip/xx/"
引用到归档文件中的内容。如果一定要将归档内容直接解压到当前目录,即直接引用"./xxx/"
,请使用下文中的spark.hadoop.odps.cupid.resources
参数进行配置。
- 通过DataWorks,添加作业需要的资源,操作详情请参见创建MaxCompute资源。
说明 DataWorks中上传资源限制最大为50 MB,如果需要使用更大的资源,您需要通过MaxCompute客户端将资源上传为MaxCompute资源,并将资源添加至数据开发面板。更多MaxCompute资源信息,请参见MaxCompute资源。
- 通过MaxCompute Spark客户端,使用Spark-Submit方式上传文件。
- 方式二:通过MaxCompute资源上传文件
MaxCompute Spark提供
spark.hadoop.odps.cupid.resources
参数,可以直接引用MaxCompute中的资源,这些资源在作业运行时会被上传到用户的工作目录下。使用方式如下:- 通过MaxCompute客户端将文件上传至MaxCompute项目。单个文件最大支持500 MB。
- 在Spark作业配置中添加
spark.hadoop.odps.cupid.resources
参数,指定Spark作业运行所需要的MaxCompute资源。格式为<projectname>.<resourcename>
,如果需要引用多个文件,需要用英文逗号(,)分隔。配置示例如下:
指定的资源将被下载到Driver和Executor的当前工作目录,资源下载到工作目录后默认名称是spark.hadoop.odps.cupid.resources=public.python-python-2.7-ucs4.zip,public.myjar.jar
<projectname>.<resourcename>
。此外,您可以在配置时通过<projectname>.<resourcename>:<newresourcename>
方式重命名资源名称。配置示例如下:spark.hadoop.odps.cupid.resources=public.myjar.jar:myjar.jar
注意 该配置项必须在spark-default.conf或DataWorks的配置项中进行配置才能生效,不能写在代码中。
通过上述两种方式将文件上传后,即可在代码中引用文件,文件读取示例如下:
val targetFile = "文件名"
val file = Source.fromFile(targetFile)
for (line <- file.getLines)
println(line)
file.close
SparkPi冒烟测试
完成以上的工作之后,执行冒烟测试,验证MaxCompute Spark是否可以端到端连通。以Spark-2.x为例,您可以提交一个SparkPi验证功能是否正常,提交命令如下。
# /path/to/MaxCompute-Spark请指向正确的编译出来后的应用程序的Jar包。
cd $SPARK_HOME
bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.SparkPi \
/path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
# 当看到以下日志表明冒烟作业成功。
19/06/11 11:57:30 INFO Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: 11.222.166.90
ApplicationMaster RPC port: 38965
queue: queue
start time: 1560225401092
final status: SUCCEEDED
IDEA本地执行注意事项
通常,本地调试成功后会在集群上执行代码。但是Spark可以支持在IDEA里以Local模式直接运行代码,运行时请注意以下几点:
- 代码需要手动设置spark.master。
val spark = SparkSession .builder() .appName("SparkPi") .config("spark.master", "local[4]") // 需要设置spark.master为local[N]才能直接运行,N为并发数。 .getOrCreate()
- 在IDEA里手动添加MaxCompute Spark客户端的相关依赖。
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency>
pom.xml中设置要求scope为provided,所以运行时会出现NoClassDefFoundError报错。
您可以按照以下方式手动将MaxCompute Spark下的Jars目录加入IDEA模板工程项目中,既可以保持Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/SparkSession$ at com.aliyun.odps.spark.examples.SparkPi$.main(SparkPi.scala:27) at com.aliyun.odps.spark.examples.Spa。r。kPi.main(SparkPi.scala) Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.SparkSession$ at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 2 more
scope=provided
,又能在IDEA里直接运行不报错:- 在IDEA中单击顶部菜单栏上的File,选中Project Structure…。
- 在Project Structure页面,单击左侧导航栏上的Modules。选择资源包,并单击资源包的Dependencies页签。
- 在资源包的Dependencies页签下,单击左下角的+,选择JARs or directories…添加MaxCompute Spark下的Jars目录。
- 在IDEA中单击顶部菜单栏上的File,选中Project Structure…。
- Local不能直接引用spark-defaults.conf里的配置,需要手动指定相关配置。
通过Spark-Submit方式提交作业,系统会读取spark-defaults.conf文件中的配置。通过Local模式提交作业,需要手动在代码里指定相关配置。例如,在Local模式下如果要通过Spark-Sql读取MaxCompute的表,配置如下。
val spark = SparkSession .builder() .appName("SparkPi") .config("spark.master", "local[4]") // 需设置spark.master为local[N]才能直接运行,N为并发数。 .config("spark.hadoop.odps.project.name", "****") .config("spark.hadoop.odps.access.id", "****") .config("spark.hadoop.odps.access.key", "****") .config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api") .config("spark.sql.catalogImplementation", "odps") .getOrCreate()
在文档使用中是否遇到以下问题
更多建议
匿名提交