Skip to content

Commit

Permalink
feat(frontend): plan nested loop temporal join (#19201)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su authored Nov 1, 2024
1 parent b39c9af commit d9ee9b4
Show file tree
Hide file tree
Showing 12 changed files with 647 additions and 174 deletions.
96 changes: 96 additions & 0 deletions e2e_test/streaming/temporal_join/append_only/nested_loop.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# End-to-end test for nested-loop temporal join on an append-only stream.
# `v1` joins with a non-equi condition (a1 > a2); `v2` is a cross join with no
# condition. Both read `version` via FOR SYSTEM_TIME AS OF PROCTIME().

statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;

statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

statement ok
create materialized view v1 as select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2;

statement ok
create materialized view v2 as select id1, a1, id2, a2 from stream cross join version FOR SYSTEM_TIME AS OF PROCTIME();

# Stream row 1 arrives while `version` is still empty, so it is expected to
# produce no output in either view.
statement ok
insert into stream values(1, 14, 111);

statement ok
insert into version values(1, 12, 122);

# Stream row 2 arrives after version row (1, 12) exists; 13 > 12, so both views
# are expected to emit it.
statement ok
insert into stream values(2, 13, 133);

# Intentionally unfiltered: empties `version`. The expected results below still
# contain the row joined earlier, i.e. already-emitted output is not retracted.
statement ok
delete from version;

query IIII rowsort
select * from v1;
----
2 13 1 12

query IIII rowsort
select * from v2;
----
2 13 1 12

statement ok
insert into version values(2, 10, 102);

# 9 > 10 is false, so stream row 3 should show up only in the cross join (v2).
statement ok
insert into stream values(3, 9, 222);

query IIII rowsort
select * from v1;
----
2 13 1 12

query IIII rowsort
select * from v2;
----
2 13 1 12
3 9 2 10

statement ok
drop materialized view v1;

statement ok
drop materialized view v2;

# Replace the contents of `version` with the single row (4, 10) before the
# views are re-created.
statement ok
delete from version where id2 = 2;

statement ok
insert into version values(4, 10, 104);

# Re-create both views over the existing stream rows: every stream row is now
# expected to be joined against the current content of `version` only.
statement ok
create materialized view v1 as select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2;

statement ok
create materialized view v2 as select id1, a1, id2, a2 from stream cross join version FOR SYSTEM_TIME AS OF PROCTIME();

# Stream rows 1 (a1 = 14) and 2 (a1 = 13) satisfy a1 > 10; row 3 (a1 = 9) does not.
query IIII rowsort
select * from v1;
----
1 14 4 10
2 13 4 10

query IIII rowsort
select * from v2;
----
1 14 4 10
2 13 4 10
3 9 4 10

# Clean up.
statement ok
drop materialized view v1;

statement ok
drop materialized view v2;

statement ok
drop table stream;

statement ok
drop table version;
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,8 @@ message TemporalJoinNode {
repeated uint32 table_output_indices = 8;
// The state table used for non-append-only temporal join.
optional catalog.Table memo_table = 9;
// Whether this is a nested-loop temporal join.
bool is_nested_loop = 10;
}

message DynamicFilterNode {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Planner test inputs for temporal joins that have no equality join condition.
# Each case lists under `expected_outputs` which plans or errors are recorded.

# Left join with only non-equi conditions: both batch and streaming planning
# are expected to report an error.
- name: Left join type for temporal join
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int);
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2 and b1 > b2
expected_outputs:
- batch_error
- stream_error
# Inner join on a non-equi condition, with an extra filter on the version side.
# `version` has no primary key here.
- name: Inner join type for temporal join
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int);
select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2 where a2 < 10;
expected_outputs:
- stream_plan
# Cross join: no join condition at all.
- name: Cross join for temporal join
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int);
select id1, a1, id2, a2 from stream cross join version FOR SYSTEM_TIME AS OF PROCTIME();
expected_outputs:
- stream_plan
# Comma-style (implicit) join; the join condition and the filter both live in WHERE.
- name: implicit join with temporal tables
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream, version FOR SYSTEM_TIME AS OF PROCTIME() where a1 > a2 and a2 < 10;
expected_outputs:
- stream_plan
# Aggregation on top of the join. The query is written as a left join with a
# WHERE filter on the version side; a stream plan (not an error) is expected.
- name: Temporal join with Aggregation
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select count(*) from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2 where a2 < 10;
expected_outputs:
- stream_plan
# Right join, and `stream` is not APPEND ONLY: a streaming error is expected.
- name: Temporal join type test
sql: |
create table stream(id1 int, a1 int, b1 int);
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream right join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2;
expected_outputs:
- stream_error
# Inner non-equi join where `stream` is not APPEND ONLY: a streaming error is expected.
- name: Temporal join append only test
sql: |
create table stream(id1 int, a1 int, b1 int);
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2;
expected_outputs:
- stream_error
# Two chained temporal joins against different version tables, each on a
# non-equi condition, plus a filter on the stream side.
- name: multi-way temporal join
sql: |
create table stream(k int, a1 int, b1 int) APPEND ONLY;
create table version1(k int, x1 int, y2 int, primary key (k));
create table version2(k int, x2 int, y2 int, primary key (k));
select stream.k, x1, x2, a1, b1
from stream
join version1 FOR SYSTEM_TIME AS OF PROCTIME() on stream.a1 < version1.x1
join version2 FOR SYSTEM_TIME AS OF PROCTIME() on stream.b1 > version2.y2 where a1 < 10;
expected_outputs:
- stream_plan
# The right side is a CTE that shadows the table name `version`: a binder error is expected.
- name: use CTE as temporal join right table. https://github.com/risingwavelabs/risingwave/issues/18703
sql: |
create table stream(id1 int, a1 int, b1 int, c1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, c2 int, primary key (id2));
with version as (select * from version) select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 > id2;
expected_outputs:
- binder_error
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- name: Left join type for temporal join
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int);
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2 and b1 > b2
batch_error: |-
Not supported: do not support temporal join for batch queries
HINT: please use temporal join in streaming queries
stream_error: |-
Not supported: Temporal join requires an inner join
HINT: Please use an inner join
- name: Inner join type for temporal join
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int);
select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2 where a2 < 10;
stream_plan: |-
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), version._row_id(hidden)], stream_key: [stream._row_id, version._row_id], pk_columns: [stream._row_id, version._row_id], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(stream._row_id, version._row_id) }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: AND (stream.a1 > version.a2) AND (version.a2 < 10:Int32), nested_loop: true, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id, version._row_id] }
├─StreamExchange { dist: Broadcast }
│ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version._row_id) }
└─StreamTableScan { table: version, columns: [version.id2, version.a2, version._row_id], stream_scan_type: UpstreamOnly, stream_key: [version._row_id], pk: [_row_id], dist: UpstreamHashShard(version._row_id) }
- name: Cross join for temporal join
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int);
select id1, a1, id2, a2 from stream cross join version FOR SYSTEM_TIME AS OF PROCTIME();
stream_plan: |-
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), version._row_id(hidden)], stream_key: [stream._row_id, version._row_id], pk_columns: [stream._row_id, version._row_id], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(stream._row_id, version._row_id) }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: , nested_loop: true, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id, version._row_id] }
├─StreamExchange { dist: Broadcast }
│ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version._row_id) }
└─StreamTableScan { table: version, columns: [version.id2, version.a2, version._row_id], stream_scan_type: UpstreamOnly, stream_key: [version._row_id], pk: [_row_id], dist: UpstreamHashShard(version._row_id) }
- name: implicit join with temporal tables
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream, version FOR SYSTEM_TIME AS OF PROCTIME() where a1 > a2 and a2 < 10;
stream_plan: |-
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id2], pk_columns: [stream._row_id, id2], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(version.id2, stream._row_id) }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: AND (stream.a1 > version.a2) AND (version.a2 < 10:Int32), nested_loop: true, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] }
├─StreamExchange { dist: Broadcast }
│ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) }
└─StreamTableScan { table: version, columns: [version.id2, version.a2], stream_scan_type: UpstreamOnly, stream_key: [version.id2], pk: [id2], dist: UpstreamHashShard(version.id2) }
- name: Temporal join with Aggregation
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select count(*) from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2 where a2 < 10;
stream_plan: |-
StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [sum0(count)] }
└─StreamSimpleAgg [append_only] { aggs: [sum0(count), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [count] }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: AND (stream.a1 > version.a2) AND (version.a2 < 10:Int32), nested_loop: true, output: [stream._row_id, version.id2] }
├─StreamExchange { dist: Broadcast }
│ └─StreamTableScan { table: stream, columns: [stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) }
└─StreamTableScan { table: version, columns: [version.a2, version.id2], stream_scan_type: UpstreamOnly, stream_key: [version.id2], pk: [id2], dist: UpstreamHashShard(version.id2) }
- name: Temporal join type test
sql: |
create table stream(id1 int, a1 int, b1 int);
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream right join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2;
stream_error: |-
Not supported: streaming nested-loop join
HINT: The non-equal join in the query requires a nested-loop join executor, which could be very expensive to run. Consider rewriting the query to use dynamic filter as a substitute if possible.
See also: https://docs.risingwave.com/docs/current/sql-pattern-dynamic-filters/
- name: Temporal join append only test
sql: |
create table stream(id1 int, a1 int, b1 int);
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2;
stream_error: |-
Not supported: Nested-loop Temporal join requires the left hash side to be append only
HINT: Please ensure the left hash side is append only
- name: multi-way temporal join
sql: |
create table stream(k int, a1 int, b1 int) APPEND ONLY;
create table version1(k int, x1 int, y2 int, primary key (k));
create table version2(k int, x2 int, y2 int, primary key (k));
select stream.k, x1, x2, a1, b1
from stream
join version1 FOR SYSTEM_TIME AS OF PROCTIME() on stream.a1 < version1.x1
join version2 FOR SYSTEM_TIME AS OF PROCTIME() on stream.b1 > version2.y2 where a1 < 10;
stream_plan: |-
StreamMaterialize { columns: [k, x1, x2, a1, b1, stream._row_id(hidden), version1.k(hidden), version2.k(hidden)], stream_key: [stream._row_id, version1.k, version2.k], pk_columns: [stream._row_id, version1.k, version2.k], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(stream._row_id, version1.k, version2.k) }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: AND (stream.b1 > version2.y2), nested_loop: true, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version1.k, version2.k] }
├─StreamExchange { dist: Broadcast }
│ └─StreamTemporalJoin { type: Inner, append_only: true, predicate: AND (stream.a1 < version1.x1), nested_loop: true, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] }
│ ├─StreamExchange { dist: Broadcast }
│ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) }
│ │ └─StreamTableScan { table: stream, columns: [stream.k, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) }
│ └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version1.k) }
│ └─StreamTableScan { table: version1, columns: [version1.x1, version1.k], stream_scan_type: UpstreamOnly, stream_key: [version1.k], pk: [k], dist: UpstreamHashShard(version1.k) }
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version2.k) }
└─StreamTableScan { table: version2, columns: [version2.x2, version2.y2, version2.k], stream_scan_type: UpstreamOnly, stream_key: [version2.k], pk: [k], dist: UpstreamHashShard(version2.k) }
- name: use CTE as temporal join right table. https://github.com/risingwavelabs/risingwave/issues/18703
sql: |
create table stream(id1 int, a1 int, b1 int, c1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, c2 int, primary key (id2));
with version as (select * from version) select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 > id2;
binder_error: 'Bind error: Right table of a temporal join should not be a CTE. It should be a table, index, or materialized view'
6 changes: 3 additions & 3 deletions src/frontend/planner_test/tests/testdata/output/nexmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@
ON mod(B.auction, 10000) = S.key
sink_plan: |-
StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10018(hidden), side_input.key(hidden)] }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, nested_loop: false, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
├─StreamExchange { dist: HashShard($expr1) }
│ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] }
│ └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) }
Expand All @@ -1007,7 +1007,7 @@
stream_plan: |-
StreamMaterialize { columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr1(hidden), side_input.key(hidden)], stream_key: [bid._row_id, $expr1], pk_columns: [bid._row_id, $expr1], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(bid._row_id, $expr1) }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, nested_loop: false, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
├─StreamExchange { dist: HashShard($expr1) }
│ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] }
│ └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) }
Expand All @@ -1020,7 +1020,7 @@
└── StreamExchange Hash([5, 6]) from 1
Fragment 1
StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, nested_loop: false, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
├── StreamExchange Hash([4]) from 2
└── StreamExchange NoShuffle from 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@
StreamMaterialize { columns: [id1, a1, id2, v1, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck, watermark_columns: [v1] }
└─StreamProject { exprs: [stream.id1, stream.a1, version.id2, stream.v1, stream._row_id], output_watermarks: [stream.v1] }
└─StreamDynamicFilter { predicate: (stream.v1 > now), output_watermarks: [stream.v1], output: [stream.id1, stream.a1, stream.v1, version.id2, stream._row_id], cleaned_by_watermark: true }
├─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, stream.v1, version.id2, stream._row_id] }
├─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.id1 = version.id2, nested_loop: false, output: [stream.id1, stream.a1, stream.v1, version.id2, stream._row_id] }
│ ├─StreamExchange { dist: HashShard(stream.id1) }
│ │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.v1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) }
│ └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) }
Expand Down
Loading

0 comments on commit d9ee9b4

Please sign in to comment.