Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): plan nested loop temporal join #19201

Merged
merged 6 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions e2e_test/streaming/temporal_join/append_only/nested_loop.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;

statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

statement ok
create materialized view v1 as select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2;

statement ok
create materialized view v2 as select id1, a1, id2, a2 from stream cross join version FOR SYSTEM_TIME AS OF PROCTIME();

statement ok
insert into stream values(1, 14, 111);

statement ok
insert into version values(1, 12, 122);

statement ok
insert into stream values(2, 13, 133);

statement ok
delete from version;

query IIII rowsort
select * from v1;
----
2 13 1 12

query IIII rowsort
select * from v2;
----
2 13 1 12

statement ok
insert into version values(2, 10, 102);

statement ok
insert into stream values(3, 9, 222);

query IIII rowsort
select * from v1;
----
2 13 1 12

query IIII rowsort
select * from v2;
----
2 13 1 12
3 9 2 10

statement ok
drop materialized view v1;

statement ok
drop materialized view v2;

statement ok
delete from version where id2 = 2;

statement ok
insert into version values(4, 10, 104);

statement ok
create materialized view v1 as select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2

statement ok
create materialized view v2 as select id1, a1, id2, a2 from stream cross join version FOR SYSTEM_TIME AS OF PROCTIME();

query IIII rowsort
select * from v1;
----
1 14 4 10
2 13 4 10

query IIII rowsort
select * from v2;
----
1 14 4 10
2 13 4 10
3 9 4 10

statement ok
drop materialized view v1;

statement ok
drop materialized view v2;

statement ok
drop table stream;

statement ok
drop table version;
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,8 @@ message TemporalJoinNode {
repeated uint32 table_output_indices = 8;
// The state table used for non-append-only temporal join.
optional catalog.Table memo_table = 9;
// If it is a nested lool temporal join
bool is_nested_loop = 10;
}

