diff --git a/e2e_test/streaming/temporal_join/issue_15257.slt b/e2e_test/streaming/temporal_join/append_only/issue_15257.slt similarity index 100% rename from e2e_test/streaming/temporal_join/issue_15257.slt rename to e2e_test/streaming/temporal_join/append_only/issue_15257.slt diff --git a/e2e_test/streaming/temporal_join/temporal_join.slt b/e2e_test/streaming/temporal_join/append_only/temporal_join.slt similarity index 100% rename from e2e_test/streaming/temporal_join/temporal_join.slt rename to e2e_test/streaming/temporal_join/append_only/temporal_join.slt diff --git a/e2e_test/streaming/temporal_join/temporal_join_multiple_rows.slt b/e2e_test/streaming/temporal_join/append_only/temporal_join_multiple_rows.slt similarity index 100% rename from e2e_test/streaming/temporal_join/temporal_join_multiple_rows.slt rename to e2e_test/streaming/temporal_join/append_only/temporal_join_multiple_rows.slt diff --git a/e2e_test/streaming/temporal_join/temporal_join_non_loopup_cond.slt b/e2e_test/streaming/temporal_join/append_only/temporal_join_non_loopup_cond.slt similarity index 100% rename from e2e_test/streaming/temporal_join/temporal_join_non_loopup_cond.slt rename to e2e_test/streaming/temporal_join/append_only/temporal_join_non_loopup_cond.slt diff --git a/e2e_test/streaming/temporal_join/temporal_join_watermark.slt b/e2e_test/streaming/temporal_join/append_only/temporal_join_watermark.slt similarity index 100% rename from e2e_test/streaming/temporal_join/temporal_join_watermark.slt rename to e2e_test/streaming/temporal_join/append_only/temporal_join_watermark.slt diff --git a/e2e_test/streaming/temporal_join/temporal_join_with_index.slt b/e2e_test/streaming/temporal_join/append_only/temporal_join_with_index.slt similarity index 100% rename from e2e_test/streaming/temporal_join/temporal_join_with_index.slt rename to e2e_test/streaming/temporal_join/append_only/temporal_join_with_index.slt diff --git a/e2e_test/streaming/temporal_join/non_append_only/temporal_join.slt 
b/e2e_test/streaming/temporal_join/non_append_only/temporal_join.slt new file mode 100644 index 0000000000000..b98658b6e8d2e --- /dev/null +++ b/e2e_test/streaming/temporal_join/non_append_only/temporal_join.slt @@ -0,0 +1,47 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table stream(id1 int, a1 int, b1 int); + +statement ok +create table version(id2 int, a2 int, b2 int, primary key (id2)); + +statement ok +create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 + +statement ok +insert into stream values(1, 11, 111); + +statement ok +insert into version values(1, 11, 111); + +statement ok +insert into stream values(1, 11, 111); + +statement ok +delete from version; + +query IIII rowsort +select * from v; +---- +1 11 1 11 +1 11 NULL NULL + +statement ok +update stream set a1 = 22, b1 = 222 + +query IIII rowsort +select * from v; +---- +1 22 NULL NULL +1 22 NULL NULL + +statement ok +drop materialized view v; + +statement ok +drop table stream; + +statement ok +drop table version; diff --git a/e2e_test/streaming/temporal_join/non_append_only/temporal_join_non_lookup_cond.slt b/e2e_test/streaming/temporal_join/non_append_only/temporal_join_non_lookup_cond.slt new file mode 100644 index 0000000000000..2c0cf094db998 --- /dev/null +++ b/e2e_test/streaming/temporal_join/non_append_only/temporal_join_non_lookup_cond.slt @@ -0,0 +1,100 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table stream(id1 int, a1 int, b1 int); + +statement ok +create table version(id2 int, a2 int, b2 int, primary key (id2)); + +statement ok +create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 and a1 > a2; + +statement ok +insert into stream values(1, 11, 111); + +statement ok +insert into version values(1, 12, 111); + +statement ok +insert into stream values(1, 13, 111); + +statement ok +delete 
from version; + +query IIII rowsort +select * from v; +---- +1 11 NULL NULL +1 13 1 12 + +statement ok +delete from stream; + +statement ok +insert into version values(2, 22, 222); + +statement ok +insert into stream values(2, 23, 222); + +query IIII rowsort +select * from v; +---- +2 23 2 22 + +statement ok +delete from stream; + +query IIII rowsort +select * from v; +---- + + +statement ok +drop materialized view v; + +statement ok +drop table stream; + +statement ok +drop table version; + +statement ok +create table stream(id1 int, a1 int, b1 int); + +statement ok +create table version(id2 int, a2 int, b2 int, primary key (id2)); + +statement ok +create materialized view v as select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 and a1 > a2; + +statement ok +insert into version values (1, 12, 111), (2, 12, 111); + +statement ok +insert into stream values (1, 11, 111), (2, 13, 111); + +query IIII rowsort +select * from v; +---- +2 13 2 12 + +statement ok +update stream set a1 = 222, b1 = 333 where id1 = 1; + +statement ok +update stream set a1 = 2, b1 = 3 where id1 = 2; + +query IIII rowsort +select * from v; +---- +1 222 1 12 + +statement ok +drop materialized view v; + +statement ok +drop table stream; + +statement ok +drop table version; diff --git a/e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index.slt b/e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index.slt new file mode 100644 index 0000000000000..e9e7e59a483d2 --- /dev/null +++ b/e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index.slt @@ -0,0 +1,44 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table stream(id1 int, a1 int, b1 int); + +statement ok +create table version(id2 int, a2 int, b2 int, primary key (id2)); + +statement ok +create index idx on version (a2); + +statement ok +create materialized view v as select id1, a1, id2, a2 from stream left join version FOR 
SYSTEM_TIME AS OF PROCTIME() on b1 = b2 and a1 = a2; + +statement ok +insert into version values(1, 11, 111); + +statement ok +insert into stream values(1, 11, 111); + +query IIII rowsort +select * from v; +---- +1 11 1 11 + +statement ok +update stream set a1 = 22 where id1 = 1; + +query IIII rowsort +select * from v; +---- +1 22 NULL NULL + +statement ok +drop materialized view v; + +statement ok +drop table stream; + +statement ok +drop table version; + + diff --git a/e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index2.slt b/e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index2.slt new file mode 100644 index 0000000000000..c560c135f07cf --- /dev/null +++ b/e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index2.slt @@ -0,0 +1,44 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table stream(id1 int, a1 int, b1 int); + +statement ok +create table version(id2 int, a2 int, b2 int, primary key (id2)); + +statement ok +create index idx on version(a2, b2) distributed by (a2); + +statement ok +create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on b1 = b2 and a1 = a2; + +statement ok +insert into version values(1, 11, 111); + +statement ok +insert into stream values(1, 11, 111); + +query IIII rowsort +select * from v; +---- +1 11 1 11 + +statement ok +update stream set a1 = 22 where id1 = 1; + +query IIII rowsort +select * from v; +---- +1 22 NULL NULL + +statement ok +drop materialized view v; + +statement ok +drop table stream; + +statement ok +drop table version; + + diff --git a/e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index3.slt b/e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index3.slt new file mode 100644 index 0000000000000..d9c52e2db8a3a --- /dev/null +++ b/e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index3.slt @@ -0,0 +1,44 @@ +statement 
ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table stream(id1 int, a1 int, b1 int); + +statement ok +create table version(id2 int, a2 int, b2 int, primary key (id2)); + +statement ok +create index idx on version(a2, b2) distributed by (a2, b2); + +statement ok +create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on b1 = b2 and a1 = a2; + +statement ok +insert into version values(1, 11, 111); + +statement ok +insert into stream values(1, 11, 111); + +query IIII rowsort +select * from v; +---- +1 11 1 11 + +statement ok +update stream set a1 = 22 where id1 = 1; + +query IIII rowsort +select * from v; +---- +1 22 NULL NULL + +statement ok +drop materialized view v; + +statement ok +drop table stream; + +statement ok +drop table version; + + diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index b4393153b57a8..89c0521378972 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -435,6 +435,8 @@ message TemporalJoinNode { plan_common.StorageTableDesc table_desc = 7; // The output indices of the lookup side table repeated uint32 table_output_indices = 8; + // The state table used for non-append-only temporal join. 
+ optional catalog.Table memo_table = 9; } message DynamicFilterNode { diff --git a/src/common/src/util/stream_graph_visitor.rs b/src/common/src/util/stream_graph_visitor.rs index a3bb869b55279..81f1189693c0d 100644 --- a/src/common/src/util/stream_graph_visitor.rs +++ b/src/common/src/util/stream_graph_visitor.rs @@ -109,6 +109,9 @@ pub fn visit_stream_node_tables_inner( always!(node.right_table, "HashJoinRight"); always!(node.right_degree_table, "HashJoinDegreeRight"); } + NodeBody::TemporalJoin(node) => { + optional!(node.memo_table, "TemporalJoinMemo"); + } NodeBody::DynamicFilter(node) => { if node.condition_always_relax { always!(node.left_table, "DynamicFilterLeftNotSatisfy"); diff --git a/src/frontend/planner_test/tests/testdata/input/temporal_join.yaml b/src/frontend/planner_test/tests/testdata/input/temporal_join.yaml index 500e4ae2c2984..7bf9769b4c431 100644 --- a/src/frontend/planner_test/tests/testdata/input/temporal_join.yaml +++ b/src/frontend/planner_test/tests/testdata/input/temporal_join.yaml @@ -43,11 +43,11 @@ - stream_error - name: Temporal join append only test sql: | - create table stream(id1 int, a1 int, b1 int); + create table stream(id1 int, a1 int, b1 int) append only; create table version(id2 int, a2 int, b2 int, primary key (id2)); select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 where a2 < 10; expected_outputs: - - stream_error + - stream_plan - name: Temporal join type test sql: | create table stream(id1 int, a1 int, b1 int); @@ -57,7 +57,7 @@ - stream_error - name: multi-way temporal join with the same key sql: | - create table stream(k int, a1 int, b1 int) APPEND ONLY; + create table stream(k int, a1 int, b1 int); create table version1(k int, x1 int, y2 int, primary key (k)); create table version2(k int, x2 int, y2 int, primary key (k)); select stream.k, x1, x2, a1, b1 @@ -68,7 +68,7 @@ - stream_plan - name: multi-way temporal join with different keys sql: | - create table 
stream(id1 int, id2 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, id2 int, a1 int, b1 int); create table version1(id1 int, x1 int, y2 int, primary key (id1)); create table version2(id2 int, x2 int, y2 int, primary key (id2)); select stream.id1, x1, stream.id2, x2, a1, b1 @@ -79,7 +79,7 @@ - stream_plan - name: multi-way temporal join with different keys sql: | - create table stream(id1 int, id2 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, id2 int, a1 int, b1 int); create table version1(id1 int, x1 int, y2 int, primary key (id1)); create table version2(id2 int, x2 int, y2 int, primary key (id2)); select stream.id1, x1, stream.id2, x2, a1, b1 @@ -90,7 +90,7 @@ - stream_plan - name: temporal join with an index (distribution key size = 1) sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx2 on version (a2, b2) distributed by (a2); select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2; @@ -98,7 +98,7 @@ - stream_plan - name: temporal join with an index (distribution key size = 2) sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx2 on version (a2, b2); select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2; @@ -106,7 +106,7 @@ - stream_plan - name: temporal join with an index (index column size = 1) sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx2 on version (b2); select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2; @@ -114,14 +114,14 @@ - 
stream_plan - name: temporal join with singleton table sql: | - create table t (a int) append only; + create table t (a int); create materialized view v as select count(*) from t; select * from t left join v FOR SYSTEM_TIME AS OF PROCTIME() on a = count; expected_outputs: - stream_plan - name: index selection for temporal join (with one index). sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx on version (a2, b2); select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2; @@ -129,7 +129,7 @@ - stream_plan - name: index selection for temporal join (with two indexes) and should choose the index with a longer prefix.. sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx1 on version (a2); create index idx2 on version (a2, b2); @@ -138,7 +138,7 @@ - stream_plan - name: index selection for temporal join (with three indexes) and should choose primary table. sql: | - create table stream(id1 int, a1 int, b1 int, c1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int, c1 int); create table version(id2 int, a2 int, b2 int, c2 int, primary key (id2)); create index idx1 on version (a2); create index idx2 on version (b2); @@ -148,7 +148,7 @@ - stream_plan - name: index selection for temporal join (two index) and no one matches. 
sql: | - create table stream(id1 int, a1 int, b1 int, c1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int, c1 int); create table version(id2 int, a2 int, b2 int, c2 int, primary key (id2)); create index idx1 on version (a2); create index idx2 on version (a2, b2); diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml index 08916d1539c81..dcdd34ed2c153 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml @@ -1002,7 +1002,7 @@ ON mod(B.auction, 10000) = S.key sink_plan: |- StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10018(hidden), side_input.key(hidden)] } - └─StreamTemporalJoin { type: Inner, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } ├─StreamExchange { dist: HashShard($expr1) } │ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] } │ └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) } @@ -1011,7 +1011,7 @@ stream_plan: |- StreamMaterialize { columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr1(hidden), side_input.key(hidden)], stream_key: [bid._row_id, $expr1], pk_columns: [bid._row_id, $expr1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(bid._row_id, $expr1) } - └─StreamTemporalJoin { type: Inner, predicate: $expr1 
= side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } ├─StreamExchange { dist: HashShard($expr1) } │ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] } │ └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) } @@ -1024,7 +1024,7 @@ └── StreamExchange Hash([5, 6]) from 1 Fragment 1 - StreamTemporalJoin { type: Inner, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } + StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } ├── StreamExchange Hash([4]) from 2 └── StreamExchange NoShuffle from 3 diff --git a/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml b/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml index fd5f6aec627f2..7bbd43ce3c35c 100644 --- a/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml +++ b/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml @@ -333,7 +333,7 @@ StreamMaterialize { columns: [id1, a1, id2, v1, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck, watermark_columns: [v1] } └─StreamProject { exprs: [stream.id1, stream.a1, version.id2, stream.v1, stream._row_id], output_watermarks: [stream.v1] } └─StreamDynamicFilter { 
predicate: (stream.v1 > now), output_watermarks: [stream.v1], output: [stream.id1, stream.a1, stream.v1, version.id2, stream._row_id], cleaned_by_watermark: true } - ├─StreamTemporalJoin { type: LeftOuter, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, stream.v1, version.id2, stream._row_id] } + ├─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, stream.v1, version.id2, stream._row_id] } │ ├─StreamExchange { dist: HashShard(stream.id1) } │ │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.v1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } │ └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } diff --git a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml index ebf7af980d23b..aa8887e98bef6 100644 --- a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml +++ b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml @@ -7,7 +7,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream._row_id) } - └─StreamTemporalJoin { type: LeftOuter, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + └─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } ├─StreamExchange { dist: HashShard(stream.id1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: 
UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } @@ -23,7 +23,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream._row_id) } - └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } ├─StreamExchange { dist: HashShard(stream.id1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } @@ -36,7 +36,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream._row_id) } - └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } ├─StreamExchange { dist: HashShard(stream.id1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange 
[no_shuffle] { dist: UpstreamHashShard(version.id2) } @@ -49,7 +49,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id1, a1], pk_columns: [stream._row_id, id1, a1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream.a1, stream._row_id) } - └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND stream.a1 = version.a2 AND (version.b2 <> version.a2), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND stream.a1 = version.a2 AND (version.b2 <> version.a2), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } ├─StreamExchange { dist: HashShard(stream.id1, stream.a1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2, version.a2) } @@ -65,7 +65,7 @@ └─StreamSimpleAgg [append_only] { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } - └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream._row_id, stream.id1, version.id2] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream._row_id, stream.id1, version.id2] } ├─StreamExchange { dist: HashShard(stream.id1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } @@ -80,12 +80,17 @@ HINT: Please add the primary key of the 
lookup table to the join condition and remove any other conditions - name: Temporal join append only test sql: | - create table stream(id1 int, a1 int, b1 int); + create table stream(id1 int, a1 int, b1 int) append only; create table version(id2 int, a2 int, b2 int, primary key (id2)); select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 where a2 < 10; - stream_error: |- - Not supported: Temporal join requires an append-only left input - HINT: Please ensure your left input is append-only + stream_plan: |- + StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck } + └─StreamExchange { dist: HashShard(stream.id1, stream._row_id) } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + ├─StreamExchange { dist: HashShard(stream.id1) } + │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } + └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } + └─StreamTableScan { table: version, columns: [version.id2, version.a2], stream_scan_type: UpstreamOnly, stream_key: [version.id2], pk: [id2], dist: UpstreamHashShard(version.id2) } - name: Temporal join type test sql: | create table stream(id1 int, a1 int, b1 int); @@ -96,7 +101,7 @@ HINT: please check your temporal join syntax e.g. consider removing the right outer join if it is being used. 
- name: multi-way temporal join with the same key sql: | - create table stream(k int, a1 int, b1 int) APPEND ONLY; + create table stream(k int, a1 int, b1 int); create table version1(k int, x1 int, y2 int, primary key (k)); create table version2(k int, x2 int, y2 int, primary key (k)); select stream.k, x1, x2, a1, b1 @@ -106,9 +111,9 @@ stream_plan: |- StreamMaterialize { columns: [k, x1, x2, a1, b1, stream._row_id(hidden), version2.k(hidden)], stream_key: [stream._row_id, k], pk_columns: [stream._row_id, k], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.k, stream._row_id) } - └─StreamTemporalJoin { type: Inner, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version2.k] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version2.k] } ├─StreamExchange { dist: HashShard(stream.k) } - │ └─StreamTemporalJoin { type: Inner, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] } + │ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] } │ ├─StreamExchange { dist: HashShard(stream.k) } │ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) } │ │ └─StreamTableScan { table: stream, columns: [stream.k, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } @@ -118,7 +123,7 @@ └─StreamTableScan { table: version2, columns: [version2.k, version2.x2], stream_scan_type: UpstreamOnly, stream_key: [version2.k], pk: [k], dist: UpstreamHashShard(version2.k) } - name: multi-way temporal join with different keys sql: | - create table stream(id1 int, id2 int, a1 int, b1 int) APPEND ONLY; + 
create table stream(id1 int, id2 int, a1 int, b1 int); create table version1(id1 int, x1 int, y2 int, primary key (id1)); create table version2(id2 int, x2 int, y2 int, primary key (id2)); select stream.id1, x1, stream.id2, x2, a1, b1 @@ -128,9 +133,9 @@ stream_plan: |- StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version2.id2(hidden)], stream_key: [stream._row_id, id1, id2], pk_columns: [stream._row_id, id1, id2], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream.id2, stream._row_id) } - └─StreamTemporalJoin { type: Inner, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } ├─StreamExchange { dist: HashShard(stream.id2) } - │ └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } + │ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } │ ├─StreamExchange { dist: HashShard(stream.id1) } │ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) } │ │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.id2, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } @@ -140,7 +145,7 @@ └─StreamTableScan { table: version2, columns: [version2.id2, version2.x2], stream_scan_type: UpstreamOnly, stream_key: [version2.id2], pk: [id2], dist: UpstreamHashShard(version2.id2) } - name: multi-way temporal join with different keys sql: | - create 
table stream(id1 int, id2 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, id2 int, a1 int, b1 int); create table version1(id1 int, x1 int, y2 int, primary key (id1)); create table version2(id2 int, x2 int, y2 int, primary key (id2)); select stream.id1, x1, stream.id2, x2, a1, b1 @@ -150,9 +155,9 @@ stream_plan: |- StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version2.id2(hidden)], stream_key: [stream._row_id, id1, id2], pk_columns: [stream._row_id, id1, id2], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream.id2, stream._row_id) } - └─StreamTemporalJoin { type: Inner, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } ├─StreamExchange { dist: HashShard(stream.id2) } - │ └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } + │ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } │ ├─StreamExchange { dist: HashShard(stream.id1) } │ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) } │ │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.id2, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } @@ -162,54 +167,54 @@ └─StreamTableScan { table: version2, columns: [version2.id2, version2.x2], stream_scan_type: UpstreamOnly, stream_key: [version2.id2], pk: [id2], dist: UpstreamHashShard(version2.id2) } - name: 
temporal join with an index (distribution key size = 1) sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx2 on version (a2, b2) distributed by (a2); select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2; stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx2.id2, stream._row_id, stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.a1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2) } └─StreamTableScan { table: idx2, columns: [idx2.a2, idx2.b2, idx2.id2], stream_scan_type: UpstreamOnly, stream_key: [idx2.id2], pk: [a2, b2, id2], dist: UpstreamHashShard(idx2.a2) } - name: temporal join with an index (distribution key size = 2) sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx2 on version (a2, b2); select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and 
b1 = b2; stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx2.id2, stream._row_id, stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.a1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2) } └─StreamTableScan { table: idx2, columns: [idx2.a2, idx2.b2, idx2.id2], stream_scan_type: UpstreamOnly, stream_key: [idx2.id2], pk: [a2, b2, id2], dist: UpstreamHashShard(idx2.a2) } - name: temporal join with an index (index column size = 1) sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx2 on version (b2); select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2; stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, stream.b1, a1], pk_columns: [stream._row_id, id2, stream.b1, a1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx2.id2, stream._row_id, stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, predicate: stream.b1 = idx2.b2 AND 
(stream.a1 = idx2.a2), output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.b1 = idx2.b2 AND (stream.a1 = idx2.a2), output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.b1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.b2) } └─StreamTableScan { table: idx2, columns: [idx2.b2, idx2.id2, idx2.a2], stream_scan_type: UpstreamOnly, stream_key: [idx2.id2], pk: [b2, id2], dist: UpstreamHashShard(idx2.b2) } - name: temporal join with singleton table sql: | - create table t (a int) append only; + create table t (a int); create materialized view v as select count(*) from t; select * from t left join v FOR SYSTEM_TIME AS OF PROCTIME() on a = count; stream_plan: |- StreamMaterialize { columns: [a, count, t._row_id(hidden), $expr1(hidden)], stream_key: [t._row_id, $expr1], pk_columns: [t._row_id, $expr1], pk_conflict: NoCheck } - └─StreamTemporalJoin { type: LeftOuter, predicate: AND ($expr1 = v.count), output: [t.a, v.count, t._row_id, $expr1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: AND ($expr1 = v.count), output: [t.a, v.count, t._row_id, $expr1] } ├─StreamExchange { dist: Single } │ └─StreamProject { exprs: [t.a, t.a::Int64 as $expr1, t._row_id] } │ └─StreamTableScan { table: t, columns: [t.a, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } @@ -217,21 +222,21 @@ └─StreamTableScan { table: v, columns: [v.count], stream_scan_type: UpstreamOnly, stream_key: [], pk: [], dist: Single } - name: index selection for temporal join (with one index). 
sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx on version (a2, b2); select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2; stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx.id2, stream._row_id, stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, predicate: stream.a1 = idx.a2 AND stream.b1 = idx.b2, output: [stream.id1, stream.a1, idx.id2, idx.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.a1 = idx.a2 AND stream.b1 = idx.b2, output: [stream.id1, stream.a1, idx.id2, idx.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.a1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx.a2) } └─StreamTableScan { table: idx, columns: [idx.id2, idx.a2, idx.b2], stream_scan_type: UpstreamOnly, stream_key: [idx.id2], pk: [a2, b2, id2], dist: UpstreamHashShard(idx.a2) } - name: index selection for temporal join (with two indexes) and should choose the index with a longer prefix.. 
sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx1 on version (a2); create index idx2 on version (a2, b2); @@ -239,14 +244,14 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx2.id2, stream._row_id, stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.a1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2) } └─StreamTableScan { table: idx2, columns: [idx2.id2, idx2.a2, idx2.b2], stream_scan_type: UpstreamOnly, stream_key: [idx2.id2], pk: [a2, b2, id2], dist: UpstreamHashShard(idx2.a2) } - name: index selection for temporal join (with three indexes) and should choose primary table. 
sql: | - create table stream(id1 int, a1 int, b1 int, c1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int, c1 int); create table version(id2 int, a2 int, b2 int, c2 int, primary key (id2)); create index idx1 on version (a2); create index idx2 on version (b2); @@ -255,14 +260,14 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden), stream.c1(hidden)], stream_key: [stream._row_id, id1, a1, stream.b1, stream.c1], pk_columns: [stream._row_id, id1, a1, stream.b1, stream.c1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream.a1, stream._row_id, stream.b1, stream.c1) } - └─StreamTemporalJoin { type: LeftOuter, predicate: stream.id1 = version.id2 AND (stream.a1 = version.a2) AND (stream.b1 = version.b2) AND (stream.c1 = version.c2), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id, stream.b1, stream.c1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.id1 = version.id2 AND (stream.a1 = version.a2) AND (stream.b1 = version.b2) AND (stream.c1 = version.c2), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id, stream.b1, stream.c1] } ├─StreamExchange { dist: HashShard(stream.id1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream.c1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } └─StreamTableScan { table: version, columns: [version.id2, version.a2, version.b2, version.c2], stream_scan_type: UpstreamOnly, stream_key: [version.id2], pk: [id2], dist: UpstreamHashShard(version.id2) } - name: index selection for temporal join (two index) and no one matches. 
sql: | - create table stream(id1 int, a1 int, b1 int, c1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int, c1 int); create table version(id2 int, a2 int, b2 int, c2 int, primary key (id2)); create index idx1 on version (a2); create index idx2 on version (a2, b2); diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index 47c5238bde2ba..571efee542c2b 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -1071,13 +1071,6 @@ impl LogicalJoin { // Enforce a shuffle for the temporal join LHS to let the scheduler be able to schedule the join fragment together with the RHS with a `no_shuffle` exchange. let left = required_dist.enforce(left, &Order::any()); - if !left.append_only() { - return Err(RwError::from(ErrorCode::NotSupported( - "Temporal join requires an append-only left input".into(), - "Please ensure your left input is append-only".into(), - ))); - } - // Extract the predicate from logical scan. Only pure scan is supported. 
let (new_scan, scan_predicate, project_expr) = logical_scan.predicate_pull_up(); // Construct output column to require column mapping diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs index ecbdba1b32265..ce8753b9ddbc8 100644 --- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs @@ -14,6 +14,7 @@ use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; +use risingwave_common::util::sort_util::OrderType; use risingwave_pb::plan_common::JoinType; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::TemporalJoinNode; @@ -26,26 +27,29 @@ use super::utils::{childless_record, watermark_pretty, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary}; use crate::expr::{Expr, ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::plan_node::generic::GenericPlanNode; use crate::optimizer::plan_node::plan_tree_node::PlanTreeNodeUnary; -use crate::optimizer::plan_node::utils::IndicesDisplay; +use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder}; use crate::optimizer::plan_node::{ EqJoinPredicate, EqJoinPredicateDisplay, StreamExchange, StreamTableScan, TryToStreamPb, }; use crate::scheduler::SchedulerResult; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::ColIndexMappingRewriteExt; +use crate::TableCatalog; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamTemporalJoin { pub base: PlanBase, core: generic::Join, eq_join_predicate: EqJoinPredicate, + append_only: bool, } impl StreamTemporalJoin { pub fn new(core: generic::Join, eq_join_predicate: EqJoinPredicate) -> Self { assert!(core.join_type == JoinType::Inner || core.join_type == JoinType::LeftOuter); - assert!(core.left.append_only()); + let append_only = 
core.left.append_only(); let right = core.right.clone(); let exchange: &StreamExchange = right .as_stream_exchange() @@ -79,6 +83,7 @@ impl StreamTemporalJoin { base, core, eq_join_predicate, + append_only, } } @@ -90,6 +95,58 @@ impl StreamTemporalJoin { pub fn eq_join_predicate(&self) -> &EqJoinPredicate { &self.eq_join_predicate } + + pub fn append_only(&self) -> bool { + self.append_only + } + + /// Return memo-table catalog and its `pk_indices`. + /// (`join_key` + `left_pk` + `right_pk`) -> (`right_scan_schema` + `join_key` + `left_pk`) + /// + /// Write pattern: + /// for each left input row (with insert op), construct the memo table pk and insert the row into the memo table. + /// + /// Read pattern: + /// for each left input row (with delete op), construct pk prefix (`join_key` + `left_pk`) to fetch rows and delete them from the memo table. + pub fn infer_memo_table_catalog(&self, right_scan: &StreamTableScan) -> TableCatalog { + let left_eq_indexes = self.eq_join_predicate.left_eq_indexes(); + let read_prefix_len_hint = left_eq_indexes.len() + self.left().stream_key().unwrap().len(); + + // Build internal table + let mut internal_table_catalog_builder = TableCatalogBuilder::default(); + // Add right table fields + let right_scan_schema = right_scan.core().schema(); + for field in right_scan_schema.fields() { + internal_table_catalog_builder.add_column(field); + } + // Add join_key + left_pk + for field in left_eq_indexes + .iter() + .chain(self.core.left.stream_key().unwrap()) + .map(|idx| &self.core.left.schema().fields()[*idx]) + { + internal_table_catalog_builder.add_column(field); + } + + let mut pk_indices = vec![]; + pk_indices + .extend(right_scan_schema.len()..(right_scan_schema.len() + read_prefix_len_hint)); + pk_indices.extend(right_scan.stream_key().unwrap()); + + pk_indices.iter().for_each(|idx| { + internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending()) + }); + + let dist_key_len = right_scan + .core() + 
.distribution_key() + .map(|keys| keys.len()) + .unwrap_or(0); + + let internal_table_dist_keys = + (right_scan_schema.len()..(right_scan_schema.len() + dist_key_len)).collect(); + internal_table_catalog_builder.build(internal_table_dist_keys, read_prefix_len_hint) + } } impl Distill for StreamTemporalJoin { @@ -97,6 +154,7 @@ impl Distill for StreamTemporalJoin { let verbose = self.base.ctx().is_explain_verbose(); let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 }); vec.push(("type", Pretty::debug(&self.core.join_type))); + vec.push(("append_only", Pretty::debug(&self.append_only))); let concat_schema = self.core.concat_schema(); vec.push(( @@ -142,7 +200,7 @@ impl_plan_tree_node_for_binary! { StreamTemporalJoin } impl TryToStreamPb for StreamTemporalJoin { fn try_to_stream_prost_body( &self, - _state: &mut BuildFragmentGraphState, + state: &mut BuildFragmentGraphState, ) -> SchedulerResult { let left_jk_indices = self.eq_join_predicate.left_eq_indexes(); let right_jk_indices = self.eq_join_predicate.right_eq_indexes(); @@ -174,6 +232,13 @@ impl TryToStreamPb for StreamTemporalJoin { output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), table_desc: Some(scan.core().table_desc.try_to_protobuf()?), table_output_indices: scan.core().output_col_idx.iter().map(|&i| i as _).collect(), + memo_table: if self.append_only { + None + } else { + let mut memo_table = self.infer_memo_table_catalog(scan); + memo_table = memo_table.with_id(state.gen_table_id_wrapped()); + Some(memo_table.to_internal_table_prost()) + }, })) } } diff --git a/src/frontend/src/optimizer/plan_visitor/temporal_join_validator.rs b/src/frontend/src/optimizer/plan_visitor/temporal_join_validator.rs index 2f8d6b3fc89b2..cbdd2c695ad83 100644 --- a/src/frontend/src/optimizer/plan_visitor/temporal_join_validator.rs +++ b/src/frontend/src/optimizer/plan_visitor/temporal_join_validator.rs @@ -15,6 +15,7 @@ use risingwave_sqlparser::ast::AsOf; use super::{DefaultBehavior, 
Merge}; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{ BatchSeqScan, LogicalScan, PlanTreeNodeBinary, StreamTableScan, StreamTemporalJoin, }; @@ -22,12 +23,21 @@ use crate::optimizer::plan_visitor::PlanVisitor; use crate::PlanRef; #[derive(Debug, Clone, Default)] -pub struct TemporalJoinValidator {} +pub struct TemporalJoinValidator { + found_non_append_only_temporal_join: bool, +} impl TemporalJoinValidator { pub fn exist_dangling_temporal_scan(plan: PlanRef) -> bool { - let mut decider = TemporalJoinValidator {}; - decider.visit(plan) + let mut decider = TemporalJoinValidator { + found_non_append_only_temporal_join: false, + }; + let ctx = plan.ctx(); + let has_dangling_temporal_scan = decider.visit(plan); + if decider.found_non_append_only_temporal_join { + ctx.session_ctx().notice_to_user("A non-append-only temporal join is used in the query. It introduces an additional memo table compared to an append-only temporal join."); + } + has_dangling_temporal_scan } } @@ -53,6 +63,9 @@ impl PlanVisitor for TemporalJoinValidator { } fn visit_stream_temporal_join(&mut self, stream_temporal_join: &StreamTemporalJoin) -> bool { + if !stream_temporal_join.append_only() { + self.found_non_append_only_temporal_join = true; + } self.visit(stream_temporal_join.left()) } } diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index 1d374189cb5e0..f365706317a90 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -22,6 +22,7 @@ use either::Either; use futures::stream::{self, PollNext}; use futures::{pin_mut, StreamExt, TryStreamExt}; use futures_async_stream::{for_await, try_stream}; +use itertools::Itertools; use local_stats_alloc::{SharedStatsAlloc, StatsAlloc}; use lru::DefaultHasher; use risingwave_common::array::{Op, StreamChunk}; @@ -45,12 +46,18 @@ use super::{ }; use crate::cache::{cache_may_stale, new_with_hasher_in,
ManagedLruCache}; use crate::common::metrics::MetricsInfo; +use crate::common::table::state_table::StateTable; use crate::executor::join::builder::JoinStreamChunkBuilder; use crate::executor::monitor::StreamingMetrics; use crate::executor::{ActorContextRef, Executor, Watermark}; use crate::task::AtomicU64Ref; -pub struct TemporalJoinExecutor { +pub struct TemporalJoinExecutor< + K: HashKey, + S: StateStore, + const T: JoinTypePrimitive, + const APPEND_ONLY: bool, +> { ctx: ActorContextRef, #[allow(dead_code)] info: ExecutorInfo, @@ -63,9 +70,7 @@ pub struct TemporalJoinExecutor, output_indices: Vec, chunk_size: usize, - // TODO: update metrics - #[allow(dead_code)] - metrics: Arc, + memo_table: Option>, } #[derive(Default)] @@ -310,17 +315,21 @@ async fn align_input(left: Executor, right: Executor) { } mod phase1 { + use std::ops::Bound; + + use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::hash::{HashKey, NullBitmap}; - use risingwave_common::row::{self, Row, RowExt}; + use risingwave_common::row::{self, OwnedRow, Row, RowExt}; use risingwave_common::types::{DataType, DatumRef}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_hummock_sdk::HummockEpoch; use risingwave_storage::StateStore; use super::{StreamExecutorError, TemporalSide}; + use crate::common::table::state_table::StateTable; pub(super) trait Phase1Evaluation { /// Called when a matched row is found. 
@@ -425,13 +434,21 @@ mod phase1 { #[try_stream(ok = StreamChunk, error = StreamExecutorError)] #[allow(clippy::too_many_arguments)] - pub(super) async fn handle_chunk<'a, K: HashKey, S: StateStore, E: Phase1Evaluation>( + pub(super) async fn handle_chunk< + 'a, + K: HashKey, + S: StateStore, + E: Phase1Evaluation, + const APPEND_ONLY: bool, + >( chunk_size: usize, right_size: usize, full_schema: Vec, epoch: HummockEpoch, left_join_keys: &'a [usize], right_table: &'a mut TemporalSide, + memo_table_lookup_prefix: &'a [usize], + memo_table: &'a mut Option>, null_matched: &'a K::Bitmap, chunk: StreamChunk, ) { @@ -441,39 +458,142 @@ mod phase1 { .visibility() .iter() .zip_eq_debug(keys.iter()) - .filter_map(|(vis, key)| if vis { Some(key) } else { None }); + .zip_eq_debug(chunk.ops()) + .filter_map(|((vis, key), op)| { + if vis { + if APPEND_ONLY { + assert_eq!(*op, Op::Insert); + Some(key) + } else { + match op { + Op::Insert | Op::UpdateInsert => Some(key), + Op::Delete | Op::UpdateDelete => None, + } + } + } else { + None + } + }); right_table .fetch_or_promote_keys(to_fetch_keys, epoch) .await?; + for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) { let Some((op, left_row)) = r else { continue; }; + let mut matched = false; - if key.null_bitmap().is_subset(null_matched) - && let join_entry = right_table.force_peek(&key) - && !join_entry.is_empty() - { - matched = true; - for right_row in join_entry.cached.values() { - if let Some(chunk) = - E::append_matched_row(op, &mut builder, left_row, right_row) - { - yield chunk; + + if APPEND_ONLY { + // Append-only temporal join + if key.null_bitmap().is_subset(null_matched) + && let join_entry = right_table.force_peek(&key) + && !join_entry.is_empty() + { + matched = true; + for right_row in join_entry.cached.values() { + if let Some(chunk) = + E::append_matched_row(op, &mut builder, left_row, right_row) + { + yield chunk; + } + } + } + } else { + // Non-append-only temporal join + // The memo-table pk 
and columns: + // (`join_key` + `left_pk` + `right_pk`) -> (`right_scan_schema` + `join_key` + `left_pk`) + // + // Write pattern: + // for each left input row (with insert op), construct the memo table pk and insert the row into the memo table. + // Read pattern: + // for each left input row (with delete op), construct pk prefix (`join_key` + `left_pk`) to fetch rows and delete them from the memo table. + // + // Temporal join supports inner join and left outer join, additionally, it could contain other conditions. + // Surprisingly, we could handle them in a unified way with memo table. + // The memo table would persist rows fetched from the right table and appending the `join_key` and `left_pk` from the left row. + // The null rows generated by outer join and the other condition somehow is a stateless operation which means we can handle them without the memo table. + let memo_table = memo_table.as_mut().unwrap(); + match op { + Op::Insert | Op::UpdateInsert => { + if key.null_bitmap().is_subset(null_matched) + && let join_entry = right_table.force_peek(&key) + && !join_entry.is_empty() + { + matched = true; + for right_row in join_entry.cached.values() { + let right_row: OwnedRow = right_row.clone(); + // Insert into memo table + memo_table.insert(right_row.clone().chain( + left_row.project(memo_table_lookup_prefix).into_owned_row(), + )); + if let Some(chunk) = E::append_matched_row( + Op::Insert, + &mut builder, + left_row, + right_row, + ) { + yield chunk; + } + } + } + } + Op::Delete | Op::UpdateDelete => { + let mut memo_rows_to_delete = vec![]; + if key.null_bitmap().is_subset(null_matched) { + let sub_range: &(Bound, Bound) = + &(Bound::Unbounded, Bound::Unbounded); + let prefix = left_row.project(memo_table_lookup_prefix); + let state_table_iter = memo_table + .iter_with_prefix(prefix, sub_range, Default::default()) + .await?; + pin_mut!(state_table_iter); + + while let Some(memo_row) = state_table_iter.next().await { + matched = true; + let memo_row = 
memo_row?.into_owned_row(); + memo_rows_to_delete.push(memo_row.clone()); + if let Some(chunk) = E::append_matched_row( + Op::Delete, + &mut builder, + left_row, + memo_row.slice(0..right_size), + ) { + yield chunk; + } + } + } + for memo_row in memo_rows_to_delete { + // Delete from memo table + memo_table.delete(memo_row); + } } } } - if let Some(chunk) = E::match_end(&mut builder, op, left_row, right_size, matched) { + if let Some(chunk) = E::match_end( + &mut builder, + match op { + Op::Insert | Op::UpdateInsert => Op::Insert, + Op::Delete | Op::UpdateDelete => Op::Delete, + }, + left_row, + right_size, + matched, + ) { yield chunk; } } + if let Some(chunk) = builder.take() { yield chunk; } } } -impl TemporalJoinExecutor { +impl + TemporalJoinExecutor +{ #[allow(clippy::too_many_arguments)] pub fn new( ctx: ActorContextRef, @@ -492,6 +612,7 @@ impl TemporalJoinExecutor metrics: Arc, chunk_size: usize, join_key_data_types: Vec, + memo_table: Option>, ) -> Self { let alloc = StatsAlloc::new(Global).shared(); @@ -528,7 +649,7 @@ impl TemporalJoinExecutor condition, output_indices, chunk_size, - metrics, + memo_table, } } @@ -555,7 +676,14 @@ impl TemporalJoinExecutor let left_to_output: HashMap = HashMap::from_iter(left_map.iter().cloned()); + let left_stream_key_indices = self.left.pk_indices().to_vec(); let right_stream_key_indices = self.right.pk_indices().to_vec(); + let memo_table_lookup_prefix = self + .left_join_keys + .iter() + .cloned() + .chain(left_stream_key_indices) + .collect_vec(); let null_matched = K::Bitmap::from_bool_vec(self.null_safe); @@ -572,6 +700,8 @@ impl TemporalJoinExecutor .chain(self.right.schema().data_types().into_iter()) .collect(); + let mut wait_first_barrier = true; + #[for_await] for msg in align_input(self.left, self.right) { self.right_table.cache.evict(); @@ -591,13 +721,15 @@ impl TemporalJoinExecutor let full_schema = full_schema.clone(); if T == JoinType::Inner { - let st1 = phase1::handle_chunk::( + let st1 = 
phase1::handle_chunk::( self.chunk_size, right_size, full_schema, epoch, &self.left_join_keys, &mut self.right_table, + &memo_table_lookup_prefix, + &mut self.memo_table, &null_matched, chunk, ); @@ -621,13 +753,15 @@ impl TemporalJoinExecutor } } else if let Some(ref cond) = self.condition { // Joined result without evaluating non-lookup conditions. - let st1 = phase1::handle_chunk::( + let st1 = phase1::handle_chunk::( self.chunk_size, right_size, full_schema, epoch, &self.left_join_keys, &mut self.right_table, + &memo_table_lookup_prefix, + &mut self.memo_table, &null_matched, chunk, ); @@ -670,13 +804,15 @@ impl TemporalJoinExecutor // The last row should always be marker row, assert_eq!(matched_count, 0); } else { - let st1 = phase1::handle_chunk::( + let st1 = phase1::handle_chunk::( self.chunk_size, right_size, full_schema, epoch, &self.left_join_keys, &mut self.right_table, + &memo_table_lookup_prefix, + &mut self.memo_table, &null_matched, chunk, ); @@ -689,6 +825,18 @@ impl TemporalJoinExecutor } } InternalMessage::Barrier(updates, barrier) => { + if !A { + if wait_first_barrier { + wait_first_barrier = false; + self.memo_table.as_mut().unwrap().init_epoch(barrier.epoch); + } else { + self.memo_table + .as_mut() + .unwrap() + .commit(barrier.epoch) + .await?; + } + } if let Some(vnodes) = barrier.as_update_vnode_bitmap(self.ctx.id) { let prev_vnodes = self.right_table.source.update_vnode_bitmap(vnodes.clone()); @@ -710,8 +858,8 @@ impl TemporalJoinExecutor } } -impl Execute - for TemporalJoinExecutor +impl Execute + for TemporalJoinExecutor { fn execute(self: Box) -> super::BoxedMessageStream { self.into_stream().boxed() diff --git a/src/stream/src/from_proto/temporal_join.rs b/src/stream/src/from_proto/temporal_join.rs index 15badec97e5cc..805049a296883 100644 --- a/src/stream/src/from_proto/temporal_join.rs +++ b/src/stream/src/from_proto/temporal_join.rs @@ -22,6 +22,7 @@ use risingwave_pb::plan_common::{JoinType as JoinTypeProto, StorageTableDesc}; 
use risingwave_storage::table::batch_table::storage_table::StorageTable; use super::*; +use crate::common::table::state_table::StateTable; use crate::executor::monitor::StreamingMetrics; use crate::executor::{ActorContextRef, JoinType, TemporalJoinExecutor}; use crate::task::AtomicU64Ref; @@ -45,9 +46,9 @@ impl ExecutorBuilder for TemporalJoinExecutorBuilder { .collect_vec(); StorageTable::new_partial( - store, + store.clone(), column_ids, - params.vnode_bitmap.map(Into::into), + params.vnode_bitmap.clone().map(Into::into), table_desc, ) }; @@ -99,6 +100,23 @@ impl ExecutorBuilder for TemporalJoinExecutorBuilder { .map(|idx| source_l.schema().fields[*idx].data_type()) .collect_vec(); + let memo_table = node.get_memo_table(); + let memo_table = match memo_table { + Ok(memo_table) => { + let vnodes = Arc::new( + params + .vnode_bitmap + .expect("vnodes not set for temporal join"), + ); + Some( + StateTable::from_table_catalog(memo_table, store.clone(), Some(vnodes.clone())) + .await, + ) + } + Err(_) => None, + }; + let append_only = memo_table.is_none(); + let dispatcher_args = TemporalJoinExecutorDispatcherArgs { ctx: params.actor_context, info: params.info.clone(), @@ -117,6 +135,8 @@ impl ExecutorBuilder for TemporalJoinExecutorBuilder { metrics: params.executor_stats, join_type_proto: node.get_join_type()?, join_key_data_types, + memo_table, + append_only, }; Ok((params.info, dispatcher_args.dispatch()?).into()) @@ -141,6 +161,8 @@ struct TemporalJoinExecutorDispatcherArgs { metrics: Arc, join_type_proto: JoinTypeProto, join_key_data_types: Vec, + memo_table: Option>, + append_only: bool, } impl HashKeyDispatcher for TemporalJoinExecutorDispatcherArgs { @@ -149,11 +171,12 @@ impl HashKeyDispatcher for TemporalJoinExecutorDispatcherArgs fn dispatch_impl(self) -> Self::Output { /// This macro helps to fill the const generic type parameter. macro_rules! 
build { - ($join_type:ident) => { + ($join_type:ident, $append_only:ident) => { Ok(Box::new(TemporalJoinExecutor::< K, S, { JoinType::$join_type }, + { $append_only }, >::new( self.ctx, self.info, @@ -171,12 +194,25 @@ impl HashKeyDispatcher for TemporalJoinExecutorDispatcherArgs self.metrics, self.chunk_size, self.join_key_data_types, + self.memo_table, ))) }; } match self.join_type_proto { - JoinTypeProto::Inner => build!(Inner), - JoinTypeProto::LeftOuter => build!(LeftOuter), + JoinTypeProto::Inner => { + if self.append_only { + build!(Inner, true) + } else { + build!(Inner, false) + } + } + JoinTypeProto::LeftOuter => { + if self.append_only { + build!(LeftOuter, true) + } else { + build!(LeftOuter, false) + } + } _ => unreachable!(), } }