fix: try to reduce memory usage during generate_table_resize_plan (#…
shanicky authored Feb 4, 2024
1 parent b618869 commit 81e7425
Showing 2 changed files with 146 additions and 56 deletions.
51 changes: 31 additions & 20 deletions src/meta/src/barrier/recovery.rs
@@ -659,16 +659,23 @@ impl GlobalBarrierManagerContext {
 
         let mut compared_table_parallelisms = table_parallelisms.clone();
 
-        let (reschedule_fragment, _) = self
-            .scale_controller
-            .prepare_reschedule_command(
-                plan,
-                RescheduleOptions {
-                    resolve_no_shuffle_upstream: true,
-                },
-                Some(&mut compared_table_parallelisms),
-            )
-            .await?;
+        // skip reschedule if no reschedule is generated.
+        let reschedule_fragment = if plan.is_empty() {
+            HashMap::new()
+        } else {
+            let (reschedule_fragment, _) = self
+                .scale_controller
+                .prepare_reschedule_command(
+                    plan,
+                    RescheduleOptions {
+                        resolve_no_shuffle_upstream: true,
+                    },
+                    Some(&mut compared_table_parallelisms),
+                )
+                .await?;
+
+            reschedule_fragment
+        };
 
         // Because custom parallelism doesn't exist, this function won't result in a no-shuffle rewrite for table parallelisms.
         debug_assert_eq!(compared_table_parallelisms, table_parallelisms);
@@ -788,16 +795,20 @@ impl GlobalBarrierManagerContext {
 
         let mut compared_table_parallelisms = table_parallelisms.clone();
 
-        let (reschedule_fragment, applied_reschedules) = self
-            .scale_controller
-            .prepare_reschedule_command(
-                plan,
-                RescheduleOptions {
-                    resolve_no_shuffle_upstream: true,
-                },
-                Some(&mut compared_table_parallelisms),
-            )
-            .await?;
+        // skip reschedule if no reschedule is generated.
+        let (reschedule_fragment, applied_reschedules) = if plan.is_empty() {
+            (HashMap::new(), HashMap::new())
+        } else {
+            self.scale_controller
+                .prepare_reschedule_command(
+                    plan,
+                    RescheduleOptions {
+                        resolve_no_shuffle_upstream: true,
+                    },
+                    Some(&mut compared_table_parallelisms),
+                )
+                .await?
+        };
 
         // Because custom parallelism doesn't exist, this function won't result in a no-shuffle rewrite for table parallelisms.
         debug_assert_eq!(compared_table_parallelisms, table_parallelisms);
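For context on the recovery-path change above: both hunks wrap the call to prepare_reschedule_command in an emptiness check, so the scale controller is not invoked (and no command state is built) when the generated plan has nothing to do. A minimal sketch of that guard pattern follows, using hypothetical stand-in types rather than the real meta-service API:

use std::collections::HashMap;

// Hypothetical stand-ins; in the real code the plan maps fragment ids to reschedule commands.
type FragmentId = u32;
type Reschedule = Vec<u32>;

// Stand-in for the comparatively expensive `prepare_reschedule_command` step.
fn prepare_reschedule(plan: HashMap<FragmentId, Reschedule>) -> HashMap<FragmentId, Reschedule> {
    // Pretend this resolves no-shuffle upstreams and builds the command here.
    plan
}

fn main() {
    let plan: HashMap<FragmentId, Reschedule> = HashMap::new();

    // The guard introduced by this commit, in isolation: when the generated
    // plan is empty, skip the preparation step and return an empty result
    // instead of invoking the scale controller.
    let reschedule_fragment = if plan.is_empty() {
        HashMap::new()
    } else {
        prepare_reschedule(plan)
    };

    assert!(reschedule_fragment.is_empty());
}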
151 changes: 115 additions & 36 deletions src/meta/src/stream/scale.rs
@@ -1678,60 +1678,139 @@ impl ScaleController {
             })
             .collect::<BTreeMap<_, _>>();
 
-        let all_table_fragments = self.list_all_table_fragments().await?;
+        // index for no shuffle relation
+        let mut no_shuffle_source_fragment_ids = HashSet::new();
+        let mut no_shuffle_target_fragment_ids = HashSet::new();
 
-        // FIXME: only need actor id and dispatcher info, avoid clone it.
-        let mut actor_map = HashMap::new();
+        // index for fragment_id -> distribution_type
+        let mut fragment_distribution_map = HashMap::new();
+        // index for actor -> parallel_unit
         let mut actor_status = HashMap::new();
-        // FIXME: only need fragment distribution info, should avoid clone it.
+        // index for table_id -> [fragment_id]
+        let mut table_fragment_id_map = HashMap::new();
+        // index for fragment_id -> [actor_id]
+        let mut fragment_actor_id_map = HashMap::new();
+
+        // internal helper func for building index
+        fn build_index(
+            no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
+            no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
+            fragment_distribution_map: &mut HashMap<FragmentId, FragmentDistributionType>,
+            actor_status: &mut HashMap<ActorId, ActorStatus>,
+            table_fragment_id_map: &mut HashMap<u32, HashSet<FragmentId>>,
+            fragment_actor_id_map: &mut HashMap<FragmentId, HashSet<u32>>,
+            table_fragments: &BTreeMap<TableId, TableFragments>,
+        ) {
+            // This is only for assertion purposes and will be removed once the dispatcher_id is guaranteed to always correspond to the downstream fragment_id,
+            // such as through the foreign key constraints in the SQL backend.
+            let mut actor_fragment_id_map_for_check = HashMap::new();
+            for table_fragments in table_fragments.values() {
+                for (fragment_id, fragment) in &table_fragments.fragments {
+                    for actor in &fragment.actors {
+                        debug_assert!(actor_fragment_id_map_for_check
+                            .insert(actor.actor_id, *fragment_id)
+                            .is_none());
+                    }
+                }
+            }
+
-        let mut table_fragment_map = HashMap::new();
-        for table_fragments in all_table_fragments {
-            let table_id = table_fragments.table_id().table_id;
-            for (fragment_id, fragment) in table_fragments.fragments {
-                fragment
-                    .actors
-                    .iter()
-                    .map(|actor| (actor.actor_id, actor))
-                    .for_each(|(id, actor)| {
-                        actor_map.insert(id as ActorId, actor.clone());
-                    });
+            for (table_id, table_fragments) in table_fragments {
+                for (fragment_id, fragment) in &table_fragments.fragments {
+                    for actor in &fragment.actors {
+                        fragment_actor_id_map
+                            .entry(*fragment_id)
+                            .or_default()
+                            .insert(actor.actor_id);
+
+                        for dispatcher in &actor.dispatcher {
+                            if dispatcher.r#type() == DispatcherType::NoShuffle {
+                                no_shuffle_source_fragment_ids
+                                    .insert(actor.fragment_id as FragmentId);
+
+                                let downstream_actor_id =
+                                    dispatcher.downstream_actor_id.iter().exactly_one().expect(
+                                        "no shuffle should have exactly one downstream actor id",
+                                    );
+
+                                let downstream_fragment_id = actor_fragment_id_map_for_check
+                                    .get(downstream_actor_id)
+                                    .unwrap();
+
+                                // dispatcher_id of dispatcher should be exactly same as downstream fragment id
+                                // but we need to check it to make sure
+                                debug_assert_eq!(
+                                    *downstream_fragment_id,
+                                    dispatcher.dispatcher_id as FragmentId
+                                );
 
-                table_fragment_map
-                    .entry(table_id)
-                    .or_insert(HashMap::new())
-                    .insert(fragment_id, fragment);
-            }
+                                no_shuffle_target_fragment_ids
+                                    .insert(dispatcher.dispatcher_id as FragmentId);
+                            }
+                        }
+                    }
 
-            actor_status.extend(table_fragments.actor_status);
-        }
+                    fragment_distribution_map.insert(*fragment_id, fragment.distribution_type());
 
-        let mut no_shuffle_source_fragment_ids = HashSet::new();
-        let mut no_shuffle_target_fragment_ids = HashSet::new();
+                    table_fragment_id_map
+                        .entry(table_id.table_id())
+                        .or_default()
+                        .insert(*fragment_id);
+                }
 
-        Self::build_no_shuffle_relation_index(
-            &actor_map,
-            &mut no_shuffle_source_fragment_ids,
-            &mut no_shuffle_target_fragment_ids,
-        );
+                actor_status.extend(table_fragments.actor_status.clone());
+            }
+        }
 
+        match &self.metadata_manager {
+            MetadataManager::V1(mgr) => {
+                let guard = mgr.fragment_manager.get_fragment_read_guard().await;
+                build_index(
+                    &mut no_shuffle_source_fragment_ids,
+                    &mut no_shuffle_target_fragment_ids,
+                    &mut fragment_distribution_map,
+                    &mut actor_status,
+                    &mut table_fragment_id_map,
+                    &mut fragment_actor_id_map,
+                    guard.table_fragments(),
+                );
+            }
+            MetadataManager::V2(_) => {
+                let all_table_fragments = self.list_all_table_fragments().await?;
+                let all_table_fragments = all_table_fragments
+                    .into_iter()
+                    .map(|table_fragments| (table_fragments.table_id(), table_fragments))
+                    .collect::<BTreeMap<_, _>>();
+
+                build_index(
+                    &mut no_shuffle_source_fragment_ids,
+                    &mut no_shuffle_target_fragment_ids,
+                    &mut fragment_distribution_map,
+                    &mut actor_status,
+                    &mut table_fragment_id_map,
+                    &mut fragment_actor_id_map,
+                    &all_table_fragments,
+                );
+            }
+        }
+
         let mut target_plan = HashMap::new();
 
         for (table_id, parallelism) in table_parallelisms {
-            let fragment_map = table_fragment_map.remove(&table_id).unwrap();
+            let fragment_map = table_fragment_id_map.remove(&table_id).unwrap();
 
-            for (fragment_id, fragment) in fragment_map {
+            for fragment_id in fragment_map {
                 // Currently, all of our NO_SHUFFLE relation propagations are only transmitted from upstream to downstream.
                 if no_shuffle_target_fragment_ids.contains(&fragment_id) {
                     continue;
                 }
 
-                let fragment_parallel_unit_ids: BTreeSet<_> = fragment
-                    .actors
+                let fragment_parallel_unit_ids: BTreeSet<ParallelUnitId> = fragment_actor_id_map
+                    .get(&fragment_id)
+                    .unwrap()
                     .iter()
-                    .map(|actor| {
+                    .map(|actor_id| {
                         actor_status
-                            .get(&actor.actor_id)
+                            .get(actor_id)
                             .and_then(|status| status.parallel_unit.clone())
                             .unwrap()
                             .id as ParallelUnitId
@@ -1748,7 +1827,7 @@ impl ScaleController {
                 );
             }
 
-                match fragment.get_distribution_type().unwrap() {
+                match fragment_distribution_map.get(&fragment_id).unwrap() {
                     FragmentDistributionType::Unspecified => unreachable!(),
                     FragmentDistributionType::Single => {
                        let single_parallel_unit_id =
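The scale.rs hunk is where the memory reduction comes from: instead of materializing actor_map and table_fragment_map by cloning every actor and fragment returned from list_all_table_fragments(), the new build_index helper walks the table fragments by reference (a fragment-manager read guard for the V1 metadata manager, a fetched snapshot for V2) and records only lightweight indexes: fragment distribution types, actor statuses, table-id-to-fragment-id sets, and fragment-id-to-actor-id sets. A simplified sketch of that borrow-and-index idea follows; the struct definitions are hypothetical stand-ins for the real meta types, not the actual API:

use std::collections::{BTreeMap, HashMap, HashSet};

// Hypothetical, pared-down stand-ins for the metadata walked by build_index.
type TableId = u32;
type FragmentId = u32;
type ActorId = u32;

struct Actor {
    actor_id: ActorId,
}

struct Fragment {
    actors: Vec<Actor>,
}

struct TableFragments {
    fragments: BTreeMap<FragmentId, Fragment>,
}

// Walk the metadata by reference and record only ids, instead of cloning
// whole actors/fragments into owned maps as the removed code did.
fn build_id_index(
    table_fragments: &BTreeMap<TableId, TableFragments>,
) -> (
    HashMap<TableId, HashSet<FragmentId>>,
    HashMap<FragmentId, HashSet<ActorId>>,
) {
    let mut table_fragment_id_map: HashMap<TableId, HashSet<FragmentId>> = HashMap::new();
    let mut fragment_actor_id_map: HashMap<FragmentId, HashSet<ActorId>> = HashMap::new();

    for (table_id, tf) in table_fragments {
        for (fragment_id, fragment) in &tf.fragments {
            table_fragment_id_map
                .entry(*table_id)
                .or_default()
                .insert(*fragment_id);
            for actor in &fragment.actors {
                fragment_actor_id_map
                    .entry(*fragment_id)
                    .or_default()
                    .insert(actor.actor_id);
            }
        }
    }

    (table_fragment_id_map, fragment_actor_id_map)
}

Later consumers then look actors up by id, as the new fragment_parallel_unit_ids computation does via fragment_actor_id_map and actor_status, so only small integer sets stay resident instead of full cloned actor and fragment structures.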
