AnalyticDB MySQL版支持在SparkSQL中使用用户自定义函数(User Defined Function,简称UDF),以满足系统内置函数无法实现的业务需要。本文介绍如何定义一个UDF函数并在SparkSQL中使用该函数。

前提条件

  • AnalyticDB MySQL版SparkSQL中使用的UDF函数需通过如下接口实现:
    • Spark原生UDF(即继承自org.apache.spark.sql.api.java.UDF[0-22])。
    • Hive UDF、UDTF或UDAF。
    • MaxCompute UDF或UDTF。
  • 需在本地开发工具(例如IDEA)中完成UDF函数开发,并将开发完成的函数打成JAR包。JAR包需满足如下要求:
    • JAR包中需包含UDF函数依赖的二方或三方包。
    • JAR包中不允许包含Hive或Spark等Spark计算框架中已有的JAR包,以避免JAR包冲突。

操作步骤

  1. 在开发工具(本文示例中使用的是IDEA开发工具)中创建一个Maven工程,并在pom.xml文件中添加如下依赖:
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.0.1</version>
        <scope>provided</scope>
    </dependency>
    5
  2. 在开发工具中定义UDF函数并将其打成JAR包。本文示例中需要开发的UDF函数名为hi,函数定义如下:
    package spark.udf
    import org.apache.spark.sql.api.java.UDF1;
    public class UdfHi implements UDF1<String, String> {
        @Override
        public String call(String s) throws Exception {
            return "Hi, " + s;
        }
    }
    1
  3. 将打好的JAR包通过AnalyticDB MySQL版控制台从本地上传至OSS目录。文件上传步骤,请参见资源管理
    说明 上传成功后,在目标资源右侧的 操作栏中,单击 复制路径,以便于在步骤4中将该路径粘贴至UDF函数定义信息的文本文件里。
  4. 新建一个名为_udfs的文本文件,文件中需包含UDF函数定义信息,用于自动注册到AnalyticDB MySQL版 Spark中,方便后续执行SparkSQL时能够调用该UDF函数。
    说明
    • UDF函数定义中包括如下信息:
      • <udf_name>:函数名。
      • <udf_class>:类名。
      • <jar_path>:JAR包的OSS路径,即步骤3中复制的OSS路径。

      文件格式为<udf_name> <udf_class> <jar_path>,三者间需以空格分隔。示例如下:

      hi spark.udf.UdfHi oss://your_oss_bucket/user_uploads/adb-spark-udf-1.0.0.0.jar
    • AnalyticDB MySQL版默认读取文件名为_udfs的UDF函数注册信息,因此在生成UDF函数定义文本文件时,若文件名后带有后缀(如.txt),上传至OSS前需将后缀删除。
    • 若您同时定义了多个UDF函数,每个UDF函数定义信息对应一行文本。
  5. 将生成的UDF函数定义文件通过AnalyticDB MySQL版控制台从本地上传至OSS目录。文件上传步骤,请参见资源管理

后续步骤

完成上述步骤后,您即可在提交SparkSQL作业时使用该UDF函数,使用方式和普通函数一样。

例如,您可以使用UDF函数,将通过DMS控制台提交SparkSQL作业步骤4中的INSERT INTO target_table SELECT k, name, concat('Hi ', name) FROM source_table;语句替换为如下语句:

INSERT INTO target_table SELECT k, name, hi(name) FROM source_table;