feat: support scaling table fragments connected by hash shuffle (#14485)
shanicky authored Jan 10, 2024
1 parent 50d1114 commit cf9a2db
Showing 2 changed files with 58 additions and 26 deletions.
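The change in a nutshell: fragments connected by a hash shuffle may belong to different streaming jobs (tables), so rescheduling a fragment can no longer assume that its upstream and downstream fragments live in the same `TableFragments`. The diff therefore builds a reverse index from fragment id to owning table id and uses it to locate and patch the neighboring fragments. A minimal, self-contained sketch of that index, using simplified stand-in types rather than the real RisingWave structures, could look like this:

use std::collections::HashMap;

type TableId = u32;
type FragmentId = u32;

/// Hypothetical stand-in for a streaming job and the fragments it owns.
struct TableFragments {
    fragment_ids: Vec<FragmentId>,
}

/// Build a reverse index from every fragment to the table (streaming job) owning it,
/// mirroring the `fragment_id_to_table_id` map introduced in the diff below.
fn build_fragment_to_table_index(
    map: &HashMap<TableId, TableFragments>,
) -> HashMap<FragmentId, TableId> {
    map.iter()
        .flat_map(|(table_id, table)| {
            table
                .fragment_ids
                .iter()
                .map(move |fragment_id| (*fragment_id, *table_id))
        })
        .collect()
}

fn main() {
    let mut map = HashMap::new();
    map.insert(1, TableFragments { fragment_ids: vec![10, 11] });
    map.insert(2, TableFragments { fragment_ids: vec![20] });

    let index = build_fragment_to_table_index(&map);
    // Fragment 20 is owned by table 2, so a reschedule of fragment 10 can still
    // find the table that owns its hash-shuffle peer.
    assert_eq!(index.get(&20), Some(&2));
}
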
81 changes: 57 additions & 24 deletions src/meta/src/manager/catalog/fragment.rs
@@ -1120,9 +1120,20 @@ impl FragmentManager {
.filter(|t| t.fragment_ids().any(|f| reschedules.contains_key(&f)))
.map(|t| t.table_id())
.collect_vec();

let fragment_id_to_table_id: HashMap<_, _> = map
.iter()
.flat_map(|(table_id, table)| {
table
.fragment_ids()
.map(|fragment_id| (fragment_id, *table_id))
})
.collect();

let mut table_fragments = BTreeMapTransaction::new(map);
let mut fragment_mapping_to_notify = vec![];

// First step, update fragment itself
for table_id in to_update_table_fragments {
// Takes out the reschedules of the fragments in this table.
let reschedules = reschedules
@@ -1137,20 +1148,17 @@ impl FragmentManager {

let mut table_fragment = table_fragments.get_mut(table_id).unwrap();

for (fragment_id, reschedule) in reschedules {
for (fragment_id, reschedule) in &reschedules {
let Reschedule {
added_actors,
removed_actors,
vnode_bitmap_updates,
upstream_fragment_dispatcher_ids,
upstream_dispatcher_mapping,
downstream_fragment_ids,
actor_splits,
..
} = reschedule;

// First step, update self fragment
// Add actors to this fragment: set the state to `Running`.
for actor_id in &added_actors {
for actor_id in added_actors {
table_fragment
.actor_status
.get_mut(actor_id)
@@ -1166,10 +1174,10 @@ impl FragmentManager {
table_fragment.actor_splits.remove(actor_id);
}

table_fragment.actor_splits.extend(actor_splits);
table_fragment.actor_splits.extend(actor_splits.clone());

let actor_status = table_fragment.actor_status.clone();
let fragment = table_fragment.fragments.get_mut(&fragment_id).unwrap();
let fragment = table_fragment.fragments.get_mut(fragment_id).unwrap();

fragment
.actors
@@ -1215,21 +1223,38 @@ impl FragmentManager {

// Notify fragment mapping to frontend nodes.
let fragment_mapping = FragmentParallelUnitMapping {
fragment_id: fragment_id as FragmentId,
fragment_id: *fragment_id as FragmentId,
mapping: Some(vnode_mapping),
};
fragment_mapping_to_notify.push(fragment_mapping);
}

// Second step, update upstream fragments & downstream fragments
for (fragment_id, reschedule) in &reschedules {
let Reschedule {
upstream_fragment_dispatcher_ids,
upstream_dispatcher_mapping,
downstream_fragment_ids,
added_actors,
removed_actors,
..
} = reschedule;

let removed_actor_ids: HashSet<_> = removed_actors.iter().cloned().collect();

// Second step, update upstream fragments
// Update the dispatcher of the upstream fragments.
for (upstream_fragment_id, dispatcher_id) in upstream_fragment_dispatcher_ids {
// here we assume the upstream fragment is in the same streaming job as this
// fragment. Cross-table references only occur in the case
// of StreamScan fragment, and the scale of StreamScan fragment does not introduce updates
// to the upstream Fragment (because of NoShuffle)
let upstream_fragment = table_fragment
let upstream_table_id = fragment_id_to_table_id
.get(upstream_fragment_id)
.expect("upstream fragment must exist");

// After introducing arrangement backfill and sink into table, two tables might be connected via operators outside of the NO_SHUFFLE.
let mut upstream_table_fragment =
table_fragments.get_mut(*upstream_table_id).unwrap();

let upstream_fragment = upstream_table_fragment
.fragments
.get_mut(&upstream_fragment_id)
.get_mut(upstream_fragment_id)
.unwrap();

for upstream_actor in &mut upstream_fragment.actors {
@@ -1238,7 +1263,7 @@ impl FragmentManager {
}

for dispatcher in &mut upstream_actor.dispatcher {
if dispatcher.dispatcher_id == dispatcher_id {
if dispatcher.dispatcher_id == *dispatcher_id {
if let DispatcherType::Hash = dispatcher.r#type() {
dispatcher.hash_mapping = upstream_dispatcher_mapping
.as_ref()
@@ -1248,19 +1273,27 @@ impl FragmentManager {
update_actors(
dispatcher.downstream_actor_id.as_mut(),
&removed_actor_ids,
&added_actors,
added_actors,
);
}
}
}
}

// Update the merge executor of the downstream fragment.
for &downstream_fragment_id in &downstream_fragment_ids {
let downstream_fragment = table_fragment
for downstream_fragment_id in downstream_fragment_ids {
let downstream_table_id = fragment_id_to_table_id
.get(downstream_fragment_id)
.expect("downstream fragment must exist");

let mut downstream_table_fragment =
table_fragments.get_mut(*downstream_table_id).unwrap();

let downstream_fragment = downstream_table_fragment
.fragments
.get_mut(&downstream_fragment_id)
.get_mut(downstream_fragment_id)
.unwrap();

for downstream_actor in &mut downstream_fragment.actors {
if new_created_actors.contains(&downstream_actor.actor_id) {
continue;
@@ -1269,15 +1302,15 @@ impl FragmentManager {
update_actors(
downstream_actor.upstream_actor_id.as_mut(),
&removed_actor_ids,
&added_actors,
added_actors,
);

if let Some(node) = downstream_actor.nodes.as_mut() {
update_merge_node_upstream(
node,
&fragment_id,
fragment_id,
&removed_actor_ids,
&added_actors,
added_actors,
);
}
}
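Once the owning table of an upstream or downstream fragment has been resolved through `fragment_id_to_table_id`, the second pass above patches that fragment in place: removed actors are dropped from dispatcher and merge actor-id lists, newly created actors are appended, and hash dispatchers receive the updated vnode mapping. A minimal sketch of the actor-list update, with a hypothetical helper rather than the crate's actual `update_actors`, assuming plain `u32` actor ids:

use std::collections::HashSet;

type ActorId = u32;

/// Drop the removed actors from a dispatcher's (or merge node's) actor-id list
/// and append the newly added ones.
fn update_downstream_actors(
    actor_ids: &mut Vec<ActorId>,
    removed: &HashSet<ActorId>,
    added: &[ActorId],
) {
    actor_ids.retain(|actor_id| !removed.contains(actor_id));
    actor_ids.extend_from_slice(added);
}

fn main() {
    let mut downstream = vec![1, 2, 3];
    let removed: HashSet<ActorId> = [2].into_iter().collect();
    let added = [4, 5];

    update_downstream_actors(&mut downstream, &removed, &added);
    assert_eq!(downstream, vec![1, 3, 4, 5]);
}
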
3 changes: 1 addition & 2 deletions src/meta/src/stream/scale.rs
@@ -138,7 +138,7 @@ impl RescheduleContext {
self.actor_status
.get(actor_id)
.and_then(|actor_status| actor_status.parallel_unit.as_ref())
.ok_or_else(|| anyhow!("could not found ParallelUnit for {}", actor_id).into())
.ok_or_else(|| anyhow!("could not found parallel unit for actor {}", actor_id).into())
}

fn parallel_unit_id_to_worker(
@@ -1048,7 +1048,6 @@ impl ScaleController {
if new_created_actors.contains_key(downstream_actor_id) {
continue;
}

let downstream_worker_id = ctx
.actor_id_to_parallel_unit(downstream_actor_id)?
.worker_node_id;
