From a22e20c5f0a8ec5d9b824bf489206b0945a2b05e Mon Sep 17 00:00:00 2001
From: Noel Kwan <47273164+kwannoel@users.noreply.github.com>
Date: Mon, 20 Nov 2023 20:00:48 +0800
Subject: [PATCH] refactor(proto): use separate proto for cdc scan (#13502)

---
 proto/stream_plan.proto                       |  20 +++-
 src/common/src/util/stream_graph_visitor.rs   |   5 +
 .../plan_node/stream_cdc_table_scan.rs        |  54 +--------
 .../optimizer/plan_node/stream_table_scan.rs  |   4 +-
 src/frontend/src/stream_fragmenter/mod.rs     |  11 ++
 .../src/utils/stream_graph_formatter.rs       |   4 +
 src/meta/service/src/ddl_service.rs           |   2 +-
 src/meta/src/model/stream.rs                  |  12 +-
 src/meta/src/stream/stream_graph/actor.rs     |  47 ++++++--
 src/meta/src/stream/stream_graph/fragment.rs  |  23 ++--
 src/stream/src/from_proto/mod.rs              |   3 +
 src/stream/src/from_proto/stream_cdc_scan.rs  | 112 ++++++++++++++++++
 src/stream/src/from_proto/stream_scan.rs      |  71 +----------
 src/stream/src/task/stream_manager.rs         |   1 +
 14 files changed, 222 insertions(+), 147 deletions(-)
 create mode 100644 src/stream/src/from_proto/stream_cdc_scan.rs

diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto
index 706149b26978b..086fd75d698cf 100644
--- a/proto/stream_plan.proto
+++ b/proto/stream_plan.proto
@@ -463,9 +463,6 @@ enum StreamScanType {
 
   // ChainExecutor with upstream_only = true
   STREAM_SCAN_TYPE_UPSTREAM_ONLY = 4;
-
-  // CdcBackfillExecutor
-  STREAM_SCAN_TYPE_CDC_BACKFILL = 5;
 }
 
 // StreamScanNode reads data from upstream table first, and then pass all events to downstream.
@@ -502,9 +499,23 @@ message StreamScanNode {
 
   // Snapshot read every N barriers
   uint32 snapshot_read_barrier_interval = 9 [deprecated = true];
+}
+
+message StreamCdcScanNode {
+  uint32 table_id = 1;
+
+  // The columns from the upstream table that'll be internally required by this stream scan node.
+  // Contains primary key and output columns.
+  repeated int32 upstream_column_ids = 2;
+
+  // Strips the primary key columns if they're unnecessary.
+  repeated uint32 output_indices = 3;
+
+  // The state table used by the Backfill operator for persisting internal state.
+  catalog.Table state_table = 4;
 
   // The external table that will be backfilled for CDC.
-  plan_common.ExternalTableDesc cdc_table_desc = 10;
+  plan_common.ExternalTableDesc cdc_table_desc = 5;
 }
 
 // BatchPlanNode is used for mv on mv snapshot read.
@@ -700,6 +711,7 @@ message StreamNode {
     EowcOverWindowNode eowc_over_window = 136;
     OverWindowNode over_window = 137;
     StreamFsFetchNode stream_fs_fetch = 138;
+    StreamCdcScanNode stream_cdc_scan = 139;
   }
   // The id for the operator. This is local per mview.
   // TODO: should better be a uint32.
diff --git a/src/common/src/util/stream_graph_visitor.rs b/src/common/src/util/stream_graph_visitor.rs
index baa1dd4e1fb28..79a94a6fca0c5 100644
--- a/src/common/src/util/stream_graph_visitor.rs
+++ b/src/common/src/util/stream_graph_visitor.rs
@@ -199,6 +199,11 @@ fn visit_stream_node_tables_inner(
             optional!(node.state_table, "StreamScan")
         }
 
+        // Stream Cdc Scan
+        NodeBody::StreamCdcScan(node) => {
+            always!(node.state_table, "StreamCdcScan")
+        }
+
         // Note: add internal tables for new nodes here.
         NodeBody::Materialize(node) if !internal_tables_only => {
             always!(node.table, "Materialize")
diff --git a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
index 4ef394f3748fd..a2bf32e54d505 100644
--- a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
@@ -18,11 +18,11 @@ use risingwave_common::catalog::{ColumnCatalog, Field};
 use risingwave_common::types::DataType;
 use risingwave_common::util::sort_util::OrderType;
 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
-use risingwave_pb::stream_plan::{PbStreamNode, StreamScanType};
+use risingwave_pb::stream_plan::PbStreamNode;
 
 use super::stream::prelude::*;
 use super::utils::{childless_record, Distill};
-use super::{generic, ExprRewritable, PlanBase, PlanNodeId, PlanRef, StreamNode};
+use super::{generic, ExprRewritable, PlanBase, PlanRef, StreamNode};
 use crate::catalog::ColumnId;
 use crate::expr::ExprRewriter;
 use crate::handler::create_source::debezium_cdc_source_schema;
@@ -37,13 +37,10 @@ use crate::{Explain, TableCatalog};
 pub struct StreamCdcTableScan {
     pub base: PlanBase<Stream>,
     core: generic::CdcScan,
-    batch_plan_id: PlanNodeId,
-    stream_scan_type: StreamScanType,
 }
 
 impl StreamCdcTableScan {
     pub fn new(core: generic::CdcScan) -> Self {
-        let batch_plan_id = core.ctx.next_plan_node_id();
         let distribution = Distribution::SomeShard;
         let base = PlanBase::new_stream_with_core(
             &core,
@@ -52,12 +49,7 @@ impl StreamCdcTableScan {
             false,
             core.watermark_columns(),
         );
-        Self {
-            base,
-            core,
-            batch_plan_id,
-            stream_scan_type: StreamScanType::CdcBackfill,
-        }
+        Self { base, core }
     }
 
     pub fn table_name(&self) -> &str {
@@ -68,10 +60,6 @@ impl StreamCdcTableScan {
         &self.core
     }
 
-    pub fn stream_scan_type(&self) -> StreamScanType {
-        StreamScanType::CdcBackfill
-    }
-
     /// Build catalog for cdc backfill state
     /// Right now we only persist whether the backfill is finished and the corresponding cdc offset
     /// schema: | `split_id` | `pk...` | `backfill_finished` | `row_count` | `cdc_offset` |
@@ -165,20 +153,6 @@ impl StreamCdcTableScan {
             .map(ColumnId::get_id)
             .collect_vec();
 
-        // The schema of the snapshot read stream
-        let snapshot_schema = upstream_column_ids
-            .iter()
-            .map(|&id| {
-                let col = self
-                    .core
-                    .get_table_columns()
-                    .iter()
-                    .find(|c| c.column_id.get_id() == id)
-                    .unwrap();
-                Field::from(col).to_prost()
-            })
-            .collect_vec();
-
         // The schema of the shared cdc source upstream is different from snapshot,
         // refer to `debezium_cdc_source_schema()` for details.
         let upstream_schema = {
@@ -202,28 +176,19 @@
             })
             .collect_vec();
 
-        let batch_plan_node = BatchPlanNode {
-            table_desc: None,
-            column_ids: upstream_column_ids.clone(),
-        };
-
         let catalog = self
             .build_backfill_state_catalog(state)
             .to_internal_table_prost();
 
         // We need to pass the id of upstream source job here
         let upstream_source_id = self.core.cdc_table_desc.source_id.table_id;
-        let node_body = PbNodeBody::StreamScan(StreamScanNode {
+        let node_body = PbNodeBody::StreamCdcScan(StreamCdcScanNode {
             table_id: upstream_source_id,
-            stream_scan_type: self.stream_scan_type as i32,
-            // The column indices need to be forwarded to the downstream
-            output_indices,
             upstream_column_ids,
+            output_indices,
             // The state table used by the backfill executor
             state_table: Some(catalog),
-            rate_limit: None,
             cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
-            ..Default::default()
         });
 
         PbStreamNode {
@@ -237,15 +202,6 @@
                     stream_key: vec![], // not used
                     ..Default::default()
                 },
-                PbStreamNode {
-                    node_body: Some(PbNodeBody::BatchPlan(batch_plan_node)),
-                    operator_id: self.batch_plan_id.0 as u64,
-                    identity: "BatchPlanNode".into(),
-                    fields: snapshot_schema,
-                    stream_key: vec![], // not used
-                    input: vec![],
-                    append_only: true,
-                },
             ],
             node_body: Some(node_body),
             stream_key,
diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
index 750548a315e9d..aa1e601116b7d 100644
--- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -243,9 +243,7 @@ impl StreamTableScan {
         // The required columns from the table (both scan and upstream).
         let upstream_column_ids = match self.stream_scan_type {
             // For backfill, we additionally need the primary key columns.
-            StreamScanType::Backfill | StreamScanType::CdcBackfill => {
-                self.core.output_and_pk_column_ids()
-            }
+            StreamScanType::Backfill => self.core.output_and_pk_column_ids(),
             StreamScanType::Chain | StreamScanType::Rearrange | StreamScanType::UpstreamOnly => {
                 self.core.output_column_ids()
             }
diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs
index 887c55ecb84b3..b650d746b7a7e 100644
--- a/src/frontend/src/stream_fragmenter/mod.rs
+++ b/src/frontend/src/stream_fragmenter/mod.rs
@@ -136,6 +136,7 @@ fn is_stateful_executor(stream_node: &StreamNode) -> bool {
             | NodeBody::HashJoin(_)
             | NodeBody::DeltaIndexJoin(_)
             | NodeBody::StreamScan(_)
+            | NodeBody::StreamCdcScan(_)
             | NodeBody::DynamicFilter(_)
     )
 }
@@ -289,6 +290,16 @@ fn build_fragment(
             current_fragment.upstream_table_ids.push(node.table_id);
         }
 
+        NodeBody::StreamCdcScan(node) => {
+            current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32;
+            // Memorize the table id for later use.
+            // The table id could be an upstream CDC source.
+            state
+                .dependent_table_ids
+                .insert(TableId::new(node.table_id));
+            current_fragment.upstream_table_ids.push(node.table_id);
+        }
+
         NodeBody::Now(_) => {
             // TODO: Remove this and insert a `BarrierRecv` instead.
             current_fragment.fragment_type_mask |= FragmentTypeFlag::Now as u32;
diff --git a/src/frontend/src/utils/stream_graph_formatter.rs b/src/frontend/src/utils/stream_graph_formatter.rs
index f55e9f6860894..8b022dbbe3da3 100644
--- a/src/frontend/src/utils/stream_graph_formatter.rs
+++ b/src/frontend/src/utils/stream_graph_formatter.rs
@@ -267,6 +267,10 @@ impl StreamGraphFormatter {
                 "state table",
                 self.pretty_add_table(node.get_state_table().unwrap()),
             )),
+            stream_node::NodeBody::StreamCdcScan(node) => fields.push((
+                "state table",
+                self.pretty_add_table(node.get_state_table().unwrap()),
+            )),
             stream_node::NodeBody::Sort(node) => {
                 fields.push((
                     "state table",
diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs
index 10d4524db5370..e30c430b420cb 100644
--- a/src/meta/service/src/ddl_service.rs
+++ b/src/meta/service/src/ddl_service.rs
@@ -912,7 +912,7 @@ fn fill_table_stream_graph_info(
         }
 
         // fill table id for cdc backfill
-        if let NodeBody::StreamScan(node) = node_body && table_job_type == TableJobType::SharedCdcSource {
+        if let NodeBody::StreamCdcScan(node) = node_body && table_job_type == TableJobType::SharedCdcSource {
             if let Some(table) = node.cdc_table_desc.as_mut() {
                 table.table_id = table_id;
             }
diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs
index aba928fecde41..5f9b303c77a5c 100644
--- a/src/meta/src/model/stream.rs
+++ b/src/meta/src/model/stream.rs
@@ -364,11 +364,13 @@ impl TableFragments {
 
     /// Resolve dependent table
     fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
-        if let Some(NodeBody::StreamScan(stream_scan)) = stream_node.node_body.as_ref() {
-            table_ids
-                .entry(TableId::new(stream_scan.table_id))
-                .or_default()
-                .add_assign(1);
+        let table_id = match stream_node.node_body.as_ref() {
+            Some(NodeBody::StreamScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
+            Some(NodeBody::StreamCdcScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
+            _ => None,
+        };
+        if let Some(table_id) = table_id {
+            table_ids.entry(table_id).or_default().add_assign(1);
         }
 
         for child in &stream_node.input {
diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs
index 22f0740fc5c0f..5120e8e26869b 100644
--- a/src/meta/src/stream/stream_graph/actor.rs
+++ b/src/meta/src/stream/stream_graph/actor.rs
@@ -27,7 +27,6 @@ use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
 use risingwave_pb::stream_plan::{
     DispatchStrategy, Dispatcher, DispatcherType, MergeNode, StreamActor, StreamNode,
-    StreamScanType,
 };
 
 use super::id::GlobalFragmentIdsExt;
@@ -160,9 +159,6 @@ impl ActorBuilder {
 
             // "Leaf" node `StreamScan`.
             NodeBody::StreamScan(stream_scan) => {
-                let cdc_backfill =
-                    stream_scan.stream_scan_type == StreamScanType::CdcBackfill as i32;
-
                 let input = stream_node.get_input();
                 assert_eq!(input.len(), 2);
 
@@ -188,11 +184,7 @@
                         node_body: Some(NodeBody::Merge(MergeNode {
                             upstream_actor_id,
                             upstream_fragment_id: upstreams.fragment_id.as_global_id(),
-                            upstream_dispatcher_type: if cdc_backfill {
-                                DispatcherType::CdcTablename as _
-                            } else {
-                                DispatcherType::NoShuffle as _
-                            },
+                            upstream_dispatcher_type: DispatcherType::NoShuffle as _,
                             fields: merge_node.fields.clone(),
                         })),
                         ..merge_node.clone()
@@ -206,6 +198,43 @@
                 })
             }
 
+            // "Leaf" node `StreamCdcScan`.
+            NodeBody::StreamCdcScan(stream_scan) => {
+                let input = stream_node.get_input();
+                assert_eq!(input.len(), 1);
+
+                let merge_node = &input[0];
+                assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
+
+                // Index the upstreams by the external edge ID.
+                let upstreams = &self.upstreams[&EdgeId::UpstreamExternal {
+                    upstream_table_id: stream_scan.table_id.into(),
+                    downstream_fragment_id: self.fragment_id,
+                }];
+
+                // The upstream CDC source should be a singleton.
+                let upstream_actor_id = upstreams.actors.as_global_ids();
+                assert_eq!(upstream_actor_id.len(), 1);
+
+                let input = vec![
+                    // Fill the merge node body with correct upstream info.
+                    StreamNode {
+                        node_body: Some(NodeBody::Merge(MergeNode {
+                            upstream_actor_id,
+                            upstream_fragment_id: upstreams.fragment_id.as_global_id(),
+                            upstream_dispatcher_type: DispatcherType::CdcTablename as _,
+                            fields: merge_node.fields.clone(),
+                        })),
+                        ..merge_node.clone()
+                    },
+                ];
+
+                Ok(StreamNode {
+                    input,
+                    ..stream_node.clone()
+                })
+            }
+
             // For other nodes, visit the children recursively.
             _ => {
                 let mut new_stream_node = stream_node.clone();
diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs
index 29b61e1056f1e..0de28324a1e76 100644
--- a/src/meta/src/stream/stream_graph/fragment.rs
+++ b/src/meta/src/stream/stream_graph/fragment.rs
@@ -170,13 +170,20 @@ impl BuildingFragment {
         let mut table_columns = HashMap::new();
 
         stream_graph_visitor::visit_fragment(fragment, |node_body| {
-            if let NodeBody::StreamScan(stream_scan) = node_body {
-                let table_id = stream_scan.table_id.into();
-                let column_ids = stream_scan.upstream_column_ids.clone();
-                table_columns
-                    .try_insert(table_id, column_ids)
-                    .expect("currently there should be no two same upstream tables in a fragment");
-            }
+            let (table_id, column_ids) = match node_body {
+                NodeBody::StreamScan(stream_scan) => (
+                    stream_scan.table_id.into(),
+                    stream_scan.upstream_column_ids.clone(),
+                ),
+                NodeBody::StreamCdcScan(stream_cdc_scan) => (
+                    stream_cdc_scan.table_id.into(),
+                    stream_cdc_scan.upstream_column_ids.clone(),
+                ),
+                _ => return,
+            };
+            table_columns
+                .try_insert(table_id, column_ids)
+                .expect("currently there should be no two same upstream tables in a fragment");
         });
 
         assert_eq!(table_columns.len(), fragment.upstream_table_ids.len());
@@ -563,7 +570,7 @@ impl CompleteStreamFragmentGraph {
                 // extract the upstream full_table_name from the source fragment
                 let mut full_table_name = None;
                 visit_fragment(&mut fragment.inner, |node_body| {
-                    if let NodeBody::StreamScan(stream_scan) = node_body {
+                    if let NodeBody::StreamCdcScan(stream_scan) = node_body {
                         full_table_name = stream_scan
                             .cdc_table_desc
                             .as_ref()
diff --git a/src/stream/src/from_proto/mod.rs b/src/stream/src/from_proto/mod.rs
index 35c529234160c..169ffad718d03 100644
--- a/src/stream/src/from_proto/mod.rs
+++ b/src/stream/src/from_proto/mod.rs
@@ -42,6 +42,7 @@ mod sink;
 mod sort;
 mod source;
 mod stateless_simple_agg;
+mod stream_cdc_scan;
 mod stream_scan;
 mod temporal_join;
 mod top_n;
@@ -82,6 +83,7 @@ use self::sink::*;
 use self::sort::*;
 use self::source::*;
 use self::stateless_simple_agg::*;
+use self::stream_cdc_scan::*;
 use self::stream_scan::*;
 use self::temporal_join::*;
 use self::top_n::*;
@@ -140,6 +142,7 @@ pub async fn create_executor(
         NodeBody::HashJoin => HashJoinExecutorBuilder,
         NodeBody::HopWindow => HopWindowExecutorBuilder,
         NodeBody::StreamScan => StreamScanExecutorBuilder,
+        NodeBody::StreamCdcScan => StreamCdcScanExecutorBuilder,
        NodeBody::BatchPlan => BatchQueryExecutorBuilder,
         NodeBody::Merge => MergeExecutorBuilder,
         NodeBody::Materialize => MaterializeExecutorBuilder,
diff --git a/src/stream/src/from_proto/stream_cdc_scan.rs b/src/stream/src/from_proto/stream_cdc_scan.rs
new file mode 100644
index 0000000000000..c9a496c3a6832
--- /dev/null
+++ b/src/stream/src/from_proto/stream_cdc_scan.rs
@@ -0,0 +1,112 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use risingwave_common::catalog::{Schema, TableId};
+use risingwave_common::util::sort_util::OrderType;
+use risingwave_connector::source::external::{CdcTableType, SchemaTableName};
+use risingwave_pb::plan_common::ExternalTableDesc;
+use risingwave_pb::stream_plan::StreamCdcScanNode;
+
+use super::*;
+use crate::common::table::state_table::StateTable;
+use crate::executor::external::ExternalStorageTable;
+use crate::executor::CdcBackfillExecutor;
+
+pub struct StreamCdcScanExecutorBuilder;
+
+impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
+    type Node = StreamCdcScanNode;
+
+    async fn new_boxed_executor(
+        params: ExecutorParams,
+        node: &Self::Node,
+        state_store: impl StateStore,
+        stream: &mut LocalStreamManagerCore,
+    ) -> StreamResult<BoxedExecutor> {
+        let [upstream]: [_; 1] = params.input.try_into().unwrap();
+        // For reporting the progress.
+        let progress = stream
+            .context
+            .register_create_mview_progress(params.actor_context.id);
+
+        let output_indices = node
+            .output_indices
+            .iter()
+            .map(|&i| i as usize)
+            .collect_vec();
+
+        let table_desc: &ExternalTableDesc = node.get_cdc_table_desc()?;
+
+        let table_schema: Schema = table_desc.columns.iter().map(Into::into).collect();
+        assert_eq!(output_indices, (0..table_schema.len()).collect_vec());
+        assert_eq!(table_schema.data_types(), params.info.schema.data_types());
+
+        let properties: HashMap<String, String> = table_desc
+            .connect_properties
+            .iter()
+            .map(|(k, v)| (k.clone(), v.clone()))
+            .collect();
+        let table_type = CdcTableType::from_properties(&properties);
+        let table_reader = table_type
+            .create_table_reader(properties.clone(), table_schema.clone())
+            .await?;
+
+        let table_pk_order_types = table_desc
+            .pk
+            .iter()
+            .map(|desc| OrderType::from_protobuf(desc.get_order_type().unwrap()))
+            .collect_vec();
+        let table_pk_indices = table_desc
+            .pk
+            .iter()
+            .map(|k| k.column_index as usize)
+            .collect_vec();
+
+        let schema_table_name = SchemaTableName::from_properties(&properties);
+        let external_table = ExternalStorageTable::new(
+            TableId::new(table_desc.table_id),
+            schema_table_name,
+            table_reader,
+            table_schema,
+            table_pk_order_types,
+            table_pk_indices,
+            output_indices.clone(),
+        );
+
+        let vnodes = params.vnode_bitmap.map(Arc::new);
+        // CDC backfill should be a singleton, so vnodes must be None.
+        assert_eq!(None, vnodes);
+        let state_table =
+            StateTable::from_table_catalog(node.get_state_table()?, state_store, vnodes).await;
+
+        // TODO(kwannoel): Should we apply flow control here as well?
+        Ok(CdcBackfillExecutor::new(
+            params.actor_context.clone(),
+            params.info,
+            external_table,
+            upstream,
+            output_indices,
+            Some(progress),
+            params.executor_stats,
+            Some(state_table),
+            None,
+            true,
+            params.env.config().developer.chunk_size,
+        )
+        .boxed())
+    }
+}
diff --git a/src/stream/src/from_proto/stream_scan.rs b/src/stream/src/from_proto/stream_scan.rs
index 059a4c983fe20..9d5f57eaa3eb8 100644
--- a/src/stream/src/from_proto/stream_scan.rs
+++ b/src/stream/src/from_proto/stream_scan.rs
@@ -12,24 +12,20 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
 use std::sync::Arc;
 
 use anyhow::anyhow;
-use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
+use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId, TableOption};
 use risingwave_common::util::sort_util::OrderType;
-use risingwave_connector::source::external::{CdcTableType, SchemaTableName};
-use risingwave_pb::plan_common::{ExternalTableDesc, StorageTableDesc};
+use risingwave_pb::plan_common::StorageTableDesc;
 use risingwave_pb::stream_plan::{StreamScanNode, StreamScanType};
 use risingwave_storage::table::batch_table::storage_table::StorageTable;
 use risingwave_storage::table::Distribution;
 
 use super::*;
 use crate::common::table::state_table::StateTable;
-use crate::executor::external::ExternalStorageTable;
 use crate::executor::{
-    BackfillExecutor, CdcBackfillExecutor, ChainExecutor, FlowControlExecutor,
-    RearrangedChainExecutor,
+    BackfillExecutor, ChainExecutor, FlowControlExecutor, RearrangedChainExecutor,
 };
 
 pub struct StreamScanExecutorBuilder;
@@ -63,67 +59,6 @@ impl ExecutorBuilder for StreamScanExecutorBuilder {
             StreamScanType::Rearrange => {
                 RearrangedChainExecutor::new(params.info, snapshot, upstream, progress).boxed()
             }
-            StreamScanType::CdcBackfill => {
-                let table_desc: &ExternalTableDesc = node.get_cdc_table_desc()?;
-
-                let table_schema: Schema = table_desc.columns.iter().map(Into::into).collect();
-                assert_eq!(output_indices, (0..table_schema.len()).collect_vec());
-                assert_eq!(table_schema.data_types(), params.info.schema.data_types());
-
-                let properties: HashMap<String, String> = table_desc
-                    .connect_properties
-                    .iter()
-                    .map(|(k, v)| (k.clone(), v.clone()))
-                    .collect();
-                let table_type = CdcTableType::from_properties(&properties);
-                let table_reader = table_type
-                    .create_table_reader(properties.clone(), table_schema.clone())
-                    .await?;
-
-                let table_pk_order_types = table_desc
-                    .pk
-                    .iter()
-                    .map(|desc| OrderType::from_protobuf(desc.get_order_type().unwrap()))
-                    .collect_vec();
-                let table_pk_indices = table_desc
-                    .pk
-                    .iter()
-                    .map(|k| k.column_index as usize)
-                    .collect_vec();
-
-                let schema_table_name = SchemaTableName::from_properties(&properties);
-                let external_table = ExternalStorageTable::new(
-                    TableId::new(table_desc.table_id),
-                    schema_table_name,
-                    table_reader,
-                    table_schema,
-                    table_pk_order_types,
-                    table_pk_indices,
-                    output_indices.clone(),
-                );
-
-                let vnodes = params.vnode_bitmap.map(Arc::new);
-                // cdc backfill should be singleton, so vnodes must be None.
-                assert_eq!(None, vnodes);
-                let state_table =
-                    StateTable::from_table_catalog(node.get_state_table()?, state_store, vnodes)
-                        .await;
-
-                CdcBackfillExecutor::new(
-                    params.actor_context.clone(),
-                    params.info,
-                    external_table,
-                    upstream,
-                    output_indices,
-                    Some(progress),
-                    params.executor_stats,
-                    Some(state_table),
-                    None,
-                    true,
-                    params.env.config().developer.chunk_size,
-                )
-                .boxed()
-            }
             StreamScanType::Backfill => {
                 let table_desc: &StorageTableDesc = node
                     .get_table_desc()
diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs
index 35ebb2581dc40..7f0bcc03e2186 100644
--- a/src/stream/src/task/stream_manager.rs
+++ b/src/stream/src/task/stream_manager.rs
@@ -511,6 +511,7 @@ impl LocalStreamManagerCore {
             | NodeBody::DeltaIndexJoin(_)
             | NodeBody::Lookup(_)
             | NodeBody::StreamScan(_)
+            | NodeBody::StreamCdcScan(_)
             | NodeBody::DynamicFilter(_)
             | NodeBody::GroupTopN(_)
             | NodeBody::Now(_)
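---

Note on the shape of this refactor: `STREAM_SCAN_TYPE_CDC_BACKFILL` was previously a
variant of `StreamScanType` carried inside the shared `StreamScanNode`, so every
consumer (fragmenter, actor builder, executor builder) branched on the flag at
runtime, and CDC-only fields rode along on the generic message. With a dedicated
`StreamCdcScanNode` and a new `NodeBody::StreamCdcScan` variant, each consumer gets
its own match arm and the compiler enforces that new call sites handle both cases.
A minimal, self-contained Rust sketch of the dispatch change follows; it uses
simplified stand-in types, not the generated protobuf structs or the actual
RisingWave APIs:

    // Stand-in node messages, mirroring StreamScanNode / StreamCdcScanNode.
    struct StreamScanNode {
        table_id: u32,
    }

    struct StreamCdcScanNode {
        table_id: u32,
    }

    // Stand-in for the NodeBody oneof after this patch.
    enum NodeBody {
        StreamScan(StreamScanNode),
        StreamCdcScan(StreamCdcScanNode),
    }

    // Before this patch, one arm handled both cases and branched on a
    // `stream_scan_type` flag; now each node body maps to one builder.
    fn describe(body: &NodeBody) -> String {
        match body {
            // Regular backfill keeps two inputs: a Merge node plus a
            // BatchPlan node for snapshot reads.
            NodeBody::StreamScan(node) => {
                format!("StreamScanExecutor(table {}): 2 inputs", node.table_id)
            }
            // CDC backfill reads only from the shared CDC source, so the
            // BatchPlan input is gone and exactly one Merge input remains.
            NodeBody::StreamCdcScan(node) => {
                format!("CdcBackfillExecutor(table {}): 1 input", node.table_id)
            }
        }
    }

    fn main() {
        let nodes = [
            NodeBody::StreamScan(StreamScanNode { table_id: 1 }),
            NodeBody::StreamCdcScan(StreamCdcScanNode { table_id: 2 }),
        ];
        for node in &nodes {
            println!("{}", describe(node));
        }
    }

Splitting the message also lets `StreamCdcScanNode` drop fields that never applied
to CDC scans (for example `rate_limit` and the deprecated snapshot-read interval),
which is why the new construction site no longer needs `..Default::default()`.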