From d2e8828b158b4db54d5f60a0f0afccf0ecd245c3 Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 9 Aug 2024 13:23:56 +0800 Subject: [PATCH] check on new worker map --- src/meta/src/barrier/info.rs | 10 ++++++++++ src/meta/src/barrier/mod.rs | 2 ++ 2 files changed, 12 insertions(+) diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs index 43d2ac5d973aa..caf124113fc97 100644 --- a/src/meta/src/barrier/info.rs +++ b/src/meta/src/barrier/info.rs @@ -15,6 +15,7 @@ use std::collections::{HashMap, HashSet}; use risingwave_common::catalog::TableId; +use risingwave_pb::common::WorkerNode; use tracing::warn; use crate::barrier::Command; @@ -80,6 +81,15 @@ impl InflightGraphInfo { } } + /// Update worker nodes snapshot. We need to support incremental updates for it in the future. + pub fn on_new_worker_node_map(&mut self, node_map: &HashMap) { + for (node_id, actors) in &self.actor_map { + if !node_map.contains_key(node_id) { + warn!(node_id, ?actors, "node with running actors is deleted"); + } + } + } + /// Apply some actor changes before issuing a barrier command, if the command contains any new added actors, we should update /// the info correspondingly. pub(crate) fn pre_apply( diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 50fed20fd6f19..3110ca7c46d8c 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -718,6 +718,8 @@ impl GlobalBarrierManager { info!(?changed_worker, "worker changed"); + self.state.inflight_graph_info + .on_new_worker_node_map(self.active_streaming_nodes.current()); if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker { self.control_stream_manager.add_worker(node).await; }