fix(scale): move reschedule_lock to ScaleController & use universal scale_controller #15037

Merged 3 commits on Feb 7, 2024
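
The change itself is small but touches many call sites: the reschedule RwLock that used to live on GlobalStreamManager now lives on ScaleController, a single ScaleController is built once in start_service_as_election_leader, and every consumer (barrier manager, stream manager, scale service) receives a clone of the same Arc. A minimal Rust sketch of the resulting shape, with simplified structs that carry only the fields relevant here (the real types hold many more members):

use std::sync::Arc;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

// The controller now owns the lock that serializes rescheduling.
pub struct ScaleController {
    pub reschedule_lock: RwLock<()>,
}
pub type ScaleControllerRef = Arc<ScaleController>;

// The stream manager no longer owns its own RwLock; it only forwards
// guard requests to the shared controller.
pub struct GlobalStreamManager {
    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
        self.scale_controller.reschedule_lock.read().await
    }

    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
        self.scale_controller.reschedule_lock.write().await
    }
}

#[tokio::main]
async fn main() {
    // One controller instance is created up front and shared, so every
    // consumer contends on the same reschedule lock.
    let scale_controller: ScaleControllerRef = Arc::new(ScaleController {
        reschedule_lock: RwLock::new(()),
    });
    let stream_manager = GlobalStreamManager {
        scale_controller: scale_controller.clone(),
    };
    let _read = stream_manager.reschedule_lock_read_guard().await;
}
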
11 changes: 11 additions & 0 deletions src/meta/node/src/server.rs
@@ -35,6 +35,7 @@ use risingwave_meta::controller::cluster::ClusterController;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer;
use risingwave_meta::rpc::ElectionClientRef;
use risingwave_meta::stream::ScaleController;
use risingwave_meta::MetaStoreBackend;
use risingwave_meta_model_migration::{Migrator, MigratorTrait};
use risingwave_meta_service::backup_service::BackupServiceImpl;
@@ -537,6 +538,13 @@ pub async fn start_service_as_election_leader(

let stream_rpc_manager = StreamRpcManager::new(env.clone());

let scale_controller = Arc::new(ScaleController::new(
&metadata_manager,
source_manager.clone(),
stream_rpc_manager.clone(),
env.clone(),
));

let barrier_manager = GlobalBarrierManager::new(
scheduled_barriers,
env.clone(),
@@ -546,6 +554,7 @@
sink_manager.clone(),
meta_metrics.clone(),
stream_rpc_manager.clone(),
scale_controller.clone(),
);

{
@@ -563,6 +572,7 @@
source_manager.clone(),
hummock_manager.clone(),
stream_rpc_manager,
scale_controller.clone(),
)
.unwrap(),
);
@@ -627,6 +637,7 @@ pub async fn start_service_as_election_leader(
source_manager,
stream_manager.clone(),
barrier_manager.context().clone(),
scale_controller.clone(),
);

let cluster_srv = ClusterServiceImpl::new(metadata_manager.clone());
17 changes: 5 additions & 12 deletions src/meta/service/src/scale_service.rs
@@ -13,12 +13,11 @@
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use risingwave_common::catalog;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::model::TableParallelism;
use risingwave_meta::stream::{ScaleController, ScaleControllerRef, TableRevision};
use risingwave_meta::stream::{ScaleControllerRef, TableRevision};
use risingwave_meta_model_v2::FragmentId;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::scale_service_server::ScaleService;
@@ -49,14 +48,8 @@ impl ScaleServiceImpl {
source_manager: SourceManagerRef,
stream_manager: GlobalStreamManagerRef,
barrier_manager: BarrierManagerRef,
scale_controller: ScaleControllerRef,
) -> Self {
let scale_controller = Arc::new(ScaleController::new(
&metadata_manager,
source_manager.clone(),
stream_manager.stream_rpc_manager.clone(),
stream_manager.env.clone(),
));

Self {
metadata_manager,
source_manager,
@@ -82,7 +75,7 @@ impl ScaleService for ScaleServiceImpl {
&self,
_: Request<GetClusterInfoRequest>,
) -> Result<Response<GetClusterInfoResponse>, Status> {
let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

let table_fragments = match &self.metadata_manager {
MetadataManager::V1(mgr) => mgr
@@ -149,7 +142,7 @@ impl ScaleService for ScaleServiceImpl {
resolve_no_shuffle_upstream,
} = request.into_inner();

let _reschedule_job_lock = self.stream_manager.reschedule_lock.write().await;
let _reschedule_job_lock = self.stream_manager.reschedule_lock_write_guard().await;

let current_revision = self.get_revision().await;

@@ -239,7 +232,7 @@ impl ScaleService for ScaleServiceImpl {

let req = request.into_inner();

let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

let current_revision = self.get_revision().await;

10 changes: 2 additions & 8 deletions src/meta/src/barrier/mod.rs
@@ -57,7 +57,7 @@ use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager, WorkerId};
use crate::model::{ActorId, TableFragments};
use crate::rpc::metrics::MetaMetrics;
use crate::stream::{ScaleController, ScaleControllerRef, SourceManagerRef};
use crate::stream::{ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

mod command;
@@ -385,6 +385,7 @@ impl GlobalBarrierManager {
sink_manager: SinkCoordinatorManager,
metrics: Arc<MetaMetrics>,
stream_rpc_manager: StreamRpcManager,
scale_controller: ScaleControllerRef,
) -> Self {
let enable_recovery = env.opts.enable_recovery;
let in_flight_barrier_nums = env.opts.in_flight_barrier_nums;
@@ -398,13 +399,6 @@

let tracker = CreateMviewProgressTracker::new();

let scale_controller = Arc::new(ScaleController::new(
&metadata_manager,
source_manager.clone(),
stream_rpc_manager.clone(),
env.clone(),
));

let context = GlobalBarrierManagerContext {
status: Arc::new(Mutex::new(BarrierManagerStatus::Starting)),
metadata_manager,
1 change: 1 addition & 0 deletions src/meta/src/barrier/recovery.rs
@@ -595,6 +595,7 @@ impl GlobalBarrierManagerContext {
}

async fn scale_actors(&self) -> MetaResult<()> {
let _guard = self.scale_controller.reschedule_lock.write().await;
match &self.metadata_manager {
MetadataManager::V1(_) => self.scale_actors_v1().await,
MetadataManager::V2(_) => self.scale_actors_v2().await,
6 changes: 3 additions & 3 deletions src/meta/src/rpc/ddl_controller.rs
@@ -654,7 +654,7 @@ impl DdlController {
.acquire()
.await
.unwrap();
let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

let stream_ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

@@ -1154,7 +1154,7 @@ impl DdlController {
target_replace_info: Option<ReplaceTableInfo>,
) -> MetaResult<NotificationVersion> {
let mgr = self.metadata_manager.as_v1_ref();
let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
let (mut version, streaming_job_ids) = match job_id {
StreamingJobId::MaterializedView(table_id) => {
mgr.catalog_manager
@@ -1647,7 +1647,7 @@ impl DdlController {
.replace_table_v2(stream_job, fragment_graph, table_col_index_mapping)
.await;
};
let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
let stream_ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

let fragment_graph = self
4 changes: 2 additions & 2 deletions src/meta/src/rpc/ddl_controller_v2.rs
@@ -76,7 +76,7 @@ impl DdlController {
.acquire()
.await
.unwrap();
let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

// create streaming job.
match self
@@ -284,7 +284,7 @@
let mgr = self.metadata_manager.as_v2_ref();
let job_id = streaming_job.id();

let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

// 1. build fragment graph.
15 changes: 13 additions & 2 deletions src/meta/src/stream/scale.rs
@@ -41,8 +41,8 @@ use risingwave_pb::meta::FragmentParallelUnitMappings;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DispatcherType, FragmentTypeFlag, StreamActor, StreamNode};
use thiserror_ext::AsReport;
use tokio::sync::oneshot;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{oneshot, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::task::JoinHandle;
use tokio::time::MissedTickBehavior;

@@ -375,6 +375,8 @@ pub struct ScaleController {
pub stream_rpc_manager: StreamRpcManager,

pub env: MetaSrvEnv,

pub reschedule_lock: RwLock<()>,
}

impl ScaleController {
@@ -389,6 +391,7 @@
metadata_manager: metadata_manager.clone(),
source_manager,
env,
reschedule_lock: RwLock::new(()),
}
}

@@ -2449,6 +2452,14 @@ pub struct TableResizePolicy {
}

impl GlobalStreamManager {
pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
self.scale_controller.reschedule_lock.read().await
}

pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
self.scale_controller.reschedule_lock.write().await
}

pub async fn reschedule_actors(
&self,
reschedules: HashMap<FragmentId, ParallelUnitReschedule>,
Expand Down Expand Up @@ -2518,7 +2529,7 @@ impl GlobalStreamManager {
}

async fn trigger_parallelism_control(&self) -> MetaResult<()> {
let _reschedule_job_lock = self.reschedule_lock.write().await;
let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

match &self.metadata_manager {
MetadataManager::V1(mgr) => {
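
The guard helpers added above are what the DDL and scale paths now call instead of touching a lock field directly. A hedged usage sketch of the locking discipline (the function names below are illustrative stand-ins, not the real entry points): DDL-style work shares the read guard, while a reschedule takes the write guard and excludes everything else.

use std::sync::Arc;
use tokio::sync::RwLock;

// Stand-in for ScaleController::reschedule_lock.
async fn ddl_job(lock: Arc<RwLock<()>>, name: &str) {
    let _guard = lock.read().await; // like reschedule_lock_read_guard()
    println!("{name}: running while no reschedule is in flight");
}

async fn reschedule(lock: Arc<RwLock<()>>) {
    let _guard = lock.write().await; // like reschedule_lock_write_guard()
    println!("reschedule: exclusive access");
}

#[tokio::main]
async fn main() {
    let lock = Arc::new(RwLock::new(()));
    // The two DDL jobs may hold the read guard concurrently; the reschedule
    // waits until both read guards are dropped (and vice versa).
    tokio::join!(
        ddl_job(lock.clone(), "create_streaming_job"),
        ddl_job(lock.clone(), "drop_streaming_job"),
        reschedule(lock.clone()),
    );
}
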
31 changes: 15 additions & 16 deletions src/meta/src/stream/stream_manager.rs
@@ -24,10 +24,10 @@ use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
use risingwave_pb::stream_plan::Dispatcher;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::Sender;
use tokio::sync::{oneshot, Mutex, RwLock};
use tokio::sync::{oneshot, Mutex};
use tracing::Instrument;

use super::{Locations, RescheduleOptions, ScaleController, ScaleControllerRef, TableResizePolicy};
use super::{Locations, RescheduleOptions, ScaleControllerRef, TableResizePolicy};
use crate::barrier::{BarrierScheduler, Command, ReplaceTablePlan, StreamRpcManager};
use crate::hummock::HummockManagerRef;
use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, StreamingJob};
@@ -192,9 +192,7 @@ pub struct GlobalStreamManager {

hummock_manager: HummockManagerRef,

pub reschedule_lock: RwLock<()>,

pub(crate) scale_controller: ScaleControllerRef,
pub scale_controller: ScaleControllerRef,

pub stream_rpc_manager: StreamRpcManager,
}
@@ -207,22 +205,15 @@
source_manager: SourceManagerRef,
hummock_manager: HummockManagerRef,
stream_rpc_manager: StreamRpcManager,
scale_controller: ScaleControllerRef,
) -> MetaResult<Self> {
let scale_controller = Arc::new(ScaleController::new(
&metadata_manager,
source_manager.clone(),
stream_rpc_manager.clone(),
env.clone(),
));

Ok(Self {
env,
metadata_manager,
barrier_scheduler,
source_manager,
hummock_manager,
creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
reschedule_lock: RwLock::new(()),
scale_controller,
stream_rpc_manager,
})
@@ -629,7 +620,7 @@ impl GlobalStreamManager {
return vec![];
}

let _reschedule_job_lock = self.reschedule_lock.read().await;
let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await;

let futures = receivers.into_iter().map(|(id, receiver)| async move {
@@ -688,7 +679,7 @@ impl GlobalStreamManager {
parallelism: TableParallelism,
deferred: bool,
) -> MetaResult<()> {
let _reschedule_job_lock = self.reschedule_lock.write().await;
let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

let worker_nodes = self
.metadata_manager
@@ -786,7 +777,7 @@ mod tests {
use crate::model::{ActorId, FragmentId};
use crate::rpc::ddl_controller::DropMode;
use crate::rpc::metrics::MetaMetrics;
use crate::stream::SourceManager;
use crate::stream::{ScaleController, SourceManager};
use crate::MetaOpts;

struct FakeFragmentState {
@@ -985,6 +976,12 @@ mod tests {
let (sink_manager, _) = SinkCoordinatorManager::start_worker();

let stream_rpc_manager = StreamRpcManager::new(env.clone());
let scale_controller = Arc::new(ScaleController::new(
&metadata_manager,
source_manager.clone(),
stream_rpc_manager.clone(),
env.clone(),
));

let barrier_manager = GlobalBarrierManager::new(
scheduled_barriers,
@@ -995,6 +992,7 @@
sink_manager,
meta_metrics.clone(),
stream_rpc_manager.clone(),
scale_controller.clone(),
);

let stream_manager = GlobalStreamManager::new(
@@ -1004,6 +1002,7 @@
source_manager.clone(),
hummock_manager,
stream_rpc_manager,
scale_controller.clone(),
)?;

let (join_handle_2, shutdown_tx_2) = GlobalBarrierManager::start(barrier_manager);