From 381e7f5ecbef63e40f275c0128ac24a5ae71e2d4 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Fri, 22 Sep 2023 22:13:29 -0700 Subject: [PATCH] Add a way to run without the async-process thread I know I said that I wouldn't add any more features, but I think this is important enough. Right now, a thread called "async-process" is responsible for listening for SIGCHLD and reaping zombie processes. This listens for the SIGCHLD signal in Unix and uses a channel connected to the waitable handle on Windows. While this works, we can do better. Through async-signal, the signal was already asynchronous on Unix; we were already just using async_io::block_on to wait on the signal. After swapping out the channel used on Windows with async-channel, the process reaping function "reap" can be reimplemented as a fully asynchronous future. From here we must make sure this future is being polled at all times. To facilitate this, a function named "driver()" is added to the public API. This future acquires a lock on the reaper structure and calls the "reap()" future indefinitely. Multiple drivers can be created at once; they will just wait forever on this lock. This future is intended to be spawned onto an executor and left to run forever, making sure all child processes are signalled whenever necessary. If no tasks are running the driver future, the "async-process" thread is spawned and runs the "reap()" future itself. I've added the following controls to make sure that this system is robust: - If a "driver" task is dropped, another "driver" task will acquire the lock and keep the reaper active. - Before being dropped, the task checks to see if it is the last driver. If it is, it will spawn the "async-process" thread to be the driver. - When a Child is being created, it checks if there are any active drivers. If there are none, it spawns the "async-process" thread itself. 
- One concern is that the driver future will try to spawn the "async-process" thread as the application exits and the task is being dropped, which will be unnecessary and lead to slower shutdowns. To prevent this, the future checks to see if there are any extant `Child` instances (a new refcount is added to Reaper to facilitate this). If there are none, and if there are no zombie processes, it does not spawn the additional thread. - Someone can still `mem::forget()` the driver thread. This does not lead to undefined behavior and just leads to processes being left dangling. At this point they're asking for wacky behavior. This strategy might also be viable for `async-io`, if we want to try to avoid needing to spawn the additional thread there as well. Closes #7 cc smol-rs/async-io#40 Signed-off-by: John Nunley --- Cargo.toml | 2 + src/lib.rs | 205 +++++++++++++++++++++++++++++++++++++++++++-------- tests/std.rs | 19 +++++ 3 files changed, 196 insertions(+), 30 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 86f279a..52660a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ async-signal = "0.2.0" rustix = { version = "0.38", default-features = false, features = ["std", "fs"] } [target.'cfg(windows)'.dependencies] +async-channel = "1.9.0" blocking = "1.0.0" [target.'cfg(windows)'.dependencies.windows-sys] @@ -37,4 +38,5 @@ features = [ ] [dev-dependencies] +async-executor = "1.5.1" async-io = "1.8" diff --git a/src/lib.rs b/src/lib.rs index 6cacc7b..c2abead 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,11 +56,14 @@ html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" )] +use std::convert::Infallible; use std::ffi::OsStr; use std::fmt; +use std::mem; use std::path::Path; use std::pin::Pin; -use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, Once}; use std::task::{Context, Poll}; use std::thread; @@ -74,7 +77,7 @@ use 
std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd}; #[cfg(windows)] use blocking::Unblock; -use async_lock::OnceCell; +use async_lock::{Mutex as AsyncMutex, OnceCell}; use event_listener::{Event, EventListener}; use futures_lite::{future, io, prelude::*}; @@ -102,6 +105,23 @@ struct Reaper { /// The pipe that delivers signal notifications. pipe: Pipe, + + /// Locking this mutex indicates that we are polling the SIGCHLD event. + driver_guard: AsyncMutex<()>, + + /// The number of tasks polling the SIGCHLD event. + /// + /// If this is zero, the `async-process` thread must be spawned. + drivers: AtomicUsize, + + /// Ensures the `async-process` thread is only ever spawned once. + async_process_thread: Once, + + /// Number of live `Child` instances currently running. + /// + /// This is used to prevent the reaper thread from being spawned right as the program closes, + /// when the reaper thread isn't needed. This represents the number of active processes. + child_count: AtomicUsize, } impl Reaper { @@ -109,53 +129,96 @@ impl Reaper { fn get() -> &'static Self { static REAPER: OnceCell = OnceCell::new(); - REAPER.get_or_init_blocking(|| { + REAPER.get_or_init_blocking(|| Reaper { + sigchld: Event::new(), + zombies: Mutex::new(Vec::new()), + pipe: Pipe::new().expect("cannot create SIGCHLD pipe"), + driver_guard: AsyncMutex::new(()), + drivers: AtomicUsize::new(0), + async_process_thread: Once::new(), + child_count: AtomicUsize::new(0), + }) + } + + /// Ensure that the reaper is driven. + /// + /// If there are no active `driver()` callers, this will spawn the `async-process` thread. + #[inline] + fn ensure_driven(&'static self) { + if self.drivers.load(Ordering::Acquire) == 0 { + self.start_driver_thread(); + } + } + + /// Start the `async-process` thread. 
+ #[cold] + fn start_driver_thread(&'static self) { + self.async_process_thread.call_once(move || { thread::Builder::new() .name("async-process".to_string()) - .spawn(|| REAPER.wait_blocking().reap()) + .spawn(move || { + let driver = async move { + self.drivers.fetch_add(1, Ordering::SeqCst); + let guard = self.driver_guard.lock().await; + self.reap(guard).await + }; + + #[cfg(unix)] + async_io::block_on(driver); + + #[cfg(not(unix))] + future::block_on(driver); + }) .expect("cannot spawn async-process thread"); - - Reaper { - sigchld: Event::new(), - zombies: Mutex::new(Vec::new()), - pipe: Pipe::new().expect("cannot create SIGCHLD pipe"), - } - }) + }); } /// Reap zombie processes forever. - fn reap(&'static self) -> ! { + async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! { loop { // Wait for the next SIGCHLD signal. - self.pipe.wait(); + self.pipe.wait().await; // Notify all listeners waiting on the SIGCHLD event. self.sigchld.notify(std::usize::MAX); - // Reap zombie processes. - let mut zombies = self.zombies.lock().unwrap(); + // Reap zombie processes, but make sure we don't hold onto the lock for too long! + let mut zombies = mem::take(&mut *self.zombies.lock().unwrap()); let mut i = 0; - while i < zombies.len() { - if let Ok(None) = zombies[i].try_wait() { - i += 1; - } else { - zombies.swap_remove(i); + 'reap_zombies: loop { + for _ in 0..50 { + if i >= zombies.len() { + break 'reap_zombies; + } + + if let Ok(None) = zombies[i].try_wait() { + i += 1; + } else { + zombies.swap_remove(i); + } } + + // Be a good citizen; yield if there are a lot of processes. + future::yield_now().await; } + + // Put zombie processes back. + self.zombies.lock().unwrap().append(&mut zombies); } } /// Register a process with this reaper. fn register(&'static self, child: &std::process::Child) -> io::Result<()> { + self.ensure_driven(); self.pipe.register(child) } } cfg_if::cfg_if! 
{ if #[cfg(windows)] { + use async_channel::{Sender, Receiver, bounded}; use std::ffi::c_void; use std::os::windows::io::AsRawHandle; - use std::sync::mpsc; use windows_sys::Win32::{ Foundation::{BOOLEAN, HANDLE}, @@ -167,25 +230,25 @@ cfg_if::cfg_if! { /// Waits for the next SIGCHLD signal. struct Pipe { /// The sender channel for the SIGCHLD signal. - sender: mpsc::SyncSender<()>, + sender: Sender<()>, /// The receiver channel for the SIGCHLD signal. - receiver: Mutex>, + receiver: Receiver<()>, } impl Pipe { /// Creates a new pipe. fn new() -> io::Result { - let (sender, receiver) = mpsc::sync_channel(1); + let (sender, receiver) = bounded(1); Ok(Pipe { sender, - receiver: Mutex::new(receiver), + receiver }) } /// Waits for the next SIGCHLD signal. - fn wait(&self) { - self.receiver.lock().unwrap().recv().ok(); + async fn wait(&self) { + self.receiver.recv().await.ok(); } /// Register a process object into this pipe. @@ -238,8 +301,8 @@ cfg_if::cfg_if! { } /// Waits for the next SIGCHLD signal. - fn wait(&self) { - async_io::block_on((&self.signals).next()); + async fn wait(&self) { + (&self.signals).next().await; } /// Register a process object into this pipe. @@ -260,6 +323,7 @@ struct ChildGuard { inner: Option, reap_on_drop: bool, kill_on_drop: bool, + reaper: &'static Reaper, } impl ChildGuard { @@ -275,11 +339,14 @@ impl Drop for ChildGuard { self.get_mut().kill().ok(); } if self.reap_on_drop { - let mut zombies = Reaper::get().zombies.lock().unwrap(); + let mut zombies = self.reaper.zombies.lock().unwrap(); if let Ok(None) = self.get_mut().try_wait() { zombies.push(self.inner.take().unwrap()); } } + + // Decrement number of children. + self.reaper.child_count.fetch_sub(1, Ordering::Acquire); } } @@ -330,6 +397,9 @@ impl Child { let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout); let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr); + // Bump the child count. 
+ reaper.child_count.fetch_add(1, Ordering::Relaxed); + // Register the child process in the global list. reaper.register(&child)?; @@ -341,6 +411,7 @@ impl Child { inner: Some(child), reap_on_drop: cmd.reap_on_drop, kill_on_drop: cmd.kill_on_drop, + reaper, })), }) } @@ -760,6 +831,80 @@ impl TryFrom for OwnedFd { } } +/// Runs the driver for the asynchronous processes. +/// +/// This future takes control of global structures related to driving [`Child`]ren and reaping +/// zombie processes. These responsibilities include listening for the `SIGCHLD` signal and +/// making sure zombie processes are successfully waited on. +/// +/// If multiple tasks run `driver()` at once, only one will actually drive the reaper; the other +/// ones will just sleep. If a task that is driving the reaper is dropped, a previously sleeping +/// task will take over. If all tasks driving the reaper are dropped, the "async-process" thread +/// will be spawned. The "async-process" thread just blocks on this future and will automatically +/// be spawned if no tasks are driving the reaper once a [`Child`] is created. +/// +/// This future will never complete. It is intended to be ran on a background task in your +/// executor of choice. +/// +/// # Examples +/// +/// ```no_run +/// use async_executor::Executor; +/// use async_process::{driver, Command}; +/// +/// # futures_lite::future::block_on(async { +/// // Create an executor and run on it. +/// let ex = Executor::new(); +/// ex.run(async { +/// // Run the driver future in the background. +/// ex.spawn(driver()).detach(); +/// +/// // Run a command. +/// Command::new("ls").output().await.ok(); +/// }).await; +/// # }); +/// ``` +#[inline] +pub fn driver() -> impl Future + Send + 'static { + struct CallOnDrop(F); + + impl Drop for CallOnDrop { + fn drop(&mut self) { + (self.0)(); + } + } + + async { + // Get the reaper. + let reaper = Reaper::get(); + + // Make sure the reaper knows we're driving it. 
+ reaper.drivers.fetch_add(1, Ordering::SeqCst); + + // Decrement the driver count when this future is dropped. + let _guard = CallOnDrop(|| { + let prev_count = reaper.drivers.fetch_sub(1, Ordering::SeqCst); + + // If this was the last driver, and there are still resources actively using the + // reaper, make sure that there is a thread driving the reaper. + if prev_count == 1 + && reaper.child_count.load(Ordering::SeqCst) > 0 + && !reaper + .zombies + .lock() + .unwrap_or_else(|x| x.into_inner()) + .is_empty() + { + reaper.ensure_driven(); + } + }); + + // Acquire the reaper lock and start polling the SIGCHLD event. + let guard = reaper.driver_guard.lock().await; + reaper.reap(guard).await + } +} + /// A builder for spawning processes. /// /// # Examples diff --git a/tests/std.rs b/tests/std.rs index 91c794e..83af853 100644 --- a/tests/std.rs +++ b/tests/std.rs @@ -21,6 +21,25 @@ fn smoke() { }) } +#[test] +fn smoke_driven() { + future::block_on( + async { + async_process::driver().await; + } + .or(async { + let p = if cfg!(target_os = "windows") { + Command::new("cmd").args(["/C", "exit 0"]).spawn() + } else { + Command::new("true").spawn() + }; + assert!(p.is_ok()); + let mut p = p.unwrap(); + assert!(p.status().await.unwrap().success()); + }), + ) +} + #[test] fn smoke_failure() { assert!(Command::new("if-this-is-a-binary-then-the-world-has-ended")