本文为您介绍如何为实时计算自定义标量函数(UDF)搭建开发环境、编写业务代码以及上线。

说明 目前阿里云实时计算共享模式暂不支持自定义函数,仅独享模式支持自定义函数。

定义

用户定义的标量函数(UDF)将0个、1个或多个标量值映射到一个新的标量值。

开发环境搭建

请您参见环境搭建

编写业务逻辑代码

UDF需要在ScalarFunction类中实现eval方法。open方法和close方法可选。以Java为例,示例代码如下。

package com.hjc.test.blink.sql.udx;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class StringLengthUdf extends ScalarFunction {
    // 可选,open方法可以不编写。
    // 如果编写open方法需要声明'import org.apache.flink.table.functions.FunctionContext;'。
    @Override
    public void open(FunctionContext context) {
        }
    public long eval(String a) {
        return a == null ? 0 : a.length();
    }
    public long eval(String b, String c) {
        return eval(b) + eval(c);
    }
    //可选,close方法可不写。
    @Override
    public void close() {
        }
}

上线

找到您指定的Class编写SQL语句后,点击上线,进入运维页面点击启动即可。

-- udf str.length()
CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';
create table sls_stream(
	a int,
	b int,
	c varchar
) with (
	type='sls',
	endPoint='yourEndpoint',
	accessKeyId='yourAccessId',
	accessKeySecret='yourAccessSecret',
	startTime = '2017-07-04 00:00:00',
	project='yourProjectName',
	logStore='yourLogStoreName',
	consumerGroup='consumerGroupTest1'
);
create table rds_output(
	id int,
	len bigint,
	content VARCHAR
) with (
	type='rds',
	url='yourDatabaseURL',
	tableName='yourDatabaseTableName',
	userName='yourDatabaseUserName',
	password='yourDatabasePassword'
);
insert into rds_output
select
	a,
	stringLengthUdf(c),
	c as content
from sls_stream

常见问题

Q:为什么实现了一个随机数生成函数,在运行时产生的值却是一样的?

A:如果自定义函数是无参的,并且没有声明是非确定性函数,编译期间可能会被优化掉变成一个常量值。所以如果想让函数变成非确定性函数 ,不被优化成常量,可以在自定义函数中overrideisDeterministic() 方法, 返回 false 即可。