Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(steam): support stream change log #17132

Merged
merged 16 commits into from
Jun 26, 2024
146 changes: 146 additions & 0 deletions e2e_test/streaming/changed_log.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t1 (v1 int, v2 int);

statement ok
create table t2 (v1 int, v2 int);

statement ok
create table t3 (v1 int primary key, v2 int);

statement ok
create materialized view mv1 as with sub as changedlog from t1 select * from sub;
xxhZs marked this conversation as resolved.
Show resolved Hide resolved

statement ok
create materialized view mv2 as with sub as changedlog from t2 select * from sub;

statement ok
create materialized view mv3 as with sub as changedlog from t1 select v1, v2 from sub;

statement ok
create materialized view mv4 as with sub1 as changedlog from t1, sub2 as changedlog from t2
select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22 from sub1 inner join sub2 on sub1.v1 = sub2.v1;

statement ok
create materialized view mv5 as with sub1 as changedlog from t1, sub2 as changedlog from t2
select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22, sub1.op as op1, sub2.op as op2 from sub1 inner join sub2 on sub1.v1 = sub2.v1;

statement ok
create materialized view mv6 as with sub as changedlog from t3 select * from sub;

statement ok
insert into t1 values(1,1),(2,2);

statement ok
insert into t2 values(1,10),(2,20);

statement ok
insert into t3 values(5,5),(6,6);

statement ok
update t1 set v2 = 100 where v1 = 1;

statement ok
update t2 set v2 = 100 where v1 = 1;

statement ok
update t3 set v2 = 500 where v1 = 5;

statement ok
delete from t1 where v1 = 2;

query III rowsort
select * from mv1 order by v1;
----
1 1 1
1 1 4
1 100 3
2 2 1
2 2 2

query III rowsort
select * from mv2 order by v1;
----
1 10 1
1 10 4
1 100 3
2 20 1

query III rowsort
select * from mv3 order by v1;
----
1 1
1 1
1 100
2 2
2 2

query III rowsort
select * from mv4 order by v11,v21;
----
1 1 1 10
1 1 1 10
1 1 1 10
1 1 1 10
1 1 1 100
1 1 1 100
1 100 1 10
1 100 1 10
1 100 1 100
2 2 2 20
2 2 2 20


query III rowsort
select * from mv5 order by v11,v21;
----
1 1 1 10 1 1
1 1 1 10 1 4
1 1 1 10 4 1
1 1 1 10 4 4
1 1 1 100 1 3
1 1 1 100 4 3
1 100 1 10 3 1
1 100 1 10 3 4
1 100 1 100 3 3
2 2 2 20 1 1
2 2 2 20 2 1

query III rowsort
select * from mv6 order by v1;
----
5 5 1
5 5 4
5 500 3
6 6 1


statement ok
drop materialized view mv6;

statement ok
drop materialized view mv5;

statement ok
drop materialized view mv4;

statement ok
drop materialized view mv3;

statement ok
drop materialized view mv2;

statement ok
drop materialized view mv1;


statement ok
drop table t3;

statement ok
drop table t2;

statement ok
drop table t1;
5 changes: 5 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,10 @@ message FilterNode {
expr.ExprNode search_condition = 1;
}

message ChangedLogNode {
bool need_op = 1;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is used for projection push-down to prune out op column if it is not used in downstream. Can we have some documentation to explain in which case need_op is false?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add documentation here in the pb in addition to the rust codes as well?

}

message CdcFilterNode {
expr.ExprNode search_condition = 1;
uint32 upstream_source_id = 2;
Expand Down Expand Up @@ -812,6 +816,7 @@ message StreamNode {
StreamCdcScanNode stream_cdc_scan = 139;
CdcFilterNode cdc_filter = 140;
SourceBackfillNode source_backfill = 142;
ChangedLogNode changed_log = 143;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down
9 changes: 8 additions & 1 deletion src/frontend/src/binder/bind_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_sqlparser::ast::TableAlias;

use crate::binder::Relation;
use crate::error::{ErrorCode, Result};

type LiteResult<T> = std::result::Result<T, ErrorCode>;
Expand Down Expand Up @@ -94,11 +95,17 @@ pub enum BindingCteState {
#[default]
Init,
/// We know the schema form after the base term resolved.
BaseResolved { base: BoundSetExpr },
BaseResolved {
base: BoundSetExpr,
},
/// We get the whole bound result of the (recursive) CTE.
Bound {
query: Either<BoundQuery, RecursiveUnion>,
},

ChangedLog {
table: Relation,
},
}

/// the entire `RecursiveUnion` represents a *bound* recursive cte.
Expand Down
Loading
Loading