Skip to content

Commit

Permalink
m: Centralize all global state into a single structure
Browse files Browse the repository at this point in the history
Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull authored Sep 10, 2023
1 parent 5e8e0b7 commit 52a693e
Showing 1 changed file with 184 additions and 134 deletions.
318 changes: 184 additions & 134 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,170 @@ mod sealed {
pub trait Sealed {}
}

/// An event delivered every time the SIGCHLD signal occurs.
static SIGCHLD: Event = Event::new();
/// The zombie process reaper.
///
/// This structure reaps zombie processes and emits the `SIGCHLD` signal.
struct Reaper {
/// An event delivered every time the SIGCHLD signal occurs.
sigchld: Event,

/// The list of zombie processes.
zombies: Mutex<Vec<std::process::Child>>,

/// The pipe that delivers signal notifications.
pipe: Pipe,
}

impl Reaper {
/// Get the singleton instance of the reaper.
fn get() -> &'static Self {
static REAPER: OnceCell<Reaper> = OnceCell::new();

REAPER.get_or_init_blocking(|| {
thread::Builder::new()
.name("async-process".to_string())
.spawn(|| REAPER.wait_blocking().reap())
.expect("cannot spawn async-process thread");

Reaper {
sigchld: Event::new(),
zombies: Mutex::new(Vec::new()),
pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
}
})
}

/// Reap zombie processes forever.
fn reap(&'static self) -> ! {
loop {
// Wait for the next SIGCHLD signal.
self.pipe.wait();

// Notify all listeners waiting on the SIGCHLD event.
self.sigchld.notify(std::usize::MAX);

// Reap zombie processes.
let mut zombies = self.zombies.lock().unwrap();
let mut i = 0;
while i < zombies.len() {
if let Ok(None) = zombies[i].try_wait() {
i += 1;
} else {
zombies.swap_remove(i);
}
}
}
}

/// Register a process with this reaper.
fn register(&'static self, child: &std::process::Child) -> io::Result<()> {
self.pipe.register(child)
}
}

cfg_if::cfg_if! {
if #[cfg(windows)] {
use std::ffi::c_void;
use std::os::windows::io::AsRawHandle;
use std::sync::mpsc;

use windows_sys::Win32::{
Foundation::{BOOLEAN, HANDLE},
System::Threading::{
RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE,
},
};

/// Waits for the next SIGCHLD signal.
struct Pipe {
/// The sender channel for the SIGCHLD signal.
sender: mpsc::SyncSender<()>,

/// The receiver channel for the SIGCHLD signal.
receiver: Mutex<mpsc::Receiver<()>>,
}

impl Pipe {
/// Creates a new pipe.
fn new() -> io::Result<Pipe> {
let (sender, receiver) = mpsc::sync_channel(1);
Ok(Pipe {
sender,
receiver: Mutex::new(receiver),
})
}

/// Waits for the next SIGCHLD signal.
fn wait(&self) {
self.receiver.lock().unwrap().recv().ok();
}

/// Register a process object into this pipe.
fn register(&self, child: &std::process::Child) -> io::Result<()> {
// Called when a child exits.
unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
Reaper::get().pipe.sender.try_send(()).ok();
}

// Register this child process to invoke `callback` on exit.
let mut wait_object = 0;
let ret = unsafe {
RegisterWaitForSingleObject(
&mut wait_object,
child.as_raw_handle() as HANDLE,
Some(callback),
std::ptr::null_mut(),
INFINITE,
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
)
};

if ret == 0 {
Err(io::Error::last_os_error())
} else {
Ok(())
}
}
}

// Wraps a sync I/O type into an async I/O type.
fn wrap<T>(io: T) -> io::Result<Unblock<T>> {
Ok(Unblock::new(io))
}
} else if #[cfg(unix)] {
use async_signal::{Signal, Signals};

/// Waits for the next SIGCHLD signal.
struct Pipe {
/// The iterator over SIGCHLD signals.
signals: Signals,
}

impl Pipe {
/// Creates a new pipe.
fn new() -> io::Result<Pipe> {
Ok(Pipe {
signals: Signals::new(Some(Signal::Child))?,
})
}

/// Waits for the next SIGCHLD signal.
fn wait(&self) {
async_io::block_on((&self.signals).next());
}

/// Register a process object into this pipe.
fn register(&self, _child: &std::process::Child) -> io::Result<()> {
Ok(())
}
}

/// Wrap a file descriptor into a non-blocking I/O type.
fn wrap<T: std::os::unix::io::AsRawFd>(io: T) -> io::Result<Async<T>> {
Async::new(io)
}
}
}

/// A guard that can kill child processes, or push them into the zombie list.
struct ChildGuard {
Expand All @@ -106,6 +268,21 @@ impl ChildGuard {
}
}

