diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs index 33270fc2204f9..885198172c6a5 100644 --- a/src/meta/service/src/scale_service.rs +++ b/src/meta/service/src/scale_service.rs @@ -141,7 +141,7 @@ impl ScaleService for ScaleServiceImpl { &self, request: Request, ) -> Result, Status> { - self.barrier_manager.check_status_running().await?; + self.barrier_manager.check_status_running()?; let RescheduleRequest { reschedules, @@ -235,7 +235,7 @@ impl ScaleService for ScaleServiceImpl { &self, request: Request, ) -> Result, Status> { - self.barrier_manager.check_status_running().await?; + self.barrier_manager.check_status_running()?; let req = request.into_inner(); diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 9de6df91fed39..ed59e65ba50c6 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -19,6 +19,7 @@ use std::mem::take; use std::sync::Arc; use std::time::Duration; +use arc_swap::ArcSwap; use fail::fail_point; use itertools::Itertools; use prometheus::HistogramTimer; @@ -32,6 +33,7 @@ use risingwave_hummock_sdk::table_watermark::{ }; use risingwave_hummock_sdk::{ExtendedSstableInfo, HummockSstableObjectId}; use risingwave_pb::catalog::table::TableType; +use risingwave_pb::common::WorkerNode; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::PausedReason; @@ -41,7 +43,7 @@ use thiserror_ext::AsReport; use tokio::sync::oneshot::{Receiver, Sender}; use tokio::sync::Mutex; use tokio::task::JoinHandle; -use tracing::Instrument; +use tracing::{info, warn, Instrument}; use self::command::CommandContext; use self::notifier::Notifier; @@ -54,7 +56,9 @@ use crate::barrier::state::BarrierManagerState; use crate::barrier::BarrierEpochState::{Completed, InFlight}; use crate::hummock::{CommitEpochInfo, HummockManagerRef}; use crate::manager::sink_coordination::SinkCoordinatorManager; -use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager, WorkerId}; +use crate::manager::{ + ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv, MetadataManager, WorkerId, +}; use crate::model::{ActorId, TableFragments}; use crate::rpc::metrics::MetaMetrics; use crate::stream::{ScaleController, ScaleControllerRef, SourceManagerRef}; @@ -134,7 +138,7 @@ struct Scheduled { #[derive(Clone)] pub struct GlobalBarrierManagerContext { - status: Arc>, + status: Arc>, tracker: Arc>, @@ -183,6 +187,8 @@ pub struct GlobalBarrierManager { checkpoint_control: CheckpointControl, rpc_manager: BarrierRpcManager, + + active_streaming_nodes: ActiveStreamingWorkerNodes, } /// Controls the concurrent execution of commands. 
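The `status` field above moves from `Arc<Mutex<BarrierManagerStatus>>` to `Arc<ArcSwap<BarrierManagerStatus>>`, which is what lets `check_status_running` and `set_status` drop their `.await` elsewhere in this patch. A minimal, self-contained sketch of the `ArcSwap` read/write pattern (illustrative only; the `Status` enum below is made up and is not the PR's type):

```rust
use std::sync::Arc;

use arc_swap::ArcSwap;

#[derive(Debug)]
enum Status {
    Starting,
    Running,
}

fn main() {
    // Mirrors the field shape in the PR: an Arc-wrapped ArcSwap holding an Arc value.
    let status = Arc::new(ArcSwap::new(Arc::new(Status::Starting)));

    // Writer: atomically publish a whole new value; no lock, no `.await`.
    status.store(Arc::new(Status::Running));

    // Reader: a cheap, lock-free snapshot of the current value.
    let current = status.load();
    assert!(matches!(**current, Status::Running));
    println!("status = {:?}", **current);
}
```

Readers get a lock-free snapshot and writers publish a whole new value atomically, so status checks on hot RPC paths (`scale_service.rs`, `ddl_controller.rs`) no longer contend on an async mutex.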
@@ -396,6 +402,9 @@ impl GlobalBarrierManager { ); let checkpoint_control = CheckpointControl::new(metrics.clone()); + let active_streaming_nodes = + ActiveStreamingWorkerNodes::uninitialized(metadata_manager.clone()); + let tracker = CreateMviewProgressTracker::new(); let scale_controller = Arc::new(ScaleController::new( @@ -406,7 +415,7 @@ impl GlobalBarrierManager { )); let context = GlobalBarrierManagerContext { - status: Arc::new(Mutex::new(BarrierManagerStatus::Starting)), + status: Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting))), metadata_manager, hummock_manager, source_manager, @@ -429,6 +438,7 @@ impl GlobalBarrierManager { state: initial_invalid_state, checkpoint_control, rpc_manager, + active_streaming_nodes, } } @@ -480,6 +490,7 @@ impl GlobalBarrierManager { let interval = Duration::from_millis( self.env.system_params_reader().await.barrier_interval_ms() as u64, ); + self.scheduled_barriers.set_min_interval(interval); tracing::info!( "Starting barrier manager with: interval={:?}, enable_recovery={}, in_flight_barrier_nums={}", interval, @@ -504,7 +515,7 @@ impl GlobalBarrierManager { } } - self.state = { + { let latest_snapshot = self.context.hummock_manager.latest_snapshot(); assert_eq!( latest_snapshot.committed_epoch, latest_snapshot.current_epoch, @@ -517,23 +528,19 @@ impl GlobalBarrierManager { // Even if there's no actor to recover, we still go through the recovery process to // inject the first `Initial` barrier. self.context - .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap)) - .await; + .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap)); let span = tracing::info_span!("bootstrap_recovery", prev_epoch = prev_epoch.value().0); let paused = self.take_pause_on_bootstrap().await.unwrap_or(false); let paused_reason = paused.then_some(PausedReason::Manual); - self.context - .recovery(prev_epoch, paused_reason, &self.scheduled_barriers) + self.recovery(prev_epoch, paused_reason) .instrument(span) - .await - }; + .await; + } - self.context.set_status(BarrierManagerStatus::Running).await; + self.context.set_status(BarrierManagerStatus::Running); - let mut min_interval = tokio::time::interval(interval); - min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); let (local_notification_tx, mut local_notification_rx) = tokio::sync::mpsc::unbounded_channel(); self.env @@ -551,16 +558,64 @@ impl GlobalBarrierManager { tracing::info!("Barrier manager is stopped"); break; } + + changed_worker = self.active_streaming_nodes.changed() => { + #[cfg(debug_assertions)] + { + match self + .context + .metadata_manager + .list_active_streaming_compute_nodes() + .await + { + Ok(worker_nodes) => { + let ignore_irrelevant_info = |node: &WorkerNode| { + ( + node.id, + WorkerNode { + id: node.id, + r#type: node.r#type, + host: node.host.clone(), + parallel_units: node.parallel_units.clone(), + property: node.property.clone(), + resource: node.resource.clone(), + ..Default::default() + }, + ) + }; + let worker_nodes: HashMap<_, _> = + worker_nodes.iter().map(ignore_irrelevant_info).collect(); + let curr_worker_nodes: HashMap<_, _> = self + .active_streaming_nodes + .current() + .values() + .map(ignore_irrelevant_info) + .collect(); + if worker_nodes != curr_worker_nodes { + warn!( + ?worker_nodes, + ?curr_worker_nodes, + "different to global snapshot" + ); + } + } + Err(e) => { + warn!(e = ?e.as_report(), "fail to list_active_streaming_compute_nodes to compare with local snapshot"); + } + } + } + + 
info!(?changed_worker, "worker changed"); + + // TODO: may apply the changed worker to state + } + // Checkpoint frequency changes. notification = local_notification_rx.recv() => { let notification = notification.unwrap(); // Handle barrier interval and checkpoint frequency changes if let LocalNotification::SystemParamsChange(p) = ¬ification { - let new_interval = Duration::from_millis(p.barrier_interval_ms() as u64); - if new_interval != min_interval.period() { - min_interval = tokio::time::interval(new_interval); - min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - } + self.scheduled_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64)); self.scheduled_barriers .set_checkpoint_frequency(p.checkpoint_frequency() as usize) } @@ -572,15 +627,11 @@ impl GlobalBarrierManager { ) .await; } - - // There's barrier scheduled. - _ = self.scheduled_barriers.wait_one(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => { - min_interval.reset(); // Reset the interval as we have a new barrier. - self.handle_new_barrier().await; - } - // Minimum interval reached. - _ = min_interval.tick(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => { - self.handle_new_barrier().await; + scheduled = self.scheduled_barriers.next_barrier(), + if self + .checkpoint_control + .can_inject_barrier(self.in_flight_barrier_nums) => { + self.handle_new_barrier(scheduled); } } self.checkpoint_control.update_barrier_nums_metrics(); @@ -588,26 +639,17 @@ impl GlobalBarrierManager { } /// Handle the new barrier from the scheduled queue and inject it. - async fn handle_new_barrier(&mut self) { - assert!(self - .checkpoint_control - .can_inject_barrier(self.in_flight_barrier_nums)); - + fn handle_new_barrier(&mut self, scheduled: Scheduled) { let Scheduled { command, mut notifiers, send_latency_timer, checkpoint, span, - } = self.scheduled_barriers.pop_or_default().await; + } = scheduled; - let all_nodes = self - .context - .metadata_manager - .list_active_streaming_compute_nodes() - .await - .unwrap(); - self.state.resolve_worker_nodes(all_nodes); + self.state + .resolve_worker_nodes(self.active_streaming_nodes.current().values().cloned()); let info = self.state.apply_command(&command); let (prev_epoch, curr_epoch) = self.state.next_epoch_pair(); @@ -629,15 +671,12 @@ impl GlobalBarrierManager { command, kind, self.context.clone(), - span.clone(), + span, )); send_latency_timer.observe_duration(); - self.rpc_manager - .inject_barrier(command_ctx.clone()) - .instrument(span) - .await; + self.rpc_manager.inject_barrier(command_ctx.clone()); // Notify about the injection. let prev_paused_reason = self.state.paused_reason(); @@ -649,7 +688,7 @@ impl GlobalBarrierManager { prev_paused_reason, curr_paused_reason, }; - notifiers.iter_mut().for_each(|n| n.notify_injected(info)); + notifiers.iter_mut().for_each(|n| n.notify_started(info)); // Update the paused state after the barrier is injected. 
self.state.set_paused_reason(curr_paused_reason); @@ -728,8 +767,7 @@ impl GlobalBarrierManager { self.context .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Failover( err.clone(), - ))) - .await; + ))); let latest_snapshot = self.context.hummock_manager.latest_snapshot(); let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recovery from the committed epoch let span = tracing::info_span!( @@ -740,12 +778,8 @@ impl GlobalBarrierManager { // No need to clean dirty tables for barrier recovery, // The foreground stream job should cleanup their own tables. - self.state = self - .context - .recovery(prev_epoch, None, &self.scheduled_barriers) - .instrument(span) - .await; - self.context.set_status(BarrierManagerStatus::Running).await; + self.recovery(prev_epoch, None).instrument(span).await; + self.context.set_status(BarrierManagerStatus::Running); } else { panic!("failed to execute barrier: {}", err.as_report()); } @@ -897,9 +931,9 @@ impl GlobalBarrierManager { impl GlobalBarrierManagerContext { /// Check the status of barrier manager, return error if it is not `Running`. - pub async fn check_status_running(&self) -> MetaResult<()> { - let status = self.status.lock().await; - match &*status { + pub fn check_status_running(&self) -> MetaResult<()> { + let status = self.status.load(); + match &**status { BarrierManagerStatus::Starting | BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => { bail!("The cluster is bootstrapping") @@ -912,33 +946,25 @@ impl GlobalBarrierManagerContext { } /// Set barrier manager status. - async fn set_status(&self, new_status: BarrierManagerStatus) { - let mut status = self.status.lock().await; - *status = new_status; + fn set_status(&self, new_status: BarrierManagerStatus) { + self.status.store(Arc::new(new_status)); } /// Resolve actor information from cluster, fragment manager and `ChangedTableId`. /// We use `changed_table_id` to modify the actors to be sent or collected. Because these actor /// will create or drop before this barrier flow through them. - async fn resolve_actor_info(&self) -> MetaResult { + async fn resolve_actor_info( + &self, + all_nodes: Vec, + ) -> MetaResult { let info = match &self.metadata_manager { MetadataManager::V1(mgr) => { - let all_nodes = mgr - .cluster_manager - .list_active_streaming_compute_nodes() - .await; let all_actor_infos = mgr.fragment_manager.load_all_actors().await; InflightActorInfo::resolve(all_nodes, all_actor_infos) } MetadataManager::V2(mgr) => { - let all_nodes = mgr - .cluster_controller - .list_active_streaming_workers() - .await - .unwrap(); let all_actor_infos = mgr.catalog_controller.load_all_actors().await?; - InflightActorInfo::resolve(all_nodes, all_actor_infos) } }; diff --git a/src/meta/src/barrier/notifier.rs b/src/meta/src/barrier/notifier.rs index 1675544d7a410..e142e9be514cf 100644 --- a/src/meta/src/barrier/notifier.rs +++ b/src/meta/src/barrier/notifier.rs @@ -31,8 +31,8 @@ pub struct BarrierInfo { /// Used for notifying the status of a scheduled command/barrier. #[derive(Debug, Default)] pub(crate) struct Notifier { - /// Get notified when scheduled barrier is injected to compute nodes. - pub injected: Option>, + /// Get notified when scheduled barrier has started to be handled. + pub started: Option>, /// Get notified when scheduled barrier is collected or failed. pub collected: Option>>, @@ -43,8 +43,8 @@ pub(crate) struct Notifier { impl Notifier { /// Notify when we have injected a barrier to compute nodes. 
- pub fn notify_injected(&mut self, info: BarrierInfo) { - if let Some(tx) = self.injected.take() { + pub fn notify_started(&mut self, info: BarrierInfo) { + if let Some(tx) = self.started.take() { tx.send(info).ok(); } } diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index e4b93a286a3f8..ce215505a3a21 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -21,7 +21,7 @@ use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_meta_model_v2::StreamingParallelism; -use risingwave_pb::common::ActorInfo; +use risingwave_pb::common::{ActorInfo, WorkerNode}; use risingwave_pb::meta::table_fragments::State; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; @@ -37,16 +37,15 @@ use crate::barrier::command::CommandContext; use crate::barrier::info::InflightActorInfo; use crate::barrier::notifier::Notifier; use crate::barrier::progress::CreateMviewProgressTracker; -use crate::barrier::schedule::ScheduledBarriers; use crate::barrier::state::BarrierManagerState; -use crate::barrier::{Command, GlobalBarrierManagerContext}; +use crate::barrier::{Command, GlobalBarrierManager, GlobalBarrierManagerContext}; use crate::controller::catalog::ReleaseContext; -use crate::manager::{MetadataManager, WorkerId}; +use crate::manager::{ActiveStreamingWorkerNodes, MetadataManager, WorkerId}; use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism}; use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy}; use crate::MetaResult; -impl GlobalBarrierManagerContext { +impl GlobalBarrierManager { // Retry base interval in milliseconds. const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20; // Retry max interval. @@ -59,7 +58,9 @@ impl GlobalBarrierManagerContext { .max_delay(Self::RECOVERY_RETRY_MAX_INTERVAL) .map(jitter) } +} +impl GlobalBarrierManagerContext { /// Clean catalogs for creating streaming jobs that are in foreground mode or table fragments not persisted. async fn clean_dirty_streaming_jobs(&self) -> MetaResult<()> { match &self.metadata_manager { @@ -295,17 +296,15 @@ impl GlobalBarrierManagerContext { Ok(()) } +} +impl GlobalBarrierManager { /// Pre buffered drop and cancel command, return true if any. - async fn pre_apply_drop_cancel( - &self, - scheduled_barriers: &ScheduledBarriers, - ) -> MetaResult { - let (dropped_actors, cancelled) = - scheduled_barriers.pre_apply_drop_cancel_scheduled().await; + async fn pre_apply_drop_cancel(&self) -> MetaResult { + let (dropped_actors, cancelled) = self.scheduled_barriers.pre_apply_drop_cancel_scheduled(); let applied = !dropped_actors.is_empty() || !cancelled.is_empty(); if !cancelled.is_empty() { - match &self.metadata_manager { + match &self.context.metadata_manager { MetadataManager::V1(mgr) => { mgr.fragment_manager .drop_table_fragments_vec(&cancelled) @@ -330,57 +329,73 @@ impl GlobalBarrierManagerContext { /// the cluster or `risectl` command. Used for debugging purpose. /// /// Returns the new state of the barrier manager after recovery. - pub async fn recovery( - &self, - prev_epoch: TracedEpoch, - paused_reason: Option, - scheduled_barriers: &ScheduledBarriers, - ) -> BarrierManagerState { + pub async fn recovery(&mut self, prev_epoch: TracedEpoch, paused_reason: Option) { // Mark blocked and abort buffered schedules, they might be dirty already. 
- scheduled_barriers - .abort_and_mark_blocked("cluster is under recovering") - .await; + self.scheduled_barriers + .abort_and_mark_blocked("cluster is under recovering"); tracing::info!("recovery start!"); - self.clean_dirty_streaming_jobs() + self.context + .clean_dirty_streaming_jobs() .await .expect("clean dirty streaming jobs"); - self.sink_manager.reset().await; + self.context.sink_manager.reset().await; let retry_strategy = Self::get_retry_strategy(); // Mview progress needs to be recovered. tracing::info!("recovering mview progress"); - self.recover_background_mv_progress() + self.context + .recover_background_mv_progress() .await .expect("recover mview progress should not fail"); tracing::info!("recovered mview progress"); // We take retry into consideration because this is the latency user sees for a cluster to // get recovered. - let recovery_timer = self.metrics.recovery_latency.start_timer(); + let recovery_timer = self.context.metrics.recovery_latency.start_timer(); - let state = tokio_retry::Retry::spawn(retry_strategy, || { + let (state, active_streaming_nodes) = tokio_retry::Retry::spawn(retry_strategy, || { async { let recovery_result: MetaResult<_> = try { // This is a quick path to accelerate the process of dropping and canceling streaming jobs. - let _ = self.pre_apply_drop_cancel(scheduled_barriers).await?; + let _ = self.pre_apply_drop_cancel().await?; + + let active_streaming_nodes = ActiveStreamingWorkerNodes::new_snapshot( + self.context.metadata_manager.clone(), + ) + .await?; + + let all_nodes = active_streaming_nodes + .current() + .values() + .cloned() + .collect_vec(); // Resolve actor info for recovery. If there's no actor to recover, most of the // following steps will be no-op, while the compute nodes will still be reset. - let mut info = if !self.env.opts.disable_automatic_parallelism_control { - self.scale_actors().await.inspect_err(|err| { - warn!(error = %err.as_report(), "scale actors failed"); - })?; - - self.resolve_actor_info().await.inspect_err(|err| { - warn!(error = %err.as_report(), "resolve actor info failed"); - })? + let info = if !self.env.opts.disable_automatic_parallelism_control { + self.context + .scale_actors(all_nodes.clone()) + .await + .inspect_err(|err| { + warn!(error = %err.as_report(), "scale actors failed"); + })?; + + self.context + .resolve_actor_info(all_nodes.clone()) + .await + .inspect_err(|err| { + warn!(error = %err.as_report(), "resolve actor info failed"); + })? } else { // Migrate actors in expired CN to newly joined one. - self.migrate_actors().await.inspect_err(|err| { - warn!(error = %err.as_report(), "migrate actors failed"); - })? + self.context + .migrate_actors(all_nodes.clone()) + .await + .inspect_err(|err| { + warn!(error = %err.as_report(), "migrate actors failed"); + })? }; // Reset all compute nodes, stop and drop existing actors. @@ -388,11 +403,16 @@ impl GlobalBarrierManagerContext { warn!(error = %err.as_report(), "reset compute nodes failed"); })?; - if self.pre_apply_drop_cancel(scheduled_barriers).await? { - info = self.resolve_actor_info().await.inspect_err(|err| { - warn!(error = %err.as_report(), "resolve actor info failed"); - })?; - } + let info = if self.pre_apply_drop_cancel().await? { + self.context + .resolve_actor_info(all_nodes.clone()) + .await + .inspect_err(|err| { + warn!(error = %err.as_report(), "resolve actor info failed"); + })? + } else { + info + }; // update and build all actors. 
self.update_actors(&info).await.inspect_err(|err| { @@ -403,7 +423,8 @@ impl GlobalBarrierManagerContext { })?; // get split assignments for all actors - let source_split_assignments = self.source_manager.list_assignments().await; + let source_split_assignments = + self.context.source_manager.list_assignments().await; let command = Command::Plain(Some(Mutation::Add(AddMutation { // Actors built during recovery is not treated as newly added actors. actor_dispatchers: Default::default(), @@ -423,7 +444,7 @@ impl GlobalBarrierManagerContext { paused_reason, command, BarrierKind::Initial, - self.clone(), + self.context.clone(), tracing::Span::current(), // recovery span )); @@ -431,14 +452,22 @@ impl GlobalBarrierManagerContext { { use risingwave_common::util::epoch::INVALID_EPOCH; - let mce = self.hummock_manager.get_current_max_committed_epoch().await; + let mce = self + .context + .hummock_manager + .get_current_max_committed_epoch() + .await; if mce != INVALID_EPOCH { command_ctx.wait_epoch_commit(mce).await?; } }; - let await_barrier_complete = self.inject_barrier(command_ctx.clone()).await; - let res = match await_barrier_complete.await.result { + let res = match self + .context + .inject_barrier(command_ctx.clone(), None, None) + .await + .result + { Ok(response) => { if let Err(err) = command_ctx.post_collect().await { warn!(error = %err.as_report(), "post_collect failed"); @@ -454,10 +483,13 @@ impl GlobalBarrierManagerContext { }; let (new_epoch, _) = res?; - BarrierManagerState::new(new_epoch, info, command_ctx.next_paused_reason()) + ( + BarrierManagerState::new(new_epoch, info, command_ctx.next_paused_reason()), + active_streaming_nodes, + ) }; if recovery_result.is_err() { - self.metrics.recovery_failure_cnt.inc(); + self.context.metrics.recovery_failure_cnt.inc(); } recovery_result } @@ -467,7 +499,7 @@ impl GlobalBarrierManagerContext { .expect("Retry until recovery success."); recovery_timer.observe_duration(); - scheduled_barriers.mark_ready().await; + self.scheduled_barriers.mark_ready(); tracing::info!( epoch = state.in_flight_prev_epoch().value().0, @@ -475,18 +507,21 @@ impl GlobalBarrierManagerContext { "recovery success" ); - state + self.state = state; + self.active_streaming_nodes = active_streaming_nodes; } +} +impl GlobalBarrierManagerContext { /// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated. - async fn migrate_actors(&self) -> MetaResult { + async fn migrate_actors(&self, all_nodes: Vec) -> MetaResult { match &self.metadata_manager { - MetadataManager::V1(_) => self.migrate_actors_v1().await, - MetadataManager::V2(_) => self.migrate_actors_v2().await, + MetadataManager::V1(_) => self.migrate_actors_v1(all_nodes).await, + MetadataManager::V2(_) => self.migrate_actors_v2(all_nodes).await, } } - async fn migrate_actors_v2(&self) -> MetaResult { + async fn migrate_actors_v2(&self, all_nodes: Vec) -> MetaResult { let mgr = self.metadata_manager.as_v2_ref(); let all_inuse_parallel_units: HashSet<_> = mgr @@ -495,12 +530,10 @@ impl GlobalBarrierManagerContext { .await? .into_iter() .collect(); - let active_parallel_units: HashSet<_> = mgr - .cluster_controller - .list_active_parallel_units() - .await? 
- .into_iter() - .map(|pu| pu.id as i32) + + let active_parallel_units: HashSet<_> = all_nodes + .iter() + .flat_map(|node| node.parallel_units.iter().map(|pu| pu.id as i32)) .collect(); let expired_parallel_units: BTreeSet<_> = all_inuse_parallel_units @@ -509,7 +542,7 @@ impl GlobalBarrierManagerContext { .collect(); if expired_parallel_units.is_empty() { debug!("no expired parallel units, skipping."); - return self.resolve_actor_info().await; + return self.resolve_actor_info(all_nodes.clone()).await; } debug!("start migrate actors."); @@ -526,12 +559,14 @@ impl GlobalBarrierManagerContext { let start = Instant::now(); let mut plan = HashMap::new(); 'discovery: while !to_migrate_parallel_units.is_empty() { - let new_parallel_units = mgr - .cluster_controller - .list_active_parallel_units() - .await? - .into_iter() - .filter(|pu| !inuse_parallel_units.contains(&(pu.id as i32))) + let new_parallel_units = all_nodes + .iter() + .flat_map(|node| { + node.parallel_units + .iter() + .filter(|pu| !inuse_parallel_units.contains(&(pu.id as _))) + }) + .cloned() .collect_vec(); if !new_parallel_units.is_empty() { debug!("new parallel units found: {:#?}", new_parallel_units); @@ -560,14 +595,14 @@ impl GlobalBarrierManagerContext { debug!("migrate actors succeed."); - self.resolve_actor_info().await + self.resolve_actor_info(all_nodes).await } /// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated. - async fn migrate_actors_v1(&self) -> MetaResult { + async fn migrate_actors_v1(&self, all_nodes: Vec) -> MetaResult { let mgr = self.metadata_manager.as_v1_ref(); - let info = self.resolve_actor_info().await?; + let info = self.resolve_actor_info(all_nodes.clone()).await?; // 1. get expired workers. let expired_workers: HashSet = info @@ -582,7 +617,9 @@ impl GlobalBarrierManagerContext { } debug!("start migrate actors."); - let migration_plan = self.generate_migration_plan(expired_workers).await?; + let migration_plan = self + .generate_migration_plan(expired_workers, &all_nodes) + .await?; // 2. start to migrate fragment one-by-one. 
mgr.fragment_manager .migrate_fragment_actors(&migration_plan) @@ -591,17 +628,17 @@ impl GlobalBarrierManagerContext { migration_plan.delete(self.env.meta_store_checked()).await?; debug!("migrate actors succeed."); - self.resolve_actor_info().await + self.resolve_actor_info(all_nodes).await } - async fn scale_actors(&self) -> MetaResult<()> { + async fn scale_actors(&self, all_nodes: Vec) -> MetaResult<()> { match &self.metadata_manager { - MetadataManager::V1(_) => self.scale_actors_v1().await, - MetadataManager::V2(_) => self.scale_actors_v2().await, + MetadataManager::V1(_) => self.scale_actors_v1(all_nodes).await, + MetadataManager::V2(_) => self.scale_actors_v2(all_nodes).await, } } - async fn scale_actors_v2(&self) -> MetaResult<()> { + async fn scale_actors_v2(&self, workers: Vec) -> MetaResult<()> { let mgr = self.metadata_manager.as_v2_ref(); debug!("start resetting actors distribution"); @@ -625,11 +662,6 @@ impl GlobalBarrierManagerContext { .collect() }; - let workers = mgr - .cluster_controller - .list_active_streaming_workers() - .await?; - let schedulable_worker_ids = workers .iter() .filter(|worker| { @@ -698,8 +730,8 @@ impl GlobalBarrierManagerContext { Ok(()) } - async fn scale_actors_v1(&self) -> MetaResult<()> { - let info = self.resolve_actor_info().await?; + async fn scale_actors_v1(&self, workers: Vec) -> MetaResult<()> { + let info = self.resolve_actor_info(workers.clone()).await?; let mgr = self.metadata_manager.as_v1_ref(); debug!("start resetting actors distribution"); @@ -762,11 +794,6 @@ impl GlobalBarrierManagerContext { .collect() }; - let workers = mgr - .cluster_manager - .list_active_streaming_compute_nodes() - .await; - let schedulable_worker_ids = workers .iter() .filter(|worker| { @@ -841,6 +868,7 @@ impl GlobalBarrierManagerContext { async fn generate_migration_plan( &self, expired_workers: HashSet, + all_nodes: &Vec, ) -> MetaResult { let mgr = self.metadata_manager.as_v1_ref(); @@ -890,10 +918,10 @@ impl GlobalBarrierManagerContext { let start = Instant::now(); // if in-used expire parallel units are not empty, should wait for newly joined worker. 'discovery: while !to_migrate_parallel_units.is_empty() { - let mut new_parallel_units = mgr - .cluster_manager - .list_active_streaming_parallel_units() - .await; + let mut new_parallel_units = all_nodes + .iter() + .flat_map(|worker| worker.parallel_units.iter().cloned()) + .collect_vec(); new_parallel_units.retain(|pu| !inuse_parallel_units.contains(&pu.id)); if !new_parallel_units.is_empty() { @@ -944,7 +972,9 @@ impl GlobalBarrierManagerContext { new_plan.insert(self.env.meta_store_checked()).await?; Ok(new_plan) } +} +impl GlobalBarrierManager { /// Update all actors in compute nodes. async fn update_actors(&self, info: &InflightActorInfo) -> MetaResult<()> { if info.actor_map.is_empty() { @@ -970,7 +1000,7 @@ impl GlobalBarrierManagerContext { .flatten_ok() .try_collect()?; - let mut all_node_actors = self.metadata_manager.all_node_actors(false).await?; + let mut all_node_actors = self.context.metadata_manager.all_node_actors(false).await?; // Check if any actors were dropped after info resolved. 
if all_node_actors.iter().any(|(node_id, node_actors)| { @@ -984,7 +1014,8 @@ impl GlobalBarrierManagerContext { return Err(anyhow!("actors dropped during update").into()); } - self.stream_rpc_manager + self.context + .stream_rpc_manager .broadcast_update_actor_info( &info.node_map, info.actor_map.keys().cloned(), @@ -1008,7 +1039,8 @@ impl GlobalBarrierManagerContext { return Ok(()); } - self.stream_rpc_manager + self.context + .stream_rpc_manager .build_actors( &info.node_map, info.actor_map.iter().map(|(node_id, actors)| { @@ -1024,7 +1056,8 @@ impl GlobalBarrierManagerContext { /// Reset all compute nodes by calling `force_stop_actors`. async fn reset_compute_nodes(&self, info: &InflightActorInfo) -> MetaResult<()> { debug!(worker = ?info.node_map.keys().collect_vec(), "force stop actors"); - self.stream_rpc_manager + self.context + .stream_rpc_manager .force_stop_actors(info.node_map.values()) .await?; diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 670ee7cf10929..877f935f25207 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -35,6 +35,7 @@ use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::StreamClient; use rw_futures_util::pending_on_none; use tokio::sync::oneshot; +use tracing::Instrument; use uuid::Uuid; use super::command::CommandContext; @@ -47,6 +48,8 @@ pub(super) struct BarrierRpcManager { /// Futures that await on the completion of barrier. injected_in_progress_barrier: FuturesUnordered, + + prev_injecting_barrier: Option>, } impl BarrierRpcManager { @@ -54,15 +57,24 @@ impl BarrierRpcManager { Self { context, injected_in_progress_barrier: FuturesUnordered::new(), + prev_injecting_barrier: None, } } pub(super) fn clear(&mut self) { self.injected_in_progress_barrier = FuturesUnordered::new(); + self.prev_injecting_barrier = None; } - pub(super) async fn inject_barrier(&mut self, command_context: Arc) { - let await_complete_future = self.context.inject_barrier(command_context).await; + pub(super) fn inject_barrier(&mut self, command_context: Arc) { + // this is to notify that the barrier has been injected so that the next + // barrier can be injected to avoid out of order barrier injection. + // TODO: can be removed when bidi-stream control in implemented. + let (inject_tx, inject_rx) = oneshot::channel(); + let prev_inject_rx = self.prev_injecting_barrier.replace(inject_rx); + let await_complete_future = + self.context + .inject_barrier(command_context, Some(inject_tx), prev_inject_rx); self.injected_in_progress_barrier .push(await_complete_future); } @@ -76,35 +88,49 @@ pub(super) type BarrierCompletionFuture = impl Future, + inject_tx: Option>, + prev_inject_rx: Option>, ) -> BarrierCompletionFuture { let (tx, rx) = oneshot::channel(); let prev_epoch = command_context.prev_epoch.value().0; - let result = self - .stream_rpc_manager - .inject_barrier(command_context.clone()) - .await; - match result { - Ok(node_need_collect) => { - // todo: the collect handler should be abort when recovery. - tokio::spawn({ - let stream_rpc_manager = self.stream_rpc_manager.clone(); - async move { - stream_rpc_manager - .collect_barrier(node_need_collect, command_context, tx) - .await - } - }); + let stream_rpc_manager = self.stream_rpc_manager.clone(); + // todo: the collect handler should be abort when recovery. 
+ let _join_handle = tokio::spawn(async move { + let span = command_context.span.clone(); + if let Some(prev_inject_rx) = prev_inject_rx { + if prev_inject_rx.await.is_err() { + let _ = tx.send(BarrierCompletion { + prev_epoch, + result: Err(anyhow!("prev barrier failed to be injected").into()), + }); + return; + } } - Err(e) => { - let _ = tx.send(BarrierCompletion { - prev_epoch, - result: Err(e), - }); + let result = stream_rpc_manager + .inject_barrier(command_context.clone()) + .instrument(span.clone()) + .await; + match result { + Ok(node_need_collect) => { + if let Some(inject_tx) = inject_tx { + let _ = inject_tx.send(()); + } + stream_rpc_manager + .collect_barrier(node_need_collect, command_context, tx) + .instrument(span.clone()) + .await; + } + Err(e) => { + let _ = tx.send(BarrierCompletion { + prev_epoch, + result: Err(e), + }); + } } - } + }); rx.map(move |result| match result { Ok(completion) => completion, Err(_e) => BarrierCompletion { diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index 56152c18baa70..f599456f94bea 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -13,17 +13,21 @@ // limitations under the License. use std::collections::{HashSet, VecDeque}; +use std::future::pending; use std::iter::once; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use anyhow::{anyhow, Context}; use assert_matches::assert_matches; +use parking_lot::Mutex; use risingwave_common::catalog::TableId; use risingwave_pb::hummock::HummockSnapshot; use risingwave_pb::meta::PausedReason; -use tokio::sync::{oneshot, watch, RwLock}; +use tokio::select; +use tokio::sync::{oneshot, watch}; +use tokio::time::Interval; +use tracing::warn; use super::notifier::{BarrierInfo, Notifier}; use super::{Command, Scheduled}; @@ -37,19 +41,11 @@ use crate::{MetaError, MetaResult}; /// We manually implement one here instead of using channels since we may need to update the front /// of the queue to add some notifiers for instant flushes. struct Inner { - queue: RwLock, + queue: Mutex, /// When `queue` is not empty anymore, all subscribers of this watcher will be notified. changed_tx: watch::Sender<()>, - /// The numbers of barrier (checkpoint = false) since the last barrier (checkpoint = true) - num_uncheckpointed_barrier: AtomicUsize, - - /// Force checkpoint in next barrier. - force_checkpoint: AtomicBool, - - checkpoint_frequency: AtomicUsize, - /// Used for recording send latency of each barrier. metrics: Arc, } @@ -62,7 +58,7 @@ enum QueueStatus { Blocked(String), } -struct ScheduledQueue { +pub(super) struct ScheduledQueue { queue: VecDeque, status: QueueStatus, } @@ -154,11 +150,8 @@ impl BarrierScheduler { checkpoint_frequency, ); let inner = Arc::new(Inner { - queue: RwLock::new(ScheduledQueue::new()), + queue: Mutex::new(ScheduledQueue::new()), changed_tx: watch::channel(()).0, - num_uncheckpointed_barrier: AtomicUsize::new(0), - checkpoint_frequency: AtomicUsize::new(checkpoint_frequency), - force_checkpoint: AtomicBool::new(false), metrics, }); @@ -167,13 +160,19 @@ impl BarrierScheduler { inner: inner.clone(), hummock_manager, }, - ScheduledBarriers { inner }, + ScheduledBarriers { + num_uncheckpointed_barrier: 0, + force_checkpoint: false, + checkpoint_frequency, + inner, + min_interval: None, + }, ) } /// Push a scheduled barrier into the queue. 
- async fn push(&self, scheduleds: impl IntoIterator) -> MetaResult<()> { - let mut queue = self.inner.queue.write().await; + fn push(&self, scheduleds: impl IntoIterator) -> MetaResult<()> { + let mut queue = self.inner.queue.lock(); for scheduled in scheduleds { queue.push_back(scheduled)?; if queue.len() == 1 { @@ -184,8 +183,8 @@ impl BarrierScheduler { } /// Try to cancel scheduled cmd for create streaming job, return true if cancelled. - pub async fn try_cancel_scheduled_create(&self, table_id: TableId) -> bool { - let queue = &mut self.inner.queue.write().await; + pub fn try_cancel_scheduled_create(&self, table_id: TableId) -> bool { + let queue = &mut self.inner.queue.lock(); if let Some(idx) = queue.queue.iter().position(|scheduled| { if let Command::CreateStreamingJob { table_fragments, .. @@ -207,12 +206,12 @@ impl BarrierScheduler { /// Attach `new_notifiers` to the very first scheduled barrier. If there's no one scheduled, a /// default barrier will be created. If `new_checkpoint` is true, the barrier will become a /// checkpoint. - async fn attach_notifiers( + fn attach_notifiers( &self, new_notifiers: Vec, new_checkpoint: bool, ) -> MetaResult<()> { - let mut queue = self.inner.queue.write().await; + let mut queue = self.inner.queue.lock(); match queue.queue.front_mut() { Some(Scheduled { notifiers, @@ -243,7 +242,7 @@ impl BarrierScheduler { collected: Some(tx), ..Default::default() }; - self.attach_notifiers(vec![notifier], checkpoint).await?; + self.attach_notifiers(vec![notifier], checkpoint)?; rx.await.unwrap() } @@ -258,23 +257,23 @@ impl BarrierScheduler { let mut scheduleds = Vec::with_capacity(commands.len()); for command in commands { - let (injected_tx, injected_rx) = oneshot::channel(); + let (started_tx, started_rx) = oneshot::channel(); let (collect_tx, collect_rx) = oneshot::channel(); let (finish_tx, finish_rx) = oneshot::channel(); - contexts.push((injected_rx, collect_rx, finish_rx)); + contexts.push((started_rx, collect_rx, finish_rx)); scheduleds.push(self.inner.new_scheduled( command.need_checkpoint(), command, once(Notifier { - injected: Some(injected_tx), + started: Some(started_tx), collected: Some(collect_tx), finished: Some(finish_tx), }), )); } - self.push(scheduleds).await?; + self.push(scheduleds)?; let mut infos = Vec::with_capacity(contexts.len()); @@ -340,21 +339,62 @@ impl BarrierScheduler { /// The receiver side of the barrier scheduling queue. /// Held by the [`super::GlobalBarrierManager`] to execute these commands. pub struct ScheduledBarriers { + min_interval: Option, + + /// Force checkpoint in next barrier. + force_checkpoint: bool, + + /// The numbers of barrier (checkpoint = false) since the last barrier (checkpoint = true) + num_uncheckpointed_barrier: usize, + checkpoint_frequency: usize, inner: Arc, } impl ScheduledBarriers { - /// Pop a scheduled barrier from the queue, or a default checkpoint barrier if not exists. 
- pub(super) async fn pop_or_default(&self) -> Scheduled { - let mut queue = self.inner.queue.write().await; - let checkpoint = self.try_get_checkpoint(); - let scheduled = match queue.queue.pop_front() { - Some(mut scheduled) => { + pub(super) fn set_min_interval(&mut self, min_interval: Duration) { + let set_new_interval = match &self.min_interval { + None => true, + Some(prev_min_interval) => min_interval != prev_min_interval.period(), + }; + if set_new_interval { + let mut min_interval = tokio::time::interval(min_interval); + min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + self.min_interval = Some(min_interval); + } + } + + pub(super) async fn next_barrier(&mut self) -> Scheduled { + let scheduled = select! { + biased; + mut scheduled = async { + loop { + let mut rx = self.inner.changed_tx.subscribe(); + { + let mut queue = self.inner.queue.lock(); + if let Some(scheduled) = queue.queue.pop_front() { + break scheduled + } + } + rx.changed().await.unwrap(); + } + } => { + if let Some(min_interval) = &mut self.min_interval { + min_interval.reset(); + } + let checkpoint = self.try_get_checkpoint(); scheduled.checkpoint = scheduled.checkpoint || checkpoint; scheduled - } - None => { - // If no command scheduled, create a periodic barrier by default. + }, + _ = async { + match &mut self.min_interval { + Some(min_interval) => min_interval.tick().await, + None => { + warn!("min interval not set and pending"); + pending::<_>().await + } + } + } => { + let checkpoint = self.try_get_checkpoint(); self.inner .new_scheduled(checkpoint, Command::barrier(), std::iter::empty()) } @@ -363,22 +403,10 @@ impl ScheduledBarriers { scheduled } - /// Wait for at least one scheduled barrier in the queue. - pub(super) async fn wait_one(&self) { - let queue = self.inner.queue.read().await; - if queue.len() > 0 { - return; - } - let mut rx = self.inner.changed_tx.subscribe(); - drop(queue); - - rx.changed().await.unwrap(); - } - /// Mark command scheduler as blocked and abort all queued scheduled command and notify with /// specific reason. - pub(super) async fn abort_and_mark_blocked(&self, reason: impl Into + Copy) { - let mut queue = self.inner.queue.write().await; + pub(super) fn abort_and_mark_blocked(&self, reason: impl Into + Copy) { + let mut queue = self.inner.queue.lock(); queue.mark_blocked(reason.into()); while let Some(Scheduled { notifiers, .. }) = queue.queue.pop_front() { notifiers @@ -388,15 +416,15 @@ impl ScheduledBarriers { } /// Mark command scheduler as ready to accept new command. - pub(super) async fn mark_ready(&self) { - let mut queue = self.inner.queue.write().await; + pub(super) fn mark_ready(&self) { + let mut queue = self.inner.queue.lock(); queue.mark_ready(); } /// Try to pre apply drop and cancel scheduled command and return them if any. /// It should only be called in recovery. - pub(super) async fn pre_apply_drop_cancel_scheduled(&self) -> (Vec, HashSet) { - let mut queue = self.inner.queue.write().await; + pub(super) fn pre_apply_drop_cancel_scheduled(&self) -> (Vec, HashSet) { + let mut queue = self.inner.queue.lock(); assert_matches!(queue.status, QueueStatus::Blocked(_)); let (mut drop_table_ids, mut cancel_table_ids) = (vec![], HashSet::new()); @@ -426,37 +454,26 @@ impl ScheduledBarriers { /// Whether the barrier(checkpoint = true) should be injected. 
fn try_get_checkpoint(&self) -> bool { - self.inner - .num_uncheckpointed_barrier - .load(Ordering::Relaxed) - + 1 - >= self.inner.checkpoint_frequency.load(Ordering::Relaxed) - || self.inner.force_checkpoint.load(Ordering::Relaxed) + self.num_uncheckpointed_barrier + 1 >= self.checkpoint_frequency || self.force_checkpoint } /// Make the `checkpoint` of the next barrier must be true - pub fn force_checkpoint_in_next_barrier(&self) { - self.inner.force_checkpoint.store(true, Ordering::Relaxed) + pub fn force_checkpoint_in_next_barrier(&mut self) { + self.force_checkpoint = true; } /// Update the `checkpoint_frequency` - pub fn set_checkpoint_frequency(&self, frequency: usize) { - self.inner - .checkpoint_frequency - .store(frequency, Ordering::Relaxed); + pub fn set_checkpoint_frequency(&mut self, frequency: usize) { + self.checkpoint_frequency = frequency; } /// Update the `num_uncheckpointed_barrier` - fn update_num_uncheckpointed_barrier(&self, checkpoint: bool) { + fn update_num_uncheckpointed_barrier(&mut self, checkpoint: bool) { if checkpoint { - self.inner - .num_uncheckpointed_barrier - .store(0, Ordering::Relaxed); - self.inner.force_checkpoint.store(false, Ordering::Relaxed); + self.num_uncheckpointed_barrier = 0; + self.force_checkpoint = false; } else { - self.inner - .num_uncheckpointed_barrier - .fetch_add(1, Ordering::Relaxed); + self.num_uncheckpointed_barrier += 1; } } } diff --git a/src/meta/src/barrier/state.rs b/src/meta/src/barrier/state.rs index 560f17118c58f..de1f7b0bb6d09 100644 --- a/src/meta/src/barrier/state.rs +++ b/src/meta/src/barrier/state.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_pb::common::PbWorkerNode; +use risingwave_pb::common::WorkerNode; use risingwave_pb::meta::PausedReason; use crate::barrier::info::InflightActorInfo; @@ -70,7 +70,7 @@ impl BarrierManagerState { } // TODO: optimize it as incremental updates. 
- pub fn resolve_worker_nodes(&mut self, nodes: Vec) { + pub fn resolve_worker_nodes(&mut self, nodes: impl IntoIterator) { self.inflight_actor_infos.resolve_worker_nodes(nodes); } diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index 622a3efa6b564..2a03f063befe3 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -31,6 +31,7 @@ use risingwave_meta_model_v2::{worker, worker_property, I32Array, TransactionId, use risingwave_pb::common::worker_node::{PbProperty, PbResource, PbState}; use risingwave_pb::common::{ HostAddress, ParallelUnit, PbHostAddress, PbParallelUnit, PbWorkerNode, PbWorkerType, + WorkerNode, }; use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; use risingwave_pb::meta::heartbeat_request; @@ -43,6 +44,7 @@ use sea_orm::{ TransactionTrait, }; use thiserror_ext::AsReport; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use tokio::sync::oneshot::Sender; use tokio::sync::{RwLock, RwLockReadGuard}; use tokio::task::JoinHandle; @@ -338,6 +340,22 @@ impl ClusterController { Ok(workers) } + pub(crate) async fn subscribe_active_streaming_compute_nodes( + &self, + ) -> MetaResult<(Vec, UnboundedReceiver)> { + let inner = self.inner.read().await; + let worker_nodes = inner.list_active_streaming_workers().await?; + let (tx, rx) = unbounded_channel(); + + // insert before release the read lock to ensure that we don't lose any update in between + self.env + .notification_manager() + .insert_local_sender(tx) + .await; + drop(inner); + Ok((worker_nodes, rx)) + } + /// A convenient method to get all running compute nodes that may have running actors on them /// i.e. CNs which are running pub async fn list_active_streaming_workers(&self) -> MetaResult> { diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index 38e1ff3ca8f33..b6c31ddb51688 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -32,6 +32,7 @@ use risingwave_pb::meta::heartbeat_request; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; use thiserror_ext::AsReport; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use tokio::sync::oneshot::Sender; use tokio::sync::{RwLock, RwLockReadGuard}; use tokio::task::JoinHandle; @@ -458,6 +459,22 @@ impl ClusterManager { core.list_worker_node(worker_type, worker_state) } + pub async fn subscribe_active_streaming_compute_nodes( + &self, + ) -> (Vec, UnboundedReceiver) { + let core = self.core.read().await; + let worker_nodes = core.list_streaming_worker_node(Some(State::Running)); + let (tx, rx) = unbounded_channel(); + + // insert before release the read lock to ensure that we don't lose any update in between + self.env + .notification_manager() + .insert_local_sender(tx) + .await; + drop(core); + (worker_nodes, rx) + } + /// A convenient method to get all running compute nodes that may have running actors on them /// i.e. 
CNs which are running pub async fn list_active_streaming_compute_nodes(&self) -> Vec { @@ -465,11 +482,6 @@ impl ClusterManager { core.list_streaming_worker_node(Some(State::Running)) } - pub async fn list_active_streaming_parallel_units(&self) -> Vec { - let core = self.core.read().await; - core.list_active_streaming_parallel_units() - } - /// Get the cluster info used for scheduling a streaming job, containing all nodes that are /// running and schedulable pub async fn list_active_serving_compute_nodes(&self) -> Vec { @@ -703,13 +715,6 @@ impl ClusterManagerCore { .collect() } - fn list_active_streaming_parallel_units(&self) -> Vec { - self.list_streaming_worker_node(Some(State::Running)) - .into_iter() - .flat_map(|w| w.parallel_units) - .collect() - } - // Lists active worker nodes fn get_streaming_cluster_info(&self) -> StreamingClusterInfo { let mut streaming_worker_node = self.list_streaming_worker_node(Some(State::Running)); @@ -969,7 +974,12 @@ mod tests { } async fn assert_cluster_manager(cluster_manager: &ClusterManager, parallel_count: usize) { - let parallel_units = cluster_manager.list_active_streaming_parallel_units().await; + let parallel_units = cluster_manager + .list_active_serving_compute_nodes() + .await + .into_iter() + .flat_map(|w| w.parallel_units) + .collect_vec(); assert_eq!(parallel_units.len(), parallel_count); } diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index be16ec0601941..bce8ada29e97a 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -18,16 +18,19 @@ use risingwave_common::catalog::{TableId, TableOption}; use risingwave_meta_model_v2::SourceId; use risingwave_pb::catalog::{PbSource, PbTable}; use risingwave_pb::common::worker_node::{PbResource, State}; -use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerType}; +use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType}; use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, PbFragment}; use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamActor, StreamActor}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; +use tracing::warn; use crate::barrier::Reschedule; use crate::controller::catalog::CatalogControllerRef; use crate::controller::cluster::{ClusterControllerRef, WorkerExtraInfo}; use crate::manager::{ - CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, StreamingClusterInfo, WorkerId, + CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, LocalNotification, + StreamingClusterInfo, WorkerId, }; use crate::model::{ActorId, FragmentId, MetadataModel, TableFragments, TableParallelism}; use crate::stream::SplitAssignment; @@ -52,6 +55,130 @@ pub struct MetadataManagerV2 { pub catalog_controller: CatalogControllerRef, } +#[derive(Debug)] +pub(crate) enum ActiveStreamingWorkerChange { + Add(WorkerNode), + Remove(WorkerNode), + Update(WorkerNode), +} + +pub struct ActiveStreamingWorkerNodes { + worker_nodes: HashMap, + rx: UnboundedReceiver, + _meta_manager: MetadataManager, +} + +impl ActiveStreamingWorkerNodes { + pub(crate) fn uninitialized(meta_manager: MetadataManager) -> Self { + Self { + _meta_manager: meta_manager, + worker_nodes: Default::default(), + rx: unbounded_channel().1, + } + } + + /// Return an uninitialized one as a place holder for future initialized + pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult { + let 
(nodes, rx) = meta_manager + .subscribe_active_streaming_compute_nodes() + .await?; + Ok(Self { + worker_nodes: nodes.into_iter().map(|node| (node.id, node)).collect(), + rx, + _meta_manager: meta_manager, + }) + } + + pub(crate) fn current(&self) -> &HashMap { + &self.worker_nodes + } + + pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange { + let ret = loop { + let notification = self + .rx + .recv() + .await + .expect("notification stopped or uninitialized"); + match notification { + LocalNotification::WorkerNodeDeleted(worker) => { + let is_streaming_compute_node = worker.r#type == WorkerType::ComputeNode as i32 + && worker.property.as_ref().unwrap().is_streaming; + let Some(prev_worker) = self.worker_nodes.remove(&worker.id) else { + if is_streaming_compute_node { + warn!( + ?worker, + "notify to delete an non-existing streaming compute worker" + ); + } + continue; + }; + if !is_streaming_compute_node { + warn!( + ?worker, + ?prev_worker, + "deleted worker has a different recent type" + ); + } + if worker.state == State::Starting as i32 { + warn!( + id = worker.id, + host = ?worker.host, + state = worker.state, + "a starting streaming worker is deleted" + ); + } + break ActiveStreamingWorkerChange::Remove(prev_worker); + } + LocalNotification::WorkerNodeActivated(worker) => { + if worker.r#type != WorkerType::ComputeNode as i32 + || !worker.property.as_ref().unwrap().is_streaming + { + if let Some(prev_worker) = self.worker_nodes.remove(&worker.id) { + warn!( + ?worker, + ?prev_worker, + "the type of a streaming worker is changed" + ); + break ActiveStreamingWorkerChange::Remove(prev_worker); + } else { + continue; + } + } + assert_eq!( + worker.state, + State::Running as i32, + "not started worker added: {:?}", + worker + ); + if let Some(prev_worker) = self.worker_nodes.insert(worker.id, worker.clone()) { + assert_eq!(prev_worker.host, worker.host); + assert_eq!(prev_worker.r#type, worker.r#type); + warn!( + ?prev_worker, + ?worker, + eq = prev_worker == worker, + "notify to update an existing active worker" + ); + if prev_worker == worker { + continue; + } else { + break ActiveStreamingWorkerChange::Update(worker); + } + } else { + break ActiveStreamingWorkerChange::Add(worker); + } + } + _ => { + continue; + } + } + }; + + ret + } +} + impl MetadataManager { pub fn new_v1( cluster_manager: ClusterManagerRef, @@ -171,6 +298,22 @@ impl MetadataManager { } } + pub async fn subscribe_active_streaming_compute_nodes( + &self, + ) -> MetaResult<(Vec, UnboundedReceiver)> { + match self { + MetadataManager::V1(mgr) => Ok(mgr + .cluster_manager + .subscribe_active_streaming_compute_nodes() + .await), + MetadataManager::V2(mgr) => { + mgr.cluster_controller + .subscribe_active_streaming_compute_nodes() + .await + } + } + } + pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult> { match self { MetadataManager::V1(mgr) => Ok(mgr diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 84ff4ea1de682..5c3111bd2f44e 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -269,7 +269,7 @@ impl DdlController { /// would be a huge hassle and pain if we don't spawn here. 
pub async fn run_command(&self, command: DdlCommand) -> MetaResult { if !command.allow_in_recovery() { - self.barrier_manager.check_status_running().await?; + self.barrier_manager.check_status_running()?; } let ctrl = self.clone(); let fut = async move { diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 5889292756ca7..517781f944d92 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -293,11 +293,7 @@ impl GlobalStreamManager { .await { // try to cancel buffered creating command. - if self - .barrier_scheduler - .try_cancel_scheduled_create(table_id) - .await - { + if self.barrier_scheduler.try_cancel_scheduled_create(table_id) { tracing::debug!( "cancelling streaming job {table_id} in buffer queue." );
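One subtlety worth calling out from the `rpc.rs` change: `inject_barrier` is now synchronous and merely spawns a task, so ordering between consecutive barriers is preserved by chaining a oneshot channel (`prev_injecting_barrier`) from one injection to the next; the TODO notes this can go away once bidirectional stream control is implemented. A minimal sketch of that chaining idea (the `Injector` type and task bodies are hypothetical, not the PR's code):

```rust
use tokio::sync::oneshot;

struct Injector {
    // Receiver signalled once the previously submitted task has injected.
    prev: Option<oneshot::Receiver<()>>,
}

impl Injector {
    fn new() -> Self {
        Self { prev: None }
    }

    fn submit(&mut self, name: &'static str) {
        let (tx, rx) = oneshot::channel();
        // Store our receiver for the next submission; take over the previous one.
        let prev = self.prev.replace(rx);
        tokio::spawn(async move {
            // Wait until the previous submission has finished its inject step.
            if let Some(prev) = prev {
                if prev.await.is_err() {
                    // Previous injection failed or was dropped; give up.
                    return;
                }
            }
            println!("injecting {name}");
            // Signal the next submission that it may proceed.
            let _ = tx.send(());
            // ... barrier collection would continue here ...
        });
    }
}

#[tokio::main]
async fn main() {
    let mut injector = Injector::new();
    injector.submit("barrier-1");
    injector.submit("barrier-2"); // will not print before barrier-1
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
```

The second submission's task only proceeds after the first has signalled, which mirrors how `inject_tx`/`prev_inject_rx` keep barrier injection in order even though the caller no longer awaits the injection RPC.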