From 0a6ea46bb8e53b9be0cf20ef495f126d2506773b Mon Sep 17 00:00:00 2001 From: Igigog Date: Sat, 21 Sep 2024 19:44:44 +0200 Subject: [PATCH 1/4] Replace linked list with VecDeque --- src/job.rs | 295 +++++------------------------------------------------ 1 file changed, 28 insertions(+), 267 deletions(-) diff --git a/src/job.rs b/src/job.rs index 292fa2c..58f8052 100644 --- a/src/job.rs +++ b/src/job.rs @@ -1,5 +1,6 @@ use std::{ cell::{Cell, UnsafeCell}, + collections::VecDeque, mem::ManuallyDrop, panic::{self, AssertUnwindSafe}, ptr::NonNull, @@ -132,7 +133,7 @@ impl JobStack { pub struct Job { stack: NonNull, harness: unsafe fn(&mut Scope<'_>, NonNull, NonNull), - prev: Cell>>, + is_in_queue: Cell, fut_or_next: Cell>>>, } @@ -171,13 +172,13 @@ impl Job { Self { stack: NonNull::from(stack).cast(), harness: harness::, - prev: Cell::new(None), + is_in_queue: Cell::new(false), fut_or_next: Cell::new(None), } } pub fn is_in_queue(&self) -> bool { - self.prev.get().is_some() + self.is_in_queue.get() } pub fn eq(&self, other: &Job) -> bool { @@ -253,285 +254,45 @@ impl Job { // sent across threads. unsafe impl Send for Job {} -#[derive(Debug)] -pub struct JobQueue { - sentinel: NonNull, - tail: NonNull, - len: u32, -} - -impl Default for JobQueue { - fn default() -> Self { - let root = Box::leak(Box::new(Job { - stack: NonNull::dangling(), - harness: |_, _, _| (), - prev: Cell::new(None), - fut_or_next: Cell::new(None), - })) - .into(); - - Self { - sentinel: root, - tail: root, - len: 0, - } - } -} - -impl Drop for JobQueue { - fn drop(&mut self) { - // SAFETY: - // `self.sentinel` never gets written over, so it contains the original - // `leak`ed `Box` that gets allocated in `JobQueue::default`. - unsafe { - drop(Box::from_raw(self.sentinel.as_ptr())); - } - } -} +#[derive(Debug, Default)] +pub struct JobQueue(VecDeque>); impl JobQueue { - pub fn len(&self) -> u32 { - self.len + pub fn len(&self) -> usize { + self.0.len() } /// SAFETY: /// Any `Job` pushed onto the queue should alive at least until it gets /// popped. pub unsafe fn push_back(&mut self, job: &Job) { - // SAFETY: - // The tail can either be the root `Box::leak`ed in the default - // constructor or a `Job` that has been pushed previously and which is - // still alive. - let current_tail = unsafe { self.tail.as_ref() }; - // SAFETY: - // This effectively casts the `Job`'s `fut_or_next` from `Future` to - // `Future<()>` which casts the `Future`'s `Box` to a `Box<()>`. - // - // This box will not be access until the pointer gets passed in the - // `harness` where it gets cast back to `T`. + job.is_in_queue.set(true); let next_tail = unsafe { &*(job as *const Job as *const Job) }; - - current_tail - .fut_or_next - .set(Some(NonNull::from(next_tail).cast())); - next_tail.prev.set(Some(current_tail.into())); - - self.len += 1; - - self.tail = next_tail.into(); + self.0.push_back(NonNull::from(next_tail).cast()); } - /// SAFETY: - /// The last `Job` in the queue must still be alive. - pub unsafe fn pop_back(&mut self) { - // SAFETY: - // The tail can either be the root `Box::leak`ed in the default - // constructor or a `Job` that has been pushed previously and which is - // still alive. - let current_tail = unsafe { self.tail.as_ref() }; - if let Some(prev_tail) = current_tail.prev.get() { + pub fn pop_back(&mut self) { + let val = self.0.pop_back(); + if let Some(job) = val { // SAFETY: - // `Job`'s `prev` pointer can only be set by `JobQueue::push_back` - // to the previous tail which should still be alive or by - // `JobQueue::pop_front` when it's set to `self.sentinel` which is - // alive for the entirety of `self`. - let prev_tail = unsafe { prev_tail.as_ref() }; - - current_tail.prev.set(None); - prev_tail.fut_or_next.set(None); - - self.len -= 1; - - self.tail = prev_tail.into(); - } - } - - /// SAFETY: - /// The first `Job` in the queue must still be alive. - pub unsafe fn pop_front(&mut self) -> Option { - // SAFETY: - // `self.sentinel` is alive for the entirety of `self`. - let sentinel = unsafe { self.sentinel.as_ref() }; - - sentinel.fut_or_next.get().map(|next| { - // SAFETY: - // `self.sentinel`'s `fut_or_next` pointer can only be set by - // `JobQueue::push_back` or by `JobQueue::pop_front` when it's set - // to a job that was previous set by `JobQueue::push_back` and - // should still be alive. - let head: &Job = unsafe { next.cast().as_ref() }; - - if let Some(next) = head.fut_or_next.get() { - sentinel.fut_or_next.set(Some(next.cast())); - - // SAFETY: - // `Job`'s `fut_or_next` pointer can only be set by - // `JobQueue::push_back` or by `JobQueue::pop_front` when it's set - // to a job that was previous set by `JobQueue::push_back` and - // should still be alive. - // - // It can also be set to a `Future`, but that can only happen after - // the job was removed from the queue. - let next: &Job = unsafe { next.cast().as_ref() }; - next.prev.set(Some(sentinel.into())); - } else { - sentinel.fut_or_next.set(None); - self.tail = sentinel.into(); - } - - // SAFETY: - // `self.sentinel`'s `fut_or_next` pointer can only be set by - // `JobQueue::push_back` or by `JobQueue::pop_front` when it's set - // to a job that was previous set by `JobQueue::push_back` and - // should still be alive. - let head: &Job = unsafe { next.cast().as_ref() }; - - head.prev.set(None); - head.fut_or_next - .set(Some(Box::leak(Box::new(Future::default())).into())); - - self.len -= 1; - - // SAFETY: - // `self.sentinel`'s `fut_or_next` pointer can only be set by - // `JobQueue::push_back` or by `JobQueue::pop_front` when it's set - // to a job that was previous set by `JobQueue::push_back` and - // should still be alive. - unsafe { next.cast::().as_ref().clone() } - }) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - impl Job { - pub fn from_usize(val: &'static usize) -> Self { - Self { - stack: NonNull::from(val).cast(), - harness: |_, _, _| (), - prev: Cell::new(None), - fut_or_next: Cell::new(None), + // `Job` is still alive as per contract in `push_back` + unsafe { + job.as_ref().is_in_queue.set(false); } } - - pub fn as_usize(&self) -> usize { - unsafe { *self.stack.cast().as_ref() } - } } - #[test] - fn push_pop_back() { - let mut queue = JobQueue::default(); - - assert_eq!(queue.sentinel, queue.tail); - - let job1 = Job::from_usize(&1); - - unsafe { - queue.push_back(&job1); - } - assert_eq!(unsafe { queue.tail.as_ref().as_usize() }, 1); - - unsafe { - queue.pop_back(); - } - assert_eq!(queue.sentinel, queue.tail); - } - - #[test] - fn push2_pop2_back() { - let mut queue = JobQueue::default(); - - assert_eq!(queue.sentinel, queue.tail); - - let job1 = Job::from_usize(&1); - let job2 = Job::from_usize(&2); - - unsafe { - queue.push_back(&job1); - } - assert_eq!(unsafe { queue.tail.as_ref().as_usize() }, 1); - - unsafe { - queue.push_back(&job2); - } - assert_eq!(unsafe { queue.tail.as_ref().as_usize() }, 2); - - unsafe { - queue.pop_back(); - } - assert_eq!(unsafe { queue.tail.as_ref().as_usize() }, 1); - - unsafe { - queue.pop_back(); - } - assert_eq!(queue.sentinel, queue.tail); - } - - #[test] - fn push_pop_front() { - let mut queue = JobQueue::default(); - - assert_eq!(queue.sentinel, queue.tail); - - let job1 = Job::from_usize(&1); - - unsafe { - queue.push_back(&job1); - } - assert_eq!(unsafe { queue.tail.as_ref().as_usize() }, 1); - - let job = unsafe { queue.pop_front().unwrap() }; - assert_eq!(job.as_usize(), 1); - assert!(job.prev.get().is_none()); - assert!(job.fut_or_next.get().is_some()); - - unsafe { - job.drop(); - } - - assert_eq!(queue.sentinel, queue.tail); - } - - #[test] - fn push2_pop2_front() { - let mut queue = JobQueue::default(); - - assert_eq!(queue.sentinel, queue.tail); - - let job1 = Job::from_usize(&1); - let job2 = Job::from_usize(&2); - - unsafe { - queue.push_back(&job1); - } - assert_eq!(unsafe { queue.tail.as_ref().as_usize() }, 1); - - unsafe { - queue.push_back(&job2); - } - assert_eq!(unsafe { queue.tail.as_ref().as_usize() }, 2); - - let job = unsafe { queue.pop_front().unwrap() }; - assert_eq!(job.as_usize(), 1); - assert!(job.prev.get().is_none()); - assert!(job.fut_or_next.get().is_some()); - - unsafe { - job.drop(); - } - - let job = unsafe { queue.pop_front().unwrap() }; - assert_eq!(job.as_usize(), 2); - assert!(job.prev.get().is_none()); - assert!(job.fut_or_next.get().is_some()); - - unsafe { - job.drop(); - } - - assert_eq!(queue.sentinel, queue.tail); + pub fn pop_front(&mut self) -> Option { + let val = self.0.pop_front(); + let job = match val { + // SAFETY: + // `Job` is still alive as per contract in `push_back` + Some(j) => unsafe { j.as_ref() }, + None => return None, + }; + job.is_in_queue.set(false); + job.fut_or_next + .set(Some(Box::leak(Box::new(Future::default())).into())); + Some(job.clone()) } } From f67e5f1dcdb56cc105ba7285b7abf79cd47eac01 Mon Sep 17 00:00:00 2001 From: Igigog Date: Sat, 21 Sep 2024 22:03:31 +0200 Subject: [PATCH 2/4] Use the ? --- src/job.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/job.rs b/src/job.rs index 58f8052..83b7b48 100644 --- a/src/job.rs +++ b/src/job.rs @@ -283,13 +283,8 @@ impl JobQueue { } pub fn pop_front(&mut self) -> Option { - let val = self.0.pop_front(); - let job = match val { - // SAFETY: - // `Job` is still alive as per contract in `push_back` - Some(j) => unsafe { j.as_ref() }, - None => return None, - }; + let val = self.0.pop_front()?; + let job = unsafe { val.as_ref() }; job.is_in_queue.set(false); job.fut_or_next .set(Some(Box::leak(Box::new(Future::default())).into())); From 9c60b7ae56c13ea2c7bb61edcd335809fa7b12db Mon Sep 17 00:00:00 2001 From: Igigog Date: Sat, 21 Sep 2024 22:26:33 +0200 Subject: [PATCH 3/4] Use fut field instead of is_in_queue --- src/job.rs | 34 ++++++++++++---------------------- src/lib.rs | 2 +- 2 files changed, 13 insertions(+), 23 deletions(-) diff --git a/src/job.rs b/src/job.rs index 83b7b48..f0acd9f 100644 --- a/src/job.rs +++ b/src/job.rs @@ -133,8 +133,7 @@ impl JobStack { pub struct Job { stack: NonNull, harness: unsafe fn(&mut Scope<'_>, NonNull, NonNull), - is_in_queue: Cell, - fut_or_next: Cell>>>, + fut: Cell>>>, } impl Job { @@ -172,13 +171,12 @@ impl Job { Self { stack: NonNull::from(stack).cast(), harness: harness::, - is_in_queue: Cell::new(false), - fut_or_next: Cell::new(None), + fut: Cell::new(None), } } - pub fn is_in_queue(&self) -> bool { - self.is_in_queue.get() + pub fn is_waiting(&self) -> bool { + self.fut.get().is_none() } pub fn eq(&self, other: &Job) -> bool { @@ -188,7 +186,7 @@ impl Job { /// SAFETY: /// It should only be called after being popped from a `JobQueue`. pub unsafe fn poll(&self) -> bool { - self.fut_or_next + self.fut .get() .map(|fut| { // SAFETY: @@ -203,7 +201,7 @@ impl Job { /// SAFETY: /// It should only be called after being popped from a `JobQueue`. pub unsafe fn wait(&self) -> Option> { - self.fut_or_next.get().and_then(|fut| { + self.fut.get().and_then(|fut| { // SAFETY: // Before being popped, the `JobQueue` allocates and stores a // `Future` in `self.fur_or_next` that should get passed here. @@ -223,7 +221,7 @@ impl Job { /// It should only be called in the case where the job has been popped /// from the front and will not be `Job::Wait`ed. pub unsafe fn drop(&self) { - if let Some(fut) = self.fut_or_next.get() { + if let Some(fut) = self.fut.get() { // SAFETY: // Before being popped, the `JobQueue` allocates and store a // `Future` in `self.fur_or_next` that should get passed here. @@ -243,7 +241,7 @@ impl Job { // Before being popped, the `JobQueue` allocates and store a // `Future` in `self.fur_or_next` that should get passed here. unsafe { - (self.harness)(scope, self.stack, self.fut_or_next.get().unwrap()); + (self.harness)(scope, self.stack, self.fut.get().unwrap()); } } } @@ -266,28 +264,20 @@ impl JobQueue { /// Any `Job` pushed onto the queue should alive at least until it gets /// popped. pub unsafe fn push_back(&mut self, job: &Job) { - job.is_in_queue.set(true); let next_tail = unsafe { &*(job as *const Job as *const Job) }; self.0.push_back(NonNull::from(next_tail).cast()); } pub fn pop_back(&mut self) { - let val = self.0.pop_back(); - if let Some(job) = val { - // SAFETY: - // `Job` is still alive as per contract in `push_back` - unsafe { - job.as_ref().is_in_queue.set(false); - } - } + self.0.pop_back(); } pub fn pop_front(&mut self) -> Option { let val = self.0.pop_front()?; + // SAFETY: + // `Job` is still alive as per contract in `push_back` let job = unsafe { val.as_ref() }; - job.is_in_queue.set(false); - job.fut_or_next - .set(Some(Box::leak(Box::new(Future::default())).into())); + job.fut.set(Some(Box::leak(Box::new(Future::default())).into())); Some(job.clone()) } } diff --git a/src/lib.rs b/src/lib.rs index 3ecc720..a576c38 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -383,7 +383,7 @@ impl<'s> Scope<'s> { let rb = b(self); - if job.is_in_queue() { + if job.is_waiting() { // SAFETY: // `job` is alive until the end of this scope and there has been no // other pop up to this point. From d0de8998019ebe62348bfb333238b237386eee57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Drago=C8=99=20Tiselice?= Date: Mon, 23 Sep 2024 13:19:38 +0300 Subject: [PATCH 4/4] Small fixes. --- src/job.rs | 12 ++++++------ src/lib.rs | 12 ++---------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/src/job.rs b/src/job.rs index f0acd9f..a43d24c 100644 --- a/src/job.rs +++ b/src/job.rs @@ -264,8 +264,7 @@ impl JobQueue { /// Any `Job` pushed onto the queue should alive at least until it gets /// popped. pub unsafe fn push_back(&mut self, job: &Job) { - let next_tail = unsafe { &*(job as *const Job as *const Job) }; - self.0.push_back(NonNull::from(next_tail).cast()); + self.0.push_back(NonNull::from(job).cast()); } pub fn pop_back(&mut self) { @@ -273,11 +272,12 @@ impl JobQueue { } pub fn pop_front(&mut self) -> Option { - let val = self.0.pop_front()?; // SAFETY: - // `Job` is still alive as per contract in `push_back` - let job = unsafe { val.as_ref() }; - job.fut.set(Some(Box::leak(Box::new(Future::default())).into())); + // `Job` is still alive as per contract in `push_back`. + let job = unsafe { self.0.pop_front()?.as_ref() }; + job.fut + .set(Some(Box::leak(Box::new(Future::default())).into())); + Some(job.clone()) } } diff --git a/src/lib.rs b/src/lib.rs index a576c38..823a9ee 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -330,10 +330,7 @@ impl<'s> Scope<'s> { let time = lock.time; if let Entry::Vacant(e) = lock.shared_jobs.entry(self.heartbeat_id()) { - // SAFETY: - // Any `Job` previously pushed onto the queue will be waited upon - // and will be alive until that point. - if let Some(job) = unsafe { self.job_queue.pop_front() } { + if let Some(job) = self.job_queue.pop_front() { e.insert((time, job)); lock.time += 1; @@ -384,12 +381,7 @@ impl<'s> Scope<'s> { let rb = b(self); if job.is_waiting() { - // SAFETY: - // `job` is alive until the end of this scope and there has been no - // other pop up to this point. - unsafe { - self.job_queue.pop_back(); - } + self.job_queue.pop_back(); // SAFETY: // Since the `job` was popped from the back of the queue, it cannot