Skip to content

Commit

Permalink
wait all exit
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jul 10, 2024
1 parent 5f5604a commit 739d0ea
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 3 deletions.
5 changes: 2 additions & 3 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,15 +456,14 @@ pub async fn compute_node_serve(
// Wait for the shutdown signal.
shutdown.cancelled().await;

// TODO(shutdown): gracefully unregister from the meta service (need to cautious since it may
// TODO(shutdown): gracefully unregister from the meta service (need to be cautious since it may
// trigger auto-scaling)
meta_client.try_unregister().await;

// NOTE(shutdown): We can't simply join the tonic server here because it only returns when all
// existing connections are closed, while we have long-running streaming calls that never
// close. From the other side, there's also no need to gracefully shutdown them if we have
// unregistered from the meta service.

meta_client.try_unregister().await;
}

/// Check whether the compute node has enough memory to perform computing tasks. Apart from storage,
Expand Down
8 changes: 8 additions & 0 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,14 @@ pub async fn start_service_as_election_leader(

// Wait for the shutdown signal.
shutdown.cancelled().await;

if election_client.is_leader() {
let res = metadata_manager.wait_till_all_worker_nodes_exit().await;
if let Err(e) = res {
tracing::error!(error = %e.as_report(), "failed to wait for all worker nodes to exit, directly shutdown");
}
}

// TODO(shutdown): may warn user if there's any other node still running in the cluster.
// TODO(shutdown): do we have any other shutdown tasks?
Ok(())
Expand Down
31 changes: 31 additions & 0 deletions src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,37 @@ impl MetadataManager {
}
}

pub async fn wait_till_all_worker_nodes_exit(&self) -> MetaResult<()> {
let mut interval = tokio::time::interval(Duration::from_secs(1));
let mut last_remaining = 0;

loop {
interval.tick().await;

let remaining = self
.list_worker_node(None, Some(State::Running))
.await?
.into_iter()
.filter(|w| {
matches!(
w.r#type(),
WorkerType::Frontend | WorkerType::ComputeNode | WorkerType::Compactor
)
})
.count();

if remaining == 0 {
tracing::info!("all worker nodes exited");
return Ok(());
}

if remaining != last_remaining {
last_remaining = remaining;
warn!(remaining, "waiting for all worker nodes to exit");
}
}
}

pub async fn subscribe_active_streaming_compute_nodes(
&self,
) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
Expand Down

0 comments on commit 739d0ea

Please sign in to comment.