Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(watermark): handle watermark in project_set #12128

Merged
merged 5 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions e2e_test/streaming/watermark.slt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ create table t (
) append only;

statement ok
create materialized view mv as select * from t emit on window close;
create materialized view mv1 as select * from t emit on window close;

statement ok
create materialized view mv2 as select t.ts, unnest(Array[1,2,3]) from t emit on window close;

statement ok
insert into t values ('2023-05-06 16:51:00', 1), ('2023-05-06 16:51:00', 2), ('2023-05-06 16:51:00', 3);
Expand All @@ -22,14 +25,31 @@ sleep 5s

skipif in-memory
query TI
select * from mv;
select * from mv1;
----
2023-05-06 16:51:00 1
2023-05-06 16:51:00 2
2023-05-06 16:51:00 3

skipif in-memory
query TI
select * from mv2;
----
2023-05-06 16:51:00 1
2023-05-06 16:51:00 2
2023-05-06 16:51:00 3
2023-05-06 16:51:00 1
2023-05-06 16:51:00 2
2023-05-06 16:51:00 3
2023-05-06 16:51:00 1
2023-05-06 16:51:00 2
2023-05-06 16:51:00 3

statement ok
drop materialized view mv1;

statement ok
drop materialized view mv;
drop materialized view mv2;

