fix: corner case for worker based scaling (#17806)
shanicky authored Jul 25, 2024
1 parent 6834de8 commit 3a75734
Showing 14 changed files with 58 additions and 40 deletions.
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/local_lookup_join.rs
@@ -165,7 +165,7 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
self.worker_slot_to_scan_range_mapping = HashMap::new();
}

/// Adds the scan range made from the given `kwy_scalar_impls` into the parallel unit id
/// Adds the scan range made from the given `kwy_scalar_impls` into the worker slot id
/// hash map, along with the scan range's virtual node.
async fn add_scan_range(&mut self, key_datums: Vec<Datum>) -> Result<()> {
let mut scan_range = ScanRange::full_table_scan();
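For orientation, here is a minimal sketch of the idea behind `worker_slot_to_scan_range_mapping` (names and types are illustrative stand-ins, not the executor's real fields): each scan range is routed to the worker slot that owns its virtual node.

use std::collections::HashMap;

// Illustrative stand-ins for the executor's real types.
type WorkerSlotId = u64;
type VirtualNode = u16;
struct ScanRange; // placeholder for the actual ScanRange

// Group scan ranges by the worker slot owning their virtual node.
// `vnode_to_slot` would come from the serving fragment mapping.
fn group_by_worker_slot(
    ranges: Vec<(ScanRange, VirtualNode)>,
    vnode_to_slot: &HashMap<VirtualNode, WorkerSlotId>,
) -> HashMap<WorkerSlotId, Vec<(ScanRange, VirtualNode)>> {
    let mut mapping: HashMap<WorkerSlotId, Vec<(ScanRange, VirtualNode)>> = HashMap::new();
    for (range, vnode) in ranges {
        let slot = vnode_to_slot[&vnode];
        mapping.entry(slot).or_default().push((range, vnode));
    }
    mapping
}
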
4 changes: 2 additions & 2 deletions src/ctl/src/lib.rs
@@ -403,7 +403,7 @@ enum MetaCommands {
ClusterInfo,
/// get source split info
SourceSplitInfo,
/// Reschedule the parallel unit in the stream graph
/// Reschedule the actors in the stream graph
///
/// The format is `fragment_id-[worker_id:count]+[worker_id:count]`
/// You can provide either decreased `worker_ids` only or increased only, but decreased should be preceded by
@@ -457,7 +457,7 @@ enum MetaCommands {
/// List all existing connections in the catalog
ListConnections,

/// List fragment to parallel units mapping for serving
/// List fragment mapping for serving
ListServingFragmentMapping,

/// Unregister workers from the cluster
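To make the documented plan syntax concrete, here is a hedged example (fragment and worker ids are made up): a plan for fragment 12 that removes two actors from worker 1 and adds two actors on worker 3.

fn main() {
    // Hypothetical ids, purely to illustrate the `fragment_id-[worker_id:count]+[worker_id:count]` syntax.
    let fragment_id = 12u32;
    let plan = format!("{}-[{}:{}]+[{}:{}]", fragment_id, 1, 2, 3, 2);
    assert_eq!(plan, "12-[1:2]+[3:2]");
    println!("{plan}");
}
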
2 changes: 1 addition & 1 deletion src/meta/src/barrier/recovery.rs
@@ -908,7 +908,7 @@ impl GlobalBarrierManagerContext {
}

/// This function will generate a migration plan, which includes the mapping for all expired and
/// in-used parallel unit to a new one.
/// in-used worker slot to a new one.
async fn generate_migration_plan(
&self,
expired_workers: HashSet<WorkerId>,
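As a rough sketch of what such a migration plan amounts to (simplified types, not the recovery code's actual structures): every expired worker slot that still holds actors gets paired with a free slot on an active worker.

use std::collections::{HashMap, HashSet};

// Simplified stand-in for the real worker slot id (worker id + slot index).
type WorkerSlotId = (u32, usize);

// Pair each expired-but-in-use slot with an unused slot on an active worker, if any remain.
fn plan_migration(
    expired_in_use: &HashSet<WorkerSlotId>,
    active_free: &mut Vec<WorkerSlotId>,
) -> HashMap<WorkerSlotId, WorkerSlotId> {
    let mut plan = HashMap::new();
    for &from in expired_in_use {
        if let Some(to) = active_free.pop() {
            plan.insert(from, to);
        }
    }
    plan
}
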
2 changes: 1 addition & 1 deletion src/meta/src/controller/cluster.rs
@@ -942,7 +942,7 @@ mod tests {
);
}

// Since no worker is active, the parallel unit count should be 0.
// Since no worker is active, the parallelism should be 0.
assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

for id in &worker_ids {
4 changes: 2 additions & 2 deletions src/meta/src/manager/catalog/fragment.rs
@@ -1355,14 +1355,14 @@ impl FragmentManager {

let worker_slot_mapping = if actor_to_vnode_bitmap.is_empty() {
// If there's no `vnode_bitmap`, then the fragment must be a singleton fragment.
// We directly use the single parallel unit to construct the mapping.
// We directly use the single worker slot to construct the mapping.
// TODO: also fill `vnode_bitmap` for the actor of singleton fragment so that we
// don't need this branch.

let worker_id = *actor_to_worker.values().exactly_one().unwrap();
WorkerSlotMapping::new_single(WorkerSlotId::new(worker_id, 0))
} else {
// Generate the parallel unit mapping from the fragment's actor bitmaps.
// Generate the worker slot mapping from the fragment's actor bitmaps with actor locations.
assert_eq!(actor_to_vnode_bitmap.len(), actor_to_worker.len());
ActorMapping::from_bitmaps(&actor_to_vnode_bitmap)
.to_worker_slot(&actor_to_worker)
4 changes: 2 additions & 2 deletions src/meta/src/manager/cluster.rs
@@ -689,7 +689,7 @@ impl ClusterManagerCore {
.collect()
}

// List all parallel units on running nodes
// List all serving worker nodes
pub fn list_serving_worker_node(&self, worker_state: Option<State>) -> Vec<WorkerNode> {
self.list_worker_node(Some(WorkerType::ComputeNode), worker_state)
.into_iter()
@@ -815,7 +815,7 @@ mod tests {
worker_nodes.push(worker_node);
}

// Since no worker is active, the parallel unit count should be 0.
// Since no worker is active, the parallelism should be 0.
assert_cluster_manager(&cluster_manager, 0).await;

for worker_node in worker_nodes {
4 changes: 2 additions & 2 deletions src/meta/src/model/stream.rs
@@ -44,7 +44,7 @@ const TABLE_FRAGMENTS_CF_NAME: &str = "cf/table_fragments";
/// The parallelism for a `TableFragments`.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
/// This is when the system decides the parallelism, based on the available parallel units.
/// This is when the system decides the parallelism, based on the available worker parallelisms.
Adaptive,
/// We set this when the `TableFragments` parallelism is changed.
/// All fragments which are part of the `TableFragment` will have the same parallelism as this.
@@ -207,7 +207,7 @@ impl TableFragments {
}

/// Create a new `TableFragments` with state of `Initial`, with the status of actors set to
/// `Inactive` on the given parallel units.
/// `Inactive` on the given workers.
pub fn new(
table_id: TableId,
fragments: BTreeMap<FragmentId, Fragment>,
4 changes: 1 addition & 3 deletions src/meta/src/rpc/ddl_controller.rs
@@ -1489,9 +1489,7 @@ impl DdlController {

let available_parallelism = cluster_info.parallelism();
if available_parallelism == 0 {
return Err(MetaError::unavailable(
"No available parallel units to schedule",
));
return Err(MetaError::unavailable("No available slots to schedule"));
}

let available_parallelism = NonZeroUsize::new(available_parallelism).unwrap();
58 changes: 39 additions & 19 deletions src/meta/src/stream/scale.rs
@@ -501,7 +501,7 @@ impl ScaleController {
let mut actor_map = HashMap::new();
// Index for Fragment
let mut fragment_map = HashMap::new();
// Index for actor status, including actor's parallel unit
// Index for actor status, including actor's worker id
let mut actor_status = BTreeMap::new();
let mut fragment_state = HashMap::new();
let mut fragment_to_table = HashMap::new();
@@ -813,6 +813,8 @@ impl ScaleController {
.build_reschedule_context(&mut reschedules, options, table_parallelisms)
.await?;

// Here, the plan for both upstream and downstream of the NO_SHUFFLE Fragment should already have been populated.

// Index of actors to create/remove
// Fragment Id => ( Actor Id => Worker Id )
let (fragment_actors_to_remove, fragment_actors_to_create) =
@@ -840,7 +842,7 @@

match fragment.distribution_type() {
FragmentDistributionType::Single => {
// Skip rebalance action for single distribution (always None)
// Skip re-balancing action for single distribution (always None)
fragment_actor_bitmap
.insert(fragment.fragment_id as FragmentId, Default::default());
}
@@ -893,12 +895,10 @@
let fragment_actors_after_reschedule = fragment_actors_after_reschedule;

// In order to maintain consistency with the original structure, the upstream and downstream
// actors of NoShuffle need to be in the same parallel unit and hold the same virtual nodes,
// so for the actors after the upstream rebalancing, we need to find the parallel
// unit corresponding to each actor, and find the downstream actor corresponding to
// the parallel unit, and then copy the Bitmap to the corresponding actor. At the
// same time, we need to sort out the relationship between upstream and downstream
// actors
// actors of NoShuffle need to be in the same worker slot and hold the same virtual nodes,
// so for the actors after the upstream re-balancing, since we have sorted the actors of the same fragment by id on all workers,
// we can identify the corresponding upstream actor with NO_SHUFFLE.
// NOTE: There should be more asserts here to ensure correctness.
fn arrange_no_shuffle_relation(
ctx: &RescheduleContext,
fragment_id: &FragmentId,
@@ -971,9 +971,10 @@
if let Some((root_fragment, root_actor_id)) = actor_group_map.get(actor_id) {
let root_bitmap = fragment_updated_bitmap
.get(root_fragment)
.and_then(|map| map.get(root_actor_id))
.unwrap()
.clone();
.expect("root fragment bitmap not found")
.get(root_actor_id)
.cloned()
.expect("root actor bitmap not found");

// Copy the bitmap
fragment_bitmap.insert(*actor_id, root_bitmap);
@@ -1489,27 +1490,28 @@ impl ScaleController {
for (fragment_id, WorkerReschedule { worker_actor_diff }) in reschedule {
let fragment = ctx.fragment_map.get(fragment_id).unwrap();

// Actor Id => Parallel Unit Id
// Actor Id => Worker Id
let mut actors_to_remove = BTreeMap::new();
let mut actors_to_create = BTreeMap::new();

// NOTE(important): The value needs to be a BTreeSet to ensure that the actors on the worker are sorted in ascending order.
let mut worker_to_actors = HashMap::new();

for actor in &fragment.actors {
let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id).unwrap();
worker_to_actors
.entry(worker_id)
.or_insert(vec![])
.push(actor.actor_id);
.or_insert(BTreeSet::new())
.insert(actor.actor_id as ActorId);
}

let decreased_actor_count = worker_actor_diff
.iter()
.filter(|(_, change)| change.is_negative())
.map(|(worker_id, change)| (worker_id, change.unsigned_abs()));

for (removed, n) in decreased_actor_count {
if let Some(actor_ids) = worker_to_actors.get(removed) {
for (worker_id, n) in decreased_actor_count {
if let Some(actor_ids) = worker_to_actors.get(worker_id) {
assert!(actor_ids.len() >= n);

let removed_actors: Vec<_> = actor_ids
@@ -1519,7 +1521,7 @@
.collect();

for actor in removed_actors {
actors_to_remove.insert(actor, *removed);
actors_to_remove.insert(actor, *worker_id);
}
}
}
Expand All @@ -1528,7 +1530,7 @@ impl ScaleController {
.iter()
.filter(|(_, change)| change.is_positive());

for (created_worker, n) in increased_actor_count {
for (worker, n) in increased_actor_count {
for _ in 0..*n {
let id = match self.env.id_gen_manager() {
IdGenManagerImpl::Kv(mgr) => {
@@ -1540,7 +1542,7 @@
}
};

actors_to_create.insert(id, *created_worker);
actors_to_create.insert(id, *worker);
}
}

@@ -1553,6 +1555,24 @@
}
}

// sanity checking
for actors_to_remove in fragment_actors_to_remove.values() {
for actor_id in actors_to_remove.keys() {
let actor = ctx.actor_map.get(actor_id).unwrap();
for dispatcher in &actor.dispatcher {
if DispatcherType::NoShuffle == dispatcher.get_type().unwrap() {
let downstream_actor_id = dispatcher.downstream_actor_id.iter().exactly_one().expect("there should be only one downstream actor id in NO_SHUFFLE dispatcher");

let _should_exists = fragment_actors_to_remove
.get(&(dispatcher.dispatcher_id as FragmentId))
.expect("downstream fragment of NO_SHUFFLE relation should be in the removing map")
.get(downstream_actor_id)
.expect("downstream actor of NO_SHUFFLE relation should be in the removing map");
}
}
}
}

Ok((fragment_actors_to_remove, fragment_actors_to_create))
}

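To see how a `WorkerReschedule` plays out, here is a stripped-down sketch of the removal bookkeeping above (types simplified; the real code also allocates fresh actor ids for creation and keeps NO_SHUFFLE upstream/downstream actors paired): negative entries in `worker_actor_diff` pick that many existing actors to drop from a worker, and keeping the per-worker actor sets in a `BTreeSet` makes the selection deterministic.

use std::collections::{BTreeMap, BTreeSet, HashMap};

type ActorId = u32;
type WorkerId = u32;

// Decide which existing actors to remove, given the actors currently on each
// worker and a per-worker diff of actor counts.
fn actors_to_remove(
    worker_to_actors: &HashMap<WorkerId, BTreeSet<ActorId>>,
    worker_actor_diff: &BTreeMap<WorkerId, i32>,
) -> BTreeMap<ActorId, WorkerId> {
    let mut removed = BTreeMap::new();
    for (worker_id, diff) in worker_actor_diff {
        if diff.is_negative() {
            let n = diff.unsigned_abs() as usize;
            let actors = &worker_to_actors[worker_id];
            assert!(actors.len() >= n);
            // BTreeSet iteration is ascending; for illustration we drop the
            // smallest actor ids, though the real policy may pick a different end.
            for actor_id in actors.iter().take(n) {
                removed.insert(*actor_id, *worker_id);
            }
        }
    }
    removed
}
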
2 changes: 1 addition & 1 deletion src/meta/src/stream/stream_graph/actor.rs
@@ -330,7 +330,7 @@ impl ExternalChange {
}
}

/// The parallel unit location of actors.
/// The worker slot location of actors.
type ActorLocations = BTreeMap<GlobalActorId, WorkerSlotId>;

/// The actual mutable state of building an actor graph.
2 changes: 1 addition & 1 deletion src/meta/src/stream/stream_graph/fragment.rs
@@ -311,7 +311,7 @@ pub struct StreamFragmentGraph {
dependent_table_ids: HashSet<TableId>,

/// The default parallelism of the job, specified by the `STREAMING_PARALLELISM` session
/// variable. If not specified, all active parallel units will be used.
/// variable. If not specified, all active worker slots will be used.
specified_parallelism: Option<NonZeroUsize>,
}

2 changes: 1 addition & 1 deletion src/meta/src/stream/stream_manager.rs
@@ -1122,7 +1122,7 @@ mod tests {
table_id: TableId,
fragments: BTreeMap<FragmentId, Fragment>,
) -> MetaResult<()> {
// Create fake locations where all actors are scheduled to the same parallel unit.
// Create fake locations where all actors are scheduled to the same worker.
let locations = {
let StreamingClusterInfo { worker_nodes, .. }: StreamingClusterInfo = self
.global_stream_manager
2 changes: 1 addition & 1 deletion src/stream/src/cache/mod.rs
@@ -38,7 +38,7 @@ use risingwave_common::util::iter_util::ZipEqFast;
///
/// This brings a problem when scaling in after a while. Some partitions may be reassigned back to
/// the current executor, while the cache entries of these partitions are still unevicted. So it's
/// possible that these entries have been updated by other executors on other parallel units, and
/// possible that these entries have been updated by other executors on other workers, and
/// the content is now stale! The executor must evict these entries which are not in the
/// **previous** partition before further processing.
pub(super) fn cache_may_stale(
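A rough mental model for the staleness check described above (a sketch, not the crate's actual implementation): the cache may be stale exactly when the executor now owns vnodes it did not own in the previous partition, because entries under those vnodes could have been updated by executors on other workers in the meantime.

use std::collections::HashSet;

type VirtualNode = u16;

// Sketch: stale if any currently-owned vnode was not owned before the rescheduling.
fn cache_may_stale_sketch(previous: &HashSet<VirtualNode>, current: &HashSet<VirtualNode>) -> bool {
    !current.is_subset(previous)
}
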
@@ -216,7 +216,7 @@ async fn nexmark_q4_materialize_agg_cache_invalidation() -> Result<()> {
let id = fragment.id();
let workers = fragment.all_worker_count().into_keys().collect_vec();

// Let parallel unit 0 handle all groups.
// Let worker slot 0 handle all groups.
cluster
.reschedule(format!(
"{}:[{}]",
@@ -242,8 +242,8 @@
.assert_result_ne(result_1)
.assert_result_ne(RESULT);

// Let parallel unit 0 handle all groups again.
// Note that there're only 5 groups, so if the parallel unit 0 doesn't invalidate the cache
// Let worker slot 0 handle all groups again.
// Note that there're only 5 groups, so if the worker slot 0 doesn't invalidate the cache
// correctly, it will yield the wrong result.
cluster
.reschedule(format!(
