diff --git a/dashboard/proto/gen/stream_plan.ts b/dashboard/proto/gen/stream_plan.ts
index 485186d8988f3..1a5c953455dca 100644
--- a/dashboard/proto/gen/stream_plan.ts
+++ b/dashboard/proto/gen/stream_plan.ts
@@ -17,6 +17,56 @@ import { ConnectorSplits } from "./source";
 
 export const protobufPackage = "stream_plan";
 
+export const ChainType = {
+  CHAIN_UNSPECIFIED: "CHAIN_UNSPECIFIED",
+  /** CHAIN - CHAIN corresponds to the chain executor. */
+  CHAIN: "CHAIN",
+  /** REARRANGE - REARRANGE corresponds to the rearranged chain executor. */
+  REARRANGE: "REARRANGE",
+  /** BACKFILL - BACKFILL corresponds to the backfill executor. */
+  BACKFILL: "BACKFILL",
+  UNRECOGNIZED: "UNRECOGNIZED",
+} as const;
+
+export type ChainType = typeof ChainType[keyof typeof ChainType];
+
+export function chainTypeFromJSON(object: any): ChainType {
+  switch (object) {
+    case 0:
+    case "CHAIN_UNSPECIFIED":
+      return ChainType.CHAIN_UNSPECIFIED;
+    case 1:
+    case "CHAIN":
+      return ChainType.CHAIN;
+    case 2:
+    case "REARRANGE":
+      return ChainType.REARRANGE;
+    case 3:
+    case "BACKFILL":
+      return ChainType.BACKFILL;
+    case -1:
+    case "UNRECOGNIZED":
+    default:
+      return ChainType.UNRECOGNIZED;
+  }
+}
+
+export function chainTypeToJSON(object: ChainType): string {
+  switch (object) {
+    case ChainType.CHAIN_UNSPECIFIED:
+      return "CHAIN_UNSPECIFIED";
+    case ChainType.CHAIN:
+      return "CHAIN";
+    case ChainType.REARRANGE:
+      return "REARRANGE";
+    case ChainType.BACKFILL:
+      return "BACKFILL";
+    case ChainType.UNRECOGNIZED:
+    default:
+      return "UNRECOGNIZED";
+  }
+}
+
 export const DispatcherType = {
   UNSPECIFIED: "UNSPECIFIED",
   /** HASH - Dispatch by hash key, hashed by consistent hash. */
@@ -508,9 +558,9 @@ export interface ChainNode {
    * Generally, the barrier needs to be rearranged during the MV creation process, so that data can
    * be flushed to shared buffer periodically, instead of making the first epoch from batch query extra
    * large. However, in some cases, e.g., shared state, the barrier cannot be rearranged in ChainNode.
-   * This option is used to disable barrier rearrangement.
+   * ChainType decides which implementation to use for the ChainNode.
    */
-  disableRearrange: boolean;
+  chainType: ChainType;
   /** Whether to place this chain on the same worker node as upstream actors. */
   sameWorkerNode: boolean;
   /**
@@ -519,6 +569,8 @@ export interface ChainNode {
    * fragment in the downstream mview. Remove this when we refactor the fragmenter.
    */
   isSingleton: boolean;
+  /** The upstream materialized view info used by backfill. */
+  tableDesc: StorageTableDesc | undefined;
 }
 
 /**
@@ -2498,9 +2550,10 @@ function createBaseChainNode(): ChainNode {
     tableId: 0,
     upstreamFields: [],
     upstreamColumnIndices: [],
-    disableRearrange: false,
+    chainType: ChainType.CHAIN_UNSPECIFIED,
     sameWorkerNode: false,
     isSingleton: false,
+    tableDesc: undefined,
   };
 }
 
@@ -2514,9 +2567,10 @@ export const ChainNode = {
       upstreamColumnIndices: Array.isArray(object?.upstreamColumnIndices)
        ? object.upstreamColumnIndices.map((e: any) => Number(e))
        : [],
-      disableRearrange: isSet(object.disableRearrange) ? Boolean(object.disableRearrange) : false,
+      chainType: isSet(object.chainType) ? chainTypeFromJSON(object.chainType) : ChainType.CHAIN_UNSPECIFIED,
      sameWorkerNode: isSet(object.sameWorkerNode) ? Boolean(object.sameWorkerNode) : false,
      isSingleton: isSet(object.isSingleton) ? Boolean(object.isSingleton) : false,
+      tableDesc: isSet(object.tableDesc) ? StorageTableDesc.fromJSON(object.tableDesc) : undefined,
    };
  },
 
@@ -2533,9 +2587,11 @@ export const ChainNode = {
    } else {
      obj.upstreamColumnIndices = [];
    }
-    message.disableRearrange !== undefined && (obj.disableRearrange = message.disableRearrange);
+    message.chainType !== undefined && (obj.chainType = chainTypeToJSON(message.chainType));
    message.sameWorkerNode !== undefined && (obj.sameWorkerNode = message.sameWorkerNode);
    message.isSingleton !== undefined && (obj.isSingleton = message.isSingleton);
+    message.tableDesc !== undefined &&
+      (obj.tableDesc = message.tableDesc ? StorageTableDesc.toJSON(message.tableDesc) : undefined);
    return obj;
  },
 
@@ -2544,9 +2600,12 @@ export const ChainNode = {
    message.tableId = object.tableId ?? 0;
    message.upstreamFields = object.upstreamFields?.map((e) => Field.fromPartial(e)) || [];
    message.upstreamColumnIndices = object.upstreamColumnIndices?.map((e) => e) || [];
-    message.disableRearrange = object.disableRearrange ?? false;
+    message.chainType = object.chainType ?? ChainType.CHAIN_UNSPECIFIED;
    message.sameWorkerNode = object.sameWorkerNode ?? false;
    message.isSingleton = object.isSingleton ?? false;
+    message.tableDesc = (object.tableDesc !== undefined && object.tableDesc !== null)
+      ? StorageTableDesc.fromPartial(object.tableDesc)
+      : undefined;
    return message;
  },
};
diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto
index 6e3874a54cfad..ca1891cf0bbbc 100644
--- a/proto/stream_plan.proto
+++ b/proto/stream_plan.proto
@@ -300,6 +300,18 @@ message ExchangeNode {
   DispatchStrategy strategy = 1;
 }
 
+enum ChainType {
+  CHAIN_UNSPECIFIED = 0;
+
+  // CHAIN corresponds to the chain executor.
+  CHAIN = 1;
+
+  // REARRANGE corresponds to the rearranged chain executor.
+  REARRANGE = 2;
+
+  // BACKFILL corresponds to the backfill executor.
+  BACKFILL = 3;
+}
 // ChainNode is used for mv on mv.
 // ChainNode is like a "UNION" on mv snapshot and streaming. So it takes two inputs with fixed order:
 //   1. MergeNode (as a placeholder) for streaming read.
@@ -313,14 +325,17 @@ message ChainNode {
   // Generally, the barrier needs to be rearranged during the MV creation process, so that data can
   // be flushed to shared buffer periodically, instead of making the first epoch from batch query extra
   // large. However, in some cases, e.g., shared state, the barrier cannot be rearranged in ChainNode.
-  // This option is used to disable barrier rearrangement.
-  bool disable_rearrange = 4;
+  // ChainType decides which implementation to use for the ChainNode.
+  ChainType chain_type = 4;
   // Whether to place this chain on the same worker node as upstream actors.
   bool same_worker_node = 5;
   // Whether the upstream materialize is and this chain should be a singleton.
   // FIXME: This is a workaround for fragmenter since the distribution info will be lost if there's only one
   // fragment in the downstream mview. Remove this when we refactor the fragmenter.
   bool is_singleton = 6;
+
+  // The upstream materialized view info used by backfill.
+  plan_common.StorageTableDesc table_desc = 7;
 }
 
 // BatchPlanNode is used for mv on mv snapshot read.
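For orientation, here is a minimal sketch (not part of the patch) of how a consumer dispatches on the new `chain_type` field; it mirrors the `match` in `ChainExecutorBuilder` in src/stream/src/from_proto/chain.rs later in this diff, assuming the prost-generated `chain_type()` accessor:

    use risingwave_pb::stream_plan::{ChainNode, ChainType};

    fn executor_kind(node: &ChainNode) -> &'static str {
        match node.chain_type() {
            // No barrier rearrangement; used by index scan (see stream_index_scan.rs below).
            ChainType::Chain => "ChainExecutor",
            // Barriers are rearranged while consuming the snapshot.
            ChainType::Rearrange => "RearrangedChainExecutor",
            // New in this patch: bounded-memory backfill (see stream_table_scan.rs below).
            ChainType::Backfill => "BackfillExecutor",
            // The frontend always sets an explicit chain type.
            ChainType::ChainUnspecified => unreachable!("chain_type must be set"),
        }
    }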
diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs
index 92920fa429d7b..b2fc3e3f8a986 100644
--- a/src/batch/src/executor/row_seq_scan.rs
+++ b/src/batch/src/executor/row_seq_scan.rs
@@ -380,7 +380,10 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
             .batch_iter_with_pk_bounds(
                 HummockReadEpoch::Committed(epoch),
                 &pk_prefix,
-                next_col_bounds,
+                (
+                    next_col_bounds.0.map(|x| Row::new(vec![x])),
+                    next_col_bounds.1.map(|x| Row::new(vec![x])),
+                ),
             )
             .await?;
 
diff --git a/src/batch/src/lib.rs b/src/batch/src/lib.rs
index e0df8a45b0acc..115f50795318f 100644
--- a/src/batch/src/lib.rs
+++ b/src/batch/src/lib.rs
@@ -27,6 +27,7 @@
 #![feature(is_sorted)]
 #![recursion_limit = "256"]
 #![feature(let_chains)]
+#![feature(bound_map)]
 
 mod error;
 pub mod exchange_source;
diff --git a/src/frontend/src/optimizer/plan_node/stream_index_scan.rs b/src/frontend/src/optimizer/plan_node/stream_index_scan.rs
index 7ede90a0f7027..4a7d50dbdfdf4 100644
--- a/src/frontend/src/optimizer/plan_node/stream_index_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_index_scan.rs
@@ -151,7 +151,7 @@ impl StreamIndexScan {
             node_body: Some(ProstStreamNode::Chain(ChainNode {
                 table_id: self.logical.table_desc().table_id.table_id,
                 same_worker_node: true,
-                disable_rearrange: true,
+                chain_type: ChainType::Chain as i32,
                 // The fields from upstream
                 upstream_fields: self
                     .logical
@@ -171,6 +171,7 @@ impl StreamIndexScan {
                     .map(|&i| i as _)
                     .collect(),
                 is_singleton: false,
+                table_desc: Some(self.logical.table_desc().to_protobuf()),
             })),
             stream_key,
             operator_id: self.base.id.0 as u64,
diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
index ddd104cc4a99b..9a6b23c6a3b14 100644
--- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -192,7 +192,7 @@ impl StreamTableScan {
             node_body: Some(ProstStreamNode::Chain(ChainNode {
                 table_id: self.logical.table_desc().table_id.table_id,
                 same_worker_node: false,
-                disable_rearrange: false,
+                chain_type: ChainType::Backfill as i32,
                 // The fields from upstream
                 upstream_fields: self
                     .logical
@@ -212,6 +212,8 @@ impl StreamTableScan {
                     .map(|&i| i as _)
                     .collect(),
                 is_singleton: *self.distribution() == Distribution::Single,
+                // The table desc used by backfill executor
+                table_desc: Some(self.logical.table_desc().to_protobuf()),
             })),
             stream_key,
             operator_id: self.base.id.0 as u64,
diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs
index 63880d34234f8..65bd99c947e73 100644
--- a/src/storage/src/table/batch_table/storage_table.rs
+++ b/src/storage/src/table/batch_table/storage_table.rs
@@ -25,7 +25,7 @@ use itertools::Itertools;
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
 use risingwave_common::row::{self, Row, Row2, RowDeserializer, RowExt};
-use risingwave_common::types::{Datum, VirtualNode};
+use risingwave_common::types::VirtualNode;
 use risingwave_common::util::ordered::*;
 use risingwave_common::util::sort_util::OrderType;
 use risingwave_hummock_sdk::key::{end_bound_of_prefix, next_key, prefixed_range};
@@ -363,26 +363,24 @@ impl<S: StateStore> StorageTable<S> {
         Ok(iter)
     }
 
-    /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds of
-    /// the next primary key column in `next_col_bounds`.
-    // TODO: support multiple datums or `Row` for `next_col_bounds`.
+    /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds.
     async fn iter_with_pk_bounds(
         &self,
         epoch: HummockReadEpoch,
         pk_prefix: impl Row2,
-        next_col_bounds: impl RangeBounds<Datum>,
+        range_bounds: impl RangeBounds<Row>,
         ordered: bool,
     ) -> StorageResult<StorageTableIter<S>> {
         fn serialize_pk_bound(
             pk_serializer: &OrderedRowSerde,
             pk_prefix: impl Row2,
-            next_col_bound: Bound<&Datum>,
+            range_bound: Bound<&Row>,
             is_start_bound: bool,
         ) -> Bound<Vec<u8>> {
-            match next_col_bound {
+            match range_bound {
                 Included(k) => {
-                    let pk_prefix_serializer = pk_serializer.prefix(pk_prefix.len() + 1);
-                    let key = pk_prefix.chain(row::once(k));
+                    let pk_prefix_serializer = pk_serializer.prefix(pk_prefix.len() + k.0.len());
+                    let key = pk_prefix.chain(k);
                     let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
                     if is_start_bound {
                         Included(serialized_key)
@@ -393,15 +391,17 @@ impl<S: StateStore> StorageTable<S> {
                     }
                 }
                 Excluded(k) => {
-                    let pk_prefix_serializer = pk_serializer.prefix(pk_prefix.len() + 1);
-                    let key = pk_prefix.chain(row::once(k));
+                    let pk_prefix_serializer = pk_serializer.prefix(pk_prefix.len() + k.0.len());
+                    let key = pk_prefix.chain(k);
                     let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
                     if is_start_bound {
-                        // storage doesn't support excluded begin key yet, so transform it to
-                        // included
-                        // FIXME: What if `serialized_key` is `\xff\xff..`? Should the frontend
-                        // reject this?
-                        Included(next_key(&serialized_key))
+                        // Storage doesn't support excluded begin key yet, so transform it to
+                        // included.
+                        // We always serialize a datum's null flag as a u8 that is not '\xff',
+                        // so the result of `next_key` can never be empty.
+                        let next_serialized_key = next_key(&serialized_key);
+                        assert!(!next_serialized_key.is_empty());
+                        Included(next_serialized_key)
                     } else {
                         Excluded(serialized_key)
                     }
@@ -423,13 +423,13 @@ impl<S: StateStore> StorageTable<S> {
         let start_key = serialize_pk_bound(
             &self.pk_serializer,
             &pk_prefix,
-            next_col_bounds.start_bound(),
+            range_bounds.start_bound(),
             true,
         );
         let end_key = serialize_pk_bound(
             &self.pk_serializer,
             &pk_prefix,
-            next_col_bounds.end_bound(),
+            range_bounds.end_bound(),
             false,
         );
 
@@ -482,9 +482,9 @@ impl<S: StateStore> StorageTable<S> {
         &self,
         epoch: HummockReadEpoch,
         pk_prefix: impl Row2,
-        next_col_bounds: impl RangeBounds<Datum>,
+        range_bounds: impl RangeBounds<Row>,
     ) -> StorageResult<StorageTableIter<S>> {
-        self.iter_with_pk_bounds(epoch, pk_prefix, next_col_bounds, true)
+        self.iter_with_pk_bounds(epoch, pk_prefix, range_bounds, true)
             .await
     }
 
diff --git a/src/stream/src/executor/backfill.rs b/src/stream/src/executor/backfill.rs
new file mode 100644
index 0000000000000..54315ea89af6b
--- /dev/null
+++ b/src/stream/src/executor/backfill.rs
@@ -0,0 +1,377 @@
+// Copyright 2022 Singularity Data
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::cmp::Ordering;
+use std::ops::Bound;
+use std::sync::Arc;
+
+use async_stack_trace::StackTrace;
+use either::Either;
+use futures::stream::select_with_strategy;
+use futures::{pin_mut, stream, StreamExt};
+use futures_async_stream::try_stream;
+use risingwave_common::array::{Op, Row, StreamChunk};
+use risingwave_common::buffer::BitmapBuilder;
+use risingwave_common::catalog::Schema;
+use risingwave_common::row::{Row2, RowExt};
+use risingwave_hummock_sdk::HummockReadEpoch;
+use risingwave_storage::table::batch_table::storage_table::StorageTable;
+use risingwave_storage::table::TableIter;
+use risingwave_storage::StateStore;
+
+use super::error::StreamExecutorError;
+use super::{expect_first_barrier, BoxedExecutor, Executor, ExecutorInfo, Message};
+use crate::executor::PkIndices;
+use crate::task::{ActorId, CreateMviewProgress};
+
+/// An implementation of the RFC "Use Backfill To Let Mv On Mv Stream Again"
+/// (https://github.com/risingwavelabs/rfcs/pull/13).
+/// `BackfillExecutor` is used to create a materialized view on another materialized view.
+///
+/// It only needs to buffer chunks between two barriers, instead of incurring the unbounded
+/// memory usage of `RearrangedChainExecutor`.
+///
+/// It uses the latest epoch to read the snapshot of the upstream mv between two barriers, and
+/// all the `StreamChunk`s produced by the snapshot read are forwarded to the downstream.
+///
+/// It uses `current_pos` to record the progress of the backfill (the pk of the upstream mv),
+/// and `current_pos` is initialized as `None`.
+///
+/// All upstream messages within the interval of two barriers are buffered; at the later barrier,
+/// each buffered row is either forwarded or ignored based on `current_pos`. Once `current_pos`
+/// reaches the end of the upstream mv pk, the backfill is finished.
+///
+/// Notice:
+/// The pk we are talking about here refers to the storage primary key.
+/// We rely on the scheduler to schedule the `BackfillExecutor` together with the upstream mv/table
+/// in the same worker, so that we can read uncommitted data from the upstream table without
+/// waiting.
+pub struct BackfillExecutor<S: StateStore> {
+    /// Upstream table
+    table: StorageTable<S>,
+    /// Upstream executor with the same schema as the upstream table.
+    upstream: BoxedExecutor,
+
+    /// The column indices that need to be forwarded to the downstream.
+    upstream_indices: Arc<[usize]>,
+
+    /// Current position of the table storage primary key.
+    /// None means it starts from the beginning.
+    current_pos: Option<Row>,
+
+    progress: CreateMviewProgress,
+
+    actor_id: ActorId,
+
+    info: ExecutorInfo,
+}
+
+const CHUNK_SIZE: usize = 1024;
+
+impl<S> BackfillExecutor<S>
+where
+    S: StateStore,
+{
+    pub fn new(
+        table: StorageTable<S>,
+        upstream: BoxedExecutor,
+        upstream_indices: Vec<usize>,
+        progress: CreateMviewProgress,
+        schema: Schema,
+        pk_indices: PkIndices,
+    ) -> Self {
+        Self {
+            info: ExecutorInfo {
+                schema,
+                pk_indices,
+                identity: "BackfillExecutor".to_owned(),
+            },
+            table,
+            upstream,
+            upstream_indices: upstream_indices.into(),
+            current_pos: None,
+            actor_id: progress.actor_id(),
+            progress,
+        }
+    }
+
+    #[try_stream(ok = Message, error = StreamExecutorError)]
+    async fn execute_inner(mut self) {
+        // Table storage primary key.
+        let table_pk_indices = self.table.pk_indices().to_vec();
+        let upstream_indices = self.upstream_indices.to_vec();
+
+        let mut upstream = self.upstream.execute();
+
+        // Poll the upstream to get the first barrier.
+        let first_barrier = expect_first_barrier(&mut upstream).await?;
+        let init_epoch = first_barrier.epoch.prev;
+
+        // If the barrier is a conf change of creating this mview, init backfill from its epoch.
+        // Otherwise, it means we've recovered and the backfill is already finished.
+        let to_backfill = first_barrier.is_add_dispatcher(self.actor_id);
+
+        // The first barrier message should be propagated.
+        yield Message::Barrier(first_barrier.clone());
+
+        if !to_backfill {
+            // Forward messages directly to the downstream.
+            let upstream = upstream
+                .map(move |result| result.map(|msg| Self::mapping_message(msg, &upstream_indices)));
+            #[for_await]
+            for message in upstream {
+                yield message?;
+            }
+            return Ok(());
+        }
+
+        // The epoch used to snapshot read upstream mv.
+        let mut snapshot_read_epoch = init_epoch;
+
+        // Backfill Algorithm:
+        //
+        //   backfill_stream
+        //  /               \
+        // upstream       snapshot
+        //
+        // We construct a backfill stream with upstream as its left input and mv snapshot read
+        // stream as its right input. When a chunk comes from upstream, we will buffer it.
+        //
+        // When a barrier comes from upstream:
+        //  - Update the `snapshot_read_epoch`.
+        //  - For each row of the upstream chunk buffer, forward it to downstream if its pk <=
+        //    `current_pos`, otherwise ignore it.
+        //  - Reconstruct the whole backfill stream with the upstream and a new mv snapshot read
+        //    stream with the `snapshot_read_epoch`.
+        //
+        // When a chunk comes from snapshot, we forward it to the downstream and raise
+        // `current_pos`.
+        //
+        // When we reach the end of the snapshot read stream, the backfill has finished.
+        //
+        // Once the backfill loop ends, we forward the upstream directly to the downstream.
+        'backfill_loop: loop {
+            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
+
+            let mut left_upstream = (&mut upstream).map(Either::Left);
+
+            let right_snapshot = Box::pin(
+                Self::snapshot_read(&self.table, snapshot_read_epoch, self.current_pos.clone())
+                    .map(Either::Right),
+            );
+
+            // Prefer to select upstream, so we can stop snapshot stream as soon as the barrier
+            // comes.
+            let backfill_stream =
+                select_with_strategy(&mut left_upstream, right_snapshot, |_: &mut ()| {
+                    stream::PollNext::Left
+                });
+
+            #[for_await]
+            for either in backfill_stream {
+                match either {
+                    Either::Left(msg) => {
+                        match msg? {
+                            Message::Barrier(barrier) => {
+                                // If it is a barrier, switch the snapshot and consume the
+                                // upstream chunk buffer.
+
+                                // Consume the upstream chunk buffer.
+                                for chunk in upstream_chunk_buffer.drain(..) {
+                                    if let Some(current_pos) = self.current_pos.as_ref() {
+                                        yield Message::Chunk(Self::mapping_chunk(
+                                            Self::mark_chunk(chunk, current_pos, &table_pk_indices),
+                                            &upstream_indices,
+                                        ));
+                                    }
+                                }
+
+                                // Update snapshot read epoch.
+                                snapshot_read_epoch = barrier.epoch.prev;
+
+                                yield Message::Barrier(barrier);
+
+                                self.progress
+                                    .update(snapshot_read_epoch, snapshot_read_epoch);
+                                // Break the for loop and start a new snapshot read stream.
+                                break;
+                            }
+                            Message::Chunk(chunk) => {
+                                // Buffer the upstream chunk.
+                                upstream_chunk_buffer.push(chunk.compact());
+                            }
+                            Message::Watermark(_) => {
+                                todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
+                            }
+                        }
+                    }
+                    Either::Right(msg) => {
+                        match msg? {
+                            None => {
+                                // End of the snapshot read stream.
+                                // We need to either set `current_pos` to the maximum value or
+                                // stop marking chunks; otherwise, we would ignore some rows in
+                                // the buffer. Here we choose to never mark the chunk.
+                                // Consume the remaining upstream buffer chunks without marking.
+                                for chunk in upstream_chunk_buffer.drain(..) {
+                                    yield Message::Chunk(Self::mapping_chunk(
+                                        chunk,
+                                        &upstream_indices,
+                                    ));
+                                }
+
+                                // Finish backfill.
+                                break 'backfill_loop;
+                            }
+                            Some(chunk) => {
+                                // Raise the current position.
+                                // Since the snapshot read stream is ordered by pk, we can
+                                // just use the last row to update `current_pos`.
+                                self.current_pos = Some(
+                                    chunk
+                                        .rows()
+                                        .last()
+                                        .unwrap()
+                                        .1
+                                        .row_by_indices(&table_pk_indices),
+                                );
+
+                                yield Message::Chunk(Self::mapping_chunk(
+                                    chunk,
+                                    &self.upstream_indices,
+                                ));
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        let mut finish_on_barrier = |msg: &Message| {
+            if let Some(barrier) = msg.as_barrier() {
+                self.progress.finish(barrier.epoch.curr);
+            }
+        };
+
+        tracing::debug!(
+            actor = self.actor_id,
+            "Backfill has already finished; forwarding messages directly to the downstream"
+        );
+
+        // Backfill has already finished.
+        // Forward messages directly to the downstream.
+        let upstream = upstream
+            .map(move |result| result.map(|msg| Self::mapping_message(msg, &upstream_indices)));
+        #[for_await]
+        for msg in upstream {
+            let msg: Message = msg?;
+            finish_on_barrier(&msg);
+            yield msg;
+        }
+    }
+
+    #[expect(clippy::needless_lifetimes, reason = "code generated by try_stream")]
+    #[try_stream(ok = Option<StreamChunk>, error = StreamExecutorError)]
+    async fn snapshot_read(table: &StorageTable<S>, epoch: u64, current_pos: Option<Row>) {
+        // A `None` `current_pos` means the scan starts from the beginning, so we use `Unbounded`.
+        // Otherwise, we resume the scan strictly after `current_pos` with `Excluded`.
+        let range_bounds = if let Some(current_pos) = current_pos {
+            (Bound::Excluded(current_pos), Bound::Unbounded)
+        } else {
+            (Bound::Unbounded, Bound::Unbounded)
+        };
+        // We use uncommitted read here, because we have already scheduled the `BackfillExecutor`
+        // together with the upstream mv.
+        let iter = table
+            .batch_iter_with_pk_bounds(HummockReadEpoch::NoWait(epoch), Row::empty(), range_bounds)
+            .await?;
+
+        pin_mut!(iter);
+
+        while let Some(data_chunk) = iter
+            .collect_data_chunk(table.schema(), Some(CHUNK_SIZE))
+            .stack_trace("backfill_snapshot_read")
+            .await?
+        {
+            if data_chunk.cardinality() != 0 {
+                let ops = vec![Op::Insert; data_chunk.capacity()];
+                let stream_chunk = StreamChunk::from_parts(ops, data_chunk);
+                yield Some(stream_chunk);
+            }
+        }
+
+        yield None;
+    }
+
+    /// Mark chunk:
+    /// For each row of the chunk, forward it to downstream if its pk <= `current_pos`, otherwise
+    /// ignore it. We implement it by changing the visibility bitmap.
+    fn mark_chunk(
+        chunk: StreamChunk,
+        current_pos: &Row,
+        table_pk_indices: &PkIndices,
+    ) -> StreamChunk {
+        let chunk = chunk.compact();
+        let (data, ops) = chunk.into_parts();
+        let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
+        // Use project to avoid allocation.
+        for v in data.rows().map(|row| {
+            match row.project(table_pk_indices).iter().cmp(current_pos.iter()) {
+                Ordering::Less | Ordering::Equal => true,
+                Ordering::Greater => false,
+            }
+        }) {
+            new_visibility.append(v);
+        }
+        let (columns, _) = data.into_parts();
+        StreamChunk::new(ops, columns, Some(new_visibility.finish()))
+    }
+
+    fn mapping_chunk(chunk: StreamChunk, upstream_indices: &[usize]) -> StreamChunk {
+        let (ops, columns, visibility) = chunk.into_inner();
+        let mapped_columns = upstream_indices
+            .iter()
+            .map(|&i| columns[i].clone())
+            .collect();
+        StreamChunk::new(ops, mapped_columns, visibility)
+    }
+
+    fn mapping_message(msg: Message, upstream_indices: &[usize]) -> Message {
+        match msg {
+            Message::Barrier(_) | Message::Watermark(_) => msg,
+            Message::Chunk(chunk) => Message::Chunk(Self::mapping_chunk(chunk, upstream_indices)),
+        }
+    }
+}
+
+impl<S> Executor for BackfillExecutor<S>
+where
+    S: StateStore,
+{
+    fn execute(self: Box<Self>) -> super::BoxedMessageStream {
+        self.execute_inner().boxed()
+    }
+
+    fn schema(&self) -> &Schema {
+        &self.info.schema
+    }
+
+    fn pk_indices(&self) -> super::PkIndicesRef<'_> {
+        &self.info.pk_indices
+    }
+
+    fn identity(&self) -> &str {
+        &self.info.identity
+    }
+}
diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs
index f0ae2a6bd8faa..50eb6ccdd2015 100644
--- a/src/stream/src/executor/mod.rs
+++ b/src/stream/src/executor/mod.rs
@@ -82,6 +82,7 @@ mod union;
 mod watermark_filter;
 mod wrapper;
 
+mod backfill;
 #[cfg(test)]
 mod integration_tests;
 #[cfg(test)]
@@ -89,6 +90,7 @@ mod test_utils;
 
 pub use actor::{Actor, ActorContext, ActorContextRef};
 use anyhow::Context;
+pub use backfill::*;
 pub use batch_query::BatchQueryExecutor;
 pub use chain::ChainExecutor;
 pub use dispatch::{DispatchExecutor, DispatcherImpl};
diff --git a/src/stream/src/from_proto/chain.rs b/src/stream/src/from_proto/chain.rs
index 49da8a7bb2042..52ac66a83abd7 100644
--- a/src/stream/src/from_proto/chain.rs
+++ b/src/stream/src/from_proto/chain.rs
@@ -12,8 +12,15 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use risingwave_common::catalog::{ColumnDesc, TableId, TableOption};
+use risingwave_common::util::sort_util::OrderType;
+use risingwave_pb::plan_common::{OrderType as ProstOrderType, StorageTableDesc};
+use risingwave_pb::stream_plan::ChainType;
+use risingwave_storage::table::batch_table::storage_table::StorageTable;
+use risingwave_storage::table::Distribution;
+
 use super::*;
-use crate::executor::{ChainExecutor, RearrangedChainExecutor};
+use crate::executor::{BackfillExecutor, ChainExecutor, RearrangedChainExecutor};
 
 pub struct ChainExecutorBuilder;
 
@@ -22,7 +29,7 @@ impl ExecutorBuilder for ChainExecutorBuilder {
     async fn new_boxed_executor(
         params: ExecutorParams,
         node: &StreamNode,
-        _store: impl StateStore,
+        state_store: impl StateStore,
         stream: &mut LocalStreamManagerCore,
     ) -> StreamResult<BoxedExecutor> {
         let node = try_match_expand!(node.get_node_body().unwrap(), NodeBody::Chain)?;
@@ -43,26 +50,99 @@ impl ExecutorBuilder for ChainExecutorBuilder {
         // its schema.
         let schema = snapshot.schema().clone();
 
-        if node.disable_rearrange {
-            let executor = ChainExecutor::new(
+        let executor = match node.chain_type() {
+            ChainType::Chain => ChainExecutor::new(
                 snapshot,
                 mview,
                 upstream_indices,
                 progress,
                 schema,
                 params.pk_indices,
-            );
-            Ok(executor.boxed())
-        } else {
-            let executor = RearrangedChainExecutor::new(
+            )
+            .boxed(),
+            ChainType::Rearrange => RearrangedChainExecutor::new(
                 snapshot,
                 mview,
                 upstream_indices,
                 progress,
                 schema,
                 params.pk_indices,
-            );
-            Ok(executor.boxed())
-        }
+            )
+            .boxed(),
+            ChainType::Backfill => {
+                let table_desc: &StorageTableDesc = node.get_table_desc()?;
+                let table_id = TableId {
+                    table_id: table_desc.table_id,
+                };
+
+                let order_types = table_desc
+                    .pk
+                    .iter()
+                    .map(|desc| {
+                        OrderType::from_prost(&ProstOrderType::from_i32(desc.order_type).unwrap())
+                    })
+                    .collect_vec();
+
+                let column_descs = table_desc
+                    .columns
+                    .iter()
+                    .map(ColumnDesc::from)
+                    .collect_vec();
+                let column_ids = column_descs.iter().map(|x| x.column_id).collect_vec();
+
+                // Use indices based on full table instead of streaming executor output.
+                let pk_indices = table_desc.pk.iter().map(|k| k.index as usize).collect_vec();
+
+                let dist_key_indices = table_desc
+                    .dist_key_indices
+                    .iter()
+                    .map(|&k| k as usize)
+                    .collect_vec();
+                let distribution = match params.vnode_bitmap {
+                    Some(vnodes) => Distribution {
+                        dist_key_indices,
+                        vnodes: vnodes.into(),
+                    },
+                    None => Distribution::fallback(),
+                };
+
+                let table_option = TableOption {
+                    retention_seconds: if table_desc.retention_seconds > 0 {
+                        Some(table_desc.retention_seconds)
+                    } else {
+                        None
+                    },
+                };
+                let value_indices = table_desc
+                    .get_value_indices()
+                    .iter()
+                    .map(|&k| k as usize)
+                    .collect_vec();
+                // TODO: refactor it with from_table_catalog in the future.
+                let table = StorageTable::new_partial(
+                    state_store,
+                    table_id,
+                    column_descs,
+                    column_ids,
+                    order_types,
+                    pk_indices,
+                    distribution,
+                    table_option,
+                    value_indices,
+                );
+
+                BackfillExecutor::new(
+                    table,
+                    mview,
+                    upstream_indices,
+                    progress,
+                    schema,
+                    params.pk_indices,
+                )
+                .boxed()
+            }
+            ChainType::ChainUnspecified => unreachable!(),
+        };
+        Ok(executor)
     }
 }
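The Excluded-to-Included transform in `serialize_pk_bound` above relies on `next_key` from `risingwave_hummock_sdk::key`. The sketch below illustrates its assumed semantics (increment the last non-0xff byte and drop the tail); this is an illustration of the behavior the new assertion depends on, not the verbatim implementation:

    /// Smallest key strictly greater than every key starting with `key`
    /// (assumed behavior of `risingwave_hummock_sdk::key::next_key`).
    fn next_key(key: &[u8]) -> Vec<u8> {
        match key.iter().rposition(|&b| b != u8::MAX) {
            Some(i) => {
                // Increment the last non-0xff byte and drop everything after it.
                let mut next = key[..=i].to_vec();
                next[i] += 1;
                next
            }
            // All bytes are 0xff: no next key exists. The patch asserts this
            // never happens, since the null-flag byte of a datum is not 0xff.
            None => Vec::new(),
        }
    }

    fn main() {
        // Excluded(start) becomes Included(next_key(start)).
        assert_eq!(next_key(b"ab"), b"ac".to_vec());
        assert_eq!(next_key(&[0x61, 0xff]), vec![0x62]);
        assert_eq!(next_key(&[0xff, 0xff]), Vec::<u8>::new());
    }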
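The two invariants the backfill loop depends on can also be shown in isolation. The sketch below uses hypothetical single-column i64 pks standing in for `Row`s, and restates `snapshot_read`'s resume bounds and `mark_chunk`'s visibility rule from the executor above:

    use std::cmp::Ordering;
    use std::ops::Bound;

    /// Resume the snapshot read strictly after the last backfilled pk,
    /// or scan from the beginning if no progress has been made yet.
    fn resume_bounds(current_pos: Option<i64>) -> (Bound<i64>, Bound<i64>) {
        match current_pos {
            Some(pos) => (Bound::Excluded(pos), Bound::Unbounded),
            None => (Bound::Unbounded, Bound::Unbounded),
        }
    }

    /// Visibility rule of `mark_chunk`: forward a buffered upstream row only
    /// if its pk has already been covered by the snapshot side.
    fn visible(row_pk: i64, current_pos: i64) -> bool {
        matches!(row_pk.cmp(&current_pos), Ordering::Less | Ordering::Equal)
    }

    fn main() {
        assert_eq!(resume_bounds(Some(42)), (Bound::Excluded(42), Bound::Unbounded));
        assert!(visible(42, 42));  // already backfilled: forward the update
        assert!(!visible(43, 42)); // the snapshot side will emit this row later: drop
    }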