diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index e69a712c9e3d..0ccf6718ecad 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -256,7 +256,8 @@ message FilterNode { message CdcFilterNode { expr.ExprNode search_condition = 1; uint32 upstream_source_id = 2; - repeated int32 upstream_column_ids = 3; + reserved "upstream_column_ids"; + reserved 3; } // A materialized view is regarded as a table. diff --git a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs index 49b39ef627e7..65eaba0525d0 100644 --- a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs @@ -199,7 +199,6 @@ impl StreamCdcTableScan { node_body: Some(PbNodeBody::CdcFilter(CdcFilterNode { search_condition: Some(filter_expr.to_expr_proto()), upstream_source_id, - upstream_column_ids: vec![], // not used, })), }; diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 925b01c8cbdc..937dadda21fb 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -55,6 +55,8 @@ pub(super) struct BuildingFragment { table_id: Option, /// The required columns of each upstream table. + /// + /// For shared CDC source on table, its `vec![]`, since the output is fixed. upstream_table_columns: HashMap>, } @@ -174,10 +176,7 @@ impl BuildingFragment { stream_scan.table_id.into(), stream_scan.upstream_column_ids.clone(), ), - NodeBody::CdcFilter(cdc_filter) => ( - cdc_filter.upstream_source_id.into(), - cdc_filter.upstream_column_ids.clone(), - ), + NodeBody::CdcFilter(cdc_filter) => (cdc_filter.upstream_source_id.into(), vec![]), _ => return, }; table_columns @@ -670,22 +669,8 @@ impl CompleteStreamFragmentGraph { let nodes = mview_fragment.actors[0].get_nodes().unwrap(); let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap(); - let all_column_ids = mview_node - .get_table() - .unwrap() - .columns - .iter() - .map(|c| c.column_desc.as_ref().unwrap().column_id) - .collect_vec(); - let dist_key_indices: Vec<_> = mview_node - .table - .as_ref() - .unwrap() - .distribution_key - .iter() - .map(|i| *i as u32) - .collect(); - + let all_column_ids = mview_node.column_ids(); + let dist_key_indices = mview_node.dist_key_indices(); let output_indices = output_columns .iter() .map(|c| { @@ -700,27 +685,11 @@ impl CompleteStreamFragmentGraph { )?; (dist_key_indices, output_indices) }; - let dispatch_strategy = if uses_arrangement_backfill { - if !dist_key_indices.is_empty() { - DispatchStrategy { - r#type: DispatcherType::Hash as _, - dist_key_indices, - output_indices, - } - } else { - DispatchStrategy { - r#type: DispatcherType::Simple as _, - dist_key_indices: vec![], // empty for Simple - output_indices, - } - } - } else { - DispatchStrategy { - r#type: DispatcherType::NoShuffle as _, - dist_key_indices: vec![], // not used for `NoShuffle` - output_indices, - } - }; + let dispatch_strategy = mv_on_mv_dispatch_strategy( + uses_arrangement_backfill, + dist_key_indices, + output_indices, + ); let edge = StreamFragmentEdge { id: EdgeId::UpstreamExternal { upstream_table_id, @@ -806,6 +775,34 @@ impl CompleteStreamFragmentGraph { } } +fn mv_on_mv_dispatch_strategy( + uses_arrangement_backfill: bool, + dist_key_indices: Vec, + output_indices: Vec, +) -> DispatchStrategy { + if uses_arrangement_backfill { + if !dist_key_indices.is_empty() { + DispatchStrategy { + r#type: DispatcherType::Hash as _, + dist_key_indices, + output_indices, + } + } else { + DispatchStrategy { + r#type: DispatcherType::Simple as _, + dist_key_indices: vec![], // empty for Simple + output_indices, + } + } + } else { + DispatchStrategy { + r#type: DispatcherType::NoShuffle as _, + dist_key_indices: vec![], // not used for `NoShuffle` + output_indices, + } + } +} + impl CompleteStreamFragmentGraph { /// Returns **all** fragment IDs in the complete graph, including the ones that are not in the /// building graph. diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index 0a503ff99017..dafa3a568780 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -170,6 +170,26 @@ impl FromStr for crate::expr::table_function::PbType { } } +impl stream_plan::MaterializeNode { + pub fn dist_key_indices(&self) -> Vec { + self.get_table() + .unwrap() + .distribution_key + .iter() + .map(|i| *i as u32) + .collect() + } + + pub fn column_ids(&self) -> Vec { + self.get_table() + .unwrap() + .columns + .iter() + .map(|c| c.get_column_desc().unwrap().column_id) + .collect() + } +} + #[cfg(test)] mod tests { use crate::data::{data_type, DataType};