refactor(proto): use separate proto for cdc scan (#13502)
kwannoel authored Nov 20, 2023
1 parent 5bb9776 commit a22e20c
Showing 14 changed files with 222 additions and 147 deletions.
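
In brief: CDC backfill previously piggybacked on `StreamScanNode` via `stream_scan_type = CDC_BACKFILL`; this commit gives it a dedicated `StreamCdcScanNode` message and `NodeBody::StreamCdcScan` variant, then updates the frontend planner, the stream fragmenter, the meta fragment/actor builders, and the stream executor dispatch accordingly.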
20 changes: 16 additions & 4 deletions proto/stream_plan.proto
@@ -463,9 +463,6 @@ enum StreamScanType {

// ChainExecutor with upstream_only = true
STREAM_SCAN_TYPE_UPSTREAM_ONLY = 4;

// CdcBackfillExecutor
STREAM_SCAN_TYPE_CDC_BACKFILL = 5;
}

// StreamScanNode reads data from the upstream table first, and then passes all events to downstream.
@@ -502,9 +499,23 @@ message StreamScanNode {

// Snapshot read every N barriers
uint32 snapshot_read_barrier_interval = 9 [deprecated = true];
}

message StreamCdcScanNode {
uint32 table_id = 1;

// The columns from the upstream table that'll be internally required by this stream scan node.
// Contains Primary Keys and Output columns.
repeated int32 upstream_column_ids = 2;

// Strips the primary key columns if they're unnecessary.
repeated uint32 output_indices = 3;

/// The state table used by Backfill operator for persisting internal state
catalog.Table state_table = 4;

// The external table that will be backfilled for CDC.
plan_common.ExternalTableDesc cdc_table_desc = 10;
plan_common.ExternalTableDesc cdc_table_desc = 5;
}

// BatchPlanNode is used for mv on mv snapshot read.
@@ -700,6 +711,7 @@ message StreamNode {
EowcOverWindowNode eowc_over_window = 136;
OverWindowNode over_window = 137;
StreamFsFetchNode stream_fs_fetch = 138;
StreamCdcScanNode stream_cdc_scan = 139;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
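
Why a dedicated message helps, sketched in toy Rust (hypothetical stand-ins, not the generated `risingwave_pb` types): before, CDC backfill was one value of `stream_scan_type` on `StreamScanNode`, so CDC-only fields like `cdc_table_desc` rode along unused on every other scan type; after, consumers branch on the oneof variant directly.

```rust
// Toy model of the refactor; these are NOT the real prost-generated types.

// Before: one message whose meaning is switched by a flag, so CDC-only
// fields are dead weight on every non-CDC scan.
#[allow(dead_code)]
struct StreamScanNodeBefore {
    table_id: u32,
    stream_scan_type: i32,          // 5 used to mean CdcBackfill
    cdc_table_desc: Option<String>, // only meaningful when the flag says CDC
}

// After: a dedicated message/variant, so the CDC path is a distinct,
// exhaustively matchable case with no dead fields.
enum NodeBody {
    StreamScan { table_id: u32 },
    StreamCdcScan { table_id: u32, cdc_table_desc: String },
}

fn handle(body: &NodeBody) -> &'static str {
    match body {
        NodeBody::StreamScan { .. } => "regular backfill / chain path",
        NodeBody::StreamCdcScan { .. } => "CDC backfill path, no flag check",
    }
}

fn main() {
    let node = NodeBody::StreamCdcScan {
        table_id: 42,
        cdc_table_desc: "mydb.orders".into(),
    };
    println!("{}", handle(&node));
}
```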
5 changes: 5 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
@@ -199,6 +199,11 @@ fn visit_stream_node_tables_inner<F>(
optional!(node.state_table, "StreamScan")
}

// Stream Cdc Scan
NodeBody::StreamCdcScan(node) => {
always!(node.state_table, "StreamCdcScan")
}

// Note: add internal tables for new nodes here.
NodeBody::Materialize(node) if !internal_tables_only => {
always!(node.table, "Materialize")
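
A minimal sketch of the pattern this visitor follows (hypothetical miniature types, with plain code standing in for the repo's `always!`/`optional!` macros): every node body owning internal state tables must be enumerated so meta can track them, and the new arm registers the CDC backfill state table.

```rust
struct Table {
    id: u32,
}

enum NodeBody {
    StreamScan { state_table: Option<Table> },
    StreamCdcScan { state_table: Option<Table> },
}

// Hand each internal state table to the callback, tagged with its node name.
fn visit_tables(body: &mut NodeBody, f: &mut impl FnMut(&mut Table, &str)) {
    match body {
        // For StreamScan the state table is optional (`optional!`).
        NodeBody::StreamScan { state_table } => {
            if let Some(t) = state_table.as_mut() {
                f(t, "StreamScan");
            }
        }
        // For StreamCdcScan it must always be present (`always!`).
        NodeBody::StreamCdcScan { state_table } => {
            let t = state_table.as_mut().expect("StreamCdcScan");
            f(t, "StreamCdcScan");
        }
    }
}

fn main() {
    let mut node = NodeBody::StreamCdcScan {
        state_table: Some(Table { id: 0 }),
    };
    visit_tables(&mut node, &mut |t, name| t.id = name.len() as u32);
}
```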
54 changes: 5 additions & 49 deletions src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
@@ -18,11 +18,11 @@ use risingwave_common::catalog::{ColumnCatalog, Field};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbStreamNode, StreamScanType};
use risingwave_pb::stream_plan::PbStreamNode;

use super::stream::prelude::*;
use super::utils::{childless_record, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanNodeId, PlanRef, StreamNode};
use super::{generic, ExprRewritable, PlanBase, PlanRef, StreamNode};
use crate::catalog::ColumnId;
use crate::expr::ExprRewriter;
use crate::handler::create_source::debezium_cdc_source_schema;
@@ -37,13 +37,10 @@ use crate::{Explain, TableCatalog};
pub struct StreamCdcTableScan {
pub base: PlanBase<Stream>,
core: generic::CdcScan,
batch_plan_id: PlanNodeId,
stream_scan_type: StreamScanType,
}

impl StreamCdcTableScan {
pub fn new(core: generic::CdcScan) -> Self {
let batch_plan_id = core.ctx.next_plan_node_id();
let distribution = Distribution::SomeShard;
let base = PlanBase::new_stream_with_core(
&core,
@@ -52,12 +49,7 @@ impl StreamCdcTableScan {
false,
core.watermark_columns(),
);
Self {
base,
core,
batch_plan_id,
stream_scan_type: StreamScanType::CdcBackfill,
}
Self { base, core }
}

pub fn table_name(&self) -> &str {
@@ -68,10 +60,6 @@ impl StreamCdcTableScan {
&self.core
}

pub fn stream_scan_type(&self) -> StreamScanType {
StreamScanType::CdcBackfill
}

/// Build catalog for cdc backfill state
/// Right now we only persist whether the backfill is finished and the corresponding cdc offset
/// schema: | `split_id` | `pk...` | `backfill_finished` | `row_count` | `cdc_offset` |
@@ -165,20 +153,6 @@ impl StreamCdcTableScan {
.map(ColumnId::get_id)
.collect_vec();

// The schema of the snapshot read stream
let snapshot_schema = upstream_column_ids
.iter()
.map(|&id| {
let col = self
.core
.get_table_columns()
.iter()
.find(|c| c.column_id.get_id() == id)
.unwrap();
Field::from(col).to_prost()
})
.collect_vec();

// The schema of the shared cdc source upstream is different from snapshot,
// refer to `debezium_cdc_source_schema()` for details.
let upstream_schema = {
@@ -202,28 +176,19 @@ impl StreamCdcTableScan {
})
.collect_vec();

let batch_plan_node = BatchPlanNode {
table_desc: None,
column_ids: upstream_column_ids.clone(),
};

let catalog = self
.build_backfill_state_catalog(state)
.to_internal_table_prost();

// We need to pass the id of the upstream source job here
let upstream_source_id = self.core.cdc_table_desc.source_id.table_id;
let node_body = PbNodeBody::StreamScan(StreamScanNode {
let node_body = PbNodeBody::StreamCdcScan(StreamCdcScanNode {
table_id: upstream_source_id,
stream_scan_type: self.stream_scan_type as i32,
// The column indices need to be forwarded to the downstream
output_indices,
upstream_column_ids,
output_indices,
// The table desc used by backfill executor
state_table: Some(catalog),
rate_limit: None,
cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
..Default::default()
});

PbStreamNode {
@@ -237,15 +202,6 @@ impl StreamCdcTableScan {
stream_key: vec![], // not used
..Default::default()
},
PbStreamNode {
node_body: Some(PbNodeBody::BatchPlan(batch_plan_node)),
operator_id: self.batch_plan_id.0 as u64,
identity: "BatchPlanNode".into(),
fields: snapshot_schema,
stream_key: vec![], // not used
input: vec![],
append_only: true,
},
],

node_body: Some(node_body),
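
The net effect on the emitted plan, sketched with hypothetical helper types: the CDC table scan no longer carries a placeholder `BatchPlanNode` child (or a `batch_plan_id`), so the node has exactly one input, the `Merge` connected to the shared CDC source.

```rust
// Toy sketch of the stream-node tree StreamCdcTableScan now emits.
struct StreamNode {
    identity: &'static str,
    input: Vec<StreamNode>,
}

fn build_cdc_scan_node() -> StreamNode {
    // Before this commit the inputs were [Merge, BatchPlan], where the
    // BatchPlan child was a placeholder the CDC path never read from.
    StreamNode {
        identity: "StreamCdcScan",
        input: vec![StreamNode {
            identity: "Merge (upstream shared CDC source)",
            input: vec![],
        }],
    }
}

fn main() {
    let node = build_cdc_scan_node();
    assert_eq!(node.input.len(), 1);
    println!("{} has {} input(s)", node.identity, node.input.len());
}
```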
4 changes: 1 addition & 3 deletions src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -243,9 +243,7 @@ impl StreamTableScan {
// The required columns from the table (both scan and upstream).
let upstream_column_ids = match self.stream_scan_type {
// For backfill, we additionally need the primary key columns.
StreamScanType::Backfill | StreamScanType::CdcBackfill => {
self.core.output_and_pk_column_ids()
}
StreamScanType::Backfill => self.core.output_and_pk_column_ids(),
StreamScanType::Chain | StreamScanType::Rearrange | StreamScanType::UpstreamOnly => {
self.core.output_column_ids()
}
11 changes: 11 additions & 0 deletions src/frontend/src/stream_fragmenter/mod.rs
@@ -136,6 +136,7 @@ fn is_stateful_executor(stream_node: &StreamNode) -> bool {
| NodeBody::HashJoin(_)
| NodeBody::DeltaIndexJoin(_)
| NodeBody::StreamScan(_)
| NodeBody::StreamCdcScan(_)
| NodeBody::DynamicFilter(_)
)
}
@@ -289,6 +290,16 @@ fn build_fragment(
current_fragment.upstream_table_ids.push(node.table_id);
}

NodeBody::StreamCdcScan(node) => {
current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32;
// memorize table id for later use
// The table id could be an upstream CDC source
state
.dependent_table_ids
.insert(TableId::new(node.table_id));
current_fragment.upstream_table_ids.push(node.table_id);
}

NodeBody::Now(_) => {
// TODO: Remove this and insert a `BarrierRecv` instead.
current_fragment.fragment_type_mask |= FragmentTypeFlag::Now as u32;
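
What the new fragmenter arm does, in a self-contained sketch (the flag value is illustrative, not the real `FragmentTypeFlag` constant): mark the fragment as a scan fragment via a bitmask and remember the upstream table/source ID so the scheduler can wire the dependency.

```rust
use std::collections::HashSet;

// Illustrative flag value; the real FragmentTypeFlag constants differ.
const FRAGMENT_TYPE_STREAM_SCAN: u32 = 1 << 2;

#[derive(Default)]
struct BuildFragmentState {
    fragment_type_mask: u32,
    dependent_table_ids: HashSet<u32>,
    upstream_table_ids: Vec<u32>,
}

fn on_stream_cdc_scan(state: &mut BuildFragmentState, table_id: u32) {
    // Same bookkeeping as the plain StreamScan arm: the table id may
    // refer to an upstream shared CDC source job.
    state.fragment_type_mask |= FRAGMENT_TYPE_STREAM_SCAN;
    state.dependent_table_ids.insert(table_id);
    state.upstream_table_ids.push(table_id);
}

fn main() {
    let mut state = BuildFragmentState::default();
    on_stream_cdc_scan(&mut state, 7);
    assert!(state.fragment_type_mask & FRAGMENT_TYPE_STREAM_SCAN != 0);
}
```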
4 changes: 4 additions & 0 deletions src/frontend/src/utils/stream_graph_formatter.rs
@@ -267,6 +267,10 @@ impl StreamGraphFormatter {
"state table",
self.pretty_add_table(node.get_state_table().unwrap()),
)),
stream_node::NodeBody::StreamCdcScan(node) => fields.push((
"state table",
self.pretty_add_table(node.get_state_table().unwrap()),
)),
stream_node::NodeBody::Sort(node) => {
fields.push((
"state table",
2 changes: 1 addition & 1 deletion src/meta/service/src/ddl_service.rs
@@ -912,7 +912,7 @@ fn fill_table_stream_graph_info(
}

// fill table id for cdc backfill
if let NodeBody::StreamScan(node) = node_body && table_job_type == TableJobType::SharedCdcSource {
if let NodeBody::StreamCdcScan(node) = node_body && table_job_type == TableJobType::SharedCdcSource {
if let Some(table) = node.cdc_table_desc.as_mut() {
table.table_id = table_id;
}
12 changes: 7 additions & 5 deletions src/meta/src/model/stream.rs
@@ -364,11 +364,13 @@ impl TableFragments {

/// Resolve dependent table
fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
if let Some(NodeBody::StreamScan(stream_scan)) = stream_node.node_body.as_ref() {
table_ids
.entry(TableId::new(stream_scan.table_id))
.or_default()
.add_assign(1);
let table_id = match stream_node.node_body.as_ref() {
Some(NodeBody::StreamScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
Some(NodeBody::StreamCdcScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
_ => None,
};
if let Some(table_id) = table_id {
table_ids.entry(table_id).or_default().add_assign(1);
}

for child in &stream_node.input {
47 changes: 38 additions & 9 deletions src/meta/src/stream/stream_graph/actor.rs
@@ -27,7 +27,6 @@ use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
use risingwave_pb::stream_plan::{
DispatchStrategy, Dispatcher, DispatcherType, MergeNode, StreamActor, StreamNode,
StreamScanType,
};

use super::id::GlobalFragmentIdsExt;
@@ -160,9 +159,6 @@ impl ActorBuilder {

// "Leaf" node `StreamScan`.
NodeBody::StreamScan(stream_scan) => {
let cdc_backfill =
stream_scan.stream_scan_type == StreamScanType::CdcBackfill as i32;

let input = stream_node.get_input();
assert_eq!(input.len(), 2);

@@ -188,11 +184,7 @@
node_body: Some(NodeBody::Merge(MergeNode {
upstream_actor_id,
upstream_fragment_id: upstreams.fragment_id.as_global_id(),
upstream_dispatcher_type: if cdc_backfill {
DispatcherType::CdcTablename as _
} else {
DispatcherType::NoShuffle as _
},
upstream_dispatcher_type: DispatcherType::NoShuffle as _,
fields: merge_node.fields.clone(),
})),
..merge_node.clone()
@@ -206,6 +198,43 @@
})
}

// "Leaf" node `StreamScan`.
NodeBody::StreamCdcScan(stream_scan) => {
let input = stream_node.get_input();
assert_eq!(input.len(), 1);

let merge_node = &input[0];
assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));

// Index the upstreams by the external edge ID.
let upstreams = &self.upstreams[&EdgeId::UpstreamExternal {
upstream_table_id: stream_scan.table_id.into(),
downstream_fragment_id: self.fragment_id,
}];

// Upstream CDC source should be a singleton.
let upstream_actor_id = upstreams.actors.as_global_ids();
assert_eq!(upstream_actor_id.len(), 1);

let input = vec![
// Fill the merge node body with correct upstream info.
StreamNode {
node_body: Some(NodeBody::Merge(MergeNode {
upstream_actor_id,
upstream_fragment_id: upstreams.fragment_id.as_global_id(),
upstream_dispatcher_type: DispatcherType::CdcTablename as _,
fields: merge_node.fields.clone(),
})),
..merge_node.clone()
},
];

Ok(StreamNode {
input,
..stream_node.clone()
})
}

// For other nodes, visit the children recursively.
_ => {
let mut new_stream_node = stream_node.clone();
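
Condensed, the new actor-building arm's contract (hypothetical types; values illustrative): a `StreamCdcScan` fragment has a single `Merge` input, its upstream shared CDC source is asserted to be a singleton, and the filled-in `Merge` uses the `CdcTablename` dispatcher rather than `NoShuffle`.

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum DispatcherType {
    NoShuffle,
    CdcTablename,
}

struct MergeInput {
    upstream_actor_ids: Vec<u32>,
    dispatcher: DispatcherType,
}

// Toy version of the StreamCdcScan arm: fill the merge node body with
// the resolved upstream info.
fn fill_cdc_merge(upstream_actor_ids: Vec<u32>) -> MergeInput {
    // The upstream shared CDC source runs as a singleton fragment.
    assert_eq!(upstream_actor_ids.len(), 1);
    MergeInput {
        upstream_actor_ids,
        // CDC scan subscribes by table name instead of NoShuffle pairing.
        dispatcher: DispatcherType::CdcTablename,
    }
}

fn main() {
    let merge = fill_cdc_merge(vec![1001]);
    assert_eq!(merge.dispatcher, DispatcherType::CdcTablename);
}
```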
23 changes: 15 additions & 8 deletions src/meta/src/stream/stream_graph/fragment.rs
@@ -170,13 +170,20 @@ impl BuildingFragment {
let mut table_columns = HashMap::new();

stream_graph_visitor::visit_fragment(fragment, |node_body| {
if let NodeBody::StreamScan(stream_scan) = node_body {
let table_id = stream_scan.table_id.into();
let column_ids = stream_scan.upstream_column_ids.clone();
table_columns
.try_insert(table_id, column_ids)
.expect("currently there should be no two same upstream tables in a fragment");
}
let (table_id, column_ids) = match node_body {
NodeBody::StreamScan(stream_scan) => (
stream_scan.table_id.into(),
stream_scan.upstream_column_ids.clone(),
),
NodeBody::StreamCdcScan(stream_cdc_scan) => (
stream_cdc_scan.table_id.into(),
stream_cdc_scan.upstream_column_ids.clone(),
),
_ => return,
};
table_columns
.try_insert(table_id, column_ids)
.expect("currently there should be no two same upstream tables in a fragment");
});

assert_eq!(table_columns.len(), fragment.upstream_table_ids.len());
@@ -563,7 +570,7 @@ impl CompleteStreamFragmentGraph {
// extract the upstream full_table_name from the source fragment
let mut full_table_name = None;
visit_fragment(&mut fragment.inner, |node_body| {
if let NodeBody::StreamScan(stream_scan) = node_body {
if let NodeBody::StreamCdcScan(stream_scan) = node_body {
full_table_name = stream_scan
.cdc_table_desc
.as_ref()
3 changes: 3 additions & 0 deletions src/stream/src/from_proto/mod.rs
@@ -42,6 +42,7 @@ mod sink;
mod sort;
mod source;
mod stateless_simple_agg;
mod stream_cdc_scan;
mod stream_scan;
mod temporal_join;
mod top_n;
@@ -82,6 +83,7 @@ use self::sink::*;
use self::sort::*;
use self::source::*;
use self::stateless_simple_agg::*;
use self::stream_cdc_scan::*;
use self::stream_scan::*;
use self::temporal_join::*;
use self::top_n::*;
@@ -140,6 +142,7 @@ pub async fn create_executor(
NodeBody::HashJoin => HashJoinExecutorBuilder,
NodeBody::HopWindow => HopWindowExecutorBuilder,
NodeBody::StreamScan => StreamScanExecutorBuilder,
NodeBody::StreamCdcScan => StreamCdcScanExecutorBuilder,
NodeBody::BatchPlan => BatchQueryExecutorBuilder,
NodeBody::Merge => MergeExecutorBuilder,
NodeBody::Materialize => MaterializeExecutorBuilder,
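
How the builder dispatch works, as a generic sketch (hypothetical trait and names, not the repo's actual macro-based table): each `NodeBody` variant maps to an executor builder, so the new `StreamCdcScan` variant needs its own builder module registered here. The old enum comment suggests the resulting executor is the `CdcBackfillExecutor`.

```rust
// Toy dispatch in the spirit of create_executor's builder match.
enum NodeBody {
    StreamScan,
    StreamCdcScan,
}

trait ExecutorBuilder {
    fn build(&self) -> String;
}

struct StreamScanExecutorBuilder;
struct StreamCdcScanExecutorBuilder;

impl ExecutorBuilder for StreamScanExecutorBuilder {
    fn build(&self) -> String {
        "StreamScanExecutor".into()
    }
}
impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
    fn build(&self) -> String {
        "CdcBackfillExecutor (built from StreamCdcScanNode)".into()
    }
}

fn create_executor(body: &NodeBody) -> Box<dyn ExecutorBuilder> {
    match body {
        NodeBody::StreamScan => Box::new(StreamScanExecutorBuilder),
        NodeBody::StreamCdcScan => Box::new(StreamCdcScanExecutorBuilder),
    }
}

fn main() {
    println!("{}", create_executor(&NodeBody::StreamCdcScan).build());
}
```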