diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index bd61debc3bf7..59e520a3316f 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -35,6 +35,7 @@ use risingwave_meta::controller::cluster::ClusterController; use risingwave_meta::manager::MetadataManager; use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer; use risingwave_meta::rpc::ElectionClientRef; +use risingwave_meta::stream::ScaleController; use risingwave_meta::MetaStoreBackend; use risingwave_meta_model_migration::{Migrator, MigratorTrait}; use risingwave_meta_service::backup_service::BackupServiceImpl; @@ -537,6 +538,13 @@ pub async fn start_service_as_election_leader( let stream_rpc_manager = StreamRpcManager::new(env.clone()); + let scale_controller = Arc::new(ScaleController::new( + &metadata_manager, + source_manager.clone(), + stream_rpc_manager.clone(), + env.clone(), + )); + let barrier_manager = GlobalBarrierManager::new( scheduled_barriers, env.clone(), @@ -546,6 +554,7 @@ pub async fn start_service_as_election_leader( sink_manager.clone(), meta_metrics.clone(), stream_rpc_manager.clone(), + scale_controller.clone(), ); { @@ -563,6 +572,7 @@ pub async fn start_service_as_election_leader( source_manager.clone(), hummock_manager.clone(), stream_rpc_manager, + scale_controller.clone(), ) .unwrap(), ); @@ -627,6 +637,7 @@ pub async fn start_service_as_election_leader( source_manager, stream_manager.clone(), barrier_manager.context().clone(), + scale_controller.clone(), ); let cluster_srv = ClusterServiceImpl::new(metadata_manager.clone()); diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs index 33270fc2204f..33899856a57b 100644 --- a/src/meta/service/src/scale_service.rs +++ b/src/meta/service/src/scale_service.rs @@ -13,12 +13,11 @@ // limitations under the License. use std::collections::HashMap; -use std::sync::Arc; use risingwave_common::catalog; use risingwave_meta::manager::MetadataManager; use risingwave_meta::model::TableParallelism; -use risingwave_meta::stream::{ScaleController, ScaleControllerRef, TableRevision}; +use risingwave_meta::stream::{ScaleControllerRef, TableRevision}; use risingwave_meta_model_v2::FragmentId; use risingwave_pb::common::WorkerType; use risingwave_pb::meta::scale_service_server::ScaleService; @@ -49,14 +48,8 @@ impl ScaleServiceImpl { source_manager: SourceManagerRef, stream_manager: GlobalStreamManagerRef, barrier_manager: BarrierManagerRef, + scale_controller: ScaleControllerRef, ) -> Self { - let scale_controller = Arc::new(ScaleController::new( - &metadata_manager, - source_manager.clone(), - stream_manager.stream_rpc_manager.clone(), - stream_manager.env.clone(), - )); - Self { metadata_manager, source_manager, @@ -82,7 +75,7 @@ impl ScaleService for ScaleServiceImpl { &self, _: Request, ) -> Result, Status> { - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; let table_fragments = match &self.metadata_manager { MetadataManager::V1(mgr) => mgr @@ -149,7 +142,7 @@ impl ScaleService for ScaleServiceImpl { resolve_no_shuffle_upstream, } = request.into_inner(); - let _reschedule_job_lock = self.stream_manager.reschedule_lock.write().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_write_guard().await; let current_revision = self.get_revision().await; @@ -239,7 +232,7 @@ impl ScaleService for ScaleServiceImpl { let req = request.into_inner(); - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; let current_revision = self.get_revision().await; diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 9de6df91fed3..c6e54b538c3a 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -57,7 +57,7 @@ use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager, WorkerId}; use crate::model::{ActorId, TableFragments}; use crate::rpc::metrics::MetaMetrics; -use crate::stream::{ScaleController, ScaleControllerRef, SourceManagerRef}; +use crate::stream::{ScaleControllerRef, SourceManagerRef}; use crate::{MetaError, MetaResult}; mod command; @@ -385,6 +385,7 @@ impl GlobalBarrierManager { sink_manager: SinkCoordinatorManager, metrics: Arc, stream_rpc_manager: StreamRpcManager, + scale_controller: ScaleControllerRef, ) -> Self { let enable_recovery = env.opts.enable_recovery; let in_flight_barrier_nums = env.opts.in_flight_barrier_nums; @@ -398,13 +399,6 @@ impl GlobalBarrierManager { let tracker = CreateMviewProgressTracker::new(); - let scale_controller = Arc::new(ScaleController::new( - &metadata_manager, - source_manager.clone(), - stream_rpc_manager.clone(), - env.clone(), - )); - let context = GlobalBarrierManagerContext { status: Arc::new(Mutex::new(BarrierManagerStatus::Starting)), metadata_manager, diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index e4b93a286a3f..1208d2c49b58 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -595,6 +595,7 @@ impl GlobalBarrierManagerContext { } async fn scale_actors(&self) -> MetaResult<()> { + let _guard = self.scale_controller.reschedule_lock.write().await; match &self.metadata_manager { MetadataManager::V1(_) => self.scale_actors_v1().await, MetadataManager::V2(_) => self.scale_actors_v2().await, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 84ff4ea1de68..7de106523599 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -654,7 +654,7 @@ impl DdlController { .acquire() .await .unwrap(); - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; let stream_ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); @@ -1154,7 +1154,7 @@ impl DdlController { target_replace_info: Option, ) -> MetaResult { let mgr = self.metadata_manager.as_v1_ref(); - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; let (mut version, streaming_job_ids) = match job_id { StreamingJobId::MaterializedView(table_id) => { mgr.catalog_manager @@ -1647,7 +1647,7 @@ impl DdlController { .replace_table_v2(stream_job, fragment_graph, table_col_index_mapping) .await; }; - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; let stream_ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); let fragment_graph = self diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index dd3defd141ae..748857d2c3da 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -76,7 +76,7 @@ impl DdlController { .acquire() .await .unwrap(); - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; // create streaming job. match self @@ -284,7 +284,7 @@ impl DdlController { let mgr = self.metadata_manager.as_v2_ref(); let job_id = streaming_job.id(); - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); // 1. build fragment graph. diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index e598aefb291d..7f40f8e3da03 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -41,8 +41,8 @@ use risingwave_pb::meta::FragmentParallelUnitMappings; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{DispatcherType, FragmentTypeFlag, StreamActor, StreamNode}; use thiserror_ext::AsReport; -use tokio::sync::oneshot; use tokio::sync::oneshot::Receiver; +use tokio::sync::{oneshot, RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::task::JoinHandle; use tokio::time::MissedTickBehavior; @@ -375,6 +375,8 @@ pub struct ScaleController { pub stream_rpc_manager: StreamRpcManager, pub env: MetaSrvEnv, + + pub reschedule_lock: RwLock<()>, } impl ScaleController { @@ -389,6 +391,7 @@ impl ScaleController { metadata_manager: metadata_manager.clone(), source_manager, env, + reschedule_lock: RwLock::new(()), } } @@ -2449,6 +2452,14 @@ pub struct TableResizePolicy { } impl GlobalStreamManager { + pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> { + self.scale_controller.reschedule_lock.read().await + } + + pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> { + self.scale_controller.reschedule_lock.write().await + } + pub async fn reschedule_actors( &self, reschedules: HashMap, @@ -2518,7 +2529,7 @@ impl GlobalStreamManager { } async fn trigger_parallelism_control(&self) -> MetaResult<()> { - let _reschedule_job_lock = self.reschedule_lock.write().await; + let _reschedule_job_lock = self.reschedule_lock_write_guard().await; match &self.metadata_manager { MetadataManager::V1(mgr) => { diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 5889292756ca..d7388f50da09 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -24,10 +24,10 @@ use risingwave_pb::stream_plan::update_mutation::MergeUpdate; use risingwave_pb::stream_plan::Dispatcher; use thiserror_ext::AsReport; use tokio::sync::mpsc::Sender; -use tokio::sync::{oneshot, Mutex, RwLock}; +use tokio::sync::{oneshot, Mutex}; use tracing::Instrument; -use super::{Locations, RescheduleOptions, ScaleController, ScaleControllerRef, TableResizePolicy}; +use super::{Locations, RescheduleOptions, ScaleControllerRef, TableResizePolicy}; use crate::barrier::{BarrierScheduler, Command, ReplaceTablePlan, StreamRpcManager}; use crate::hummock::HummockManagerRef; use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, StreamingJob}; @@ -192,9 +192,7 @@ pub struct GlobalStreamManager { hummock_manager: HummockManagerRef, - pub reschedule_lock: RwLock<()>, - - pub(crate) scale_controller: ScaleControllerRef, + pub scale_controller: ScaleControllerRef, pub stream_rpc_manager: StreamRpcManager, } @@ -207,14 +205,8 @@ impl GlobalStreamManager { source_manager: SourceManagerRef, hummock_manager: HummockManagerRef, stream_rpc_manager: StreamRpcManager, + scale_controller: ScaleControllerRef, ) -> MetaResult { - let scale_controller = Arc::new(ScaleController::new( - &metadata_manager, - source_manager.clone(), - stream_rpc_manager.clone(), - env.clone(), - )); - Ok(Self { env, metadata_manager, @@ -222,7 +214,6 @@ impl GlobalStreamManager { source_manager, hummock_manager, creating_job_info: Arc::new(CreatingStreamingJobInfo::default()), - reschedule_lock: RwLock::new(()), scale_controller, stream_rpc_manager, }) @@ -629,7 +620,7 @@ impl GlobalStreamManager { return vec![]; } - let _reschedule_job_lock = self.reschedule_lock.read().await; + let _reschedule_job_lock = self.reschedule_lock_read_guard().await; let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await; let futures = receivers.into_iter().map(|(id, receiver)| async move { @@ -688,7 +679,7 @@ impl GlobalStreamManager { parallelism: TableParallelism, deferred: bool, ) -> MetaResult<()> { - let _reschedule_job_lock = self.reschedule_lock.write().await; + let _reschedule_job_lock = self.reschedule_lock_write_guard().await; let worker_nodes = self .metadata_manager @@ -786,7 +777,7 @@ mod tests { use crate::model::{ActorId, FragmentId}; use crate::rpc::ddl_controller::DropMode; use crate::rpc::metrics::MetaMetrics; - use crate::stream::SourceManager; + use crate::stream::{ScaleController, SourceManager}; use crate::MetaOpts; struct FakeFragmentState { @@ -985,6 +976,12 @@ mod tests { let (sink_manager, _) = SinkCoordinatorManager::start_worker(); let stream_rpc_manager = StreamRpcManager::new(env.clone()); + let scale_controller = Arc::new(ScaleController::new( + &metadata_manager, + source_manager.clone(), + stream_rpc_manager.clone(), + env.clone(), + )); let barrier_manager = GlobalBarrierManager::new( scheduled_barriers, @@ -995,6 +992,7 @@ mod tests { sink_manager, meta_metrics.clone(), stream_rpc_manager.clone(), + scale_controller.clone(), ); let stream_manager = GlobalStreamManager::new( @@ -1004,6 +1002,7 @@ mod tests { source_manager.clone(), hummock_manager, stream_rpc_manager, + scale_controller.clone(), )?; let (join_handle_2, shutdown_tx_2) = GlobalBarrierManager::start(barrier_manager);