Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): support non-append-only process time temporal join #16286

Merged
merged 15 commits into from
Apr 18, 2024
47 changes: 47 additions & 0 deletions e2e_test/streaming/temporal_join/non_append_only/temporal_join.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Process-time temporal LEFT JOIN where the stream side is NOT append-only.
# The join key is the primary key of `version` (id1 = id2).
# The script checks the rows of `v` after inserts on both sides, a delete on
# the version side, and an update on the stream side.

# NOTE(review): presumably makes every DML statement visible to the next
# query without an explicit FLUSH - confirm against the RisingWave docs.
statement ok
SET RW_IMPLICIT_FLUSH TO true;

# `stream` is created without APPEND ONLY so that it can be updated below.
statement ok
create table stream(id1 int, a1 int, b1 int);

statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

statement ok
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2;

# First stream row arrives while `version` is still empty: expected to be
# emitted with NULLs on the version side.
statement ok
insert into stream values(1, 11, 111);

statement ok
insert into version values(1, 11, 111);

# Second stream row arrives after the matching version row exists: expected
# to be emitted joined with it.
statement ok
insert into stream values(1, 11, 111);

# Deleting from the version side is expected to leave already-emitted join
# results untouched (the matched row is still present in the result below).
statement ok
delete from version;

query IIII rowsort
select * from v;
----
1 11 1 11
1 11 NULL NULL

# Intentionally no WHERE clause: both stream rows are updated. The expected
# result shows both rows with the new a1 and NULLs on the version side, since
# `version` is empty at this point.
statement ok
update stream set a1 = 22, b1 = 222;

query IIII rowsort
select * from v;
----
1 22 NULL NULL
1 22 NULL NULL

statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Process-time temporal join with a non-append-only stream side and an extra
# non-equi condition (a1 > a2) next to the key equality (id1 = id2).
# Part 1 uses LEFT JOIN, part 2 uses an inner JOIN with the same condition.

# NOTE(review): presumably makes every DML statement visible to the next
# query without an explicit FLUSH - confirm against the RisingWave docs.
statement ok
SET RW_IMPLICIT_FLUSH TO true;

# ---- Part 1: LEFT JOIN ----

statement ok
create table stream(id1 int, a1 int, b1 int);

statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

statement ok
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 and a1 > a2;

# Inserted while `version` is empty: expected with NULLs on the version side.
statement ok
insert into stream values(1, 11, 111);

statement ok
insert into version values(1, 12, 111);

# a1 = 13 > a2 = 12, so this row is expected to match the version row.
statement ok
insert into stream values(1, 13, 111);

# The expected result below still contains the matched row after the version
# side has been emptied.
statement ok
delete from version;

query IIII rowsort
select * from v;
----
1 11 NULL NULL
1 13 1 12

# Deleting the stream rows is expected to remove their join results; only the
# row inserted afterwards shows up in the next query.
statement ok
delete from stream;

statement ok
insert into version values(2, 22, 222);

statement ok
insert into stream values(2, 23, 222);

query IIII rowsort
select * from v;
----
2 23 2 22

# A matched row must be removable as well: the view is expected to be empty.
statement ok
delete from stream;

query IIII rowsort
select * from v;
----


statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;

# ---- Part 2: inner JOIN with the same condition ----

statement ok
create table stream(id1 int, a1 int, b1 int);

statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

statement ok
create materialized view v as select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 and a1 > a2;

statement ok
insert into version values (1, 12, 111), (2, 12, 111);

# id1 = 1: 11 > 12 is false, so no output row is expected for it.
# id1 = 2: 13 > 12 is true, so one joined row is expected.
statement ok
insert into stream values (1, 11, 111), (2, 13, 111);

query IIII rowsort
select * from v;
----
2 13 2 12

# After this update id1 = 1 satisfies a1 > a2 (222 > 12) and is expected to
# appear in the result.
statement ok
update stream set a1 = 222, b1 = 333 where id1 = 1;

# After this update id1 = 2 no longer satisfies a1 > a2 (2 > 12 is false) and
# is expected to disappear from the result.
statement ok
update stream set a1 = 2, b1 = 3 where id1 = 2;

