diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs index aadc7af88b06..43d2ac5d973a 100644 --- a/src/meta/src/barrier/info.rs +++ b/src/meta/src/barrier/info.rs @@ -18,7 +18,7 @@ use risingwave_common::catalog::TableId; use tracing::warn; use crate::barrier::Command; -use crate::manager::{ActorInfos, InflightFragmentInfo, WorkerId}; +use crate::manager::{InflightFragmentInfo, WorkerId}; use crate::model::{ActorId, FragmentId}; #[derive(Debug, Clone)] @@ -52,10 +52,10 @@ pub(super) struct InflightGraphInfo { impl InflightGraphInfo { /// Resolve inflight actor info from given nodes and actors that are loaded from meta store. It will be used during recovery to rebuild all streaming actors. - pub fn resolve(actor_infos: ActorInfos) -> Self { + pub fn new(fragment_infos: HashMap<FragmentId, InflightFragmentInfo>) -> Self { let actor_map = { let mut map: HashMap<_, HashSet<_>> = HashMap::new(); - for info in actor_infos.fragment_infos.values() { + for info in fragment_infos.values() { for (actor_id, worker_id) in &info.actors { map.entry(*worker_id).or_default().insert(*actor_id); } @@ -63,8 +63,7 @@ impl InflightGraphInfo { map }; - let actor_location_map = actor_infos - .fragment_infos + let actor_location_map = fragment_infos .values() .flat_map(|fragment| { fragment @@ -77,7 +76,7 @@ impl InflightGraphInfo { Self { actor_map, actor_location_map, - fragment_infos: actor_infos.fragment_infos, + fragment_infos, } } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index e0312ad57739..50fed20fd6f1 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -748,7 +748,7 @@ impl GlobalBarrierManager { Err(e) => { let failed_command = self.checkpoint_control.command_wait_collect_from_worker(worker_id); if failed_command.is_some() - || self.state.inflight_actor_infos.contains_worker(worker_id) { + || self.state.inflight_graph_info.contains_worker(worker_id) { let errors = self.control_stream_manager.collect_errors(worker_id, e).await; let err = 
merge_node_rpc_errors("get error from control stream", errors); if let Some(failed_command) = failed_command { @@ -799,7 +799,7 @@ impl GlobalBarrierManager { span, } = scheduled; - let (pre_applied_actor_info, pre_applied_subscription_info) = + let (pre_applied_graph_info, pre_applied_subscription_info) = self.state.apply_command(&command); let (prev_epoch, curr_epoch) = self.state.next_epoch_pair(); @@ -821,7 +821,7 @@ impl GlobalBarrierManager { let command_ctx = Arc::new(CommandContext::new( self.active_streaming_nodes.current().clone(), pre_applied_subscription_info, - pre_applied_actor_info.existing_table_ids().collect(), + pre_applied_graph_info.existing_table_ids().collect(), prev_epoch.clone(), curr_epoch.clone(), self.state.paused_reason(), @@ -835,8 +835,8 @@ impl GlobalBarrierManager { let node_to_collect = match self.control_stream_manager.inject_barrier( &command_ctx, - &pre_applied_actor_info, - Some(&self.state.inflight_actor_infos), + &pre_applied_graph_info, + Some(&self.state.inflight_graph_info), ) { Ok(node_to_collect) => node_to_collect, Err(err) => { @@ -1189,17 +1189,17 @@ impl GlobalBarrierManagerContext { /// Resolve actor information from cluster, fragment manager and `ChangedTableId`. /// We use `changed_table_id` to modify the actors to be sent or collected. Because these actor /// will create or drop before this barrier flow through them. 
- async fn resolve_actor_info(&self) -> MetaResult<InflightGraphInfo> { + async fn resolve_graph_info(&self) -> MetaResult<InflightGraphInfo> { let info = match &self.metadata_manager { MetadataManager::V1(mgr) => { let all_actor_infos = mgr.fragment_manager.load_all_actors().await; - InflightGraphInfo::resolve(all_actor_infos) + InflightGraphInfo::new(all_actor_infos.fragment_infos) } MetadataManager::V2(mgr) => { let all_actor_infos = mgr.catalog_controller.load_all_actors().await?; - InflightGraphInfo::resolve(all_actor_infos) + InflightGraphInfo::new(all_actor_infos.fragment_infos) } }; diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 236821d6a7a0..e5adf887b254 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -304,7 +304,7 @@ impl GlobalBarrierManager { warn!(error = %err.as_report(), "scale actors failed"); })?; - self.context.resolve_actor_info().await.inspect_err(|err| { + self.context.resolve_graph_info().await.inspect_err(|err| { warn!(error = %err.as_report(), "resolve actor info failed"); })? } else { @@ -322,7 +322,7 @@ impl GlobalBarrierManager { .pre_apply_drop_cancel(&self.scheduled_barriers) .await? { - info = self.context.resolve_actor_info().await.inspect_err(|err| { + info = self.context.resolve_graph_info().await.inspect_err(|err| { warn!(error = %err.as_report(), "resolve actor info failed"); })? } @@ -498,7 +498,7 @@ impl GlobalBarrierManagerContext { if expired_worker_slots.is_empty() { debug!("no expired worker slots, skipping."); - return self.resolve_actor_info().await; + return self.resolve_graph_info().await; } debug!("start migrate actors."); @@ -608,7 +608,7 @@ impl GlobalBarrierManagerContext { debug!("migrate actors succeed."); - self.resolve_actor_info().await + self.resolve_graph_info().await } /// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated. 
@@ -618,7 +618,7 @@ impl GlobalBarrierManagerContext { ) -> MetaResult<InflightGraphInfo> { let mgr = self.metadata_manager.as_v1_ref(); - let info = self.resolve_actor_info().await?; + let info = self.resolve_graph_info().await?; // 1. get expired workers. let expired_workers: HashSet<WorkerId> = info @@ -646,7 +646,7 @@ impl GlobalBarrierManagerContext { migration_plan.delete(self.env.meta_store().as_kv()).await?; debug!("migrate actors succeed."); - self.resolve_actor_info().await + self.resolve_graph_info().await } async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> { @@ -821,7 +821,7 @@ impl GlobalBarrierManagerContext { } async fn scale_actors_v1(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> { - let info = self.resolve_actor_info().await?; + let info = self.resolve_graph_info().await?; let mgr = self.metadata_manager.as_v1_ref(); debug!("start resetting actors distribution"); diff --git a/src/meta/src/barrier/state.rs b/src/meta/src/barrier/state.rs index a9d228a56f8a..32f74cb88893 100644 --- a/src/meta/src/barrier/state.rs +++ b/src/meta/src/barrier/state.rs @@ -26,7 +26,7 @@ pub struct BarrierManagerState { in_flight_prev_epoch: TracedEpoch, /// Inflight running actors info. 
- pub(crate) inflight_actor_infos: InflightGraphInfo, + pub(crate) inflight_graph_info: InflightGraphInfo, inflight_subscription_info: InflightSubscriptionInfo, @@ -37,13 +37,13 @@ impl BarrierManagerState { pub fn new( in_flight_prev_epoch: TracedEpoch, - inflight_actor_infos: InflightGraphInfo, + inflight_graph_info: InflightGraphInfo, inflight_subscription_info: InflightSubscriptionInfo, paused_reason: Option<PausedReason>, ) -> Self { Self { in_flight_prev_epoch, - inflight_actor_infos, + inflight_graph_info, inflight_subscription_info, paused_reason, } } @@ -80,18 +80,18 @@ impl BarrierManagerState { ) -> (InflightGraphInfo, InflightSubscriptionInfo) { // update the fragment_infos outside pre_apply let fragment_changes = if let Some(fragment_changes) = command.fragment_changes() { - self.inflight_actor_infos.pre_apply(&fragment_changes); + self.inflight_graph_info.pre_apply(&fragment_changes); Some(fragment_changes) } else { None }; self.inflight_subscription_info.pre_apply(command); - let info = self.inflight_actor_infos.clone(); + let info = self.inflight_graph_info.clone(); let subscription_info = self.inflight_subscription_info.clone(); if let Some(fragment_changes) = fragment_changes { - self.inflight_actor_infos.post_apply(&fragment_changes); + self.inflight_graph_info.post_apply(&fragment_changes); } self.inflight_subscription_info.post_apply(command);