Skip to content

Commit

Permalink
check on new worker map
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Aug 9, 2024
1 parent 262285d commit d2e8828
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 0 deletions.
10 changes: 10 additions & 0 deletions src/meta/src/barrier/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::{HashMap, HashSet};

use risingwave_common::catalog::TableId;
use risingwave_pb::common::WorkerNode;
use tracing::warn;

use crate::barrier::Command;
Expand Down Expand Up @@ -80,6 +81,15 @@ impl InflightGraphInfo {
}
}

/// Update worker nodes snapshot. We need to support incremental updates for it in the future.
pub fn on_new_worker_node_map(&mut self, node_map: &HashMap<WorkerId, WorkerNode>) {
for (node_id, actors) in &self.actor_map {
if !node_map.contains_key(node_id) {
warn!(node_id, ?actors, "node with running actors is deleted");
}
}
}

/// Apply some actor changes before issuing a barrier command, if the command contains any new added actors, we should update
/// the info correspondingly.
pub(crate) fn pre_apply(
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,8 @@ impl GlobalBarrierManager {

info!(?changed_worker, "worker changed");

self.state.inflight_graph_info
.on_new_worker_node_map(self.active_streaming_nodes.current());
if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker {
self.control_stream_manager.add_worker(node).await;
}
Expand Down

0 comments on commit d2e8828

Please sign in to comment.