Skip to content

Commit

Permalink
fix(node): use spawn_blocking if dropping a ContainedNode from async …
Browse files Browse the repository at this point in the history
…context without prior call to shutdown
  • Loading branch information
qti3e committed May 10, 2024
1 parent 29f42e2 commit 70987e2
Showing 1 changed file with 29 additions and 7 deletions.
36 changes: 29 additions & 7 deletions core/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub struct ContainedNode<C: Collection> {
shutdown: ShutdownController,

/// A handle to the tokio runtime.
runtime: Runtime,
runtime: Option<Runtime>,

collection: PhantomData<C>,
}
Expand Down Expand Up @@ -71,7 +71,7 @@ impl<C: Collection> ContainedNode<C> {
Self {
name,
provider,
runtime,
runtime: Some(runtime),
shutdown,
collection: PhantomData,
}
Expand All @@ -82,7 +82,7 @@ impl<C: Collection> ContainedNode<C> {
pub fn spawn(&self) -> JoinHandle<Result<()>> {
let provider = self.provider.clone();

self.runtime.spawn_blocking(move || {
self.runtime.as_ref().unwrap().spawn_blocking(move || {
let graph = C::build_graph();
let mut provider = provider.get_local_provider();

Expand All @@ -107,8 +107,12 @@ impl<C: Collection> ContainedNode<C> {
/// In other words you can still trigger the shutdown event by calling this method and never
/// awaiting the returned future.
pub fn shutdown(mut self) -> impl Future<Output = ()> {
// Take state out (because we implement drop).
let mut shutdown = std::mem::take(&mut self.shutdown);
let runtime = self.runtime.take().unwrap();

// Tell the controller it's time to go down.
self.shutdown.trigger_shutdown();
shutdown.trigger_shutdown();

let task_name = format!("{}::RuntimeDrop", self.name);
let duration = Duration::from_secs(30);
Expand All @@ -122,7 +126,7 @@ impl<C: Collection> ContainedNode<C> {
.name(&task_name)
.spawn_blocking_on(
|| {
self.runtime.shutdown_background();
runtime.shutdown_background();
},
&handle,
)
Expand All @@ -136,7 +140,7 @@ impl<C: Collection> ContainedNode<C> {

async move {
for i in 0.. {
if timeout(Duration::from_secs(3), self.shutdown.wait_for_completion())
if timeout(Duration::from_secs(3), shutdown.wait_for_completion())
.await
.is_ok()
{
Expand Down Expand Up @@ -166,7 +170,7 @@ impl<C: Collection> ContainedNode<C> {
break;
}

let Some(iter) = self.shutdown.pending_backtraces() else {
let Some(iter) = shutdown.pending_backtraces() else {
continue;
};

Expand All @@ -188,3 +192,21 @@ impl<C: Collection> Default for ContainedNode<C> {
Self::new(fdi::MultiThreadedProvider::default(), None)
}
}

impl<C: Collection> Drop for ContainedNode<C> {
fn drop(&mut self) {
// if runtime doesn't exist it means `shutdown` has been called before.
if let Some(runtime) = self.runtime.take() {
self.shutdown.trigger_shutdown();

// If we're running within nested runtime env, dropping the runtime in async
// context would not be allowed by tokio. so we have to use spawn blocking
// here.
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn_blocking(move || {
runtime.shutdown_background();
});
}
}
}
}

0 comments on commit 70987e2

Please sign in to comment.