From 569b5897891ce50ce88809343f2e57db0f8516cf Mon Sep 17 00:00:00 2001 From: Yufan Song <33971064+yufansong@users.noreply.github.com> Date: Sun, 7 Jan 2024 22:56:48 -0800 Subject: [PATCH] fix: column index mapping bug of stream_delta_join (#14398) --- ...in_upstream_with_index_different_types.slt | 47 +++++++++++++++++++ proto/stream_plan.proto | 2 + .../optimizer/plan_node/stream_delta_join.rs | 10 ++++ src/stream/src/from_proto/lookup.rs | 8 +++- 4 files changed, 66 insertions(+), 1 deletion(-) create mode 100644 e2e_test/streaming/delta_join/delta_join_upstream_with_index_different_types.slt diff --git a/e2e_test/streaming/delta_join/delta_join_upstream_with_index_different_types.slt b/e2e_test/streaming/delta_join/delta_join_upstream_with_index_different_types.slt new file mode 100644 index 0000000000000..76cb0314e3a48 --- /dev/null +++ b/e2e_test/streaming/delta_join/delta_join_upstream_with_index_different_types.slt @@ -0,0 +1,47 @@ +statement ok +set rw_implicit_flush = true; + +statement ok +set rw_streaming_enable_delta_join = true; + +statement ok +create table A (k1 numeric, k2 smallint, v int); + +statement ok +create index Ak1 on A(k1) include(k1,k2,v); + +statement ok +create table B (k1 numeric, k2 smallint, v int); + +statement ok +create index Bk1 on B(k1) include(k1,k2,v); + +statement ok +insert into A values(1, 2, 4); + +statement ok +insert into B values(1, 2, 4); + +statement ok +create MATERIALIZED VIEW m1 as select A.v, B.v as Bv from A join B using(k1); + + +query I +SELECT * from m1; +---- +4 4 + +statement ok +drop MATERIALIZED VIEW m1; + +statement ok +drop index Ak1; + +statement ok +drop index Bk1; + +statement ok +drop table A; + +statement ok +drop table B; \ No newline at end of file diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 04ba2246bb859..a168ea163f5b5 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -576,6 +576,8 @@ message ArrangementInfo { repeated plan_common.ColumnDesc column_descs = 2; // Used to build storage table by stream lookup join of delta join. plan_common.StorageTableDesc table_desc = 4; + // Output index columns + repeated uint32 output_col_idx = 5; } // Special node for shared state, which will only be produced in fragmenter. ArrangeNode will diff --git a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs index 6cadd8a31b9e3..25b45ac24c73a 100644 --- a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs @@ -182,6 +182,11 @@ impl StreamNode for StreamDeltaJoin { .map(ColumnDesc::to_protobuf) .collect(), table_desc: Some(left_table_desc.to_protobuf()), + output_col_idx: left_table + .output_col_idx + .iter() + .map(|&v| v as u32) + .collect(), }), right_info: Some(ArrangementInfo { // TODO: remove it @@ -193,6 +198,11 @@ impl StreamNode for StreamDeltaJoin { .map(ColumnDesc::to_protobuf) .collect(), table_desc: Some(right_table_desc.to_protobuf()), + output_col_idx: right_table + .output_col_idx + .iter() + .map(|&v| v as u32) + .collect(), }), output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), }) diff --git a/src/stream/src/from_proto/lookup.rs b/src/stream/src/from_proto/lookup.rs index a35f0a4390b34..1c1733ae7e4ba 100644 --- a/src/stream/src/from_proto/lookup.rs +++ b/src/stream/src/from_proto/lookup.rs @@ -72,7 +72,13 @@ impl ExecutorBuilder for LookupExecutorBuilder { .iter() .map(ColumnDesc::from) .collect_vec(); - let column_ids = column_descs.iter().map(|x| x.column_id).collect_vec(); + + let column_ids = lookup + .get_arrangement_table_info()? + .get_output_col_idx() + .iter() + .map(|&idx| column_descs[idx as usize].column_id) + .collect_vec(); // Use indices based on full table instead of streaming executor output. let pk_indices = table_desc