From 2b96f244237e6e30c88a3a77fc72bf98cfdf9b2c Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Tue, 6 Feb 2024 19:07:52 +0800 Subject: [PATCH 1/3] Updated lock methods; refactored reschedule_lock. --- src/meta/service/src/scale_service.rs | 11 ++++++++--- src/meta/src/barrier/recovery.rs | 1 + src/meta/src/rpc/ddl_controller.rs | 6 +++--- src/meta/src/rpc/ddl_controller_v2.rs | 4 ++-- src/meta/src/stream/scale.rs | 15 +++++++++++++-- src/meta/src/stream/stream_manager.rs | 11 ++++------- 6 files changed, 31 insertions(+), 17 deletions(-) diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs index 33270fc2204f9..0aece757af465 100644 --- a/src/meta/service/src/scale_service.rs +++ b/src/meta/service/src/scale_service.rs @@ -82,7 +82,7 @@ impl ScaleService for ScaleServiceImpl { &self, _: Request, ) -> Result, Status> { - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; let table_fragments = match &self.metadata_manager { MetadataManager::V1(mgr) => mgr @@ -149,7 +149,7 @@ impl ScaleService for ScaleServiceImpl { resolve_no_shuffle_upstream, } = request.into_inner(); - let _reschedule_job_lock = self.stream_manager.reschedule_lock.write().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_write_guard().await; let current_revision = self.get_revision().await; @@ -239,7 +239,12 @@ impl ScaleService for ScaleServiceImpl { let req = request.into_inner(); - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self + .stream_manager + .scale_controller + .reschedule_lock + .read() + .await; let current_revision = self.get_revision().await; diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index e4b93a286a3f8..1208d2c49b58d 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -595,6 +595,7 @@ impl GlobalBarrierManagerContext { } async fn scale_actors(&self) -> MetaResult<()> { + let _guard = self.scale_controller.reschedule_lock.write().await; match &self.metadata_manager { MetadataManager::V1(_) => self.scale_actors_v1().await, MetadataManager::V2(_) => self.scale_actors_v2().await, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 84ff4ea1de682..7de106523599e 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -654,7 +654,7 @@ impl DdlController { .acquire() .await .unwrap(); - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; let stream_ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); @@ -1154,7 +1154,7 @@ impl DdlController { target_replace_info: Option, ) -> MetaResult { let mgr = self.metadata_manager.as_v1_ref(); - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; let (mut version, streaming_job_ids) = match job_id { StreamingJobId::MaterializedView(table_id) => { mgr.catalog_manager @@ -1647,7 +1647,7 @@ impl DdlController { .replace_table_v2(stream_job, fragment_graph, table_col_index_mapping) .await; }; - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; let stream_ctx = 
StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); let fragment_graph = self diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index dd3defd141aef..748857d2c3da7 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -76,7 +76,7 @@ impl DdlController { .acquire() .await .unwrap(); - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; // create streaming job. match self @@ -284,7 +284,7 @@ impl DdlController { let mgr = self.metadata_manager.as_v2_ref(); let job_id = streaming_job.id(); - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); // 1. build fragment graph. diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index e598aefb291d3..7f40f8e3da033 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -41,8 +41,8 @@ use risingwave_pb::meta::FragmentParallelUnitMappings; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{DispatcherType, FragmentTypeFlag, StreamActor, StreamNode}; use thiserror_ext::AsReport; -use tokio::sync::oneshot; use tokio::sync::oneshot::Receiver; +use tokio::sync::{oneshot, RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::task::JoinHandle; use tokio::time::MissedTickBehavior; @@ -375,6 +375,8 @@ pub struct ScaleController { pub stream_rpc_manager: StreamRpcManager, pub env: MetaSrvEnv, + + pub reschedule_lock: RwLock<()>, } impl ScaleController { @@ -389,6 +391,7 @@ impl ScaleController { metadata_manager: metadata_manager.clone(), source_manager, env, + reschedule_lock: RwLock::new(()), } } @@ -2449,6 +2452,14 @@ pub struct TableResizePolicy { } impl GlobalStreamManager { + pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> { + self.scale_controller.reschedule_lock.read().await + } + + pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> { + self.scale_controller.reschedule_lock.write().await + } + pub async fn reschedule_actors( &self, reschedules: HashMap, @@ -2518,7 +2529,7 @@ impl GlobalStreamManager { } async fn trigger_parallelism_control(&self) -> MetaResult<()> { - let _reschedule_job_lock = self.reschedule_lock.write().await; + let _reschedule_job_lock = self.reschedule_lock_write_guard().await; match &self.metadata_manager { MetadataManager::V1(mgr) => { diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 5889292756ca7..fab1c3262cc8f 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -24,7 +24,7 @@ use risingwave_pb::stream_plan::update_mutation::MergeUpdate; use risingwave_pb::stream_plan::Dispatcher; use thiserror_ext::AsReport; use tokio::sync::mpsc::Sender; -use tokio::sync::{oneshot, Mutex, RwLock}; +use tokio::sync::{oneshot, Mutex}; use tracing::Instrument; use super::{Locations, RescheduleOptions, ScaleController, ScaleControllerRef, TableResizePolicy}; @@ -192,9 +192,7 @@ pub struct GlobalStreamManager { hummock_manager: HummockManagerRef, - pub reschedule_lock: RwLock<()>, - - pub(crate) scale_controller: ScaleControllerRef, + pub scale_controller: ScaleControllerRef, pub stream_rpc_manager: StreamRpcManager, } @@ -222,7 +220,6 @@ 
impl GlobalStreamManager { source_manager, hummock_manager, creating_job_info: Arc::new(CreatingStreamingJobInfo::default()), - reschedule_lock: RwLock::new(()), scale_controller, stream_rpc_manager, }) @@ -629,7 +626,7 @@ impl GlobalStreamManager { return vec![]; } - let _reschedule_job_lock = self.reschedule_lock.read().await; + let _reschedule_job_lock = self.reschedule_lock_read_guard().await; let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await; let futures = receivers.into_iter().map(|(id, receiver)| async move { @@ -688,7 +685,7 @@ impl GlobalStreamManager { parallelism: TableParallelism, deferred: bool, ) -> MetaResult<()> { - let _reschedule_job_lock = self.reschedule_lock.write().await; + let _reschedule_job_lock = self.reschedule_lock_write_guard().await; let worker_nodes = self .metadata_manager From 55d3e66f4a608eac17c74e40f90324c3530025cb Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Wed, 7 Feb 2024 09:21:58 +0800 Subject: [PATCH 2/3] use universal scale controller Signed-off-by: Shanicky Chen --- src/meta/node/src/server.rs | 10 ++++++++++ src/meta/service/src/scale_service.rs | 18 +++--------------- src/meta/src/barrier/mod.rs | 10 ++-------- src/meta/src/stream/stream_manager.rs | 17 +++++++++-------- 4 files changed, 24 insertions(+), 31 deletions(-) diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index bd61debc3bf79..176b3ecced5cd 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -35,6 +35,7 @@ use risingwave_meta::controller::cluster::ClusterController; use risingwave_meta::manager::MetadataManager; use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer; use risingwave_meta::rpc::ElectionClientRef; +use risingwave_meta::stream::ScaleController; use risingwave_meta::MetaStoreBackend; use risingwave_meta_model_migration::{Migrator, MigratorTrait}; use risingwave_meta_service::backup_service::BackupServiceImpl; @@ -537,6 +538,12 @@ pub async fn start_service_as_election_leader( let stream_rpc_manager = StreamRpcManager::new(env.clone()); + let scale_controller = Arc::new(ScaleController::new( + &metadata_manager, + source_manager.clone(), + env.clone(), + )); + let barrier_manager = GlobalBarrierManager::new( scheduled_barriers, env.clone(), @@ -546,6 +553,7 @@ pub async fn start_service_as_election_leader( sink_manager.clone(), meta_metrics.clone(), stream_rpc_manager.clone(), + scale_controller.clone(), ); { @@ -563,6 +571,7 @@ pub async fn start_service_as_election_leader( source_manager.clone(), hummock_manager.clone(), stream_rpc_manager, + scale_controller.clone(), ) .unwrap(), ); @@ -627,6 +636,7 @@ pub async fn start_service_as_election_leader( source_manager, stream_manager.clone(), barrier_manager.context().clone(), + scale_controller.clone(), ); let cluster_srv = ClusterServiceImpl::new(metadata_manager.clone()); diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs index 0aece757af465..33899856a57bc 100644 --- a/src/meta/service/src/scale_service.rs +++ b/src/meta/service/src/scale_service.rs @@ -13,12 +13,11 @@ // limitations under the License. 
use std::collections::HashMap; -use std::sync::Arc; use risingwave_common::catalog; use risingwave_meta::manager::MetadataManager; use risingwave_meta::model::TableParallelism; -use risingwave_meta::stream::{ScaleController, ScaleControllerRef, TableRevision}; +use risingwave_meta::stream::{ScaleControllerRef, TableRevision}; use risingwave_meta_model_v2::FragmentId; use risingwave_pb::common::WorkerType; use risingwave_pb::meta::scale_service_server::ScaleService; @@ -49,14 +48,8 @@ impl ScaleServiceImpl { source_manager: SourceManagerRef, stream_manager: GlobalStreamManagerRef, barrier_manager: BarrierManagerRef, + scale_controller: ScaleControllerRef, ) -> Self { - let scale_controller = Arc::new(ScaleController::new( - &metadata_manager, - source_manager.clone(), - stream_manager.stream_rpc_manager.clone(), - stream_manager.env.clone(), - )); - Self { metadata_manager, source_manager, @@ -239,12 +232,7 @@ impl ScaleService for ScaleServiceImpl { let req = request.into_inner(); - let _reschedule_job_lock = self - .stream_manager - .scale_controller - .reschedule_lock - .read() - .await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; let current_revision = self.get_revision().await; diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 9de6df91fed39..c6e54b538c3a8 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -57,7 +57,7 @@ use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager, WorkerId}; use crate::model::{ActorId, TableFragments}; use crate::rpc::metrics::MetaMetrics; -use crate::stream::{ScaleController, ScaleControllerRef, SourceManagerRef}; +use crate::stream::{ScaleControllerRef, SourceManagerRef}; use crate::{MetaError, MetaResult}; mod command; @@ -385,6 +385,7 @@ impl GlobalBarrierManager { sink_manager: SinkCoordinatorManager, metrics: Arc, stream_rpc_manager: StreamRpcManager, + scale_controller: ScaleControllerRef, ) -> Self { let enable_recovery = env.opts.enable_recovery; let in_flight_barrier_nums = env.opts.in_flight_barrier_nums; @@ -398,13 +399,6 @@ impl GlobalBarrierManager { let tracker = CreateMviewProgressTracker::new(); - let scale_controller = Arc::new(ScaleController::new( - &metadata_manager, - source_manager.clone(), - stream_rpc_manager.clone(), - env.clone(), - )); - let context = GlobalBarrierManagerContext { status: Arc::new(Mutex::new(BarrierManagerStatus::Starting)), metadata_manager, diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index fab1c3262cc8f..0740642e9a241 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -205,14 +205,8 @@ impl GlobalStreamManager { source_manager: SourceManagerRef, hummock_manager: HummockManagerRef, stream_rpc_manager: StreamRpcManager, + scale_controller: ScaleControllerRef, ) -> MetaResult { - let scale_controller = Arc::new(ScaleController::new( - &metadata_manager, - source_manager.clone(), - stream_rpc_manager.clone(), - env.clone(), - )); - Ok(Self { env, metadata_manager, @@ -783,7 +777,7 @@ mod tests { use crate::model::{ActorId, FragmentId}; use crate::rpc::ddl_controller::DropMode; use crate::rpc::metrics::MetaMetrics; - use crate::stream::SourceManager; + use crate::stream::{ScaleController, SourceManager}; use crate::MetaOpts; struct FakeFragmentState { @@ -982,6 +976,11 @@ mod tests { let (sink_manager, _) = SinkCoordinatorManager::start_worker(); let 
stream_rpc_manager = StreamRpcManager::new(env.clone()); + let scale_controller = Arc::new(ScaleController::new( + &metadata_manager, + source_manager.clone(), + env.clone(), + )); let barrier_manager = GlobalBarrierManager::new( scheduled_barriers, @@ -992,6 +991,7 @@ mod tests { sink_manager, meta_metrics.clone(), stream_rpc_manager.clone(), + scale_controller.clone(), ); let stream_manager = GlobalStreamManager::new( @@ -1001,6 +1001,7 @@ mod tests { source_manager.clone(), hummock_manager, stream_rpc_manager, + scale_controller.clone(), )?; let (join_handle_2, shutdown_tx_2) = GlobalBarrierManager::start(barrier_manager); From cfe8845df3b5d6c9dd56684e1e3e31c360d88cf8 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Wed, 7 Feb 2024 16:11:09 +0800 Subject: [PATCH 3/3] Added clone arg to ScaleController ctor, cleaned imports. --- src/meta/node/src/server.rs | 1 + src/meta/src/stream/stream_manager.rs | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 176b3ecced5cd..59e520a3316fb 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -541,6 +541,7 @@ pub async fn start_service_as_election_leader( let scale_controller = Arc::new(ScaleController::new( &metadata_manager, source_manager.clone(), + stream_rpc_manager.clone(), env.clone(), )); diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 0740642e9a241..d7388f50da09c 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -27,7 +27,7 @@ use tokio::sync::mpsc::Sender; use tokio::sync::{oneshot, Mutex}; use tracing::Instrument; -use super::{Locations, RescheduleOptions, ScaleController, ScaleControllerRef, TableResizePolicy}; +use super::{Locations, RescheduleOptions, ScaleControllerRef, TableResizePolicy}; use crate::barrier::{BarrierScheduler, Command, ReplaceTablePlan, StreamRpcManager}; use crate::hummock::HummockManagerRef; use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, StreamingJob}; @@ -979,6 +979,7 @@ mod tests { let scale_controller = Arc::new(ScaleController::new( &metadata_manager, source_manager.clone(), + stream_rpc_manager.clone(), env.clone(), ));
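
Note appended to the series: taken together, patch 1 moves `reschedule_lock` from `GlobalStreamManager` into `ScaleController` and exposes it through `reschedule_lock_read_guard()` / `reschedule_lock_write_guard()`, while patches 2 and 3 construct a single `ScaleController` in `server.rs` and hand the same `Arc` to the barrier manager, the stream manager, and the scale service. The sketch below is a minimal, hedged illustration of that end state, not the real meta-service code: only the lock plumbing and constructor wiring are taken from the diff, every struct body is a stub, and a `tokio` runtime is assumed.

// A self-contained approximation of the shape these three patches converge on.
use std::sync::Arc;

use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

// Patch 1: the reschedule lock now lives on ScaleController.
pub struct ScaleController {
    pub reschedule_lock: RwLock<()>,
}

pub type ScaleControllerRef = Arc<ScaleController>;

// Patch 2: GlobalStreamManager no longer builds its own controller; it is injected.
pub struct GlobalStreamManager {
    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    // Thin wrappers so callers never reach into `scale_controller.reschedule_lock` directly.
    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
        self.scale_controller.reschedule_lock.read().await
    }

    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
        self.scale_controller.reschedule_lock.write().await
    }
}

// Stand-in for the barrier manager context, which takes the write lock while
// scaling actors during recovery (see the `scale_actors` hunk in recovery.rs).
pub struct GlobalBarrierManagerContext {
    pub scale_controller: ScaleControllerRef,
}

impl GlobalBarrierManagerContext {
    pub async fn scale_actors(&self) {
        let _guard = self.scale_controller.reschedule_lock.write().await;
        // ... recovery-time rescheduling would run here, serialized with DDL ...
    }
}

#[tokio::main]
async fn main() {
    // Patches 2-3: one controller is constructed up front (as in server.rs) and
    // the same Arc is shared by every component that needs the lock.
    let scale_controller: ScaleControllerRef = Arc::new(ScaleController {
        reschedule_lock: RwLock::new(()),
    });

    let stream_manager = GlobalStreamManager {
        scale_controller: scale_controller.clone(),
    };
    let barrier_ctx = GlobalBarrierManagerContext {
        scale_controller: scale_controller.clone(),
    };

    // DDL paths take the read guard; reschedules and recovery take the write guard.
    {
        let _read = stream_manager.reschedule_lock_read_guard().await;
        // create/replace streaming jobs while holding the shared read lock
    }
    {
        let _write = stream_manager.reschedule_lock_write_guard().await;
        // rescheduling runs exclusively
    }
    barrier_ctx.scale_actors().await;
}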