query IIII rowsort
select * from v;
----
1 222 1 12

statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Process-time temporal LEFT JOIN with a non-append-only stream side where the
# join condition (b1 = b2 and a1 = a2) does not reference the primary key of
# `version`; an index on version(a2) is created before the view.

# NOTE(review): presumably makes every DML statement visible to the next
# query without an explicit FLUSH - confirm against the RisingWave docs.
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table stream(id1 int, a1 int, b1 int);

statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

# NOTE(review): presumably the temporal join looks up through this index
# rather than the base table - confirm against the planner output.
statement ok
create index idx on version (a2);

statement ok
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on b1 = b2 and a1 = a2;

statement ok
insert into version values(1, 11, 111);

# a1 = a2 = 11 and b1 = b2 = 111, so the row is expected to match.
statement ok
insert into stream values(1, 11, 111);

query IIII rowsort
select * from v;
----
1 11 1 11

# The update changes a join column on the stream side: a1 = 22 has no
# matching a2, so the row is expected with NULLs on the version side.
statement ok
update stream set a1 = 22 where id1 = 1;

query IIII rowsort
select * from v;
----
1 22 NULL NULL

statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;


2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,8 @@ message TemporalJoinNode {
plan_common.StorageTableDesc table_desc = 7;
// The output indices of the lookup side table
repeated uint32 table_output_indices = 8;
// The state table used for non-append-only temporal join.
catalog.Table memo_table = 9;
chenzl25 marked this conversation as resolved.
Show resolved Hide resolved
}

