diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 835947b3e649e..427cc392adef9 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -584,6 +584,7 @@ impl GlobalBarrierManager { if self .checkpoint_control .can_inject_barrier(self.in_flight_barrier_nums) => { + self.active_streaming_nodes.sync().await; self.handle_new_barrier(scheduled); } } diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index ff55a0bad4f98..305443be1495c 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap, HashSet}; +use std::future::pending; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_meta_model_v2::SourceId; @@ -92,13 +93,26 @@ impl ActiveStreamingWorkerNodes { &self.worker_nodes } + pub(crate) async fn sync(&mut self) { + let workers = self + ._meta_manager + .list_active_streaming_compute_nodes() + .await + .unwrap(); + self.worker_nodes = workers.into_iter().map(|node| (node.id, node)).collect(); + } + pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange { + let temp = 1; let ret = loop { let notification = self .rx .recv() .await .expect("notification stopped or uninitialized"); + if temp == 1 { + return pending().await; + } match notification { LocalNotification::WorkerNodeDeleted(worker) => { let is_streaming_compute_node = worker.r#type == WorkerType::ComputeNode as i32