From bf3975f984779fb16fda395e4fd39276ccdace17 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Fri, 24 Nov 2023 12:40:08 +0800 Subject: [PATCH] refactor(cdc): refactor query plan for table-on-source cdc backfill (#13553) Co-authored-by: Noel Kwan Co-authored-by: Noel Kwan <47273164+kwannoel@users.noreply.github.com> --- ci/scripts/e2e-source-test.sh | 9 +- e2e_test/source/cdc/cdc.check_new_rows.slt | 2 +- e2e_test/source/cdc/cdc.share_stream.slt | 16 +- proto/stream_plan.proto | 17 +- src/common/src/catalog/external_table.rs | 2 +- src/common/src/catalog/mod.rs | 4 +- src/common/src/util/column_index_mapping.rs | 1 - src/frontend/src/expr/mod.rs | 2 +- src/frontend/src/handler/create_source.rs | 12 +- .../plan_node/stream_cdc_table_scan.rs | 177 +++++++++++++++--- .../optimizer/plan_node/stream_exchange.rs | 2 - src/frontend/src/stream_fragmenter/mod.rs | 18 +- .../stream_fragmenter/rewrite/delta_join.rs | 2 - .../src/utils/stream_graph_formatter.rs | 3 +- src/meta/model_v2/src/actor_dispatcher.rs | 7 - src/meta/src/barrier/command.rs | 25 ++- src/meta/src/controller/fragment.rs | 1 - src/meta/src/manager/catalog/fragment.rs | 1 - src/meta/src/stream/scale.rs | 10 +- src/meta/src/stream/stream_graph/actor.rs | 51 ++--- src/meta/src/stream/stream_graph/fragment.rs | 66 +++---- src/meta/src/stream/stream_graph/schedule.rs | 2 - src/meta/src/stream/test_fragmenter.rs | 3 - src/stream/src/executor/dispatch.rs | 152 +-------------- src/stream/src/from_proto/cdc_filter.rs | 39 ++++ src/stream/src/from_proto/merge.rs | 2 +- src/stream/src/from_proto/mod.rs | 3 + src/stream/src/from_proto/stream_cdc_scan.rs | 8 +- 28 files changed, 307 insertions(+), 330 deletions(-) create mode 100644 src/stream/src/from_proto/cdc_filter.rs diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 2124b3cee3dfe..6ccb09dc72ff5 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -60,17 +60,16 @@ echo "--- mysql & postgres cdc validate test" sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.mysql.slt' sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.postgres.slt' +# cdc share stream test cases +export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456 +sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.share_stream.slt' + echo "--- mysql & postgres load and check" sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.load.slt' # wait for cdc loading sleep 10 sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.check.slt' -# cdc share stream test cases -export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456 -sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.share_stream.slt' - - # kill cluster cargo make kill echo "cluster killed " diff --git a/e2e_test/source/cdc/cdc.check_new_rows.slt b/e2e_test/source/cdc/cdc.check_new_rows.slt index 58362225660ea..cccf0272d9ff4 100644 --- a/e2e_test/source/cdc/cdc.check_new_rows.slt +++ b/e2e_test/source/cdc/cdc.check_new_rows.slt @@ -52,7 +52,7 @@ select v1, v2, v3 from mytable order by v1; query I SELECT * from products_test_cnt ---- -11 +12 query I SELECT * from orders_test_cnt diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index 5a6342a8cf35c..89d8416cd5c91 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -34,6 +34,17 @@ create table products_test ( id INT, PRIMARY KEY (id) ) from mysql_mytest table 'mytest.products'; +# check the fragment distribution +query TT +select 
distribution_type,flags from rw_fragments order by fragment_id; +---- +SINGLE {SOURCE} +HASH {MVIEW} +SINGLE {STREAM_SCAN} +SINGLE {CDC_FILTER} +HASH {SOURCE,DML} + + statement ok create table orders_test ( order_id int, @@ -51,13 +62,16 @@ create materialized view products_test_cnt as select count(*) as cnt from produc statement ok create materialized view orders_test_cnt as select count(*) as cnt from orders_test; +system ok +mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES(default, 'Milk', '100ml Milk');" + sleep 5s # check ingestion results query I SELECT * from products_test_cnt ---- -9 +10 query I SELECT * from orders_test_cnt diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 4d0f989ba6ea0..fd7988413aeed 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -240,6 +240,12 @@ message FilterNode { expr.ExprNode search_condition = 1; } +message CdcFilterNode { + expr.ExprNode search_condition = 1; + uint32 upstream_source_id = 2; + repeated int32 upstream_column_ids = 3; +} + // A materialized view is regarded as a table. // In addition, we also specify primary key to MV for efficient point lookup during update and deletion. // @@ -725,6 +731,7 @@ message StreamNode { OverWindowNode over_window = 137; StreamFsFetchNode stream_fs_fetch = 138; StreamCdcScanNode stream_cdc_scan = 139; + CdcFilterNode cdc_filter = 140; } // The id for the operator. This is local per mview. // TODO: should better be a uint32. @@ -753,11 +760,6 @@ enum DispatcherType { // piped into the downstream actor, if there are the same number of actors. If number of actors // are not the same, should use hash instead. Should be only used when distribution is the same. DISPATCHER_TYPE_NO_SHUFFLE = 4; - - // Dispatch by table name from upstream DB, used in CDC scenario which should has only one downstream actor. - // From the optimizer's point of view, it can be treated as a specialized version of HASH distribution - // that the hash key is the upstream table name. - DISPATCHER_TYPE_CDC_TABLENAME = 5; } // The property of an edge in the fragment graph. @@ -766,8 +768,6 @@ message DispatchStrategy { DispatcherType type = 1; repeated uint32 dist_key_indices = 2; repeated uint32 output_indices = 3; - // The full table name of the downstream CDC table. - optional string downstream_table_name = 4; } // A dispatcher redistribute messages. @@ -789,8 +789,6 @@ message Dispatcher { uint64 dispatcher_id = 4; // Number of downstreams decides how many endpoints a dispatcher should dispatch. repeated uint32 downstream_actor_id = 5; - // The full table name of the downstream CDC table. - optional string downstream_table_name = 7; } // A StreamActor is a running fragment of the overall stream graph, @@ -824,6 +822,7 @@ enum FragmentTypeFlag { FRAGMENT_TYPE_FLAG_BARRIER_RECV = 32; FRAGMENT_TYPE_FLAG_VALUES = 64; FRAGMENT_TYPE_FLAG_DML = 128; + FRAGMENT_TYPE_FLAG_CDC_FILTER = 256; } // The environment associated with a stream plan diff --git a/src/common/src/catalog/external_table.rs b/src/common/src/catalog/external_table.rs index b006bbd50d362..f0af14b85b8ce 100644 --- a/src/common/src/catalog/external_table.rs +++ b/src/common/src/catalog/external_table.rs @@ -29,7 +29,7 @@ pub struct CdcTableDesc { /// Id of the upstream source in sharing cdc mode pub source_id: TableId, - /// The full name of the table in external database, e.g. `database_name.table.name` in MySQL + /// The full name of the table in external database, e.g. 
`database_name.table_name` in MySQL /// and `schema_name.table_name` in the Postgres. pub external_table_name: String, /// The key used to sort in storage. diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index 204a5005cd2de..6207b82538dff 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -106,7 +106,9 @@ pub fn row_id_column_desc() -> ColumnDesc { pub const OFFSET_COLUMN_NAME: &str = "_rw_offset"; -pub const CDC_SOURCE_COLUMN_NUM: u32 = 4; +// The number of columns output by the cdc source job +// see `debezium_cdc_source_schema()` for details +pub const CDC_SOURCE_COLUMN_NUM: u32 = 3; pub const TABLE_NAME_COLUMN_NAME: &str = "_rw_table_name"; pub fn is_offset_column_name(name: &str) -> bool { diff --git a/src/common/src/util/column_index_mapping.rs b/src/common/src/util/column_index_mapping.rs index 49aac42d0815d..212c07df1e285 100644 --- a/src/common/src/util/column_index_mapping.rs +++ b/src/common/src/util/column_index_mapping.rs @@ -335,7 +335,6 @@ impl ColIndexMapping { r#type: strategy.r#type, dist_key_indices: map(&strategy.dist_key_indices)?, output_indices: map(&strategy.output_indices)?, - downstream_table_name: strategy.downstream_table_name.clone(), }) } } diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index c3aecb060afc6..dcac41f85070e 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -312,7 +312,7 @@ impl ExprImpl { /// /// TODO: This is a naive implementation. We should avoid proto ser/de. /// Tracking issue: - async fn eval_row(&self, input: &OwnedRow) -> RwResult { + pub async fn eval_row(&self, input: &OwnedRow) -> RwResult { let backend_expr = build_from_prost(&self.to_expr_proto())?; Ok(backend_expr.eval_row(input).await?) 
} diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index cf2ed281805be..7c8e597bad3db 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -1250,12 +1250,13 @@ pub mod tests { use std::collections::HashMap; use risingwave_common::catalog::{ - DEFAULT_DATABASE_NAME, DEFAULT_KEY_COLUMN_NAME, DEFAULT_SCHEMA_NAME, OFFSET_COLUMN_NAME, - ROWID_PREFIX, TABLE_NAME_COLUMN_NAME, + CDC_SOURCE_COLUMN_NUM, DEFAULT_DATABASE_NAME, DEFAULT_KEY_COLUMN_NAME, DEFAULT_SCHEMA_NAME, + OFFSET_COLUMN_NAME, ROWID_PREFIX, TABLE_NAME_COLUMN_NAME, }; use risingwave_common::types::DataType; use crate::catalog::root_catalog::SchemaPath; + use crate::handler::create_source::debezium_cdc_source_schema; use crate::test_utils::{create_proto_file, LocalFrontend, PROTO_FILE_DATA}; #[tokio::test] @@ -1341,4 +1342,11 @@ pub mod tests { }; assert_eq!(columns, expected_columns); } + + #[tokio::test] + async fn test_cdc_source_job_schema() { + let columns = debezium_cdc_source_schema(); + // make sure it doesn't broken by future PRs + assert_eq!(CDC_SOURCE_COLUMN_NUM, columns.len() as u32); + } } diff --git a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs index b725378575ec4..58db1a9b0e319 100644 --- a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs @@ -14,7 +14,7 @@ use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; -use risingwave_common::catalog::{ColumnCatalog, Field}; +use risingwave_common::catalog::Field; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::OrderType; use risingwave_pb::stream_plan::stream_node::PbNodeBody; @@ -24,7 +24,7 @@ use super::stream::prelude::*; use super::utils::{childless_record, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, StreamNode}; use crate::catalog::ColumnId; -use crate::expr::{ExprRewriter, ExprVisitor}; +use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef}; use crate::handler::create_source::debezium_cdc_source_schema; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder}; @@ -146,25 +146,85 @@ impl StreamCdcTableScan { .map(|x| *x as u32) .collect_vec(); - // The required columns from the table (both scan and upstream). - let upstream_column_ids = self - .core - .output_and_pk_column_ids() - .iter() - .map(ColumnId::get_id) - .collect_vec(); - // The schema of the shared cdc source upstream is different from snapshot, // refer to `debezium_cdc_source_schema()` for details. - let upstream_schema = { - let mut columns = debezium_cdc_source_schema(); - columns.push(ColumnCatalog::row_id_column()); + let cdc_source_schema = { + let columns = debezium_cdc_source_schema(); columns .into_iter() .map(|c| Field::from(c.column_desc).to_prost()) .collect_vec() }; + let catalog = self + .build_backfill_state_catalog(state) + .to_internal_table_prost(); + + // We need to pass the id of upstream source job here + let upstream_source_id = self.core.cdc_table_desc.source_id.table_id; + + // split the table name from the qualified table name, e.g. 
`database_name.table_name` + let (_, cdc_table_name) = self + .core + .cdc_table_desc + .external_table_name + .split_once('.') + .unwrap(); + + // jsonb filter expr: payload->'source'->>'table' = + let filter_expr = Self::build_cdc_filter_expr(cdc_table_name); + + let filter_operator_id = self.core.ctx.next_plan_node_id(); + // The filter node receive chunks in `(payload, _rw_offset, _rw_table_name)` schema + let filter_stream_node = StreamNode { + operator_id: filter_operator_id.0 as _, + input: vec![ + // The merge node body will be filled by the `ActorBuilder` on the meta service. + PbStreamNode { + node_body: Some(PbNodeBody::Merge(Default::default())), + identity: "Upstream".into(), + fields: cdc_source_schema.clone(), + stream_key: vec![], // not used + ..Default::default() + }, + ], + stream_key: vec![], // not used + append_only: true, + identity: "StreamCdcFilter".to_string(), + fields: cdc_source_schema.clone(), + node_body: Some(PbNodeBody::CdcFilter(CdcFilterNode { + search_condition: Some(filter_expr.to_expr_proto()), + upstream_source_id, + upstream_column_ids: vec![], // not used, + })), + }; + + let exchange_operator_id = self.core.ctx.next_plan_node_id(); + // Add a simple exchange node between filter and stream scan + let exchange_stream_node = StreamNode { + operator_id: exchange_operator_id.0 as _, + input: vec![filter_stream_node], + stream_key: vec![], // not used + append_only: true, + identity: "Exchange".to_string(), + fields: cdc_source_schema.clone(), + node_body: Some(PbNodeBody::Exchange(ExchangeNode { + strategy: Some(DispatchStrategy { + r#type: DispatcherType::Simple as _, + dist_key_indices: vec![], // simple exchange doesn't need dist key + output_indices: (0..cdc_source_schema.len() as u32).collect(), + }), + })), + }; + + // The required columns from the external table + let upstream_column_ids = self + .core + .output_and_pk_column_ids() + .iter() + .map(ColumnId::get_id) + .collect_vec(); + let output_indices = self .core .output_column_ids() @@ -177,13 +237,14 @@ impl StreamCdcTableScan { }) .collect_vec(); - let catalog = self - .build_backfill_state_catalog(state) - .to_internal_table_prost(); + tracing::debug!( + "output_column_ids: {:?}, upstream_column_ids: {:?}, output_indices: {:?}", + self.core.output_column_ids(), + upstream_column_ids, + output_indices + ); - // We need to pass the id of upstream source job here - let upstream_source_id = self.core.cdc_table_desc.source_id.table_id; - let node_body = PbNodeBody::StreamCdcScan(StreamCdcScanNode { + let stream_scan_body = PbNodeBody::StreamCdcScan(StreamCdcScanNode { table_id: upstream_source_id, upstream_column_ids, output_indices, @@ -192,26 +253,46 @@ impl StreamCdcTableScan { cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()), }); + // plan: merge -> filter -> exchange(simple) -> stream_scan PbStreamNode { fields: self.schema().to_prost(), - input: vec![ - // The merge node body will be filled by the `ActorBuilder` on the meta service. 
- PbStreamNode { - node_body: Some(PbNodeBody::Merge(Default::default())), - identity: "Upstream".into(), - fields: upstream_schema.clone(), - stream_key: vec![], // not used - ..Default::default() - }, - ], - - node_body: Some(node_body), + input: vec![exchange_stream_node], + node_body: Some(stream_scan_body), stream_key, operator_id: self.base.id().0 as u64, identity: self.distill_to_string(), append_only: self.append_only(), } } + + pub fn build_cdc_filter_expr(cdc_table_name: &str) -> ExprImpl { + // jsonb filter expr: payload->'source'->>'table' = + FunctionCall::new( + ExprType::Equal, + vec![ + FunctionCall::new( + ExprType::JsonbAccessStr, + vec![ + FunctionCall::new( + ExprType::JsonbAccess, + vec![ + InputRef::new(0, DataType::Jsonb).into(), + ExprImpl::literal_varchar("source".into()), + ], + ) + .unwrap() + .into(), + ExprImpl::literal_varchar("table".into()), + ], + ) + .unwrap() + .into(), + ExprImpl::literal_varchar(cdc_table_name.into()), + ], + ) + .unwrap() + .into() + } } impl ExprRewritable for StreamCdcTableScan { @@ -231,3 +312,37 @@ impl ExprVisitable for StreamCdcTableScan { self.core.visit_exprs(v); } } + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use risingwave_common::row::OwnedRow; + use risingwave_common::types::{JsonbVal, ScalarImpl}; + + use super::*; + + #[tokio::test] + async fn test_cdc_filter_expr() { + let t1_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 111, "v2": 222.2 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t1", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 774, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap(); + let t2_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 333, "v2": 666.6 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t2", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 884, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap(); + let row1 = OwnedRow::new(vec![ + Some(t1_json.into()), + Some(r#"{"file": "1.binlog", "pos": 100}"#.into()), + ]); + let row2 = OwnedRow::new(vec![ + Some(t2_json.into()), + Some(r#"{"file": "2.binlog", "pos": 100}"#.into()), + ]); + + let filter_expr = StreamCdcTableScan::build_cdc_filter_expr("t1"); + assert_eq!( + filter_expr.eval_row(&row1).await.unwrap(), + Some(ScalarImpl::Bool(true)) + ); + assert_eq!( + filter_expr.eval_row(&row2).await.unwrap(), + Some(ScalarImpl::Bool(false)) + ); + } +} diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs index 1fceee4db6401..759c504eb84e1 100644 --- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -116,7 +116,6 @@ impl StreamNode for StreamExchange { r#type: DispatcherType::NoShuffle as i32, dist_key_indices: vec![], output_indices: (0..self.schema().len() as u32).collect(), - downstream_table_name: None, }) } else { Some(DispatchStrategy { @@ -133,7 +132,6 @@ impl StreamNode for StreamExchange { _ => vec![], }, output_indices: (0..self.schema().len() as u32).collect(), - downstream_table_name: None, }) }, }) diff --git 
a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index 20fc741f64397..a5da1e92dadfd 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -292,14 +292,21 @@ fn build_fragment( current_fragment.upstream_table_ids.push(node.table_id); } - NodeBody::StreamCdcScan(node) => { + NodeBody::StreamCdcScan(_) => { current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32; - // memorize table id for later use - // The table id could be a upstream CDC source + // the backfill algorithm is not parallel safe + current_fragment.requires_singleton = true; + } + + NodeBody::CdcFilter(node) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::CdcFilter as u32; + // memorize upstream source id for later use state .dependent_table_ids - .insert(TableId::new(node.table_id)); - current_fragment.upstream_table_ids.push(node.table_id); + .insert(TableId::new(node.upstream_source_id)); + current_fragment + .upstream_table_ids + .push(node.upstream_source_id); } NodeBody::Now(_) => { @@ -376,7 +383,6 @@ fn build_fragment( r#type: DispatcherType::NoShuffle as i32, dist_key_indices: vec![], output_indices: (0..ref_fragment_node.fields.len() as u32).collect(), - downstream_table_name: None, }; let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64; diff --git a/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs b/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs index c108488b2aac7..b09dc847fc3fd 100644 --- a/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs +++ b/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs @@ -71,7 +71,6 @@ fn dispatch_no_shuffle(output_indices: Vec) -> DispatchStrategy { r#type: DispatcherType::NoShuffle.into(), dist_key_indices: vec![], output_indices, - downstream_table_name: None, } } @@ -84,7 +83,6 @@ fn dispatch_consistent_hash_shuffle( r#type: DispatcherType::Hash.into(), dist_key_indices, output_indices, - downstream_table_name: None, } } diff --git a/src/frontend/src/utils/stream_graph_formatter.rs b/src/frontend/src/utils/stream_graph_formatter.rs index 8b022dbbe3da3..489c0bd4e453c 100644 --- a/src/frontend/src/utils/stream_graph_formatter.rs +++ b/src/frontend/src/utils/stream_graph_formatter.rs @@ -148,8 +148,6 @@ impl StreamGraphFormatter { DispatcherType::Broadcast => "Broadcast".to_string(), DispatcherType::Simple => "Single".to_string(), DispatcherType::NoShuffle => "NoShuffle".to_string(), - DispatcherType::CdcTablename => - format!("CdcTableName({:?})", dist.downstream_table_name), }, upstream_fragment_id ) @@ -299,6 +297,7 @@ impl StreamGraphFormatter { } stream_node::NodeBody::Project(_) | stream_node::NodeBody::Filter(_) + | stream_node::NodeBody::CdcFilter(_) | stream_node::NodeBody::StatelessSimpleAgg(_) | stream_node::NodeBody::HopWindow(_) | stream_node::NodeBody::Merge(_) diff --git a/src/meta/model_v2/src/actor_dispatcher.rs b/src/meta/model_v2/src/actor_dispatcher.rs index 07986df61fb0e..a8402a58ff187 100644 --- a/src/meta/model_v2/src/actor_dispatcher.rs +++ b/src/meta/model_v2/src/actor_dispatcher.rs @@ -28,8 +28,6 @@ pub enum DispatcherType { Simple, #[sea_orm(string_value = "NO_SHUFFLE")] NoShuffle, - #[sea_orm(string_value = "CDC_TABLE_NAME")] - CdcTableName, } impl From for DispatcherType { @@ -40,7 +38,6 @@ impl From for DispatcherType { PbDispatcherType::Broadcast => DispatcherType::Broadcast, PbDispatcherType::Simple => DispatcherType::Simple, PbDispatcherType::NoShuffle => DispatcherType::NoShuffle, 
- PbDispatcherType::CdcTablename => DispatcherType::CdcTableName, } } } @@ -52,7 +49,6 @@ impl From for PbDispatcherType { DispatcherType::Broadcast => PbDispatcherType::Broadcast, DispatcherType::Simple => PbDispatcherType::Simple, DispatcherType::NoShuffle => PbDispatcherType::NoShuffle, - DispatcherType::CdcTableName => PbDispatcherType::CdcTablename, } } } @@ -68,7 +64,6 @@ impl From<(u32, PbDispatcher)> for Model { hash_mapping: dispatcher.hash_mapping.map(ActorMapping), dispatcher_id: dispatcher.dispatcher_id as _, downstream_actor_ids: dispatcher.downstream_actor_id.into(), - downstream_table_name: dispatcher.downstream_table_name, } } } @@ -82,7 +77,6 @@ impl From for PbDispatcher { hash_mapping: model.hash_mapping.map(|mapping| mapping.into_inner()), dispatcher_id: model.dispatcher_id as _, downstream_actor_id: model.downstream_actor_ids.into_u32_array(), - downstream_table_name: model.downstream_table_name, } } } @@ -99,7 +93,6 @@ pub struct Model { pub hash_mapping: Option, pub dispatcher_id: FragmentId, pub downstream_actor_ids: I32Array, - pub downstream_table_name: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 359f7633437dc..ce5ec2c09f040 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -27,8 +27,8 @@ use risingwave_pb::stream_plan::barrier::{BarrierKind, Mutation}; use risingwave_pb::stream_plan::throttle_mutation::RateLimit; use risingwave_pb::stream_plan::update_mutation::*; use risingwave_pb::stream_plan::{ - AddMutation, Dispatcher, Dispatchers, PauseMutation, ResumeMutation, SourceChangeSplitMutation, - StopMutation, ThrottleMutation, UpdateMutation, + AddMutation, Dispatcher, Dispatchers, FragmentTypeFlag, PauseMutation, ResumeMutation, + SourceChangeSplitMutation, StopMutation, ThrottleMutation, UpdateMutation, }; use risingwave_pb::stream_service::{DropActorsRequest, WaitEpochCommitRequest}; use risingwave_rpc_client::StreamClientPoolRef; @@ -586,12 +586,21 @@ impl CommandContext { dispatchers, table_fragments, .. 
- } => dispatchers - .values() - .flatten() - .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter().copied()) - .chain(table_fragments.values_actor_ids()) - .collect(), + } => { + // cdc backfill table job doesn't need to be tracked + if table_fragments.fragments().iter().any(|fragment| { + fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32 != 0 + }) { + Default::default() + } else { + dispatchers + .values() + .flatten() + .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter().copied()) + .chain(table_fragments.values_actor_ids()) + .collect() + } + } _ => Default::default(), } } diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 05c88a44d3a78..dc36c6466e57c 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -860,7 +860,6 @@ impl CatalogController { r#type: d.r#type, dist_key_indices: d.dist_key_indices.clone(), output_indices: d.output_indices.clone(), - downstream_table_name: d.downstream_table_name.clone(), }; (fragment_id, strategy) }) diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index d0c41f618ddc1..570813a7ab53a 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -1336,7 +1336,6 @@ impl FragmentManager { r#type: d.r#type, dist_key_indices: d.dist_key_indices.clone(), output_indices: d.output_indices.clone(), - downstream_table_name: d.downstream_table_name.clone(), }; (fragment_id, strategy) }) diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 9656b2409226c..2da61c4e9fb9a 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -1369,10 +1369,7 @@ impl ScaleController { match upstream_dispatch_type { DispatcherType::Unspecified => unreachable!(), - DispatcherType::Hash - | DispatcherType::Broadcast - | DispatcherType::Simple - | DispatcherType::CdcTablename => { + DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => { let upstream_fragment = &ctx.fragment_map.get(upstream_fragment_id).unwrap(); let mut upstream_actor_ids = upstream_fragment .actors @@ -1454,10 +1451,7 @@ impl ScaleController { fragment_actors_to_create.get(&downstream_fragment_id); match dispatcher.r#type() { - d @ (DispatcherType::Hash - | DispatcherType::Simple - | DispatcherType::Broadcast - | DispatcherType::CdcTablename) => { + d @ (DispatcherType::Hash | DispatcherType::Simple | DispatcherType::Broadcast) => { if let Some(downstream_actors_to_remove) = downstream_fragment_actors_to_remove { dispatcher diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index 5120e8e26869b..4a5339431f0aa 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -198,17 +198,24 @@ impl ActorBuilder { }) } - // "Leaf" node `StreamScan`. - NodeBody::StreamCdcScan(stream_scan) => { + // "Leaf" node `CdcFilter` used in multi-table cdc backfill plan: + // cdc_filter -> backfill -> mview + NodeBody::CdcFilter(node) => { let input = stream_node.get_input(); assert_eq!(input.len(), 1); let merge_node = &input[0]; assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_))); + let upstream_source_id = node.upstream_source_id; + tracing::debug!( + "rewrite leaf cdc filter node: upstream source id {}", + upstream_source_id, + ); + // Index the upstreams by the an external edge ID. 
let upstreams = &self.upstreams[&EdgeId::UpstreamExternal { - upstream_table_id: stream_scan.table_id.into(), + upstream_table_id: upstream_source_id.into(), downstream_fragment_id: self.fragment_id, }]; @@ -216,19 +223,19 @@ impl ActorBuilder { let upstream_actor_id = upstreams.actors.as_global_ids(); assert_eq!(upstream_actor_id.len(), 1); + // rewrite the input of `CdcFilter` let input = vec![ // Fill the merge node body with correct upstream info. StreamNode { node_body: Some(NodeBody::Merge(MergeNode { upstream_actor_id, upstream_fragment_id: upstreams.fragment_id.as_global_id(), - upstream_dispatcher_type: DispatcherType::CdcTablename as _, + upstream_dispatcher_type: DispatcherType::NoShuffle as _, fields: merge_node.fields.clone(), })), ..merge_node.clone() }, ]; - Ok(StreamNode { input, ..stream_node.clone() @@ -389,7 +396,6 @@ impl ActorGraphBuildStateInner { hash_mapping: Some(downstream_actor_mapping.to_protobuf()), dispatcher_id: downstream_fragment_id.as_global_id() as u64, downstream_actor_id: downstream_actors.as_global_ids(), - downstream_table_name: strategy.downstream_table_name.clone(), } } @@ -409,28 +415,6 @@ impl ActorGraphBuildStateInner { hash_mapping: None, dispatcher_id: downstream_fragment_id.as_global_id() as u64, downstream_actor_id: downstream_actors.as_global_ids(), - downstream_table_name: None, - } - } - - /// Create a new dispatcher for cdc event dispatch. - fn new_cdc_dispatcher( - strategy: &DispatchStrategy, - downstream_fragment_id: GlobalFragmentId, - downstream_actors: &[GlobalActorId], - ) -> Dispatcher { - // dist key is the index to `_rw_table_name` column - assert_eq!(strategy.dist_key_indices.len(), 1); - assert!(strategy.downstream_table_name.is_some()); - - Dispatcher { - r#type: strategy.r#type, - dist_key_indices: strategy.dist_key_indices.clone(), - output_indices: strategy.output_indices.clone(), - hash_mapping: None, - dispatcher_id: downstream_fragment_id.as_global_id() as u64, - downstream_actor_id: downstream_actors.as_global_ids(), - downstream_table_name: strategy.downstream_table_name.clone(), } } @@ -531,10 +515,7 @@ impl ActorGraphBuildStateInner { } // Otherwise, make m * n links between the actors. - DispatcherType::Hash - | DispatcherType::Broadcast - | DispatcherType::Simple - | DispatcherType::CdcTablename => { + DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => { // Add dispatchers for the upstream actors. 
let dispatcher = if let DispatcherType::Hash = dt { // Transform the `ParallelUnitMapping` from the downstream distribution to the @@ -556,12 +537,6 @@ impl ActorGraphBuildStateInner { downstream.actor_ids, actor_mapping, ) - } else if let DispatcherType::CdcTablename = dt { - Self::new_cdc_dispatcher( - &edge.dispatch_strategy, - downstream.fragment_id, - downstream.actor_ids, - ) } else { Self::new_normal_dispatcher( &edge.dispatch_strategy, diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 0de28324a1e76..a47380e21950f 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -22,11 +22,10 @@ use enum_as_inner::EnumAsInner; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::catalog::{ - generate_internal_table_name_with_type, TableId, CDC_SOURCE_COLUMN_NUM, TABLE_NAME_COLUMN_NAME, + generate_internal_table_name_with_type, TableId, CDC_SOURCE_COLUMN_NUM, }; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::stream_graph_visitor; -use risingwave_common::util::stream_graph_visitor::visit_fragment; use risingwave_pb::catalog::Table; use risingwave_pb::ddl_service::TableJobType; use risingwave_pb::meta::table_fragments::Fragment; @@ -175,9 +174,9 @@ impl BuildingFragment { stream_scan.table_id.into(), stream_scan.upstream_column_ids.clone(), ), - NodeBody::StreamCdcScan(stream_cdc_scan) => ( - stream_cdc_scan.table_id.into(), - stream_cdc_scan.upstream_column_ids.clone(), + NodeBody::CdcFilter(cdc_filter) => ( + cdc_filter.upstream_source_id.into(), + cdc_filter.upstream_column_ids.clone(), ), _ => return, }; @@ -186,7 +185,12 @@ impl BuildingFragment { .expect("currently there should be no two same upstream tables in a fragment"); }); - assert_eq!(table_columns.len(), fragment.upstream_table_ids.len()); + assert_eq!( + table_columns.len(), + fragment.upstream_table_ids.len(), + "fragment type: {}", + fragment.fragment_type_mask + ); table_columns } @@ -567,51 +571,36 @@ impl CompleteStreamFragmentGraph { for (&upstream_table_id, output_columns) in &fragment.upstream_table_columns { let (up_fragment_id, edge) = match table_job_type.as_ref() { Some(TableJobType::SharedCdcSource) => { - // extract the upstream full_table_name from the source fragment - let mut full_table_name = None; - visit_fragment(&mut fragment.inner, |node_body| { - if let NodeBody::StreamCdcScan(stream_scan) = node_body { - full_table_name = stream_scan - .cdc_table_desc - .as_ref() - .map(|desc| desc.table_name.clone()); - } - }); - - let source_fragment = - upstream_root_fragments - .get(&upstream_table_id) - .context("upstream materialized view fragment not found")?; + let source_fragment = upstream_root_fragments + .get(&upstream_table_id) + .context("upstream source fragment not found")?; let source_job_id = GlobalFragmentId::new(source_fragment.fragment_id); - // extract `_rw_table_name` column index - let rw_table_name_index = { - let node = source_fragment.actors[0].get_nodes().unwrap(); - // may remove the expect to extend other scenarios, currently only target the CDC scenario - node.fields - .iter() - .position(|f| f.name.as_str() == TABLE_NAME_COLUMN_NAME) - .expect("table name column not found") - }; + // we traverse all fragments in the graph, and we should find out the + // CdcFilter fragment and add an edge between upstream source fragment and it. 
+ assert_ne!( + (fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32), + 0 + ); - assert!(full_table_name.is_some()); tracing::debug!( - ?full_table_name, ?source_job_id, - ?rw_table_name_index, ?output_columns, - "StreamScan with upstream source fragment" + identity = ?fragment.inner.get_node().unwrap().get_identity(), + current_frag_id=?id, + "CdcFilter with upstream source fragment" ); let edge = StreamFragmentEdge { id: EdgeId::UpstreamExternal { upstream_table_id, downstream_fragment_id: id, }, + // We always use `NoShuffle` for the exchange between the upstream + // `Source` and the downstream `StreamScan` of the new cdc table. dispatch_strategy: DispatchStrategy { - r#type: DispatcherType::CdcTablename as _, /* there may have multiple downstream table jobs, so we use `Hash` here */ - dist_key_indices: vec![rw_table_name_index as _], /* index to `_rw_table_name` column */ - output_indices: (0..CDC_SOURCE_COLUMN_NUM as _).collect_vec(), /* require all columns from the cdc source */ - downstream_table_name: full_table_name, + r#type: DispatcherType::NoShuffle as _, + dist_key_indices: vec![], // not used for `NoShuffle` + output_indices: (0..CDC_SOURCE_COLUMN_NUM as _).collect(), }, }; @@ -660,7 +649,6 @@ impl CompleteStreamFragmentGraph { r#type: DispatcherType::NoShuffle as _, dist_key_indices: vec![], // not used for `NoShuffle` output_indices, - downstream_table_name: None, }, }; diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index f24db98220678..823e9fb9fd204 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -110,8 +110,6 @@ crepe::crepe! { // The downstream fragment of a `Simple` edge must be singleton. SingletonReq(y) <- Edge(_, y, Simple); - // The downstream fragment of a `CdcTablename` edge must be singleton. - SingletonReq(y) <- Edge(_, y, CdcTablename); // Singleton requirements propagate through `NoShuffle` edges. 
SingletonReq(x) <- Edge(x, y, NoShuffle), SingletonReq(y); SingletonReq(y) <- Edge(x, y, NoShuffle), SingletonReq(x); diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index 4d59533b068bc..1ad03e1967d1a 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -228,7 +228,6 @@ fn make_stream_fragments() -> Vec { r#type: DispatcherType::Hash as i32, dist_key_indices: vec![0], output_indices: vec![0, 1, 2], - ..Default::default() }), })), fields: vec![ @@ -391,7 +390,6 @@ fn make_fragment_edges() -> Vec { r#type: DispatcherType::Simple as i32, dist_key_indices: vec![], output_indices: vec![], - ..Default::default() }), link_id: 4, upstream_id: 1, @@ -402,7 +400,6 @@ fn make_fragment_edges() -> Vec { r#type: DispatcherType::Hash as i32, dist_key_indices: vec![0], output_indices: vec![], - ..Default::default() }), link_id: 1, upstream_id: 2, diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index cfc714e28b90d..20ea1498fb226 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -25,8 +25,6 @@ use itertools::Itertools; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::BitmapBuilder; use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode}; -use risingwave_common::row::Row; -use risingwave_common::types::ScalarRefImpl; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate; use risingwave_pb::stream_plan::PbDispatcher; @@ -347,7 +345,6 @@ pub enum DispatcherImpl { Broadcast(BroadcastDispatcher), Simple(SimpleDispatcher), RoundRobin(RoundRobinDataDispatcher), - CdcTableName(CdcTableNameDispatcher), } impl DispatcherImpl { @@ -389,29 +386,6 @@ impl DispatcherImpl { dispatcher.dispatcher_id, )) } - CdcTablename => { - assert!(!outputs.is_empty()); - assert!(dispatcher.downstream_table_name.is_some()); - let dist_key_indices: Vec = dispatcher - .dist_key_indices - .iter() - .map(|i| *i as usize) - .collect_vec(); - - assert_eq!( - dist_key_indices.len(), - 1, - "expect only one table name column index" - ); - DispatcherImpl::CdcTableName(CdcTableNameDispatcher::new( - outputs, - dist_key_indices[0], - output_indices, - dispatcher.dispatcher_id, - dispatcher.downstream_table_name.clone(), - )) - } - Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new( outputs, output_indices, @@ -492,8 +466,7 @@ macro_rules! 
for_all_dispatcher_variants { { Hash }, { Broadcast }, { Simple }, - { RoundRobin }, - { CdcTableName } + { RoundRobin } } }; } @@ -845,129 +818,6 @@ impl Dispatcher for BroadcastDispatcher { } } -/// Dispatch stream chunk based on table name from upstream DB -#[derive(Debug)] -pub struct CdcTableNameDispatcher { - outputs: Vec, - // column index to the `_rw_table_name` column - table_name_col_index: usize, - output_indices: Vec, - dispatcher_id: DispatcherId, - dispatcher_id_str: String, - downstream_table_name: Option, -} - -impl CdcTableNameDispatcher { - pub fn new( - outputs: Vec, - table_name_col_index: usize, - output_indices: Vec, - dispatcher_id: DispatcherId, - downstream_table_name: Option, - ) -> Self { - Self { - outputs, - table_name_col_index, - output_indices, - dispatcher_id, - dispatcher_id_str: dispatcher_id.to_string(), - downstream_table_name, - } - } -} - -impl Dispatcher for CdcTableNameDispatcher { - async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> { - let num_outputs = self.outputs.len(); - - let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity())) - .take(num_outputs) - .collect_vec(); - - let chunk = chunk.project(&self.output_indices); - - // TODO: use a more efficient way to filter data, e.g. add a Filter node before Chain - for (visible, row) in chunk - .visibility() - .iter() - .zip_eq_fast(chunk.data_chunk().rows_with_holes()) - { - // Build visibility map for every output chunk. - for vis_map in &mut vis_maps { - let should_emit = if let Some(row) = row - && let Some(full_table_name) = self.downstream_table_name.as_ref() - { - let table_name_datum = row.datum_at(self.table_name_col_index).unwrap(); - tracing::trace!(target: "events::stream::dispatch::cdc", "keys: {:?}, table: {}", self.table_name_col_index, full_table_name); - // dispatch based on downstream table name - table_name_datum == ScalarRefImpl::Utf8(full_table_name) - } else { - true - }; - vis_map.append(visible && should_emit); - } - } - - for (vis_map, output) in vis_maps.into_iter().zip_eq_fast(self.outputs.iter_mut()) { - let vis_map = vis_map.finish(); - let new_stream_chunk = - StreamChunk::with_visibility(chunk.ops(), chunk.columns().into(), vis_map); - if new_stream_chunk.cardinality() > 0 { - event!( - tracing::Level::TRACE, - msg = "chunk", - downstream = output.actor_id(), - "send = \n{:#?}", - new_stream_chunk - ); - output.send(Message::Chunk(new_stream_chunk)).await?; - } - } - - Ok(()) - } - - async fn dispatch_barrier(&mut self, barrier: Barrier) -> StreamResult<()> { - // always broadcast barrier - for output in &mut self.outputs { - output.send(Message::Barrier(barrier.clone())).await?; - } - Ok(()) - } - - async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> { - if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { - // always broadcast watermark - for output in &mut self.outputs { - output.send(Message::Watermark(watermark.clone())).await?; - } - } - Ok(()) - } - - fn add_outputs(&mut self, outputs: impl IntoIterator) { - self.outputs.extend(outputs); - } - - fn remove_outputs(&mut self, actor_ids: &HashSet) { - self.outputs - .extract_if(|output| actor_ids.contains(&output.actor_id())) - .count(); - } - - fn dispatcher_id(&self) -> DispatcherId { - self.dispatcher_id - } - - fn dispatcher_id_str(&self) -> &str { - &self.dispatcher_id_str - } - - fn is_empty(&self) -> bool { - self.outputs.is_empty() - } -} - /// `SimpleDispatcher` dispatches message to a single output. 
#[derive(Debug)] pub struct SimpleDispatcher { diff --git a/src/stream/src/from_proto/cdc_filter.rs b/src/stream/src/from_proto/cdc_filter.rs new file mode 100644 index 0000000000000..96b6dcd47aab9 --- /dev/null +++ b/src/stream/src/from_proto/cdc_filter.rs @@ -0,0 +1,39 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_expr::expr::build_non_strict_from_prost; +use risingwave_pb::stream_plan::CdcFilterNode; + +use super::*; +use crate::executor::FilterExecutor; + +pub struct CdcFilterExecutorBuilder; + +/// `CdcFilter` is an extension to the Filter executor +impl ExecutorBuilder for CdcFilterExecutorBuilder { + type Node = CdcFilterNode; + + async fn new_boxed_executor( + params: ExecutorParams, + node: &Self::Node, + _store: impl StateStore, + _stream: &mut LocalStreamManagerCore, + ) -> StreamResult { + let [input]: [_; 1] = params.input.try_into().unwrap(); + let search_condition = + build_non_strict_from_prost(node.get_search_condition()?, params.eval_error_report)?; + + Ok(FilterExecutor::new(params.actor_context, params.info, input, search_condition).boxed()) + } +} diff --git a/src/stream/src/from_proto/merge.rs b/src/stream/src/from_proto/merge.rs index 175109bd09649..60323a0a17515 100644 --- a/src/stream/src/from_proto/merge.rs +++ b/src/stream/src/from_proto/merge.rs @@ -54,7 +54,7 @@ impl ExecutorBuilder for MergeExecutorBuilder { // There could be arbitrary number of upstreams with simple dispatcher. DispatcherType::Simple => false, // There should be always only one upstream with no-shuffle dispatcher. 
- DispatcherType::NoShuffle | DispatcherType::CdcTablename => true, + DispatcherType::NoShuffle => true, }; if always_single_input { diff --git a/src/stream/src/from_proto/mod.rs b/src/stream/src/from_proto/mod.rs index 169ffad718d03..23e22794af26f 100644 --- a/src/stream/src/from_proto/mod.rs +++ b/src/stream/src/from_proto/mod.rs @@ -18,6 +18,7 @@ mod agg_common; mod append_only_dedup; mod barrier_recv; mod batch_query; +mod cdc_filter; mod dml; mod dynamic_filter; mod eowc_over_window; @@ -59,6 +60,7 @@ use risingwave_storage::StateStore; use self::append_only_dedup::*; use self::barrier_recv::*; use self::batch_query::*; +use self::cdc_filter::CdcFilterExecutorBuilder; use self::dml::*; use self::dynamic_filter::*; use self::eowc_over_window::*; @@ -147,6 +149,7 @@ pub async fn create_executor( NodeBody::Merge => MergeExecutorBuilder, NodeBody::Materialize => MaterializeExecutorBuilder, NodeBody::Filter => FilterExecutorBuilder, + NodeBody::CdcFilter => CdcFilterExecutorBuilder, NodeBody::Arrange => ArrangeExecutorBuilder, NodeBody::Lookup => LookupExecutorBuilder, NodeBody::Union => UnionExecutorBuilder, diff --git a/src/stream/src/from_proto/stream_cdc_scan.rs b/src/stream/src/from_proto/stream_cdc_scan.rs index c9a496c3a6832..b96990cd44641 100644 --- a/src/stream/src/from_proto/stream_cdc_scan.rs +++ b/src/stream/src/from_proto/stream_cdc_scan.rs @@ -35,13 +35,9 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder { params: ExecutorParams, node: &Self::Node, state_store: impl StateStore, - stream: &mut LocalStreamManagerCore, + _stream: &mut LocalStreamManagerCore, ) -> StreamResult { let [upstream]: [_; 1] = params.input.try_into().unwrap(); - // For reporting the progress. - let progress = stream - .context - .register_create_mview_progress(params.actor_context.id); let output_indices = node .output_indices @@ -100,7 +96,7 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder { external_table, upstream, output_indices, - Some(progress), + None, params.executor_stats, Some(state_table), None,
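For reference, the routing rule that the new CdcFilter node enforces can be sanity-checked with a plain jsonb query. The slt-style snippet below is an illustrative sketch only, not part of the patch: the Debezium payload literal and the table name 'orders' are made-up values. An event is kept when payload->'source'->>'table' equals the name of the table being backfilled.

# Sketch (illustrative, not part of this patch): the predicate built by
# `build_cdc_filter_expr` is equivalent to this jsonb comparison on the
# shared cdc source's `payload` column.
query T
select ('{"source": {"table": "orders"}}'::jsonb)->'source'->>'table' = 'orders';
----
t

Filtering upstream of the backfill executor means each table job only sees its own table's change events, which is why the removed CdcTablename dispatcher is no longer needed: the source-to-CdcFilter edge becomes a plain NoShuffle edge, and the exchange into the (singleton) StreamCdcScan can stay a simple exchange.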