Skip to content

Commit

Permalink
interim commit: check backfill type
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jan 5, 2024
1 parent a27ece1 commit a5079d6
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 9 deletions.
10 changes: 10 additions & 0 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1285,10 +1285,15 @@ impl DdlController {
})
.collect();

let uses_arrangement_backfill = match stream_job {
StreamingJob::MaterializedView(_) => fragment_graph.has_arrangement_backfill(),
_ => false,
};
let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
fragment_graph,
upstream_root_fragments,
stream_job.table_job_type(),
uses_arrangement_backfill,
)?;

// 2. Build the actor graph.
Expand Down Expand Up @@ -1704,10 +1709,15 @@ impl DdlController {
)
})?;

let uses_arrangement_backfill = match stream_job {
StreamingJob::MaterializedView(_) => fragment_graph.has_arrangement_backfill(),
_ => false,
};
let complete_graph = CompleteStreamFragmentGraph::with_downstreams(
fragment_graph,
original_table_fragment.fragment_id,
downstream_fragments,
uses_arrangement_backfill,
)?;

// 2. Build the actor graph.
Expand Down
55 changes: 46 additions & 9 deletions src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use risingwave_pb::stream_plan::stream_fragment_graph::{
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
DispatchStrategy, DispatcherType, FragmentTypeFlag, StreamActor,
StreamFragmentGraph as StreamFragmentGraphProto,
StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanType,
};

use crate::manager::{MetaSrvEnv, StreamingJob};
Expand Down Expand Up @@ -468,6 +468,31 @@ impl StreamFragmentGraph {
) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
self.upstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
}

pub fn has_arrangement_backfill(&self) -> bool {
fn has_arrangement_backfill_node(stream_node: &StreamNode) -> bool {
let is_backfill = if let Some(node) = &stream_node.node_body
&& let Some(node) = node.as_stream_scan()
{
node.stream_scan_type == StreamScanType::ArrangementBackfill as i32
} else {
false
};
is_backfill
|| stream_node
.get_input()
.iter()
.any(has_arrangement_backfill_node)
}
for (_id, fragment) in &self.fragments {
let fragment = &**fragment;
let node = fragment.node.as_ref().unwrap();
if has_arrangement_backfill_node(node) {
return true;
}
}
false
}
}

static EMPTY_HASHMAP: LazyLock<HashMap<GlobalFragmentId, StreamFragmentEdge>> =
Expand Down Expand Up @@ -536,6 +561,7 @@ impl CompleteStreamFragmentGraph {
graph: StreamFragmentGraph,
upstream_root_fragments: HashMap<TableId, Fragment>,
table_job_type: Option<TableJobType>,
uses_arrangement_backfill: bool,
) -> MetaResult<Self> {
Self::build_helper(
graph,
Expand All @@ -544,6 +570,7 @@ impl CompleteStreamFragmentGraph {
}),
None,
table_job_type,
uses_arrangement_backfill,
)
}

Expand All @@ -553,6 +580,7 @@ impl CompleteStreamFragmentGraph {
graph: StreamFragmentGraph,
original_table_fragment_id: FragmentId,
downstream_fragments: Vec<(DispatchStrategy, Fragment)>,
uses_arrangement_backfill: bool,
) -> MetaResult<Self> {
Self::build_helper(
graph,
Expand All @@ -562,6 +590,7 @@ impl CompleteStreamFragmentGraph {
downstream_fragments,
}),
None,
uses_arrangement_backfill,
)
}

Expand All @@ -570,6 +599,7 @@ impl CompleteStreamFragmentGraph {
upstream_ctx: Option<FragmentGraphUpstreamContext>,
downstream_ctx: Option<FragmentGraphDownstreamContext>,
table_job_type: Option<TableJobType>,
uses_arrangement_backfill: bool,
) -> MetaResult<Self> {
let mut extra_downstreams = HashMap::new();
let mut extra_upstreams = HashMap::new();
Expand Down Expand Up @@ -651,19 +681,26 @@ impl CompleteStreamFragmentGraph {
.collect::<Option<Vec<_>>>()
.context("column not found in the upstream materialized view")?
};
let dispatch_strategy = if uses_arrangement_backfill {
DispatchStrategy {
r#type: DispatcherType::Hash as _,
dist_key_indices: vec![], // not used for `Exchange`
output_indices,
}
} else {
DispatchStrategy {
r#type: DispatcherType::NoShuffle as _,
dist_key_indices: vec![], // not used for `NoShuffle`
output_indices,
}
};
let edge = StreamFragmentEdge {
id: EdgeId::UpstreamExternal {
upstream_table_id,
downstream_fragment_id: id,
},
// We always use `NoShuffle` for the exchange between the upstream
// `Materialize` and the downstream `StreamScan` of the
// new materialized view.
dispatch_strategy: DispatchStrategy {
r#type: DispatcherType::NoShuffle as _,
dist_key_indices: vec![], // not used for `NoShuffle`
output_indices,
},
dispatch_strategy,

};

(mview_id, edge)
Expand Down

0 comments on commit a5079d6

Please sign in to comment.