本文介绍如何在AnalyticDB MySQL控制台或DMS控制台上提交SparkSQL作业。

背景信息

Spark是专为大规模数据处理而设计的快速通用计算引擎,云原生数据仓库AnalyticDB MySQL版支持在内部集成Spark引擎(目前基于Apache Spark 3.0.1版本),支持通过SparkSQL直接读写和分析同一个AnalyticDB MySQL版集群数据库上的数据,在同一份数据存储上完成大规模数据的离线计算处理。您可以通过AnalyticDB MySQL控制台或DMS控制台来提交SparkSQL作业,来进行SparkSQL作业的开发和调试。

通过AnalyticDB MySQL控制台提交SparkSQL作业

AnalyticDB MySQL控制台上提交SparkSQL作业的详细操作步骤,请参见作业测试

SparkSQL作业提交后,您还可以通过AnalyticDB MySQL控制台进入Yarn集群UI详细监控UI界面查看SparkSQL作业的详请,详细操作步骤,请参见集群管理

Yarn集群UI详细监控UI界面,您可以看到如下信息:
  • Yarn集群UI
    您可以在该界面查看SparkSQL作业的ID、状态等信息。1
  • 详细监控UI
    您可以在该界面查看已完成的SparkSQL作业的ID开始时间、完成时间等信息。2
    您还可以单击App ID进入目标SparkSQL作业详情页查看详细的执行信息。例如,您可以在Executors页签下单击stdout,来查看目标SparkSQL的执行结果详情。3

通过DMS控制台提交SparkSQL作业

说明
  • AnalyticDB MySQL版集群在DMS控制台上默认使用羲和引擎执行SQL语句,因此在执行SparkSQL时,需要在SQL语句中增加/*+query_type=spark*/Hint或设置set query_type=spark来指定使用Spark引擎执行。
  • 使用Spark引擎时,DMS仅支持执行INSERT语句,不支持执行单独的SELECT语句。例如您可以在DMS使用Spark引擎执行INSERT INTO target_table SELECT col1, col2, col3 FROM source_table;,但不支持执行SELECT * FROM target_table;
  • 本文示例场景中,需要将source_table表中的数据通过执行SparkSQL作业导入至target_table表中,同时在target_table表中增加concat列。
  1. 使用DMS连接AnalyticDB MySQL版集群。详细操作步骤,请参见使用DMS连接
  2. (可选)若还未创建任何数据库,您可以在DMS的SQLConsole窗口中先创建一个数据库(本文示例中数据库名称为adb_demo),详细操作步骤,请参见创建数据库
  3. 在DMS的SQLConsole窗口中,执行CREATE TABLE语句创建目标表和源表,并执行INSERT INTO语句往表中插入测试数据。语句示例如下:
    CREATE TABLE source_table(k int, name varchar, PRIMARY KEY (k)) DISTRIBUTED BY HASH(k);
    CREATE TABLE target_table(k int, name varchar, hi varchar, PRIMARY KEY (k)) DISTRIBUTED BY HASH(k);
    INSERT INTO source_table VALUES(1, 'spark');
    INSERT INTO source_table VALUES(2, 'world');
    数据插入成功后,您可以执行如下语句分别查看source_tabletarget_table表中的当前数据:
    SELECT * FROM source_table;
    SELECT * FROM target_table;
  4. 通过Hint或SET参数指定使用Spark引擎执行SQL语句,示例如下:
    • Hint
      /*+query_type=spark*/INSERT INTO target_table SELECT k, name, concat('Hi ', name) FROM source_table;
    • SET
      说明 需要在DMS的SQLConsole窗口中同时选中SET语句和INSERT INTO语句来执行。
      SET query_type=spark;
      SET spark.driver.resourceSpec=small;
      SET spark.executor.resourceSpec=small;
      SET spark.executor.instances=1;
      INSERT INTO target_table SELECT k, name, concat('Hi ', name) FROM source_table;
      表 1. 参数说明
      参数 是否必填 说明
      query_type 必填 用于指定执行SQL语句的引擎类型,使用SparkSQL时必须选择Spark引擎。
      spark.driver.resourceSpec 选填 用于指定Spark Driver的资源规格,取值范围如下:
      • small(默认值):1 vcore 4 GB。
      • medium:2 vcore 8 GB。
      • large:4 vcore 16 GB。
      • xlarge:8 vcore 32 GB。
      spark.executor.resourceSpec 用于指定Spark Executor的资源规格,取值范围如下:
      • small(默认值):1 vcore 4 GB。
      • medium:2 vcore 8 GB。
      • large:4 vcore 16 GB。
      • xlarge:8 vcore 32 GB。
      spark.executor.instances 用于指定Spark Executor的数量,取值为正整数,数量上限受AnalyticDB MySQL版集群的整体资源限制。
      说明 系统会根据实际计算需求动态分配执行器资源,因此一般情况下无需设置该参数,除非您的SQL任务对资源消耗有特殊需求。
  5. 数据写入target_table成功后,您可以执行如下语句查看表中的数据进行验证:
    SELECT * FROM target_table;

相关文档

如何在SparkSQL中使用UDF