From 262285d653400ade39e12479fd9a0f0d4d3df41c Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 8 Aug 2024 14:08:08 +0800 Subject: [PATCH] reduce use of node_map --- src/meta/src/barrier/rpc.rs | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 8d9a6c06571dc..fc125d6e583a4 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -257,16 +257,19 @@ impl ControlStreamManager { let mutation = command_context.to_mutation(); let mut node_need_collect = HashSet::new(); - for worker_id in pre_applied_graph_info.worker_ids() { - if !command_context.node_map.contains_key(&worker_id) { - return Err(anyhow!("worker id {} not exist", worker_id).into()); + for worker_id in pre_applied_graph_info.worker_ids().chain( + applied_graph_info + .into_iter() + .flat_map(|info| info.worker_ids()), + ) { + if !self.nodes.contains_key(&worker_id) { + return Err(anyhow!("unconnected worker node {}", worker_id).into()); } } - command_context - .node_map - .iter() - .map(|(node_id, worker_node)| { + self.nodes + .iter_mut() + .map(|(node_id, node)| { let actor_ids_to_send: Vec<_> = pre_applied_graph_info.actor_ids_to_send(*node_id).collect(); let actor_ids_to_collect: Vec<_> = pre_applied_graph_info @@ -286,15 +289,6 @@ impl ControlStreamManager { }; { - let Some(node) = self.nodes.get_mut(node_id) else { - if actor_ids_to_collect.is_empty() { - // Worker node get disconnected but has no actor to collect. Simply skip it. - return Ok(()); - } - return Err( - anyhow!("unconnected worker node: {:?}", worker_node.host).into() - ); - }; let mutation = mutation.clone(); let barrier = Barrier { epoch: Some(risingwave_pb::data::Epoch {