From e2062dae416f80d86add65dd917008fd5a9c8af1 Mon Sep 17 00:00:00 2001 From: Parsa Ghadimi Date: Fri, 10 May 2024 17:17:39 -0400 Subject: [PATCH] fix(node): wait for drop to complete --- core/node/src/lib.rs | 50 +++++++++++++++++++------------------------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/core/node/src/lib.rs b/core/node/src/lib.rs index 80cbb6252..64a4ce87a 100644 --- a/core/node/src/lib.rs +++ b/core/node/src/lib.rs @@ -6,7 +6,7 @@ use std::time::Duration; use anyhow::Result; use lightning_interfaces::prelude::*; use lightning_interfaces::ShutdownController; -use tokio::runtime::Runtime; +use tokio::runtime::{Handle, Runtime}; use tokio::task::JoinHandle; use tokio::time::timeout; @@ -107,36 +107,16 @@ impl ContainedNode { /// In other words you can still trigger the shutdown event by calling this method and never /// awaiting the returned future. pub fn shutdown(mut self) -> impl Future { + let handle = + Handle::try_current().expect("calling from a non-tokio context not supported yet."); + // Take state out (because we implement drop). let mut shutdown = std::mem::take(&mut self.shutdown); - let runtime = self.runtime.take().unwrap(); // Tell the controller it's time to go down. shutdown.trigger_shutdown(); let task_name = format!("{}::RuntimeDrop", self.name); - let duration = Duration::from_secs(30); - - // Give the runtime 30 seconds to stop. - match tokio::runtime::Handle::try_current() { - Ok(handle) => { - tokio::spawn(async move { - tokio::time::sleep(duration).await; - tokio::task::Builder::new() - .name(&task_name) - .spawn_blocking_on( - || { - runtime.shutdown_background(); - }, - &handle, - ) - .unwrap(); - }); - }, - Err(_) => { - todo!("Not yet supported outside a tokio runtime."); - }, - }; async move { for i in 0.. { @@ -145,7 +125,7 @@ impl ContainedNode { .is_ok() { // shutdown completed. - return; + break; } match i { @@ -165,8 +145,9 @@ impl ContainedNode { }, } - if i == 10 { - // 33s + if i == 9 { + // 30s: timeout + tracing::error!("Shutdown timed out. Force killing the runtime."); break; } @@ -178,6 +159,19 @@ impl ContainedNode { eprintln!("Pending task backtrace #{i}:\n{trace:#?}"); } } + + let runtime = self.runtime.take().unwrap(); + tokio::task::Builder::new() + .name(&task_name) + .spawn_blocking_on( + || { + drop(runtime); + }, + &handle, + ) + .unwrap() + .await + .expect("Failed to wait for the node runtime to drop."); } } @@ -204,7 +198,7 @@ impl Drop for ContainedNode { // here. if let Ok(handle) = tokio::runtime::Handle::try_current() { handle.spawn_blocking(move || { - runtime.shutdown_background(); + drop(runtime); }); } }