Skip to content

Commit

Permalink
chore(infra_utils): add spawn_monitored_task
Browse files Browse the repository at this point in the history
commit-id:817cbd25
  • Loading branch information
Itay-Tsabary-Starkware committed Dec 25, 2024
1 parent 3b486e4 commit 82bfaf4
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions crates/infra_utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ workspace = true
[dependencies]
num-traits.workspace = true
regex.workspace = true
tokio = { workspace = true, features = ["process", "time"] }
tokio = { workspace = true, features = ["process", "rt", "time"] }
tracing.workspace = true

[dev-dependencies]
nix.workspace = true
pretty_assertions.workspace = true
rstest.workspace = true
tokio = { workspace = true, features = ["macros", "rt", "sync"] }
tokio = { workspace = true, features = ["macros", "rt", "signal", "sync"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
1 change: 1 addition & 0 deletions crates/infra_utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ pub mod command;
pub mod metrics;
pub mod path;
pub mod run_until;
pub mod tasks;
pub mod tracing;
pub mod type_name;
63 changes: 63 additions & 0 deletions crates/infra_utils/src/tasks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use std::future::Future;

use tokio::task::JoinHandle;
use tracing::error;

#[cfg(test)]
#[path = "tasks_test.rs"]
mod tasks_test;

/// Spawns a monitored asynchronous task in Tokio.
///
/// This function spawns two tasks:
/// 1. The first task executes the provided future.
/// 2. The second task awaits the completion of the first task.
/// - If the first task completes successfully, then it returns its result.
/// - If the first task panics, it logs the error and terminates the process with exit code 1.
///
/// # Type Parameters
///
/// - `F`: The type of the future to be executed. Must implement `Future` and be `Send + 'static`.
/// - `T`: The output type of the future. Must be `Send + 'static`.
///
/// # Arguments
///
/// - `future`: The future to be executed by the spawned task.
///
/// # Returns
///
/// A `JoinHandle<T>` of the second monitoring task.
pub fn spawn_with_exit_on_panic<F, T>(future: F) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
inner_spawn_with_exit_on_panic(future, exit_process)
}

// Use an inner function to enable injecting the exit function for testing.
pub(crate) fn inner_spawn_with_exit_on_panic<F, E, T>(future: F, on_exit_f: E) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
E: FnOnce() + Send + 'static,
T: Send + 'static,
{
// Spawn the first task to execute the future
let monitored_task = tokio::spawn(future);

// Spawn the second task to await the first task and assert its completion
tokio::spawn(async move {
match monitored_task.await {
Ok(res) => res,
Err(err) => {
error!("Monitored task failed: {:?}", err);
on_exit_f();
unreachable!()
}
}
})
}

pub(crate) fn exit_process() {
std::process::exit(1);
}
48 changes: 48 additions & 0 deletions crates/infra_utils/src/tasks_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use rstest::rstest;
use tokio::signal::unix::{signal, SignalKind};
use tokio::time::{sleep, timeout, Duration};

use crate::tasks::{inner_spawn_with_exit_on_panic, spawn_with_exit_on_panic};

#[rstest]
#[tokio::test]
async fn test_spawn_with_exit_on_panic_success() {
let handle = spawn_with_exit_on_panic(async {
sleep(Duration::from_millis(10)).await;
});

// Await the monitoring task
handle.await.unwrap();
}

#[rstest]
#[tokio::test]
async fn test_spawn_with_exit_on_panic_failure() {
// Mock exit process function: instead of calling `std::process::exit(1)`, send 'SIGTERM' to
// self.
let mock_exit_process = || {
// Use fully-qualified nix modules to avoid ambiguity with the tokio ones.
let pid = nix::unistd::getpid();
nix::sys::signal::kill(pid, nix::sys::signal::Signal::SIGTERM)
.expect("Failed to send signal");
};

// Set up a SIGTERM handler.
let mut sigterm = signal(SignalKind::terminate()).expect("Failed to set up SIGTERM handler");

// Spawn a task that panics, and uses the SIGTERM mocked exit process function.
inner_spawn_with_exit_on_panic(
async {
panic!("This task will fail!");
},
mock_exit_process,
);

// Assert the task failure is detected and that the mocked exit process function is called by
// awaiting for the SIGTERM signal. Bound the timeout to ensure the test does not hang
// indefinitely.
assert!(
timeout(Duration::from_millis(10), sigterm.recv()).await.is_ok(),
"Did not receive SIGTERM signal."
);
}

0 comments on commit 82bfaf4

Please sign in to comment.