Skip to content

Commit

Permalink
fix: stream backfill executor use correct schema (#12314)
Browse files — browse the repository at this point in the history
Co-authored-by: Noel Kwan <[email protected]>
  • Loading branch information
st1page and kwannoel authored Sep 15, 2023
1 parent c443197 commit 467ba4b
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 11 deletions.
31 changes: 31 additions & 0 deletions e2e_test/streaming/bug_fixes/issue_12299.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Regression test for https://github.com/risingwavelabs/risingwave/issues/12299
# TL;DR: covers the case where the upstream's stream key is not the pk and the
# stream scan (backfill) does not contain the whole pk.

statement ok
create table t1(
  id bigint primary key,
  i bigint
);

# mv1 orders by (id, i); presumably this makes its stream key differ from the
# projection the downstream scan reads — TODO confirm against the planner.
statement ok
create materialized view mv1 as select id, i from t1 order by id, i;

# Seed one row before creating mv2 so that mv2's creation exercises the
# backfill path (snapshot read of existing data), not just the upstream stream.
statement ok
insert into t1 values(1, 1);

# mv2 selects only `id` from mv1, i.e. the scan does not cover all of mv1's
# columns — the shape that triggered the wrong-schema bug in the backfill
# executor (see the fix in src/stream/src/from_proto/chain.rs).
statement ok
create materialized view mv2 as select id from mv1;

# The backfilled row must be visible with the correct single-column schema.
query I
select * from mv2;
----
1

# Cleanup: drop in dependency order (mv2 depends on mv1 depends on t1).
statement ok
drop materialized view mv2;

statement ok
drop materialized view mv1;

statement ok
drop table t1;
25 changes: 14 additions & 11 deletions src/stream/src/from_proto/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::sync::Arc;

use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId, TableOption};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_pb::stream_plan::{ChainNode, ChainType};
Expand All @@ -40,28 +40,31 @@ impl ExecutorBuilder for ChainExecutorBuilder {
stream: &mut LocalStreamManagerCore,
) -> StreamResult<BoxedExecutor> {
let [mview, snapshot]: [_; 2] = params.input.try_into().unwrap();

// For reporting the progress.
let progress = stream
.context
.register_create_mview_progress(params.actor_context.id);

// The batch query executor scans on a mapped adhoc mview table, thus we should directly use
// its schema.
let schema = snapshot.schema().clone();

let output_indices = node
.output_indices
.iter()
.map(|&i| i as usize)
.collect_vec();

// For `Chain`s other than `Backfill`, there should be no extra mapping required. We can
// directly output the columns received from the upstream or snapshot.
if !matches!(node.chain_type(), ChainType::Backfill) {
let all_indices = (0..schema.len()).collect_vec();
let schema = if matches!(node.chain_type(), ChainType::Backfill) {
Schema::new(
output_indices
.iter()
.map(|i| snapshot.schema().fields()[*i].clone())
.collect_vec(),
)
} else {
// For `Chain`s other than `Backfill`, there should be no extra mapping required. We can
// directly output the columns received from the upstream or snapshot.
let all_indices = (0..snapshot.schema().len()).collect_vec();
assert_eq!(output_indices, all_indices);
}
snapshot.schema().clone()
};

let executor = match node.chain_type() {
ChainType::Chain | ChainType::UpstreamOnly => {
Expand Down

0 comments on commit 467ba4b

Please sign in to comment.