statement ok
drop table t;
Expand Down
6 changes: 6 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,12 @@ message ExpandNode {

// Stream plan node body for a project-set operator.
message ProjectSetNode {
// The select items evaluated by the project-set.
repeated expr.ProjectSetSelectItem select_list = 1;
// The following two fields together encode a list of
// (input column index, expression index) pairs: when the project-set receives a
// watermark on input column `watermark_input_cols[i]`, it should derive a new
// watermark using the `watermark_expr_indices[i]`-th expression of `select_list`.
// Both lists are expected to have the same length.
repeated uint32 watermark_input_cols = 2;
repeated uint32 watermark_expr_indices = 3;
// Indices of the expressions in `select_list` that the frontend classified as
// nondecreasing; the project-set can produce watermarks for these expressions.
repeated uint32 nondecreasing_exprs = 4;
}

// Sorts inputs and outputs ordered data based on watermark.
Expand Down
6 changes: 6 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/watermark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,9 @@
select window_start from hop(t, ts, interval '1' minute, interval '3' minute);
expected_outputs:
- stream_plan
- name: unnest
sql: |
create table t (ts timestamp with time zone, v1 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
explain create materialized view mv as select t.ts, unnest(Array[1,2,3]) from t emit on window close;
expected_outputs:
- explain_output
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,12 @@
└─StreamHopWindow { time_col: t.ts, slide: 00:01:00, size: 00:03:00, output: [window_start, t._row_id], output_watermarks: [window_start] }
└─StreamFilter { predicate: IsNotNull(t.ts) }
└─StreamTableScan { table: t, columns: [t.ts, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- name: unnest
sql: |
create table t (ts timestamp with time zone, v1 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
explain create materialized view mv as select t.ts, unnest(Array[1,2,3]) from t emit on window close;
explain_output: |
StreamMaterialize { columns: [projected_row_id(hidden), ts, unnest, t._row_id(hidden)], stream_key: [t._row_id, projected_row_id], pk_columns: [t._row_id, projected_row_id], pk_conflict: NoCheck, watermark_columns: [ts] }
└─StreamEowcSort { sort_column: t.ts }
└─StreamProjectSet { select_list: [$0, Unnest(ARRAY[1, 2, 3]:List(Int32)), $1] }
└─StreamTableScan { table: t, columns: [ts, _row_id] }
10 changes: 2 additions & 8 deletions src/frontend/src/optimizer/plan_node/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,14 +711,8 @@ pub fn to_stream_prost_body(
table: Some(me.table.to_internal_table_prost()),
})
}
Node::ProjectSet(me) => {
let me = &me.core;
let select_list = me
.select_list
.iter()
.map(ExprImpl::to_project_set_select_item_proto)
.collect();
PbNodeBody::ProjectSet(ProjectSetNode { select_list })
Node::ProjectSet(_) => {
unreachable!()
}
Node::Project(me) => PbNodeBody::Project(ProjectNode {
select_list: me.core.exprs.iter().map(|x| x.to_expr_proto()).collect(),
Expand Down
40 changes: 35 additions & 5 deletions src/frontend/src/optimizer/plan_node/stream_project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ use crate::utils::ColIndexMappingRewriteExt;
pub struct StreamProjectSet {
pub base: PlanBase,
logical: generic::ProjectSet<PlanRef>,
/// All watermark derivations, stored as `(input_column_idx, expr_idx)` pairs.
/// A watermark arriving on input column `input_column_idx` is derived through
/// the project-set expression at `expr_idx`; the derivation expression is that
/// select-list expression itself, so no separate expression is stored.
watermark_derivations: Vec<(usize, usize)>,
/// Indices (into the select list) of nondecreasing expressions. `ProjectSet`
/// can produce watermarks for these expressions.
nondecreasing_exprs: Vec<usize>,
}

impl StreamProjectSet {
Expand All @@ -37,15 +43,26 @@ impl StreamProjectSet {
.i2o_col_mapping()
.rewrite_provided_distribution(input.distribution());

let mut watermark_derivations = vec![];
let mut nondecreasing_exprs = vec![];
let mut watermark_columns = FixedBitSet::with_capacity(logical.output_len());
for (expr_idx, expr) in logical.select_list.iter().enumerate() {
if let WatermarkDerivation::Watermark(input_idx) = try_derive_watermark(expr) {
if input.watermark_columns().contains(input_idx) {
// The first column of ProjectSet is `projected_row_id`.
match try_derive_watermark(expr) {
WatermarkDerivation::Watermark(input_idx) => {
if input.watermark_columns().contains(input_idx) {
watermark_derivations.push((input_idx, expr_idx));
watermark_columns.insert(expr_idx + 1);
}
}
WatermarkDerivation::Nondecreasing => {
nondecreasing_exprs.push(expr_idx);
watermark_columns.insert(expr_idx + 1);
}
WatermarkDerivation::Constant => {
// XXX(rc): we can produce one watermark on each recovery for this case.
}
WatermarkDerivation::None => {}
}
// XXX(rc): do we need to handle `WatermarkDerivation::Nondecreasing` here?
}

// ProjectSet executor won't change the append-only behavior of the stream, so it depends on
Expand All @@ -57,7 +74,12 @@ impl StreamProjectSet {
input.emit_on_window_close(),
watermark_columns,
);
StreamProjectSet { base, logical }
StreamProjectSet {
base,
logical,
watermark_derivations,
nondecreasing_exprs,
}
}
}
impl_distill_by_unit!(StreamProjectSet, logical, "StreamProjectSet");
Expand All @@ -77,13 +99,21 @@ impl PlanTreeNodeUnary for StreamProjectSet {

impl StreamNode for StreamProjectSet {
    /// Serializes this plan node into a `ProjectSetNode` protobuf body.
    ///
    /// The `(input_column_idx, expr_idx)` watermark derivations are split into the two
    /// parallel `watermark_input_cols` / `watermark_expr_indices` lists, so entry `i` of
    /// one list pairs with entry `i` of the other. The build state is not consulted.
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
        // Split the derivation pairs into two index-aligned columns.
        let derivation_count = self.watermark_derivations.len();
        let mut watermark_input_cols = Vec::with_capacity(derivation_count);
        let mut watermark_expr_indices = Vec::with_capacity(derivation_count);
        for &(input_idx, expr_idx) in &self.watermark_derivations {
            watermark_input_cols.push(input_idx as u32);
            watermark_expr_indices.push(expr_idx as u32);
        }

        let select_list = self
            .logical
            .select_list
            .iter()
            .map(|select_item| select_item.to_project_set_select_item_proto())
            .collect_vec();

        let nondecreasing_exprs: Vec<u32> = self
            .nondecreasing_exprs
            .iter()
            .map(|&expr_idx| expr_idx as u32)
            .collect();

        PbNodeBody::ProjectSet(ProjectSetNode {
            select_list,
            watermark_input_cols,
            watermark_expr_indices,
            nondecreasing_exprs,
        })
    }
}
Expand Down
Loading