diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 0f8ecd027dcd0..61a35629f6c69 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -368,15 +368,11 @@ impl GlobalBarrierManagerContext { // Resolve actor info for recovery. If there's no actor to recover, most of the // following steps will be no-op, while the compute nodes will still be reset. let mut info = if !self.env.opts.disable_automatic_parallelism_control { - let info = self.resolve_actor_info().await; - let scaled = self.scale_actors(&info).await.inspect_err(|err| { + self.scale_actors().await.inspect_err(|err| { warn!(error = %err.as_report(), "scale actors failed"); })?; - if scaled { - self.resolve_actor_info().await - } else { - info - } + + self.resolve_actor_info().await } else { // Migrate actors in expired CN to newly joined one. self.migrate_actors().await.inspect_err(|err| { @@ -596,14 +592,14 @@ impl GlobalBarrierManagerContext { Ok(info) } - async fn scale_actors(&self, info: &InflightActorInfo) -> MetaResult { + async fn scale_actors(&self) -> MetaResult<()> { match &self.metadata_manager { - MetadataManager::V1(_) => self.scale_actors_v1(info).await, - MetadataManager::V2(_) => self.scale_actors_v2(info).await, + MetadataManager::V1(_) => self.scale_actors_v1().await, + MetadataManager::V2(_) => self.scale_actors_v2().await, } } - async fn scale_actors_v2(&self, _info: &InflightActorInfo) -> MetaResult { + async fn scale_actors_v2(&self) -> MetaResult<()> { let mgr = self.metadata_manager.as_v2_ref(); debug!("start resetting actors distribution"); @@ -690,16 +686,18 @@ impl GlobalBarrierManagerContext { } debug!("scaling-in actors succeed."); - Ok(true) + Ok(()) } - async fn scale_actors_v1(&self, info: &InflightActorInfo) -> MetaResult { + async fn scale_actors_v1(&self) -> MetaResult<()> { + let info = self.resolve_actor_info().await; + let mgr = self.metadata_manager.as_v1_ref(); debug!("start resetting actors distribution"); if info.actor_location_map.is_empty() { debug!("empty cluster, skipping"); - return Ok(true); + return Ok(()); } let current_parallelism = info @@ -821,7 +819,7 @@ impl GlobalBarrierManagerContext { } debug!("scaling-in actors succeed."); - Ok(true) + Ok(()) } /// This function will generate a migration plan, which includes the mapping for all expired and diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index 042993bd5bd4c..622a3efa6b564 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -109,7 +109,11 @@ impl ClusterController { let meta_store = env .sql_meta_store() .expect("sql meta store is not initialized"); - let inner = ClusterControllerInner::new(meta_store.conn).await?; + let inner = ClusterControllerInner::new( + meta_store.conn, + env.opts.disable_automatic_parallelism_control, + ) + .await?; Ok(Self { env, max_heartbeat_interval, @@ -460,13 +464,17 @@ pub struct ClusterControllerInner { /// Record for tracking available machine ids, one is available. available_transactional_ids: VecDeque, worker_extra_info: HashMap, + disable_automatic_parallelism_control: bool, } impl ClusterControllerInner { pub const MAX_WORKER_REUSABLE_ID_BITS: usize = 10; pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS; - pub async fn new(db: DatabaseConnection) -> MetaResult { + pub async fn new( + db: DatabaseConnection, + disable_automatic_parallelism_control: bool, + ) -> MetaResult { let workers: Vec<(WorkerId, Option)> = Worker::find() .select_only() .column(worker::Column::WorkerId) @@ -492,6 +500,7 @@ impl ClusterControllerInner { db, available_transactional_ids, worker_extra_info, + disable_automatic_parallelism_control, }) } @@ -598,14 +607,25 @@ impl ClusterControllerInner { match new_parallelism.cmp(¤t_parallelism.len()) { Ordering::Less => { - // Warn and keep the original parallelism if the worker registered with a - // smaller parallelism. - tracing::warn!( - "worker {} parallelism is less than current, current is {}, but received {}", - worker.worker_id, - current_parallelism.len(), - new_parallelism - ); + if !self.disable_automatic_parallelism_control { + // Handing over to the subsequent recovery loop for a forced reschedule. + tracing::info!( + "worker {} parallelism reduced from {} to {}", + worker.worker_id, + current_parallelism.len(), + new_parallelism + ); + current_parallelism.truncate(new_parallelism); + } else { + // Warn and keep the original parallelism if the worker registered with a + // smaller parallelism. + tracing::warn!( + "worker {} parallelism is less than current, current is {}, but received {}", + worker.worker_id, + current_parallelism.len(), + new_parallelism + ); + } } Ordering::Greater => { tracing::info!( diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 4a9d3f3a992e5..436c802666a3a 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -24,7 +24,7 @@ use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, use risingwave_meta_model_v2::{ actor, actor_dispatcher, fragment, object, sink, streaming_job, ActorId, ConnectorSplits, ExprContext, FragmentId, FragmentVnodeMapping, I32Array, JobStatus, ObjectId, SinkId, SourceId, - StreamNode, TableId, VnodeBitmap, WorkerId, + StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId, }; use risingwave_pb::common::PbParallelUnit; use risingwave_pb::meta::subscribe_response::{ @@ -54,6 +54,7 @@ use crate::controller::utils::{ PartialActorLocation, PartialFragmentStateTables, }; use crate::manager::{ActorInfos, LocalNotification}; +use crate::model::TableParallelism; use crate::stream::SplitAssignment; use crate::{MetaError, MetaResult}; @@ -311,6 +312,7 @@ impl CatalogController { HashMap>, )>, parallel_units_map: &HashMap, + parallelism: StreamingParallelism, ) -> MetaResult { let mut pb_fragments = HashMap::new(); let mut pb_actor_splits = HashMap::new(); @@ -333,8 +335,13 @@ impl CatalogController { actor_status: pb_actor_status, actor_splits: pb_actor_splits, ctx: Some(ctx.unwrap_or_default()), - // TODO(peng): fix this for model v2 - parallelism: None, + parallelism: Some( + match parallelism { + StreamingParallelism::Adaptive => TableParallelism::Adaptive, + StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _), + } + .into(), + ), }; Ok(table_fragments) @@ -631,6 +638,7 @@ impl CatalogController { job_info.timezone.map(|tz| PbStreamContext { timezone: tz }), fragment_info, ¶llel_units_map, + job_info.parallelism.clone(), ) } @@ -733,6 +741,7 @@ impl CatalogController { job.timezone.map(|tz| PbStreamContext { timezone: tz }), fragment_info, ¶llel_units_map, + job.parallelism.clone(), )?, ); } diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 2bd711b0a9d31..59164dea02e68 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -30,6 +30,7 @@ use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableId; use risingwave_common::hash::{ActorMapping, ParallelUnitId, VirtualNode}; use risingwave_common::util::iter_util::ZipEqDebug; +use risingwave_meta_model_v2::StreamingParallelism; use risingwave_pb::common::{ActorInfo, ParallelUnit, WorkerNode}; use risingwave_pb::meta::get_reschedule_plan_request::{Policy, StableResizePolicy}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; @@ -1365,9 +1366,6 @@ impl ScaleController { } for created_parallel_unit_id in added_parallel_units { - // self.env.sql_id_gen_manager_ref().map(|id_gen| id_gen.actors.generate_interval(1)) - // - let id = match self.env.sql_id_gen_manager_ref() { None => { self.env @@ -2488,8 +2486,64 @@ impl GlobalStreamManager { ) .await?; } - MetadataManager::V2(_mgr) => { - todo!() + MetadataManager::V2(mgr) => { + let table_parallelisms: HashMap<_, _> = { + let streaming_parallelisms = mgr + .catalog_controller + .get_all_streaming_parallelisms() + .await?; + + streaming_parallelisms + .into_iter() + .map(|(table_id, parallelism)| { + // no custom for sql backend + let table_parallelism = match parallelism { + StreamingParallelism::Adaptive => TableParallelism::Adaptive, + StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n), + }; + + (table_id as u32, table_parallelism) + }) + .collect() + }; + + let workers = mgr + .cluster_controller + .list_active_streaming_workers() + .await?; + + let schedulable_worker_ids = workers + .iter() + .filter(|worker| { + !worker + .property + .as_ref() + .map(|p| p.is_unschedulable) + .unwrap_or(false) + }) + .map(|worker| worker.id) + .collect(); + + let reschedules = self + .scale_controller + .generate_table_resize_plan(TableResizePolicy { + worker_ids: schedulable_worker_ids, + table_parallelisms: table_parallelisms.clone(), + }) + .await?; + + if reschedules.is_empty() { + return Ok(()); + } + + self.reschedule_actors( + reschedules, + RescheduleOptions { + resolve_no_shuffle_upstream: true, + }, + None, + ) + .await?; } }