Skip to content

Commit

Permalink
feat(spawn): JobHandle now supports cancellation.
Browse files Browse the repository at this point in the history
  • Loading branch information
scottbot95 committed Mar 5, 2024
1 parent 111803f commit 00dedbf
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 15 deletions.
8 changes: 8 additions & 0 deletions screeps-async/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ use std::fmt::{Debug, Display, Formatter};

/// An Error returned by the [crate::runtime::ScreepsRuntime]
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
#[non_exhaustive]
pub enum RuntimeError {
/// The allocated time this tick has already been consumed
OutOfTime,
/// The runtime has detected that deadlock has occurred.
/// This usually means you tried to [block_on](crate::block_on) a future that [delay](crate::time::delay_ticks)s
/// across ticks
DeadlockDetected,
}

impl Display for RuntimeError {
Expand All @@ -14,6 +19,9 @@ impl Display for RuntimeError {
RuntimeError::OutOfTime => {
write!(f, "Ran out of allocated time this tick")
}
RuntimeError::DeadlockDetected => {
write!(f, "Async runtime has been deadlocked")
}
}
}
}
Expand Down
64 changes: 56 additions & 8 deletions screeps-async/src/job.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! See [JobHandle]
use async_task::Task;
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
Expand All @@ -8,40 +9,58 @@ use std::task::{Context, Poll};

/// Reference to a [Future] that has been scheduled via [send](crate::ScreepsRuntime::spawn)
///
/// This type is safe to drop without canceling the underlying task
/// Dropping a [`JobHandle`] cancels it, which means its future won't be polled again.
/// To drop the [`JobHandle`] handle without canceling it, use [detach()](JobHandle::detach) instead.
/// To cancel a task gracefully and wait until it is fully destroyed, use the [cancel()](JobHandle::cancel) method
///
/// This type implements [Future] to allow awaiting on the result of the spawned task
pub struct JobHandle<T> {
pub(crate) fut_res: Rc<RefCell<Option<T>>>,
complete: bool,
task: Task<()>,
}

impl<T> JobHandle<T> {
pub(crate) fn new(fut_res: Rc<RefCell<Option<T>>>) -> Self {
pub(crate) fn new(fut_res: Rc<RefCell<Option<T>>>, task: Task<()>) -> Self {
Self {
fut_res,
complete: false,
task,
}
}

/// Cancels the task and waits for it to stop running.
///
/// Returns the task's output if it was completed just before it got canceled, or [`None`] if
/// it didn't complete.
///
/// While it's possible to simply drop the [`JobHandle`] to cancel it, this is a cleaner way of
/// canceling because it also waits for the task to stop running.
pub async fn cancel(self) -> Option<T> {
let _ = self.task.cancel().await;
self.fut_res.take()
}

/// Detaches the task to let it keep running in the background.
pub fn detach(self) {
self.task.detach()
}

/// Check whether this job has completed
pub fn is_complete(&self) -> bool {
self.complete || self.fut_res.borrow().is_some()
self.task.is_finished() || self.fut_res.borrow().is_some()
}
}

impl<T> Future for JobHandle<T> {
type Output = T;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.complete {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.is_complete() {
panic!("Cannot await on a JobHandle that has already completed")
}
let res = self.fut_res.borrow_mut().take();

match res {
Some(res) => {
self.complete = true;
Poll::Ready(res)
}
None => {
Expand All @@ -51,3 +70,32 @@ impl<T> Future for JobHandle<T> {
}
}
}

#[cfg(test)]
mod tests {
use crate::spawn;
use crate::tests::init_test;

#[test]
fn test_cancel() {
init_test();

let result = crate::block_on(async move {
let task = spawn(async move { true });

task.cancel().await
})
.unwrap();

assert_eq!(None, result);
}

#[test]
fn test_await() {
init_test();

let result = crate::block_on(async move { spawn(async move { true }).await }).unwrap();

assert!(result, "Failed to await spawned future");
}
}
13 changes: 9 additions & 4 deletions screeps-async/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ impl ScreepsRuntime {

/// The main entrypoint for the async runtime. Runs a future to completion.
///
/// Returns [RuntimeError::DeadlockDetected] if blocking [Future] doesn't complete this tick
///
/// # Panics
///
/// Panics if another future is already being blocked on. You should `.await` the second future
Expand All @@ -151,7 +153,7 @@ impl ScreepsRuntime {

while !handle.is_complete() {
if !self.try_poll_scheduled()? {
unreachable!("Task polling stalled while blocking on a future");
return Err(RuntimeError::DeadlockDetected);
}
}

Expand Down Expand Up @@ -259,7 +261,8 @@ mod tests {
let has_run = has_run.clone();
spawn(async move {
has_run.set(()).unwrap();
});
})
.detach();
}

// task hasn't run yet
Expand All @@ -284,7 +287,8 @@ mod tests {
assert_eq!(3, result);

has_run.set(()).unwrap();
});
})
.detach();
}

// task hasn't run yet
Expand All @@ -305,7 +309,8 @@ mod tests {
let has_run = has_run.clone();
spawn(async move {
has_run.set(()).unwrap();
});
})
.detach();
}

TIME_USED.with_borrow_mut(|t| *t = 0.95);
Expand Down
9 changes: 6 additions & 3 deletions screeps-async/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ mod tests {
assert_eq!(expected, game_time());

has_run.set(()).unwrap();
});
})
.detach();
}

// task hasn't run yet
Expand Down Expand Up @@ -162,13 +163,15 @@ mod tests {
// should run after second spawn if yield_now works correctly
steps.borrow_mut().push(3);
}
});
})
.detach();
}
{
let steps = steps.clone();
spawn(async move {
steps.borrow_mut().push(2);
});
})
.detach();
}

crate::run().unwrap();
Expand Down

0 comments on commit 00dedbf

Please sign in to comment.