fix(scale): move reschedule_lock to ScaleController & use universal scale_controller (#15037)

Signed-off-by: Shanicky Chen <[email protected]>
shanicky authored Feb 7, 2024 · commit 85f0023 (1 parent: 05b84b4)
Showing 8 changed files with 52 additions and 43 deletions.
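In short, the commit creates a single ScaleController in server.rs and passes that one instance to the barrier manager, the stream manager, and the scale service, while the reschedule_lock moves from GlobalStreamManager onto ScaleController; callers now acquire it through the new reschedule_lock_read_guard() / reschedule_lock_write_guard() helpers. Below is a minimal sketch of that shape, assuming only the names visible in this diff (ScaleController, ScaleControllerRef, GlobalStreamManager); the real RisingWave fields, constructors, and error handling are omitted.

```rust
// Simplified sketch of the pattern in this commit, not the actual RisingWave code.
use std::sync::Arc;

use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

pub struct ScaleController {
    // The reschedule lock now lives on the controller itself.
    pub reschedule_lock: RwLock<()>,
}

pub type ScaleControllerRef = Arc<ScaleController>;

pub struct GlobalStreamManager {
    // One controller instance, built once in `server.rs`, is shared by the
    // barrier manager, the stream manager, and the scale service.
    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    // Callers that previously used `stream_manager.reschedule_lock.read()`
    // now go through these delegating helpers.
    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
        self.scale_controller.reschedule_lock.read().await
    }

    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
        self.scale_controller.reschedule_lock.write().await
    }
}

// Recovery-style callers can take the write lock directly on the controller,
// mirroring what `scale_actors` does in `barrier/recovery.rs` below.
async fn hold_write_lock_during_rescheduling(scale_controller: &ScaleControllerRef) {
    let _guard = scale_controller.reschedule_lock.write().await;
    // ... rescheduling work happens while the guard is held ...
}
```

A likely benefit, judging from the change to barrier/recovery.rs: scale_actors during recovery can now contend on the same write lock as the DDL and scale-service paths, since all of them share the single controller instance.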
11 changes: 11 additions & 0 deletions src/meta/node/src/server.rs
@@ -35,6 +35,7 @@ use risingwave_meta::controller::cluster::ClusterController;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer;
use risingwave_meta::rpc::ElectionClientRef;
use risingwave_meta::stream::ScaleController;
use risingwave_meta::MetaStoreBackend;
use risingwave_meta_model_migration::{Migrator, MigratorTrait};
use risingwave_meta_service::backup_service::BackupServiceImpl;
@@ -537,6 +538,13 @@ pub async fn start_service_as_election_leader(

let stream_rpc_manager = StreamRpcManager::new(env.clone());

let scale_controller = Arc::new(ScaleController::new(
&metadata_manager,
source_manager.clone(),
stream_rpc_manager.clone(),
env.clone(),
));

let barrier_manager = GlobalBarrierManager::new(
scheduled_barriers,
env.clone(),
@@ -546,6 +554,7 @@
sink_manager.clone(),
meta_metrics.clone(),
stream_rpc_manager.clone(),
scale_controller.clone(),
);

{
@@ -563,6 +572,7 @@
source_manager.clone(),
hummock_manager.clone(),
stream_rpc_manager,
scale_controller.clone(),
)
.unwrap(),
);
@@ -627,6 +637,7 @@
source_manager,
stream_manager.clone(),
barrier_manager.context().clone(),
scale_controller.clone(),
);

let cluster_srv = ClusterServiceImpl::new(metadata_manager.clone());
17 changes: 5 additions & 12 deletions src/meta/service/src/scale_service.rs
@@ -13,12 +13,11 @@
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use risingwave_common::catalog;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::model::TableParallelism;
use risingwave_meta::stream::{ScaleController, ScaleControllerRef, TableRevision};
use risingwave_meta::stream::{ScaleControllerRef, TableRevision};
use risingwave_meta_model_v2::FragmentId;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::scale_service_server::ScaleService;
@@ -49,14 +48,8 @@ impl ScaleServiceImpl {
source_manager: SourceManagerRef,
stream_manager: GlobalStreamManagerRef,
barrier_manager: BarrierManagerRef,
scale_controller: ScaleControllerRef,
) -> Self {
let scale_controller = Arc::new(ScaleController::new(
&metadata_manager,
source_manager.clone(),
stream_manager.stream_rpc_manager.clone(),
stream_manager.env.clone(),
));

Self {
metadata_manager,
source_manager,
@@ -82,7 +75,7 @@ impl ScaleService for ScaleServiceImpl {
&self,
_: Request<GetClusterInfoRequest>,
) -> Result<Response<GetClusterInfoResponse>, Status> {
let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

let table_fragments = match &self.metadata_manager {
MetadataManager::V1(mgr) => mgr
@@ -149,7 +142,7 @@ impl ScaleService for ScaleServiceImpl {
resolve_no_shuffle_upstream,
} = request.into_inner();

let _reschedule_job_lock = self.stream_manager.reschedule_lock.write().await;
let _reschedule_job_lock = self.stream_manager.reschedule_lock_write_guard().await;

let current_revision = self.get_revision().await;

@@ -239,7 +232,7 @@

let req = request.into_inner();

let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

let current_revision = self.get_revision().await;

10 changes: 2 additions & 8 deletions src/meta/src/barrier/mod.rs
@@ -57,7 +57,7 @@ use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager, WorkerId};
use crate::model::{ActorId, TableFragments};
use crate::rpc::metrics::MetaMetrics;
use crate::stream::{ScaleController, ScaleControllerRef, SourceManagerRef};
use crate::stream::{ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

mod command;
@@ -385,6 +385,7 @@ impl GlobalBarrierManager {
sink_manager: SinkCoordinatorManager,
metrics: Arc<MetaMetrics>,
stream_rpc_manager: StreamRpcManager,
scale_controller: ScaleControllerRef,
) -> Self {
let enable_recovery = env.opts.enable_recovery;
let in_flight_barrier_nums = env.opts.in_flight_barrier_nums;
@@ -398,13 +399,6 @@

let tracker = CreateMviewProgressTracker::new();

let scale_controller = Arc::new(ScaleController::new(
&metadata_manager,
source_manager.clone(),
stream_rpc_manager.clone(),
env.clone(),
));

let context = GlobalBarrierManagerContext {
status: Arc::new(Mutex::new(BarrierManagerStatus::Starting)),
metadata_manager,
1 change: 1 addition & 0 deletions src/meta/src/barrier/recovery.rs
@@ -595,6 +595,7 @@ impl GlobalBarrierManagerContext {
}

async fn scale_actors(&self) -> MetaResult<()> {
let _guard = self.scale_controller.reschedule_lock.write().await;
match &self.metadata_manager {
MetadataManager::V1(_) => self.scale_actors_v1().await,
MetadataManager::V2(_) => self.scale_actors_v2().await,
6 changes: 3 additions & 3 deletions src/meta/src/rpc/ddl_controller.rs
@@ -654,7 +654,7 @@ impl DdlController {
.acquire()
.await
.unwrap();
let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

let stream_ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

@@ -1154,7 +1154,7 @@ impl DdlController {
target_replace_info: Option<ReplaceTableInfo>,
) -> MetaResult<NotificationVersion> {
let mgr = self.metadata_manager.as_v1_ref();
let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
let (mut version, streaming_job_ids) = match job_id {
StreamingJobId::MaterializedView(table_id) => {
mgr.catalog_manager
@@ -1647,7 +1647,7 @@ impl DdlController {
.replace_table_v2(stream_job, fragment_graph, table_col_index_mapping)
.await;
};
let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
let stream_ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

let fragment_graph = self
4 changes: 2 additions & 2 deletions src/meta/src/rpc/ddl_controller_v2.rs
@@ -76,7 +76,7 @@ impl DdlController {
.acquire()
.await
.unwrap();
let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

// create streaming job.
match self
@@ -284,7 +284,7 @@ impl DdlController {
let mgr = self.metadata_manager.as_v2_ref();
let job_id = streaming_job.id();

let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

// 1. build fragment graph.
15 changes: 13 additions & 2 deletions src/meta/src/stream/scale.rs
@@ -41,8 +41,8 @@ use risingwave_pb::meta::FragmentParallelUnitMappings;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DispatcherType, FragmentTypeFlag, StreamActor, StreamNode};
use thiserror_ext::AsReport;
use tokio::sync::oneshot;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{oneshot, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::task::JoinHandle;
use tokio::time::MissedTickBehavior;

@@ -375,6 +375,8 @@ pub struct ScaleController {
pub stream_rpc_manager: StreamRpcManager,

pub env: MetaSrvEnv,

pub reschedule_lock: RwLock<()>,
}

impl ScaleController {
@@ -389,6 +391,7 @@
metadata_manager: metadata_manager.clone(),
source_manager,
env,
reschedule_lock: RwLock::new(()),
}
}

@@ -2449,6 +2452,14 @@ pub struct TableResizePolicy {
}

impl GlobalStreamManager {
pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
self.scale_controller.reschedule_lock.read().await
}

pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
self.scale_controller.reschedule_lock.write().await
}

pub async fn reschedule_actors(
&self,
reschedules: HashMap<FragmentId, ParallelUnitReschedule>,
@@ -2518,7 +2529,7 @@
}

async fn trigger_parallelism_control(&self) -> MetaResult<()> {
let _reschedule_job_lock = self.reschedule_lock.write().await;
let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

match &self.metadata_manager {
MetadataManager::V1(mgr) => {
31 changes: 15 additions & 16 deletions src/meta/src/stream/stream_manager.rs
@@ -24,10 +24,10 @@ use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
use risingwave_pb::stream_plan::Dispatcher;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::Sender;
use tokio::sync::{oneshot, Mutex, RwLock};
use tokio::sync::{oneshot, Mutex};
use tracing::Instrument;

use super::{Locations, RescheduleOptions, ScaleController, ScaleControllerRef, TableResizePolicy};
use super::{Locations, RescheduleOptions, ScaleControllerRef, TableResizePolicy};
use crate::barrier::{BarrierScheduler, Command, ReplaceTablePlan, StreamRpcManager};
use crate::hummock::HummockManagerRef;
use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, StreamingJob};
@@ -192,9 +192,7 @@ pub struct GlobalStreamManager {

hummock_manager: HummockManagerRef,

pub reschedule_lock: RwLock<()>,

pub(crate) scale_controller: ScaleControllerRef,
pub scale_controller: ScaleControllerRef,

pub stream_rpc_manager: StreamRpcManager,
}
@@ -207,22 +205,15 @@
source_manager: SourceManagerRef,
hummock_manager: HummockManagerRef,
stream_rpc_manager: StreamRpcManager,
scale_controller: ScaleControllerRef,
) -> MetaResult<Self> {
let scale_controller = Arc::new(ScaleController::new(
&metadata_manager,
source_manager.clone(),
stream_rpc_manager.clone(),
env.clone(),
));

Ok(Self {
env,
metadata_manager,
barrier_scheduler,
source_manager,
hummock_manager,
creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
reschedule_lock: RwLock::new(()),
scale_controller,
stream_rpc_manager,
})
@@ -629,7 +620,7 @@ impl GlobalStreamManager {
return vec![];
}

let _reschedule_job_lock = self.reschedule_lock.read().await;
let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await;

let futures = receivers.into_iter().map(|(id, receiver)| async move {
@@ -688,7 +679,7 @@ impl GlobalStreamManager {
parallelism: TableParallelism,
deferred: bool,
) -> MetaResult<()> {
let _reschedule_job_lock = self.reschedule_lock.write().await;
let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

let worker_nodes = self
.metadata_manager
@@ -786,7 +777,7 @@ mod tests {
use crate::model::{ActorId, FragmentId};
use crate::rpc::ddl_controller::DropMode;
use crate::rpc::metrics::MetaMetrics;
use crate::stream::SourceManager;
use crate::stream::{ScaleController, SourceManager};
use crate::MetaOpts;

struct FakeFragmentState {
@@ -985,6 +976,12 @@ mod tests {
let (sink_manager, _) = SinkCoordinatorManager::start_worker();

let stream_rpc_manager = StreamRpcManager::new(env.clone());
let scale_controller = Arc::new(ScaleController::new(
&metadata_manager,
source_manager.clone(),
stream_rpc_manager.clone(),
env.clone(),
));

let barrier_manager = GlobalBarrierManager::new(
scheduled_barriers,
@@ -995,6 +992,7 @@
sink_manager,
meta_metrics.clone(),
stream_rpc_manager.clone(),
scale_controller.clone(),
);

let stream_manager = GlobalStreamManager::new(
@@ -1004,6 +1002,7 @@
source_manager.clone(),
hummock_manager,
stream_rpc_manager,
scale_controller.clone(),
)?;

let (join_handle_2, shutdown_tx_2) = GlobalBarrierManager::start(barrier_manager);
