From d104b17b0e26cce8877c27a06f880504709ec6c2 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Tue, 6 Feb 2024 19:07:52 +0800 Subject: [PATCH] Updated lock methods; refactored reschedule_lock. --- src/meta/service/src/scale_service.rs | 11 ++++++++--- src/meta/src/barrier/recovery.rs | 1 + src/meta/src/rpc/ddl_controller.rs | 6 +++--- src/meta/src/rpc/ddl_controller_v2.rs | 4 ++-- src/meta/src/stream/scale.rs | 15 +++++++++++++-- src/meta/src/stream/stream_manager.rs | 11 ++++------- 6 files changed, 31 insertions(+), 17 deletions(-) diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs index 33270fc2204f9..0aece757af465 100644 --- a/src/meta/service/src/scale_service.rs +++ b/src/meta/service/src/scale_service.rs @@ -82,7 +82,7 @@ impl ScaleService for ScaleServiceImpl { &self, _: Request, ) -> Result, Status> { - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; let table_fragments = match &self.metadata_manager { MetadataManager::V1(mgr) => mgr @@ -149,7 +149,7 @@ impl ScaleService for ScaleServiceImpl { resolve_no_shuffle_upstream, } = request.into_inner(); - let _reschedule_job_lock = self.stream_manager.reschedule_lock.write().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_write_guard().await; let current_revision = self.get_revision().await; @@ -239,7 +239,12 @@ impl ScaleService for ScaleServiceImpl { let req = request.into_inner(); - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self + .stream_manager + .scale_controller + .reschedule_lock + .read() + .await; let current_revision = self.get_revision().await; diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index e4b93a286a3f8..1208d2c49b58d 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -595,6 +595,7 @@ impl GlobalBarrierManagerContext { } async fn scale_actors(&self) -> MetaResult<()> { + let _guard = self.scale_controller.reschedule_lock.write().await; match &self.metadata_manager { MetadataManager::V1(_) => self.scale_actors_v1().await, MetadataManager::V2(_) => self.scale_actors_v2().await, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 84ff4ea1de682..7de106523599e 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -654,7 +654,7 @@ impl DdlController { .acquire() .await .unwrap(); - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; let stream_ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); @@ -1154,7 +1154,7 @@ impl DdlController { target_replace_info: Option, ) -> MetaResult { let mgr = self.metadata_manager.as_v1_ref(); - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; let (mut version, streaming_job_ids) = match job_id { StreamingJobId::MaterializedView(table_id) => { mgr.catalog_manager @@ -1647,7 +1647,7 @@ impl DdlController { .replace_table_v2(stream_job, fragment_graph, table_col_index_mapping) .await; }; - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; let stream_ctx = 
StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); let fragment_graph = self diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index dd3defd141aef..748857d2c3da7 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -76,7 +76,7 @@ impl DdlController { .acquire() .await .unwrap(); - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; // create streaming job. match self @@ -284,7 +284,7 @@ impl DdlController { let mgr = self.metadata_manager.as_v2_ref(); let job_id = streaming_job.id(); - let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); // 1. build fragment graph. diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index e598aefb291d3..7f40f8e3da033 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -41,8 +41,8 @@ use risingwave_pb::meta::FragmentParallelUnitMappings; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{DispatcherType, FragmentTypeFlag, StreamActor, StreamNode}; use thiserror_ext::AsReport; -use tokio::sync::oneshot; use tokio::sync::oneshot::Receiver; +use tokio::sync::{oneshot, RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::task::JoinHandle; use tokio::time::MissedTickBehavior; @@ -375,6 +375,8 @@ pub struct ScaleController { pub stream_rpc_manager: StreamRpcManager, pub env: MetaSrvEnv, + + pub reschedule_lock: RwLock<()>, } impl ScaleController { @@ -389,6 +391,7 @@ impl ScaleController { metadata_manager: metadata_manager.clone(), source_manager, env, + reschedule_lock: RwLock::new(()), } } @@ -2449,6 +2452,14 @@ pub struct TableResizePolicy { } impl GlobalStreamManager { + pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> { + self.scale_controller.reschedule_lock.read().await + } + + pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> { + self.scale_controller.reschedule_lock.write().await + } + pub async fn reschedule_actors( &self, reschedules: HashMap, @@ -2518,7 +2529,7 @@ impl GlobalStreamManager { } async fn trigger_parallelism_control(&self) -> MetaResult<()> { - let _reschedule_job_lock = self.reschedule_lock.write().await; + let _reschedule_job_lock = self.reschedule_lock_write_guard().await; match &self.metadata_manager { MetadataManager::V1(mgr) => { diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 5889292756ca7..fab1c3262cc8f 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -24,7 +24,7 @@ use risingwave_pb::stream_plan::update_mutation::MergeUpdate; use risingwave_pb::stream_plan::Dispatcher; use thiserror_ext::AsReport; use tokio::sync::mpsc::Sender; -use tokio::sync::{oneshot, Mutex, RwLock}; +use tokio::sync::{oneshot, Mutex}; use tracing::Instrument; use super::{Locations, RescheduleOptions, ScaleController, ScaleControllerRef, TableResizePolicy}; @@ -192,9 +192,7 @@ pub struct GlobalStreamManager { hummock_manager: HummockManagerRef, - pub reschedule_lock: RwLock<()>, - - pub(crate) scale_controller: ScaleControllerRef, + pub scale_controller: ScaleControllerRef, pub stream_rpc_manager: StreamRpcManager, } @@ -222,7 +220,6 @@ 
impl GlobalStreamManager { source_manager, hummock_manager, creating_job_info: Arc::new(CreatingStreamingJobInfo::default()), - reschedule_lock: RwLock::new(()), scale_controller, stream_rpc_manager, }) @@ -629,7 +626,7 @@ impl GlobalStreamManager { return vec![]; } - let _reschedule_job_lock = self.reschedule_lock.read().await; + let _reschedule_job_lock = self.reschedule_lock_read_guard().await; let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await; let futures = receivers.into_iter().map(|(id, receiver)| async move { @@ -688,7 +685,7 @@ impl GlobalStreamManager { parallelism: TableParallelism, deferred: bool, ) -> MetaResult<()> { - let _reschedule_job_lock = self.reschedule_lock.write().await; + let _reschedule_job_lock = self.reschedule_lock_write_guard().await; let worker_nodes = self .metadata_manager
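
Notes for reviewers: the pattern this patch converges on is a single RwLock owned by ScaleController, with GlobalStreamManager exposing thin guard helpers so DDL and scaling call sites no longer reach into a lock field on the stream manager itself. Below is a minimal standalone sketch of that pattern; the struct definitions are simplified stand-ins for the real meta-service types (the actual ScaleController also carries the metadata/source managers, env, etc., and ScaleControllerRef is its Arc'd form), and the tokio main function is only illustrative.

use std::sync::Arc;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

// Simplified stand-in for the real ScaleController.
pub struct ScaleController {
    // The lock lives with the component that actually performs reschedules.
    pub reschedule_lock: RwLock<()>,
}

// Simplified stand-in for the real GlobalStreamManager.
pub struct GlobalStreamManager {
    pub scale_controller: Arc<ScaleController>,
}

impl GlobalStreamManager {
    // Shared guard: DDL paths (create/drop/replace streaming jobs, cancel) hold
    // this so they can proceed concurrently as long as no reschedule is in flight.
    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
        self.scale_controller.reschedule_lock.read().await
    }

    // Exclusive guard: the Reschedule RPC, trigger_parallelism_control and
    // table-parallelism alterations take this to block out DDL while actors move.
    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
        self.scale_controller.reschedule_lock.write().await
    }
}

#[tokio::main]
async fn main() {
    let mgr = GlobalStreamManager {
        scale_controller: Arc::new(ScaleController {
            reschedule_lock: RwLock::new(()),
        }),
    };

    // A DDL path holds the read guard for the duration of job creation...
    let read_guard = mgr.reschedule_lock_read_guard().await;
    drop(read_guard);

    // ...while a reschedule (or recovery's scale_actors) takes the write guard.
    let _write_guard = mgr.reschedule_lock_write_guard().await;
}

Because the lock now hangs off the shared ScaleController, barrier recovery can serialize itself against reschedules directly via self.scale_controller.reschedule_lock.write() without going through GlobalStreamManager, which is what the recovery.rs hunk does.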