Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): support non-append-only process time temporal join #16286

Merged
merged 15 commits into from
Apr 18, 2024
47 changes: 47 additions & 0 deletions e2e_test/streaming/temporal_join/non_append_only/temporal_join.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table stream(id1 int, a1 int, b1 int);

statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

statement ok
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2

statement ok
insert into stream values(1, 11, 111);

statement ok
insert into version values(1, 11, 111);

statement ok
insert into stream values(1, 11, 111);

statement ok
delete from version;

query IIII rowsort
select * from v;
----
1 11 1 11
1 11 NULL NULL

statement ok
update stream set a1 = 22, b1 = 222

query IIII rowsort
select * from v;
----
1 22 NULL NULL
1 22 NULL NULL

statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table stream(id1 int, a1 int, b1 int);

statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

statement ok
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 and a1 > a2;

statement ok
insert into stream values(1, 11, 111);

statement ok
insert into version values(1, 12, 111);

statement ok
insert into stream values(1, 13, 111);

statement ok
delete from version;

query IIII rowsort
select * from v;
----
1 11 NULL NULL
1 13 1 12

statement ok
delete from stream;

statement ok
insert into version values(2, 22, 222);

statement ok
insert into stream values(2, 23, 222);

query IIII rowsort
select * from v;
----
2 23 2 22

statement ok
delete from stream;

query IIII rowsort
select * from v;
----


statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;

statement ok
create table stream(id1 int, a1 int, b1 int);

statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

statement ok
create materialized view v as select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 and a1 > a2;

statement ok
insert into version values (1, 12, 111), (2, 12, 111);

statement ok
insert into stream values (1, 11, 111), (2, 13, 111);

query IIII rowsort
select * from v;
----
2 13 2 12

statement ok
update stream set a1 = 222, b1 = 333 where id1 = 1;

statement ok
update stream set a1 = 2, b1 = 3 where id1 = 2;

query IIII rowsort
select * from v;
----
1 222 1 12

statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table stream(id1 int, a1 int, b1 int);

statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

statement ok
create index idx on version (a2);

statement ok
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on b1 = b2 and a1 = a2;

statement ok
insert into version values(1, 11, 111);

statement ok
insert into stream values(1, 11, 111);

query IIII rowsort
select * from v;
----
1 11 1 11

statement ok
update stream set a1 = 22 where id1 = 1;

query IIII rowsort
select * from v;
----
1 22 NULL NULL

statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;


Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table stream(id1 int, a1 int, b1 int);

statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

statement ok
create index idx on version(a2, b2) distributed by (a2);

statement ok
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on b1 = b2 and a1 = a2;

statement ok
insert into version values(1, 11, 111);

statement ok
insert into stream values(1, 11, 111);

query IIII rowsort
select * from v;
----
1 11 1 11

statement ok
update stream set a1 = 22 where id1 = 1;

query IIII rowsort
select * from v;
----
1 22 NULL NULL

statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;


Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table stream(id1 int, a1 int, b1 int);

statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

statement ok
create index idx on version(a2, b2) distributed by (a2, b2);

statement ok
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on b1 = b2 and a1 = a2;

statement ok
insert into version values(1, 11, 111);

statement ok
insert into stream values(1, 11, 111);

query IIII rowsort
select * from v;
----
1 11 1 11

statement ok
update stream set a1 = 22 where id1 = 1;

query IIII rowsort
select * from v;
----
1 22 NULL NULL

statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;


2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,8 @@ message TemporalJoinNode {
plan_common.StorageTableDesc table_desc = 7;
// The output indices of the lookup side table
repeated uint32 table_output_indices = 8;
// The state table used for non-append-only temporal join.
catalog.Table memo_table = 9;
chenzl25 marked this conversation as resolved.
Show resolved Hide resolved
}

message DynamicFilterNode {
Expand Down
3 changes: 3 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ pub fn visit_stream_node_tables_inner<F>(
always!(node.right_table, "HashJoinRight");
always!(node.right_degree_table, "HashJoinDegreeRight");
}
NodeBody::TemporalJoin(node) => {
optional!(node.memo_table, "TemporalJoinMemo");
}
NodeBody::DynamicFilter(node) => {
if node.condition_always_relax {
always!(node.left_table, "DynamicFilterLeftNotSatisfy");
Expand Down
Loading
Loading