diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs index b52ab90955287..7e845be71d987 100644 --- a/src/meta/src/barrier/info.rs +++ b/src/meta/src/barrier/info.rs @@ -15,6 +15,7 @@ use std::collections::{HashMap, HashSet}; use risingwave_pb::common::PbWorkerNode; +use tracing::warn; use crate::manager::{ActorInfos, WorkerId}; use crate::model::ActorId; @@ -87,10 +88,16 @@ impl InflightActorInfo { /// Update worker nodes snapshot. We need to support incremental updates for it in the future. pub fn resolve_worker_nodes(&mut self, all_nodes: impl IntoIterator) { - self.node_map = all_nodes + let new_node_map = all_nodes .into_iter() .map(|node| (node.id, node)) .collect::>(); + for (actor_id, location) in &self.actor_location_map { + if !new_node_map.contains_key(location) { + warn!(actor_id, location, node = ?self.node_map.get(location), "node with running actors is deleted"); + } + } + self.node_map = new_node_map; } /// Apply some actor changes before issuing a barrier command, if the command contains any new added actors, we should update diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index c6e54b538c3a8..3bad3cbd15a2d 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -32,6 +32,7 @@ use risingwave_hummock_sdk::table_watermark::{ }; use risingwave_hummock_sdk::{ExtendedSstableInfo, HummockSstableObjectId}; use risingwave_pb::catalog::table::TableType; +use risingwave_pb::common::WorkerNode; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::PausedReason; @@ -41,7 +42,7 @@ use thiserror_ext::AsReport; use tokio::sync::oneshot::{Receiver, Sender}; use tokio::sync::Mutex; use tokio::task::JoinHandle; -use tracing::Instrument; +use tracing::{info, warn, Instrument}; use self::command::CommandContext; use self::notifier::Notifier; @@ -54,7 +55,9 @@ use crate::barrier::state::BarrierManagerState; use crate::barrier::BarrierEpochState::{Completed, InFlight}; use crate::hummock::{CommitEpochInfo, HummockManagerRef}; use crate::manager::sink_coordination::SinkCoordinatorManager; -use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager, WorkerId}; +use crate::manager::{ + ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv, MetadataManager, WorkerId, +}; use crate::model::{ActorId, TableFragments}; use crate::rpc::metrics::MetaMetrics; use crate::stream::{ScaleControllerRef, SourceManagerRef}; @@ -183,6 +186,8 @@ pub struct GlobalBarrierManager { checkpoint_control: CheckpointControl, rpc_manager: BarrierRpcManager, + + active_streaming_nodes: ActiveStreamingWorkerNodes, } /// Controls the concurrent execution of commands. @@ -397,6 +402,8 @@ impl GlobalBarrierManager { ); let checkpoint_control = CheckpointControl::new(metrics.clone()); + let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized(); + let tracker = CreateMviewProgressTracker::new(); let context = GlobalBarrierManagerContext { @@ -423,6 +430,7 @@ impl GlobalBarrierManager { state: initial_invalid_state, checkpoint_control, rpc_manager, + active_streaming_nodes, } } @@ -447,7 +455,7 @@ impl GlobalBarrierManager { .await .pause_on_next_bootstrap(); if paused { - tracing::warn!( + warn!( "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \ It will now be reset to `false`. 
\ To resume the data sources, either restart the cluster again or use `risectl meta resume`.", @@ -498,7 +506,7 @@ impl GlobalBarrierManager { } } - self.state = { + { let latest_snapshot = self.context.hummock_manager.latest_snapshot(); assert_eq!( latest_snapshot.committed_epoch, latest_snapshot.current_epoch, @@ -518,11 +526,10 @@ impl GlobalBarrierManager { let paused = self.take_pause_on_bootstrap().await.unwrap_or(false); let paused_reason = paused.then_some(PausedReason::Manual); - self.context - .recovery(prev_epoch, paused_reason, &self.scheduled_barriers) + self.recovery(prev_epoch, paused_reason) .instrument(span) - .await - }; + .await; + } self.context.set_status(BarrierManagerStatus::Running).await; @@ -545,6 +552,59 @@ impl GlobalBarrierManager { tracing::info!("Barrier manager is stopped"); break; } + + changed_worker = self.active_streaming_nodes.changed() => { + #[cfg(debug_assertions)] + { + match self + .context + .metadata_manager + .list_active_streaming_compute_nodes() + .await + { + Ok(worker_nodes) => { + let ignore_irrelevant_info = |node: &WorkerNode| { + ( + node.id, + WorkerNode { + id: node.id, + r#type: node.r#type, + host: node.host.clone(), + parallel_units: node.parallel_units.clone(), + property: node.property.clone(), + resource: node.resource.clone(), + ..Default::default() + }, + ) + }; + let worker_nodes: HashMap<_, _> = + worker_nodes.iter().map(ignore_irrelevant_info).collect(); + let curr_worker_nodes: HashMap<_, _> = self + .active_streaming_nodes + .current() + .values() + .map(ignore_irrelevant_info) + .collect(); + if worker_nodes != curr_worker_nodes { + warn!( + ?worker_nodes, + ?curr_worker_nodes, + "different to global snapshot" + ); + } + } + Err(e) => { + warn!(e = ?e.as_report(), "fail to list_active_streaming_compute_nodes to compare with local snapshot"); + } + } + } + + info!(?changed_worker, "worker changed"); + + self.state + .resolve_worker_nodes(self.active_streaming_nodes.current().values().cloned()); + } + // Checkpoint frequency changes. notification = local_notification_rx.recv() => { let notification = notification.unwrap(); @@ -595,13 +655,6 @@ impl GlobalBarrierManager { span, } = self.scheduled_barriers.pop_or_default().await; - let all_nodes = self - .context - .metadata_manager - .list_active_streaming_compute_nodes() - .await - .unwrap(); - self.state.resolve_worker_nodes(all_nodes); let info = self.state.apply_command(&command); let (prev_epoch, curr_epoch) = self.state.next_epoch_pair(); @@ -668,7 +721,7 @@ impl GlobalBarrierManager { // back to frontend fail_point!("inject_barrier_err_success"); let fail_node = self.checkpoint_control.barrier_failed(); - tracing::warn!(%prev_epoch, error = %err.as_report(), "Failed to complete epoch"); + warn!(%prev_epoch, error = %err.as_report(), "Failed to complete epoch"); self.failure_recovery(err, fail_node).await; return; } @@ -693,7 +746,7 @@ impl GlobalBarrierManager { .drain(index..) .chain(self.checkpoint_control.barrier_failed().into_iter()) .collect_vec(); - tracing::warn!(%prev_epoch, error = %err.as_report(), "Failed to commit epoch"); + warn!(%prev_epoch, error = %err.as_report(), "Failed to commit epoch"); self.failure_recovery(err, fail_nodes).await; } } @@ -734,11 +787,7 @@ impl GlobalBarrierManager { // No need to clean dirty tables for barrier recovery, // The foreground stream job should cleanup their own tables. 
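// --- Illustrative sketch (not part of the patch) ---
// A minimal, self-contained sketch of the event-loop shape this hunk introduces: the barrier
// manager keeps a locally cached set of active streaming compute nodes and refreshes it when a
// change notification arrives, instead of listing the nodes from the metadata manager before
// every barrier. `WorkerChange` and the channels are hypothetical stand-ins for
// `ActiveStreamingWorkerChange` and the real event sources.
use std::collections::HashSet;

use tokio::sync::mpsc::UnboundedReceiver;

enum WorkerChange {
    Add(u32),
    Remove(u32),
}

async fn event_loop(
    mut worker_changes: UnboundedReceiver<WorkerChange>,
    mut barrier_ticks: UnboundedReceiver<()>,
) {
    // Local snapshot of active streaming workers, kept up to date incrementally.
    let mut active: HashSet<u32> = HashSet::new();
    loop {
        tokio::select! {
            biased;
            Some(change) = worker_changes.recv() => {
                // Apply the delta to the cached snapshot; the real code additionally compares
                // the cache against a fresh listing under `#[cfg(debug_assertions)]`.
                match change {
                    WorkerChange::Add(id) => { active.insert(id); }
                    WorkerChange::Remove(id) => { active.remove(&id); }
                }
            }
            Some(()) = barrier_ticks.recv() => {
                // Barriers are issued against the cached snapshot; no per-barrier RPC to list
                // compute nodes is needed any more.
                let _workers: Vec<u32> = active.iter().copied().collect();
            }
            else => break,
        }
    }
}
// --- end sketch ---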
- self.state = self - .context - .recovery(prev_epoch, None, &self.scheduled_barriers) - .instrument(span) - .await; + self.recovery(prev_epoch, None).instrument(span).await; self.context.set_status(BarrierManagerStatus::Running).await; } else { panic!("failed to execute barrier: {}", err.as_report()); @@ -914,23 +963,17 @@ impl GlobalBarrierManagerContext { /// Resolve actor information from cluster, fragment manager and `ChangedTableId`. /// We use `changed_table_id` to modify the actors to be sent or collected. Because these actor /// will create or drop before this barrier flow through them. - async fn resolve_actor_info(&self) -> MetaResult { + async fn resolve_actor_info( + &self, + all_nodes: Vec, + ) -> MetaResult { let info = match &self.metadata_manager { MetadataManager::V1(mgr) => { - let all_nodes = mgr - .cluster_manager - .list_active_streaming_compute_nodes() - .await; let all_actor_infos = mgr.fragment_manager.load_all_actors().await; InflightActorInfo::resolve(all_nodes, all_actor_infos) } MetadataManager::V2(mgr) => { - let all_nodes = mgr - .cluster_controller - .list_active_streaming_workers() - .await - .unwrap(); let all_actor_infos = mgr.catalog_controller.load_all_actors().await?; InflightActorInfo::resolve(all_nodes, all_actor_infos) diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 1208d2c49b58d..858fa937044dd 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -21,7 +21,7 @@ use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_meta_model_v2::StreamingParallelism; -use risingwave_pb::common::ActorInfo; +use risingwave_pb::common::{ActorInfo, WorkerNode}; use risingwave_pb::meta::table_fragments::State; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; @@ -37,16 +37,15 @@ use crate::barrier::command::CommandContext; use crate::barrier::info::InflightActorInfo; use crate::barrier::notifier::Notifier; use crate::barrier::progress::CreateMviewProgressTracker; -use crate::barrier::schedule::ScheduledBarriers; use crate::barrier::state::BarrierManagerState; -use crate::barrier::{Command, GlobalBarrierManagerContext}; +use crate::barrier::{Command, GlobalBarrierManager, GlobalBarrierManagerContext}; use crate::controller::catalog::ReleaseContext; -use crate::manager::{MetadataManager, WorkerId}; +use crate::manager::{ActiveStreamingWorkerNodes, MetadataManager, WorkerId}; use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism}; use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy}; use crate::MetaResult; -impl GlobalBarrierManagerContext { +impl GlobalBarrierManager { // Retry base interval in milliseconds. const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20; // Retry max interval. @@ -59,7 +58,9 @@ impl GlobalBarrierManagerContext { .max_delay(Self::RECOVERY_RETRY_MAX_INTERVAL) .map(jitter) } +} +impl GlobalBarrierManagerContext { /// Clean catalogs for creating streaming jobs that are in foreground mode or table fragments not persisted. async fn clean_dirty_streaming_jobs(&self) -> MetaResult<()> { match &self.metadata_manager { @@ -295,17 +296,18 @@ impl GlobalBarrierManagerContext { Ok(()) } +} +impl GlobalBarrierManager { /// Pre buffered drop and cancel command, return true if any. 
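// --- Illustrative sketch (not part of the patch) ---
// Hedged sketch of the `resolve_actor_info` refactor above: instead of each helper listing the
// active compute nodes on its own (and possibly observing different cluster states), the caller
// captures one node snapshot and threads it through every step. Types here are simplified
// stand-ins for `WorkerNode` / `InflightActorInfo`.
use std::collections::{HashMap, HashSet};

struct NodeSnapshot {
    id: u32,
}

struct InflightInfo {
    node_map: HashMap<u32, NodeSnapshot>,
    actor_location: HashMap<u64, u32>, // actor id -> worker id
}

fn resolve_actor_info(
    all_nodes: Vec<NodeSnapshot>,
    actor_location: HashMap<u64, u32>,
) -> InflightInfo {
    let node_map: HashMap<_, _> = all_nodes.into_iter().map(|n| (n.id, n)).collect();
    // With an explicit snapshot we can also sanity-check that every running actor still has a
    // live node, mirroring the new warning in `InflightActorInfo::resolve_worker_nodes`.
    let missing: HashSet<u32> = actor_location
        .values()
        .copied()
        .filter(|worker| !node_map.contains_key(worker))
        .collect();
    if !missing.is_empty() {
        eprintln!("actors located on deleted workers: {missing:?}");
    }
    InflightInfo {
        node_map,
        actor_location,
    }
}
// --- end sketch ---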
- async fn pre_apply_drop_cancel( - &self, - scheduled_barriers: &ScheduledBarriers, - ) -> MetaResult { - let (dropped_actors, cancelled) = - scheduled_barriers.pre_apply_drop_cancel_scheduled().await; + async fn pre_apply_drop_cancel(&self) -> MetaResult { + let (dropped_actors, cancelled) = self + .scheduled_barriers + .pre_apply_drop_cancel_scheduled() + .await; let applied = !dropped_actors.is_empty() || !cancelled.is_empty(); if !cancelled.is_empty() { - match &self.metadata_manager { + match &self.context.metadata_manager { MetadataManager::V1(mgr) => { mgr.fragment_manager .drop_table_fragments_vec(&cancelled) @@ -330,57 +332,74 @@ impl GlobalBarrierManagerContext { /// the cluster or `risectl` command. Used for debugging purpose. /// /// Returns the new state of the barrier manager after recovery. - pub async fn recovery( - &self, - prev_epoch: TracedEpoch, - paused_reason: Option, - scheduled_barriers: &ScheduledBarriers, - ) -> BarrierManagerState { + pub async fn recovery(&mut self, prev_epoch: TracedEpoch, paused_reason: Option) { // Mark blocked and abort buffered schedules, they might be dirty already. - scheduled_barriers + self.scheduled_barriers .abort_and_mark_blocked("cluster is under recovering") .await; tracing::info!("recovery start!"); - self.clean_dirty_streaming_jobs() + self.context + .clean_dirty_streaming_jobs() .await .expect("clean dirty streaming jobs"); - self.sink_manager.reset().await; + self.context.sink_manager.reset().await; let retry_strategy = Self::get_retry_strategy(); // Mview progress needs to be recovered. tracing::info!("recovering mview progress"); - self.recover_background_mv_progress() + self.context + .recover_background_mv_progress() .await .expect("recover mview progress should not fail"); tracing::info!("recovered mview progress"); // We take retry into consideration because this is the latency user sees for a cluster to // get recovered. - let recovery_timer = self.metrics.recovery_latency.start_timer(); + let recovery_timer = self.context.metrics.recovery_latency.start_timer(); - let state = tokio_retry::Retry::spawn(retry_strategy, || { + let (state, active_streaming_nodes) = tokio_retry::Retry::spawn(retry_strategy, || { async { let recovery_result: MetaResult<_> = try { // This is a quick path to accelerate the process of dropping and canceling streaming jobs. - let _ = self.pre_apply_drop_cancel(scheduled_barriers).await?; + let _ = self.pre_apply_drop_cancel().await?; + + let active_streaming_nodes = ActiveStreamingWorkerNodes::new_snapshot( + self.context.metadata_manager.clone(), + ) + .await?; + + let all_nodes = active_streaming_nodes + .current() + .values() + .cloned() + .collect_vec(); // Resolve actor info for recovery. If there's no actor to recover, most of the // following steps will be no-op, while the compute nodes will still be reset. let mut info = if !self.env.opts.disable_automatic_parallelism_control { - self.scale_actors().await.inspect_err(|err| { - warn!(error = %err.as_report(), "scale actors failed"); - })?; - - self.resolve_actor_info().await.inspect_err(|err| { - warn!(error = %err.as_report(), "resolve actor info failed"); - })? + self.context + .scale_actors(all_nodes.clone()) + .await + .inspect_err(|err| { + warn!(error = %err.as_report(), "scale actors failed"); + })?; + + self.context + .resolve_actor_info(all_nodes.clone()) + .await + .inspect_err(|err| { + warn!(error = %err.as_report(), "resolve actor info failed"); + })? } else { // Migrate actors in expired CN to newly joined one. 
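// --- Illustrative sketch (not part of the patch) ---
// Rough sketch of the recovery control flow after this change: each retry attempt first takes a
// fresh `ActiveStreamingWorkerNodes` snapshot, and every later step (scale / migrate / resolve)
// works on that same node list, so a worker joining or leaving mid-attempt cannot make the steps
// disagree. The real code uses `tokio_retry` with jittered exponential backoff; the manual
// backoff loop and helper functions below are simplified placeholders, not the real APIs.
use std::time::Duration;

struct Snapshot {
    workers: Vec<u32>,
}

async fn take_snapshot() -> Result<Snapshot, String> {
    Ok(Snapshot { workers: vec![1, 2] })
}

async fn recover_with(_workers: &[u32]) -> Result<(), String> {
    Ok(())
}

async fn recovery_loop() -> Snapshot {
    let mut backoff = Duration::from_millis(20);
    loop {
        let attempt = async {
            let snapshot = take_snapshot().await?; // one snapshot per attempt
            recover_with(&snapshot.workers).await?; // all steps reuse it
            Ok::<_, String>(snapshot)
        };
        match attempt.await {
            // On success the snapshot is kept as the new `active_streaming_nodes`.
            Ok(snapshot) => break snapshot,
            Err(err) => {
                eprintln!("recovery attempt failed: {err}, retrying");
                tokio::time::sleep(backoff).await;
                backoff = (backoff * 2).min(Duration::from_secs(5));
            }
        }
    }
}
// --- end sketch ---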
- self.migrate_actors().await.inspect_err(|err| { - warn!(error = %err.as_report(), "migrate actors failed"); - })? + self.context + .migrate_actors(all_nodes.clone()) + .await + .inspect_err(|err| { + warn!(error = %err.as_report(), "migrate actors failed"); + })? }; // Reset all compute nodes, stop and drop existing actors. @@ -388,10 +407,14 @@ impl GlobalBarrierManagerContext { warn!(error = %err.as_report(), "reset compute nodes failed"); })?; - if self.pre_apply_drop_cancel(scheduled_barriers).await? { - info = self.resolve_actor_info().await.inspect_err(|err| { - warn!(error = %err.as_report(), "resolve actor info failed"); - })?; + if self.pre_apply_drop_cancel().await? { + info = self + .context + .resolve_actor_info(all_nodes.clone()) + .await + .inspect_err(|err| { + warn!(error = %err.as_report(), "resolve actor info failed"); + })? } // update and build all actors. @@ -403,7 +426,8 @@ impl GlobalBarrierManagerContext { })?; // get split assignments for all actors - let source_split_assignments = self.source_manager.list_assignments().await; + let source_split_assignments = + self.context.source_manager.list_assignments().await; let command = Command::Plain(Some(Mutation::Add(AddMutation { // Actors built during recovery is not treated as newly added actors. actor_dispatchers: Default::default(), @@ -423,7 +447,7 @@ impl GlobalBarrierManagerContext { paused_reason, command, BarrierKind::Initial, - self.clone(), + self.context.clone(), tracing::Span::current(), // recovery span )); @@ -431,13 +455,18 @@ impl GlobalBarrierManagerContext { { use risingwave_common::util::epoch::INVALID_EPOCH; - let mce = self.hummock_manager.get_current_max_committed_epoch().await; + let mce = self + .context + .hummock_manager + .get_current_max_committed_epoch() + .await; if mce != INVALID_EPOCH { command_ctx.wait_epoch_commit(mce).await?; } }; - let await_barrier_complete = self.inject_barrier(command_ctx.clone()).await; + let await_barrier_complete = + self.context.inject_barrier(command_ctx.clone()).await; let res = match await_barrier_complete.await.result { Ok(response) => { if let Err(err) = command_ctx.post_collect().await { @@ -454,10 +483,13 @@ impl GlobalBarrierManagerContext { }; let (new_epoch, _) = res?; - BarrierManagerState::new(new_epoch, info, command_ctx.next_paused_reason()) + ( + BarrierManagerState::new(new_epoch, info, command_ctx.next_paused_reason()), + active_streaming_nodes, + ) }; if recovery_result.is_err() { - self.metrics.recovery_failure_cnt.inc(); + self.context.metrics.recovery_failure_cnt.inc(); } recovery_result } @@ -467,7 +499,7 @@ impl GlobalBarrierManagerContext { .expect("Retry until recovery success."); recovery_timer.observe_duration(); - scheduled_barriers.mark_ready().await; + self.scheduled_barriers.mark_ready().await; tracing::info!( epoch = state.in_flight_prev_epoch().value().0, @@ -475,18 +507,21 @@ impl GlobalBarrierManagerContext { "recovery success" ); - state + self.state = state; + self.active_streaming_nodes = active_streaming_nodes; } +} +impl GlobalBarrierManagerContext { /// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated. 
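// --- Illustrative sketch (not part of the patch) ---
// Before the `migrate_actors` variants below, a simplified view of the bookkeeping they share:
// the set of in-use parallel units minus the units offered by the current node snapshot yields
// the "expired" units whose actors must move. Plain integers stand in for parallel-unit ids.
use std::collections::{BTreeSet, HashSet};

fn expired_parallel_units(
    all_inuse: &HashSet<i32>,
    active_on_nodes: &HashSet<i32>,
) -> BTreeSet<i32> {
    all_inuse
        .iter()
        .copied()
        .filter(|pu| !active_on_nodes.contains(pu))
        .collect()
}
// If the difference is empty there is nothing to migrate and recovery simply re-resolves the
// actor info against the same snapshot.
// --- end sketch ---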
- async fn migrate_actors(&self) -> MetaResult { + async fn migrate_actors(&self, all_nodes: Vec) -> MetaResult { match &self.metadata_manager { - MetadataManager::V1(_) => self.migrate_actors_v1().await, - MetadataManager::V2(_) => self.migrate_actors_v2().await, + MetadataManager::V1(_) => self.migrate_actors_v1(all_nodes).await, + MetadataManager::V2(_) => self.migrate_actors_v2(all_nodes).await, } } - async fn migrate_actors_v2(&self) -> MetaResult { + async fn migrate_actors_v2(&self, all_nodes: Vec) -> MetaResult { let mgr = self.metadata_manager.as_v2_ref(); let all_inuse_parallel_units: HashSet<_> = mgr @@ -495,12 +530,10 @@ impl GlobalBarrierManagerContext { .await? .into_iter() .collect(); - let active_parallel_units: HashSet<_> = mgr - .cluster_controller - .list_active_parallel_units() - .await? - .into_iter() - .map(|pu| pu.id as i32) + + let active_parallel_units: HashSet<_> = all_nodes + .iter() + .flat_map(|node| node.parallel_units.iter().map(|pu| pu.id as i32)) .collect(); let expired_parallel_units: BTreeSet<_> = all_inuse_parallel_units @@ -509,7 +542,7 @@ impl GlobalBarrierManagerContext { .collect(); if expired_parallel_units.is_empty() { debug!("no expired parallel units, skipping."); - return self.resolve_actor_info().await; + return self.resolve_actor_info(all_nodes.clone()).await; } debug!("start migrate actors."); @@ -526,12 +559,14 @@ impl GlobalBarrierManagerContext { let start = Instant::now(); let mut plan = HashMap::new(); 'discovery: while !to_migrate_parallel_units.is_empty() { - let new_parallel_units = mgr - .cluster_controller - .list_active_parallel_units() - .await? - .into_iter() - .filter(|pu| !inuse_parallel_units.contains(&(pu.id as i32))) + let new_parallel_units = all_nodes + .iter() + .flat_map(|node| { + node.parallel_units + .iter() + .filter(|pu| !inuse_parallel_units.contains(&(pu.id as _))) + }) + .cloned() .collect_vec(); if !new_parallel_units.is_empty() { debug!("new parallel units found: {:#?}", new_parallel_units); @@ -560,14 +595,14 @@ impl GlobalBarrierManagerContext { debug!("migrate actors succeed."); - self.resolve_actor_info().await + self.resolve_actor_info(all_nodes).await } /// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated. - async fn migrate_actors_v1(&self) -> MetaResult { + async fn migrate_actors_v1(&self, all_nodes: Vec) -> MetaResult { let mgr = self.metadata_manager.as_v1_ref(); - let info = self.resolve_actor_info().await?; + let info = self.resolve_actor_info(all_nodes.clone()).await?; // 1. get expired workers. let expired_workers: HashSet = info @@ -582,7 +617,9 @@ impl GlobalBarrierManagerContext { } debug!("start migrate actors."); - let migration_plan = self.generate_migration_plan(expired_workers).await?; + let migration_plan = self + .generate_migration_plan(expired_workers, &all_nodes) + .await?; // 2. start to migrate fragment one-by-one. 
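// --- Illustrative sketch (not part of the patch) ---
// Sketch of the 'discovery loop shape shared by `migrate_actors_v2` and
// `generate_migration_plan`: repeatedly look at the parallel units offered by the current node
// snapshot, claim the ones not already in use, and keep waiting until every expired unit has a
// replacement. The helper signature below is invented for illustration; the real code reads the
// units from the `all_nodes` vector passed in by recovery.
use std::collections::{HashMap, HashSet};
use std::time::Duration;

async fn build_plan(
    mut to_migrate: HashSet<i32>,
    mut inuse: HashSet<i32>,
    mut current_units: impl FnMut() -> Vec<i32>,
) -> HashMap<i32, i32> {
    let mut plan = HashMap::new();
    while !to_migrate.is_empty() {
        // Units offered by the current snapshot that nobody uses yet.
        let fresh: Vec<i32> = current_units()
            .into_iter()
            .filter(|pu| !inuse.contains(pu))
            .collect();
        for (expired, new_pu) in to_migrate.iter().copied().zip(fresh) {
            plan.insert(expired, new_pu);
            inuse.insert(new_pu);
        }
        to_migrate.retain(|pu| !plan.contains_key(pu));
        if !to_migrate.is_empty() {
            // No replacement yet: wait for newly joined workers and try again.
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }
    plan
}
// --- end sketch ---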
mgr.fragment_manager .migrate_fragment_actors(&migration_plan) @@ -591,18 +628,18 @@ impl GlobalBarrierManagerContext { migration_plan.delete(self.env.meta_store_checked()).await?; debug!("migrate actors succeed."); - self.resolve_actor_info().await + self.resolve_actor_info(all_nodes).await } - async fn scale_actors(&self) -> MetaResult<()> { + async fn scale_actors(&self, all_nodes: Vec) -> MetaResult<()> { let _guard = self.scale_controller.reschedule_lock.write().await; match &self.metadata_manager { - MetadataManager::V1(_) => self.scale_actors_v1().await, - MetadataManager::V2(_) => self.scale_actors_v2().await, + MetadataManager::V1(_) => self.scale_actors_v1(all_nodes).await, + MetadataManager::V2(_) => self.scale_actors_v2(all_nodes).await, } } - async fn scale_actors_v2(&self) -> MetaResult<()> { + async fn scale_actors_v2(&self, workers: Vec) -> MetaResult<()> { let mgr = self.metadata_manager.as_v2_ref(); debug!("start resetting actors distribution"); @@ -626,11 +663,6 @@ impl GlobalBarrierManagerContext { .collect() }; - let workers = mgr - .cluster_controller - .list_active_streaming_workers() - .await?; - let schedulable_worker_ids = workers .iter() .filter(|worker| { @@ -699,8 +731,8 @@ impl GlobalBarrierManagerContext { Ok(()) } - async fn scale_actors_v1(&self) -> MetaResult<()> { - let info = self.resolve_actor_info().await?; + async fn scale_actors_v1(&self, workers: Vec) -> MetaResult<()> { + let info = self.resolve_actor_info(workers.clone()).await?; let mgr = self.metadata_manager.as_v1_ref(); debug!("start resetting actors distribution"); @@ -763,11 +795,6 @@ impl GlobalBarrierManagerContext { .collect() }; - let workers = mgr - .cluster_manager - .list_active_streaming_compute_nodes() - .await; - let schedulable_worker_ids = workers .iter() .filter(|worker| { @@ -842,6 +869,7 @@ impl GlobalBarrierManagerContext { async fn generate_migration_plan( &self, expired_workers: HashSet, + all_nodes: &Vec, ) -> MetaResult { let mgr = self.metadata_manager.as_v1_ref(); @@ -891,10 +919,10 @@ impl GlobalBarrierManagerContext { let start = Instant::now(); // if in-used expire parallel units are not empty, should wait for newly joined worker. 'discovery: while !to_migrate_parallel_units.is_empty() { - let mut new_parallel_units = mgr - .cluster_manager - .list_active_streaming_parallel_units() - .await; + let mut new_parallel_units = all_nodes + .iter() + .flat_map(|worker| worker.parallel_units.iter().cloned()) + .collect_vec(); new_parallel_units.retain(|pu| !inuse_parallel_units.contains(&pu.id)); if !new_parallel_units.is_empty() { @@ -945,7 +973,9 @@ impl GlobalBarrierManagerContext { new_plan.insert(self.env.meta_store_checked()).await?; Ok(new_plan) } +} +impl GlobalBarrierManager { /// Update all actors in compute nodes. async fn update_actors(&self, info: &InflightActorInfo) -> MetaResult<()> { if info.actor_map.is_empty() { @@ -971,7 +1001,7 @@ impl GlobalBarrierManagerContext { .flatten_ok() .try_collect()?; - let mut all_node_actors = self.metadata_manager.all_node_actors(false).await?; + let mut all_node_actors = self.context.metadata_manager.all_node_actors(false).await?; // Check if any actors were dropped after info resolved. 
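// --- Illustrative sketch (not part of the patch) ---
// Small sketch of how the scaling path now derives schedulable workers from the node snapshot
// recovery already holds (instead of listing workers again): keep only workers not marked
// `is_unschedulable` in their property. `Worker` / `Property` are pared-down stand-ins for the
// real `WorkerNode` message.
struct Property {
    is_unschedulable: bool,
}

struct Worker {
    id: u32,
    property: Option<Property>,
}

fn schedulable_worker_ids(workers: &[Worker]) -> Vec<u32> {
    workers
        .iter()
        .filter(|w| {
            !w.property
                .as_ref()
                .map(|p| p.is_unschedulable)
                .unwrap_or(false)
        })
        .map(|w| w.id)
        .collect()
}
// --- end sketch ---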
if all_node_actors.iter().any(|(node_id, node_actors)| { @@ -985,7 +1015,8 @@ impl GlobalBarrierManagerContext { return Err(anyhow!("actors dropped during update").into()); } - self.stream_rpc_manager + self.context + .stream_rpc_manager .broadcast_update_actor_info( &info.node_map, info.actor_map.keys().cloned(), @@ -1009,7 +1040,8 @@ impl GlobalBarrierManagerContext { return Ok(()); } - self.stream_rpc_manager + self.context + .stream_rpc_manager .build_actors( &info.node_map, info.actor_map.iter().map(|(node_id, actors)| { @@ -1025,7 +1057,8 @@ impl GlobalBarrierManagerContext { /// Reset all compute nodes by calling `force_stop_actors`. async fn reset_compute_nodes(&self, info: &InflightActorInfo) -> MetaResult<()> { debug!(worker = ?info.node_map.keys().collect_vec(), "force stop actors"); - self.stream_rpc_manager + self.context + .stream_rpc_manager .force_stop_actors(info.node_map.values()) .await?; diff --git a/src/meta/src/barrier/state.rs b/src/meta/src/barrier/state.rs index 560f17118c58f..f24bbb5637220 100644 --- a/src/meta/src/barrier/state.rs +++ b/src/meta/src/barrier/state.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_pb::common::PbWorkerNode; +use risingwave_pb::common::WorkerNode; use risingwave_pb::meta::PausedReason; use crate::barrier::info::InflightActorInfo; @@ -69,8 +69,7 @@ impl BarrierManagerState { (prev_epoch, next_epoch) } - // TODO: optimize it as incremental updates. - pub fn resolve_worker_nodes(&mut self, nodes: Vec) { + pub fn resolve_worker_nodes(&mut self, nodes: impl IntoIterator) { self.inflight_actor_infos.resolve_worker_nodes(nodes); } diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index 622a3efa6b564..2a03f063befe3 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -31,6 +31,7 @@ use risingwave_meta_model_v2::{worker, worker_property, I32Array, TransactionId, use risingwave_pb::common::worker_node::{PbProperty, PbResource, PbState}; use risingwave_pb::common::{ HostAddress, ParallelUnit, PbHostAddress, PbParallelUnit, PbWorkerNode, PbWorkerType, + WorkerNode, }; use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; use risingwave_pb::meta::heartbeat_request; @@ -43,6 +44,7 @@ use sea_orm::{ TransactionTrait, }; use thiserror_ext::AsReport; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use tokio::sync::oneshot::Sender; use tokio::sync::{RwLock, RwLockReadGuard}; use tokio::task::JoinHandle; @@ -338,6 +340,22 @@ impl ClusterController { Ok(workers) } + pub(crate) async fn subscribe_active_streaming_compute_nodes( + &self, + ) -> MetaResult<(Vec, UnboundedReceiver)> { + let inner = self.inner.read().await; + let worker_nodes = inner.list_active_streaming_workers().await?; + let (tx, rx) = unbounded_channel(); + + // insert before release the read lock to ensure that we don't lose any update in between + self.env + .notification_manager() + .insert_local_sender(tx) + .await; + drop(inner); + Ok((worker_nodes, rx)) + } + /// A convenient method to get all running compute nodes that may have running actors on them /// i.e. 
CNs which are running pub async fn list_active_streaming_workers(&self) -> MetaResult> { diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index 38e1ff3ca8f33..b6c31ddb51688 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -32,6 +32,7 @@ use risingwave_pb::meta::heartbeat_request; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; use thiserror_ext::AsReport; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use tokio::sync::oneshot::Sender; use tokio::sync::{RwLock, RwLockReadGuard}; use tokio::task::JoinHandle; @@ -458,6 +459,22 @@ impl ClusterManager { core.list_worker_node(worker_type, worker_state) } + pub async fn subscribe_active_streaming_compute_nodes( + &self, + ) -> (Vec, UnboundedReceiver) { + let core = self.core.read().await; + let worker_nodes = core.list_streaming_worker_node(Some(State::Running)); + let (tx, rx) = unbounded_channel(); + + // insert before release the read lock to ensure that we don't lose any update in between + self.env + .notification_manager() + .insert_local_sender(tx) + .await; + drop(core); + (worker_nodes, rx) + } + /// A convenient method to get all running compute nodes that may have running actors on them /// i.e. CNs which are running pub async fn list_active_streaming_compute_nodes(&self) -> Vec { @@ -465,11 +482,6 @@ impl ClusterManager { core.list_streaming_worker_node(Some(State::Running)) } - pub async fn list_active_streaming_parallel_units(&self) -> Vec { - let core = self.core.read().await; - core.list_active_streaming_parallel_units() - } - /// Get the cluster info used for scheduling a streaming job, containing all nodes that are /// running and schedulable pub async fn list_active_serving_compute_nodes(&self) -> Vec { @@ -703,13 +715,6 @@ impl ClusterManagerCore { .collect() } - fn list_active_streaming_parallel_units(&self) -> Vec { - self.list_streaming_worker_node(Some(State::Running)) - .into_iter() - .flat_map(|w| w.parallel_units) - .collect() - } - // Lists active worker nodes fn get_streaming_cluster_info(&self) -> StreamingClusterInfo { let mut streaming_worker_node = self.list_streaming_worker_node(Some(State::Running)); @@ -969,7 +974,12 @@ mod tests { } async fn assert_cluster_manager(cluster_manager: &ClusterManager, parallel_count: usize) { - let parallel_units = cluster_manager.list_active_streaming_parallel_units().await; + let parallel_units = cluster_manager + .list_active_serving_compute_nodes() + .await + .into_iter() + .flat_map(|w| w.parallel_units) + .collect_vec(); assert_eq!(parallel_units.len(), parallel_count); } diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index be16ec0601941..eef1c0c101256 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -18,16 +18,19 @@ use risingwave_common::catalog::{TableId, TableOption}; use risingwave_meta_model_v2::SourceId; use risingwave_pb::catalog::{PbSource, PbTable}; use risingwave_pb::common::worker_node::{PbResource, State}; -use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerType}; +use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType}; use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, PbFragment}; use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamActor, 
StreamActor};
+use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
+use tracing::warn;
 
 use crate::barrier::Reschedule;
 use crate::controller::catalog::CatalogControllerRef;
 use crate::controller::cluster::{ClusterControllerRef, WorkerExtraInfo};
 use crate::manager::{
-    CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, StreamingClusterInfo, WorkerId,
+    CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, LocalNotification,
+    StreamingClusterInfo, WorkerId,
 };
 use crate::model::{ActorId, FragmentId, MetadataModel, TableFragments, TableParallelism};
 use crate::stream::SplitAssignment;
@@ -52,6 +55,127 @@ pub struct MetadataManagerV2 {
     pub catalog_controller: CatalogControllerRef,
 }
 
+#[derive(Debug)]
+pub(crate) enum ActiveStreamingWorkerChange {
+    Add(WorkerNode),
+    Remove(WorkerNode),
+    Update(WorkerNode),
+}
+
+pub struct ActiveStreamingWorkerNodes {
+    worker_nodes: HashMap<WorkerId, WorkerNode>,
+    rx: UnboundedReceiver<LocalNotification>,
+}
+
+impl ActiveStreamingWorkerNodes {
+    /// Return an uninitialized one as a placeholder for future initialization.
+    pub(crate) fn uninitialized() -> Self {
+        Self {
+            worker_nodes: Default::default(),
+            rx: unbounded_channel().1,
+        }
+    }
+
+    pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult<Self> {
+        let (nodes, rx) = meta_manager
+            .subscribe_active_streaming_compute_nodes()
+            .await?;
+        Ok(Self {
+            worker_nodes: nodes.into_iter().map(|node| (node.id, node)).collect(),
+            rx,
+        })
+    }
+
+    pub(crate) fn current(&self) -> &HashMap<WorkerId, WorkerNode> {
+        &self.worker_nodes
+    }
+
+    pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange {
+        let ret = loop {
+            let notification = self
+                .rx
+                .recv()
+                .await
+                .expect("notification stopped or uninitialized");
+            match notification {
+                LocalNotification::WorkerNodeDeleted(worker) => {
+                    let is_streaming_compute_node = worker.r#type == WorkerType::ComputeNode as i32
+                        && worker.property.as_ref().unwrap().is_streaming;
+                    let Some(prev_worker) = self.worker_nodes.remove(&worker.id) else {
+                        if is_streaming_compute_node {
+                            warn!(
+                                ?worker,
+                                "notify to delete a non-existing streaming compute worker"
+                            );
+                        }
+                        continue;
+                    };
+                    if !is_streaming_compute_node {
+                        warn!(
+                            ?worker,
+                            ?prev_worker,
+                            "deleted worker has a different recent type"
+                        );
+                    }
+                    if worker.state == State::Starting as i32 {
+                        warn!(
+                            id = worker.id,
+                            host = ?worker.host,
+                            state = worker.state,
+                            "a starting streaming worker is deleted"
+                        );
+                    }
+                    break ActiveStreamingWorkerChange::Remove(prev_worker);
+                }
+                LocalNotification::WorkerNodeActivated(worker) => {
+                    if worker.r#type != WorkerType::ComputeNode as i32
+                        || !worker.property.as_ref().unwrap().is_streaming
+                    {
+                        if let Some(prev_worker) = self.worker_nodes.remove(&worker.id) {
+                            warn!(
+                                ?worker,
+                                ?prev_worker,
+                                "the type of a streaming worker is changed"
+                            );
+                            break ActiveStreamingWorkerChange::Remove(prev_worker);
+                        } else {
+                            continue;
+                        }
+                    }
+                    assert_eq!(
+                        worker.state,
+                        State::Running as i32,
+                        "not started worker added: {:?}",
+                        worker
+                    );
+                    if let Some(prev_worker) = self.worker_nodes.insert(worker.id, worker.clone()) {
+                        assert_eq!(prev_worker.host, worker.host);
+                        assert_eq!(prev_worker.r#type, worker.r#type);
+                        warn!(
+                            ?prev_worker,
+                            ?worker,
+                            eq = prev_worker == worker,
+                            "notify to update an existing active worker"
+                        );
+                        if prev_worker == worker {
+                            continue;
+                        } else {
+                            break ActiveStreamingWorkerChange::Update(worker);
+                        }
+                    } else {
+                        break ActiveStreamingWorkerChange::Add(worker);
+                    }
+                }
+                _ => {
+                    continue;
+                }
+            }
+        };
+
+        ret
+    }
+}
+
 impl MetadataManager {
     pub fn new_v1(
         cluster_manager: ClusterManagerRef,
@@ -171,6 +295,22 @@ impl MetadataManager {
         }
     }
 
+    pub async fn subscribe_active_streaming_compute_nodes(
+        &self,
+    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
+        match self {
+            MetadataManager::V1(mgr) => Ok(mgr
+                .cluster_manager
+                .subscribe_active_streaming_compute_nodes()
+                .await),
+            MetadataManager::V2(mgr) => {
+                mgr.cluster_controller
+                    .subscribe_active_streaming_compute_nodes()
+                    .await
+            }
+        }
+    }
+
     pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
         match self {
             MetadataManager::V1(mgr) => Ok(mgr
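// --- Illustrative sketch (not part of the patch) ---
// Sketch of the subscription pattern added to `ClusterManager` / `ClusterController` and
// exposed through `MetadataManager::subscribe_active_streaming_compute_nodes`: snapshot the
// current workers and register the notification sender *before* releasing the lock, so no
// update can slip in between the snapshot and the subscription. Types are simplified stand-ins;
// the real code registers a `LocalNotification` sender with the notification manager.
use std::collections::HashMap;

use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::RwLock;

struct Cluster {
    workers: RwLock<HashMap<u32, String>>,
    subscribers: RwLock<Vec<UnboundedSender<String>>>,
}

impl Cluster {
    async fn subscribe_active_workers(&self) -> (Vec<String>, UnboundedReceiver<String>) {
        // Hold the worker lock across both the snapshot and the sender registration.
        let guard = self.workers.read().await;
        let snapshot: Vec<String> = guard.values().cloned().collect();
        let (tx, rx) = unbounded_channel();
        self.subscribers.write().await.push(tx);
        drop(guard);
        (snapshot, rx)
    }
}
// Design note: this is race-free only because the mutation path takes the worker lock for write
// and sends its notifications while holding it, which is the invariant the comment
// "insert before release the read lock" in the diff relies on.
// --- end sketch ---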