From 467ba4b00c2b30d2228c9ee1638fbe80fb53fa12 Mon Sep 17 00:00:00 2001
From: stonepage <40830455+st1page@users.noreply.github.com>
Date: Fri, 15 Sep 2023 16:28:13 +0800
Subject: [PATCH] fix: stream backfill executor use correct schema (#12314)

Co-authored-by: Noel Kwan <47273164+kwannoel@users.noreply.github.com>
---
 e2e_test/streaming/bug_fixes/issue_12299.slt | 31 ++++++++++++++++++++
 src/stream/src/from_proto/chain.rs           | 25 +++++++++++++-------
 2 files changed, 45 insertions(+), 11 deletions(-)
 create mode 100644 e2e_test/streaming/bug_fixes/issue_12299.slt

diff --git a/e2e_test/streaming/bug_fixes/issue_12299.slt b/e2e_test/streaming/bug_fixes/issue_12299.slt
new file mode 100644
index 0000000000000..7be47038f15cf
--- /dev/null
+++ b/e2e_test/streaming/bug_fixes/issue_12299.slt
@@ -0,0 +1,31 @@
+# https://github.com/risingwavelabs/risingwave/issues/12299
+# TL;DR When upstream's stream key is not pk and the stream scan does not contain whole pk.
+
+statement ok
+create table t1(
+    id bigint primary key,
+    i bigint
+);
+
+statement ok
+create materialized view mv1 as select id, i from t1 order by id, i;
+
+statement ok
+insert into t1 values(1, 1);
+
+statement ok
+create materialized view mv2 as select id from mv1;
+
+query I
+select * from mv2;
+----
+1
+
+statement ok
+drop materialized view mv2;
+
+statement ok
+drop materialized view mv1;
+
+statement ok
+drop table t1;
\ No newline at end of file
diff --git a/src/stream/src/from_proto/chain.rs b/src/stream/src/from_proto/chain.rs
index d1a971a5cbb4a..667772fcfdd60 100644
--- a/src/stream/src/from_proto/chain.rs
+++ b/src/stream/src/from_proto/chain.rs
@@ -14,7 +14,7 @@
 
 use std::sync::Arc;
 
-use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId, TableOption};
+use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
 use risingwave_common::util::sort_util::OrderType;
 use risingwave_pb::plan_common::StorageTableDesc;
 use risingwave_pb::stream_plan::{ChainNode, ChainType};
@@ -40,28 +40,31 @@ impl ExecutorBuilder for ChainExecutorBuilder {
         stream: &mut LocalStreamManagerCore,
     ) -> StreamResult<BoxedExecutor> {
         let [mview, snapshot]: [_; 2] = params.input.try_into().unwrap();
-
         // For reporting the progress.
         let progress = stream
             .context
             .register_create_mview_progress(params.actor_context.id);
 
-        // The batch query executor scans on a mapped adhoc mview table, thus we should directly use
-        // its schema.
-        let schema = snapshot.schema().clone();
-
         let output_indices = node
             .output_indices
            .iter()
            .map(|&i| i as usize)
            .collect_vec();
 
-        // For `Chain`s other than `Backfill`, there should be no extra mapping required. We can
-        // directly output the columns received from the upstream or snapshot.
-        if !matches!(node.chain_type(), ChainType::Backfill) {
-            let all_indices = (0..schema.len()).collect_vec();
+        let schema = if matches!(node.chain_type(), ChainType::Backfill) {
+            Schema::new(
+                output_indices
+                    .iter()
+                    .map(|i| snapshot.schema().fields()[*i].clone())
+                    .collect_vec(),
+            )
+        } else {
+            // For `Chain`s other than `Backfill`, there should be no extra mapping required. We can
+            // directly output the columns received from the upstream or snapshot.
+            let all_indices = (0..snapshot.schema().len()).collect_vec();
             assert_eq!(output_indices, all_indices);
-        }
+            snapshot.schema().clone()
+        };
 
         let executor = match node.chain_type() {
             ChainType::Chain | ChainType::UpstreamOnly => {
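
Note for readers (not part of the patch): the essence of the fix is that a `Backfill` chain must report a schema projected through `output_indices` rather than a clone of the full snapshot schema. The stand-alone Rust sketch below illustrates that projection using the columns from the regression test above; `Field`, `Schema`, and `project_schema` here are simplified stand-ins for the `risingwave_common::catalog` types, not the actual API.

// Minimal sketch of the schema projection done in the Backfill arm of the patch.
// `Field` and `Schema` are simplified stand-ins; only the projection logic is shown.

#[derive(Clone, Debug)]
struct Field {
    name: String,
}

#[derive(Clone, Debug)]
struct Schema {
    fields: Vec<Field>,
}

impl Schema {
    fn new(fields: Vec<Field>) -> Self {
        Self { fields }
    }
}

/// Build the executor's output schema by keeping only the columns listed in
/// `output_indices`, in that order, mirroring the Backfill branch above.
fn project_schema(snapshot_schema: &Schema, output_indices: &[usize]) -> Schema {
    Schema::new(
        output_indices
            .iter()
            .map(|&i| snapshot_schema.fields[i].clone())
            .collect(),
    )
}

fn main() {
    // In the regression test, upstream mv1 has columns (id, i) while mv2 reads only `id`,
    // so output_indices = [0] and the backfill executor must report a 1-column schema.
    let snapshot = Schema::new(vec![
        Field { name: "id".into() },
        Field { name: "i".into() },
    ]);
    let projected = project_schema(&snapshot, &[0]);
    assert_eq!(projected.fields.len(), 1);
    assert_eq!(projected.fields[0].name, "id");
}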