From 8ac91c123bb203830c09b2559d1dc8e14f78f09a Mon Sep 17 00:00:00 2001 From: StrikeW Date: Fri, 22 Sep 2023 16:56:49 +0800 Subject: [PATCH] save work: may pass connector properties via the first barrier --- proto/stream_plan.proto | 1 + .../optimizer/plan_node/stream_table_scan.rs | 11 +++++ src/stream/src/from_proto/chain.rs | 43 +++++++++++++------ 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 8c5d60ae27de7..8cbd525e0909f 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -409,6 +409,7 @@ message MergeNode { repeated plan_common.Field fields = 4; // Whether the upstream is a CDC stream, used in chaining a Table job with a CDC source job. bool cdc_upstream = 5; + optional map<string, string> connector_properties = 6; } // passed from frontend to meta, used by fragmenter to generate `MergeNode` diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 425ae19cf56db..fd06574ae7259 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -43,6 +43,7 @@ pub struct StreamTableScan { logical: generic::Scan, batch_plan_id: PlanNodeId, chain_type: ChainType, + connector_properties: Option<HashMap<String, String>>, } impl StreamTableScan { @@ -50,6 +51,15 @@ impl StreamTableScan { Self::new_with_chain_type(logical, ChainType::Backfill) } + pub fn new_for_cdc_scan( + logical: generic::Scan, + connector_properties: HashMap<String, String>, + ) -> Self { + let mut plan = Self::new_with_chain_type(logical, ChainType::CdcBackfill); + plan.connector_properties = Some(connector_properties); + plan + } + pub fn new_with_chain_type(logical: generic::Scan, chain_type: ChainType) -> Self { let batch_plan_id = logical.ctx.next_plan_node_id(); @@ -271,6 +281,7 @@ impl StreamTableScan { PbStreamNode { node_body: Some(PbNodeBody::Merge(MergeNode { cdc_upstream, + connector_properties: 
self.connector_properties.clone(), ..Default::default() })), identity: "Upstream".into(), diff --git a/src/stream/src/from_proto/chain.rs b/src/stream/src/from_proto/chain.rs index 32c4506cd7cc4..d73195cbd9bfc 100644 --- a/src/stream/src/from_proto/chain.rs +++ b/src/stream/src/from_proto/chain.rs @@ -23,8 +23,10 @@ use risingwave_storage::table::Distribution; use super::*; use crate::common::table::state_table::StateTable; +use crate::executor::external::ExternalStorageTable; use crate::executor::{ - BackfillExecutor, ChainExecutor, FlowControlExecutor, RearrangedChainExecutor, + BackfillExecutor, CdcBackfillExecutor, ChainExecutor, FlowControlExecutor, + RearrangedChainExecutor, }; pub struct ChainExecutorBuilder; @@ -84,19 +86,32 @@ impl ExecutorBuilder for ChainExecutorBuilder { .boxed() } ChainType::CdcBackfill => { - todo!("CdcBackfill is not supported yet") - // let cdc_backfill = CdcBackfillExecutor::new( - // params.actor_context.clone(), - // external_table, - // Box::new(source_exec), - // (0..source.columns.len()).collect_vec(), /* eliminate the last column (_rw_offset) */ - // None, - // schema.clone(), - // pk_indices, - // params.executor_stats, - // source_state_handler, - // source_ctrl_opts.chunk_size, - // ); + // todo!("CdcBackfill is not supported yet") + + let table_reader = + table_type.create_table_reader(source.properties.clone(), schema.clone())?; + let external_table = ExternalStorageTable::new( + TableId::new(source.source_id), + upstream_table_name, + table_reader, + schema.clone(), + order_types, + pk_indices.clone(), + (0..table_desc.columns.len()).collect_vec(), + ); + + let cdc_backfill = CdcBackfillExecutor::new( + params.actor_context.clone(), + external_table, + Box::new(source_exec), + (0..source.columns.len()).collect_vec(), /* eliminate the last column (_rw_offset) */ + None, + schema.clone(), + pk_indices, + params.executor_stats, + source_state_handler, + source_ctrl_opts.chunk_size, + ); } ChainType::Backfill => { let 
table_desc: &StorageTableDesc = node.get_table_desc()?;