Updated lock methods; refactored reschedule_lock.
shanicky committed Feb 6, 2024
1 parent 3da8d14 commit e72c48f
Showing 6 changed files with 31 additions and 17 deletions.
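
Note: pieced together from the hunks below, a minimal sketch of the locking surface after this commit: the reschedule `RwLock<()>` moves from `GlobalStreamManager` into `ScaleController`, and most callers now go through guard helper methods. Unrelated fields are elided, and `ScaleControllerRef` is assumed here to be an `Arc<ScaleController>`.

// Simplified sketch, not the full structs from the repository.
use std::sync::Arc;

use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

pub struct ScaleController {
    // The reschedule lock now lives on the scale controller.
    pub reschedule_lock: RwLock<()>,
}

pub type ScaleControllerRef = Arc<ScaleController>;

pub struct GlobalStreamManager {
    // Now public; the RwLock field this struct used to own is gone.
    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    // Call sites take guards through these helpers instead of a lock field.
    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
        self.scale_controller.reschedule_lock.read().await
    }

    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
        self.scale_controller.reschedule_lock.write().await
    }
}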
11 changes: 8 additions & 3 deletions src/meta/service/src/scale_service.rs
@@ -82,7 +82,7 @@ impl ScaleService for ScaleServiceImpl {
         &self,
         _: Request<GetClusterInfoRequest>,
     ) -> Result<Response<GetClusterInfoResponse>, Status> {
-        let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
+        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
 
         let table_fragments = match &self.metadata_manager {
             MetadataManager::V1(mgr) => mgr
@@ -149,7 +149,7 @@ impl ScaleService for ScaleServiceImpl {
             resolve_no_shuffle_upstream,
         } = request.into_inner();
 
-        let _reschedule_job_lock = self.stream_manager.reschedule_lock.write().await;
+        let _reschedule_job_lock = self.stream_manager.reschedule_lock_write_guard().await;
 
         let current_revision = self.get_revision().await;
 
@@ -239,7 +239,12 @@ impl ScaleService for ScaleServiceImpl {
 
         let req = request.into_inner();
 
-        let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
+        let _reschedule_job_lock = self
+            .stream_manager
+            .scale_controller
+            .reschedule_lock
+            .read()
+            .await;
 
         let current_revision = self.get_revision().await;
 
1 change: 1 addition & 0 deletions src/meta/src/barrier/recovery.rs
@@ -595,6 +595,7 @@ impl GlobalBarrierManagerContext {
     }
 
     async fn scale_actors(&self) -> MetaResult<()> {
+        let _guard = self.scale_controller.reschedule_lock.write().await;
         match &self.metadata_manager {
             MetadataManager::V1(_) => self.scale_actors_v1().await,
             MetadataManager::V2(_) => self.scale_actors_v2().await,
6 changes: 3 additions & 3 deletions src/meta/src/rpc/ddl_controller.rs
@@ -654,7 +654,7 @@ impl DdlController {
             .acquire()
             .await
             .unwrap();
-        let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
+        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
 
         let stream_ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
 
@@ -1154,7 +1154,7 @@ impl DdlController {
         target_replace_info: Option<ReplaceTableInfo>,
     ) -> MetaResult<NotificationVersion> {
         let mgr = self.metadata_manager.as_v1_ref();
-        let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
+        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
         let (mut version, streaming_job_ids) = match job_id {
             StreamingJobId::MaterializedView(table_id) => {
                 mgr.catalog_manager
@@ -1647,7 +1647,7 @@ impl DdlController {
                 .replace_table_v2(stream_job, fragment_graph, table_col_index_mapping)
                 .await;
         };
-        let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
+        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
         let stream_ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
 
         let fragment_graph = self
4 changes: 2 additions & 2 deletions src/meta/src/rpc/ddl_controller_v2.rs
@@ -76,7 +76,7 @@ impl DdlController {
             .acquire()
             .await
             .unwrap();
-        let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
+        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
 
         // create streaming job.
         match self
@@ -284,7 +284,7 @@
         let mgr = self.metadata_manager.as_v2_ref();
         let job_id = streaming_job.id();
 
-        let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
+        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
         let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
 
         // 1. build fragment graph.
15 changes: 13 additions & 2 deletions src/meta/src/stream/scale.rs
@@ -41,8 +41,8 @@ use risingwave_pb::meta::FragmentParallelUnitMappings;
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::{DispatcherType, FragmentTypeFlag, StreamActor, StreamNode};
 use thiserror_ext::AsReport;
-use tokio::sync::oneshot;
 use tokio::sync::oneshot::Receiver;
+use tokio::sync::{oneshot, RwLock, RwLockReadGuard, RwLockWriteGuard};
 use tokio::task::JoinHandle;
 use tokio::time::MissedTickBehavior;
@@ -375,6 +375,8 @@ pub struct ScaleController {
     pub stream_rpc_manager: StreamRpcManager,
 
     pub env: MetaSrvEnv,
+
+    pub reschedule_lock: RwLock<()>,
 }
 
 impl ScaleController {
@@ -389,6 +391,7 @@ impl ScaleController {
             metadata_manager: metadata_manager.clone(),
             source_manager,
             env,
+            reschedule_lock: RwLock::new(()),
         }
 
@@ -2449,6 +2452,14 @@ pub struct TableResizePolicy {
 }
 
 impl GlobalStreamManager {
+    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
+        self.scale_controller.reschedule_lock.read().await
+    }
+
+    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
+        self.scale_controller.reschedule_lock.write().await
+    }
+
     pub async fn reschedule_actors(
         &self,
         reschedules: HashMap<FragmentId, ParallelUnitReschedule>,
@@ -2518,7 +2529,7 @@
     }
 
     async fn trigger_parallelism_control(&self) -> MetaResult<()> {
-        let _reschedule_job_lock = self.reschedule_lock.write().await;
+        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
 
         match &self.metadata_manager {
             MetadataManager::V1(mgr) => {
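
Note: a hypothetical caller mirroring the call sites in this diff (illustrative names, not code from the repository). DDL paths hold the read guard so they can run concurrently with each other, while reschedule, recovery, and parallelism-control paths hold the write guard for exclusive access.

// Illustrative only; GlobalStreamManager as sketched near the top of this page.
async fn example_usage(stream_manager: &GlobalStreamManager) {
    {
        // Shared path (e.g. creating a streaming job): many readers may hold
        // the guard at the same time, but they exclude any writer.
        let _read_guard = stream_manager.reschedule_lock_read_guard().await;
        // ... DDL work runs here while rescheduling is blocked ...
    } // read guard dropped at the end of this scope

    // Exclusive path (e.g. triggering parallelism control): waits for all
    // readers to finish and blocks new ones while held.
    let _write_guard = stream_manager.reschedule_lock_write_guard().await;
    // ... rescheduling work runs here ...
}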
11 changes: 4 additions & 7 deletions src/meta/src/stream/stream_manager.rs
@@ -24,7 +24,7 @@ use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
 use risingwave_pb::stream_plan::Dispatcher;
 use thiserror_ext::AsReport;
 use tokio::sync::mpsc::Sender;
-use tokio::sync::{oneshot, Mutex, RwLock};
+use tokio::sync::{oneshot, Mutex};
 use tracing::Instrument;
 
 use super::{Locations, RescheduleOptions, ScaleController, ScaleControllerRef, TableResizePolicy};
@@ -192,9 +192,7 @@ pub struct GlobalStreamManager {
 
     hummock_manager: HummockManagerRef,
 
-    pub reschedule_lock: RwLock<()>,
-
-    pub(crate) scale_controller: ScaleControllerRef,
+    pub scale_controller: ScaleControllerRef,
 
     pub stream_rpc_manager: StreamRpcManager,
 }
@@ -222,7 +220,6 @@ impl GlobalStreamManager {
             source_manager,
             hummock_manager,
             creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
-            reschedule_lock: RwLock::new(()),
             scale_controller,
             stream_rpc_manager,
         })
@@ -629,7 +626,7 @@ impl GlobalStreamManager {
             return vec![];
         }
 
-        let _reschedule_job_lock = self.reschedule_lock.read().await;
+        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
         let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await;
 
         let futures = receivers.into_iter().map(|(id, receiver)| async move {
@@ -688,7 +685,7 @@
         parallelism: TableParallelism,
         deferred: bool,
     ) -> MetaResult<()> {
-        let _reschedule_job_lock = self.reschedule_lock.write().await;
+        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
 
         let worker_nodes = self
             .metadata_manager
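
Note: an assumed rationale for the `_reschedule_job_lock` and `_guard` bindings used throughout these hunks. An underscore-prefixed name keeps the guard alive until the end of the enclosing scope, whereas the bare `_` pattern drops it immediately, which would defeat the locking.

// Standalone demo of the binding behavior; not code from the repository.
async fn guard_lifetime_demo(lock: &tokio::sync::RwLock<()>) {
    let _held = lock.read().await; // guard lives until the function returns
    let _ = lock.read().await;     // guard dropped right away; protects nothing
}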