// When the last reference to the child process is dropped, push it into the zombie list.
impl Drop for ChildGuard {
fn drop(&mut self) {
if self.kill_on_drop {
self.get_mut().kill().ok();
}
if self.reap_on_drop {
let mut zombies = Reaper::get().zombies.lock().unwrap();
if let Ok(None) = self.get_mut().try_wait() {
zombies.push(self.inner.take().unwrap());
}
}
}
}

/// A spawned child process.
///
/// The process can be in running or exited state. Use [`status()`][`Child::status()`] or
Expand Down Expand Up @@ -144,144 +321,17 @@ impl Child {
/// The "async-process" thread waits for processes in the global list and cleans up the
/// resources when they exit.
fn new(cmd: &mut Command) -> io::Result<Child> {
// Make sure the reaper exists before we spawn the child process.
let reaper = Reaper::get();
let mut child = cmd.inner.spawn()?;

// Convert sync I/O types into async I/O types.
let stdin = child.stdin.take().map(wrap).transpose()?.map(ChildStdin);
let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout);
let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr);

cfg_if::cfg_if! {
if #[cfg(windows)] {
use std::ffi::c_void;
use std::os::windows::io::AsRawHandle;
use std::sync::mpsc;

use windows_sys::Win32::{
Foundation::{BOOLEAN, HANDLE},
System::Threading::{
RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE,
},
};

// This channel is used to simulate SIGCHLD on Windows.
fn callback_channel() -> (&'static mpsc::SyncSender<()>, &'static Mutex<mpsc::Receiver<()>>) {
static CALLBACK: OnceCell<(mpsc::SyncSender<()>, Mutex<mpsc::Receiver<()>>)> =
OnceCell::new();

let (s, r) = CALLBACK.get_or_init_blocking(|| {
let (s, r) = mpsc::sync_channel(1);
(s, Mutex::new(r))
});

(s, r)
}

// Called when a child exits.
unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
callback_channel().0.try_send(()).ok();
}

// Register this child process to invoke `callback` on exit.
let mut wait_object = 0;
let ret = unsafe {
RegisterWaitForSingleObject(
&mut wait_object,
child.as_raw_handle() as HANDLE,
Some(callback),
std::ptr::null_mut(),
INFINITE,
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
)
};
if ret == 0 {
return Err(io::Error::last_os_error());
}

// Waits for the next SIGCHLD signal.
fn wait_sigchld() {
callback_channel().1.lock().unwrap().recv().ok();
}

// Wraps a sync I/O type into an async I/O type.
fn wrap<T>(io: T) -> io::Result<Unblock<T>> {
Ok(Unblock::new(io))
}

} else if #[cfg(unix)] {
use async_signal::{Signal, Signals};

static SIGNALS: OnceCell<Signals> = OnceCell::new();

// Make sure the signal handler is registered before interacting with the process.
SIGNALS.get_or_init_blocking(|| {
Signals::new(Some(Signal::Child))
.expect("Failed to register SIGCHLD handler")
});

// Waits for the next SIGCHLD signal.
fn wait_sigchld() {
async_io::block_on(
SIGNALS
.get()
.expect("Signals not registered")
.next()
);
}

// Wraps a sync I/O type into an async I/O type.
fn wrap<T: std::os::unix::io::AsRawFd>(io: T) -> io::Result<Async<T>> {
Async::new(io)
}
}
}

static ZOMBIES: OnceCell<Mutex<Vec<std::process::Child>>> = OnceCell::new();

// Make sure the thread is started.
ZOMBIES.get_or_init_blocking(|| {
// Start a thread that handles SIGCHLD and notifies tasks when child processes exit.
thread::Builder::new()
.name("async-process".to_string())
.spawn(move || {
loop {
// Wait for the next SIGCHLD signal.
wait_sigchld();

// Notify all listeners waiting on the SIGCHLD event.
SIGCHLD.notify(std::usize::MAX);

// Reap zombie processes.
let mut zombies = ZOMBIES.get().unwrap().lock().unwrap();
let mut i = 0;
while i < zombies.len() {
if let Ok(None) = zombies[i].try_wait() {
i += 1;
} else {
zombies.swap_remove(i);
}
}
}
})
.expect("cannot spawn async-process thread");

Mutex::new(Vec::new())
});

// When the last reference to the child process is dropped, push it into the zombie list.
impl Drop for ChildGuard {
fn drop(&mut self) {
if self.kill_on_drop {
self.get_mut().kill().ok();
}
if self.reap_on_drop {
let mut zombies = ZOMBIES.get().unwrap().lock().unwrap();
if let Ok(None) = self.get_mut().try_wait() {
zombies.push(self.inner.take().unwrap());
}
}
}
}
// Register the child process in the global list.
reaper.register(&child)?;

Ok(Child {
stdin,
Expand Down Expand Up @@ -381,7 +431,7 @@ impl Child {
let child = self.child.clone();

async move {
let listener = EventListener::new(&SIGCHLD);
let listener = EventListener::new(&Reaper::get().sigchld);
let mut listening = false;
futures_lite::pin!(listener);

Expand Down

0 comments on commit 52a693e

Please sign in to comment.