文档

概述

更新时间:

实时计算Flink版支持在Flink SQL作业中使用Java自定义函数,本文介绍Flink Java自定义函数的分类、参数传递及调用注意事项。

注意事项

  • 为了避免JAR包依赖冲突,在开发自定义函数时您需要注意以下几点:

    • SQL开发页面选择的Flink版本,请和Pom依赖中的Flink版本保持一致。

    • Flink相关依赖,scope请使用provided,即在依赖中添加<scope>provided</scope>

    • 其他第三方依赖请采用Shade方式打包,Shade打包详情请参见Apache Maven Shade Plugin

    Flink依赖冲突问题,详情请参见如何解决Flink依赖冲突问题?

  • 为避免UDF在SQL作业文本里被频繁调用导致超时的情况,推荐您将UDF的JAR包作为依赖文件上传,并且通过CRETATE TEMPORARY FUNCTION语法在作业中声明函数,例如CREATE TEMPORARY FUNCTION 'GetJson' AS 'com.soybean.function.udf.MyGetJsonObjectFunction';

自定义函数分类

Flink支持以下3类自定义函数。

分类

描述

UDSF(User Defined Scalar Function)

用户自定义标量值函数,将0个、1个或多个标量值映射到一个新的标量值。其输入与输出是一对一的关系,即读入一行数据,写出一条输出值。详情请参见自定义标量函数(UDSF)

UDAF(User Defined Aggregation Function)

自定义聚合函数,将多条记录聚合成1条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。详情请参见自定义聚合函数(UDAF)

UDTF(User Defined Table-valued Function)

自定义表值函数,将0个、1个或多个标量值作为输入参数(可以是变长参数)。与自定义的标量函数类似,但与标量函数不同。表值函数可以返回任意数量的行作为输出,而不仅是1个值。返回的行可以由1个或多个列组成。调用一次函数输出多行或多列数据。详情请参见自定义表值函数(UDTF)

自定义函数注册

自定义函数参数传递

您可以在Flink开发控制台配置自定义函数中的参数并在UDF代码中使用。这样,后续可以直接在控制台上修改参数值,实现快速修改UDF参数值的目的。

自定义函数中提供了可选的open(FunctionContext context)方法,FunctionContext具备参数传递功能,自定义配置项通过此对象来传递。具体步骤如下:

  1. 在Flink开发控制台作业运维页面部署详情页签运行参数配置其他配置中,添加pipeline.global-job-parameters配置项,代码示例如下。

    pipeline.global-job-parameters: | 
      'k1:{hi,hello}',
      'k2:"str:ing,str:ing"',
      'k3:"str""ing,str:ing"'

    FunctionContext#getJobParameter只能获取pipeline.global-job-parameters这一配置项的值。因此需要将UDF用到的所有配置项全部写入到pipeline.global-job-parameters中。pipeline.global-job-parameters配置项填写的具体操作步骤如下。

    步骤

    动作

    具体操作

    示例

    步骤1

    定义key-value。

    将key和value之间通过冒号(:)分隔,并将每一对key-value用单引号(')包围起来。

    说明
    • 如果key或value中含有半角冒号(:),则需要用双引号(")将key或value包围起来。

    • 如果key或value中含有半角冒号(:)和双引号("),则需要通过连写两个双引号("")进行转义。

    • key = k1,value = {hi,hello},则定义为'k1:{hi,hello}'

    • key = k2,value = str:ing,str:ing,则定义为'k2:"str:ing,str:ing"'

    • key = k3,value = str"ing,str:ing,则定义为'k3:"str""ing,str:ing"'

    步骤2

    按照YAML文件的格式,形成最终的pipeline.global-job-parameters

    将不同的key-value放在不同的行里,并将所有key-value用逗号(,)连接。

    说明
    • YAML文件的多行字符串以竖线(| )开始。

    • YAML文件的多行字符串,每一行需要有相同的缩进。

    pipeline.global-job-parameters: | 
      'k1:{hi,hello}',
      'k2:"str:ing,str:ing"',
      'k3:"str""ing,str:ing"'
  2. 在自定义函数代码中,通过FunctionContext#getJobParameter获取map的各项内容。

    代码示例如下。

    context.getJobParameter("k1", null); // 获得字符串 {hi,hello}。
    context.getJobParameter("k2", null); // 获得字符串 str:ing,str:ing。
    context.getJobParameter("k3", null); // 获得字符串 str"ing,str:ing。
    context.getJobParameter("pipeline.global-job-parameters", null); // null,只能获得pipeline.global-job-parameters里定义的内容,而不能获得任意的作业配置项。

相关文档

  • 本页导读 (1)
文档反馈