message DynamicFilterNode {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
- name: Left join type for temporal join
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int);
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2 and b1 > b2
expected_outputs:
- batch_error
- stream_error
- name: Inner join type for temporal join
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int);
select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2 where a2 < 10;
expected_outputs:
- stream_plan
- name: Cross join for temporal join
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int);
select id1, a1, id2, a2 from stream cross join version FOR SYSTEM_TIME AS OF PROCTIME();
expected_outputs:
- stream_plan
- name: implicit join with temporal tables
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream, version FOR SYSTEM_TIME AS OF PROCTIME() where a1 > a2 and a2 < 10;
expected_outputs:
- stream_plan
- name: Temporal join with Aggregation
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select count(*) from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2 where a2 < 10;
expected_outputs:
- stream_plan
- name: Temporal join type test
sql: |
create table stream(id1 int, a1 int, b1 int);
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream right join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2;
expected_outputs:
- stream_error
- name: Temporal join append only test
sql: |
create table stream(id1 int, a1 int, b1 int);
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2;
expected_outputs:
- stream_error
- name: multi-way temporal join
sql: |
create table stream(k int, a1 int, b1 int) APPEND ONLY;
create table version1(k int, x1 int, y2 int, primary key (k));
create table version2(k int, x2 int, y2 int, primary key (k));
select stream.k, x1, x2, a1, b1
from stream
join version1 FOR SYSTEM_TIME AS OF PROCTIME() on stream.a1 < version1.x1
join version2 FOR SYSTEM_TIME AS OF PROCTIME() on stream.b1 > version2.y2 where a1 < 10;
expected_outputs:
- stream_plan
- name: use CTE as temporal join right table. https://github.com/risingwavelabs/risingwave/issues/18703
sql: |
create table stream(id1 int, a1 int, b1 int, c1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, c2 int, primary key (id2));
with version as (select * from version) select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 > id2;
expected_outputs:
- binder_error
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- name: Left join type for temporal join
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int);
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2 and b1 > b2
batch_error: |-
Not supported: do not support temporal join for batch queries
HINT: please use temporal join in streaming queries
stream_error: |-
Not supported: Temporal join requires an inner join
HINT: Please use an inner join
- name: Inner join type for temporal join
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int);
select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2 where a2 < 10;
stream_plan: |-
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), version._row_id(hidden)], stream_key: [stream._row_id, version._row_id], pk_columns: [stream._row_id, version._row_id], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(stream._row_id, version._row_id) }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: AND (stream.a1 > version.a2) AND (version.a2 < 10:Int32), nested_loop: true, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id, version._row_id] }
├─StreamExchange { dist: Broadcast }
│ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version._row_id) }
└─StreamTableScan { table: version, columns: [version.id2, version.a2, version._row_id], stream_scan_type: UpstreamOnly, stream_key: [version._row_id], pk: [_row_id], dist: UpstreamHashShard(version._row_id) }
- name: Cross join for temporal join
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int);
select id1, a1, id2, a2 from stream cross join version FOR SYSTEM_TIME AS OF PROCTIME();
stream_plan: |-
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), version._row_id(hidden)], stream_key: [stream._row_id, version._row_id], pk_columns: [stream._row_id, version._row_id], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(stream._row_id, version._row_id) }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: , nested_loop: true, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id, version._row_id] }
├─StreamExchange { dist: Broadcast }
│ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version._row_id) }
└─StreamTableScan { table: version, columns: [version.id2, version.a2, version._row_id], stream_scan_type: UpstreamOnly, stream_key: [version._row_id], pk: [_row_id], dist: UpstreamHashShard(version._row_id) }
- name: implicit join with temporal tables
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream, version FOR SYSTEM_TIME AS OF PROCTIME() where a1 > a2 and a2 < 10;
stream_plan: |-
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id2], pk_columns: [stream._row_id, id2], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(version.id2, stream._row_id) }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: AND (stream.a1 > version.a2) AND (version.a2 < 10:Int32), nested_loop: true, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] }
├─StreamExchange { dist: Broadcast }
│ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) }
└─StreamTableScan { table: version, columns: [version.id2, version.a2], stream_scan_type: UpstreamOnly, stream_key: [version.id2], pk: [id2], dist: UpstreamHashShard(version.id2) }
- name: Temporal join with Aggregation
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select count(*) from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2 where a2 < 10;
stream_plan: |-
StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [sum0(count)] }
└─StreamSimpleAgg [append_only] { aggs: [sum0(count), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [count] }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: AND (stream.a1 > version.a2) AND (version.a2 < 10:Int32), nested_loop: true, output: [stream._row_id, version.id2] }
├─StreamExchange { dist: Broadcast }
│ └─StreamTableScan { table: stream, columns: [stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) }
└─StreamTableScan { table: version, columns: [version.a2, version.id2], stream_scan_type: UpstreamOnly, stream_key: [version.id2], pk: [id2], dist: UpstreamHashShard(version.id2) }
- name: Temporal join type test
sql: |
create table stream(id1 int, a1 int, b1 int);
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream right join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2;
stream_error: |-
Not supported: streaming nested-loop join
HINT: The non-equal join in the query requires a nested-loop join executor, which could be very expensive to run. Consider rewriting the query to use dynamic filter as a substitute if possible.
See also: https://docs.risingwave.com/docs/current/sql-pattern-dynamic-filters/
- name: Temporal join append only test
sql: |
create table stream(id1 int, a1 int, b1 int);
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2;
stream_error: |-
Not supported: Temporal join requires the left hash side to be append only
HINT: Please ensure the left hash side is append only
- name: multi-way temporal join
sql: |
create table stream(k int, a1 int, b1 int) APPEND ONLY;
create table version1(k int, x1 int, y2 int, primary key (k));
create table version2(k int, x2 int, y2 int, primary key (k));
select stream.k, x1, x2, a1, b1
from stream
join version1 FOR SYSTEM_TIME AS OF PROCTIME() on stream.a1 < version1.x1
join version2 FOR SYSTEM_TIME AS OF PROCTIME() on stream.b1 > version2.y2 where a1 < 10;
stream_plan: |-
StreamMaterialize { columns: [k, x1, x2, a1, b1, stream._row_id(hidden), version1.k(hidden), version2.k(hidden)], stream_key: [stream._row_id, version1.k, version2.k], pk_columns: [stream._row_id, version1.k, version2.k], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(stream._row_id, version1.k, version2.k) }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: AND (stream.b1 > version2.y2), nested_loop: true, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version1.k, version2.k] }
├─StreamExchange { dist: Broadcast }
│ └─StreamTemporalJoin { type: Inner, append_only: true, predicate: AND (stream.a1 < version1.x1), nested_loop: true, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] }
│ ├─StreamExchange { dist: Broadcast }
│ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) }
│ │ └─StreamTableScan { table: stream, columns: [stream.k, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) }
│ └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version1.k) }
│ └─StreamTableScan { table: version1, columns: [version1.x1, version1.k], stream_scan_type: UpstreamOnly, stream_key: [version1.k], pk: [k], dist: UpstreamHashShard(version1.k) }
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version2.k) }
└─StreamTableScan { table: version2, columns: [version2.x2, version2.y2, version2.k], stream_scan_type: UpstreamOnly, stream_key: [version2.k], pk: [k], dist: UpstreamHashShard(version2.k) }
- name: use CTE as temporal join right table. https://github.com/risingwavelabs/risingwave/issues/18703
sql: |
create table stream(id1 int, a1 int, b1 int, c1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, c2 int, primary key (id2));
with version as (select * from version) select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 > id2;
binder_error: 'Bind error: Right table of a temporal join should not be a CTE. It should be a table, index, or materialized view'
6 changes: 3 additions & 3 deletions src/frontend/planner_test/tests/testdata/output/nexmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@
ON mod(B.auction, 10000) = S.key
sink_plan: |-
StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10018(hidden), side_input.key(hidden)] }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, nested_loop: false, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
├─StreamExchange { dist: HashShard($expr1) }
│ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] }
│ └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) }
Expand All @@ -1007,7 +1007,7 @@
stream_plan: |-
StreamMaterialize { columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr1(hidden), side_input.key(hidden)], stream_key: [bid._row_id, $expr1], pk_columns: [bid._row_id, $expr1], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(bid._row_id, $expr1) }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, nested_loop: false, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
├─StreamExchange { dist: HashShard($expr1) }
│ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] }
│ └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) }
Expand All @@ -1020,7 +1020,7 @@
└── StreamExchange Hash([5, 6]) from 1

Fragment 1
StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, nested_loop: false, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
├── StreamExchange Hash([4]) from 2
└── StreamExchange NoShuffle from 3

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@
StreamMaterialize { columns: [id1, a1, id2, v1, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck, watermark_columns: [v1] }
└─StreamProject { exprs: [stream.id1, stream.a1, version.id2, stream.v1, stream._row_id], output_watermarks: [stream.v1] }
└─StreamDynamicFilter { predicate: (stream.v1 > now), output_watermarks: [stream.v1], output: [stream.id1, stream.a1, stream.v1, version.id2, stream._row_id], cleaned_by_watermark: true }
├─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, stream.v1, version.id2, stream._row_id] }
├─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.id1 = version.id2, nested_loop: false, output: [stream.id1, stream.a1, stream.v1, version.id2, stream._row_id] }
│ ├─StreamExchange { dist: HashShard(stream.id1) }
│ │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.v1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) }
│ └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) }
Expand Down
Loading
Loading