您可以通过多种方式实现去重需求,例如FIRST_VALUE、LAST_VALUE和DISTINCT等。本文为您介绍如何使用TopN方法实现去重,以及使用过程中的注意事项。
去重的方案通常有两种:
- 保留第一条。
- 保留最后一条。
说明 ORDER BY后的时间属性字段必须在源表中定义。
语法
由于SQL没有直接去重的语法,因此我们使用SQL的
ROW_NUMBER OVER WINDOW
功能实现去重。ROW_NUMBER OVER WINDOW
与TopN语句方法类似,可以理解为一种特殊的TopN。 SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]
ORDER BY timeAttributeCol [asc|desc]) AS rownum
FROM table_name)
WHERE rownum = 1;
参数说明:
ROW_NUMBER()
:计算行号,行号计算从1开始。PARTITION BY col1[, col2..]
:指定分区的列,即去重的Key,也可以不指定分区的列。ORDER BY timeAttributeCol [asc|desc])
:指定排序的列,必须是时间属性字段(Processing Time或Event Time)。可以指定为顺序(Keep First Row)或倒序(Keep Last Row)。- 外层查询
rownum
必须为= 1
或者<= 1
。条件必须是AND
,且不能存在Undeterministic的UDF的条件。
如上语法所示,去重需要两层Query:
- 子查询中:使用
ROW_NUMBER()
,按照时间属性列对数据进行排序编号。 - 外层查询中:对排名进行过滤,只取第一条,达到去重的目的。时间列排序方向可以为:
- 顺序:
deduplicate keep first row
。 - 倒序:
deduplicate keep last row
。
- 顺序:
Deduplicate Keep First Row
保留首行的去重策略,即保留指定Key下第一条出现的数据,之后出现在该Key下的数据会被丢弃掉。因为其State中只存储了Key数据,因此性能较优。示例如下。
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
FROM T
)
WHERE rowNum = 1;
本例中,将T表按照b字段进行去重,并按照系统时间保留第一条数据。proctime在以上示例中是源表T中的一个具有Processing Time属性的字段。如果您按照系统时间去重,也可以将proctime字段简化成PROCTIME()
函数进行调用,可以省略proctime字段的声明。
说明 Blink-3.3.1版本后,FirstRow支持使用Event Time进行开窗,并且不会产生Retraction。
Deduplicate Keep Last Row
注意 LastRow不支持使用Event Time进行开窗。
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY proctime DESC) as rowNum
FROM T
)
WHERE rowNum = 1;
FAQ
Q:执行
ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY now() as time DESC)
语句时,产生如下报错,应该如何处理? java.lang.RuntimeException: Can not retract a non-existent record:
38c30001,1b800000008,1c000000013,85000035343a3731,5d304013.
This should never happen.
A:通常有以下两种原因,您可以按照以下方式进行处理:
- 问题原因:由代码中
now()
导致。因为TopN不支持非确定性的排序字段,now()
每次输出的值不同,所以导致Retraction会找不到之前的值。解决方法:Event Time或源表中一个具有Processing Time属性的字段。
- 问题原因:
blink.state.ttl.ms
或state.backend.niagara.ttl.ms
参数值设置过小。解决方法:设置过小的TTL参数使用默认配置,或调大参数值。
在文档使用中是否遇到以下问题
更多建议
匿名提交