Skip to content

Commit

Permalink
m: Remove dependency on waker-fn
Browse files Browse the repository at this point in the history
Resolves #165

Adds a new concrete type BlockOnWaker that implements Wake
  • Loading branch information
asonix authored Nov 6, 2023
1 parent 87ad890 commit e779304
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 13 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ polling = "3.0.0"
rustix = { version = "0.38.2", default-features = false, features = ["fs", "net", "std"] }
slab = "0.4.2"
tracing = { version = "0.1.37", default-features = false }
waker-fn = "1.1.0"

[target.'cfg(windows)'.dependencies]
windows-sys = { version = "0.48.0", features = ["Win32_Foundation"] }
Expand Down
42 changes: 30 additions & 12 deletions src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use std::time::{Duration, Instant};
use async_lock::OnceCell;
use futures_lite::pin;
use parking::Parker;
use waker_fn::waker_fn;

use crate::reactor::Reactor;

Expand Down Expand Up @@ -131,17 +130,7 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
let io_blocked = Arc::new(AtomicBool::new(false));

// Prepare the waker.
let waker = waker_fn({
let io_blocked = io_blocked.clone();
move || {
if u.unpark() {
// Check if waking from another thread and if currently blocked on I/O.
if !IO_POLLING.with(Cell::get) && io_blocked.load(Ordering::SeqCst) {
Reactor::get().notify();
}
}
}
});
let waker = BlockOnWaker::create(io_blocked.clone(), u);

(p, waker, io_blocked)
}
Expand All @@ -154,6 +143,35 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
static IO_POLLING: Cell<bool> = Cell::new(false);
}

struct BlockOnWaker {
io_blocked: Arc<AtomicBool>,
unparker: parking::Unparker,
}

impl BlockOnWaker {
fn create(io_blocked: Arc<AtomicBool>, unparker: parking::Unparker) -> Waker {
Waker::from(Arc::new(BlockOnWaker {
io_blocked,
unparker,
}))
}
}

impl std::task::Wake for BlockOnWaker {
fn wake_by_ref(self: &Arc<Self>) {
if self.unparker.unpark() {
// Check if waking from another thread and if currently blocked on I/O.
if !IO_POLLING.with(Cell::get) && self.io_blocked.load(Ordering::SeqCst) {
Reactor::get().notify();
}
}
}

fn wake(self: Arc<Self>) {
self.wake_by_ref()
}
}

CACHE.with(|cache| {
// Try grabbing the cached parker and waker.
let tmp_cached;
Expand Down

0 comments on commit e779304

Please sign in to comment.