本文为您介绍如何为实时计算自定义聚合函数(UDAF)搭建开发环境、编写业务代码以及上线。

说明 目前阿里云实时计算共享模式暂不支持自定义函数,仅独享模式支持自定义函数。

定义

自定义聚合函数(UDAF)将多条记录聚合成1条记录。

UDAF抽象类内部方法

AggregateFunction的核心接口方法,如下所示。

说明 虽然UDAF可以用Java或者Scala实现,但是建议您使用Java,因为Scala的数据类型有时会造成不必要的性能损失。
  • createAccumulator和getValue方法
    /*
    * @param <T> UDAF的输出结果的类型。
    * @param <ACC> UDAF的accumulator的类型。accumulator是UDAF计算中用来存放计算中间结果的数据类型。您可以需要根据需要自行设计每个UDAF的accumulator。
    */
    public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
    /*
    * 初始化AggregateFunction的accumulator。
    * 系统在第一个做aggregate计算之前调用一次这个方法。
    */
    public ACC createAccumulator();
    /*
    * 系统在每次aggregate计算完成后调用这个方法。
    */
    public T getValue(ACC accumulator);
    }
    说明
    • createAccumulatorgetValue可以定义在AggregateFunction抽象类内。
    • UDAF必须包含1个accumulate方法。
  • accumulator方法
    public void accumulate(ACC accumulator, ...[用户指定的输入参数]...);
    说明
    • 您需要实现一个accumulate方法,来描述如何计算用户的输入的数据,并更新到accumulator中。
    • accumulate方法的第一个参数必须是使用AggregateFunction的ACC类型的accumulator。在系统运行过程中,底层runtime代码会把历史状态accumulator,和您指定的上游数据(支持任意数量,任意类型的数据)做为参数,一起发送给accumulate计算。
  • retract和merge方法

    createAccumulator、getValue 和 accumulate3个方法一起使用,就能设计出一个最基本的UDAF。但是实时计算一些特殊的场景需要您提供retract和merge两个方法才能完成。

    在实时计算的场景里,很多时候的计算都是对无限流的一个提前的观测值(early firing)。既然有early firing,就会有对发出的结果的修改,这个操作叫做撤回(retract)。SQL翻译优化器会帮助 您自动判断哪些情况下会产生撤回的数据,哪些操作需要处理带有撤回标记的数据。但是您需要实现一个retract方法来处理撤回的数据。
    public void retract(ACC accumulator, ...[您指定的输入参数]...);
    说明
    • retract方法是accumulate方法的逆操作。例如,count UDAF,在accumulate的时,每来一条数据要加1,在retract的时候就是要减1。
    • 类似于accumulate方法,retract方法的第1个参数必须是使用AggregateFunction的ACC类型的accumulator。在系统运行过程中,底层runtime代码会把历史状态accumulator,和您指定的上游数据(任意数量, 任意类型的数据)一起发送给retract计算。
    在实时计算中一些场景需要merge,例如,session window。 由于实时计算具有out of order的特性,后输入的数据有可能位于2个原本分开的session中间,这样就把2个session合为1个session。此时,需要使用merge方法把多个accumulator合为1个accumulator。
    public void merge(ACC accumulator, Iterable<ACC> its);
    说明
    • merge方法的第1个参数,必须是使用AggregateFunction的ACC类型的accumulator,而且第1个accumulator是merge方法完成之后,状态所存放的地方。
    • merge方法的第2个参数是1个ACC type的accumulator遍历迭代器,里面有可能存在1个或者多个accumulator。

开发环境搭建

开发环境搭建请参见环境搭建

编写业务逻辑代码

以Java为例,举例代码如下。

import org.apache.flink.table.functions.AggregateFunction;

public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> {
    //定义存放count UDAF状态的accumulator的数据的结构。
    public static class CountAccum {
        public long total;
    }

    //初始化count UDAF的accumulator。
    public CountAccum createAccumulator() {
        CountAccum acc = new CountAccum();
        acc.total = 0;
        return acc;
    }

    //getValue提供了,如何通过存放状态的accumulator,计算count UDAF的结果的方法。
    public Long getValue(CountAccum accumulator) {
        return accumulator.total;
    }

    //accumulate提供了,如何根据输入的数据,更新count UDAF存放状态的accumulator。
    public void accumulate(CountAccum accumulator, Object iValue) {
        accumulator.total++;
    }

    public void merge(CountAccum accumulator, Iterable<CountAccum> its) {
         for (CountAccum other : its) {
            accumulator.total += other.total;
         }
    }
}
说明 AggregateFunction的子类支持open和close方法作为可选方法,可参考UDF或UDTF的写法。

上线和启动

自定义聚合函数(UDAF)的上线和启动步骤,请参见 开发上线启动

UDAF DEMO

-- UDAF 计算count
CREATE FUNCTION countUdaf AS 'com.hjc.test.blink.sql.udx.CountUdaf';
create table sls_stream(
a int,
b bigint,
c varchar
) with (
type='sls',
endPoint='yourEndpoint',
accessKeyId='yourAccessId',
accessKeySecret='yourAccessSecret',
startTime = '2017-07-04 00:00:00',
project='yourPorjectName',
logStore='stream-test2',
consumerGroup='consumerGroupTest3'
);

create table rds_output(
len1 bigint,
len2 bigint
) with (
type='rds',
url='yourDatabaseURL',
tableName='yourDatabaseTableName',
userName='yourDatabaseUserName',
password='yourDatabasePassword'
);

insert into rds_output
select
count(a),
countUdaf(a)
from sls_stream