Skip to content

Commit

Permalink
feat(streaming): support output indices in dispatchers (#8094)
Browse files Browse the repository at this point in the history
This PR adds support for output indices in each dispatcher. Here are the motivations:

- For multiple MVs on an upstream MV, it's possible that each of them requires different columns of the upstreams. Currently, we do this projection in the downstream `Chain` node. However, if we allow creating mview on remote compute nodes (like spot instances), directly pruning the unused columns in upstream will decrease the remote shuffle cost as described in #4529.

- For adding columns in schema change, there should be a layer that erases the schema change from `Materialize` to the downstream. By introducing the output indices in dispatchers, we can make the existing downstream MV receive chunks with the same schema and work correctly. For new downstream MVs after schema change, the new dispatcher will be able to output all columns. (#6903)

Note that the optimization mentioned in Motivation 1 is not implemented in this PR. Currently, we just always output all columns in every dispatcher.

Approved-By: fuyufjh
Approved-By: chenzl25
Approved-By: xxchan
  • Loading branch information
BugenZhao authored Feb 22, 2023
1 parent 8e499c7 commit 8dff620
Show file tree
Hide file tree
Showing 15 changed files with 199 additions and 75 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 40 additions & 14 deletions dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -553,9 +553,12 @@ enum DispatcherType {
NO_SHUFFLE = 4;
}

// The property of an edge in the fragment graph.
// This is essientially a "logical" version of `Dispatcher`. See the doc of `Dispatcher` for more details.
message DispatchStrategy {
DispatcherType type = 1;
repeated uint32 column_indices = 2;
repeated uint32 dist_key_indices = 2;
repeated uint32 output_indices = 3;
}

// A dispatcher redistribute messages.
Expand All @@ -564,7 +567,11 @@ message Dispatcher {
DispatcherType type = 1;
// Indices of the columns to be used for hashing.
// For dispatcher types other than HASH, this is ignored.
repeated uint32 column_indices = 2;
repeated uint32 dist_key_indices = 2;
// Indices of the columns to output.
// In most cases, this contains all columns in the input. But for some cases like MV on MV or
// schema change, we may only output a subset of the columns.
repeated uint32 output_indices = 6;
// The hash mapping for consistent hash.
// For dispatcher types other than HASH, this is ignored.
ActorMapping hash_mapping = 3;
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,10 +425,11 @@ pub fn to_stream_prost_body(
Distribution::Broadcast => DispatcherType::Broadcast,
_ => panic!("Do not allow Any or AnyShard in serialization process"),
} as i32,
column_indices: match &base.dist {
dist_key_indices: match &base.dist {
Distribution::HashShard(keys) => keys.iter().map(|&num| num as u32).collect(),
_ => vec![],
},
output_indices: (0..base.schema().len() as u32).collect(),
}),
}),
Node::DynamicFilter(me) => {
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,11 @@ impl StreamNode for StreamExchange {
Distribution::Broadcast => DispatcherType::Broadcast,
_ => panic!("Do not allow Any or AnyShard in serialization process"),
} as i32,
column_indices: match &self.base.dist {
dist_key_indices: match &self.base.dist {
Distribution::HashShard(keys) => keys.iter().map(|num| *num as u32).collect(),
_ => vec![],
},
output_indices: (0..self.schema().len() as u32).collect(),
}),
})
}
Expand Down
14 changes: 10 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,33 +85,39 @@ impl StreamNode for StreamShare {
impl StreamShare {
pub fn adhoc_to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> ProstStreamPlan {
let operator_id = self.base.id.0 as u32;
let output_indices = (0..self.schema().len() as u32).collect_vec();

match state.get_share_stream_node(operator_id) {
None => {
let dispatch_strategy = match &self.base.dist {
Distribution::HashShard(keys) | Distribution::UpstreamHashShard(keys, _) => {
DispatchStrategy {
r#type: DispatcherType::Hash as i32,
column_indices: keys.iter().map(|x| *x as u32).collect_vec(),
dist_key_indices: keys.iter().map(|x| *x as u32).collect_vec(),
output_indices,
}
}
Distribution::Single => DispatchStrategy {
r#type: DispatcherType::Simple as i32,
column_indices: vec![],
dist_key_indices: vec![],
output_indices,
},
Distribution::Broadcast => DispatchStrategy {
r#type: DispatcherType::Broadcast as i32,
column_indices: vec![],
dist_key_indices: vec![],
output_indices,
},
Distribution::SomeShard => {
// FIXME: use another DispatcherType?
DispatchStrategy {
r#type: DispatcherType::Hash as i32,
column_indices: self
dist_key_indices: self
.base
.logical_pk
.iter()
.map(|x| *x as u32)
.collect_vec(),
output_indices,
}
}
};
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/stream_fragmenter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ fn rewrite_stream_node(

let strategy = DispatchStrategy {
r#type: DispatcherType::NoShuffle.into(),
column_indices: vec![], // TODO: use distribution key
dist_key_indices: vec![], // TODO: use distribution key
output_indices: (0..(child_node.fields.len() as u32)).collect(),
};
Ok(StreamNode {
stream_key: child_node.stream_key.clone(),
Expand Down
43 changes: 32 additions & 11 deletions src/frontend/src/stream_fragmenter/rewrite/delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ fn build_no_shuffle_exchange_for_delta_join(
fields: upstream.fields.clone(),
stream_key: upstream.stream_key.clone(),
node_body: Some(NodeBody::Exchange(ExchangeNode {
strategy: Some(dispatch_no_shuffle()),
strategy: Some(dispatch_no_shuffle(
(0..(upstream.fields.len() as u32)).collect(),
)),
})),
input: vec![],
append_only: upstream.append_only,
Expand All @@ -46,33 +48,41 @@ fn build_no_shuffle_exchange_for_delta_join(
fn build_consistent_hash_shuffle_exchange_for_delta_join(
state: &mut BuildFragmentGraphState,
upstream: &StreamNode,
column_indices: Vec<u32>,
dist_key_indices: Vec<u32>,
) -> StreamNode {
StreamNode {
operator_id: state.gen_operator_id() as u64,
identity: "HASH Exchange (Lookup and Merge)".into(),
fields: upstream.fields.clone(),
stream_key: upstream.stream_key.clone(),
node_body: Some(NodeBody::Exchange(ExchangeNode {
strategy: Some(dispatch_consistent_hash_shuffle(column_indices)),
strategy: Some(dispatch_consistent_hash_shuffle(
dist_key_indices,
(0..(upstream.fields.len() as u32)).collect(),
)),
})),
input: vec![],
append_only: upstream.append_only,
}
}

fn dispatch_no_shuffle() -> DispatchStrategy {
fn dispatch_no_shuffle(output_indices: Vec<u32>) -> DispatchStrategy {
DispatchStrategy {
r#type: DispatcherType::NoShuffle.into(),
column_indices: vec![],
dist_key_indices: vec![],
output_indices,
}
}

fn dispatch_consistent_hash_shuffle(column_indices: Vec<u32>) -> DispatchStrategy {
fn dispatch_consistent_hash_shuffle(
dist_key_indices: Vec<u32>,
output_indices: Vec<u32>,
) -> DispatchStrategy {
// Actually Hash shuffle is consistent hash shuffle now.
DispatchStrategy {
r#type: DispatcherType::Hash.into(),
column_indices,
dist_key_indices,
output_indices,
}
}

Expand Down Expand Up @@ -136,6 +146,9 @@ fn build_delta_join_inner(
let i0_length = arrange_0.fields.len();
let i1_length = arrange_1.fields.len();

let i0_output_indices = (0..i0_length as u32).collect_vec();
let i1_output_indices = (0..i1_length as u32).collect_vec();

let lookup_0_column_reordering = {
let tmp: Vec<i32> = (i1_length..i1_length + i0_length)
.chain(0..i1_length)
Expand Down Expand Up @@ -204,7 +217,7 @@ fn build_delta_join_inner(
arrange_0_frag.fragment_id,
lookup_0_frag.fragment_id,
StreamFragmentEdge {
dispatch_strategy: dispatch_no_shuffle(),
dispatch_strategy: dispatch_no_shuffle(i0_output_indices.clone()),
link_id: exchange_a0l0.operator_id,
},
);
Expand All @@ -221,6 +234,7 @@ fn build_delta_join_inner(
.iter()
.map(|x| *x as u32)
.collect_vec(),
i0_output_indices,
),
link_id: exchange_a0l1.operator_id,
},
Expand All @@ -238,6 +252,7 @@ fn build_delta_join_inner(
.iter()
.map(|x| *x as u32)
.collect_vec(),
i1_output_indices.clone(),
),
link_id: exchange_a1l0.operator_id,
},
Expand All @@ -249,7 +264,7 @@ fn build_delta_join_inner(
arrange_1_frag.fragment_id,
lookup_1_frag.fragment_id,
StreamFragmentEdge {
dispatch_strategy: dispatch_no_shuffle(),
dispatch_strategy: dispatch_no_shuffle(i1_output_indices),
link_id: exchange_a1l1.operator_id,
},
);
Expand All @@ -275,7 +290,10 @@ fn build_delta_join_inner(
lookup_0_frag.fragment_id,
current_fragment.fragment_id,
StreamFragmentEdge {
dispatch_strategy: dispatch_consistent_hash_shuffle(node.stream_key.clone()),
dispatch_strategy: dispatch_consistent_hash_shuffle(
node.stream_key.clone(),
(0..node.fields.len() as u32).collect(),
),
link_id: exchange_l0m.operator_id,
},
);
Expand All @@ -284,7 +302,10 @@ fn build_delta_join_inner(
lookup_1_frag.fragment_id,
current_fragment.fragment_id,
StreamFragmentEdge {
dispatch_strategy: dispatch_consistent_hash_shuffle(node.stream_key.clone()),
dispatch_strategy: dispatch_consistent_hash_shuffle(
node.stream_key.clone(),
(0..node.fields.len() as u32).collect(),
),
link_id: exchange_l1m.operator_id,
},
);
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/utils/stream_graph_formatter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl StreamGraphFormatter {
"StreamExchange {} from {}",
match dist.r#type() {
DispatcherType::Unspecified => unreachable!(),
DispatcherType::Hash => format!("Hash({:?})", dist.column_indices),
DispatcherType::Hash => format!("Hash({:?})", dist.dist_key_indices),
DispatcherType::Broadcast => "Broadcast".to_string(),
DispatcherType::Simple => "Single".to_string(),
DispatcherType::NoShuffle => "NoShuffle".to_string(),
Expand Down
Loading

0 comments on commit 8dff620

Please sign in to comment.