diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 234c4b36c1c5e..520f7ef4e373b 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -14,7 +14,6 @@ use std::collections::{HashMap, HashSet}; use std::fmt::Formatter; -use std::sync::Arc; use futures::future::try_join_all; use itertools::Itertools; @@ -26,6 +25,7 @@ use risingwave_common::util::epoch::Epoch; use risingwave_connector::source::SplitImpl; use risingwave_hummock_sdk::HummockEpoch; use risingwave_pb::catalog::{CreateType, Table}; +use risingwave_pb::common::PbWorkerNode; use risingwave_pb::meta::table_fragments::PbActorStatus; use risingwave_pb::meta::PausedReason; use risingwave_pb::source::{ConnectorSplit, ConnectorSplits}; @@ -42,9 +42,9 @@ use risingwave_pb::stream_service::WaitEpochCommitRequest; use thiserror_ext::AsReport; use tracing::warn; -use super::info::{CommandActorChanges, CommandFragmentChanges, InflightActorInfo}; +use super::info::CommandFragmentChanges; use super::trace::TracedEpoch; -use crate::barrier::GlobalBarrierManagerContext; +use crate::barrier::{GlobalBarrierManagerContext, InflightSubscriptionInfo}; use crate::manager::{DdlType, InflightFragmentInfo, MetadataManager, StreamingJob, WorkerId}; use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism}; use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig}; @@ -106,7 +106,7 @@ pub struct ReplaceTablePlan { } impl ReplaceTablePlan { - fn actor_changes(&self) -> CommandActorChanges { + fn fragment_changes(&self) -> HashMap { let mut fragment_changes = HashMap::new(); for fragment in self.new_table_fragments.fragments.values() { let fragment_change = CommandFragmentChanges::NewFragment(InflightFragmentInfo { @@ -140,7 +140,7 @@ impl ReplaceTablePlan { .insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment) .is_none()); } - CommandActorChanges { fragment_changes } + fragment_changes } } @@ -301,7 +301,7 @@ impl Command { Self::Resume(reason) } - pub fn actor_changes(&self) -> Option { + pub(crate) fn fragment_changes(&self) -> Option> { match self { Command::Plain(_) => None, Command::Pause(_) => None, @@ -309,40 +309,39 @@ impl Command { Command::DropStreamingJobs { unregistered_fragment_ids, .. - } => Some(CommandActorChanges { - fragment_changes: unregistered_fragment_ids + } => Some( + unregistered_fragment_ids .iter() .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment)) .collect(), - }), + ), Command::CreateStreamingJob { info, replace_table, } => { - let fragment_changes = info + let mut changes: HashMap<_, _> = info .new_fragment_info() .map(|(fragment_id, info)| { (fragment_id, CommandFragmentChanges::NewFragment(info)) }) .collect(); - let mut changes = CommandActorChanges { fragment_changes }; if let Some(plan) = replace_table { - let extra_change = plan.actor_changes(); + let extra_change = plan.fragment_changes(); changes.extend(extra_change); } Some(changes) } - Command::CancelStreamingJob(table_fragments) => Some(CommandActorChanges { - fragment_changes: table_fragments + Command::CancelStreamingJob(table_fragments) => Some( + table_fragments .fragments .values() .map(|fragment| (fragment.fragment_id, CommandFragmentChanges::RemoveFragment)) .collect(), - }), - Command::RescheduleFragment { reschedules, .. } => Some(CommandActorChanges { - fragment_changes: reschedules + ), + Command::RescheduleFragment { reschedules, .. 
} => Some( + reschedules .iter() .map(|(fragment_id, reschedule)| { ( @@ -360,8 +359,8 @@ impl Command { ) }) .collect(), - }), - Command::ReplaceTable(plan) => Some(plan.actor_changes()), + ), + Command::ReplaceTable(plan) => Some(plan.fragment_changes()), Command::SourceSplitAssignment(_) => None, Command::Throttle(_) => None, Command::CreateSubscription { .. } => None, @@ -406,10 +405,6 @@ impl BarrierKind { matches!(self, BarrierKind::Checkpoint(_)) } - pub fn is_initial(&self) -> bool { - matches!(self, BarrierKind::Initial) - } - pub fn as_str_name(&self) -> &'static str { match self { BarrierKind::Initial => "Initial", @@ -423,7 +418,9 @@ impl BarrierKind { /// [`Command`]. pub struct CommandContext { /// Resolved info in this barrier loop. - pub info: Arc, + pub node_map: HashMap, + pub subscription_info: InflightSubscriptionInfo, + pub table_ids_to_commit: HashSet, pub prev_epoch: TracedEpoch, pub curr_epoch: TracedEpoch, @@ -458,7 +455,9 @@ impl std::fmt::Debug for CommandContext { impl CommandContext { #[allow(clippy::too_many_arguments)] pub(super) fn new( - info: InflightActorInfo, + node_map: HashMap, + subscription_info: InflightSubscriptionInfo, + table_ids_to_commit: HashSet, prev_epoch: TracedEpoch, curr_epoch: TracedEpoch, current_paused_reason: Option, @@ -468,7 +467,9 @@ impl CommandContext { span: tracing::Span, ) -> Self { Self { - info: Arc::new(info), + node_map, + subscription_info, + table_ids_to_commit, prev_epoch, curr_epoch, current_paused_reason, @@ -861,9 +862,8 @@ impl CommandContext { self.barrier_manager_context .stream_rpc_manager .drop_actors( - &self.info.node_map, - self.info - .node_map + &self.node_map, + self.node_map .keys() .map(|worker_id| (*worker_id, actors.clone())), ) @@ -871,7 +871,7 @@ impl CommandContext { } pub async fn wait_epoch_commit(&self, epoch: HummockEpoch) -> MetaResult<()> { - let futures = self.info.node_map.values().map(|worker_node| async { + let futures = self.node_map.values().map(|worker_node| async { let client = self .barrier_manager_context .env diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs index 44194c7f9eb30..caf124113fc97 100644 --- a/src/meta/src/barrier/info.rs +++ b/src/meta/src/barrier/info.rs @@ -12,14 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. 
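// A minimal standalone sketch of the shape change in `fragment_changes()` above: the
// `CommandActorChanges` wrapper goes away and callers pass around a plain map keyed by
// fragment id. The types below are simplified stand-ins, not the real meta-service structs.
// Note that the removed `CommandActorChanges::extend` asserted key uniqueness, while plain
// `HashMap::extend` silently overwrites, so callers rely on the key sets being disjoint.
use std::collections::HashMap;

type FragmentId = u32;
type ActorId = u32;
type WorkerId = u32;

#[derive(Debug, Clone)]
enum CommandFragmentChanges {
    NewFragment { actors: HashMap<ActorId, WorkerId> },
    RemoveFragment,
}

/// Merge the changes of a replace-table plan into the changes of the main command.
fn merge_changes(
    mut base: HashMap<FragmentId, CommandFragmentChanges>,
    extra: HashMap<FragmentId, CommandFragmentChanges>,
) -> HashMap<FragmentId, CommandFragmentChanges> {
    base.extend(extra);
    base
}

fn main() {
    let base = HashMap::from([(1, CommandFragmentChanges::NewFragment { actors: HashMap::new() })]);
    let extra = HashMap::from([(2, CommandFragmentChanges::RemoveFragment)]);
    assert_eq!(merge_changes(base, extra).len(), 2);
}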
-use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::collections::{HashMap, HashSet}; use risingwave_common::catalog::TableId; -use risingwave_pb::common::PbWorkerNode; +use risingwave_pb::common::WorkerNode; use tracing::warn; use crate::barrier::Command; -use crate::manager::{ActiveStreamingWorkerNodes, ActorInfos, InflightFragmentInfo, WorkerId}; +use crate::manager::{InflightFragmentInfo, WorkerId}; use crate::model::{ActorId, FragmentId}; #[derive(Debug, Clone)] @@ -32,53 +32,31 @@ pub(crate) enum CommandFragmentChanges { RemoveFragment, } -#[derive(Debug, Clone)] -pub struct CommandActorChanges { - pub(crate) fragment_changes: HashMap, -} - -impl CommandActorChanges { - pub fn extend(&mut self, other: CommandActorChanges) { - for (fragment_id, fragment) in other.fragment_changes { - assert!(self - .fragment_changes - .insert(fragment_id, fragment) - .is_none()); - } - } +#[derive(Default, Clone)] +pub struct InflightSubscriptionInfo { + /// `mv_table_id` => `subscription_id` => retention seconds + pub mv_depended_subscriptions: HashMap>, } -/// [`InflightActorInfo`] resolves the actor info read from meta store for +/// [`InflightGraphInfo`] resolves the actor info read from meta store for /// [`crate::barrier::GlobalBarrierManager`]. #[derive(Default, Clone)] -pub struct InflightActorInfo { - /// `node_id` => node - pub node_map: HashMap, - +pub(super) struct InflightGraphInfo { /// `node_id` => actors pub actor_map: HashMap>, /// `actor_id` => `WorkerId` pub actor_location_map: HashMap, - /// `mv_table_id` => `subscription_id` => retention seconds - pub mv_depended_subscriptions: HashMap>, - pub fragment_infos: HashMap, } -impl InflightActorInfo { +impl InflightGraphInfo { /// Resolve inflight actor info from given nodes and actors that are loaded from meta store. It will be used during recovery to rebuild all streaming actors. - pub fn resolve( - active_nodes: &ActiveStreamingWorkerNodes, - actor_infos: ActorInfos, - mv_depended_subscriptions: HashMap>, - ) -> Self { - let node_map = active_nodes.current().clone(); - + pub fn new(fragment_infos: HashMap) -> Self { let actor_map = { let mut map: HashMap<_, HashSet<_>> = HashMap::new(); - for info in actor_infos.fragment_infos.values() { + for info in fragment_infos.values() { for (actor_id, worker_id) in &info.actors { map.entry(*worker_id).or_default().insert(*actor_id); } @@ -86,8 +64,7 @@ impl InflightActorInfo { map }; - let actor_location_map = actor_infos - .fragment_infos + let actor_location_map = fragment_infos .values() .flat_map(|fragment| { fragment @@ -98,47 +75,28 @@ impl InflightActorInfo { .collect(); Self { - node_map, actor_map, actor_location_map, - mv_depended_subscriptions, - fragment_infos: actor_infos.fragment_infos, + fragment_infos, } } /// Update worker nodes snapshot. We need to support incremental updates for it in the future. 
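// A standalone sketch (with simplified stand-in types) of how the new `InflightGraphInfo::new`
// derives its lookup maps purely from per-fragment actor placements; worker nodes and
// subscription metadata are no longer stored in this struct.
use std::collections::{HashMap, HashSet};

type ActorId = u32;
type WorkerId = u32;
type FragmentId = u32;

struct InflightFragmentInfo {
    actors: HashMap<ActorId, WorkerId>,
}

struct InflightGraphInfo {
    /// worker id => actors currently running on that worker
    actor_map: HashMap<WorkerId, HashSet<ActorId>>,
    /// actor id => worker hosting it
    actor_location_map: HashMap<ActorId, WorkerId>,
    fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
}

impl InflightGraphInfo {
    fn new(fragment_infos: HashMap<FragmentId, InflightFragmentInfo>) -> Self {
        let mut actor_map: HashMap<WorkerId, HashSet<ActorId>> = HashMap::new();
        let mut actor_location_map = HashMap::new();
        for info in fragment_infos.values() {
            for (&actor_id, &worker_id) in &info.actors {
                actor_map.entry(worker_id).or_default().insert(actor_id);
                actor_location_map.insert(actor_id, worker_id);
            }
        }
        Self { actor_map, actor_location_map, fragment_infos }
    }
}

fn main() {
    let fragment = InflightFragmentInfo { actors: HashMap::from([(10, 1), (11, 2)]) };
    let info = InflightGraphInfo::new(HashMap::from([(100, fragment)]));
    assert_eq!(info.actor_location_map[&10], 1);
    assert_eq!(info.actor_map[&2], HashSet::from([11]));
    assert_eq!(info.fragment_infos.len(), 1);
}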
- pub fn resolve_worker_nodes(&mut self, all_nodes: impl IntoIterator) { - let new_node_map = all_nodes - .into_iter() - .map(|node| (node.id, node)) - .collect::>(); - - let mut deleted_actors = BTreeMap::new(); - for (&actor_id, &location) in &self.actor_location_map { - if !new_node_map.contains_key(&location) { - deleted_actors - .entry(location) - .or_insert_with(BTreeSet::new) - .insert(actor_id); + pub fn on_new_worker_node_map(&mut self, node_map: &HashMap) { + for (node_id, actors) in &self.actor_map { + if !node_map.contains_key(node_id) { + warn!(node_id, ?actors, "node with running actors is deleted"); } } - for (node_id, actors) in deleted_actors { - let node = self.node_map.get(&node_id); - warn!( - node_id, - ?node, - ?actors, - "node with running actors is deleted" - ); - } - - self.node_map = new_node_map; } /// Apply some actor changes before issuing a barrier command, if the command contains any new added actors, we should update /// the info correspondingly. - pub fn pre_apply(&mut self, command: &Command) { - if let Some(CommandActorChanges { fragment_changes }) = command.actor_changes() { + pub(crate) fn pre_apply( + &mut self, + fragment_changes: &HashMap, + ) { + { let mut to_add = HashMap::new(); for (fragment_id, change) in fragment_changes { match change { @@ -146,24 +104,26 @@ impl InflightActorInfo { for (actor_id, node_id) in &info.actors { assert!(to_add.insert(*actor_id, *node_id).is_none()); } - assert!(self.fragment_infos.insert(fragment_id, info).is_none()); + assert!(self + .fragment_infos + .insert(*fragment_id, info.clone()) + .is_none()); } CommandFragmentChanges::Reschedule { new_actors, .. } => { let info = self .fragment_infos - .get_mut(&fragment_id) + .get_mut(fragment_id) .expect("should exist"); let actors = &mut info.actors; for (actor_id, node_id) in new_actors { - assert!(to_add.insert(actor_id, node_id).is_none()); - assert!(actors.insert(actor_id, node_id).is_none()); + assert!(to_add.insert(*actor_id, *node_id).is_none()); + assert!(actors.insert(*actor_id, *node_id).is_none()); } } CommandFragmentChanges::RemoveFragment => {} } } for (actor_id, node_id) in to_add { - assert!(self.node_map.contains_key(&node_id)); assert!( self.actor_map.entry(node_id).or_default().insert(actor_id), "duplicate actor in command changes" @@ -174,6 +134,11 @@ impl InflightActorInfo { ); } } + } +} + +impl InflightSubscriptionInfo { + pub fn pre_apply(&mut self, command: &Command) { if let Command::CreateSubscription { subscription_id, upstream_mv_table_id, @@ -190,29 +155,34 @@ impl InflightActorInfo { } } } +} +impl InflightGraphInfo { /// Apply some actor changes after the barrier command is collected, if the command contains any actors that are dropped, we should /// remove that from the snapshot correspondingly. - pub fn post_apply(&mut self, command: &Command) { - if let Some(fragment_changes) = command.actor_changes() { + pub(crate) fn post_apply( + &mut self, + fragment_changes: &HashMap, + ) { + { let mut all_to_remove = HashSet::new(); - for (fragment_id, changes) in fragment_changes.fragment_changes { + for (fragment_id, changes) in fragment_changes { match changes { CommandFragmentChanges::NewFragment(_) => {} CommandFragmentChanges::Reschedule { to_remove, .. 
} => { let info = self .fragment_infos - .get_mut(&fragment_id) + .get_mut(fragment_id) .expect("should exist"); for actor_id in to_remove { - assert!(all_to_remove.insert(actor_id)); - assert!(info.actors.remove(&actor_id).is_some()); + assert!(all_to_remove.insert(*actor_id)); + assert!(info.actors.remove(actor_id).is_some()); } } CommandFragmentChanges::RemoveFragment => { let info = self .fragment_infos - .remove(&fragment_id) + .remove(fragment_id) .expect("should exist"); for (actor_id, _) in info.actors { assert!(all_to_remove.insert(actor_id)); @@ -230,6 +200,11 @@ impl InflightActorInfo { } self.actor_map.retain(|_, actor_ids| !actor_ids.is_empty()); } + } +} + +impl InflightSubscriptionInfo { + pub fn post_apply(&mut self, command: &Command) { if let Command::DropSubscription { subscription_id, upstream_mv_table_id, @@ -250,13 +225,12 @@ impl InflightActorInfo { } } } +} +impl InflightGraphInfo { /// Returns actor list to collect in the target worker node. - pub fn actor_ids_to_collect( - fragment_infos: &HashMap, - node_id: WorkerId, - ) -> impl Iterator + '_ { - fragment_infos.values().flat_map(move |info| { + pub fn actor_ids_to_collect(&self, node_id: WorkerId) -> impl Iterator + '_ { + self.fragment_infos.values().flat_map(move |info| { info.actors .iter() .filter_map(move |(actor_id, actor_node_id)| { @@ -270,11 +244,8 @@ impl InflightActorInfo { } /// Returns actor list to send in the target worker node. - pub fn actor_ids_to_send( - fragment_infos: &HashMap, - node_id: WorkerId, - ) -> impl Iterator + '_ { - fragment_infos + pub fn actor_ids_to_send(&self, node_id: WorkerId) -> impl Iterator + '_ { + self.fragment_infos .values() .filter(|info| info.is_injectable) .flat_map(move |info| { @@ -290,11 +261,17 @@ impl InflightActorInfo { }) } - pub fn existing_table_ids( - fragment_infos: &HashMap, - ) -> impl Iterator + '_ { - fragment_infos + pub fn existing_table_ids(&self) -> impl Iterator + '_ { + self.fragment_infos .values() .flat_map(|info| info.state_table_ids.iter().cloned()) } + + pub fn worker_ids(&self) -> impl Iterator + '_ { + self.actor_map.keys().cloned() + } + + pub fn contains_worker(&self, worker_id: WorkerId) -> bool { + self.actor_map.contains_key(&worker_id) + } } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index e2a1b24794c6b..3110ca7c46d8c 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -52,7 +52,7 @@ use tracing::{error, info, warn, Instrument}; use self::command::CommandContext; use self::notifier::Notifier; -use crate::barrier::info::InflightActorInfo; +use crate::barrier::info::InflightGraphInfo; use crate::barrier::notifier::BarrierInfo; use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob}; use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager}; @@ -81,6 +81,7 @@ mod trace; pub use self::command::{ BarrierKind, Command, CreateStreamingJobCommandInfo, ReplaceTablePlan, Reschedule, }; +pub use self::info::InflightSubscriptionInfo; pub use self::rpc::StreamRpcManager; pub use self::schedule::BarrierScheduler; pub use self::trace::TracedEpoch; @@ -489,7 +490,8 @@ impl GlobalBarrierManager { let initial_invalid_state = BarrierManagerState::new( TracedEpoch::new(Epoch(INVALID_EPOCH)), - InflightActorInfo::default(), + InflightGraphInfo::default(), + InflightSubscriptionInfo::default(), None, ); @@ -716,8 +718,8 @@ impl GlobalBarrierManager { info!(?changed_worker, "worker changed"); - self.state - 
.resolve_worker_nodes(self.active_streaming_nodes.current().values().cloned()); + self.state.inflight_graph_info + .on_new_worker_node_map(self.active_streaming_nodes.current()); if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker { self.control_stream_manager.add_worker(node).await; } @@ -748,7 +750,7 @@ impl GlobalBarrierManager { Err(e) => { let failed_command = self.checkpoint_control.command_wait_collect_from_worker(worker_id); if failed_command.is_some() - || self.state.inflight_actor_infos.actor_map.contains_key(&worker_id) { + || self.state.inflight_graph_info.contains_worker(worker_id) { let errors = self.control_stream_manager.collect_errors(worker_id, e).await; let err = merge_node_rpc_errors("get error from control stream", errors); if let Some(failed_command) = failed_command { @@ -799,7 +801,8 @@ impl GlobalBarrierManager { span, } = scheduled; - let info = self.state.apply_command(&command); + let (pre_applied_graph_info, pre_applied_subscription_info) = + self.state.apply_command(&command); let (prev_epoch, curr_epoch) = self.state.next_epoch_pair(); self.pending_non_checkpoint_barriers @@ -818,7 +821,9 @@ impl GlobalBarrierManager { span.record("epoch", curr_epoch.value().0); let command_ctx = Arc::new(CommandContext::new( - info, + self.active_streaming_nodes.current().clone(), + pre_applied_subscription_info, + pre_applied_graph_info.existing_table_ids().collect(), prev_epoch.clone(), curr_epoch.clone(), self.state.paused_reason(), @@ -832,8 +837,8 @@ impl GlobalBarrierManager { let node_to_collect = match self.control_stream_manager.inject_barrier( &command_ctx, - &command_ctx.info.fragment_infos, - Some(&self.state.inflight_actor_infos.fragment_infos), + &pre_applied_graph_info, + Some(&self.state.inflight_graph_info), ) { Ok(node_to_collect) => node_to_collect, Err(err) => { @@ -1186,24 +1191,17 @@ impl GlobalBarrierManagerContext { /// Resolve actor information from cluster, fragment manager and `ChangedTableId`. /// We use `changed_table_id` to modify the actors to be sent or collected. Because these actor /// will create or drop before this barrier flow through them. 
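// A standalone sketch (simplified types) of the per-worker selection helpers that moved from
// free functions taking a fragment-info map onto `InflightGraphInfo` methods. `is_injectable`
// marks fragments whose actors receive injected barriers directly.
use std::collections::{HashMap, HashSet};

type ActorId = u32;
type WorkerId = u32;
type FragmentId = u32;

struct InflightFragmentInfo {
    actors: HashMap<ActorId, WorkerId>,
    is_injectable: bool,
}

struct InflightGraphInfo {
    actor_map: HashMap<WorkerId, HashSet<ActorId>>,
    fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
}

impl InflightGraphInfo {
    /// Actors whose barriers must be collected from the given worker.
    fn actor_ids_to_collect(&self, node_id: WorkerId) -> impl Iterator<Item = ActorId> + '_ {
        self.fragment_infos.values().flat_map(move |info| {
            info.actors
                .iter()
                .filter_map(move |(&actor_id, &worker)| (worker == node_id).then_some(actor_id))
        })
    }

    /// Actors to which the barrier is sent on the given worker: only injectable fragments.
    fn actor_ids_to_send(&self, node_id: WorkerId) -> impl Iterator<Item = ActorId> + '_ {
        self.fragment_infos
            .values()
            .filter(|info| info.is_injectable)
            .flat_map(move |info| {
                info.actors
                    .iter()
                    .filter_map(move |(&actor_id, &worker)| (worker == node_id).then_some(actor_id))
            })
    }

    fn worker_ids(&self) -> impl Iterator<Item = WorkerId> + '_ {
        self.actor_map.keys().cloned()
    }

    fn contains_worker(&self, worker_id: WorkerId) -> bool {
        self.actor_map.contains_key(&worker_id)
    }
}

fn main() {
    let injectable = InflightFragmentInfo { actors: HashMap::from([(10, 1)]), is_injectable: true };
    let non_injectable = InflightFragmentInfo { actors: HashMap::from([(20, 1)]), is_injectable: false };
    let info = InflightGraphInfo {
        actor_map: HashMap::from([(1, HashSet::from([10, 20]))]),
        fragment_infos: HashMap::from([(100, injectable), (101, non_injectable)]),
    };
    assert_eq!(info.actor_ids_to_collect(1).count(), 2);
    assert_eq!(info.actor_ids_to_send(1).collect::<Vec<_>>(), vec![10]);
    assert!(info.contains_worker(1) && info.worker_ids().count() == 1);
}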
- async fn resolve_actor_info( - &self, - active_nodes: &ActiveStreamingWorkerNodes, - ) -> MetaResult { - let subscriptions = self - .metadata_manager - .get_mv_depended_subscriptions() - .await?; + async fn resolve_graph_info(&self) -> MetaResult { let info = match &self.metadata_manager { MetadataManager::V1(mgr) => { let all_actor_infos = mgr.fragment_manager.load_all_actors().await; - InflightActorInfo::resolve(active_nodes, all_actor_infos, subscriptions) + InflightGraphInfo::new(all_actor_infos.fragment_infos) } MetadataManager::V2(mgr) => { let all_actor_infos = mgr.catalog_controller.load_all_actors().await?; - InflightActorInfo::resolve(active_nodes, all_actor_infos, subscriptions) + InflightGraphInfo::new(all_actor_infos.fragment_infos) } }; @@ -1304,7 +1302,7 @@ fn collect_commit_epoch_info( synced_ssts.iter().map(|sst| &sst.sst_info), epochs, command_ctx - .info + .subscription_info .mv_depended_subscriptions .iter() .filter_map(|(mv_table_id, subscriptions)| { @@ -1337,10 +1335,7 @@ fn collect_commit_epoch_info( sst_to_worker, new_table_fragment_info, table_new_change_log, - BTreeMap::from_iter([( - epoch, - InflightActorInfo::existing_table_ids(&command_ctx.info.fragment_infos).collect(), - )]), + BTreeMap::from_iter([(epoch, command_ctx.table_ids_to_commit.clone())]), epoch, vec![], ) diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 6fbd89d149150..e5adf887b254c 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -35,7 +35,7 @@ use tracing::{debug, warn, Instrument}; use super::{CheckpointControl, TracedEpoch}; use crate::barrier::command::CommandContext; -use crate::barrier::info::InflightActorInfo; +use crate::barrier::info::{InflightGraphInfo, InflightSubscriptionInfo}; use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::rpc::ControlStreamManager; use crate::barrier::schedule::ScheduledBarriers; @@ -304,12 +304,9 @@ impl GlobalBarrierManager { warn!(error = %err.as_report(), "scale actors failed"); })?; - self.context - .resolve_actor_info(&active_streaming_nodes) - .await - .inspect_err(|err| { - warn!(error = %err.as_report(), "resolve actor info failed"); - })? + self.context.resolve_graph_info().await.inspect_err(|err| { + warn!(error = %err.as_report(), "resolve actor info failed"); + })? } else { // Migrate actors in expired CN to newly joined one. self.context @@ -325,21 +322,15 @@ impl GlobalBarrierManager { .pre_apply_drop_cancel(&self.scheduled_barriers) .await? { - info = self - .context - .resolve_actor_info(&active_streaming_nodes) - .await - .inspect_err(|err| { - warn!(error = %err.as_report(), "resolve actor info failed"); - })? + info = self.context.resolve_graph_info().await.inspect_err(|err| { + warn!(error = %err.as_report(), "resolve actor info failed"); + })? } let info = info; self.context - .purge_state_table_from_hummock( - &InflightActorInfo::existing_table_ids(&info.fragment_infos).collect(), - ) + .purge_state_table_from_hummock(&info.existing_table_ids().collect()) .await .context("purge state table from hummock")?; @@ -355,13 +346,27 @@ impl GlobalBarrierManager { self.context.sink_manager.reset().await; + let subscription_info = InflightSubscriptionInfo { + mv_depended_subscriptions: self + .context + .metadata_manager + .get_mv_depended_subscriptions() + .await?, + }; + // update and build all actors. 
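// A standalone sketch of the `InflightSubscriptionInfo` snapshot that recovery now resolves
// separately from the graph info. The nested map is keyed by the upstream materialized view's
// table id, then by subscription id, with retention seconds as values. The type aliases and
// the `max_retention_secs` helper below are illustrative stand-ins, not the real API.
use std::collections::HashMap;

type TableId = u32;
type SubscriptionId = u32;

#[derive(Default, Clone)]
struct InflightSubscriptionInfo {
    /// `mv_table_id` => `subscription_id` => retention seconds
    mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
}

impl InflightSubscriptionInfo {
    /// Longest retention requested on the given upstream table, if any subscription depends on it.
    fn max_retention_secs(&self, mv_table_id: TableId) -> Option<u64> {
        self.mv_depended_subscriptions
            .get(&mv_table_id)?
            .values()
            .copied()
            .max()
    }
}

fn main() {
    let mut info = InflightSubscriptionInfo::default();
    info.mv_depended_subscriptions
        .entry(42)
        .or_default()
        .extend([(1, 3600), (2, 60)]);
    assert_eq!(info.max_retention_secs(42), Some(3600));
    assert_eq!(info.max_retention_secs(7), None);
}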
- self.context.update_actors(&info).await.inspect_err(|err| { - warn!(error = %err.as_report(), "update actors failed"); - })?; - self.context.build_actors(&info).await.inspect_err(|err| { - warn!(error = %err.as_report(), "build_actors failed"); - })?; + self.context + .update_actors(&info, &subscription_info, &active_streaming_nodes) + .await + .inspect_err(|err| { + warn!(error = %err.as_report(), "update actors failed"); + })?; + self.context + .build_actors(&info, &active_streaming_nodes) + .await + .inspect_err(|err| { + warn!(error = %err.as_report(), "build_actors failed"); + })?; // get split assignments for all actors let source_split_assignments = @@ -380,7 +385,9 @@ impl GlobalBarrierManager { // Inject the `Initial` barrier to initialize all executors. let command_ctx = Arc::new(CommandContext::new( - info.clone(), + active_streaming_nodes.current().clone(), + subscription_info.clone(), + info.existing_table_ids().collect(), prev_epoch.clone(), new_epoch.clone(), paused_reason, @@ -390,11 +397,8 @@ impl GlobalBarrierManager { tracing::Span::current(), // recovery span )); - let mut node_to_collect = control_stream_manager.inject_barrier( - &command_ctx, - &info.fragment_infos, - Some(&info.fragment_infos), - )?; + let mut node_to_collect = + control_stream_manager.inject_barrier(&command_ctx, &info, Some(&info))?; while !node_to_collect.is_empty() { let (worker_id, result) = control_stream_manager .next_complete_barrier_response() @@ -405,7 +409,12 @@ impl GlobalBarrierManager { } ( - BarrierManagerState::new(new_epoch, info, command_ctx.next_paused_reason()), + BarrierManagerState::new( + new_epoch, + info, + subscription_info, + command_ctx.next_paused_reason(), + ), active_streaming_nodes, control_stream_manager, tracker, @@ -453,7 +462,7 @@ impl GlobalBarrierManagerContext { async fn migrate_actors( &self, active_nodes: &mut ActiveStreamingWorkerNodes, - ) -> MetaResult { + ) -> MetaResult { match &self.metadata_manager { MetadataManager::V1(_) => self.migrate_actors_v1(active_nodes).await, MetadataManager::V2(_) => self.migrate_actors_v2(active_nodes).await, @@ -463,7 +472,7 @@ impl GlobalBarrierManagerContext { async fn migrate_actors_v2( &self, active_nodes: &mut ActiveStreamingWorkerNodes, - ) -> MetaResult { + ) -> MetaResult { let mgr = self.metadata_manager.as_v2_ref(); // all worker slots used by actors @@ -489,7 +498,7 @@ impl GlobalBarrierManagerContext { if expired_worker_slots.is_empty() { debug!("no expired worker slots, skipping."); - return self.resolve_actor_info(active_nodes).await; + return self.resolve_graph_info().await; } debug!("start migrate actors."); @@ -599,23 +608,25 @@ impl GlobalBarrierManagerContext { debug!("migrate actors succeed."); - self.resolve_actor_info(active_nodes).await + self.resolve_graph_info().await } /// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated. async fn migrate_actors_v1( &self, active_nodes: &mut ActiveStreamingWorkerNodes, - ) -> MetaResult { + ) -> MetaResult { let mgr = self.metadata_manager.as_v1_ref(); - let info = self.resolve_actor_info(active_nodes).await?; + let info = self.resolve_graph_info().await?; // 1. get expired workers. 
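// A standalone sketch of the expired-worker check below, which now consults the externally
// tracked active worker map instead of a `node_map` stored inside the info struct. Types are
// simplified; `active_workers` stands in for `active_nodes.current()`.
use std::collections::{HashMap, HashSet};

type ActorId = u32;
type WorkerId = u32;

/// Workers that still host actors but are no longer present in the active worker map.
fn expired_workers(
    actor_map: &HashMap<WorkerId, HashSet<ActorId>>,
    active_workers: &HashSet<WorkerId>,
) -> HashSet<WorkerId> {
    actor_map
        .iter()
        .filter(|(worker, actors)| !actors.is_empty() && !active_workers.contains(*worker))
        .map(|(worker, _)| *worker)
        .collect()
}

fn main() {
    let actor_map = HashMap::from([(1, HashSet::from([10])), (2, HashSet::from([20]))]);
    let active = HashSet::from([1]); // worker 2 went away
    assert_eq!(expired_workers(&actor_map, &active), HashSet::from([2]));
}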
let expired_workers: HashSet = info .actor_map .iter() - .filter(|(&worker, actors)| !actors.is_empty() && !info.node_map.contains_key(&worker)) + .filter(|(&worker, actors)| { + !actors.is_empty() && !active_nodes.current().contains_key(&worker) + }) .map(|(&worker, _)| worker) .collect(); if expired_workers.is_empty() { @@ -635,7 +646,7 @@ impl GlobalBarrierManagerContext { migration_plan.delete(self.env.meta_store().as_kv()).await?; debug!("migrate actors succeed."); - self.resolve_actor_info(active_nodes).await + self.resolve_graph_info().await } async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> { @@ -810,7 +821,7 @@ impl GlobalBarrierManagerContext { } async fn scale_actors_v1(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> { - let info = self.resolve_actor_info(active_nodes).await?; + let info = self.resolve_graph_info().await?; let mgr = self.metadata_manager.as_v1_ref(); debug!("start resetting actors distribution"); @@ -820,8 +831,8 @@ impl GlobalBarrierManagerContext { return Ok(()); } - let available_parallelism = info - .node_map + let available_parallelism = active_nodes + .current() .values() .map(|worker_node| worker_node.parallelism as usize) .sum(); @@ -1084,7 +1095,12 @@ impl GlobalBarrierManagerContext { } /// Update all actors in compute nodes. - async fn update_actors(&self, info: &InflightActorInfo) -> MetaResult<()> { + async fn update_actors( + &self, + info: &InflightGraphInfo, + subscription_info: &InflightSubscriptionInfo, + active_nodes: &ActiveStreamingWorkerNodes, + ) -> MetaResult<()> { if info.actor_map.is_empty() { tracing::debug!("no actor to update, skipping."); return Ok(()); @@ -1094,8 +1110,8 @@ impl GlobalBarrierManagerContext { .actor_map .iter() .map(|(node_id, actors)| { - let host = info - .node_map + let host = active_nodes + .current() .get(node_id) .ok_or_else(|| anyhow::anyhow!("worker evicted, wait for online."))? .host @@ -1108,11 +1124,14 @@ impl GlobalBarrierManagerContext { .flatten_ok() .try_collect()?; - let mut all_node_actors = self.metadata_manager.all_node_actors(false).await?; + let mut all_node_actors = self + .metadata_manager + .all_node_actors(false, &subscription_info.mv_depended_subscriptions) + .await?; // Check if any actors were dropped after info resolved. if all_node_actors.iter().any(|(node_id, node_actors)| { - !info.node_map.contains_key(node_id) + !active_nodes.current().contains_key(node_id) || info .actor_map .get(node_id) @@ -1124,7 +1143,7 @@ impl GlobalBarrierManagerContext { self.stream_rpc_manager .broadcast_update_actor_info( - &info.node_map, + active_nodes.current(), info.actor_map.keys().cloned(), actor_infos.into_iter(), info.actor_map.keys().map(|node_id| { @@ -1140,7 +1159,11 @@ impl GlobalBarrierManagerContext { } /// Build all actors in compute nodes. 
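// A standalone sketch of the host lookup that `update_actors` now performs against the active
// worker map (previously it read the info's own `node_map`). Types are simplified stand-ins;
// the real code resolves the host address from the worker node proto.
use std::collections::{HashMap, HashSet};

type ActorId = u32;
type WorkerId = u32;

#[derive(Clone, Debug)]
struct Host {
    addr: String,
}

/// For every worker that still hosts actors, resolve its host from the active worker map.
/// A missing worker means it was evicted after the graph info was resolved, so the caller
/// should wait for it to come back online (or re-resolve) before broadcasting actor info.
fn resolve_actor_hosts(
    actor_map: &HashMap<WorkerId, HashSet<ActorId>>,
    active_workers: &HashMap<WorkerId, Host>,
) -> Result<Vec<(ActorId, Host)>, String> {
    let mut out = Vec::new();
    for (worker_id, actors) in actor_map {
        let host = active_workers
            .get(worker_id)
            .ok_or_else(|| format!("worker {worker_id} evicted, wait for online"))?;
        out.extend(actors.iter().map(|actor_id| (*actor_id, host.clone())));
    }
    Ok(out)
}

fn main() {
    let actor_map = HashMap::from([(1, HashSet::from([10]))]);
    let active = HashMap::from([(1, Host { addr: "cn-1:5688".into() })]);
    assert_eq!(resolve_actor_hosts(&actor_map, &active).unwrap().len(), 1);
    assert!(resolve_actor_hosts(&actor_map, &HashMap::new()).is_err());
}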
- async fn build_actors(&self, info: &InflightActorInfo) -> MetaResult<()> { + async fn build_actors( + &self, + info: &InflightGraphInfo, + active_nodes: &ActiveStreamingWorkerNodes, + ) -> MetaResult<()> { if info.actor_map.is_empty() { tracing::debug!("no actor to build, skipping."); return Ok(()); @@ -1148,7 +1171,7 @@ impl GlobalBarrierManagerContext { self.stream_rpc_manager .build_actors( - &info.node_map, + active_nodes.current(), info.actor_map.iter().map(|(node_id, actors)| { let actors = actors.iter().cloned().collect(); (*node_id, actors) diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index ae12c439e0a93..fc125d6e583a4 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -45,9 +45,8 @@ use uuid::Uuid; use super::command::CommandContext; use super::GlobalBarrierManagerContext; -use crate::barrier::info::InflightActorInfo; -use crate::manager::{InflightFragmentInfo, MetaSrvEnv, WorkerId}; -use crate::model::FragmentId; +use crate::barrier::info::InflightGraphInfo; +use crate::manager::{MetaSrvEnv, WorkerId}; use crate::{MetaError, MetaResult}; const COLLECT_ERROR_TIMEOUT: Duration = Duration::from_secs(3); @@ -249,31 +248,40 @@ impl ControlStreamManager { pub(super) fn inject_barrier( &mut self, command_context: &CommandContext, - pre_applied_fragment_infos: &HashMap, - applied_fragment_infos: Option<&HashMap>, + pre_applied_graph_info: &InflightGraphInfo, + applied_graph_info: Option<&InflightGraphInfo>, ) -> MetaResult> { fail_point!("inject_barrier_err", |_| risingwave_common::bail!( "inject_barrier_err" )); let mutation = command_context.to_mutation(); - let info = command_context.info.clone(); let mut node_need_collect = HashSet::new(); - info.node_map - .iter() - .map(|(node_id, worker_node)| { + for worker_id in pre_applied_graph_info.worker_ids().chain( + applied_graph_info + .into_iter() + .flat_map(|info| info.worker_ids()), + ) { + if !self.nodes.contains_key(&worker_id) { + return Err(anyhow!("unconnected worker node {}", worker_id).into()); + } + } + + self.nodes + .iter_mut() + .map(|(node_id, node)| { let actor_ids_to_send: Vec<_> = - InflightActorInfo::actor_ids_to_send(pre_applied_fragment_infos, *node_id) - .collect(); - let actor_ids_to_collect: Vec<_> = - InflightActorInfo::actor_ids_to_collect(pre_applied_fragment_infos, *node_id) - .collect(); + pre_applied_graph_info.actor_ids_to_send(*node_id).collect(); + let actor_ids_to_collect: Vec<_> = pre_applied_graph_info + .actor_ids_to_collect(*node_id) + .collect(); if actor_ids_to_collect.is_empty() { // No need to send or collect barrier for this node. assert!(actor_ids_to_send.is_empty()); } - let table_ids_to_sync = if let Some(fragment_infos) = applied_fragment_infos { - InflightActorInfo::existing_table_ids(fragment_infos) + let table_ids_to_sync = if let Some(graph_info) = applied_graph_info { + graph_info + .existing_table_ids() .map(|table_id| table_id.table_id) .collect() } else { @@ -281,15 +289,6 @@ impl ControlStreamManager { }; { - let Some(node) = self.nodes.get_mut(node_id) else { - if actor_ids_to_collect.is_empty() { - // Worker node get disconnected but has no actor to collect. Simply skip it. 
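// A standalone sketch of the up-front connectivity check that `inject_barrier` now performs:
// every worker referenced by either graph snapshot must already have a control stream,
// otherwise barrier injection fails fast instead of skipping disconnected nodes. Simplified
// types; `connected` stands in for the control stream manager's node map.
use std::collections::HashSet;

type WorkerId = u32;

fn check_workers_connected(
    pre_applied_workers: impl IntoIterator<Item = WorkerId>,
    applied_workers: impl IntoIterator<Item = WorkerId>,
    connected: &HashSet<WorkerId>,
) -> Result<(), String> {
    for worker_id in pre_applied_workers.into_iter().chain(applied_workers) {
        if !connected.contains(&worker_id) {
            return Err(format!("unconnected worker node {worker_id}"));
        }
    }
    Ok(())
}

fn main() {
    let connected = HashSet::from([1, 2]);
    assert!(check_workers_connected([1, 2], [1], &connected).is_ok());
    assert!(check_workers_connected([1, 3], [1], &connected).is_err());
}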
- return Ok(()); - } - return Err( - anyhow!("unconnected worker node: {:?}", worker_node.host).into() - ); - }; let mutation = mutation.clone(); let barrier = Barrier { epoch: Some(risingwave_pb::data::Epoch { diff --git a/src/meta/src/barrier/state.rs b/src/meta/src/barrier/state.rs index f3b49b8fa2709..32f74cb888935 100644 --- a/src/meta/src/barrier/state.rs +++ b/src/meta/src/barrier/state.rs @@ -12,10 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_pb::common::WorkerNode; use risingwave_pb::meta::PausedReason; -use crate::barrier::info::InflightActorInfo; +use crate::barrier::info::{InflightGraphInfo, InflightSubscriptionInfo}; use crate::barrier::{Command, TracedEpoch}; /// `BarrierManagerState` defines the necessary state of `GlobalBarrierManager`. @@ -27,7 +26,9 @@ pub struct BarrierManagerState { in_flight_prev_epoch: TracedEpoch, /// Inflight running actors info. - pub(crate) inflight_actor_infos: InflightActorInfo, + pub(crate) inflight_graph_info: InflightGraphInfo, + + inflight_subscription_info: InflightSubscriptionInfo, /// Whether the cluster is paused and the reason. paused_reason: Option, @@ -36,12 +37,14 @@ pub struct BarrierManagerState { impl BarrierManagerState { pub fn new( in_flight_prev_epoch: TracedEpoch, - inflight_actor_infos: InflightActorInfo, + inflight_graph_info: InflightGraphInfo, + inflight_subscription_info: InflightSubscriptionInfo, paused_reason: Option, ) -> Self { Self { in_flight_prev_epoch, - inflight_actor_infos, + inflight_graph_info, + inflight_subscription_info, paused_reason, } } @@ -69,17 +72,29 @@ impl BarrierManagerState { (prev_epoch, next_epoch) } - pub fn resolve_worker_nodes(&mut self, nodes: impl IntoIterator) { - self.inflight_actor_infos.resolve_worker_nodes(nodes); - } - /// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors /// will be removed from the state after the info get resolved. 
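// The `apply_command` rewrite below works on the split state but keeps the same ordering:
// additions from the command are applied *before* taking the snapshot used by the in-flight
// barrier, while removals are applied only *after*. A standalone sketch of that ordering with
// deliberately simplified types (the real code does this via `pre_apply`/`post_apply`):
use std::collections::HashSet;

type ActorId = u32;

#[derive(Default, Clone)]
struct GraphInfo {
    actors: HashSet<ActorId>,
}

struct Changes {
    to_add: HashSet<ActorId>,
    to_remove: HashSet<ActorId>,
}

fn apply_command(state: &mut GraphInfo, changes: &Changes) -> GraphInfo {
    // pre-apply: newly created actors must be part of the barrier that creates them
    state.actors.extend(changes.to_add.iter().copied());
    // snapshot handed to the command context / barrier injection
    let snapshot = state.clone();
    // post-apply: dropped actors are still collected by this barrier, then forgotten
    for actor in &changes.to_remove {
        state.actors.remove(actor);
    }
    snapshot
}

fn main() {
    let mut state = GraphInfo { actors: HashSet::from([1, 2]) };
    let changes = Changes { to_add: HashSet::from([3]), to_remove: HashSet::from([2]) };
    let snapshot = apply_command(&mut state, &changes);
    assert_eq!(snapshot.actors, HashSet::from([1, 2, 3])); // this barrier sees both old and new
    assert_eq!(state.actors, HashSet::from([1, 3]));       // later barriers no longer see 2
}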
- pub fn apply_command(&mut self, command: &Command) -> InflightActorInfo { - self.inflight_actor_infos.pre_apply(command); - let info = self.inflight_actor_infos.clone(); - self.inflight_actor_infos.post_apply(command); + pub fn apply_command( + &mut self, + command: &Command, + ) -> (InflightGraphInfo, InflightSubscriptionInfo) { + // update the fragment_infos outside pre_apply + let fragment_changes = if let Some(fragment_changes) = command.fragment_changes() { + self.inflight_graph_info.pre_apply(&fragment_changes); + Some(fragment_changes) + } else { + None + }; + self.inflight_subscription_info.pre_apply(command); + + let info = self.inflight_graph_info.clone(); + let subscription_info = self.inflight_subscription_info.clone(); + + if let Some(fragment_changes) = fragment_changes { + self.inflight_graph_info.post_apply(&fragment_changes); + } + self.inflight_subscription_info.post_apply(command); - info + (info, subscription_info) } } diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index e99259810b117..a5c0267b6b667 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -733,12 +733,12 @@ impl MetadataManager { pub async fn all_node_actors( &self, include_inactive: bool, + subscriptions: &HashMap>, ) -> MetaResult>> { - let subscriptions = self.get_mv_depended_subscriptions().await?; match &self { MetadataManager::V1(mgr) => Ok(mgr .fragment_manager - .all_node_actors(include_inactive, &subscriptions) + .all_node_actors(include_inactive, subscriptions) .await), MetadataManager::V2(mgr) => { let table_fragments = mgr.catalog_controller.table_fragments().await?; @@ -751,7 +751,7 @@ impl MetadataManager { node_actors.extend( actors .into_iter() - .map(|actor| to_build_actor_info(actor, &subscriptions, table_id)), + .map(|actor| to_build_actor_info(actor, subscriptions, table_id)), ) } }
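// A standalone sketch of the parameter change in `all_node_actors` above: the caller now
// passes an already-resolved subscription snapshot instead of the method fetching it again,
// so actors built during recovery see the same subscription state the first barrier uses.
// All types and the two helpers below are simplified stand-ins for illustration only.
use std::collections::HashMap;

type TableId = u32;
type SubscriptionId = u32;
type WorkerId = u32;

#[derive(Debug)]
struct BuildActorInfo {
    actor_id: u32,
    /// retention (seconds) of subscriptions depending on this actor's upstream table, if any
    related_subscriptions: HashMap<SubscriptionId, u64>,
}

fn to_build_actor_info(
    actor_id: u32,
    subscriptions: &HashMap<TableId, HashMap<SubscriptionId, u64>>,
    job_table_id: TableId,
) -> BuildActorInfo {
    BuildActorInfo {
        actor_id,
        related_subscriptions: subscriptions.get(&job_table_id).cloned().unwrap_or_default(),
    }
}

fn all_node_actors(
    actors_by_node: &HashMap<WorkerId, Vec<(u32, TableId)>>,
    subscriptions: &HashMap<TableId, HashMap<SubscriptionId, u64>>,
) -> HashMap<WorkerId, Vec<BuildActorInfo>> {
    actors_by_node
        .iter()
        .map(|(node_id, actors)| {
            let infos: Vec<BuildActorInfo> = actors
                .iter()
                .map(|&(actor_id, table_id)| to_build_actor_info(actor_id, subscriptions, table_id))
                .collect();
            (*node_id, infos)
        })
        .collect()
}

fn main() {
    let subscriptions = HashMap::from([(42, HashMap::from([(1, 3600)]))]);
    let actors_by_node = HashMap::from([(1, vec![(10, 42)])]);
    let built = all_node_actors(&actors_by_node, &subscriptions);
    assert_eq!(built[&1][0].related_subscriptions.get(&1), Some(&3600));
}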