Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Feb 4, 2024
1 parent 1f51cb3 commit 48b2833
Showing 1 changed file with 34 additions and 13 deletions.
47 changes: 34 additions & 13 deletions src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub struct MetadataManagerV2 {
pub(crate) enum ActiveStreamingWorkerChange {
Add(WorkerNode),
Remove(WorkerNode),
Update(WorkerNode),
}

pub struct ActiveStreamingWorkerNodes {
Expand Down Expand Up @@ -94,10 +95,23 @@ impl ActiveStreamingWorkerNodes {
.expect("notification stopped or uninitialized");
match notification {
LocalNotification::WorkerNodeDeleted(worker) => {
if worker.r#type != WorkerType::ComputeNode as i32
|| !worker.property.as_ref().unwrap().is_streaming
{
let is_streaming_compute_node = worker.r#type == WorkerType::ComputeNode as i32
|| worker.property.as_ref().unwrap().is_streaming;
let Some(prev_worker) = self.worker_nodes.remove(&worker.id) else {
if is_streaming_compute_node {
warn!(
?worker,
"notify to delete an non-existing streaming compute worker"
);
}
continue;
};
if !is_streaming_compute_node {
warn!(
?worker,
?prev_worker,
"deleted worker has a different recent type"
);
}
if worker.state == State::Starting as i32 {
warn!(
Expand All @@ -106,20 +120,23 @@ impl ActiveStreamingWorkerNodes {
state = worker.state,
"a starting streaming worker is deleted"
);
continue;
}
if self.worker_nodes.remove(&worker.id).is_none() {
warn!(?worker, "notify to delete an non-existing worker");
continue;
} else {
break ActiveStreamingWorkerChange::Remove(worker);
}
break ActiveStreamingWorkerChange::Remove(prev_worker);
}
LocalNotification::WorkerNodeActivated(worker) => {
if worker.r#type != WorkerType::ComputeNode as i32
|| !worker.property.as_ref().unwrap().is_streaming
{
continue;
if let Some(prev_worker) = self.worker_nodes.remove(&worker.id) {
warn!(
?worker,
?prev_worker,
"the type of a streaming worker is changed"
);
break ActiveStreamingWorkerChange::Remove(prev_worker);
} else {
continue;
}
}
assert_eq!(
worker.state,
Expand All @@ -134,9 +151,13 @@ impl ActiveStreamingWorkerNodes {
?prev_worker,
?worker,
eq = prev_worker == worker,
"notify to insert an existing active worker"
"notify to update an existing active worker"
);
continue;
if prev_worker == worker {
continue;
} else {
break ActiveStreamingWorkerChange::Update(worker);
}
} else {
break ActiveStreamingWorkerChange::Add(worker);
}
Expand Down

0 comments on commit 48b2833

Please sign in to comment.