Skip to content

Commit

Permalink
save work: allow passing connector properties via the first barrier
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Sep 22, 2023
1 parent 7d684b9 commit 8ac91c1
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 14 deletions.
1 change: 1 addition & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ message MergeNode {
repeated plan_common.Field fields = 4;
// Whether the upstream is a CDC stream, used in chaining a Table job with a CDC source job.
bool cdc_upstream = 5;
// Connector properties for the upstream CDC source, delivered to the
// downstream table job so it can read the external table during backfill.
// NOTE: proto3 forbids the `optional` label on map fields (protoc rejects
// "optional map<...>"); maps have no explicit presence — an absent value is
// simply an empty map, which callers treat as "no properties supplied".
map<string, string> connector_properties = 6;
}

// passed from frontend to meta, used by fragmenter to generate `MergeNode`
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,23 @@ pub struct StreamTableScan {
logical: generic::Scan,
batch_plan_id: PlanNodeId,
chain_type: ChainType,
connector_properties: Option<HashMap<String, String>>,
}

impl StreamTableScan {
/// Creates a plain backfill scan over `logical`.
///
/// Delegates to [`Self::new_with_chain_type`] with [`ChainType::Backfill`],
/// the default chaining strategy for a table scan.
pub fn new(logical: generic::Scan) -> Self {
    let chain_type = ChainType::Backfill;
    Self::new_with_chain_type(logical, chain_type)
}

pub fn new_for_cdc_scan(
logical: generic::Scan,
connector_properties: HashMap<String, String>,
) -> Self {
let mut plan = Self::new_with_chain_type(logical, ChainType::CdcBackfill);
plan.connector_properties = Some(connector_properties);
plan
}

pub fn new_with_chain_type(logical: generic::Scan, chain_type: ChainType) -> Self {
let batch_plan_id = logical.ctx.next_plan_node_id();

Expand Down Expand Up @@ -271,6 +281,7 @@ impl StreamTableScan {
PbStreamNode {
node_body: Some(PbNodeBody::Merge(MergeNode {
cdc_upstream,
connector_properties: self.connector_properties.clone(),
..Default::default()
})),
identity: "Upstream".into(),
Expand Down
43 changes: 29 additions & 14 deletions src/stream/src/from_proto/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ use risingwave_storage::table::Distribution;

use super::*;
use crate::common::table::state_table::StateTable;
use crate::executor::external::ExternalStorageTable;
use crate::executor::{
BackfillExecutor, ChainExecutor, FlowControlExecutor, RearrangedChainExecutor,
BackfillExecutor, CdcBackfillExecutor, ChainExecutor, FlowControlExecutor,
RearrangedChainExecutor,
};

pub struct ChainExecutorBuilder;
Expand Down Expand Up @@ -84,19 +86,32 @@ impl ExecutorBuilder for ChainExecutorBuilder {
.boxed()
}
ChainType::CdcBackfill => {
todo!("CdcBackfill is not supported yet")
// let cdc_backfill = CdcBackfillExecutor::new(
// params.actor_context.clone(),
// external_table,
// Box::new(source_exec),
// (0..source.columns.len()).collect_vec(), /* eliminate the last column (_rw_offset) */
// None,
// schema.clone(),
// pk_indices,
// params.executor_stats,
// source_state_handler,
// source_ctrl_opts.chunk_size,
// );
// todo!("CdcBackfill is not supported yet")

let table_reader =
table_type.create_table_reader(source.properties.clone(), schema.clone())?;
let external_table = ExternalStorageTable::new(
TableId::new(source.source_id),
upstream_table_name,
table_reader,
schema.clone(),
order_types,
pk_indices.clone(),
(0..table_desc.columns.len()).collect_vec(),
);

let cdc_backfill = CdcBackfillExecutor::new(
params.actor_context.clone(),
external_table,
Box::new(source_exec),
(0..source.columns.len()).collect_vec(), /* eliminate the last column (_rw_offset) */
None,
schema.clone(),
pk_indices,
params.executor_stats,
source_state_handler,
source_ctrl_opts.chunk_size,
);
}
ChainType::Backfill => {
let table_desc: &StorageTableDesc = node.get_table_desc()?;
Expand Down

0 comments on commit 8ac91c1

Please sign in to comment.