From 739d0ea00cc469fd1f8272f1b4de1779af57bd7a Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 10 Jul 2024 13:46:46 +0800 Subject: [PATCH] wait all exit Signed-off-by: Bugen Zhao --- src/compute/src/server.rs | 5 ++--- src/meta/node/src/server.rs | 8 ++++++++ src/meta/src/manager/metadata.rs | 31 +++++++++++++++++++++++++++++++ 3 files changed, 41 insertions(+), 3 deletions(-) diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index abf7e4f0949d..0b4e60fef35a 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -456,15 +456,14 @@ pub async fn compute_node_serve( // Wait for the shutdown signal. shutdown.cancelled().await; - // TODO(shutdown): gracefully unregister from the meta service (need to cautious since it may + // TODO(shutdown): gracefully unregister from the meta service (need to be cautious since it may // trigger auto-scaling) + meta_client.try_unregister().await; // NOTE(shutdown): We can't simply join the tonic server here because it only returns when all // existing connections are closed, while we have long-running streaming calls that never // close. From the other side, there's also no need to gracefully shutdown them if we have // unregistered from the meta service. - - meta_client.try_unregister().await; } /// Check whether the compute node has enough memory to perform computing tasks. Apart from storage, diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 55f069e5e010..6b89b5be4330 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -795,6 +795,14 @@ pub async fn start_service_as_election_leader( // Wait for the shutdown signal. shutdown.cancelled().await; + + if election_client.is_leader() { + let res = metadata_manager.wait_till_all_worker_nodes_exit().await; + if let Err(e) = res { + tracing::error!(error = %e.as_report(), "failed to wait for all worker nodes to exit, directly shutdown"); + } + } + // TODO(shutdown): may warn user if there's any other node still running in the cluster. // TODO(shutdown): do we have any other shutdown tasks? Ok(()) diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 241a47941755..966f3640c318 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -317,6 +317,37 @@ impl MetadataManager { } } + pub async fn wait_till_all_worker_nodes_exit(&self) -> MetaResult<()> { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + let mut last_remaining = 0; + + loop { + interval.tick().await; + + let remaining = self + .list_worker_node(None, Some(State::Running)) + .await? + .into_iter() + .filter(|w| { + matches!( + w.r#type(), + WorkerType::Frontend | WorkerType::ComputeNode | WorkerType::Compactor + ) + }) + .count(); + + if remaining == 0 { + tracing::info!("all worker nodes exited"); + return Ok(()); + } + + if remaining != last_remaining { + last_remaining = remaining; + warn!(remaining, "waiting for all worker nodes to exit"); + } + } + } + pub async fn subscribe_active_streaming_compute_nodes( &self, ) -> MetaResult<(Vec, UnboundedReceiver)> {