feat: convert custom parallelism to auto/fixed in recovery loop in meta (#14871)
shanicky authored Jan 30, 2024
1 parent 8cfc7f1 commit 11b02eb
Showing 2 changed files with 59 additions and 37 deletions.
92 changes: 56 additions & 36 deletions src/meta/src/barrier/recovery.rs
@@ -22,7 +22,6 @@ use futures::stream::FuturesUnordered;
use futures::TryStreamExt;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ParallelUnitId;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::meta::PausedReason;
@@ -35,7 +34,7 @@ use risingwave_pb::stream_service::{
use thiserror_ext::AsReport;
use tokio::sync::oneshot;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tracing::{debug, info, warn, Instrument};
use tracing::{debug, warn, Instrument};
use uuid::Uuid;

use super::TracedEpoch;
@@ -48,7 +47,7 @@ use crate::barrier::state::BarrierManagerState;
use crate::barrier::{Command, GlobalBarrierManagerContext};
use crate::controller::catalog::ReleaseContext;
use crate::manager::{MetadataManager, WorkerId};
use crate::model::{MetadataModel, MigrationPlan, TableFragments};
use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism};
use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy};
use crate::MetaResult;

@@ -617,45 +616,53 @@ impl GlobalBarrierManagerContext {

async fn scale_actors_v1(&self, info: &InflightActorInfo) -> MetaResult<bool> {
let mgr = self.metadata_manager.as_v1_ref();
debug!("start scaling-in offline actors.");
debug!("start resetting actors distribution");

let prev_worker_parallel_units = mgr.fragment_manager.all_worker_parallel_units().await;

let curr_worker_parallel_units: HashMap<WorkerId, HashSet<ParallelUnitId>> = info
let current_parallelism = info
.node_map
.iter()
.map(|(worker_id, worker_node)| {
(
*worker_id,
worker_node
.parallel_units
.iter()
.map(|parallel_unit| parallel_unit.id)
.collect(),
)
})
.collect();

// todo: maybe we can only check the reduced workers
if curr_worker_parallel_units == prev_worker_parallel_units {
debug!("no changed workers, skipping.");
return Ok(false);
.values()
.flat_map(|worker_node| worker_node.parallel_units.iter())
.count();

/// We infer the new parallelism strategy based on the prior level of parallelism of the table.
/// If the parallelism strategy is Fixed or Auto, we won't make any modifications.
/// For Custom, we'll assess the parallelism of the core fragment;
/// if the parallelism is higher than the currently available parallelism, we'll set it to Auto.
/// If it's lower, we'll set it to Fixed.
fn derive_target_parallelism_for_custom(
current_parallelism: usize,
table: &TableFragments,
) -> TableParallelism {
let derive_from_fragment = table.mview_fragment().or_else(|| table.sink_fragment());

if let TableParallelism::Custom = &table.assigned_parallelism {
if let Some(fragment) = derive_from_fragment {
let fragment_parallelism = fragment.get_actors().len();
if fragment_parallelism >= current_parallelism {
TableParallelism::Auto
} else {
TableParallelism::Fixed(fragment_parallelism)
}
} else {
TableParallelism::Auto
}
} else {
table.assigned_parallelism
}
}

info!("parallel unit has changed, triggering a forced reschedule.");

debug!(
"previous worker parallel units {:?}, current worker parallel units {:?}",
prev_worker_parallel_units, curr_worker_parallel_units
);

let table_parallelisms = {
let table_parallelisms: HashMap<_, _> = {
let guard = mgr.fragment_manager.get_fragment_read_guard().await;

guard
.table_fragments()
.iter()
.map(|(table_id, table)| (table_id.table_id, table.assigned_parallelism))
.map(|(table_id, table)| {
let target_parallelism =
derive_target_parallelism_for_custom(current_parallelism, table);

(table_id.table_id, target_parallelism)
})
.collect()
};

@@ -682,10 +689,20 @@ impl GlobalBarrierManagerContext {
.unwrap()
.generate_table_resize_plan(TableResizePolicy {
worker_ids: schedulable_worker_ids,
table_parallelisms,
table_parallelisms: table_parallelisms.clone(),
})
.await?;

let table_parallelisms: HashMap<_, _> = table_parallelisms
.into_iter()
.map(|(table_id, parallelism)| {
debug_assert_ne!(parallelism, TableParallelism::Custom);
(TableId::new(table_id), parallelism)
})
.collect();

let mut compared_table_parallelisms = table_parallelisms.clone();

let (reschedule_fragment, applied_reschedules) = self
.scale_controller
.as_ref()
@@ -695,15 +712,18 @@ impl GlobalBarrierManagerContext {
RescheduleOptions {
resolve_no_shuffle_upstream: true,
},
None,
Some(&mut compared_table_parallelisms),
)
.await?;

// Because custom parallelism doesn't exist, this function won't result in a no-shuffle rewrite for table parallelisms.
debug_assert_eq!(compared_table_parallelisms, table_parallelisms);

if let Err(e) = self
.scale_controller
.as_ref()
.unwrap()
.post_apply_reschedule(&reschedule_fragment, &Default::default())
.post_apply_reschedule(&reschedule_fragment, &table_parallelisms)
.await
{
tracing::error!(
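For reference, here is a minimal standalone sketch (not part of this commit) of the conversion rule implemented by derive_target_parallelism_for_custom above. Parallelism and Table are hypothetical, simplified stand-ins for the real TableParallelism and TableFragments types from the meta crate, and the sketch assumes the actor count of the table's mview/sink fragment is already known:

// Standalone sketch; Parallelism and Table are simplified stand-ins for
// TableParallelism and TableFragments from the meta crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Parallelism {
    Auto,
    Fixed(usize),
    Custom,
}

struct Table {
    assigned: Parallelism,
    // Actor count of the mview (or sink) fragment, if such a fragment exists.
    core_fragment_actors: Option<usize>,
}

// Mirrors the rule above: Auto and Fixed pass through unchanged; Custom becomes
// Auto when the core fragment already has at least the currently available
// parallelism, otherwise Fixed at the fragment's actor count.
fn derive_target(current_parallelism: usize, table: &Table) -> Parallelism {
    match table.assigned {
        Parallelism::Custom => match table.core_fragment_actors {
            Some(actors) if actors >= current_parallelism => Parallelism::Auto,
            Some(actors) => Parallelism::Fixed(actors),
            None => Parallelism::Auto,
        },
        other => other,
    }
}

fn main() {
    let current = 8; // e.g. 8 parallel units across all live workers

    // Custom table whose core fragment runs 4 actors: 4 < 8, so Fixed(4).
    let t1 = Table { assigned: Parallelism::Custom, core_fragment_actors: Some(4) };
    assert_eq!(derive_target(current, &t1), Parallelism::Fixed(4));

    // Custom table whose core fragment runs 10 actors: 10 >= 8, so Auto.
    let t2 = Table { assigned: Parallelism::Custom, core_fragment_actors: Some(10) };
    assert_eq!(derive_target(current, &t2), Parallelism::Auto);

    // Tables already marked Auto or Fixed are returned unchanged.
    let t3 = Table { assigned: Parallelism::Fixed(2), core_fragment_actors: Some(2) };
    assert_eq!(derive_target(current, &t3), Parallelism::Fixed(2));
}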
4 changes: 3 additions & 1 deletion src/meta/src/manager/catalog/fragment.rs
@@ -1316,7 +1316,9 @@ impl FragmentManager

for (table_id, parallelism) in table_parallelism_assignment {
if let Some(mut table) = table_fragments.get_mut(table_id) {
table.assigned_parallelism = parallelism;
if table.assigned_parallelism != parallelism {
table.assigned_parallelism = parallelism;
}
}
}

