From 3493eaabea345a8b9376f911a3ce5b2ee44acb0b Mon Sep 17 00:00:00 2001
From: xxchan
Date: Tue, 2 Jul 2024 15:35:25 +0800
Subject: [PATCH] add notes

Signed-off-by: xxchan
---
 proto/meta.proto             |  1 +
 src/meta/src/stream/scale.rs | 27 +++++++++++++++++++++++----
 2 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/proto/meta.proto b/proto/meta.proto
index e4068a0b8cd5..778232c32910 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -559,6 +559,7 @@ message GetReschedulePlanResponse {
   bool success = 3;
 }
 
+// This is used by `risectl`.
 service ScaleService {
   rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
   rpc Reschedule(RescheduleRequest) returns (RescheduleResponse);
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index 673aa5b29d2b..7d05f2b88142 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -867,7 +867,10 @@ impl ScaleController {
         Ok(())
     }
 
-    // Results are the generated reschedule plan and the changes that need to be updated to the meta store.
+    /// Results are the generated reschedule plan and the changes that need to be updated to the meta store.
+    ///
+    /// XXX: Why is this also used in recovery?
+    /// Should other modules use only `reschedule_actors_impl`, but not this internal method?
     pub(crate) async fn prepare_reschedule_command(
         &self,
         mut reschedules: HashMap,
@@ -2667,8 +2670,8 @@ impl ScaleController {
     }
 }
 
-// At present, for table level scaling, we use the strategy TableResizePolicy.
-// Currently, this is used as an internal interface, so it won’t be included in Protobuf for the time being.
+/// At present, for table-level scaling, we use the strategy `TableResizePolicy`.
+/// Currently, this is used as an internal interface, so it won’t be included in Protobuf for the time being.
 pub struct TableResizePolicy {
     pub(crate) worker_ids: BTreeSet,
     pub(crate) table_parallelisms: HashMap,
@@ -2683,6 +2686,11 @@ impl GlobalStreamManager {
         self.scale_controller.reschedule_lock.write().await
     }
 
+    /// The entry point for rescheduling actors.
+    ///
+    /// Used by:
+    /// - `risectl scale` (`risingwave_meta_service::scale_service::ScaleService`)
+    /// - `ALTER [TABLE | INDEX | MATERIALIZED VIEW | SINK] SET PARALLELISM`
     pub async fn reschedule_actors(
         &self,
         reschedules: HashMap,
@@ -2775,6 +2783,14 @@
         Ok(())
     }
 
+    /// When new worker nodes have joined, or the parallelism of existing worker nodes has changed,
+    /// examines whether there are any jobs that can be scaled, and scales them if found.
+    ///
+    /// This method iterates over all `CREATED` jobs and can be called repeatedly.
+    ///
+    /// Returns:
+    /// - `Ok(false)` if no jobs can be scaled;
+    /// - `Ok(true)` if some jobs were scaled, and there may be more jobs that can be scaled.
     async fn trigger_parallelism_control(&self) -> MetaResult {
         let background_streaming_jobs = self
             .metadata_manager
@@ -2896,7 +2912,9 @@
 
         for batch in batches {
             let parallelisms: HashMap<_, _> = batch.into_iter().collect();
-
+            // `table_parallelisms` contains ALL created jobs.
+            // We rely on `generate_table_resize_plan` to check whether there are
+            // any jobs that can be scaled.
            let plan = self
                .scale_controller
                .generate_table_resize_plan(TableResizePolicy {
@@ -2933,6 +2951,7 @@
         Ok(true)
     }
 
+    /// Handles notifications of worker node activation and deletion, and triggers parallelism control.
     async fn run(&self, mut shutdown_rx: Receiver<()>) {
         tracing::info!("starting automatic parallelism control monitor");
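
The `Ok(true)` / `Ok(false)` contract documented on `trigger_parallelism_control` above implies a drain loop on the caller's side: keep calling until a full pass reports that nothing is left to scale. Below is a minimal, self-contained Rust sketch of that pattern. The `ParallelismController` trait, `FakeController`, and `MAX_ROUNDS` bound are hypothetical stand-ins for illustration; the real method is `GlobalStreamManager::trigger_parallelism_control`, driven by the `run` monitor shown in the last hunk, and `MetaResult` is simplified here to a plain `Result`.

```rust
/// Simplified stand-in for RisingWave's `MetaResult` (assumption for this sketch).
type MetaResult<T> = Result<T, String>;

/// Hypothetical abstraction over `trigger_parallelism_control`: per the doc
/// comment in the patch, `Ok(true)` means some jobs were scaled (more may
/// remain) and `Ok(false)` means a full pass found nothing left to scale.
trait ParallelismController {
    fn trigger_parallelism_control(&mut self) -> MetaResult<bool>;
}

/// Call parallelism control repeatedly until a pass reports no more work,
/// mirroring the "can be called repeatedly" contract in the doc comment.
fn drain_parallelism_control<C: ParallelismController>(ctrl: &mut C) -> MetaResult<()> {
    // Arbitrary safety bound (an assumption of this sketch) so a buggy
    // controller cannot spin forever.
    const MAX_ROUNDS: usize = 64;
    for round in 0..MAX_ROUNDS {
        if !ctrl.trigger_parallelism_control()? {
            return Ok(()); // no jobs can be scaled; we are done
        }
        println!("round {round}: some jobs were scaled, checking again");
    }
    Err("parallelism control did not converge".to_owned())
}

/// Toy controller that needs three passes before all jobs are scaled.
struct FakeController {
    pending_batches: usize,
}

impl ParallelismController for FakeController {
    fn trigger_parallelism_control(&mut self) -> MetaResult<bool> {
        if self.pending_batches == 0 {
            return Ok(false);
        }
        self.pending_batches -= 1;
        Ok(true)
    }
}

fn main() {
    let mut ctrl = FakeController { pending_batches: 3 };
    drain_parallelism_control(&mut ctrl).expect("should converge");
}
```

Bounding the loop is a defensive choice made only for this sketch; the actual monitor does not need to converge in one burst, since it is re-armed by the worker-node notifications that `run` handles.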