Skip to content

Commit

Permalink
fix: clean dirty state and notify for expired worker nodes in cluster…
Browse files Browse the repository at this point in the history
… controller (#16901)

Co-authored-by: Bugen Zhao <[email protected]>
  • Loading branch information
yezizp2012 and BugenZhao authored May 24, 2024
1 parent 7bd4e04 commit e7c1035
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 30 deletions.
2 changes: 1 addition & 1 deletion src/meta/service/src/cluster_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl ClusterService for ClusterServiceImpl {
let _ = mgr.cluster_manager.delete_worker_node(host).await?;
}
MetadataManager::V2(mgr) => {
let _ = mgr.cluster_controller.delete_worker(host).await?;
mgr.cluster_controller.delete_worker(host).await?;
}
}

Expand Down
62 changes: 33 additions & 29 deletions src/meta/src/controller/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl ClusterController {
Ok(())
}

pub async fn delete_worker(&self, host_address: HostAddress) -> MetaResult<PbWorkerNode> {
pub async fn delete_worker(&self, host_address: HostAddress) -> MetaResult<()> {
let mut inner = self.inner.write().await;
let worker = inner.delete_worker(host_address).await?;
if worker.r#type() == PbWorkerType::ComputeNode {
Expand All @@ -190,10 +190,10 @@ impl ClusterController {
// local notification.
self.env
.notification_manager()
.notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker.clone()))
.notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker))
.await;

Ok(worker)
Ok(())
}

pub async fn update_schedulability(
Expand Down Expand Up @@ -262,11 +262,12 @@ impl ClusterController {
// 3. Delete expired workers.
let worker_infos = match Worker::find()
.select_only()
.column(worker::Column::WorkerId)
.column(worker::Column::WorkerType)
.column(worker::Column::Host)
.column(worker::Column::Port)
.filter(worker::Column::WorkerId.is_in(worker_to_delete.clone()))
.into_tuple::<(WorkerType, String, i32)>()
.into_tuple::<(WorkerId, WorkerType, String, i32)>()
.all(&inner.db)
.await
{
Expand All @@ -276,33 +277,36 @@ impl ClusterController {
continue;
}
};
drop(inner);

if let Err(err) = Worker::delete_many()
.filter(worker::Column::WorkerId.is_in(worker_to_delete))
.exec(&inner.db)
.await
{
tracing::warn!(error = %err.as_report(), "Failed to delete expire workers from db");
continue;
}

for (worker_type, host, port) in worker_infos {
match worker_type {
WorkerType::Frontend
| WorkerType::ComputeNode
| WorkerType::Compactor
| WorkerType::RiseCtl => {
cluster_controller
.env
.notification_manager()
.delete_sender(
worker_type.into(),
WorkerKey(HostAddress { host, port }),
)
.await
for (worker_id, worker_type, host, port) in worker_infos {
let host_addr = PbHostAddress { host, port };
match cluster_controller.delete_worker(host_addr.clone()).await {
Ok(_) => {
tracing::warn!(
worker_id,
?host_addr,
%now,
"Deleted expired worker"
);
match worker_type {
WorkerType::Frontend
| WorkerType::ComputeNode
| WorkerType::Compactor
| WorkerType::RiseCtl => {
cluster_controller
.env
.notification_manager()
.delete_sender(worker_type.into(), WorkerKey(host_addr))
.await
}
_ => {}
};
}
_ => {}
};
Err(err) => {
tracing::warn!(error = %err.as_report(), "Failed to delete expire worker from db");
}
}
}
}
});
Expand Down

0 comments on commit e7c1035

Please sign in to comment.