diff --git a/src/meta/service/src/cluster_service.rs b/src/meta/service/src/cluster_service.rs index d69debbe90816..7a81098cdc67b 100644 --- a/src/meta/service/src/cluster_service.rs +++ b/src/meta/service/src/cluster_service.rs @@ -145,7 +145,7 @@ impl ClusterService for ClusterServiceImpl { let _ = mgr.cluster_manager.delete_worker_node(host).await?; } MetadataManager::V2(mgr) => { - let _ = mgr.cluster_controller.delete_worker(host).await?; + mgr.cluster_controller.delete_worker(host).await?; } } diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index 01bf28643bb66..41654192a58c8 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -175,7 +175,7 @@ impl ClusterController { Ok(()) } - pub async fn delete_worker(&self, host_address: HostAddress) -> MetaResult { + pub async fn delete_worker(&self, host_address: HostAddress) -> MetaResult<()> { let mut inner = self.inner.write().await; let worker = inner.delete_worker(host_address).await?; if worker.r#type() == PbWorkerType::ComputeNode { @@ -190,10 +190,10 @@ impl ClusterController { // local notification. self.env .notification_manager() - .notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker.clone())) + .notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker)) .await; - Ok(worker) + Ok(()) } pub async fn update_schedulability( @@ -262,11 +262,12 @@ impl ClusterController { // 3. Delete expired workers. let worker_infos = match Worker::find() .select_only() + .column(worker::Column::WorkerId) .column(worker::Column::WorkerType) .column(worker::Column::Host) .column(worker::Column::Port) .filter(worker::Column::WorkerId.is_in(worker_to_delete.clone())) - .into_tuple::<(WorkerType, String, i32)>() + .into_tuple::<(WorkerId, WorkerType, String, i32)>() .all(&inner.db) .await { @@ -276,33 +277,36 @@ impl ClusterController { continue; } }; + drop(inner); - if let Err(err) = Worker::delete_many() - .filter(worker::Column::WorkerId.is_in(worker_to_delete)) - .exec(&inner.db) - .await - { - tracing::warn!(error = %err.as_report(), "Failed to delete expire workers from db"); - continue; - } - - for (worker_type, host, port) in worker_infos { - match worker_type { - WorkerType::Frontend - | WorkerType::ComputeNode - | WorkerType::Compactor - | WorkerType::RiseCtl => { - cluster_controller - .env - .notification_manager() - .delete_sender( - worker_type.into(), - WorkerKey(HostAddress { host, port }), - ) - .await + for (worker_id, worker_type, host, port) in worker_infos { + let host_addr = PbHostAddress { host, port }; + match cluster_controller.delete_worker(host_addr.clone()).await { + Ok(_) => { + tracing::warn!( + worker_id, + ?host_addr, + %now, + "Deleted expired worker" + ); + match worker_type { + WorkerType::Frontend + | WorkerType::ComputeNode + | WorkerType::Compactor + | WorkerType::RiseCtl => { + cluster_controller + .env + .notification_manager() + .delete_sender(worker_type.into(), WorkerKey(host_addr)) + .await + } + _ => {} + }; } - _ => {} - }; + Err(err) => { + tracing::warn!(error = %err.as_report(), "Failed to delete expire worker from db"); + } + } } } });