Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the scope API. #11

Merged
merged 5 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ rayon = { version = "1.10.0", optional = true }
[[bench]]
name = "overhead"
harness = false
required-features = ["bench"]
required-features = ["bench"]
4 changes: 2 additions & 2 deletions benches/overhead.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn no_overhead(bencher: Bencher, nodes: (usize, usize)) {
}

let tree = Node::tree(nodes.0);
let mut thread_pool = ThreadPool::new().unwrap();
let thread_pool = ThreadPool::new().unwrap();
let mut scope = thread_pool.scope();

bencher.bench_local(move || {
Expand All @@ -66,7 +66,7 @@ fn chili_overhead(bencher: Bencher, nodes: (usize, usize)) {
}

let tree = Node::tree(nodes.0);
let mut thread_pool = ThreadPool::new().unwrap();
let thread_pool = ThreadPool::new().unwrap();
let mut scope = thread_pool.scope();

bencher.bench_local(move || {
Expand Down
36 changes: 36 additions & 0 deletions src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ impl<T> Future<T> {

match result {
Ok(_) => {
// SAFETY:
// Lock is acquired, only we are accessing `self.waiting_thread`.
unsafe { *self.waiting_thread.get() = Some(thread::current()) };

Expand All @@ -52,6 +53,7 @@ impl<T> Future<T> {
continue;
}
Err(state) if state == Poll::Ready as u8 => {
// SAFETY:
// `state` is `Poll::Ready` only after `Self::complete`
// releases the lock.
//
Expand Down Expand Up @@ -83,11 +85,13 @@ impl<T> Future<T> {
}
}

// SAFETY:
// Lock is acquired, only we are accessing `self.val`.
unsafe {
*self.val.get() = Some(val);
}

// SAFETY:
// Lock is acquired, only we are accessing `self.waiting_thread`.
if let Some(thread) = unsafe { (*self.waiting_thread.get()).take() } {
thread.unpark();
Expand All @@ -110,8 +114,10 @@ impl<F> JobStack<F> {
}
}

/// SAFETY:
/// It should only be called once.
pub unsafe fn take_once(&self) -> F {
// SAFETY:
// No `Job` has has been executed, therefore `self.f` has not yet been
// `take`n.
unsafe { ManuallyDrop::take(&mut *self.f.get()) }
Expand All @@ -136,6 +142,7 @@ impl<T> Job<T> {
F: FnOnce(&mut Scope<'_>) -> T + Send,
T: Send,
{
/// SAFETY:
/// It should only be called while the `stack` is still alive.
unsafe fn harness<F, T>(
scope: &mut Scope<'_>,
Expand All @@ -145,12 +152,15 @@ impl<T> Job<T> {
F: FnOnce(&mut Scope<'_>) -> T + Send,
T: Send,
{
// SAFETY:
// The `stack` is still alive.
let stack: &JobStack<F> = unsafe { stack.cast().as_ref() };
// SAFETY:
// This is the first call to `take_once` since `Job::execute`
// (the only place where this harness is called) is called only
// after the job has been popped.
let f = unsafe { stack.take_once() };
// SAFETY:
// Before being popped, the `JobQueue` allocates and stores a
// `Future` in `self.fur_or_next` that should get passed here.
let fut: &Future<T> = unsafe { fut.cast().as_ref() };
Expand All @@ -174,11 +184,13 @@ impl<T> Job<T> {
self.stack == other.stack
}

/// SAFETY:
/// It should only be called after being popped from a `JobQueue`.
pub unsafe fn poll(&self) -> bool {
self.fut_or_next
.get()
.map(|fut| {
// SAFETY:
// Before being popped, the `JobQueue` allocates and stores a
// `Future` in `self.fur_or_next` that should get passed here.
let fut = unsafe { fut.as_ref() };
Expand All @@ -187,12 +199,15 @@ impl<T> Job<T> {
.unwrap_or_default()
}

/// SAFETY:
/// It should only be called after being popped from a `JobQueue`.
pub unsafe fn wait(&self) -> Option<thread::Result<T>> {
self.fut_or_next.get().and_then(|fut| {
// SAFETY:
// Before being popped, the `JobQueue` allocates and stores a
// `Future` in `self.fur_or_next` that should get passed here.
let result = unsafe { fut.as_ref().wait() };
// SAFETY:
// We only can drop the `Box` *after* waiting on the `Future`
// in order to ensure unique access.
unsafe {
Expand All @@ -203,10 +218,12 @@ impl<T> Job<T> {
})
}

/// SAFETY:
/// It should only be called in the case where the job has been popped
/// from the front and will not be `Job::Wait`ed.
pub unsafe fn drop(&self) {
if let Some(fut) = self.fut_or_next.get() {
// SAFETY:
// Before being popped, the `JobQueue` allocates and store a
// `Future` in `self.fur_or_next` that should get passed here.
unsafe {
Expand All @@ -217,9 +234,11 @@ impl<T> Job<T> {
}

impl Job {
/// SAFETY:
/// It should only be called while the `JobStack` it was created with is
/// still alive and after being popped from a `JobQueue`.
pub unsafe fn execute(&self, scope: &mut Scope<'_>) {
// SAFETY:
// Before being popped, the `JobQueue` allocates and store a
// `Future` in `self.fur_or_next` that should get passed here.
unsafe {
Expand All @@ -228,6 +247,10 @@ impl Job {
}
}

// SAFETY:
// The job's `stack` will only be accessed after acquiring a lock (in
// `Future`), while `prev` and `fut_or_next` are never accessed after being
// sent across threads.
unsafe impl Send for Job {}

#[derive(Debug)]
Expand Down Expand Up @@ -257,6 +280,7 @@ impl Default for JobQueue {

impl Drop for JobQueue {
fn drop(&mut self) {
// SAFETY:
// `self.sentinel` never gets written over, so it contains the original
// `leak`ed `Box` that gets allocated in `JobQueue::default`.
unsafe {
Expand All @@ -270,13 +294,16 @@ impl JobQueue {
self.len
}

/// SAFETY:
/// Any `Job` pushed onto the queue should alive at least until it gets
/// popped.
pub unsafe fn push_back<T>(&mut self, job: &Job<T>) {
// SAFETY:
// The tail can either be the root `Box::leak`ed in the default
// constructor or a `Job` that has been pushed previously and which is
// still alive.
let current_tail = unsafe { self.tail.as_ref() };
// SAFETY:
// This effectively casts the `Job`'s `fut_or_next` from `Future<T>` to
// `Future<()>` which casts the `Future`'s `Box<T>` to a `Box<()>`.
//
Expand All @@ -294,13 +321,16 @@ impl JobQueue {
self.tail = next_tail.into();
}

/// SAFETY:
/// The last `Job` in the queue must still be alive.
pub unsafe fn pop_back(&mut self) {
// SAFETY:
// The tail can either be the root `Box::leak`ed in the default
// constructor or a `Job` that has been pushed previously and which is
// still alive.
let current_tail = unsafe { self.tail.as_ref() };
if let Some(prev_tail) = current_tail.prev.get() {
// SAFETY:
// `Job`'s `prev` pointer can only be set by `JobQueue::push_back`
// to the previous tail which should still be alive or by
// `JobQueue::pop_front` when it's set to `self.sentinel` which is
Expand All @@ -316,12 +346,15 @@ impl JobQueue {
}
}

/// SAFETY:
/// The first `Job` in the queue must still be alive.
pub unsafe fn pop_front(&mut self) -> Option<Job> {
// SAFETY:
// `self.sentinel` is alive for the entirety of `self`.
let sentinel = unsafe { self.sentinel.as_ref() };

sentinel.fut_or_next.get().map(|next| {
// SAFETY:
// `self.sentinel`'s `fut_or_next` pointer can only be set by
// `JobQueue::push_back` or by `JobQueue::pop_front` when it's set
// to a job that was previous set by `JobQueue::push_back` and
Expand All @@ -331,6 +364,7 @@ impl JobQueue {
if let Some(next) = head.fut_or_next.get() {
sentinel.fut_or_next.set(Some(next.cast()));

// SAFETY:
// `Job`'s `fut_or_next` pointer can only be set by
// `JobQueue::push_back` or by `JobQueue::pop_front` when it's set
// to a job that was previous set by `JobQueue::push_back` and
Expand All @@ -345,6 +379,7 @@ impl JobQueue {
self.tail = sentinel.into();
}

// SAFETY:
// `self.sentinel`'s `fut_or_next` pointer can only be set by
// `JobQueue::push_back` or by `JobQueue::pop_front` when it's set
// to a job that was previous set by `JobQueue::push_back` and
Expand All @@ -357,6 +392,7 @@ impl JobQueue {

self.len -= 1;

// SAFETY:
// `self.sentinel`'s `fut_or_next` pointer can only be set by
// `JobQueue::push_back` or by `JobQueue::pop_front` when it's set
// to a job that was previous set by `JobQueue::push_back` and
Expand Down
Loading