feat: convert custom parallelism to auto/fixed in recovery loop in meta #14871

Merged 2 commits on Jan 30, 2024
92 changes: 56 additions & 36 deletions src/meta/src/barrier/recovery.rs
@@ -22,7 +22,6 @@ use futures::stream::FuturesUnordered;
 use futures::TryStreamExt;
 use itertools::Itertools;
 use risingwave_common::catalog::TableId;
-use risingwave_common::hash::ParallelUnitId;
 use risingwave_hummock_sdk::compaction_group::StateTableId;
 use risingwave_pb::common::ActorInfo;
 use risingwave_pb::meta::PausedReason;
@@ -35,7 +34,7 @@ use risingwave_pb::stream_service::{
 use thiserror_ext::AsReport;
 use tokio::sync::oneshot;
 use tokio_retry::strategy::{jitter, ExponentialBackoff};
-use tracing::{debug, info, warn, Instrument};
+use tracing::{debug, warn, Instrument};
 use uuid::Uuid;
 
 use super::TracedEpoch;
@@ -48,7 +47,7 @@ use crate::barrier::state::BarrierManagerState;
 use crate::barrier::{Command, GlobalBarrierManagerContext};
 use crate::controller::catalog::ReleaseContext;
 use crate::manager::{MetadataManager, WorkerId};
-use crate::model::{MetadataModel, MigrationPlan, TableFragments};
+use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism};
 use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy};
 use crate::MetaResult;

@@ -617,45 +616,53 @@ impl GlobalBarrierManagerContext {
 
     async fn scale_actors_v1(&self, info: &InflightActorInfo) -> MetaResult<bool> {
         let mgr = self.metadata_manager.as_v1_ref();
-        debug!("start scaling-in offline actors.");
-
-        let prev_worker_parallel_units = mgr.fragment_manager.all_worker_parallel_units().await;
+        debug!("start resetting actors distribution");
 
-        let curr_worker_parallel_units: HashMap<WorkerId, HashSet<ParallelUnitId>> = info
+        let current_parallelism = info
             .node_map
-            .iter()
-            .map(|(worker_id, worker_node)| {
-                (
-                    *worker_id,
-                    worker_node
-                        .parallel_units
-                        .iter()
-                        .map(|parallel_unit| parallel_unit.id)
-                        .collect(),
-                )
-            })
-            .collect();
-
-        // todo: maybe we can only check the reduced workers
-        if curr_worker_parallel_units == prev_worker_parallel_units {
-            debug!("no changed workers, skipping.");
-            return Ok(false);
-        }
-
-        info!("parallel unit has changed, triggering a forced reschedule.");
-
-        debug!(
-            "previous worker parallel units {:?}, current worker parallel units {:?}",
-            prev_worker_parallel_units, curr_worker_parallel_units
-        );
+            .values()
+            .flat_map(|worker_node| worker_node.parallel_units.iter())
+            .count();
+
+        /// We infer the new parallelism strategy based on the prior level of parallelism of the table.
+        /// If the parallelism strategy is Fixed or Auto, we won't make any modifications.
+        /// For Custom, we'll assess the parallelism of the core fragment;
+        /// if the parallelism is higher than the currently available parallelism, we'll set it to Auto.
+        /// If it's lower, we'll set it to Fixed.
+        fn derive_target_parallelism_for_custom(
+            current_parallelism: usize,
+            table: &TableFragments,
+        ) -> TableParallelism {
+            let derive_from_fragment = table.mview_fragment().or_else(|| table.sink_fragment());
+
+            if let TableParallelism::Custom = &table.assigned_parallelism {
+                if let Some(fragment) = derive_from_fragment {
+                    let fragment_parallelism = fragment.get_actors().len();
+                    if fragment_parallelism >= current_parallelism {
+                        TableParallelism::Auto
+                    } else {
+                        TableParallelism::Fixed(fragment_parallelism)
+                    }
+                } else {
+                    TableParallelism::Auto
+                }
+            } else {
+                table.assigned_parallelism
+            }
+        }
 
-        let table_parallelisms = {
+        let table_parallelisms: HashMap<_, _> = {
             let guard = mgr.fragment_manager.get_fragment_read_guard().await;
 
             guard
                 .table_fragments()
                 .iter()
-                .map(|(table_id, table)| (table_id.table_id, table.assigned_parallelism))
+                .map(|(table_id, table)| {
+                    let target_parallelism =
+                        derive_target_parallelism_for_custom(current_parallelism, table);
+
+                    (table_id.table_id, target_parallelism)
+                })
                 .collect()
         };
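The doc comment on `derive_target_parallelism_for_custom` carries the core rule of this PR: `Auto` and `Fixed` assignments pass through untouched, while `Custom` is folded into `Auto` or `Fixed` depending on how the core fragment's actor count compares to the parallelism currently available. A minimal, self-contained sketch of that decision logic, using simplified stand-in types (`TableParallelism` is reduced to a plain enum and the fragment/cluster inputs to bare `usize` values, so this is not the real meta-model API):

```rust
/// Simplified stand-in for the meta model's `TableParallelism`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TableParallelism {
    Auto,
    Fixed(usize),
    Custom,
}

/// Mirrors the rule in the diff: Auto/Fixed pass through unchanged, and
/// Custom is converted based on the core (mview/sink) fragment's actor count.
fn derive_target(
    assigned: TableParallelism,
    fragment_actors: Option<usize>,
    current_parallelism: usize,
) -> TableParallelism {
    match assigned {
        TableParallelism::Custom => match fragment_actors {
            // At least as many actors as the cluster can host: let the system decide.
            Some(n) if n >= current_parallelism => TableParallelism::Auto,
            // Fewer actors than available slots: pin to the observed count.
            Some(n) => TableParallelism::Fixed(n),
            // No mview/sink fragment to measure: fall back to Auto.
            None => TableParallelism::Auto,
        },
        other => other, // Auto and Fixed are left as-is.
    }
}

fn main() {
    // 4 actors on a cluster with 8 parallel units: pinned to Fixed(4).
    assert_eq!(
        derive_target(TableParallelism::Custom, Some(4), 8),
        TableParallelism::Fixed(4)
    );
    // 16 actors but only 8 units available: converted to Auto.
    assert_eq!(
        derive_target(TableParallelism::Custom, Some(16), 8),
        TableParallelism::Auto
    );
    // Non-custom assignments are untouched.
    assert_eq!(
        derive_target(TableParallelism::Fixed(2), Some(4), 8),
        TableParallelism::Fixed(2)
    );
}
```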

@@ -682,10 +689,20 @@
             .unwrap()
             .generate_table_resize_plan(TableResizePolicy {
                 worker_ids: schedulable_worker_ids,
-                table_parallelisms,
+                table_parallelisms: table_parallelisms.clone(),
             })
             .await?;
 
+        let table_parallelisms: HashMap<_, _> = table_parallelisms
+            .into_iter()
+            .map(|(table_id, parallelism)| {
+                debug_assert_ne!(parallelism, TableParallelism::Custom);
+                (TableId::new(table_id), parallelism)
+            })
+            .collect();
+
+        let mut compared_table_parallelisms = table_parallelisms.clone();
+
         let (reschedule_fragment, applied_reschedules) = self
             .scale_controller
             .as_ref()
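Note the two roles of `table_parallelisms` here: the `u32`-keyed map feeds the resize plan, and the re-keyed `TableId` map (with the `debug_assert_ne!` confirming the derivation pass left no `Custom` entries behind) is what the later reschedule steps consume. A minimal sketch of that re-keying step, again with stand-in types (`TableId` here is a hypothetical one-field wrapper, not the real catalog type):

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `risingwave_common::catalog::TableId`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct TableId(u32);

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TableParallelism {
    Auto,
    Fixed(usize),
    Custom,
}

fn main() {
    let raw: HashMap<u32, TableParallelism> =
        HashMap::from([(1, TableParallelism::Auto), (2, TableParallelism::Fixed(4))]);

    // Re-key the map with the typed id, asserting (in debug builds only)
    // that no Custom entry survived the derivation pass.
    let typed: HashMap<TableId, TableParallelism> = raw
        .into_iter()
        .map(|(id, parallelism)| {
            debug_assert_ne!(parallelism, TableParallelism::Custom);
            (TableId(id), parallelism)
        })
        .collect();

    assert_eq!(typed[&TableId(2)], TableParallelism::Fixed(4));
}
```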
@@ -695,15 +712,18 @@
                 RescheduleOptions {
                     resolve_no_shuffle_upstream: true,
                 },
-                None,
+                Some(&mut compared_table_parallelisms),
             )
             .await?;
 
+        // Because custom parallelism doesn't exist, this function won't result in a no-shuffle rewrite for table parallelisms.
+        debug_assert_eq!(compared_table_parallelisms, table_parallelisms);
+
         if let Err(e) = self
             .scale_controller
             .as_ref()
             .unwrap()
-            .post_apply_reschedule(&reschedule_fragment, &Default::default())
+            .post_apply_reschedule(&reschedule_fragment, &table_parallelisms)
             .await
         {
             tracing::error!(
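Passing `Some(&mut compared_table_parallelisms)` hands the reschedule analysis a mutable clone it is allowed to rewrite; because no `Custom` parallelism survives to this point, the clone should come back identical, which the `debug_assert_eq!` checks. A sketch of that clone-pass-compare pattern, with a hypothetical `analyze` function standing in for the scale controller's analysis call:

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the scale controller's reschedule analysis:
/// it receives an optional mutable view of the per-table targets and may
/// rewrite entries (e.g. for no-shuffle-derived fragments). In this sketch
/// nothing qualifies for a rewrite, so the map is left untouched.
fn analyze(table_targets: Option<&mut HashMap<u32, usize>>) {
    if let Some(_targets) = table_targets {
        // ... planning work that, in this sketch, never mutates the map ...
    }
}

fn main() {
    let table_parallelisms: HashMap<u32, usize> = HashMap::from([(1, 4), (2, 8)]);
    let mut compared = table_parallelisms.clone();

    analyze(Some(&mut compared));

    // Same pattern as the diff: verify (in debug builds) that the analysis
    // did not silently rewrite any table's target parallelism.
    debug_assert_eq!(compared, table_parallelisms);
}
```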
4 changes: 3 additions & 1 deletion src/meta/src/manager/catalog/fragment.rs
@@ -1316,7 +1316,9 @@ impl FragmentManager {
 
         for (table_id, parallelism) in table_parallelism_assignment {
             if let Some(mut table) = table_fragments.get_mut(table_id) {
-                table.assigned_parallelism = parallelism;
+                if table.assigned_parallelism != parallelism {
+                    table.assigned_parallelism = parallelism;
+                }
             }
         }
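The new guard skips the assignment when a table is already at its target parallelism, presumably so that unchanged records are not staged for a redundant metadata write. A minimal sketch of that write-skipping idea, using a hypothetical `Tracked` wrapper (the real `FragmentManager` transaction machinery differs):

```rust
/// Hypothetical wrapper that records whether a value was actually mutated,
/// standing in for a transactional record whose dirty entries get persisted.
struct Tracked<T> {
    value: T,
    dirty: bool,
}

impl<T: PartialEq> Tracked<T> {
    fn new(value: T) -> Self {
        Self { value, dirty: false }
    }

    /// Assign only when the value really changes, so an equal assignment
    /// leaves the record clean and schedules no write.
    fn set_if_changed(&mut self, new: T) {
        if self.value != new {
            self.value = new;
            self.dirty = true;
        }
    }
}

fn main() {
    let mut assigned_parallelism = Tracked::new(4usize);

    assigned_parallelism.set_if_changed(4); // same value: record stays clean
    assert!(!assigned_parallelism.dirty);

    assigned_parallelism.set_if_changed(8); // real change: record marked dirty
    assert!(assigned_parallelism.dirty);
}
```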