message DynamicFilterNode {
Expand Down
3 changes: 3 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ pub fn visit_stream_node_tables_inner<F>(
always!(node.right_table, "HashJoinRight");
always!(node.right_degree_table, "HashJoinDegreeRight");
}
NodeBody::TemporalJoin(node) => {
optional!(node.memo_table, "TemporalJoinMemo");
}
NodeBody::DynamicFilter(node) => {
if node.condition_always_relax {
always!(node.left_table, "DynamicFilterLeftNotSatisfy");
Expand Down
26 changes: 13 additions & 13 deletions src/frontend/planner_test/tests/testdata/input/temporal_join.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@
- stream_error
- name: Temporal join append only test
sql: |
create table stream(id1 int, a1 int, b1 int);
create table stream(id1 int, a1 int, b1 int) append only;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 where a2 < 10;
expected_outputs:
- stream_error
- stream_plan
- name: Temporal join type test
sql: |
create table stream(id1 int, a1 int, b1 int);
Expand All @@ -57,7 +57,7 @@
- stream_error
- name: multi-way temporal join with the same key
sql: |
create table stream(k int, a1 int, b1 int) APPEND ONLY;
create table stream(k int, a1 int, b1 int);
create table version1(k int, x1 int, y2 int, primary key (k));
create table version2(k int, x2 int, y2 int, primary key (k));
select stream.k, x1, x2, a1, b1
Expand All @@ -68,7 +68,7 @@
- stream_plan
- name: multi-way temporal join with different keys
sql: |
create table stream(id1 int, id2 int, a1 int, b1 int) APPEND ONLY;
create table stream(id1 int, id2 int, a1 int, b1 int);
create table version1(id1 int, x1 int, y2 int, primary key (id1));
create table version2(id2 int, x2 int, y2 int, primary key (id2));
select stream.id1, x1, stream.id2, x2, a1, b1
Expand All @@ -79,7 +79,7 @@
- stream_plan
- name: multi-way temporal join with different keys
sql: |
create table stream(id1 int, id2 int, a1 int, b1 int) APPEND ONLY;
create table stream(id1 int, id2 int, a1 int, b1 int);
create table version1(id1 int, x1 int, y2 int, primary key (id1));
create table version2(id2 int, x2 int, y2 int, primary key (id2));
select stream.id1, x1, stream.id2, x2, a1, b1
Expand All @@ -90,46 +90,46 @@
- stream_plan
- name: temporal join with an index (distribution key size = 1)
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table stream(id1 int, a1 int, b1 int);
create table version(id2 int, a2 int, b2 int, primary key (id2));
create index idx2 on version (a2, b2) distributed by (a2);
select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2;
expected_outputs:
- stream_plan
- name: temporal join with an index (distribution key size = 2)
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table stream(id1 int, a1 int, b1 int);
create table version(id2 int, a2 int, b2 int, primary key (id2));
create index idx2 on version (a2, b2);
select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2;
expected_outputs:
- stream_plan
- name: temporal join with an index (index column size = 1)
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table stream(id1 int, a1 int, b1 int);
create table version(id2 int, a2 int, b2 int, primary key (id2));
create index idx2 on version (b2);
select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2;
expected_outputs:
- stream_plan
- name: temporal join with singleton table
sql: |
create table t (a int) append only;
create table t (a int);
create materialized view v as select count(*) from t;
select * from t left join v FOR SYSTEM_TIME AS OF PROCTIME() on a = count;
expected_outputs:
- stream_plan
- name: index selection for temporal join (with one index).
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table stream(id1 int, a1 int, b1 int);
create table version(id2 int, a2 int, b2 int, primary key (id2));
create index idx on version (a2, b2);
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2;
expected_outputs:
- stream_plan
- name: index selection for temporal join (with two indexes) and should choose the index with a longer prefix..
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table stream(id1 int, a1 int, b1 int);
create table version(id2 int, a2 int, b2 int, primary key (id2));
create index idx1 on version (a2);
create index idx2 on version (a2, b2);
Expand All @@ -138,7 +138,7 @@
- stream_plan
- name: index selection for temporal join (with three indexes) and should choose primary table.
sql: |
create table stream(id1 int, a1 int, b1 int, c1 int) APPEND ONLY;
create table stream(id1 int, a1 int, b1 int, c1 int);
create table version(id2 int, a2 int, b2 int, c2 int, primary key (id2));
create index idx1 on version (a2);
create index idx2 on version (b2);
Expand All @@ -148,7 +148,7 @@
- stream_plan
- name: index selection for temporal join (two index) and no one matches.
sql: |
create table stream(id1 int, a1 int, b1 int, c1 int) APPEND ONLY;
create table stream(id1 int, a1 int, b1 int, c1 int);
create table version(id2 int, a2 int, b2 int, c2 int, primary key (id2));
create index idx1 on version (a2);
create index idx2 on version (a2, b2);
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/planner_test/tests/testdata/output/nexmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,7 @@
ON mod(B.auction, 10000) = S.key
sink_plan: |-
StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10018(hidden), side_input.key(hidden)] }
└─StreamTemporalJoin { type: Inner, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
├─StreamExchange { dist: HashShard($expr1) }
│ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] }
│ └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) }
Expand All @@ -1011,7 +1011,7 @@
stream_plan: |-
StreamMaterialize { columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr1(hidden), side_input.key(hidden)], stream_key: [bid._row_id, $expr1], pk_columns: [bid._row_id, $expr1], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(bid._row_id, $expr1) }
└─StreamTemporalJoin { type: Inner, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
├─StreamExchange { dist: HashShard($expr1) }
│ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] }
│ └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) }
Expand All @@ -1024,7 +1024,7 @@
└── StreamExchange Hash([5, 6]) from 1

Fragment 1
StreamTemporalJoin { type: Inner, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
├── StreamExchange Hash([4]) from 2
└── StreamExchange NoShuffle from 3

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@
StreamMaterialize { columns: [id1, a1, id2, v1, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck, watermark_columns: [v1] }
└─StreamProject { exprs: [stream.id1, stream.a1, version.id2, stream.v1, stream._row_id], output_watermarks: [stream.v1] }
└─StreamDynamicFilter { predicate: (stream.v1 > now), output_watermarks: [stream.v1], output: [stream.id1, stream.a1, stream.v1, version.id2, stream._row_id], cleaned_by_watermark: true }
├─StreamTemporalJoin { type: LeftOuter, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, stream.v1, version.id2, stream._row_id] }
├─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, stream.v1, version.id2, stream._row_id] }
│ ├─StreamExchange { dist: HashShard(stream.id1) }
│ │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.v1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) }
│ └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) }
Expand Down
Loading
Loading