diff --git a/e2e_test/streaming/temporal_join/append_only/nested_loop.slt b/e2e_test/streaming/temporal_join/append_only/nested_loop.slt new file mode 100644 index 000000000000..6c9e28799f30 --- /dev/null +++ b/e2e_test/streaming/temporal_join/append_only/nested_loop.slt @@ -0,0 +1,96 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + +statement ok +create table version(id2 int, a2 int, b2 int, primary key (id2)); + +statement ok +create materialized view v1 as select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2; + +statement ok +create materialized view v2 as select id1, a1, id2, a2 from stream cross join version FOR SYSTEM_TIME AS OF PROCTIME(); + +statement ok +insert into stream values(1, 14, 111); + +statement ok +insert into version values(1, 12, 122); + +statement ok +insert into stream values(2, 13, 133); + +statement ok +delete from version; + +query IIII rowsort +select * from v1; +---- +2 13 1 12 + +query IIII rowsort +select * from v2; +---- +2 13 1 12 + +statement ok +insert into version values(2, 10, 102); + +statement ok +insert into stream values(3, 9, 222); + +query IIII rowsort +select * from v1; +---- +2 13 1 12 + +query IIII rowsort +select * from v2; +---- +2 13 1 12 +3 9 2 10 + +statement ok +drop materialized view v1; + +statement ok +drop materialized view v2; + +statement ok +delete from version where id2 = 2; + +statement ok +insert into version values(4, 10, 104); + +statement ok +create materialized view v1 as select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2 + +statement ok +create materialized view v2 as select id1, a1, id2, a2 from stream cross join version FOR SYSTEM_TIME AS OF PROCTIME(); + +query IIII rowsort +select * from v1; +---- +1 14 4 10 +2 13 4 10 + +query IIII rowsort +select * from v2; +---- +1 14 4 10 +2 13 4 10 +3 9 4 10 + +statement ok +drop materialized view v1; 
+ +statement ok +drop materialized view v2; + +statement ok +drop table stream; + +statement ok +drop table version; diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index fe3a2c93d11a..ad0601189ec9 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -491,6 +491,8 @@ message TemporalJoinNode { repeated uint32 table_output_indices = 8; // The state table used for non-append-only temporal join. optional catalog.Table memo_table = 9; + // If it is a nested loop temporal join + bool is_nested_loop = 10; } message DynamicFilterNode { diff --git a/src/frontend/planner_test/tests/testdata/input/nested_loop_temporal_join.yaml b/src/frontend/planner_test/tests/testdata/input/nested_loop_temporal_join.yaml new file mode 100644 index 000000000000..404726980dfa --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/input/nested_loop_temporal_join.yaml @@ -0,0 +1,68 @@ +- name: Left join type for temporal join + sql: | + create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table version(id2 int, a2 int, b2 int); + select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2 and b1 > b2 + expected_outputs: + - batch_error + - stream_error +- name: Inner join type for temporal join + sql: | + create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table version(id2 int, a2 int, b2 int); + select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2 where a2 < 10; + expected_outputs: + - stream_plan +- name: Cross join for temporal join + sql: | + create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table version(id2 int, a2 int, b2 int); + select id1, a1, id2, a2 from stream cross join version FOR SYSTEM_TIME AS OF PROCTIME(); + expected_outputs: + - stream_plan +- name: implicit join with temporal tables + sql: | + create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table version(id2 int, a2 int, b2 int, primary key 
(id2)); + select id1, a1, id2, a2 from stream, version FOR SYSTEM_TIME AS OF PROCTIME() where a1 > a2 and a2 < 10; + expected_outputs: + - stream_plan +- name: Temporal join with Aggregation + sql: | + create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table version(id2 int, a2 int, b2 int, primary key (id2)); + select count(*) from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2 where a2 < 10; + expected_outputs: + - stream_plan +- name: Temporal join type test + sql: | + create table stream(id1 int, a1 int, b1 int); + create table version(id2 int, a2 int, b2 int, primary key (id2)); + select id1, a1, id2, a2 from stream right join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2; + expected_outputs: + - stream_error +- name: Temporal join append only test + sql: | + create table stream(id1 int, a1 int, b1 int); + create table version(id2 int, a2 int, b2 int, primary key (id2)); + select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2; + expected_outputs: + - stream_error +- name: multi-way temporal join + sql: | + create table stream(k int, a1 int, b1 int) APPEND ONLY; + create table version1(k int, x1 int, y2 int, primary key (k)); + create table version2(k int, x2 int, y2 int, primary key (k)); + select stream.k, x1, x2, a1, b1 + from stream + join version1 FOR SYSTEM_TIME AS OF PROCTIME() on stream.a1 < version1.x1 + join version2 FOR SYSTEM_TIME AS OF PROCTIME() on stream.b1 > version2.y2 where a1 < 10; + expected_outputs: + - stream_plan +- name: use CTE as temporal join right table. 
https://github.com/risingwavelabs/risingwave/issues/18703 + sql: | + create table stream(id1 int, a1 int, b1 int, c1 int) APPEND ONLY; + create table version(id2 int, a2 int, b2 int, c2 int, primary key (id2)); + with version as (select * from version) select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 > id2; + expected_outputs: + - binder_error \ No newline at end of file diff --git a/src/frontend/planner_test/tests/testdata/output/nested_loop_temporal_join.yaml b/src/frontend/planner_test/tests/testdata/output/nested_loop_temporal_join.yaml new file mode 100644 index 000000000000..258c33f6f504 --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/output/nested_loop_temporal_join.yaml @@ -0,0 +1,112 @@ +# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. +- name: Left join type for temporal join + sql: | + create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table version(id2 int, a2 int, b2 int); + select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2 and b1 > b2 + batch_error: |- + Not supported: do not support temporal join for batch queries + HINT: please use temporal join in streaming queries + stream_error: |- + Not supported: Temporal join requires an inner join + HINT: Please use an inner join +- name: Inner join type for temporal join + sql: | + create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table version(id2 int, a2 int, b2 int); + select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2 where a2 < 10; + stream_plan: |- + StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), version._row_id(hidden)], stream_key: [stream._row_id, version._row_id], pk_columns: [stream._row_id, version._row_id], pk_conflict: NoCheck } + └─StreamExchange { dist: HashShard(stream._row_id, version._row_id) } + └─StreamTemporalJoin { type: Inner, 
append_only: true, predicate: AND (stream.a1 > version.a2) AND (version.a2 < 10:Int32), nested_loop: true, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id, version._row_id] } + ├─StreamExchange { dist: Broadcast } + │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } + └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version._row_id) } + └─StreamTableScan { table: version, columns: [version.id2, version.a2, version._row_id], stream_scan_type: UpstreamOnly, stream_key: [version._row_id], pk: [_row_id], dist: UpstreamHashShard(version._row_id) } +- name: Cross join for temporal join + sql: | + create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table version(id2 int, a2 int, b2 int); + select id1, a1, id2, a2 from stream cross join version FOR SYSTEM_TIME AS OF PROCTIME(); + stream_plan: |- + StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), version._row_id(hidden)], stream_key: [stream._row_id, version._row_id], pk_columns: [stream._row_id, version._row_id], pk_conflict: NoCheck } + └─StreamExchange { dist: HashShard(stream._row_id, version._row_id) } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: , nested_loop: true, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id, version._row_id] } + ├─StreamExchange { dist: Broadcast } + │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } + └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version._row_id) } + └─StreamTableScan { table: version, columns: [version.id2, version.a2, version._row_id], stream_scan_type: UpstreamOnly, stream_key: [version._row_id], pk: [_row_id], dist: 
UpstreamHashShard(version._row_id) } +- name: implicit join with temporal tables + sql: | + create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table version(id2 int, a2 int, b2 int, primary key (id2)); + select id1, a1, id2, a2 from stream, version FOR SYSTEM_TIME AS OF PROCTIME() where a1 > a2 and a2 < 10; + stream_plan: |- + StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id2], pk_columns: [stream._row_id, id2], pk_conflict: NoCheck } + └─StreamExchange { dist: HashShard(version.id2, stream._row_id) } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: AND (stream.a1 > version.a2) AND (version.a2 < 10:Int32), nested_loop: true, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + ├─StreamExchange { dist: Broadcast } + │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } + └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } + └─StreamTableScan { table: version, columns: [version.id2, version.a2], stream_scan_type: UpstreamOnly, stream_key: [version.id2], pk: [id2], dist: UpstreamHashShard(version.id2) } +- name: Temporal join with Aggregation + sql: | + create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table version(id2 int, a2 int, b2 int, primary key (id2)); + select count(*) from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2 where a2 < 10; + stream_plan: |- + StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } + └─StreamProject { exprs: [sum0(count)] } + └─StreamSimpleAgg [append_only] { aggs: [sum0(count), count] } + └─StreamExchange { dist: Single } + └─StreamStatelessSimpleAgg { aggs: [count] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: AND (stream.a1 > version.a2) AND 
(version.a2 < 10:Int32), nested_loop: true, output: [stream._row_id, version.id2] } + ├─StreamExchange { dist: Broadcast } + │ └─StreamTableScan { table: stream, columns: [stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } + └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } + └─StreamTableScan { table: version, columns: [version.a2, version.id2], stream_scan_type: UpstreamOnly, stream_key: [version.id2], pk: [id2], dist: UpstreamHashShard(version.id2) } +- name: Temporal join type test + sql: | + create table stream(id1 int, a1 int, b1 int); + create table version(id2 int, a2 int, b2 int, primary key (id2)); + select id1, a1, id2, a2 from stream right join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2; + stream_error: |- + Not supported: streaming nested-loop join + HINT: The non-equal join in the query requires a nested-loop join executor, which could be very expensive to run. Consider rewriting the query to use dynamic filter as a substitute if possible. 
+ See also: https://docs.risingwave.com/docs/current/sql-pattern-dynamic-filters/ +- name: Temporal join append only test + sql: | + create table stream(id1 int, a1 int, b1 int); + create table version(id2 int, a2 int, b2 int, primary key (id2)); + select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2; + stream_error: |- + Not supported: Nested-loop Temporal join requires the left hash side to be append only + HINT: Please ensure the left hash side is append only +- name: multi-way temporal join + sql: | + create table stream(k int, a1 int, b1 int) APPEND ONLY; + create table version1(k int, x1 int, y2 int, primary key (k)); + create table version2(k int, x2 int, y2 int, primary key (k)); + select stream.k, x1, x2, a1, b1 + from stream + join version1 FOR SYSTEM_TIME AS OF PROCTIME() on stream.a1 < version1.x1 + join version2 FOR SYSTEM_TIME AS OF PROCTIME() on stream.b1 > version2.y2 where a1 < 10; + stream_plan: |- + StreamMaterialize { columns: [k, x1, x2, a1, b1, stream._row_id(hidden), version1.k(hidden), version2.k(hidden)], stream_key: [stream._row_id, version1.k, version2.k], pk_columns: [stream._row_id, version1.k, version2.k], pk_conflict: NoCheck } + └─StreamExchange { dist: HashShard(stream._row_id, version1.k, version2.k) } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: AND (stream.b1 > version2.y2), nested_loop: true, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version1.k, version2.k] } + ├─StreamExchange { dist: Broadcast } + │ └─StreamTemporalJoin { type: Inner, append_only: true, predicate: AND (stream.a1 < version1.x1), nested_loop: true, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] } + │ ├─StreamExchange { dist: Broadcast } + │ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) } + │ │ └─StreamTableScan { table: stream, columns: [stream.k, stream.a1, stream.b1, stream._row_id], stream_scan_type: 
ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } + │ └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version1.k) } + │ └─StreamTableScan { table: version1, columns: [version1.x1, version1.k], stream_scan_type: UpstreamOnly, stream_key: [version1.k], pk: [k], dist: UpstreamHashShard(version1.k) } + └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version2.k) } + └─StreamTableScan { table: version2, columns: [version2.x2, version2.y2, version2.k], stream_scan_type: UpstreamOnly, stream_key: [version2.k], pk: [k], dist: UpstreamHashShard(version2.k) } +- name: use CTE as temporal join right table. https://github.com/risingwavelabs/risingwave/issues/18703 + sql: | + create table stream(id1 int, a1 int, b1 int, c1 int) APPEND ONLY; + create table version(id2 int, a2 int, b2 int, c2 int, primary key (id2)); + with version as (select * from version) select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 > id2; + binder_error: 'Bind error: Right table of a temporal join should not be a CTE. 
It should be a table, index, or materialized view' diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml index d6b90da0a8c1..1f667a63d2fd 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml @@ -998,7 +998,7 @@ ON mod(B.auction, 10000) = S.key sink_plan: |- StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10018(hidden), side_input.key(hidden)] } - └─StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, nested_loop: false, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } ├─StreamExchange { dist: HashShard($expr1) } │ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] } │ └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) } @@ -1007,7 +1007,7 @@ stream_plan: |- StreamMaterialize { columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr1(hidden), side_input.key(hidden)], stream_key: [bid._row_id, $expr1], pk_columns: [bid._row_id, $expr1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(bid._row_id, $expr1) } - └─StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } + └─StreamTemporalJoin { type: Inner, 
append_only: true, predicate: $expr1 = side_input.key, nested_loop: false, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } ├─StreamExchange { dist: HashShard($expr1) } │ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] } │ └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) } @@ -1020,7 +1020,7 @@ └── StreamExchange Hash([5, 6]) from 1 Fragment 1 - StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } + StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, nested_loop: false, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } ├── StreamExchange Hash([4]) from 2 └── StreamExchange NoShuffle from 3 diff --git a/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml b/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml index 514a56f7dff6..4ad48902ffb0 100644 --- a/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml +++ b/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml @@ -331,7 +331,7 @@ StreamMaterialize { columns: [id1, a1, id2, v1, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck, watermark_columns: [v1] } └─StreamProject { exprs: [stream.id1, stream.a1, version.id2, stream.v1, stream._row_id], output_watermarks: [stream.v1] } └─StreamDynamicFilter { predicate: (stream.v1 > now), output_watermarks: [stream.v1], output: [stream.id1, stream.a1, stream.v1, version.id2, 
stream._row_id], cleaned_by_watermark: true } - ├─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, stream.v1, version.id2, stream._row_id] } + ├─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.id1 = version.id2, nested_loop: false, output: [stream.id1, stream.a1, stream.v1, version.id2, stream._row_id] } │ ├─StreamExchange { dist: HashShard(stream.id1) } │ │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.v1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } │ └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } diff --git a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml index 40fda8fcb85a..41aad6991429 100644 --- a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml +++ b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml @@ -7,7 +7,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream._row_id) } - └─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + └─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.id1 = version.id2, nested_loop: false, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } ├─StreamExchange { dist: HashShard(stream.id1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } 
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } @@ -23,7 +23,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream._row_id) } - └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), nested_loop: false, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } ├─StreamExchange { dist: HashShard(stream.id1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } @@ -36,7 +36,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream._row_id) } - └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), nested_loop: false, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } ├─StreamExchange { dist: HashShard(stream.id1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: 
UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } @@ -49,7 +49,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id1, a1], pk_columns: [stream._row_id, id1, a1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream.a1, stream._row_id) } - └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND stream.a1 = version.a2 AND (version.b2 <> version.a2), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND stream.a1 = version.a2 AND (version.b2 <> version.a2), nested_loop: false, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } ├─StreamExchange { dist: HashShard(stream.id1, stream.a1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2, version.a2) } @@ -65,7 +65,7 @@ └─StreamSimpleAgg [append_only] { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } - └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream._row_id, stream.id1, version.id2] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), nested_loop: false, output: [stream._row_id, stream.id1, version.id2] } ├─StreamExchange { dist: HashShard(stream.id1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } 
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } @@ -86,7 +86,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream._row_id) } - └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), nested_loop: false, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } ├─StreamExchange { dist: HashShard(stream.id1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } @@ -111,9 +111,9 @@ stream_plan: |- StreamMaterialize { columns: [k, x1, x2, a1, b1, stream._row_id(hidden), version2.k(hidden)], stream_key: [stream._row_id, k], pk_columns: [stream._row_id, k], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.k, stream._row_id) } - └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version2.k] } + └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.k = version2.k, nested_loop: false, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version2.k] } ├─StreamExchange { dist: HashShard(stream.k) } - │ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] } + │ 
└─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.k = version1.k, nested_loop: false, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] } │ ├─StreamExchange { dist: HashShard(stream.k) } │ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) } │ │ └─StreamTableScan { table: stream, columns: [stream.k, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } @@ -133,9 +133,9 @@ stream_plan: |- StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version2.id2(hidden)], stream_key: [stream._row_id, id1, id2], pk_columns: [stream._row_id, id1, id2], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream.id2, stream._row_id) } - └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } + └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id2 = version2.id2, nested_loop: false, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } ├─StreamExchange { dist: HashShard(stream.id2) } - │ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } + │ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id1 = version1.id1, nested_loop: false, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } │ ├─StreamExchange { dist: HashShard(stream.id1) } │ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) } │ │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.id2, stream.a1, stream.b1, stream._row_id], stream_scan_type: 
ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } @@ -155,9 +155,9 @@ stream_plan: |- StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version2.id2(hidden)], stream_key: [stream._row_id, id1, id2], pk_columns: [stream._row_id, id1, id2], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream.id2, stream._row_id) } - └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } + └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id2 = version2.id2, nested_loop: false, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } ├─StreamExchange { dist: HashShard(stream.id2) } - │ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } + │ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id1 = version1.id1, nested_loop: false, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } │ ├─StreamExchange { dist: HashShard(stream.id1) } │ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) } │ │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.id2, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } @@ -174,7 +174,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx2.id2, stream._row_id, 
stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, nested_loop: false, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.a1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2) } @@ -188,7 +188,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx2.id2, stream._row_id, stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, nested_loop: false, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.a1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2) } @@ -202,7 +202,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: 
[stream._row_id, id2, stream.b1, a1], pk_columns: [stream._row_id, id2, stream.b1, a1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx2.id2, stream._row_id, stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.b1 = idx2.b2 AND (stream.a1 = idx2.a2), output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.b1 = idx2.b2 AND (stream.a1 = idx2.a2), nested_loop: false, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.b1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.b2) } @@ -214,7 +214,7 @@ select * from t left join v FOR SYSTEM_TIME AS OF PROCTIME() on a = count; stream_plan: |- StreamMaterialize { columns: [a, count, t._row_id(hidden), $expr1(hidden)], stream_key: [t._row_id, $expr1], pk_columns: [t._row_id, $expr1], pk_conflict: NoCheck } - └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: AND ($expr1 = v.count), output: [t.a, v.count, t._row_id, $expr1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: AND ($expr1 = v.count), nested_loop: false, output: [t.a, v.count, t._row_id, $expr1] } ├─StreamExchange { dist: Single } │ └─StreamProject { exprs: [t.a, t.a::Int64 as $expr1, t._row_id] } │ └─StreamTableScan { table: t, columns: [t.a, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } @@ -229,7 +229,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: 
[stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx.id2, stream._row_id, stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.a1 = idx.a2 AND stream.b1 = idx.b2, output: [stream.id1, stream.a1, idx.id2, idx.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.a1 = idx.a2 AND stream.b1 = idx.b2, nested_loop: false, output: [stream.id1, stream.a1, idx.id2, idx.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.a1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx.a2) } @@ -244,7 +244,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx2.id2, stream._row_id, stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, nested_loop: false, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.a1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2) } @@ -260,7 +260,7 
@@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden), stream.c1(hidden)], stream_key: [stream._row_id, id1, a1, stream.b1, stream.c1], pk_columns: [stream._row_id, id1, a1, stream.b1, stream.c1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream.a1, stream._row_id, stream.b1, stream.c1) } - └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.id1 = version.id2 AND (stream.a1 = version.a2) AND (stream.b1 = version.b2) AND (stream.c1 = version.c2), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id, stream.b1, stream.c1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.id1 = version.id2 AND (stream.a1 = version.a2) AND (stream.b1 = version.b2) AND (stream.c1 = version.c2), nested_loop: false, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id, stream.b1, stream.c1] } ├─StreamExchange { dist: HashShard(stream.id1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream.c1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index d7e8eb79dd70..98469f4d0542 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -997,16 +997,7 @@ impl LogicalJoin { result_plan } - fn to_stream_temporal_join( - &self, - predicate: EqJoinPredicate, - ctx: &mut ToStreamContext, - ) -> Result { - use super::stream::prelude::*; - - assert!(predicate.has_eq()); - - let right = self.right(); + fn check_temporal_rhs(right: &PlanRef) -> Result<&LogicalScan> { let Some(logical_scan) = right.as_logical_scan() else { return 
Err(RwError::from(ErrorCode::NotSupported( "Temporal join requires a table scan as its lookup table".into(), @@ -1020,6 +1011,88 @@ impl LogicalJoin { "Please use FOR SYSTEM_TIME AS OF PROCTIME() syntax".into(), ))); } + Ok(logical_scan) + } + + fn temporal_join_scan_predicate_pull_up( + logical_scan: &LogicalScan, + predicate: EqJoinPredicate, + output_indices: &[usize], + left_schema_len: usize, + ) -> Result<(StreamTableScan, EqJoinPredicate, Condition, Vec)> { + // Extract the predicate from logical scan. Only pure scan is supported. + let (new_scan, scan_predicate, project_expr) = logical_scan.predicate_pull_up(); + // Construct output column to require column mapping + let o2r = if let Some(project_expr) = project_expr { + project_expr + .into_iter() + .map(|x| x.as_input_ref().unwrap().index) + .collect_vec() + } else { + (0..logical_scan.output_col_idx().len()).collect_vec() + }; + let mut join_predicate_rewriter = LookupJoinPredicateRewriter { + offset: left_schema_len, + mapping: o2r.clone(), + }; + + let new_eq_cond = predicate + .eq_cond() + .rewrite_expr(&mut join_predicate_rewriter); + + let mut scan_predicate_rewriter = LookupJoinScanPredicateRewriter { + offset: left_schema_len, + }; + + let new_other_cond = predicate + .other_cond() + .clone() + .rewrite_expr(&mut join_predicate_rewriter) + .and(scan_predicate.rewrite_expr(&mut scan_predicate_rewriter)); + + let new_join_on = new_eq_cond.and(new_other_cond); + + let new_predicate = EqJoinPredicate::create( + left_schema_len, + new_scan.schema().len(), + new_join_on.clone(), + ); + + // Rewrite the join output indices and all output indices referred to the old scan need to + // rewrite. 
+ let new_join_output_indices = output_indices + .iter() + .map(|&x| { + if x < left_schema_len { + x + } else { + o2r[x - left_schema_len] + left_schema_len + } + }) + .collect_vec(); + // Use UpstreamOnly chain type + let new_stream_table_scan = + StreamTableScan::new_with_stream_scan_type(new_scan, StreamScanType::UpstreamOnly); + Ok(( + new_stream_table_scan, + new_predicate, + new_join_on, + new_join_output_indices, + )) + } + + fn to_stream_temporal_join( + &self, + predicate: EqJoinPredicate, + ctx: &mut ToStreamContext, + ) -> Result { + use super::stream::prelude::*; + + assert!(predicate.has_eq()); + + let right = self.right(); + + let logical_scan = Self::check_temporal_rhs(&right)?; let table_desc = logical_scan.table_desc(); let output_column_ids = logical_scan.output_column_ids(); @@ -1085,67 +1158,79 @@ impl LogicalJoin { // Enforce a shuffle for the temporal join LHS to let the scheduler be able to schedule the join fragment together with the RHS with a `no_shuffle` exchange. let left = required_dist.enforce(left, &Order::any()); - // Extract the predicate from logical scan. Only pure scan is supported. 
- let (new_scan, scan_predicate, project_expr) = logical_scan.predicate_pull_up(); - // Construct output column to require column mapping - let o2r = if let Some(project_expr) = project_expr { - project_expr - .into_iter() - .map(|x| x.as_input_ref().unwrap().index) - .collect_vec() - } else { - (0..logical_scan.output_col_idx().len()).collect_vec() - }; - let left_schema_len = self.left().schema().len(); - let mut join_predicate_rewriter = LookupJoinPredicateRewriter { - offset: left_schema_len, - mapping: o2r.clone(), - }; + let (new_stream_table_scan, new_predicate, new_join_on, new_join_output_indices) = + Self::temporal_join_scan_predicate_pull_up( + logical_scan, + predicate, + self.output_indices(), + self.left().schema().len(), + )?; - let new_eq_cond = predicate - .eq_cond() - .rewrite_expr(&mut join_predicate_rewriter); + let right = RequiredDist::no_shuffle(new_stream_table_scan.into()); + if !new_predicate.has_eq() { + return Err(RwError::from(ErrorCode::NotSupported( + "Temporal join requires a non trivial join condition".into(), + "Please remove the false condition of the join".into(), + ))); + } - let mut scan_predicate_rewriter = LookupJoinScanPredicateRewriter { - offset: left_schema_len, - }; + // Construct a new logical join, because we have change its RHS. 
+ let new_logical_join = generic::Join::new( + left, + right, + new_join_on, + self.join_type(), + new_join_output_indices, + ); - let new_other_cond = predicate - .other_cond() - .clone() - .rewrite_expr(&mut join_predicate_rewriter) - .and(scan_predicate.rewrite_expr(&mut scan_predicate_rewriter)); + let new_predicate = new_predicate.retain_prefix_eq_key(lookup_prefix_len); - let new_join_on = new_eq_cond.and(new_other_cond); - let new_predicate = EqJoinPredicate::create( - left_schema_len, - new_scan.schema().len(), - new_join_on.clone(), - ); + Ok(StreamTemporalJoin::new( + new_logical_join, + new_predicate, + false, + )) + } - if !new_predicate.has_eq() { + fn to_stream_nested_loop_temporal_join( + &self, + predicate: EqJoinPredicate, + ctx: &mut ToStreamContext, + ) -> Result { + use super::stream::prelude::*; + assert!(!predicate.has_eq()); + + let left = self.left().to_stream_with_dist_required( + &RequiredDist::PhysicalDist(Distribution::Broadcast), + ctx, + )?; + assert!(left.as_stream_exchange().is_some()); + + if self.join_type() != JoinType::Inner { return Err(RwError::from(ErrorCode::NotSupported( - "Temporal join requires a non trivial join condition".into(), - "Please remove the false condition of the join".into(), + "Temporal join requires an inner join".into(), + "Please use an inner join".into(), ))); } - // Rewrite the join output indices and all output indices referred to the old scan need to - // rewrite. 
- let new_join_output_indices = self - .output_indices() - .iter() - .map(|&x| { - if x < left_schema_len { - x - } else { - o2r[x - left_schema_len] + left_schema_len - } - }) - .collect_vec(); - // Use UpstreamOnly chain type - let new_stream_table_scan = - StreamTableScan::new_with_stream_scan_type(new_scan, StreamScanType::UpstreamOnly); + if !left.append_only() { + return Err(RwError::from(ErrorCode::NotSupported( + "Nested-loop Temporal join requires the left hash side to be append only".into(), + "Please ensure the left hash side is append only".into(), + ))); + } + + let right = self.right(); + let logical_scan = Self::check_temporal_rhs(&right)?; + + let (new_stream_table_scan, new_predicate, new_join_on, new_join_output_indices) = + Self::temporal_join_scan_predicate_pull_up( + logical_scan, + predicate, + self.output_indices(), + self.left().schema().len(), + )?; + let right = RequiredDist::no_shuffle(new_stream_table_scan.into()); // Construct a new logical join, because we have change its RHS. @@ -1157,9 +1242,11 @@ impl LogicalJoin { new_join_output_indices, ); - let new_predicate = new_predicate.retain_prefix_eq_key(lookup_prefix_len); - - Ok(StreamTemporalJoin::new(new_logical_join, new_predicate)) + Ok(StreamTemporalJoin::new( + new_logical_join, + new_predicate, + true, + )) } fn to_stream_dynamic_filter( @@ -1383,6 +1470,9 @@ impl ToStream for LogicalJoin { } else { self.to_stream_hash_join(predicate, ctx) } + } else if self.should_be_temporal_join() { + self.to_stream_nested_loop_temporal_join(predicate, ctx) + .map(|x| x.into()) } else if let Some(dynamic_filter) = self.to_stream_dynamic_filter(self.on().clone(), ctx)? 
{ diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs index 7f50d4b27e62..390a141dcb38 100644 --- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs @@ -42,12 +42,19 @@ pub struct StreamTemporalJoin { core: generic::Join, eq_join_predicate: EqJoinPredicate, append_only: bool, + is_nested_loop: bool, } impl StreamTemporalJoin { - pub fn new(core: generic::Join, eq_join_predicate: EqJoinPredicate) -> Self { + pub fn new( + core: generic::Join, + eq_join_predicate: EqJoinPredicate, + is_nested_loop: bool, + ) -> Self { assert!(core.join_type == JoinType::Inner || core.join_type == JoinType::LeftOuter); let append_only = core.left.append_only(); + assert!(!is_nested_loop || append_only); + let right = core.right.clone(); let exchange: &StreamExchange = right .as_stream_exchange() @@ -59,8 +66,16 @@ impl StreamTemporalJoin { .expect("should be a stream table scan"); assert!(matches!(scan.core().as_of, Some(AsOf::ProcessTime))); - let l2o = core.l2i_col_mapping().composite(&core.i2o_col_mapping()); - let dist = l2o.rewrite_provided_distribution(core.left.distribution()); + let dist = if is_nested_loop { + // Use right side distribution directly if it's nested loop temporal join. + let r2o = core.r2i_col_mapping().composite(&core.i2o_col_mapping()); + r2o.rewrite_provided_distribution(core.right.distribution()) + } else { + // Use left side distribution directly if it's hash temporal join. + // https://github.com/risingwavelabs/risingwave/pull/19201#discussion_r1824031780 + let l2o = core.l2i_col_mapping().composite(&core.i2o_col_mapping()); + l2o.rewrite_provided_distribution(core.left.distribution()) + }; // Use left side watermark directly. 
let watermark_columns = core.i2o_col_mapping().rewrite_bitset( @@ -89,6 +104,7 @@ core, eq_join_predicate, append_only, + is_nested_loop, } } @@ -105,6 +121,10 @@ self.append_only } + pub fn is_nested_loop(&self) -> bool { + self.is_nested_loop + } + /// Return memo-table catalog and its `pk_indices`. /// (`join_key` + `left_pk` + `right_pk`) -> (`right_scan_schema` + `join_key` + `left_pk`) /// @@ -170,6 +190,8 @@ impl Distill for StreamTemporalJoin { }), )); + vec.push(("nested_loop", Pretty::debug(&self.is_nested_loop))); + if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) { vec.push(("output_watermarks", ow)); } @@ -196,7 +218,7 @@ impl PlanTreeNodeBinary for StreamTemporalJoin { let mut core = self.core.clone(); core.left = left; core.right = right; - Self::new(core, self.eq_join_predicate.clone()) + Self::new(core, self.eq_join_predicate.clone(), self.is_nested_loop) } } @@ -244,6 +266,7 @@ impl TryToStreamPb for StreamTemporalJoin { memo_table = memo_table.with_id(state.gen_table_id_wrapped()); Some(memo_table.to_internal_table_prost()) }, + is_nested_loop: self.is_nested_loop, })) } } @@ -256,7 +279,12 @@ impl ExprRewritable for StreamTemporalJoin { fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { let mut core = self.core.clone(); core.rewrite_exprs(r); - Self::new(core, self.eq_join_predicate.rewrite_exprs(r)).into() + Self::new( + core, + self.eq_join_predicate.rewrite_exprs(r), + self.is_nested_loop, + ) + .into() } } diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index a053e7dc5021..0e5e7862e960 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -140,6 +140,7 @@ pub use lookup_union::LookupUnionExecutor; pub use merge::MergeExecutor; pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream}; pub use mview::*; +pub use
nested_loop_temporal_join::NestedLoopTemporalJoinExecutor; pub use no_op::NoOpExecutor; pub use now::*; pub use over_window::*; diff --git a/src/stream/src/executor/nested_loop_temporal_join.rs b/src/stream/src/executor/nested_loop_temporal_join.rs index 842fa5254f8d..55d21b468a77 100644 --- a/src/stream/src/executor/nested_loop_temporal_join.rs +++ b/src/stream/src/executor/nested_loop_temporal_join.rs @@ -98,7 +98,6 @@ async fn phase1_handle_chunk( } impl NestedLoopTemporalJoinExecutor { - #[expect(dead_code)] #[expect(clippy::too_many_arguments)] pub fn new( ctx: ActorContextRef, diff --git a/src/stream/src/from_proto/temporal_join.rs b/src/stream/src/from_proto/temporal_join.rs index 805049a29688..99490355f326 100644 --- a/src/stream/src/from_proto/temporal_join.rs +++ b/src/stream/src/from_proto/temporal_join.rs @@ -24,7 +24,9 @@ use risingwave_storage::table::batch_table::storage_table::StorageTable; use super::*; use crate::common::table::state_table::StateTable; use crate::executor::monitor::StreamingMetrics; -use crate::executor::{ActorContextRef, JoinType, TemporalJoinExecutor}; +use crate::executor::{ + ActorContextRef, JoinType, NestedLoopTemporalJoinExecutor, TemporalJoinExecutor, +}; use crate::task::AtomicU64Ref; pub struct TemporalJoinExecutorBuilder; @@ -38,43 +40,6 @@ impl ExecutorBuilder for TemporalJoinExecutorBuilder { store: impl StateStore, ) -> StreamResult { let table_desc: &StorageTableDesc = node.get_table_desc()?; - let table = { - let column_ids = table_desc - .columns - .iter() - .map(|x| ColumnId::new(x.column_id)) - .collect_vec(); - - StorageTable::new_partial( - store.clone(), - column_ids, - params.vnode_bitmap.clone().map(Into::into), - table_desc, - ) - }; - - let table_stream_key_indices = table_desc - .stream_key - .iter() - .map(|&k| k as usize) - .collect_vec(); - - let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap(); - - let left_join_keys = node - .get_left_key() - .iter() - .map(|key| *key as usize) - 
.collect_vec(); - - let right_join_keys = node - .get_right_key() - .iter() - .map(|key| *key as usize) - .collect_vec(); - - let null_safe = node.get_null_safe().to_vec(); - let condition = match node.get_condition() { Ok(cond_prost) => Some(build_non_strict_from_prost( cond_prost, @@ -94,52 +59,118 @@ impl ExecutorBuilder for TemporalJoinExecutorBuilder { .iter() .map(|&x| x as usize) .collect_vec(); + let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap(); - let join_key_data_types = left_join_keys - .iter() - .map(|idx| source_l.schema().fields[*idx].data_type()) - .collect_vec(); + if node.get_is_nested_loop() { + let right_table = StorageTable::new_partial( + store.clone(), + table_output_indices + .iter() + .map(|&x| ColumnId::new(table_desc.columns[x].column_id)) + .collect_vec(), + params.vnode_bitmap.clone().map(Into::into), + table_desc, + ); - let memo_table = node.get_memo_table(); - let memo_table = match memo_table { - Ok(memo_table) => { - let vnodes = Arc::new( - params - .vnode_bitmap - .expect("vnodes not set for temporal join"), - ); - Some( - StateTable::from_table_catalog(memo_table, store.clone(), Some(vnodes.clone())) - .await, + let dispatcher_args = NestedLoopTemporalJoinExecutorDispatcherArgs { + ctx: params.actor_context, + info: params.info.clone(), + left: source_l, + right: source_r, + right_table, + condition, + output_indices, + chunk_size: params.env.config().developer.chunk_size, + metrics: params.executor_stats, + join_type_proto: node.get_join_type()?, + }; + Ok((params.info, dispatcher_args.dispatch()?).into()) + } else { + let table = { + let column_ids = table_desc + .columns + .iter() + .map(|x| ColumnId::new(x.column_id)) + .collect_vec(); + + StorageTable::new_partial( + store.clone(), + column_ids, + params.vnode_bitmap.clone().map(Into::into), + table_desc, ) - } - Err(_) => None, - }; - let append_only = memo_table.is_none(); - - let dispatcher_args = TemporalJoinExecutorDispatcherArgs { - ctx: 
params.actor_context, - info: params.info.clone(), - left: source_l, - right: source_r, - right_table: table, - left_join_keys, - right_join_keys, - null_safe, - condition, - output_indices, - table_output_indices, - table_stream_key_indices, - watermark_epoch: params.watermark_epoch, - chunk_size: params.env.config().developer.chunk_size, - metrics: params.executor_stats, - join_type_proto: node.get_join_type()?, - join_key_data_types, - memo_table, - append_only, - }; + }; + + let table_stream_key_indices = table_desc + .stream_key + .iter() + .map(|&k| k as usize) + .collect_vec(); + + let left_join_keys = node + .get_left_key() + .iter() + .map(|key| *key as usize) + .collect_vec(); + + let right_join_keys = node + .get_right_key() + .iter() + .map(|key| *key as usize) + .collect_vec(); + + let null_safe = node.get_null_safe().to_vec(); + + let join_key_data_types = left_join_keys + .iter() + .map(|idx| source_l.schema().fields[*idx].data_type()) + .collect_vec(); - Ok((params.info, dispatcher_args.dispatch()?).into()) + let memo_table = node.get_memo_table(); + let memo_table = match memo_table { + Ok(memo_table) => { + let vnodes = Arc::new( + params + .vnode_bitmap + .expect("vnodes not set for temporal join"), + ); + Some( + StateTable::from_table_catalog( + memo_table, + store.clone(), + Some(vnodes.clone()), + ) + .await, + ) + } + Err(_) => None, + }; + let append_only = memo_table.is_none(); + + let dispatcher_args = TemporalJoinExecutorDispatcherArgs { + ctx: params.actor_context, + info: params.info.clone(), + left: source_l, + right: source_r, + right_table: table, + left_join_keys, + right_join_keys, + null_safe, + condition, + output_indices, + table_output_indices, + table_stream_key_indices, + watermark_epoch: params.watermark_epoch, + chunk_size: params.env.config().developer.chunk_size, + metrics: params.executor_stats, + join_type_proto: node.get_join_type()?, + join_key_data_types, + memo_table, + append_only, + }; + + Ok((params.info, 
dispatcher_args.dispatch()?).into()) + } } } @@ -221,3 +252,49 @@ impl HashKeyDispatcher for TemporalJoinExecutorDispatcherArgs &self.join_key_data_types } } + +struct NestedLoopTemporalJoinExecutorDispatcherArgs { + ctx: ActorContextRef, + info: ExecutorInfo, + left: Executor, + right: Executor, + right_table: StorageTable, + condition: Option, + output_indices: Vec, + chunk_size: usize, + metrics: Arc, + join_type_proto: JoinTypeProto, +} + +impl NestedLoopTemporalJoinExecutorDispatcherArgs { + fn dispatch(self) -> StreamResult> { + /// This macro helps to fill the const generic type parameter. + macro_rules! build { + ($join_type:ident) => { + Ok(Box::new(NestedLoopTemporalJoinExecutor::< + S, + { JoinType::$join_type }, + >::new( + self.ctx, + self.info, + self.left, + self.right, + self.right_table, + self.condition, + self.output_indices, + self.metrics, + self.chunk_size, + ))) + }; + } + match self.join_type_proto { + JoinTypeProto::Inner => { + build!(Inner) + } + JoinTypeProto::LeftOuter => { + build!(LeftOuter) + } + _ => unreachable!(), + } + } +}