Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Aug 7, 2024
1 parent 911a072 commit 5c5dcca
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 27 deletions.
11 changes: 5 additions & 6 deletions src/meta/src/barrier/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use risingwave_common::catalog::TableId;
use tracing::warn;

use crate::barrier::Command;
use crate::manager::{ActorInfos, InflightFragmentInfo, WorkerId};
use crate::manager::{InflightFragmentInfo, WorkerId};
use crate::model::{ActorId, FragmentId};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -52,19 +52,18 @@ pub(super) struct InflightGraphInfo {

impl InflightGraphInfo {
/// Resolve inflight actor info from given nodes and actors that are loaded from meta store. It will be used during recovery to rebuild all streaming actors.
pub fn resolve(actor_infos: ActorInfos) -> Self {
pub fn new(fragment_infos: HashMap<FragmentId, InflightFragmentInfo>) -> Self {
let actor_map = {
let mut map: HashMap<_, HashSet<_>> = HashMap::new();
for info in actor_infos.fragment_infos.values() {
for info in fragment_infos.values() {
for (actor_id, worker_id) in &info.actors {
map.entry(*worker_id).or_default().insert(*actor_id);
}
}
map
};

let actor_location_map = actor_infos
.fragment_infos
let actor_location_map = fragment_infos
.values()
.flat_map(|fragment| {
fragment
Expand All @@ -77,7 +76,7 @@ impl InflightGraphInfo {
Self {
actor_map,
actor_location_map,
fragment_infos: actor_infos.fragment_infos,
fragment_infos,
}
}

Expand Down
16 changes: 8 additions & 8 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ impl GlobalBarrierManager {
Err(e) => {
let failed_command = self.checkpoint_control.command_wait_collect_from_worker(worker_id);
if failed_command.is_some()
|| self.state.inflight_actor_infos.contains_worker(worker_id) {
|| self.state.inflight_graph_info.contains_worker(worker_id) {
let errors = self.control_stream_manager.collect_errors(worker_id, e).await;
let err = merge_node_rpc_errors("get error from control stream", errors);
if let Some(failed_command) = failed_command {
Expand Down Expand Up @@ -799,7 +799,7 @@ impl GlobalBarrierManager {
span,
} = scheduled;

let (pre_applied_actor_info, pre_applied_subscription_info) =
let (pre_applied_graph_info, pre_applied_subscription_info) =
self.state.apply_command(&command);

let (prev_epoch, curr_epoch) = self.state.next_epoch_pair();
Expand All @@ -821,7 +821,7 @@ impl GlobalBarrierManager {
let command_ctx = Arc::new(CommandContext::new(
self.active_streaming_nodes.current().clone(),
pre_applied_subscription_info,
pre_applied_actor_info.existing_table_ids().collect(),
pre_applied_graph_info.existing_table_ids().collect(),
prev_epoch.clone(),
curr_epoch.clone(),
self.state.paused_reason(),
Expand All @@ -835,8 +835,8 @@ impl GlobalBarrierManager {

let node_to_collect = match self.control_stream_manager.inject_barrier(
&command_ctx,
&pre_applied_actor_info,
Some(&self.state.inflight_actor_infos),
&pre_applied_graph_info,
Some(&self.state.inflight_graph_info),
) {
Ok(node_to_collect) => node_to_collect,
Err(err) => {
Expand Down Expand Up @@ -1189,17 +1189,17 @@ impl GlobalBarrierManagerContext {
/// Resolve actor information from cluster, fragment manager and `ChangedTableId`.
/// We use `changed_table_id` to modify the actors to be sent or collected. Because these actor
/// will create or drop before this barrier flow through them.
async fn resolve_actor_info(&self) -> MetaResult<InflightGraphInfo> {
async fn resolve_graph_info(&self) -> MetaResult<InflightGraphInfo> {
let info = match &self.metadata_manager {
MetadataManager::V1(mgr) => {
let all_actor_infos = mgr.fragment_manager.load_all_actors().await;

InflightGraphInfo::resolve(all_actor_infos)
InflightGraphInfo::new(all_actor_infos.fragment_infos)
}
MetadataManager::V2(mgr) => {
let all_actor_infos = mgr.catalog_controller.load_all_actors().await?;

InflightGraphInfo::resolve(all_actor_infos)
InflightGraphInfo::new(all_actor_infos.fragment_infos)
}
};

Expand Down
14 changes: 7 additions & 7 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ impl GlobalBarrierManager {
warn!(error = %err.as_report(), "scale actors failed");
})?;

self.context.resolve_actor_info().await.inspect_err(|err| {
self.context.resolve_graph_info().await.inspect_err(|err| {
warn!(error = %err.as_report(), "resolve actor info failed");
})?
} else {
Expand All @@ -322,7 +322,7 @@ impl GlobalBarrierManager {
.pre_apply_drop_cancel(&self.scheduled_barriers)
.await?
{
info = self.context.resolve_actor_info().await.inspect_err(|err| {
info = self.context.resolve_graph_info().await.inspect_err(|err| {
warn!(error = %err.as_report(), "resolve actor info failed");
})?
}
Expand Down Expand Up @@ -498,7 +498,7 @@ impl GlobalBarrierManagerContext {

if expired_worker_slots.is_empty() {
debug!("no expired worker slots, skipping.");
return self.resolve_actor_info().await;
return self.resolve_graph_info().await;
}

debug!("start migrate actors.");
Expand Down Expand Up @@ -608,7 +608,7 @@ impl GlobalBarrierManagerContext {

debug!("migrate actors succeed.");

self.resolve_actor_info().await
self.resolve_graph_info().await
}

/// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated.
Expand All @@ -618,7 +618,7 @@ impl GlobalBarrierManagerContext {
) -> MetaResult<InflightGraphInfo> {
let mgr = self.metadata_manager.as_v1_ref();

let info = self.resolve_actor_info().await?;
let info = self.resolve_graph_info().await?;

// 1. get expired workers.
let expired_workers: HashSet<WorkerId> = info
Expand Down Expand Up @@ -646,7 +646,7 @@ impl GlobalBarrierManagerContext {
migration_plan.delete(self.env.meta_store().as_kv()).await?;
debug!("migrate actors succeed.");

self.resolve_actor_info().await
self.resolve_graph_info().await
}

async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
Expand Down Expand Up @@ -821,7 +821,7 @@ impl GlobalBarrierManagerContext {
}

async fn scale_actors_v1(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
let info = self.resolve_actor_info().await?;
let info = self.resolve_graph_info().await?;

let mgr = self.metadata_manager.as_v1_ref();
debug!("start resetting actors distribution");
Expand Down
12 changes: 6 additions & 6 deletions src/meta/src/barrier/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct BarrierManagerState {
in_flight_prev_epoch: TracedEpoch,

/// Inflight running actors info.
pub(crate) inflight_actor_infos: InflightGraphInfo,
pub(crate) inflight_graph_info: InflightGraphInfo,

inflight_subscription_info: InflightSubscriptionInfo,

Expand All @@ -37,13 +37,13 @@ pub struct BarrierManagerState {
impl BarrierManagerState {
pub fn new(
in_flight_prev_epoch: TracedEpoch,
inflight_actor_infos: InflightGraphInfo,
inflight_graph_info: InflightGraphInfo,
inflight_subscription_info: InflightSubscriptionInfo,
paused_reason: Option<PausedReason>,
) -> Self {
Self {
in_flight_prev_epoch,
inflight_actor_infos,
inflight_graph_info,
inflight_subscription_info,
paused_reason,
}
Expand Down Expand Up @@ -80,18 +80,18 @@ impl BarrierManagerState {
) -> (InflightGraphInfo, InflightSubscriptionInfo) {
// update the fragment_infos outside pre_apply
let fragment_changes = if let Some(fragment_changes) = command.fragment_changes() {
self.inflight_actor_infos.pre_apply(&fragment_changes);
self.inflight_graph_info.pre_apply(&fragment_changes);
Some(fragment_changes)
} else {
None
};
self.inflight_subscription_info.pre_apply(command);

let info = self.inflight_actor_infos.clone();
let info = self.inflight_graph_info.clone();
let subscription_info = self.inflight_subscription_info.clone();

if let Some(fragment_changes) = fragment_changes {
self.inflight_actor_infos.post_apply(&fragment_changes);
self.inflight_graph_info.post_apply(&fragment_changes);
}
self.inflight_subscription_info.post_apply(command);

Expand Down

0 comments on commit 5c5dcca

Please sign in to comment.