本文为您介绍Flink自定义函数开发时的注意事项、自定义函数的分类、定义和参数传递方法。

注意事项

为了避免JAR包依赖冲突,在开发自定义函数时您需要注意以下几点:
  • 作业开发页面选择的Flink版本,请和Pom依赖中的Flink版本保持一致。
  • Flink相关依赖,scope请使用provided,即在依赖中添加<scope>provided</scope>
  • 其他第三方依赖请采用Shade方式打包,Shade打包详情请参见Apache Maven Shade Plugin

Flink依赖冲突问题,详情请参见如何解决Flink依赖冲突问题?

自定义函数分类

Flink支持以下3类自定义函数。
分类 描述
UDF(User Defined Scalar Function) 用户自定义标量值函数,将0个、1个或多个标量值映射到一个新的标量值。其输入与输出是一对一的关系,即读入一行数据,写出一条输出值。详情请参见自定义标量函数(UDF)
UDAF(User Defined Aggregation Function) 自定义聚合函数,将多条记录聚合成1条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。详情请参见自定义聚合函数(UDAF)
UDTF(User Defined Table-valued Function) 自定义表值函数,将0个、1个或多个标量值作为输入参数(可以是变长参数)。与自定义的标量函数类似,但与标量函数不同。表值函数可以返回任意数量的行作为输出,而不仅是1个值。返回的行可以由1个或多个列组成。调用一次函数输出多行或多列数据。详情请参见自定义表值函数(UDTF)

自定义函数参数传递

如果您的自定义函数中某些参数值后期需要修改,直接在UDF代码里修改会很麻烦。此时,您可以在Flink全托管控制台更多Flink配置项中配置该参数后,在UDF代码中使用该参数。后续您可以直接在Flink全托管控制台修改该参数值,从而达到快速修改UDF参数值的目的。

自定义函数中提供了可选的open(FunctionContext context)方法,FunctionContext具备参数传递功能,自定义配置项通过此对象来传递。自定义函数的参数传递操作步骤如下:
  1. 在作业开发页面右侧高级配置面板更多Flink配置中添加参数及取值。代码示例如下。
    maxJobCreationAttempts: value
  2. 在UDF代码中通过ConfigOptions.key()方式获取自定义函数参数。代码示例如下。
    public void open(FunctionContext context) throws Exception {
        LOG.info(String.format("maxJobCreationAttempts:%s%n",
        ConfigOptions.key("maxJobCreationAttempts").stringType().noDefaultValue()));
    }

自定义函数注册方法

自定义函数分为全局自定义函数和作业级自定义函数,注册方法详情如下所示:
  • 全局自定义函数
    1. 登录实时计算管理控制台
    2. Flink全托管页签,单击目标工作空间操作列下的控制台
    3. 在左侧导航栏上,单击作业开发
    4. 在页面左上角,单击添加图标。
    5. 上传自定义函数JAR文件。注册UDF
      您可以通过以下任何一种方式上传自定义函数JAR文件:
      • 上传文件:单击选择文件项右侧的选择文件后,选择您的目标自定义函数JAR文件。如果有依赖文件,单击依赖文件项右侧的选择文件,选择您的目标自定义函数JAR所依赖的文件。
      • 外部URL:输入外部URL地址。
      说明
      • 您的自定义函数JAR文件会被上传到您选择的OSS Bucket中的sql-artifacts目录下。此外,Flink开发控制台会解析自定义函数JAR文件中是否使用了Flink UDF、UDAF和UDTF接口的类,并自动提取类名,填充到Function Name字段中。
      • 对于Java类型的UDF,其依赖可以打包到自定义函数JAR包中,也可以通过依赖文件项进行上传。
      • 在自定义函数文件或者其依赖文件比较大时,推荐通过外部URL的方式进行上传。需要注意的是,如果外部URL是OSS Bucket地址,其依赖文件必须位于sql-artifacts/namespaces/{namespace}目录下。
    6. 单击确认

      在SQL编辑器页面左侧UDFs列表,您可以看到所有注册成功的UDF。

  • 作业级自定义函数
    因全局自定义函数(Catalog Function)在注册时,默认使用最新的Flink版本解析。如果您在作业中使用全局自定义函数,同时在高级配置选中了非最新的Flink版本,则有可能出现不兼容的问题。此时,您可以根据作业需要使用的引擎版本来实现UDF代码,并通过作业级自定义函数的方式解决此问题。作业级自定义函数的使用流程如下:
    1. 上传自定义函数JAR包。

      在左侧导航栏资源上传中,上传自定义函数JAR包。

    2. 在作业中指定作业级自定义函数。

      在作业附加依赖文件中,指定自定义函数JAR包。

    3. 注册自定义函数。
      通过CREATE TEMPORARY FUNCTION 语法注册作业级自定义函数。
      CREATE TEMPORARY FUNCTION MyScalarFunc AS 'com.test.MyScalarFunc';