From 48b283360358f418d06f9da5a8ad7c29bc7f99ea Mon Sep 17 00:00:00 2001 From: William Wen Date: Sun, 4 Feb 2024 16:28:54 +0800 Subject: [PATCH] refine --- src/meta/src/manager/metadata.rs | 47 +++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 9090910b23fd8..eec27ee7a6104 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -58,6 +58,7 @@ pub struct MetadataManagerV2 { pub(crate) enum ActiveStreamingWorkerChange { Add(WorkerNode), Remove(WorkerNode), + Update(WorkerNode), } pub struct ActiveStreamingWorkerNodes { @@ -94,10 +95,23 @@ impl ActiveStreamingWorkerNodes { .expect("notification stopped or uninitialized"); match notification { LocalNotification::WorkerNodeDeleted(worker) => { - if worker.r#type != WorkerType::ComputeNode as i32 - || !worker.property.as_ref().unwrap().is_streaming - { + let is_streaming_compute_node = worker.r#type == WorkerType::ComputeNode as i32 + || worker.property.as_ref().unwrap().is_streaming; + let Some(prev_worker) = self.worker_nodes.remove(&worker.id) else { + if is_streaming_compute_node { + warn!( + ?worker, + "notify to delete an non-existing streaming compute worker" + ); + } continue; + }; + if !is_streaming_compute_node { + warn!( + ?worker, + ?prev_worker, + "deleted worker has a different recent type" + ); } if worker.state == State::Starting as i32 { warn!( @@ -106,20 +120,23 @@ impl ActiveStreamingWorkerNodes { state = worker.state, "a starting streaming worker is deleted" ); - continue; - } - if self.worker_nodes.remove(&worker.id).is_none() { - warn!(?worker, "notify to delete an non-existing worker"); - continue; - } else { - break ActiveStreamingWorkerChange::Remove(worker); } + break ActiveStreamingWorkerChange::Remove(prev_worker); } LocalNotification::WorkerNodeActivated(worker) => { if worker.r#type != WorkerType::ComputeNode as i32 || !worker.property.as_ref().unwrap().is_streaming { - continue; + if let Some(prev_worker) = self.worker_nodes.remove(&worker.id) { + warn!( + ?worker, + ?prev_worker, + "the type of a streaming worker is changed" + ); + break ActiveStreamingWorkerChange::Remove(prev_worker); + } else { + continue; + } } assert_eq!( worker.state, @@ -134,9 +151,13 @@ impl ActiveStreamingWorkerNodes { ?prev_worker, ?worker, eq = prev_worker == worker, - "notify to insert an existing active worker" + "notify to update an existing active worker" ); - continue; + if prev_worker == worker { + continue; + } else { + break ActiveStreamingWorkerChange::Update(worker); + } } else { break ActiveStreamingWorkerChange::Add(worker); }