Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new table stream graph #12240

Merged
merged 25 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
59faa8f
Added modules and commented code blocks in `create_table.rs`. Importe…
shanicky Sep 11, 2023
7b2266e
Delete "recursive.rs" & modify `PlanRoot` struct logic for `stream_pl…
shanicky Sep 12, 2023
daccb1c
Remove println! statement, introduce is_external_source variable.
shanicky Sep 12, 2023
7906775
Refactor assigning source ID
shanicky Sep 12, 2023
945bb99
Modify distribution key in StreamTableScan and BatchScan operations f…
shanicky Sep 19, 2023
d60511c
Code changes: import type, add function, refactor code, add enum vari…
shanicky Sep 19, 2023
02e8e33
Code cleanup: Removal of unnecessary println! statements
shanicky Sep 19, 2023
a9b6c41
Update `no_shuffle.rs`: new import, predicate added, test case updated.
shanicky Sep 19, 2023
6f6c007
Import `Serial` type, add `fallback_row_id`, and modify `map` functio…
shanicky Sep 20, 2023
1c50108
Remove `Union` import, add `GenericPlanNode` import.
shanicky Oct 23, 2023
9343c67
Modified table scan and exchange distribution in query plan. Updated …
shanicky Oct 30, 2023
34be31c
Modified file vnode.rs: updated use statement, changed behavior of it…
shanicky Nov 7, 2023
3cb69c2
Add compute_row() method to VirtualNode implementation
shanicky Nov 7, 2023
d469547
update dapt
shanicky Nov 8, 2023
a5aa40b
add assert back
shanicky Nov 8, 2023
c6b0ba1
Update optimizer/mod.rs & optimizer/plan_node/stream_union.rs. Create…
shanicky Nov 13, 2023
ab9dc01
Modify `unique` method & simplify logic for creating new `StreamRowId…
shanicky Nov 13, 2023
8479f43
rollback
shanicky Nov 13, 2023
379e20a
Added `VirtualNode::compute_row`, removed logic for empty `project`.
shanicky Nov 13, 2023
f905f87
Update "create materialized view" clause & add comment about hash dif…
shanicky Nov 14, 2023
e12d2fa
Added StreamExchange operator with HashShard distribution strategy to…
shanicky Nov 14, 2023
0b87816
Add StreamUnion & StreamExchange operators for shuffling, modify Stre…
shanicky Nov 14, 2023
9eb0db1
Remove commented out portion, refactor code to clone and use input's …
shanicky Nov 14, 2023
2e447d8
Skip empty sources, fill ID with connector, generate random server ID…
shanicky Nov 14, 2023
9a89780
try update e2e
shanicky Nov 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions e2e_test/streaming/distinct_on.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ statement ok
create table t1 (k int, v int) append only;

statement ok
create materialized view mv1 as select distinct on (k) k + v as sum from t1;
create materialized view mv1 as select distinct on (k) k + v as sum from t1 order by k, v;

statement ok
insert into t1 values (1,11), (2,22), (3,33), (1,111);
Expand All @@ -21,7 +21,7 @@ statement ok
create table t2 (k int, v int);

statement ok
create materialized view mv2 as select distinct on (k) k + v as sum from t2;
create materialized view mv2 as select distinct on (k) k + v as sum from t2 order by k, v;

