IntervalJoin语句可以让两个流进行JOIN时,左流和右流中每条记录只关联另外一条流上同一时间段内的数据,且进行完JOIN后,仍然保留输入流上的时间列,让您继续进行基于Event Time的操作。
语法格式
SELECT column-names
FROM table1 [AS <alias1>]
[INNER | LEFT | RIGHT |FULL ] JOIN table2
ON table1.column-name1 = table2.key-name1 AND TIMEBOUND_EXPRESSION
说明
- 支持INNER JOIN、LEFT JOIN、RIGHT JOIN和FULL JOIN,如果直接使用JOIN,默认为INNER JOIN。
- 暂不支持SEMI JOIN和ANTI JOIN。
- TIMEBOUND_EXPRESSION为左右两个流时间属性列上的区间条件表达式,支持以下三种条件表达式:
- ltime = rtime
- ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
- ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
示例1(基于Event Time)
统计下单后4个小时内的物流信息。
- 测试数据
- 订单表(orders)
id productName orderTime 1 iphone 2020-04-01 10:00:00.0 2 mac 2020-04-01 10:02:00.0 3 huawei 2020-04-01 10:03:00.0 4 pad 2020-04-01 10:05:00.0 - 物流表(shipments)
shipId orderId status shiptime 0 1 shipped 2020-04-01 11:00:00.0 1 2 delivered 2020-04-01 17:00:00.0 2 3 shipped 2020-04-01 12:00:00.0 3 4 shipped 2020-04-01 11:30:00.0
- 订单表(orders)
- 测试语句
CREATE TABLE Orders( id BIGINT, productName VARCHAR, orderTime TIMESTAMP, WATERMARK wk FOR orderTime as withOffset(orderTime, 2000) --为rowtime定义Watermark。 ) WITH ( type='datahub', endpoint='<yourEndpoint>', accessId='<yourAccessID>', accessKey='<yourAccessSecret>', projectName='<yourProjectName>', topic='<yourTopic>', project='<yourProjectName>' ); CREATE TABLE Shipments( shipId BIGINT, orderId BIGINT, status VARCHAR, shiptime TIMESTAMP, WATERMARK wk FOR shiptime as withOffset(shiptime, 2000) --为rowtime定义Watermark。 ) WITH ( type='datahub', endpoint='<yourEndpoint>', accessId='<yourAccessID>', accessKey='<yourAccessSecret>', projectName='<yourProjectName>', topic='<yourTopic>', project='<yourProjectName>' ); --使用RDS作为结果表 CREATE TABLE rds_output( id BIGINT, productName VARCHAR, status VARCHAR ) WITH ( type='rds', url='<yourDatabaseURL>', tableName='<yourDatabaseTablename>', userName='<yourDatabaseUserName>', password='<yourDatabasePassword>' ); INSERT INTO rds_output SELECT id, productName, status FROM Orders AS o JOIN Shipments AS s on o.id = s.orderId AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime;
- 测试结果
id(bigint) productName(varchar) status(varchar) 1 iphone shipped 3 huawei shipped 4 pad shipped
示例2(基于Processing Time)
- 测试数据
- datahub_stream1
k1 v1 1 val1 2 val2 3 val3 - datahub_stream2
k1 v1 1 val1 2 val2 3 val3
- datahub_stream1
- 测试语句
CREATE TABLE datahub_stream1 ( k1 BIGINT, v1 VARCHAR, d AS PROCTIME() ) WITH ( type='datahub', endpoint='<yourEndpoint>', accessId='<yourAccessID>', accessKey='<yourAccessSecret>', projectName='<yourProjectName>', topic='<yourTopic>', project='<yourProjectName>' ); CREATE TABLE datahub_stream2 ( k2 BIGINT, v2 VARCHAR, e AS PROCTIME() ) WITH ( type='datahub', endpoint='<yourEndpoint>', accessId='<yourAccessID>', accessKey='<yourAccessSecret>', projectName='<yourProjectName>', topic='<yourTopic>', project='<yourProjectName>' ); --使用RDS作为结果表 CREATE TABLE rds_output( k1 BIGINT, v1 VARCHAR, v2 VARCHAR ) WITH ( type='rds', url='<yourDatabaseURL>', tableName='<yourDatabaseTablename>', userName='<yourDatabaseUserName>', password='<yourDatabasePassword>' ); INSERT INTO rds_output SELECT k1, v1, v2 FROM datahub_stream1 AS o JOIN datahub_stream2 AS s on o.k1 = s.k2 AND o.d BETWEEN s.e - INTERVAL '4' MINUTE AND s.e;
说明 由于结果取决于两个流里每条数据进入系统的时间,具有不确定性,因此该示例暂不提供预期结果。
在文档使用中是否遇到以下问题
更多建议
匿名提交