本文为您介绍如何为实时计算自定义标量函数(UDF)搭建开发环境、编写业务代码以及上线。

说明 目前阿里云实时计算共享模式暂不支持自定义函数,仅独享模式支持自定义函数。

定义

用户定义的标量函数(UDF)将0个、1个或多个标量值映射到一个新的标量值。

开发环境搭建

请参见环境搭建

业务代码

UDF需要在ScalarFunction类中实现eval方法。open方法和close方法可选。
注意 UDF默认对于相同的输入会有相同的输出。如果UDF不能保证相同的输出(例如在UDF中调用外部服务,相同的输入值可能返回不同的结果),建议实现override isDeterministic()方法,返回false。否则在某些条件下,(例如UDF算子前移)会使输出结果不符合预期。
以Java为例,示例代码如下。
package com.hjc.test.blink.sql.udx;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class StringLengthUdf extends ScalarFunction {
    // 可选,open方法可以不编写。
    // 如果编写open方法需要声明'import org.apache.flink.table.functions.FunctionContext;'。
    @Override
    public void open(FunctionContext context) {
        }
    public long eval(String a) {
        return a == null ? 0 : a.length();
    }
    public long eval(String b, String c) {
        return eval(b) + eval(c);
    }
    //可选,close方法可以不写。
    @Override
    public void close() {
        }
}

自定义函数上线

在您指定的Class中编写SQL语句,单击上线 > 启动,即可完成自定义函数的上线。自定义函数中SQL语句示例如下。
-- udf str.length()
CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';
create table sls_stream(
    a int,
    b int,
    c varchar
) with (
    type='sls',
    endPoint='<yourEndpoint>',
    accessKeyId='<yourAccessId>',
    accessKeySecret='<yourAccessSecret>',
    startTime = '2017-07-04 00:00:00',
    project='<yourProjectName>',
    logStore='<yourLogStoreName>',
    consumerGroup='consumerGroupTest1'
);
create table rds_output(
    id int,
    len bigint,
    content VARCHAR
) with (
    type='rds',
    url='yourDatabaseURL',
    tableName='<yourDatabaseTableName>',
    userName='<yourDatabaseUserName>',
    password='<yourDatabasePassword>'
);
insert into rds_output
select
    a,
    stringLengthUdf(c),
    c as content
from sls_stream

常见问题

Q:为什么实现了一个随机数生成函数,在运行时产生的值却是一样的?

A:如果自定义函数是无参的,并且没有声明是非确定性函数,编译期间可能会被优化成一个常量值。如果需要让函数变成非确定性函数 ,不被优化成常量,可以在自定义函数中实现override isDeterministic() 方法, 返回 false 即可。