diff --git a/proto/meta.proto b/proto/meta.proto index 2023ad432a438..ffe0db3710ce8 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -584,6 +584,7 @@ message GetServerlessStreamingJobsStatusResponse { repeated Status streaming_job_statuses = 1; } +// This is used by `risectl` service ScaleService { rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse); rpc Reschedule(RescheduleRequest) returns (RescheduleResponse); diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 2c65f467b4a98..bbac53493b880 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -75,7 +75,8 @@ pub struct Reschedule { /// The downstream fragments of this fragment. pub downstream_fragment_ids: Vec, - /// Reassigned splits for source actors + /// Reassigned splits for source actors. + /// It becomes the `actor_splits` in [`UpdateMutation`]. pub actor_splits: HashMap>, /// Whether this fragment is injectable. The injectable means whether the fragment contains diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 1ee367a78022d..2747d63a37fae 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -721,7 +721,7 @@ impl GlobalBarrierManagerContext { } else { let (reschedule_fragment, _) = self .scale_controller - .prepare_reschedule_command( + .analyze_reschedule_plan( plan, RescheduleOptions { resolve_no_shuffle_upstream: true, @@ -871,7 +871,7 @@ impl GlobalBarrierManagerContext { let (reschedule_fragment, applied_reschedules) = self .scale_controller - .prepare_reschedule_command( + .analyze_reschedule_plan( plan, RescheduleOptions { resolve_no_shuffle_upstream: true, diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 010a44589c872..61705205367b5 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -799,8 +799,18 @@ impl ScaleController { Ok(()) } - // Results are the generated reschedule plan and the changes that need to be updated to the meta store. - pub(crate) async fn prepare_reschedule_command( + /// From the high-level [`WorkerReschedule`] to the low-level reschedule plan [`Reschedule`]. + /// + /// Returns `(reschedule_fragment, applied_reschedules)` + /// - `reschedule_fragment`: the generated reschedule plan + /// - `applied_reschedules`: the changes that need to be updated to the meta store (`pre_apply_reschedules`, only for V1). + /// + /// In [normal process of scaling](`GlobalStreamManager::reschedule_actors_impl`), we use the returned values to + /// build a [`Command::RescheduleFragment`], which will then flows through the barrier mechanism to perform scaling. + /// Meta store is updated after the barrier is collected. + /// + /// During recovery, we don't need the barrier mechanism, and can directly use the returned values to update meta. + pub(crate) async fn analyze_reschedule_plan( &self, mut reschedules: HashMap, options: RescheduleOptions, @@ -812,6 +822,7 @@ impl ScaleController { let ctx = self .build_reschedule_context(&mut reschedules, options, table_parallelisms) .await?; + let reschedules = reschedules; // Here, the plan for both upstream and downstream of the NO_SHUFFLE Fragment should already have been populated. @@ -2136,7 +2147,7 @@ impl ScaleController { WorkerReschedule { worker_actor_diff } } - pub fn build_no_shuffle_relation_index( + fn build_no_shuffle_relation_index( actor_map: &HashMap, no_shuffle_source_fragment_ids: &mut HashSet, no_shuffle_target_fragment_ids: &mut HashSet, @@ -2164,7 +2175,7 @@ impl ScaleController { } } - pub fn build_fragment_dispatcher_index( + fn build_fragment_dispatcher_index( actor_map: &HashMap, fragment_dispatcher_map: &mut HashMap>, ) { @@ -2303,8 +2314,8 @@ impl ScaleController { } } -// At present, for table level scaling, we use the strategy TableResizePolicy. -// Currently, this is used as an internal interface, so it won’t be included in Protobuf for the time being. +/// At present, for table level scaling, we use the strategy `TableResizePolicy`. +/// Currently, this is used as an internal interface, so it won’t be included in Protobuf. pub struct TableResizePolicy { pub(crate) worker_ids: BTreeSet, pub(crate) table_parallelisms: HashMap, @@ -2319,6 +2330,15 @@ impl GlobalStreamManager { self.scale_controller.reschedule_lock.write().await } + /// The entrypoint of rescheduling actors. + /// + /// Used by: + /// - The directly exposed low-level API `risingwave_meta_service::scale_service::ScaleService` + /// * `risectl scale resize` (high-level) + /// * `risectl meta reschedule` (low-level) + /// - High-level parallelism control API + /// * manual `ALTER [TABLE | INDEX | MATERIALIZED VIEW | SINK] SET PARALLELISM` + /// * automatic parallelism control for [`TableParallelism::Adaptive`] when worker nodes changed pub async fn reschedule_actors( &self, reschedules: HashMap, @@ -2350,7 +2370,7 @@ impl GlobalStreamManager { let (reschedule_fragment, applied_reschedules) = self .scale_controller - .prepare_reschedule_command(reschedules, options, table_parallelism.as_mut()) + .analyze_reschedule_plan(reschedules, options, table_parallelism.as_mut()) .await?; tracing::debug!("reschedule plan: {:?}", reschedule_fragment); @@ -2411,6 +2431,14 @@ impl GlobalStreamManager { Ok(()) } + /// When new worker nodes joined, or the parallelism of existing worker nodes changed, + /// examines if there are any jobs can be scaled, and scales them if found. + /// + /// This method will iterate over all `CREATED` jobs, and can be repeatedly called. + /// + /// Returns + /// - `Ok(false)` if no jobs can be scaled; + /// - `Ok(true)` if some jobs are scaled, and it is possible that there are more jobs can be scaled. async fn trigger_parallelism_control(&self) -> MetaResult { let background_streaming_jobs = self .metadata_manager @@ -2532,7 +2560,9 @@ impl GlobalStreamManager { for batch in batches { let parallelisms: HashMap<_, _> = batch.into_iter().collect(); - + // `table_parallelisms` contains ALL created jobs. + // We rely on `generate_table_resize_plan` to check if there are + // any jobs that can be scaled. let plan = self .scale_controller .generate_table_resize_plan(TableResizePolicy { @@ -2569,6 +2599,7 @@ impl GlobalStreamManager { Ok(true) } + /// Handles notification of worker node activation and deletion, and triggers parallelism control. async fn run(&self, mut shutdown_rx: Receiver<()>) { tracing::info!("starting automatic parallelism control monitor");