您可以通过多种方式实现去重需求,例如FIRST_VALUE、LAST_VALUE和DISTINCT等。本文为您介绍如何使用TopN方法实现去重,以及使用过程中的注意事项。

去重的方案通常有两种:
  • 保留第一条。
  • 保留最后一条。
说明 ORDER BY后的时间属性字段必须在源表中定义。

语法

由于SQL没有直接去重的语法,因此我们使用SQL的ROW_NUMBER OVER WINDOW功能实现去重。ROW_NUMBER OVER WINDOWTopN语句方法类似,可以理解为一种特殊的TopN。
SELECT *
FROM (
   SELECT *,
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]
     ORDER BY timeAttributeCol [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1;
参数说明:
  • ROW_NUMBER():计算行号,行号计算从1开始。
  • PARTITION BY col1[, col2..]:指定分区的列,即去重的Key,也可以不指定分区的列。
  • ORDER BY timeAttributeCol [asc|desc]):指定排序的列,必须是时间属性字段(Processing Time或Event Time)。可以指定为顺序(Keep First Row)或倒序(Keep Last Row)。
  • 外层查询rownum必须为= 1或者<= 1。条件必须是AND,且不能存在Undeterministic的UDF的条件。
如上语法所示,去重需要两层Query:
  • 子查询中:使用ROW_NUMBER(),按照时间属性列对数据进行排序编号。
  • 外层查询中:对排名进行过滤,只取第一条,达到去重的目的。时间列排序方向可以为:
    • 顺序:deduplicate keep first row
    • 倒序:deduplicate keep last row
当排序字段是Processing Time列时,Flink会按系统时间去重,其每次运行结果不确定。当排序字段是Event Time列时,Flink会按业务时间去重,其每次运行结果是确定的。

Deduplicate Keep First Row

保留首行的去重策略,即保留指定Key下第一条出现的数据,之后出现在该Key下的数据会被丢弃掉。因为其State中只存储了Key数据,因此性能较优。示例如下。
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
  FROM T
)
WHERE rowNum = 1;

本例中,将T表按照b字段进行去重,并按照系统时间保留第一条数据。proctime在以上示例中是源表T中的一个具有Processing Time属性的字段。如果您按照系统时间去重,也可以将proctime字段简化成PROCTIME()函数进行调用,可以省略proctime字段的声明。

说明 Blink-3.3.1版本后,FirstRow支持使用Event Time进行开窗,并且不会产生Retraction。

Deduplicate Keep Last Row

注意 LastRow不支持使用Event Time进行开窗。
LastRow的作用也是去重,且只保留该主键下最后一条出现的数据。其性能略胜于LAST_VALUE函数,示例如下。
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY proctime DESC) as rowNum
  FROM T
)
WHERE rowNum = 1;

FAQ

Q:执行ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY now() as time DESC)语句时,产生如下报错,应该如何处理?
java.lang.RuntimeException: Can not retract a non-existent record:
    38c30001,1b800000008,1c000000013,85000035343a3731,5d304013.
    This should never happen.
A:通常有以下两种原因,您可以按照以下方式进行处理:
  • 问题原因:由代码中now()导致。因为TopN不支持非确定性的排序字段,now()每次输出的值不同,所以导致Retraction会找不到之前的值。

    解决方法:Event Time或源表中一个具有Processing Time属性的字段。

  • 问题原因:blink.state.ttl.msstate.backend.niagara.ttl.ms参数值设置过小。

    解决方法:设置过小的TTL参数使用默认配置,或调大参数值。