Skip to content

Commit

Permalink
refactor: minor refactor on fragment graph building
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jan 20, 2024
1 parent d6233c5 commit b440bc0
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 43 deletions.
3 changes: 2 additions & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ message FilterNode {
message CdcFilterNode {
expr.ExprNode search_condition = 1;
uint32 upstream_source_id = 2;
repeated int32 upstream_column_ids = 3;
reserved "upstream_column_ids";
reserved 3;
}

// A materialized view is regarded as a table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ impl StreamCdcTableScan {
node_body: Some(PbNodeBody::CdcFilter(CdcFilterNode {
search_condition: Some(filter_expr.to_expr_proto()),
upstream_source_id,
upstream_column_ids: vec![], // not used,
})),
};

Expand Down
79 changes: 38 additions & 41 deletions src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub(super) struct BuildingFragment {
table_id: Option<u32>,

/// The required columns of each upstream table.
///
/// For shared CDC source on table, its `vec![]`, since the output is fixed.
upstream_table_columns: HashMap<TableId, Vec<i32>>,
}

Expand Down Expand Up @@ -174,10 +176,7 @@ impl BuildingFragment {
stream_scan.table_id.into(),
stream_scan.upstream_column_ids.clone(),
),
NodeBody::CdcFilter(cdc_filter) => (
cdc_filter.upstream_source_id.into(),
cdc_filter.upstream_column_ids.clone(),
),
NodeBody::CdcFilter(cdc_filter) => (cdc_filter.upstream_source_id.into(), vec![]),
_ => return,
};
table_columns
Expand Down Expand Up @@ -670,22 +669,8 @@ impl CompleteStreamFragmentGraph {
let nodes = mview_fragment.actors[0].get_nodes().unwrap();
let mview_node =
nodes.get_node_body().unwrap().as_materialize().unwrap();
let all_column_ids = mview_node
.get_table()
.unwrap()
.columns
.iter()
.map(|c| c.column_desc.as_ref().unwrap().column_id)
.collect_vec();
let dist_key_indices: Vec<_> = mview_node
.table
.as_ref()
.unwrap()
.distribution_key
.iter()
.map(|i| *i as u32)
.collect();

let all_column_ids = mview_node.column_ids();
let dist_key_indices = mview_node.dist_key_indices();
let output_indices = output_columns
.iter()
.map(|c| {
Expand All @@ -700,27 +685,11 @@ impl CompleteStreamFragmentGraph {
)?;
(dist_key_indices, output_indices)
};
let dispatch_strategy = if uses_arrangement_backfill {
if !dist_key_indices.is_empty() {
DispatchStrategy {
r#type: DispatcherType::Hash as _,
dist_key_indices,
output_indices,
}
} else {
DispatchStrategy {
r#type: DispatcherType::Simple as _,
dist_key_indices: vec![], // empty for Simple
output_indices,
}
}
} else {
DispatchStrategy {
r#type: DispatcherType::NoShuffle as _,
dist_key_indices: vec![], // not used for `NoShuffle`
output_indices,
}
};
let dispatch_strategy = mv_on_mv_dispatch_strategy(
uses_arrangement_backfill,
dist_key_indices,
output_indices,
);
let edge = StreamFragmentEdge {
id: EdgeId::UpstreamExternal {
upstream_table_id,
Expand Down Expand Up @@ -806,6 +775,34 @@ impl CompleteStreamFragmentGraph {
}
}

fn mv_on_mv_dispatch_strategy(
uses_arrangement_backfill: bool,
dist_key_indices: Vec<u32>,
output_indices: Vec<u32>,
) -> DispatchStrategy {
if uses_arrangement_backfill {
if !dist_key_indices.is_empty() {
DispatchStrategy {
r#type: DispatcherType::Hash as _,
dist_key_indices,
output_indices,
}
} else {
DispatchStrategy {
r#type: DispatcherType::Simple as _,
dist_key_indices: vec![], // empty for Simple
output_indices,
}
}
} else {
DispatchStrategy {
r#type: DispatcherType::NoShuffle as _,
dist_key_indices: vec![], // not used for `NoShuffle`
output_indices,
}
}
}

impl CompleteStreamFragmentGraph {
/// Returns **all** fragment IDs in the complete graph, including the ones that are not in the
/// building graph.
Expand Down
20 changes: 20 additions & 0 deletions src/prost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,26 @@ impl FromStr for crate::expr::table_function::PbType {
}
}

impl stream_plan::MaterializeNode {
pub fn dist_key_indices(&self) -> Vec<u32> {
self.get_table()
.unwrap()
.distribution_key
.iter()
.map(|i| *i as u32)
.collect()
}

pub fn column_ids(&self) -> Vec<i32> {
self.get_table()
.unwrap()
.columns
.iter()
.map(|c| c.get_column_desc().unwrap().column_id)
.collect()
}
}

#[cfg(test)]
mod tests {
use crate::data::{data_type, DataType};
Expand Down

0 comments on commit b440bc0

Please sign in to comment.