diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs index bf96b8c6227a1..66e8db40418c3 100644 --- a/src/meta/model_v2/migration/src/m20230908_072257_init.rs +++ b/src/meta/model_v2/migration/src/m20230908_072257_init.rs @@ -328,6 +328,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(StreamingJob::JobStatus).string().not_null()) .col(ColumnDef::new(StreamingJob::CreateType).string().not_null()) .col(ColumnDef::new(StreamingJob::Timezone).string()) + .col(ColumnDef::new(StreamingJob::Parallelism).json().not_null()) .foreign_key( &mut ForeignKey::create() .name("FK_streaming_job_object_id") @@ -991,6 +992,7 @@ enum StreamingJob { JobStatus, Timezone, CreateType, + Parallelism, } #[derive(DeriveIden)] diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs index 1cec6b553c161..58397cc069ada 100644 --- a/src/meta/model_v2/src/lib.rs +++ b/src/meta/model_v2/src/lib.rs @@ -227,3 +227,11 @@ derive_from_json_struct!( FragmentVnodeMapping, risingwave_pb::common::ParallelUnitMapping ); + +#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)] +pub enum StreamingParallelism { + Auto, + Fixed(usize), +} + +impl Eq for StreamingParallelism {} diff --git a/src/meta/model_v2/src/streaming_job.rs b/src/meta/model_v2/src/streaming_job.rs index b78f637a838c1..bbb6ac332f2e0 100644 --- a/src/meta/model_v2/src/streaming_job.rs +++ b/src/meta/model_v2/src/streaming_job.rs @@ -14,7 +14,7 @@ use sea_orm::entity::prelude::*; -use crate::{CreateType, JobStatus}; +use crate::{CreateType, JobStatus, StreamingParallelism}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "streaming_job")] @@ -24,6 +24,7 @@ pub struct Model { pub job_status: JobStatus, pub create_type: CreateType, pub timezone: Option, + pub parallelism: StreamingParallelism, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs index acfca997d8040..4c9e3ba2b5f71 100644 --- a/src/meta/service/src/scale_service.rs +++ b/src/meta/service/src/scale_service.rs @@ -15,9 +15,11 @@ use std::collections::HashMap; use std::sync::Arc; +use risingwave_common::catalog; use risingwave_meta::manager::MetadataManager; use risingwave_meta::model::TableParallelism; use risingwave_meta::stream::{ScaleController, ScaleControllerRef, TableRevision}; +use risingwave_meta_model_v2::FragmentId; use risingwave_pb::common::WorkerType; use risingwave_pb::meta::scale_service_server::ScaleService; use risingwave_pb::meta::{ @@ -38,7 +40,7 @@ pub struct ScaleServiceImpl { source_manager: SourceManagerRef, stream_manager: GlobalStreamManagerRef, barrier_manager: BarrierManagerRef, - scale_controller: Option, + scale_controller: ScaleControllerRef, } impl ScaleServiceImpl { @@ -48,14 +50,12 @@ impl ScaleServiceImpl { stream_manager: GlobalStreamManagerRef, barrier_manager: BarrierManagerRef, ) -> Self { - let scale_controller = match &metadata_manager { - MetadataManager::V1(_) => Some(Arc::new(ScaleController::new( - &metadata_manager, - source_manager.clone(), - stream_manager.env.clone(), - ))), - MetadataManager::V2(_) => None, - }; + let scale_controller = Arc::new(ScaleController::new( + &metadata_manager, + source_manager.clone(), + stream_manager.env.clone(), + )); + Self { metadata_manager, source_manager, @@ -68,7 +68,8 @@ impl ScaleServiceImpl { async fn get_revision(&self) -> TableRevision { match 
&self.metadata_manager { MetadataManager::V1(mgr) => mgr.fragment_manager.get_revision().await, - MetadataManager::V2(_) => unimplemented!("support table revision in v2"), + // todo, support table revision in meta model v2 + MetadataManager::V2(_) => Default::default(), } } } @@ -141,10 +142,6 @@ impl ScaleService for ScaleServiceImpl { ) -> Result, Status> { self.barrier_manager.check_status_running().await?; - let MetadataManager::V1(mgr) = &self.metadata_manager else { - unimplemented!("only available in v1"); - }; - let RescheduleRequest { reschedules, revision, @@ -163,19 +160,36 @@ impl ScaleService for ScaleServiceImpl { } let table_parallelisms = { - let guard = mgr.fragment_manager.get_fragment_read_guard().await; - - let mut table_parallelisms = HashMap::new(); - for (table_id, table) in guard.table_fragments() { - if table - .fragment_ids() - .any(|fragment_id| reschedules.contains_key(&fragment_id)) - { - table_parallelisms.insert(*table_id, TableParallelism::Custom); + match &self.metadata_manager { + MetadataManager::V1(mgr) => { + let guard = mgr.fragment_manager.get_fragment_read_guard().await; + + let mut table_parallelisms = HashMap::new(); + for (table_id, table) in guard.table_fragments() { + if table + .fragment_ids() + .any(|fragment_id| reschedules.contains_key(&fragment_id)) + { + table_parallelisms.insert(*table_id, TableParallelism::Custom); + } + } + + table_parallelisms } - } + MetadataManager::V2(mgr) => { + let streaming_job_ids = mgr + .catalog_controller + .get_fragment_job_id( + reschedules.keys().map(|id| *id as FragmentId).collect(), + ) + .await?; - table_parallelisms + streaming_job_ids + .into_iter() + .map(|id| (catalog::TableId::new(id as _), TableParallelism::Custom)) + .collect() + } + } }; self.stream_manager @@ -240,11 +254,8 @@ impl ScaleService for ScaleServiceImpl { .policy .ok_or_else(|| Status::invalid_argument("policy is required"))?; - let Some(scale_controller) = &self.scale_controller else { - return Err(Status::unimplemented( - "reschedule plan is not supported in v2", - )); - }; + let scale_controller = &self.scale_controller; + let plan = scale_controller.get_reschedule_plan(policy).await?; let next_revision = self.get_revision().await; diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 6ece437ffb2de..19a9c8b48a806 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -23,6 +23,7 @@ use risingwave_common::catalog::TableId; use risingwave_common::hash::ActorMapping; use risingwave_connector::source::SplitImpl; use risingwave_hummock_sdk::HummockEpoch; +use risingwave_pb::meta::table_fragments::PbActorStatus; use risingwave_pb::meta::PausedReason; use risingwave_pb::source::{ConnectorSplit, ConnectorSplits}; use risingwave_pb::stream_plan::barrier::BarrierKind; @@ -31,8 +32,8 @@ use risingwave_pb::stream_plan::throttle_mutation::RateLimit; use risingwave_pb::stream_plan::update_mutation::*; use risingwave_pb::stream_plan::{ AddMutation, BarrierMutation, CombinedMutation, Dispatcher, Dispatchers, FragmentTypeFlag, - PauseMutation, ResumeMutation, SourceChangeSplitMutation, StopMutation, ThrottleMutation, - UpdateMutation, + PauseMutation, ResumeMutation, SourceChangeSplitMutation, StopMutation, StreamActor, + ThrottleMutation, UpdateMutation, }; use risingwave_pb::stream_service::{DropActorsRequest, WaitEpochCommitRequest}; use thiserror_ext::AsReport; @@ -76,6 +77,8 @@ pub struct Reschedule { /// Whether this fragment is injectable. 
The injectable means whether the fragment contains /// any executors that are able to receive barrier. pub injectable: bool, + + pub newly_created_actors: Vec<(StreamActor, PbActorStatus)>, } #[derive(Debug, Clone)] @@ -478,18 +481,15 @@ impl CommandContext { ), Command::RescheduleFragment { reschedules, .. } => { - let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager - else { - unimplemented!("implement scale functions in v2"); - }; + let metadata_manager = &self.barrier_manager_context.metadata_manager; + let mut dispatcher_update = HashMap::new(); for reschedule in reschedules.values() { for &(upstream_fragment_id, dispatcher_id) in &reschedule.upstream_fragment_dispatcher_ids { // Find the actors of the upstream fragment. - let upstream_actor_ids = mgr - .fragment_manager + let upstream_actor_ids = metadata_manager .get_running_actors_of_fragment(upstream_fragment_id) .await?; @@ -527,8 +527,7 @@ impl CommandContext { for (&fragment_id, reschedule) in reschedules { for &downstream_fragment_id in &reschedule.downstream_fragment_ids { // Find the actors of the downstream fragment. - let downstream_actor_ids = mgr - .fragment_manager + let downstream_actor_ids = metadata_manager .get_running_actors_of_fragment(downstream_fragment_id) .await?; @@ -968,8 +967,6 @@ impl CommandContext { self.clean_up(removed_actors).await?; self.barrier_manager_context .scale_controller - .as_ref() - .unwrap() .post_apply_reschedule(reschedules, table_parallelism) .await?; } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 96e8743b83a94..c6d57b01dca53 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -143,7 +143,7 @@ pub struct GlobalBarrierManagerContext { source_manager: SourceManagerRef, - scale_controller: Option, + scale_controller: ScaleControllerRef, sink_manager: SinkCoordinatorManager, @@ -394,14 +394,12 @@ impl GlobalBarrierManager { let tracker = CreateMviewProgressTracker::new(); - let scale_controller = match &metadata_manager { - MetadataManager::V1(_) => Some(Arc::new(ScaleController::new( - &metadata_manager, - source_manager.clone(), - env.clone(), - ))), - MetadataManager::V2(_) => None, - }; + let scale_controller = Arc::new(ScaleController::new( + &metadata_manager, + source_manager.clone(), + env.clone(), + )); + let context = GlobalBarrierManagerContext { status: Arc::new(Mutex::new(BarrierManagerStatus::Starting)), metadata_manager, diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 2a760528dfa33..f324e083b4c43 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -23,6 +23,7 @@ use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::StateTableId; +use risingwave_meta_model_v2::StreamingParallelism; use risingwave_pb::common::ActorInfo; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; @@ -605,13 +606,98 @@ impl GlobalBarrierManagerContext { async fn scale_actors(&self, info: &InflightActorInfo) -> MetaResult { match &self.metadata_manager { MetadataManager::V1(_) => self.scale_actors_v1(info).await, - MetadataManager::V2(_) => self.scale_actors_v2(info), + MetadataManager::V2(_) => self.scale_actors_v2(info).await, } } - fn scale_actors_v2(&self, _info: &InflightActorInfo) -> MetaResult { - let _mgr = self.metadata_manager.as_v2_ref(); - unimplemented!("implement auto-scale funcs in sql backend") + async fn 
scale_actors_v2(&self, _info: &InflightActorInfo) -> MetaResult { + let mgr = self.metadata_manager.as_v2_ref(); + debug!("start resetting actor distribution"); + + let table_parallelisms: HashMap<_, _> = { + let streaming_parallelisms = mgr + .catalog_controller + .get_all_streaming_parallelisms() + .await?; + + streaming_parallelisms + .into_iter() + .map(|(table_id, parallelism)| { + // the SQL backend has no custom parallelism + let table_parallelism = match parallelism { + StreamingParallelism::Auto => TableParallelism::Auto, + StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n), + }; + + (table_id as u32, table_parallelism) + }) + .collect() + }; + + let workers = mgr + .cluster_controller + .list_active_streaming_workers() + .await?; + + let schedulable_worker_ids = workers + .iter() + .filter(|worker| { + !worker + .property + .as_ref() + .map(|p| p.is_unschedulable) + .unwrap_or(false) + }) + .map(|worker| worker.id) + .collect(); + + let plan = self + .scale_controller + .generate_table_resize_plan(TableResizePolicy { + worker_ids: schedulable_worker_ids, + table_parallelisms: table_parallelisms.clone(), + }) + .await?; + + let table_parallelisms: HashMap<_, _> = table_parallelisms + .into_iter() + .map(|(table_id, parallelism)| { + debug_assert_ne!(parallelism, TableParallelism::Custom); + (TableId::new(table_id), parallelism) + }) + .collect(); + + let mut compared_table_parallelisms = table_parallelisms.clone(); + + let (reschedule_fragment, _) = self + .scale_controller + .prepare_reschedule_command( + plan, + RescheduleOptions { + resolve_no_shuffle_upstream: true, + }, + Some(&mut compared_table_parallelisms), + ) + .await?; + + // Because custom parallelism doesn't exist, this function won't result in a no-shuffle rewrite for table parallelisms.
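+ // `compared_table_parallelisms` only serves the sanity check below; the parallelisms applied via `post_apply_reschedule` come from `table_parallelisms`.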
+ debug_assert_eq!(compared_table_parallelisms, table_parallelisms); + + if let Err(e) = self + .scale_controller + .post_apply_reschedule(&reschedule_fragment, &table_parallelisms) + .await + { + tracing::error!( + error = %e.as_report(), + "failed to apply reschedule for offline scaling in recovery", + ); + + return Err(e); + } + + debug!("scaling-in actors succeed."); + Ok(true) } async fn scale_actors_v1(&self, info: &InflightActorInfo) -> MetaResult { @@ -685,8 +771,6 @@ impl GlobalBarrierManagerContext { let plan = self .scale_controller - .as_ref() - .unwrap() .generate_table_resize_plan(TableResizePolicy { worker_ids: schedulable_worker_ids, table_parallelisms: table_parallelisms.clone(), @@ -705,8 +789,6 @@ impl GlobalBarrierManagerContext { let (reschedule_fragment, applied_reschedules) = self .scale_controller - .as_ref() - .unwrap() .prepare_reschedule_command( plan, RescheduleOptions { @@ -721,8 +803,6 @@ impl GlobalBarrierManagerContext { if let Err(e) = self .scale_controller - .as_ref() - .unwrap() .post_apply_reschedule(&reschedule_fragment, &table_parallelisms) .await { diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 75ef08c92278a..1fbfecf76bf47 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -27,7 +27,8 @@ use risingwave_meta_model_v2::{ connection, database, function, index, object, object_dependency, schema, sink, source, streaming_job, table, user_privilege, view, ActorId, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, FunctionId, IndexId, JobStatus, ObjectId, - PrivateLinkService, Property, SchemaId, SourceId, StreamSourceInfo, TableId, UserId, + PrivateLinkService, Property, SchemaId, SourceId, StreamSourceInfo, StreamingParallelism, + TableId, UserId, }; use risingwave_pb::catalog::table::PbTableType; use risingwave_pb::catalog::{ @@ -2081,6 +2082,26 @@ impl CatalogController { .collect()) } + pub async fn get_all_streaming_parallelisms( + &self, + ) -> MetaResult> { + let inner = self.inner.read().await; + + let job_parallelisms = StreamingJob::find() + .select_only() + .columns([ + streaming_job::Column::JobId, + streaming_job::Column::Parallelism, + ]) + .into_tuple::<(ObjectId, StreamingParallelism)>() + .all(&inner.db) + .await?; + + Ok(job_parallelisms + .into_iter() + .collect::>()) + } + pub async fn get_table_name_type_mapping( &self, ) -> MetaResult> { diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index d5e50c6fecfe1..4a9d3f3a992e5 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -576,6 +576,23 @@ impl CatalogController { Ok(upstream_job_counts) } + pub async fn get_fragment_job_id( + &self, + fragment_ids: Vec, + ) -> MetaResult> { + let inner = self.inner.read().await; + + let object_ids: Vec = Fragment::find() + .select_only() + .column(fragment::Column::JobId) + .filter(fragment::Column::FragmentId.is_in(fragment_ids)) + .into_tuple() + .all(&inner.db) + .await?; + + Ok(object_ids) + } + pub async fn get_job_fragments_by_id(&self, job_id: ObjectId) -> MetaResult { let inner = self.inner.read().await; let fragment_actors = Fragment::find() diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index c7547632e6105..37b31d047cec7 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -12,41 +12,55 @@ // See the License for the specific language governing permissions and // 
limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::num::NonZeroUsize; use itertools::Itertools; +use risingwave_common::buffer::Bitmap; +use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping}; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; +use risingwave_meta_model_v2::actor_dispatcher::DispatcherType; use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::{ - Actor, ActorDispatcher, Fragment, Index, Object, ObjectDependency, Source, Table, + Actor, ActorDispatcher, Fragment, Index, Object, ObjectDependency, Source, + StreamingJob as StreamingJobModel, Table, }; use risingwave_meta_model_v2::{ actor, actor_dispatcher, fragment, index, object, object_dependency, sink, source, streaming_job, table, ActorId, ActorUpstreamActors, CreateType, DatabaseId, ExprNodeArray, - FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamNode, TableId, - TableVersion, UserId, + FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamNode, + StreamingParallelism, TableId, TableVersion, UserId, }; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion}; use risingwave_pb::catalog::{PbCreateType, PbTable}; use risingwave_pb::meta::relation::PbRelationInfo; use risingwave_pb::meta::subscribe_response::{ - Info as NotificationInfo, Operation as NotificationOperation, + Info as NotificationInfo, Info, Operation as NotificationOperation, Operation, +}; +use risingwave_pb::meta::table_fragments::actor_status::PbActorState; +use risingwave_pb::meta::table_fragments::PbActorStatus; +use risingwave_pb::meta::{ + FragmentParallelUnitMapping, PbRelation, PbRelationGroup, PbTableFragments, }; -use risingwave_pb::meta::{PbRelation, PbRelationGroup, PbTableFragments}; use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits}; +use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::update_mutation::PbMergeUpdate; -use risingwave_pb::stream_plan::{PbDispatcher, PbFragmentTypeFlag}; +use risingwave_pb::stream_plan::{ + PbDispatcher, PbDispatcherType, PbFragmentTypeFlag, PbStreamActor, +}; use sea_orm::sea_query::SimpleExpr; use sea_orm::ActiveValue::Set; use sea_orm::{ ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, - ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, TransactionTrait, + JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, + TransactionTrait, }; +use crate::barrier::Reschedule; use crate::controller::catalog::CatalogController; use crate::controller::rename::ReplaceTableExprRewriter; use crate::controller::utils::{ @@ -55,7 +69,7 @@ use crate::controller::utils::{ }; use crate::controller::ObjectModel; use crate::manager::{NotificationVersion, SinkId, StreamingJob}; -use crate::model::StreamContext; +use crate::model::{StreamContext, TableParallelism}; use crate::stream::SplitAssignment; use crate::{MetaError, MetaResult}; @@ -68,6 +82,7 @@ impl CatalogController { schema_id: Option, create_type: PbCreateType, ctx: &StreamContext, + streaming_parallelism: StreamingParallelism, ) -> 
MetaResult { let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?; let job = streaming_job::ActiveModel { @@ -75,6 +90,7 @@ impl CatalogController { job_status: Set(JobStatus::Initial), create_type: Set(create_type.into()), timezone: Set(ctx.timezone.clone()), + parallelism: Set(streaming_parallelism), }; job.insert(txn).await?; @@ -85,11 +101,17 @@ impl CatalogController { &self, streaming_job: &mut StreamingJob, ctx: &StreamContext, + parallelism: &Option, ) -> MetaResult<()> { let inner = self.inner.write().await; let txn = inner.db.begin().await?; let create_type = streaming_job.create_type(); + let streaming_parallelism = match parallelism { + None => StreamingParallelism::Auto, + Some(n) => StreamingParallelism::Fixed(n.parallelism as _), + }; + ensure_user_id(streaming_job.owner() as _, &txn).await?; ensure_object_id(ObjectType::Database, streaming_job.database_id() as _, &txn).await?; ensure_object_id(ObjectType::Schema, streaming_job.schema_id() as _, &txn).await?; @@ -111,6 +133,7 @@ impl CatalogController { Some(table.schema_id as _), create_type, ctx, + streaming_parallelism, ) .await?; table.id = job_id as _; @@ -126,6 +149,7 @@ impl CatalogController { Some(sink.schema_id as _), create_type, ctx, + streaming_parallelism, ) .await?; sink.id = job_id as _; @@ -141,6 +165,7 @@ impl CatalogController { Some(table.schema_id as _), create_type, ctx, + streaming_parallelism, ) .await?; table.id = job_id as _; @@ -175,6 +200,7 @@ impl CatalogController { Some(index.schema_id as _), create_type, ctx, + streaming_parallelism, ) .await?; // to be compatible with old implementation. @@ -204,6 +230,7 @@ impl CatalogController { Some(src.schema_id as _), create_type, ctx, + streaming_parallelism, ) .await?; src.id = job_id as _; @@ -446,6 +473,7 @@ impl CatalogController { streaming_job: &StreamingJob, ctx: &StreamContext, version: &PbTableVersion, + default_parallelism: &Option, ) -> MetaResult { let id = streaming_job.id(); let inner = self.inner.write().await; @@ -464,6 +492,11 @@ impl CatalogController { return Err(MetaError::permission_denied("table version is stale")); } + let parallelism = match default_parallelism { + None => StreamingParallelism::Auto, + Some(n) => StreamingParallelism::Fixed(n.get() as _), + }; + // 2. create streaming object for new replace table. 
let obj_id = Self::create_streaming_job_obj( &txn, @@ -473,6 +506,7 @@ impl CatalogController { Some(streaming_job.schema_id() as _), PbCreateType::Foreground, ctx, + parallelism, ) .await?; @@ -831,4 +865,345 @@ impl CatalogController { Ok(fragment_actors) } + + pub async fn post_apply_reschedules( + &self, + reschedules: HashMap, + table_parallelism_assignment: HashMap< + risingwave_common::catalog::TableId, + TableParallelism, + >, + ) -> MetaResult<()> { + fn update_actors( + actors: &mut Vec, + to_remove: &HashSet, + to_create: &Vec, + ) { + let actor_id_set: HashSet<_> = actors.iter().copied().collect(); + for actor_id in to_create { + debug_assert!(!actor_id_set.contains(actor_id)); + } + for actor_id in to_remove { + debug_assert!(actor_id_set.contains(actor_id)); + } + + actors.retain(|actor_id| !to_remove.contains(actor_id)); + actors.extend_from_slice(to_create); + } + + let new_created_actors: HashSet<_> = reschedules + .values() + .flat_map(|reschedule| { + reschedule + .added_actors + .values() + .flatten() + .map(|actor_id| *actor_id as ActorId) + }) + .collect(); + + let inner = self.inner.write().await; + + let txn = inner.db.begin().await?; + + let mut fragment_mapping_to_notify = vec![]; + + // for assert only + let mut assert_dispatcher_update_checker = HashSet::new(); + + for ( + fragment_id, + Reschedule { + added_actors, + removed_actors, + vnode_bitmap_updates, + actor_splits, + injectable: _, + newly_created_actors, + upstream_fragment_dispatcher_ids, + upstream_dispatcher_mapping, + downstream_fragment_ids, + }, + ) in reschedules + { + // drop removed actors + Actor::delete_many() + .filter( + actor::Column::ActorId + .is_in(removed_actors.iter().map(|id| *id as ActorId).collect_vec()), + ) + .exec(&txn) + .await?; + + // newly created actor + let mut new_actors = vec![]; + let mut new_actor_dispatchers = vec![]; + + for ( + PbStreamActor { + actor_id, + fragment_id, + mut nodes, + dispatcher, + upstream_actor_id, + vnode_bitmap, + expr_context, + .. 
+ }, + // actor_status + PbActorStatus { + parallel_unit, + state, + }, + ) in newly_created_actors + { + let mut actor_upstreams = BTreeMap::>::new(); + + if let Some(nodes) = &mut nodes { + visit_stream_node(nodes, |node| { + if let PbNodeBody::Merge(node) = node { + actor_upstreams + .entry(node.upstream_fragment_id as FragmentId) + .or_default() + .extend(node.upstream_actor_id.iter().map(|id| *id as ActorId)); + } + }); + } + + let actor_upstreams: BTreeMap> = actor_upstreams + .into_iter() + .map(|(k, v)| (k, v.into_iter().collect())) + .collect(); + + debug_assert_eq!( + actor_upstreams + .values() + .flatten() + .cloned() + .sorted() + .collect_vec(), + upstream_actor_id + .iter() + .map(|actor_id| *actor_id as i32) + .sorted() + .collect_vec() + ); + + let actor_upstreams = ActorUpstreamActors(actor_upstreams); + + let status = ActorStatus::from(PbActorState::try_from(state).unwrap()); + let parallel_unit_id = parallel_unit.unwrap().id; + + let splits = actor_splits + .get(&actor_id) + .map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec()); + + new_actors.push(actor::ActiveModel { + actor_id: Set(actor_id as _), + fragment_id: Set(fragment_id as _), + status: Set(status), + splits: Set(splits.map(|splits| PbConnectorSplits { splits }.into())), + parallel_unit_id: Set(parallel_unit_id as _), + upstream_actor_ids: Set(actor_upstreams), + vnode_bitmap: Set(vnode_bitmap.map(|bitmap| bitmap.into())), + expr_context: Set(expr_context.unwrap().into()), + }); + + for PbDispatcher { + r#type: dispatcher_type, + dist_key_indices, + output_indices, + hash_mapping, + dispatcher_id, + downstream_actor_id, + } in dispatcher + { + new_actor_dispatchers.push(actor_dispatcher::ActiveModel { + id: Default::default(), + actor_id: Set(actor_id as _), + dispatcher_type: Set(PbDispatcherType::try_from(dispatcher_type) + .unwrap() + .into()), + dist_key_indices: Set(dist_key_indices.into()), + output_indices: Set(output_indices.into()), + hash_mapping: Set(hash_mapping.map(|mapping| mapping.into())), + dispatcher_id: Set(dispatcher_id as _), + downstream_actor_ids: Set(downstream_actor_id.into()), + }) + } + } + + if !new_actors.is_empty() { + Actor::insert_many(new_actors).exec(&txn).await?; + } + + if !new_actor_dispatchers.is_empty() { + ActorDispatcher::insert_many(new_actor_dispatchers) + .exec(&txn) + .await?; + } + + // actor update + for (actor_id, bitmap) in vnode_bitmap_updates { + let actor = Actor::find_by_id(actor_id as ActorId) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?; + + let mut actor = actor.into_active_model(); + actor.vnode_bitmap = Set(Some(bitmap.to_protobuf().into())); + actor.update(&txn).await?; + } + + // fragment update + let fragment = Fragment::find_by_id(fragment_id) + .one(&txn) + .await? 
+ .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?; + + let fragment_actors = fragment.find_related(Actor).all(&txn).await?; + + let mut actor_to_parallel_unit = HashMap::with_capacity(fragment_actors.len()); + let mut actor_to_vnode_bitmap = HashMap::with_capacity(fragment_actors.len()); + for actor in &fragment_actors { + actor_to_parallel_unit.insert(actor.actor_id as u32, actor.parallel_unit_id as _); + if let Some(vnode_bitmap) = &actor.vnode_bitmap { + let bitmap = Bitmap::from(vnode_bitmap.inner_ref()); + actor_to_vnode_bitmap.insert(actor.actor_id as u32, bitmap); + } + } + + let vnode_mapping = if actor_to_vnode_bitmap.is_empty() { + let parallel_unit = *actor_to_parallel_unit.values().exactly_one().unwrap(); + ParallelUnitMapping::new_single(parallel_unit as ParallelUnitId) + } else { + // Generate the parallel unit mapping from the fragment's actor bitmaps. + assert_eq!(actor_to_vnode_bitmap.len(), actor_to_parallel_unit.len()); + ActorMapping::from_bitmaps(&actor_to_vnode_bitmap) + .to_parallel_unit(&actor_to_parallel_unit) + } + .to_protobuf(); + + let mut fragment = fragment.into_active_model(); + fragment.vnode_mapping = Set(vnode_mapping.clone().into()); + fragment.update(&txn).await?; + + fragment_mapping_to_notify.push(FragmentParallelUnitMapping { + fragment_id: fragment_id as u32, + mapping: Some(vnode_mapping), + }); + + // for downstream and upstream + let removed_actor_ids: HashSet<_> = removed_actors + .iter() + .map(|actor_id| *actor_id as ActorId) + .collect(); + + let added_actor_ids = added_actors + .values() + .flatten() + .map(|actor_id| *actor_id as ActorId) + .collect_vec(); + + // first step, upstream fragment + for (upstream_fragment_id, dispatcher_id) in upstream_fragment_dispatcher_ids { + let upstream_fragment = Fragment::find_by_id(upstream_fragment_id as FragmentId) + .one(&txn) + .await? 
+ .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?; + + let all_dispatchers = actor_dispatcher::Entity::find() + .join(JoinType::InnerJoin, actor_dispatcher::Relation::Actor.def()) + .filter(actor::Column::FragmentId.eq(upstream_fragment.fragment_id)) + .filter(actor_dispatcher::Column::DispatcherId.eq(dispatcher_id as i32)) + .all(&txn) + .await?; + + for dispatcher in all_dispatchers { + debug_assert!(assert_dispatcher_update_checker.insert(dispatcher.id)); + + let mut dispatcher = dispatcher.into_active_model(); + + if dispatcher.dispatcher_type.as_ref() == &DispatcherType::Hash { + dispatcher.hash_mapping = + Set(upstream_dispatcher_mapping.as_ref().map(|m| { + risingwave_meta_model_v2::ActorMapping::from(m.to_protobuf()) + })); + } else { + debug_assert!(upstream_dispatcher_mapping.is_none()); + } + + let mut new_downstream_actor_ids = + dispatcher.downstream_actor_ids.as_ref().inner_ref().clone(); + + update_actors( + new_downstream_actor_ids.as_mut(), + &removed_actor_ids, + &added_actor_ids, + ); + + dispatcher.downstream_actor_ids = Set(new_downstream_actor_ids.into()); + dispatcher.update(&txn).await?; + } + } + + // second step, downstream fragment + for downstream_fragment_id in downstream_fragment_ids { + let actors = Actor::find() + .filter(actor::Column::FragmentId.eq(downstream_fragment_id as FragmentId)) + .all(&txn) + .await?; + + for actor in actors { + if new_created_actors.contains(&actor.actor_id) { + continue; + } + + let mut actor = actor.into_active_model(); + + let mut new_upstream_actor_ids = + actor.upstream_actor_ids.as_ref().inner_ref().clone(); + + update_actors( + new_upstream_actor_ids.get_mut(&fragment_id).unwrap(), + &removed_actor_ids, + &added_actor_ids, + ); + + actor.upstream_actor_ids = Set(new_upstream_actor_ids.into()); + + actor.update(&txn).await?; + } + } + } + + for (table_id, parallelism) in table_parallelism_assignment { + let mut streaming_job = StreamingJobModel::find_by_id(table_id.table_id() as ObjectId) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))? 
+ .into_active_model(); + + streaming_job.parallelism = Set(match parallelism { + TableParallelism::Auto => StreamingParallelism::Auto, + TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n as _), + TableParallelism::Custom => { + unreachable!("sql backend doesn't support custom parallelism") + } + }); + + streaming_job.update(&txn).await?; + } + + txn.commit().await?; + + for mapping in fragment_mapping_to_notify { + self.env + .notification_manager() + .notify_frontend(Operation::Update, Info::ParallelUnitMapping(mapping)) + .await; + } + + Ok(()) + } } diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 4606edfb1dd1e..83a04f5f043ba 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -20,15 +20,16 @@ use risingwave_pb::catalog::{PbSource, PbTable}; use risingwave_pb::common::worker_node::{PbResource, State}; use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerType}; use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; -use risingwave_pb::meta::table_fragments::{Fragment, PbFragment}; -use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamActor}; +use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, PbFragment}; +use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamActor, StreamActor}; +use crate::barrier::Reschedule; use crate::controller::catalog::CatalogControllerRef; use crate::controller::cluster::{ClusterControllerRef, WorkerExtraInfo}; use crate::manager::{ CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, StreamingClusterInfo, WorkerId, }; -use crate::model::{ActorId, FragmentId, MetadataModel, TableFragments}; +use crate::model::{ActorId, FragmentId, MetadataModel, TableFragments, TableParallelism}; use crate::stream::SplitAssignment; use crate::MetaResult; @@ -189,6 +190,69 @@ impl MetadataManager { } } + pub async fn pre_apply_reschedules( + &self, + created_actors: HashMap>, + ) -> HashMap> { + match self { + MetadataManager::V1(mgr) => { + mgr.fragment_manager + .pre_apply_reschedules(created_actors) + .await + } + + // V2 doesn't need to pre-apply reschedules. + MetadataManager::V2(_) => HashMap::new(), + } + } + + pub async fn post_apply_reschedules( + &self, + reschedules: HashMap, + table_parallelism_assignment: HashMap, + ) -> MetaResult<()> { + match self { + MetadataManager::V1(mgr) => { + mgr.fragment_manager + .post_apply_reschedules(reschedules, table_parallelism_assignment) + .await + } + MetadataManager::V2(mgr) => { + // temporarily convert the u32 fragment ids to the i32 ids used by the v2 model + let reschedules = reschedules.into_iter().map(|(k, v)| (k as _, v)).collect(); + + mgr.catalog_controller + .post_apply_reschedules(reschedules, table_parallelism_assignment) + .await + } + } + } + + pub async fn running_fragment_parallelisms( + &self, + id_filter: Option>, + ) -> MetaResult> { + match self { + MetadataManager::V1(mgr) => Ok(mgr + .fragment_manager + .running_fragment_parallelisms(id_filter) + .await + .into_iter() + .map(|(k, v)| (k as FragmentId, v)) + .collect()), + MetadataManager::V2(mgr) => { + let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect()); + Ok(mgr + .catalog_controller + .running_fragment_parallelisms(id_filter) + .await? + .into_iter() + .map(|(k, v)| (k as FragmentId, v)) + .collect()) + } + } + } + /// Get and filter the "**root**" fragments of the specified relations. /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
/// diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index 946f16078bcf5..dd3defd141aef 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -43,7 +43,7 @@ impl DdlController { let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); mgr.catalog_controller - .create_job_catalog(&mut streaming_job, &ctx) + .create_job_catalog(&mut streaming_job, &ctx, &fragment_graph.parallelism) .await?; let job_id = streaming_job.id(); @@ -299,7 +299,12 @@ impl DdlController { }; let dummy_id = mgr .catalog_controller - .create_job_catalog_for_replace(&streaming_job, &ctx, table.get_version()?) + .create_job_catalog_for_replace( + &streaming_job, + &ctx, + table.get_version()?, + &fragment_graph.default_parallelism(), + ) .await?; tracing::debug!(id = streaming_job.id(), "building replace streaming job"); diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 1d755226b17a0..b56757ac322c6 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -50,10 +50,7 @@ use tokio::time::MissedTickBehavior; use uuid::Uuid; use crate::barrier::{Command, Reschedule}; -use crate::manager::{ - ClusterManagerRef, FragmentManagerRef, IdCategory, LocalNotification, MetaSrvEnv, - MetadataManager, WorkerId, -}; +use crate::manager::{IdCategory, LocalNotification, MetaSrvEnv, MetadataManager, WorkerId}; use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism}; use crate::serving::{ to_deleted_fragment_parallel_unit_mapping, to_fragment_parallel_unit_mapping, @@ -61,9 +58,9 @@ use crate::serving::{ }; use crate::storage::{MetaStore, MetaStoreError, MetaStoreRef, Transaction, DEFAULT_COLUMN_FAMILY}; use crate::stream::{GlobalStreamManager, SourceManagerRef}; -use crate::{MetaError, MetaResult}; +use crate::{model, MetaError, MetaResult}; -#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] +#[derive(Default, Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] pub struct TableRevision(u64); const TABLE_REVISION_KEY: &[u8] = b"table_revision"; @@ -369,9 +366,7 @@ pub struct RescheduleOptions { pub type ScaleControllerRef = Arc; pub struct ScaleController { - pub(super) fragment_manager: FragmentManagerRef, - - pub cluster_manager: ClusterManagerRef, + pub metadata_manager: MetadataManager, pub source_manager: SourceManagerRef, @@ -384,14 +379,10 @@ impl ScaleController { source_manager: SourceManagerRef, env: MetaSrvEnv, ) -> Self { - match metadata_manager { - MetadataManager::V1(mgr) => Self { - fragment_manager: mgr.fragment_manager.clone(), - cluster_manager: mgr.cluster_manager.clone(), - source_manager, - env, - }, - MetadataManager::V2(_) => unimplemented!("support v2 in scale controller"), + Self { + metadata_manager: metadata_manager.clone(), + source_manager, + env, } } @@ -402,11 +393,10 @@ impl ScaleController { options: RescheduleOptions, table_parallelisms: Option<&mut HashMap>, ) -> MetaResult { - // Index worker node, used to create actor let worker_nodes: HashMap = self - .cluster_manager + .metadata_manager .list_active_streaming_compute_nodes() - .await + .await? 
.into_iter() .map(|worker_node| (worker_node.id, worker_node)) .collect(); @@ -464,7 +454,10 @@ impl ScaleController { let mut actor_status = BTreeMap::new(); let mut fragment_state = HashMap::new(); let mut fragment_to_table = HashMap::new(); - for table_fragments in self.fragment_manager.list_table_fragments().await { + + let all_table_fragments = self.list_all_table_fragments().await?; + + for table_fragments in all_table_fragments { fragment_state.extend( table_fragments .fragment_ids() @@ -1291,6 +1284,7 @@ impl ScaleController { downstream_fragment_ids, actor_splits, injectable, + newly_created_actors: vec![], }, ); } @@ -1323,8 +1317,13 @@ impl ScaleController { fragment_created_actors.insert(*fragment_id, created_actors); } + for (fragment_id, to_create) in &fragment_created_actors { + let reschedule = reschedule_fragment.get_mut(fragment_id).unwrap(); + reschedule.newly_created_actors = to_create.values().cloned().collect(); + } + let applied_reschedules = self - .fragment_manager + .metadata_manager .pre_apply_reschedules(fragment_created_actors) .await; @@ -1378,11 +1377,22 @@ impl ScaleController { } for created_parallel_unit_id in added_parallel_units { - let id = self - .env - .id_gen_manager() - .generate::<{ IdCategory::Actor }>() - .await? as ActorId; + + let id = match self.env.sql_id_gen_manager_ref() { + None => { + self.env + .id_gen_manager() + .generate::<{ IdCategory::Actor }>() + .await? as ActorId + } + Some(id_gen) => { + let id = id_gen.generate_interval::<{ IdCategory::Actor }>(1); + + id as ActorId + } + }; actors_to_create.insert(id, *created_parallel_unit_id); } @@ -1554,20 +1564,20 @@ impl ScaleController { table_parallelism: &HashMap, ) -> MetaResult<()> { // Update fragment info after rescheduling in meta store. - self.fragment_manager + self.metadata_manager .post_apply_reschedules(reschedules.clone(), table_parallelism.clone()) .await?; // Update serving fragment info after rescheduling in meta store. if !reschedules.is_empty() { let workers = self - .cluster_manager - .list_active_serving_compute_nodes() - .await; + .metadata_manager + .list_active_streaming_compute_nodes() + .await?; let streaming_parallelisms = self - .fragment_manager + .metadata_manager .running_fragment_parallelisms(Some(reschedules.keys().cloned().collect())) - .await; + .await?; let serving_vnode_mapping = Arc::new(ServingVnodeMapping::default()); let (upserted, failed) = serving_vnode_mapping.upsert(streaming_parallelisms, &workers); if !upserted.is_empty() { @@ -1624,6 +1634,23 @@ impl ScaleController { Ok(()) } + // FIXME: should be removed + async fn list_all_table_fragments(&self) -> MetaResult> { + use crate::model::MetadataModel; + let all_table_fragments = match &self.metadata_manager { + MetadataManager::V1(mgr) => mgr.fragment_manager.list_table_fragments().await, + MetadataManager::V2(mgr) => mgr + .catalog_controller + .table_fragments() + .await?
+ .into_values() + .map(model::TableFragments::from_protobuf) + .collect(), + }; + + Ok(all_table_fragments) + } + pub async fn generate_table_resize_plan( &self, policy: TableResizePolicy, @@ -1634,9 +1661,9 @@ impl ScaleController { } = policy; let workers = self - .cluster_manager + .metadata_manager .list_active_streaming_compute_nodes() - .await; + .await?; let unschedulable_worker_ids = Self::filter_unschedulable_workers(&workers); @@ -1665,7 +1692,7 @@ impl ScaleController { }) .collect::>(); - let all_table_fragments = self.fragment_manager.list_table_fragments().await; + let all_table_fragments = self.list_all_table_fragments().await?; // FIXME: only need actor id and dispatcher info, avoid clone it. let mut actor_map = HashMap::new(); @@ -1825,9 +1852,9 @@ impl ScaleController { let mut target_plan = HashMap::with_capacity(fragment_worker_changes.len()); let workers = self - .cluster_manager + .metadata_manager .list_active_streaming_compute_nodes() - .await; + .await?; let unschedulable_worker_ids = Self::filter_unschedulable_workers(&workers); @@ -1853,7 +1880,7 @@ impl ScaleController { }) .collect::>(); - let all_table_fragments = self.fragment_manager.list_table_fragments().await; + let all_table_fragments = self.list_all_table_fragments().await?; // FIXME: only need actor id and dispatcher info, avoid clone it. let mut actor_map = HashMap::new(); @@ -2379,16 +2406,10 @@ impl GlobalStreamManager { options: RescheduleOptions, table_parallelism: Option>, ) -> MetaResult<()> { - let MetadataManager::V1(mgr) = &self.metadata_manager else { - unimplemented!("support reschedule in v2"); - }; - let mut table_parallelism = table_parallelism; let (reschedule_fragment, applied_reschedules) = self .scale_controller - .as_ref() - .unwrap() .prepare_reschedule_command(reschedules, options, table_parallelism.as_mut()) .await?; @@ -2399,13 +2420,20 @@ impl GlobalStreamManager { table_parallelism: table_parallelism.unwrap_or_default(), }; - let fragment_manager_ref = mgr.fragment_manager.clone(); + match &self.metadata_manager { + MetadataManager::V1(mgr) => { + let fragment_manager_ref = mgr.fragment_manager.clone(); - revert_funcs.push(Box::pin(async move { - fragment_manager_ref - .cancel_apply_reschedules(applied_reschedules) - .await; - })); + revert_funcs.push(Box::pin(async move { + fragment_manager_ref + .cancel_apply_reschedules(applied_reschedules) + .await; + })); + } + MetadataManager::V2(_) => { + // meta model v2 does not need to revert + } + } tracing::debug!("pausing tick lock in source manager"); let _source_pause_guard = self.source_manager.paused.lock().await; @@ -2422,60 +2450,61 @@ impl GlobalStreamManager { async fn trigger_parallelism_control(&self) -> MetaResult<()> { let _reschedule_job_lock = self.reschedule_lock.write().await; - let MetadataManager::V1(mgr) = &self.metadata_manager else { - unimplemented!("support reschedule in v2"); - }; + match &self.metadata_manager { + MetadataManager::V1(mgr) => { + let table_parallelisms = { + let guard = mgr.fragment_manager.get_fragment_read_guard().await; - let table_parallelisms = { - let guard = mgr.fragment_manager.get_fragment_read_guard().await; + guard + .table_fragments() + .iter() + .map(|(table_id, table)| (table_id.table_id, table.assigned_parallelism)) + .collect() + }; - guard - .table_fragments() - .iter() - .map(|(table_id, table)| (table_id.table_id, table.assigned_parallelism)) - .collect() - }; + let workers = mgr + .cluster_manager + .list_active_streaming_compute_nodes() + .await; - let workers = mgr - 
.cluster_manager - .list_active_streaming_compute_nodes() - .await; + let schedulable_worker_ids = workers + .iter() + .filter(|worker| { + !worker + .property + .as_ref() + .map(|p| p.is_unschedulable) + .unwrap_or(false) + }) + .map(|worker| worker.id) + .collect(); - let schedulable_worker_ids = workers - .iter() - .filter(|worker| { - !worker - .property - .as_ref() - .map(|p| p.is_unschedulable) - .unwrap_or(false) - }) - .map(|worker| worker.id) - .collect(); + let reschedules = self + .scale_controller + .generate_table_resize_plan(TableResizePolicy { + worker_ids: schedulable_worker_ids, + table_parallelisms, + }) + .await?; - let reschedules = self - .scale_controller - .as_ref() - .unwrap() - .generate_table_resize_plan(TableResizePolicy { - worker_ids: schedulable_worker_ids, - table_parallelisms, - }) - .await?; + if reschedules.is_empty() { + return Ok(()); + } - if reschedules.is_empty() { - return Ok(()); + self.reschedule_actors( + reschedules, + RescheduleOptions { + resolve_no_shuffle_upstream: true, + }, + None, + ) + .await?; + } + MetadataManager::V2(_mgr) => { + todo!() + } } - self.reschedule_actors( - reschedules, - RescheduleOptions { - resolve_no_shuffle_upstream: true, - }, - None, - ) - .await?; - Ok(()) } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 52976d744c207..057a73ea487eb 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -201,7 +201,7 @@ pub struct GlobalStreamManager { pub reschedule_lock: RwLock<()>, - pub(crate) scale_controller: Option, + pub(crate) scale_controller: ScaleControllerRef, } impl GlobalStreamManager { @@ -212,14 +212,12 @@ impl GlobalStreamManager { source_manager: SourceManagerRef, hummock_manager: HummockManagerRef, ) -> MetaResult { - let scale_controller = match &metadata_manager { - MetadataManager::V1(_) => { - let scale_controller = - ScaleController::new(&metadata_manager, source_manager.clone(), env.clone()); - Some(Arc::new(scale_controller)) - } - MetadataManager::V2(_) => None, - }; + let scale_controller = Arc::new(ScaleController::new( + &metadata_manager, + source_manager.clone(), + env.clone(), + )); + Ok(Self { env, metadata_manager, @@ -739,15 +737,12 @@ impl GlobalStreamManager { parallelism: TableParallelism, deferred: bool, ) -> MetaResult<()> { - let MetadataManager::V1(mgr) = &self.metadata_manager else { - unimplemented!("support alter table parallelism in v2"); - }; let _reschedule_job_lock = self.reschedule_lock.write().await; - let worker_nodes = mgr - .cluster_manager + let worker_nodes = self + .metadata_manager .list_active_streaming_compute_nodes() - .await; + .await?; let worker_ids = worker_nodes .iter() @@ -764,15 +759,11 @@ impl GlobalStreamManager { parallelism ); self.scale_controller - .as_ref() - .unwrap() .post_apply_reschedule(&HashMap::new(), &table_parallelism_assignment) .await?; } else { let reschedules = self .scale_controller - .as_ref() - .unwrap() .generate_table_resize_plan(TableResizePolicy { worker_ids, table_parallelisms: table_parallelism_assignment @@ -785,8 +776,6 @@ impl GlobalStreamManager { if reschedules.is_empty() { tracing::debug!("empty reschedule plan generated for job {}, set the parallelism directly to {:?}", table_id, parallelism); self.scale_controller - .as_ref() - .unwrap() .post_apply_reschedule(&HashMap::new(), &table_parallelism_assignment) .await?; } else {