Skip to content

Commit

Permalink
fix: column index mapping bug of stream_delta_join (#14398)
Browse files Browse the repository at this point in the history
  • Loading branch information
yufansong authored Jan 8, 2024
1 parent f33dac4 commit 569b589
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
statement ok
set rw_implicit_flush = true;

statement ok
set rw_streaming_enable_delta_join = true;

statement ok
create table A (k1 numeric, k2 smallint, v int);

statement ok
create index Ak1 on A(k1) include(k1,k2,v);

statement ok
create table B (k1 numeric, k2 smallint, v int);

statement ok
create index Bk1 on B(k1) include(k1,k2,v);

statement ok
insert into A values(1, 2, 4);

statement ok
insert into B values(1, 2, 4);

statement ok
create MATERIALIZED VIEW m1 as select A.v, B.v as Bv from A join B using(k1);


query I
SELECT * from m1;
----
4 4

statement ok
drop MATERIALIZED VIEW m1;

statement ok
drop index Ak1;

statement ok
drop index Bk1;

statement ok
drop table A;

statement ok
drop table B;
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,8 @@ message ArrangementInfo {
repeated plan_common.ColumnDesc column_descs = 2;
// Used to build storage table by stream lookup join of delta join.
plan_common.StorageTableDesc table_desc = 4;
// Output index columns
repeated uint32 output_col_idx = 5;
}

// Special node for shared state, which will only be produced in fragmenter. ArrangeNode will
Expand Down
10 changes: 10 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ impl StreamNode for StreamDeltaJoin {
.map(ColumnDesc::to_protobuf)
.collect(),
table_desc: Some(left_table_desc.to_protobuf()),
output_col_idx: left_table
.output_col_idx
.iter()
.map(|&v| v as u32)
.collect(),
}),
right_info: Some(ArrangementInfo {
// TODO: remove it
Expand All @@ -193,6 +198,11 @@ impl StreamNode for StreamDeltaJoin {
.map(ColumnDesc::to_protobuf)
.collect(),
table_desc: Some(right_table_desc.to_protobuf()),
output_col_idx: right_table
.output_col_idx
.iter()
.map(|&v| v as u32)
.collect(),
}),
output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
})
Expand Down
8 changes: 7 additions & 1 deletion src/stream/src/from_proto/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,13 @@ impl ExecutorBuilder for LookupExecutorBuilder {
.iter()
.map(ColumnDesc::from)
.collect_vec();
let column_ids = column_descs.iter().map(|x| x.column_id).collect_vec();

let column_ids = lookup
.get_arrangement_table_info()?
.get_output_col_idx()
.iter()
.map(|&idx| column_descs[idx as usize].column_id)
.collect_vec();

// Use indices based on full table instead of streaming executor output.
let pk_indices = table_desc
Expand Down

0 comments on commit 569b589

Please sign in to comment.