statement ok
insert into t2 values (1,11), (2,22), (3,33), (1,111);
Expand Down
16 changes: 12 additions & 4 deletions src/common/src/hash/consistent_hash/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,18 @@ impl VirtualNode {
if let Ok(idx) = keys.iter().exactly_one()
&& let ArrayImpl::Serial(serial_array) = &**data_chunk.column_at(*idx)
{
return serial_array
.iter()
.map(|serial| extract_vnode_id_from_row_id(serial.unwrap().as_row_id()))
.collect();
return serial_array.iter().enumerate().map(|(idx, serial)| {
if let Some(serial) = serial {
extract_vnode_id_from_row_id(serial.as_row_id())
} else {
// NOTE: when the `_row_id` is missing, we fall back to hashing the entire row,
// which could result in rows from the same chunk being assigned to different vnodes.
// This process doesn't guarantee the order of rows, producing indeterminate results in some cases,
// such as when `distinct on` is used without an `order by`.
let (row, _) = data_chunk.row_at(idx);
row.hash(Crc32FastBuilder).into()
}
} ).collect();
}

data_chunk
Expand Down
258 changes: 129 additions & 129 deletions src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml

Large diffs are not rendered by default.

31 changes: 20 additions & 11 deletions src/frontend/planner_test/tests/testdata/output/create_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
) FORMAT PLAIN ENCODE CSV (delimiter = ',', without_header = true);
explain_output: |
StreamMaterialize { columns: [v1, v2, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamRowIdGen { row_id_index: 2 }
└─StreamDml { columns: [v1, v2, _row_id] }
└─StreamSource { source: s0, columns: [v1, v2, _row_id] }
└─StreamRowIdGen { row_id_index: 2 }
└─StreamUnion { all: true }
├─StreamExchange [no_shuffle] { dist: SomeShard }
│ └─StreamSource { source: s0, columns: [v1, v2, _row_id] }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamDml { columns: [v1, v2, _row_id] }
└─StreamSource
- id: csv_delimiter_tab
sql: |
explain create table s0 (v1 int, v2 varchar) with (
Expand All @@ -35,10 +38,13 @@
) FORMAT PLAIN ENCODE CSV (delimiter = E'\t', without_header = true);
explain_output: |
StreamMaterialize { columns: [v1, v2, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamRowIdGen { row_id_index: 2 }
└─StreamDml { columns: [v1, v2, _row_id] }
└─StreamSource { source: s0, columns: [v1, v2, _row_id] }
└─StreamRowIdGen { row_id_index: 2 }
└─StreamUnion { all: true }
├─StreamExchange [no_shuffle] { dist: SomeShard }
│ └─StreamSource { source: s0, columns: [v1, v2, _row_id] }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamDml { columns: [v1, v2, _row_id] }
└─StreamSource
- id: create_source_with_cdc_backfill
sql: |
create source mysql_mydb with (
Expand All @@ -57,8 +63,11 @@
) from mysql_mydb table 'mydb.t1';
explain_output: |
StreamMaterialize { columns: [v1, v2], stream_key: [v1], pk_columns: [v1], pk_conflict: Overwrite }
└─StreamExchange { dist: HashShard(mydb.t1.v1) }
└─StreamDml { columns: [v1, v2] }
└─StreamCdcTableScan { table: mydb.t1, columns: [v1, v2] }
└─StreamUnion { all: true }
├─StreamExchange { dist: HashShard(mydb.t1.v1) }
│ └─StreamCdcTableScan { table: mydb.t1, columns: [v1, v2] }
└─StreamExchange { dist: HashShard(v1) }
└─StreamDml { columns: [v1, v2] }
└─StreamSource
with_config_map:
CDC_BACKFILL: 'true'
20 changes: 12 additions & 8 deletions src/frontend/planner_test/tests/testdata/output/explain.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,19 @@
explain create table t (v1 int, v2 varchar);
explain_output: |
StreamMaterialize { columns: [v1, v2, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamRowIdGen { row_id_index: 2 }
└─StreamDml { columns: [v1, v2, _row_id] }
└─StreamSource
└─StreamRowIdGen { row_id_index: 2 }
└─StreamUnion { all: true }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamDml { columns: [v1, v2, _row_id] }
└─StreamSource
- sql: |
explain create table t (v1 int, v2 varchar) with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;
explain_output: |
StreamMaterialize { columns: [v1, v2, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamRowIdGen { row_id_index: 2 }
└─StreamDml { columns: [v1, v2, _row_id] }
└─StreamSource { source: t, columns: [v1, v2, _row_id] }
└─StreamRowIdGen { row_id_index: 2 }
└─StreamUnion { all: true }
├─StreamExchange [no_shuffle] { dist: SomeShard }
│ └─StreamSource { source: t, columns: [v1, v2, _row_id] }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamDml { columns: [v1, v2, _row_id] }
└─StreamSource
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
explain create table t1 (v1 int as v2-1, v2 int, v3 int as v2+1);
explain_output: |
StreamMaterialize { columns: [v1, v2, v3, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamProject { exprs: [(v2 - 1:Int32) as $expr1, v2, (v2 + 1:Int32) as $expr2, _row_id] }
└─StreamDml { columns: [v2, _row_id] }
└─StreamSource
└─StreamRowIdGen { row_id_index: 3 }
└─StreamUnion { all: true }
└─StreamExchange { dist: HashShard(_row_id) }
shanicky marked this conversation as resolved.
Show resolved Hide resolved
└─StreamProject { exprs: [(v2 - 1:Int32) as $expr1, v2, (v2 + 1:Int32) as $expr2, _row_id] }
└─StreamDml { columns: [v2, _row_id] }
└─StreamSource
- name: source with generated columns
sql: |
create source s1 (v1 int as v2-1, v2 int, v3 int as v2+1) with (connector = 'kinesis') FORMAT PLAIN ENCODE JSON;
Expand All @@ -30,18 +31,21 @@
explain create table t1 (proc_time TIMESTAMP AS proctime());
explain_output: |
StreamMaterialize { columns: [proc_time, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite, watermark_columns: [proc_time] }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamRowIdGen { row_id_index: 1 }
└─StreamProject { exprs: [AtTimeZone(Proctime, 'UTC':Varchar) as $expr1, _row_id], output_watermarks: [$expr1] }
└─StreamDml { columns: [_row_id] }
└─StreamSource
└─StreamRowIdGen { row_id_index: 1 }
└─StreamUnion { all: true, output_watermarks: [$expr1] }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamProject { exprs: [AtTimeZone(Proctime, 'UTC':Varchar) as $expr1, _row_id], output_watermarks: [$expr1] }
└─StreamDml { columns: [_row_id] }
└─StreamSource
- name: watermark on generated column
sql: |
explain create table t (v int, w int as v+1, watermark for w as w) append only
explain_output: |
StreamMaterialize { columns: [v, w, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [w] }
└─StreamRowIdGen { row_id_index: 2 }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: $expr1 }], output_watermarks: [$expr1] }
└─StreamProject { exprs: [v, (v + 1:Int32) as $expr1, _row_id] }
└─StreamDml { columns: [v, _row_id] }
└─StreamSource
└─StreamUnion { all: true }
└─StreamExchange [no_shuffle] { dist: SomeShard }
└─StreamProject { exprs: [v, (v + 1:Int32) as $expr1, _row_id] }
└─StreamDml { columns: [v, _row_id] }
└─StreamSource
2 changes: 1 addition & 1 deletion src/frontend/planner_test/tests/testdata/output/union.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@

Fragment 4
StreamProject { exprs: [t4.a, t4.b, t4.c, 3:Int32] }
└── StreamTableScan { table: t4, columns: [t4.a, t4.b, t4.c], pk: [t4.b, t4.a], dist: UpstreamHashShard(t4.a, t4.b) }
└── StreamTableScan { table: t4, columns: [t4.a, t4.b, t4.c], pk: [t4.b, t4.a], dist: UpstreamHashShard(t4.b, t4.a) }
├── state table: 3
├── Upstream
└── BatchPlanNode
Expand Down
14 changes: 10 additions & 4 deletions src/frontend/planner_test/tests/testdata/output/watermark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,23 @@
StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
└─StreamRowIdGen { row_id_index: 1 }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - '00:00:01':Interval) }], output_watermarks: [v1] }
└─StreamDml { columns: [v1, _row_id] }
└─StreamSource { source: t, columns: [v1, _row_id] }
└─StreamUnion { all: true }
├─StreamExchange [no_shuffle] { dist: SomeShard }
│ └─StreamSource { source: t, columns: [v1, _row_id] }
└─StreamExchange [no_shuffle] { dist: SomeShard }
└─StreamDml { columns: [v1, _row_id] }
└─StreamSource
- name: watermark on append only table without source
sql: |
explain create table t (v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '1' SECOND) append only;
explain_output: |
StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
└─StreamRowIdGen { row_id_index: 1 }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - '00:00:01':Interval) }], output_watermarks: [v1] }
└─StreamDml { columns: [v1, _row_id] }
└─StreamSource
└─StreamUnion { all: true }
└─StreamExchange [no_shuffle] { dist: SomeShard }
└─StreamDml { columns: [v1, _row_id] }
└─StreamSource
- name: hash agg
sql: |
create table t (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,8 @@ fn gen_table_plan_inner(
let connection_id =
resolve_privatelink_in_with_option(&mut with_options, &schema_name, &session)?;

let is_external_source = source_info.is_some();

let source = source_info.map(|source_info| PbSource {
id: TableId::placeholder().table_id,
schema_id,
Expand Down Expand Up @@ -776,6 +778,7 @@ fn gen_table_plan_inner(
append_only,
watermark_descs,
version,
is_external_source,
)?;

let mut table = materialize.table().to_prost(schema_id, database_id);
Expand Down Expand Up @@ -884,8 +887,9 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(
pk_column_ids,
None,
append_only,
vec![], // no watermarks
vec![],
Some(col_id_gen.into_version()),
true,
)?;

let mut table = materialize.table().to_prost(schema_id, database_id);
Expand Down
Loading
Loading