diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 505bfa21f813c..520f7ef4e373b 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -14,7 +14,6 @@
 use std::collections::{HashMap, HashSet};
 use std::fmt::Formatter;
-use std::sync::Arc;
 
 use futures::future::try_join_all;
 use itertools::Itertools;
@@ -26,6 +25,7 @@ use risingwave_common::util::epoch::Epoch;
 use risingwave_connector::source::SplitImpl;
 use risingwave_hummock_sdk::HummockEpoch;
 use risingwave_pb::catalog::{CreateType, Table};
+use risingwave_pb::common::PbWorkerNode;
 use risingwave_pb::meta::table_fragments::PbActorStatus;
 use risingwave_pb::meta::PausedReason;
 use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
@@ -42,7 +42,7 @@ use risingwave_pb::stream_service::WaitEpochCommitRequest;
 use thiserror_ext::AsReport;
 use tracing::warn;
 
-use super::info::{CommandFragmentChanges, InflightActorInfo};
+use super::info::CommandFragmentChanges;
 use super::trace::TracedEpoch;
 use crate::barrier::{GlobalBarrierManagerContext, InflightSubscriptionInfo};
 use crate::manager::{DdlType, InflightFragmentInfo, MetadataManager, StreamingJob, WorkerId};
@@ -418,7 +418,7 @@ impl BarrierKind {
 /// [`Command`].
 pub struct CommandContext {
     /// Resolved info in this barrier loop.
-    pub info: Arc<InflightActorInfo>,
+    pub node_map: HashMap<WorkerId, PbWorkerNode>,
     pub subscription_info: InflightSubscriptionInfo,
 
     pub table_ids_to_commit: HashSet<TableId>,
@@ -455,7 +455,7 @@ impl std::fmt::Debug for CommandContext {
 impl CommandContext {
     #[allow(clippy::too_many_arguments)]
     pub(super) fn new(
-        info: InflightActorInfo,
+        node_map: HashMap<WorkerId, PbWorkerNode>,
         subscription_info: InflightSubscriptionInfo,
         table_ids_to_commit: HashSet<TableId>,
         prev_epoch: TracedEpoch,
@@ -467,7 +467,7 @@ impl CommandContext {
         span: tracing::Span,
     ) -> Self {
         Self {
-            info: Arc::new(info),
+            node_map,
            subscription_info,
            table_ids_to_commit,
            prev_epoch,
@@ -862,9 +862,8 @@ impl CommandContext {
                 self.barrier_manager_context
                     .stream_rpc_manager
                     .drop_actors(
-                        &self.info.node_map,
-                        self.info
-                            .node_map
+                        &self.node_map,
+                        self.node_map
                             .keys()
                             .map(|worker_id| (*worker_id, actors.clone())),
                     )
@@ -872,7 +871,7 @@ impl CommandContext {
 
     pub async fn wait_epoch_commit(&self, epoch: HummockEpoch) -> MetaResult<()> {
-        let futures = self.info.node_map.values().map(|worker_node| async {
+        let futures = self.node_map.values().map(|worker_node| async {
             let client = self
                 .barrier_manager_context
                 .env
diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs
index 84f4498668ab3..aadc7af88b06a 100644
--- a/src/meta/src/barrier/info.rs
+++ b/src/meta/src/barrier/info.rs
@@ -12,16 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
+use std::collections::{HashMap, HashSet};
 
 use risingwave_common::catalog::TableId;
-use risingwave_pb::common::PbWorkerNode;
 use tracing::warn;
 
 use crate::barrier::Command;
-use crate::manager::{
-    ActiveStreamingWorkerNodes, InflightFragmentInfo, InflightGraphInfo, WorkerId,
-};
+use crate::manager::{ActorInfos, InflightFragmentInfo, WorkerId};
 use crate::model::{ActorId, FragmentId};
 
 #[derive(Debug, Clone)]
@@ -40,31 +37,25 @@ pub struct InflightSubscriptionInfo {
     pub mv_depended_subscriptions: HashMap<TableId, HashMap<u32, u64>>,
 }
 
-/// [`InflightActorInfo`] resolves the actor info read from meta store for
+/// [`InflightGraphInfo`] resolves the actor info read from meta store for
 /// [`crate::barrier::GlobalBarrierManager`].
 #[derive(Default, Clone)]
-pub struct InflightActorInfo {
-    /// `node_id` => node
-    pub node_map: HashMap<WorkerId, PbWorkerNode>,
-
+pub(super) struct InflightGraphInfo {
     /// `node_id` => actors
     pub actor_map: HashMap<WorkerId, HashSet<ActorId>>,
 
     /// `actor_id` => `WorkerId`
     pub actor_location_map: HashMap<ActorId, WorkerId>,
+
+    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
 }
 
-impl InflightActorInfo {
+impl InflightGraphInfo {
     /// Resolve inflight actor info from given nodes and actors that are loaded from meta store. It will be used during recovery to rebuild all streaming actors.
-    pub fn resolve(
-        active_nodes: &ActiveStreamingWorkerNodes,
-        graph_info: &InflightGraphInfo,
-    ) -> Self {
-        let node_map = active_nodes.current().clone();
-
+    pub fn resolve(actor_infos: ActorInfos) -> Self {
         let actor_map = {
             let mut map: HashMap<_, HashSet<_>> = HashMap::new();
-            for info in graph_info.fragment_infos.values() {
+            for info in actor_infos.fragment_infos.values() {
                 for (actor_id, worker_id) in &info.actors {
                     map.entry(*worker_id).or_default().insert(*actor_id);
                 }
@@ -72,7 +63,7 @@ impl InflightActorInfo {
             map
         };
 
-        let actor_location_map = graph_info
+        let actor_location_map = actor_infos
             .fragment_infos
             .values()
             .flat_map(|fragment| {
@@ -84,49 +75,18 @@ impl InflightActorInfo {
             .collect();
 
         Self {
-            node_map,
             actor_map,
             actor_location_map,
+            fragment_infos: actor_infos.fragment_infos,
         }
     }
 
-    /// Update worker nodes snapshot. We need to support incremental updates for it in the future.
-    pub fn resolve_worker_nodes(&mut self, all_nodes: impl IntoIterator<Item = PbWorkerNode>) {
-        let new_node_map = all_nodes
-            .into_iter()
-            .map(|node| (node.id, node))
-            .collect::<HashMap<_, _>>();
-
-        let mut deleted_actors = BTreeMap::new();
-        for (&actor_id, &location) in &self.actor_location_map {
-            if !new_node_map.contains_key(&location) {
-                deleted_actors
-                    .entry(location)
-                    .or_insert_with(BTreeSet::new)
-                    .insert(actor_id);
-            }
-        }
-        for (node_id, actors) in deleted_actors {
-            let node = self.node_map.get(&node_id);
-            warn!(
-                node_id,
-                ?node,
-                ?actors,
-                "node with running actors is deleted"
-            );
-        }
-
-        self.node_map = new_node_map;
-    }
-}
-
-impl InflightGraphInfo {
     /// Apply some actor changes before issuing a barrier command, if the command contains any new added actors, we should update
     /// the info correspondingly.
     pub(crate) fn pre_apply(
         &mut self,
         fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
-    ) -> HashMap<ActorId, WorkerId> {
+    ) {
         {
             let mut to_add = HashMap::new();
             for (fragment_id, change) in fragment_changes {
@@ -154,16 +114,7 @@ impl InflightGraphInfo {
                     CommandFragmentChanges::RemoveFragment => {}
                 }
             }
-            to_add
-        }
-    }
-}
-
-impl InflightActorInfo {
-    pub fn pre_apply(&mut self, actors_to_add: Option<HashMap<ActorId, WorkerId>>) {
-        {
-            for (actor_id, node_id) in actors_to_add.into_iter().flatten() {
-                assert!(self.node_map.contains_key(&node_id));
+            for (actor_id, node_id) in to_add {
                 assert!(
                     self.actor_map.entry(node_id).or_default().insert(actor_id),
                     "duplicate actor in command changes"
                 );
@@ -203,7 +154,7 @@ impl InflightGraphInfo {
     pub(crate) fn post_apply(
         &mut self,
         fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
-    ) -> HashSet<ActorId> {
+    ) {
         {
             let mut all_to_remove = HashSet::new();
             for (fragment_id, changes) in fragment_changes {
@@ -224,21 +175,13 @@ impl InflightGraphInfo {
                             .fragment_infos
                             .remove(fragment_id)
                             .expect("should exist");
-                        for actor_id in info.actors.keys() {
-                            assert!(all_to_remove.insert(*actor_id));
+                        for (actor_id, _) in info.actors {
+                            assert!(all_to_remove.insert(actor_id));
                         }
                     }
                 }
            }
-            all_to_remove
-        }
-    }
-}
-
-impl InflightActorInfo {
-    pub fn post_apply(&mut self, actors_to_remove: Option<HashSet<ActorId>>) {
-        {
-            for actor_id in actors_to_remove.into_iter().flatten() {
+            for actor_id in all_to_remove {
                 let node_id = self
                     .actor_location_map
                     .remove(&actor_id)
@@ -314,4 +257,12 @@ impl InflightGraphInfo {
             .values()
             .flat_map(|info| info.state_table_ids.iter().cloned())
     }
+
+    pub fn worker_ids(&self) -> impl Iterator<Item = WorkerId> + '_ {
+        self.actor_map.keys().cloned()
+    }
+
+    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
+        self.actor_map.contains_key(&worker_id)
+    }
 }
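Note on the refactor above: the consolidated `InflightGraphInfo` keeps no worker-node snapshot; both of its maps are derived purely from the fragment infos loaded from the meta store. A minimal sketch of that shape, with simplified stand-in types rather than the real meta-store structs:

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical stand-ins for the real id and fragment types.
type WorkerId = u32;
type ActorId = u32;
type FragmentId = u32;

struct FragmentInfo {
    actors: HashMap<ActorId, WorkerId>,
}

struct GraphInfo {
    actor_map: HashMap<WorkerId, HashSet<ActorId>>,
    actor_location_map: HashMap<ActorId, WorkerId>,
    fragment_infos: HashMap<FragmentId, FragmentInfo>,
}

impl GraphInfo {
    // Mirrors the shape of `InflightGraphInfo::resolve`: both maps are pure
    // projections of the fragment infos, so no worker-node snapshot is needed.
    fn resolve(fragment_infos: HashMap<FragmentId, FragmentInfo>) -> Self {
        let mut actor_map: HashMap<WorkerId, HashSet<ActorId>> = HashMap::new();
        let mut actor_location_map = HashMap::new();
        for info in fragment_infos.values() {
            for (&actor_id, &worker_id) in &info.actors {
                actor_map.entry(worker_id).or_default().insert(actor_id);
                actor_location_map.insert(actor_id, worker_id);
            }
        }
        Self {
            actor_map,
            actor_location_map,
            fragment_infos,
        }
    }

    // Worker membership is now answered from `actor_map` instead of a `node_map`.
    fn contains_worker(&self, worker_id: WorkerId) -> bool {
        self.actor_map.contains_key(&worker_id)
    }
}
```

The design consequence is that membership questions (`contains_worker`, `worker_ids`) follow the actors themselves, so the barrier loop no longer needs `resolve_worker_nodes` to keep a node map in sync with cluster changes.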
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index 34343c3e2ed52..e0312ad57739b 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -52,7 +52,7 @@ use tracing::{error, info, warn, Instrument};
 
 use self::command::CommandContext;
 use self::notifier::Notifier;
-use crate::barrier::info::InflightActorInfo;
+use crate::barrier::info::InflightGraphInfo;
 use crate::barrier::notifier::BarrierInfo;
 use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
 use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager};
@@ -61,8 +61,8 @@ use crate::error::MetaErrorInner;
 use crate::hummock::{CommitEpochInfo, HummockManagerRef, NewTableFragmentInfo};
 use crate::manager::sink_coordination::SinkCoordinatorManager;
 use crate::manager::{
-    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, InflightGraphInfo, LocalNotification,
-    MetaSrvEnv, MetadataManager, SystemParamsManagerImpl, WorkerId,
+    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
+    MetadataManager, SystemParamsManagerImpl, WorkerId,
 };
 use crate::rpc::metrics::MetaMetrics;
 use crate::stream::{ScaleControllerRef, SourceManagerRef};
@@ -490,7 +490,6 @@ impl GlobalBarrierManager {
 
         let initial_invalid_state = BarrierManagerState::new(
             TracedEpoch::new(Epoch(INVALID_EPOCH)),
-            InflightActorInfo::default(),
             InflightGraphInfo::default(),
             InflightSubscriptionInfo::default(),
             None,
@@ -719,8 +718,6 @@ impl GlobalBarrierManager {
 
                         info!(?changed_worker, "worker changed");
 
-                        self.state
-                            .resolve_worker_nodes(self.active_streaming_nodes.current().values().cloned());
                         if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker {
                             self.control_stream_manager.add_worker(node).await;
                         }
@@ -751,7 +748,7 @@ impl GlobalBarrierManager {
                    Err(e) => {
                        let failed_command = self.checkpoint_control.command_wait_collect_from_worker(worker_id);
                        if failed_command.is_some()
-                            || self.state.inflight_actor_infos.actor_map.contains_key(&worker_id) {
+                            || self.state.inflight_actor_infos.contains_worker(worker_id) {
                            let errors = self.control_stream_manager.collect_errors(worker_id, e).await;
                            let err = merge_node_rpc_errors("get error from control stream", errors);
                            if let Some(failed_command) = failed_command {
@@ -802,7 +799,7 @@ impl GlobalBarrierManager {
             span,
         } = scheduled;
 
-        let (pre_applied_actor_info, pre_applied_graph_info, pre_applied_subscription_info) =
+        let (pre_applied_actor_info, pre_applied_subscription_info) =
             self.state.apply_command(&command);
 
         let (prev_epoch, curr_epoch) = self.state.next_epoch_pair();
@@ -822,9 +819,9 @@ impl GlobalBarrierManager {
         span.record("epoch", curr_epoch.value().0);
 
         let command_ctx = Arc::new(CommandContext::new(
-            pre_applied_actor_info,
+            self.active_streaming_nodes.current().clone(),
             pre_applied_subscription_info,
-            pre_applied_graph_info.existing_table_ids().collect(),
+            pre_applied_actor_info.existing_table_ids().collect(),
             prev_epoch.clone(),
             curr_epoch.clone(),
             self.state.paused_reason(),
@@ -838,8 +835,8 @@ impl GlobalBarrierManager {
 
         let node_to_collect = match self.control_stream_manager.inject_barrier(
             &command_ctx,
-            &pre_applied_graph_info,
-            Some(&self.state.inflight_graph_info),
+            &pre_applied_actor_info,
+            Some(&self.state.inflight_actor_infos),
         ) {
             Ok(node_to_collect) => node_to_collect,
             Err(err) => {
@@ -1192,10 +1189,18 @@ impl GlobalBarrierManagerContext {
     /// Resolve actor information from cluster, fragment manager and `ChangedTableId`.
     /// We use `changed_table_id` to modify the actors to be sent or collected. Because these actor
     /// will create or drop before this barrier flow through them.
-    async fn load_graph_info(&self) -> MetaResult<InflightGraphInfo> {
+    async fn resolve_actor_info(&self) -> MetaResult<InflightGraphInfo> {
         let info = match &self.metadata_manager {
-            MetadataManager::V1(mgr) => mgr.fragment_manager.load_graph_info().await,
-            MetadataManager::V2(mgr) => mgr.catalog_controller.load_graph_info().await?,
+            MetadataManager::V1(mgr) => {
+                let all_actor_infos = mgr.fragment_manager.load_all_actors().await;
+
+                InflightGraphInfo::resolve(all_actor_infos)
+            }
+            MetadataManager::V2(mgr) => {
+                let all_actor_infos = mgr.catalog_controller.load_all_actors().await?;
+
+                InflightGraphInfo::resolve(all_actor_infos)
+            }
         };
 
         Ok(info)
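With `InflightActorInfo` gone, `CommandContext` only needs the worker-node snapshot taken from `active_streaming_nodes.current()` at barrier-injection time. A rough sketch of the slimmed-down context under that assumption (simplified, hypothetical field and helper names, not the real constructor signature):

```rust
use std::collections::HashMap;

type WorkerId = u32;

// Stand-in for `risingwave_pb::common::PbWorkerNode`.
#[derive(Clone)]
struct WorkerNode {
    id: WorkerId,
    host: String,
}

// Sketch of the slimmed-down context: it only remembers which workers the
// barrier was injected to; everything actor-related lives in the graph info.
struct CommandContext {
    node_map: HashMap<WorkerId, WorkerNode>,
}

impl CommandContext {
    fn new(node_map: HashMap<WorkerId, WorkerNode>) -> Self {
        Self { node_map }
    }

    // Fan-out helpers (e.g. a `wait_epoch_commit`-style call) just iterate the snapshot.
    fn worker_hosts(&self) -> impl Iterator<Item = &str> + '_ {
        self.node_map.values().map(|node| node.host.as_str())
    }
}

fn main() {
    let node = WorkerNode { id: 1, host: "cn-1:5688".into() };
    let ctx = CommandContext::new(HashMap::from([(node.id, node.clone())]));
    assert_eq!(ctx.worker_hosts().count(), 1);
}
```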
diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index 5f1ab8cb586f8..236821d6a7a01 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -35,14 +35,14 @@ use tracing::{debug, warn, Instrument};
 
 use super::{CheckpointControl, TracedEpoch};
 use crate::barrier::command::CommandContext;
-use crate::barrier::info::{InflightActorInfo, InflightSubscriptionInfo};
+use crate::barrier::info::{InflightGraphInfo, InflightSubscriptionInfo};
 use crate::barrier::progress::CreateMviewProgressTracker;
 use crate::barrier::rpc::ControlStreamManager;
 use crate::barrier::schedule::ScheduledBarriers;
 use crate::barrier::state::BarrierManagerState;
 use crate::barrier::{BarrierKind, Command, GlobalBarrierManager, GlobalBarrierManagerContext};
 use crate::controller::catalog::ReleaseContext;
-use crate::manager::{ActiveStreamingWorkerNodes, InflightGraphInfo, MetadataManager, WorkerId};
+use crate::manager::{ActiveStreamingWorkerNodes, MetadataManager, WorkerId};
 use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism};
 use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy};
 use crate::{model, MetaError, MetaResult};
@@ -294,7 +294,7 @@ impl GlobalBarrierManager {
             // following steps will be no-op, while the compute nodes will still be reset.
             // FIXME: Transactions should be used.
             // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
-            let mut graph_info = if !self.env.opts.disable_automatic_parallelism_control
+            let mut info = if !self.env.opts.disable_automatic_parallelism_control
                 && background_streaming_jobs.is_empty()
             {
                 self.context
@@ -304,7 +304,7 @@ impl GlobalBarrierManager {
                         warn!(error = %err.as_report(), "scale actors failed");
                     })?;
 
-                self.context.load_graph_info().await.inspect_err(|err| {
+                self.context.resolve_actor_info().await.inspect_err(|err| {
                     warn!(error = %err.as_report(), "resolve actor info failed");
                 })?
             } else {
@@ -322,15 +322,15 @@ impl GlobalBarrierManager {
                 .pre_apply_drop_cancel(&self.scheduled_barriers)
                 .await?
             {
-                graph_info = self.context.load_graph_info().await.inspect_err(|err| {
+                info = self.context.resolve_actor_info().await.inspect_err(|err| {
                     warn!(error = %err.as_report(), "resolve actor info failed");
                 })?
             }
 
-            let graph_info = graph_info;
+            let info = info;
 
             self.context
-                .purge_state_table_from_hummock(&graph_info.existing_table_ids().collect())
+                .purge_state_table_from_hummock(&info.existing_table_ids().collect())
                 .await
                 .context("purge state table from hummock")?;
 
@@ -346,8 +346,6 @@ impl GlobalBarrierManager {
 
             self.context.sink_manager.reset().await;
 
-            let actor_info =
-                InflightActorInfo::resolve(&active_streaming_nodes, &graph_info);
             let subscription_info = InflightSubscriptionInfo {
                 mv_depended_subscriptions: self
                     .context
@@ -358,13 +356,13 @@ impl GlobalBarrierManager {
             // update and build all actors.
             self.context
-                .update_actors(&actor_info, &subscription_info)
+                .update_actors(&info, &subscription_info, &active_streaming_nodes)
                 .await
                 .inspect_err(|err| {
                     warn!(error = %err.as_report(), "update actors failed");
                 })?;
             self.context
-                .build_actors(&actor_info)
+                .build_actors(&info, &active_streaming_nodes)
                 .await
                 .inspect_err(|err| {
                     warn!(error = %err.as_report(), "build_actors failed");
                 })?;
@@ -387,9 +385,9 @@ impl GlobalBarrierManager {
 
             // Inject the `Initial` barrier to initialize all executors.
             let command_ctx = Arc::new(CommandContext::new(
-                actor_info.clone(),
+                active_streaming_nodes.current().clone(),
                 subscription_info.clone(),
-                graph_info.existing_table_ids().collect(),
+                info.existing_table_ids().collect(),
                 prev_epoch.clone(),
                 new_epoch.clone(),
                 paused_reason,
@@ -399,11 +397,8 @@ impl GlobalBarrierManager {
                 tracing::Span::current(), // recovery span
             ));
-            let mut node_to_collect = control_stream_manager.inject_barrier(
-                &command_ctx,
-                &graph_info,
-                Some(&graph_info),
-            )?;
+            let mut node_to_collect =
+                control_stream_manager.inject_barrier(&command_ctx, &info, Some(&info))?;
             while !node_to_collect.is_empty() {
                 let (worker_id, result) = control_stream_manager
                     .next_complete_barrier_response()
                     .await;
@@ -416,8 +411,7 @@ impl GlobalBarrierManager {
             (
                 BarrierManagerState::new(
                     new_epoch,
-                    actor_info,
-                    graph_info,
+                    info,
                     subscription_info,
                     command_ctx.next_paused_reason(),
                 ),
@@ -504,7 +498,7 @@ impl GlobalBarrierManagerContext {
 
         if expired_worker_slots.is_empty() {
             debug!("no expired worker slots, skipping.");
-            return self.load_graph_info().await;
+            return self.resolve_actor_info().await;
         }
 
         debug!("start migrate actors.");
@@ -614,7 +608,7 @@ impl GlobalBarrierManagerContext {
 
         debug!("migrate actors succeed.");
 
-        self.load_graph_info().await
+        self.resolve_actor_info().await
     }
 
     /// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated.
@@ -624,19 +618,20 @@ impl GlobalBarrierManagerContext {
     ) -> MetaResult<InflightGraphInfo> {
         let mgr = self.metadata_manager.as_v1_ref();
 
-        let graph_info = self.load_graph_info().await?;
-        let info = InflightActorInfo::resolve(active_nodes, &graph_info);
+        let info = self.resolve_actor_info().await?;
 
         // 1. get expired workers.
         let expired_workers: HashSet<WorkerId> = info
             .actor_map
             .iter()
-            .filter(|(&worker, actors)| !actors.is_empty() && !info.node_map.contains_key(&worker))
+            .filter(|(&worker, actors)| {
+                !actors.is_empty() && !active_nodes.current().contains_key(&worker)
+            })
             .map(|(&worker, _)| worker)
             .collect();
 
         if expired_workers.is_empty() {
             debug!("no expired workers, skipping.");
-            return Ok(graph_info);
+            return Ok(info);
         }
 
         debug!("start migrate actors.");
@@ -651,7 +646,7 @@ impl GlobalBarrierManagerContext {
         migration_plan.delete(self.env.meta_store().as_kv()).await?;
         debug!("migrate actors succeed.");
 
-        self.load_graph_info().await
+        self.resolve_actor_info().await
     }
 
     async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
@@ -826,8 +821,7 @@ impl GlobalBarrierManagerContext {
     }
 
     async fn scale_actors_v1(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
-        let graph_info = self.load_graph_info().await?;
-        let info = InflightActorInfo::resolve(active_nodes, &graph_info);
+        let info = self.resolve_actor_info().await?;
 
         let mgr = self.metadata_manager.as_v1_ref();
         debug!("start resetting actors distribution");
@@ -837,8 +831,8 @@ impl GlobalBarrierManagerContext {
             return Ok(());
         }
 
-        let available_parallelism = info
-            .node_map
+        let available_parallelism = active_nodes
+            .current()
             .values()
             .map(|worker_node| worker_node.parallelism as usize)
             .sum();
@@ -1103,8 +1097,9 @@ impl GlobalBarrierManagerContext {
     /// Update all actors in compute nodes.
     async fn update_actors(
         &self,
-        info: &InflightActorInfo,
+        info: &InflightGraphInfo,
         subscription_info: &InflightSubscriptionInfo,
+        active_nodes: &ActiveStreamingWorkerNodes,
     ) -> MetaResult<()> {
         if info.actor_map.is_empty() {
             tracing::debug!("no actor to update, skipping.");
@@ -1115,8 +1110,8 @@ impl GlobalBarrierManagerContext {
             .actor_map
             .iter()
             .map(|(node_id, actors)| {
-                let host = info
-                    .node_map
+                let host = active_nodes
+                    .current()
                     .get(node_id)
                     .ok_or_else(|| anyhow::anyhow!("worker evicted, wait for online."))?
                     .host
                     .clone();
@@ -1131,12 +1126,12 @@ impl GlobalBarrierManagerContext {
 
         let mut all_node_actors = self
             .metadata_manager
-            .all_node_actors(false, subscription_info)
+            .all_node_actors(false, &subscription_info.mv_depended_subscriptions)
             .await?;
 
         // Check if any actors were dropped after info resolved.
         if all_node_actors.iter().any(|(node_id, node_actors)| {
-            !info.node_map.contains_key(node_id)
+            !active_nodes.current().contains_key(node_id)
                 || info
                     .actor_map
                     .get(node_id)
@@ -1148,7 +1143,7 @@ impl GlobalBarrierManagerContext {
 
         self.stream_rpc_manager
             .broadcast_update_actor_info(
-                &info.node_map,
+                active_nodes.current(),
                 info.actor_map.keys().cloned(),
                 actor_infos.into_iter(),
                 info.actor_map.keys().map(|node_id| {
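Because the graph info no longer carries worker nodes, the recovery helpers above now take the active worker set as a separate argument and look hosts up there. The membership check they rely on is roughly the following (hypothetical simplified types, not the actual meta-manager API):

```rust
use std::collections::{HashMap, HashSet};

type WorkerId = u32;
type ActorId = u32;

// Stand-ins for the real `ActiveStreamingWorkerNodes` and worker-node type.
struct ActiveNodes {
    nodes: HashMap<WorkerId, String>, // worker id -> host
}

impl ActiveNodes {
    fn current(&self) -> &HashMap<WorkerId, String> {
        &self.nodes
    }
}

// Mirrors the `update_actors`-style check: every worker that still hosts
// actors must be present in the active-node snapshot, otherwise we bail out
// and wait for the worker to come back online.
fn check_workers_online(
    actor_map: &HashMap<WorkerId, HashSet<ActorId>>,
    active_nodes: &ActiveNodes,
) -> Result<(), String> {
    for (worker_id, actors) in actor_map {
        if !actors.is_empty() && !active_nodes.current().contains_key(worker_id) {
            return Err(format!("worker {worker_id} evicted, wait for online."));
        }
    }
    Ok(())
}
```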
@@ -1164,7 +1159,11 @@ impl GlobalBarrierManagerContext {
     }
 
     /// Build all actors in compute nodes.
-    async fn build_actors(&self, info: &InflightActorInfo) -> MetaResult<()> {
+    async fn build_actors(
+        &self,
+        info: &InflightGraphInfo,
+        active_nodes: &ActiveStreamingWorkerNodes,
+    ) -> MetaResult<()> {
         if info.actor_map.is_empty() {
             tracing::debug!("no actor to build, skipping.");
             return Ok(());
         }
@@ -1172,7 +1171,7 @@ impl GlobalBarrierManagerContext {
 
         self.stream_rpc_manager
             .build_actors(
-                &info.node_map,
+                active_nodes.current(),
                 info.actor_map.iter().map(|(node_id, actors)| {
                     let actors = actors.iter().cloned().collect();
                     (*node_id, actors)
diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs
index f8db87d1156c7..8d9a6c06571dc 100644
--- a/src/meta/src/barrier/rpc.rs
+++ b/src/meta/src/barrier/rpc.rs
@@ -45,7 +45,8 @@ use uuid::Uuid;
 
 use super::command::CommandContext;
 use super::GlobalBarrierManagerContext;
-use crate::manager::{InflightGraphInfo, MetaSrvEnv, WorkerId};
+use crate::barrier::info::InflightGraphInfo;
+use crate::manager::{MetaSrvEnv, WorkerId};
 use crate::{MetaError, MetaResult};
 
 const COLLECT_ERROR_TIMEOUT: Duration = Duration::from_secs(3);
@@ -254,10 +255,16 @@ impl ControlStreamManager {
                 "inject_barrier_err"
             ));
         let mutation = command_context.to_mutation();
-        let info = command_context.info.clone();
         let mut node_need_collect = HashSet::new();
 
-        info.node_map
+        for worker_id in pre_applied_graph_info.worker_ids() {
+            if !command_context.node_map.contains_key(&worker_id) {
+                return Err(anyhow!("worker id {} not exist", worker_id).into());
+            }
+        }
+
+        command_context
+            .node_map
             .iter()
             .map(|(node_id, worker_node)| {
                 let actor_ids_to_send: Vec<_> =
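The new guard added to `inject_barrier` above fails fast if the pre-applied graph references a worker that is missing from the command's `node_map`, instead of silently skipping it. In isolation the check looks roughly like this (simplified stand-alone sketch, not the real `ControlStreamManager` code):

```rust
use std::collections::{HashMap, HashSet};

type WorkerId = u32;

// Returns an error if the graph references a worker the barrier cannot reach.
// `graph_workers` plays the role of `pre_applied_graph_info.worker_ids()` and
// `node_map` the role of `command_context.node_map`.
fn validate_workers(
    graph_workers: impl Iterator<Item = WorkerId>,
    node_map: &HashMap<WorkerId, ()>,
) -> Result<(), String> {
    for worker_id in graph_workers {
        if !node_map.contains_key(&worker_id) {
            return Err(format!("worker id {worker_id} not exist"));
        }
    }
    Ok(())
}

fn main() {
    let node_map: HashMap<WorkerId, ()> = [(1, ()), (2, ())].into_iter().collect();
    let graph: HashSet<WorkerId> = [1, 2].into_iter().collect();
    assert!(validate_workers(graph.iter().copied(), &node_map).is_ok());
    assert!(validate_workers([3].into_iter(), &node_map).is_err());
}
```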
diff --git a/src/meta/src/barrier/state.rs b/src/meta/src/barrier/state.rs
index 95a152c18e362..a9d228a56f8a0 100644
--- a/src/meta/src/barrier/state.rs
+++ b/src/meta/src/barrier/state.rs
@@ -12,12 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use risingwave_pb::common::WorkerNode;
 use risingwave_pb::meta::PausedReason;
 
-use crate::barrier::info::{InflightActorInfo, InflightSubscriptionInfo};
+use crate::barrier::info::{InflightGraphInfo, InflightSubscriptionInfo};
 use crate::barrier::{Command, TracedEpoch};
-use crate::manager::InflightGraphInfo;
 
 /// `BarrierManagerState` defines the necessary state of `GlobalBarrierManager`.
 pub struct BarrierManagerState {
@@ -28,9 +26,7 @@ pub struct BarrierManagerState {
     in_flight_prev_epoch: TracedEpoch,
 
     /// Inflight running actors info.
-    pub(super) inflight_actor_infos: InflightActorInfo,
-
-    pub(super) inflight_graph_info: InflightGraphInfo,
+    pub(crate) inflight_actor_infos: InflightGraphInfo,
 
     inflight_subscription_info: InflightSubscriptionInfo,
 
@@ -41,15 +37,13 @@ pub struct BarrierManagerState {
 impl BarrierManagerState {
     pub fn new(
         in_flight_prev_epoch: TracedEpoch,
-        inflight_actor_infos: InflightActorInfo,
-        inflight_graph_info: InflightGraphInfo,
+        inflight_actor_infos: InflightGraphInfo,
         inflight_subscription_info: InflightSubscriptionInfo,
         paused_reason: Option<PausedReason>,
     ) -> Self {
         Self {
             in_flight_prev_epoch,
             inflight_actor_infos,
-            inflight_graph_info,
             inflight_subscription_info,
             paused_reason,
         }
     }
@@ -78,43 +72,29 @@ impl BarrierManagerState {
         (prev_epoch, next_epoch)
     }
 
-    pub fn resolve_worker_nodes(&mut self, nodes: impl IntoIterator<Item = WorkerNode>) {
-        self.inflight_actor_infos.resolve_worker_nodes(nodes);
-    }
-
     /// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors
     /// will be removed from the state after the info get resolved.
     pub fn apply_command(
         &mut self,
         command: &Command,
-    ) -> (
-        InflightActorInfo,
-        InflightGraphInfo,
-        InflightSubscriptionInfo,
-    ) {
+    ) -> (InflightGraphInfo, InflightSubscriptionInfo) {
         // update the fragment_infos outside pre_apply
-        let (actors_to_add, fragment_changes) =
-            if let Some(fragment_changes) = command.fragment_changes() {
-                let actors_to_add = self.inflight_graph_info.pre_apply(&fragment_changes);
-                (Some(actors_to_add), Some(fragment_changes))
-            } else {
-                (None, None)
-            };
-        self.inflight_actor_infos.pre_apply(actors_to_add);
+        let fragment_changes = if let Some(fragment_changes) = command.fragment_changes() {
+            self.inflight_actor_infos.pre_apply(&fragment_changes);
+            Some(fragment_changes)
+        } else {
+            None
+        };
         self.inflight_subscription_info.pre_apply(command);
 
-        let actor_info = self.inflight_actor_infos.clone();
-        let graph_info = self.inflight_graph_info.clone();
+        let info = self.inflight_actor_infos.clone();
         let subscription_info = self.inflight_subscription_info.clone();
 
-        let actors_to_remove = if let Some(fragment_changes) = fragment_changes {
-            Some(self.inflight_graph_info.post_apply(&fragment_changes))
-        } else {
-            None
-        };
-        self.inflight_actor_infos.post_apply(actors_to_remove);
+        if let Some(fragment_changes) = fragment_changes {
+            self.inflight_actor_infos.post_apply(&fragment_changes);
+        }
         self.inflight_subscription_info.post_apply(command);
 
-        (actor_info, graph_info, subscription_info)
+        (info, subscription_info)
     }
 }
diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs
index be4a72e98af77..6c24a6a44e5b4 100644
--- a/src/meta/src/controller/fragment.rs
+++ b/src/meta/src/controller/fragment.rs
@@ -56,7 +56,7 @@ use crate::controller::utils::{
     get_actor_dispatchers, get_fragment_mappings, rebuild_fragment_mapping_from_actors,
     FragmentDesc, PartialActorLocation, PartialFragmentStateTables,
 };
-use crate::manager::{InflightFragmentInfo, InflightGraphInfo, LocalNotification};
+use crate::manager::{ActorInfos, InflightFragmentInfo, LocalNotification};
 use crate::model::{TableFragments, TableParallelism};
 use crate::stream::SplitAssignment;
 use crate::{MetaError, MetaResult};
@@ -838,7 +838,7 @@ impl CatalogController {
 
     /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or
     /// collected
-    pub async fn load_graph_info(&self) -> MetaResult<InflightGraphInfo> {
+    pub async fn load_all_actors(&self) -> MetaResult<ActorInfos> {
         let inner = self.inner.read().await;
         let actor_info: Vec<(ActorId, WorkerId, FragmentId, i32, I32Array)> = Actor::find()
             .select_only()
@@ -887,7 +887,7 @@ impl CatalogController {
             }
         }
 
-        Ok(InflightGraphInfo::new(fragment_infos))
+        Ok(ActorInfos::new(fragment_infos))
     }
 
     pub async fn migrate_actors(
diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs
index 372c8cf96bd76..42e80d1c13a64 100644
--- a/src/meta/src/manager/catalog/fragment.rs
+++ b/src/meta/src/manager/catalog/fragment.rs
@@ -180,12 +180,11 @@ pub struct InflightFragmentInfo {
     pub is_injectable: bool,
 }
 
-#[derive(Clone, Debug, Default)]
-pub struct InflightGraphInfo {
+pub struct ActorInfos {
     pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
 }
 
-impl InflightGraphInfo {
+impl ActorInfos {
     pub fn new(fragment_infos: HashMap<FragmentId, InflightFragmentInfo>) -> Self {
         Self { fragment_infos }
     }
@@ -774,7 +773,7 @@ impl FragmentManager {
 
     /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or
     /// collected
-    pub async fn load_graph_info(&self) -> InflightGraphInfo {
+    pub async fn load_all_actors(&self) -> ActorInfos {
         let mut fragment_infos = HashMap::new();
 
         let map = &self.core.read().await.table_fragments;
@@ -807,7 +806,7 @@ impl FragmentManager {
             }
         }
 
-        InflightGraphInfo::new(fragment_infos)
+        ActorInfos::new(fragment_infos)
     }
 
     async fn migrate_fragment_actors_inner(
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index bf658c9655d74..a5c0267b6b667 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -32,7 +32,7 @@ use tokio::sync::oneshot;
 use tokio::time::{sleep, Instant};
 use tracing::warn;
 
-use crate::barrier::{InflightSubscriptionInfo, Reschedule};
+use crate::barrier::Reschedule;
 use crate::controller::catalog::CatalogControllerRef;
 use crate::controller::cluster::{ClusterControllerRef, WorkerExtraInfo};
 use crate::manager::{
@@ -733,15 +733,12 @@ impl MetadataManager {
     pub async fn all_node_actors(
         &self,
         include_inactive: bool,
-        subscription_info: &InflightSubscriptionInfo,
+        subscriptions: &HashMap<TableId, HashMap<u32, u64>>,
     ) -> MetaResult<HashMap<WorkerId, Vec<BuildActorInfo>>> {
         match &self {
             MetadataManager::V1(mgr) => Ok(mgr
                 .fragment_manager
-                .all_node_actors(
-                    include_inactive,
-                    &subscription_info.mv_depended_subscriptions,
-                )
+                .all_node_actors(include_inactive, subscriptions)
                 .await),
             MetadataManager::V2(mgr) => {
                 let table_fragments = mgr.catalog_controller.table_fragments().await?;
@@ -751,13 +748,11 @@ impl MetadataManager {
                     let table_id = tf.table_id();
                     for (node_id, actors) in tf.worker_actors(include_inactive) {
                         let node_actors = actor_maps.entry(node_id).or_insert_with(Vec::new);
-                        node_actors.extend(actors.into_iter().map(|actor| {
-                            to_build_actor_info(
-                                actor,
-                                &subscription_info.mv_depended_subscriptions,
-                                table_id,
-                            )
-                        }))
+                        node_actors.extend(
+                            actors
+                                .into_iter()
+                                .map(|actor| to_build_actor_info(actor, subscriptions, table_id)),
+                        )
                     }
                 }
                 Ok(actor_maps)
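Callers of `MetadataManager::all_node_actors` now pass the dependency map itself rather than the whole `InflightSubscriptionInfo`. A small sketch of that call shape; the exact key and value types of `mv_depended_subscriptions` are an assumption here (subscription id mapped to retention seconds):

```rust
use std::collections::HashMap;

type TableId = u32;
type SubscriptionId = u32;

// Assumed layout of `mv_depended_subscriptions`:
// mv table id -> (subscription id -> retention seconds).
type DependedSubscriptions = HashMap<TableId, HashMap<SubscriptionId, u64>>;

// Hypothetical caller: previously it would have passed `&subscription_info`,
// now it borrows the inner map, e.g.
// `all_node_actors(false, &subscription_info.mv_depended_subscriptions)`.
fn max_retention(subscriptions: &DependedSubscriptions) -> u64 {
    subscriptions
        .values()
        .flat_map(|subs| subs.values())
        .copied()
        .max()
        .unwrap_or(0)
}

fn main() {
    let mut subs: DependedSubscriptions = HashMap::new();
    subs.entry(1).or_default().insert(10, 86_400);
    assert_eq!(max_retention(&subs), 86_400);
}
```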