diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index 12e298648f5db..1958ab03a166d 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -22,7 +22,6 @@ use futures::stream::FuturesUnordered;
 use futures::TryStreamExt;
 use itertools::Itertools;
 use risingwave_common::catalog::TableId;
-use risingwave_common::hash::ParallelUnitId;
 use risingwave_hummock_sdk::compaction_group::StateTableId;
 use risingwave_pb::common::ActorInfo;
 use risingwave_pb::meta::PausedReason;
@@ -35,7 +34,7 @@ use risingwave_pb::stream_service::{
 use thiserror_ext::AsReport;
 use tokio::sync::oneshot;
 use tokio_retry::strategy::{jitter, ExponentialBackoff};
-use tracing::{debug, info, warn, Instrument};
+use tracing::{debug, warn, Instrument};
 use uuid::Uuid;
 
 use super::TracedEpoch;
@@ -48,7 +47,7 @@ use crate::barrier::state::BarrierManagerState;
 use crate::barrier::{Command, GlobalBarrierManagerContext};
 use crate::controller::catalog::ReleaseContext;
 use crate::manager::{MetadataManager, WorkerId};
-use crate::model::{MetadataModel, MigrationPlan, TableFragments};
+use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism};
 use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy};
 use crate::MetaResult;
 
@@ -617,45 +616,53 @@ impl GlobalBarrierManagerContext {
     async fn scale_actors_v1(&self, info: &InflightActorInfo) -> MetaResult<bool> {
         let mgr = self.metadata_manager.as_v1_ref();
-        debug!("start scaling-in offline actors.");
+        debug!("start resetting actors distribution");
 
-        let prev_worker_parallel_units = mgr.fragment_manager.all_worker_parallel_units().await;
-
-        let curr_worker_parallel_units: HashMap<WorkerId, HashSet<ParallelUnitId>> = info
+        let current_parallelism = info
             .node_map
-            .iter()
-            .map(|(worker_id, worker_node)| {
-                (
-                    *worker_id,
-                    worker_node
-                        .parallel_units
-                        .iter()
-                        .map(|parallel_unit| parallel_unit.id)
-                        .collect(),
-                )
-            })
-            .collect();
-
-        // todo: maybe we can only check the reduced workers
-        if curr_worker_parallel_units == prev_worker_parallel_units {
-            debug!("no changed workers, skipping.");
-            return Ok(false);
+            .values()
+            .flat_map(|worker_node| worker_node.parallel_units.iter())
+            .count();
+
+        /// We infer the new parallelism strategy based on the prior level of parallelism of the table.
+        /// If the parallelism strategy is Fixed or Auto, we won't make any modifications.
+        /// For Custom, we'll assess the parallelism of the core fragment;
+        /// if the parallelism is higher than the currently available parallelism, we'll set it to Auto.
+        /// If it's lower, we'll set it to Fixed.
+        fn derive_target_parallelism_for_custom(
+            current_parallelism: usize,
+            table: &TableFragments,
+        ) -> TableParallelism {
+            let derive_from_fragment = table.mview_fragment().or_else(|| table.sink_fragment());
+
+            if let TableParallelism::Custom = &table.assigned_parallelism {
+                if let Some(fragment) = derive_from_fragment {
+                    let fragment_parallelism = fragment.get_actors().len();
+                    if fragment_parallelism >= current_parallelism {
+                        TableParallelism::Auto
+                    } else {
+                        TableParallelism::Fixed(fragment_parallelism)
+                    }
+                } else {
+                    TableParallelism::Auto
+                }
+            } else {
+                table.assigned_parallelism
+            }
         }
 
-        info!("parallel unit has changed, triggering a forced reschedule.");
-
-        debug!(
-            "previous worker parallel units {:?}, current worker parallel units {:?}",
-            prev_worker_parallel_units, curr_worker_parallel_units
-        );
-
-        let table_parallelisms = {
+        let table_parallelisms: HashMap<_, _> = {
             let guard = mgr.fragment_manager.get_fragment_read_guard().await;
             guard
                 .table_fragments()
                 .iter()
-                .map(|(table_id, table)| (table_id.table_id, table.assigned_parallelism))
+                .map(|(table_id, table)| {
+                    let target_parallelism =
+                        derive_target_parallelism_for_custom(current_parallelism, table);
+
+                    (table_id.table_id, target_parallelism)
+                })
                 .collect()
         };
@@ -682,10 +689,20 @@ impl GlobalBarrierManagerContext {
             .unwrap()
             .generate_table_resize_plan(TableResizePolicy {
                 worker_ids: schedulable_worker_ids,
-                table_parallelisms,
+                table_parallelisms: table_parallelisms.clone(),
             })
             .await?;
 
+        let table_parallelisms: HashMap<_, _> = table_parallelisms
+            .into_iter()
+            .map(|(table_id, parallelism)| {
+                debug_assert_ne!(parallelism, TableParallelism::Custom);
+                (TableId::new(table_id), parallelism)
+            })
+            .collect();
+
+        let mut compared_table_parallelisms = table_parallelisms.clone();
+
         let (reschedule_fragment, applied_reschedules) = self
             .scale_controller
             .as_ref()
@@ -695,15 +712,18 @@ impl GlobalBarrierManagerContext {
                 RescheduleOptions {
                     resolve_no_shuffle_upstream: true,
                 },
-                None,
+                Some(&mut compared_table_parallelisms),
             )
             .await?;
 
+        // Because custom parallelism doesn't exist, this function won't result in a no-shuffle rewrite for table parallelisms.
+        debug_assert_eq!(compared_table_parallelisms, table_parallelisms);
+
         if let Err(e) = self
            .scale_controller
            .as_ref()
            .unwrap()
-            .post_apply_reschedule(&reschedule_fragment, &Default::default())
+            .post_apply_reschedule(&reschedule_fragment, &table_parallelisms)
            .await
         {
             tracing::error!(
diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs
index f71ea1567725c..559cef397c250 100644
--- a/src/meta/src/manager/catalog/fragment.rs
+++ b/src/meta/src/manager/catalog/fragment.rs
@@ -1316,7 +1316,9 @@ impl FragmentManager {
         for (table_id, parallelism) in table_parallelism_assignment {
             if let Some(mut table) = table_fragments.get_mut(table_id) {
-                table.assigned_parallelism = parallelism;
+                if table.assigned_parallelism != parallelism {
+                    table.assigned_parallelism = parallelism;
+                }
             }
         }
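
The core of this change is the new `derive_target_parallelism_for_custom` helper in `scale_actors_v1`: during recovery, a table whose parallelism was left as `Custom` is re-targeted to either `Auto` or `Fixed(n)` depending on how the actor count of its mview/sink fragment compares to the parallelism the surviving workers can offer. The sketch below illustrates that decision rule in isolation; it is a minimal approximation rather than the meta-service code — `TableParallelism` here is a simplified stand-in for the enum in `crate::model`, and the core fragment's actor count is passed in as a plain `Option<usize>` instead of being read from `TableFragments`.

```rust
// Simplified stand-in for the meta model's TableParallelism enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TableParallelism {
    Auto,
    Fixed(usize),
    Custom,
}

/// Mirrors the recovery-path rule from the patch: Fixed/Auto are left
/// untouched; Custom is rewritten to Auto when the recorded fragment
/// parallelism meets or exceeds what the cluster currently offers, and to
/// Fixed(n) otherwise. `core_fragment_actors` is None when the table has
/// neither an mview nor a sink fragment to derive from.
fn derive_target_parallelism(
    current_parallelism: usize,
    assigned: TableParallelism,
    core_fragment_actors: Option<usize>,
) -> TableParallelism {
    match assigned {
        TableParallelism::Custom => match core_fragment_actors {
            Some(n) if n >= current_parallelism => TableParallelism::Auto,
            Some(n) => TableParallelism::Fixed(n),
            None => TableParallelism::Auto,
        },
        other => other,
    }
}

fn main() {
    // Suppose the cluster currently exposes 4 parallel units in total.
    let current = 4;

    // Custom table whose core fragment had 8 actors: more than the cluster
    // can host now, so it falls back to Auto.
    assert_eq!(
        derive_target_parallelism(current, TableParallelism::Custom, Some(8)),
        TableParallelism::Auto
    );

    // Custom table with 2 actors still fits, so it is pinned to Fixed(2).
    assert_eq!(
        derive_target_parallelism(current, TableParallelism::Custom, Some(2)),
        TableParallelism::Fixed(2)
    );

    // Auto and Fixed assignments pass through unchanged.
    assert_eq!(
        derive_target_parallelism(current, TableParallelism::Auto, Some(2)),
        TableParallelism::Auto
    );
    assert_eq!(
        derive_target_parallelism(current, TableParallelism::Fixed(3), Some(8)),
        TableParallelism::Fixed(3)
    );

    println!("all checks passed");
}
```

In other words, a `Custom` assignment that no longer fits the cluster degrades to `Auto`, one that still fits is pinned to `Fixed` at its recorded size, and `Auto`/`Fixed` assignments pass through untouched — which is also why the later `debug_assert_ne!(parallelism, TableParallelism::Custom)` in the patch can hold: after derivation no table should remain `Custom`.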