From f2a3b9547d431bfe9082667066b3e78434b91a82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Drago=C8=99=20Tiselice?= Date: Fri, 20 Sep 2024 17:58:30 +0300 Subject: [PATCH] Made scopes concurrent. --- Cargo.toml | 2 +- benches/overhead.rs | 4 +- src/job.rs | 36 ++++++++++ src/lib.rs | 163 ++++++++++++++++++++++++++++++++------------ 4 files changed, 158 insertions(+), 47 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 700a9b0..1ad877c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,4 +21,4 @@ rayon = { version = "1.10.0", optional = true } [[bench]] name = "overhead" harness = false -required-features = ["bench"] +required-features = ["bench"] \ No newline at end of file diff --git a/benches/overhead.rs b/benches/overhead.rs index c707e92..7b62830 100644 --- a/benches/overhead.rs +++ b/benches/overhead.rs @@ -46,7 +46,7 @@ fn no_overhead(bencher: Bencher, nodes: (usize, usize)) { } let tree = Node::tree(nodes.0); - let mut thread_pool = ThreadPool::new().unwrap(); + let thread_pool = ThreadPool::new().unwrap(); let mut scope = thread_pool.scope(); bencher.bench_local(move || { @@ -66,7 +66,7 @@ fn chili_overhead(bencher: Bencher, nodes: (usize, usize)) { } let tree = Node::tree(nodes.0); - let mut thread_pool = ThreadPool::new().unwrap(); + let thread_pool = ThreadPool::new().unwrap(); let mut scope = thread_pool.scope(); bencher.bench_local(move || { diff --git a/src/job.rs b/src/job.rs index 38726b8..292fa2c 100644 --- a/src/job.rs +++ b/src/job.rs @@ -41,6 +41,7 @@ impl Future { match result { Ok(_) => { + // SAFETY: // Lock is acquired, only we are accessing `self.waiting_thread`. unsafe { *self.waiting_thread.get() = Some(thread::current()) }; @@ -52,6 +53,7 @@ impl Future { continue; } Err(state) if state == Poll::Ready as u8 => { + // SAFETY: // `state` is `Poll::Ready` only after `Self::complete` // releases the lock. // @@ -83,11 +85,13 @@ impl Future { } } + // SAFETY: // Lock is acquired, only we are accessing `self.val`. unsafe { *self.val.get() = Some(val); } + // SAFETY: // Lock is acquired, only we are accessing `self.waiting_thread`. if let Some(thread) = unsafe { (*self.waiting_thread.get()).take() } { thread.unpark(); @@ -110,8 +114,10 @@ impl JobStack { } } + /// SAFETY: /// It should only be called once. pub unsafe fn take_once(&self) -> F { + // SAFETY: // No `Job` has has been executed, therefore `self.f` has not yet been // `take`n. unsafe { ManuallyDrop::take(&mut *self.f.get()) } @@ -136,6 +142,7 @@ impl Job { F: FnOnce(&mut Scope<'_>) -> T + Send, T: Send, { + /// SAFETY: /// It should only be called while the `stack` is still alive. unsafe fn harness( scope: &mut Scope<'_>, @@ -145,12 +152,15 @@ impl Job { F: FnOnce(&mut Scope<'_>) -> T + Send, T: Send, { + // SAFETY: // The `stack` is still alive. let stack: &JobStack = unsafe { stack.cast().as_ref() }; + // SAFETY: // This is the first call to `take_once` since `Job::execute` // (the only place where this harness is called) is called only // after the job has been popped. let f = unsafe { stack.take_once() }; + // SAFETY: // Before being popped, the `JobQueue` allocates and stores a // `Future` in `self.fur_or_next` that should get passed here. let fut: &Future = unsafe { fut.cast().as_ref() }; @@ -174,11 +184,13 @@ impl Job { self.stack == other.stack } + /// SAFETY: /// It should only be called after being popped from a `JobQueue`. pub unsafe fn poll(&self) -> bool { self.fut_or_next .get() .map(|fut| { + // SAFETY: // Before being popped, the `JobQueue` allocates and stores a // `Future` in `self.fur_or_next` that should get passed here. let fut = unsafe { fut.as_ref() }; @@ -187,12 +199,15 @@ impl Job { .unwrap_or_default() } + /// SAFETY: /// It should only be called after being popped from a `JobQueue`. pub unsafe fn wait(&self) -> Option> { self.fut_or_next.get().and_then(|fut| { + // SAFETY: // Before being popped, the `JobQueue` allocates and stores a // `Future` in `self.fur_or_next` that should get passed here. let result = unsafe { fut.as_ref().wait() }; + // SAFETY: // We only can drop the `Box` *after* waiting on the `Future` // in order to ensure unique access. unsafe { @@ -203,10 +218,12 @@ impl Job { }) } + /// SAFETY: /// It should only be called in the case where the job has been popped /// from the front and will not be `Job::Wait`ed. pub unsafe fn drop(&self) { if let Some(fut) = self.fut_or_next.get() { + // SAFETY: // Before being popped, the `JobQueue` allocates and store a // `Future` in `self.fur_or_next` that should get passed here. unsafe { @@ -217,9 +234,11 @@ impl Job { } impl Job { + /// SAFETY: /// It should only be called while the `JobStack` it was created with is /// still alive and after being popped from a `JobQueue`. pub unsafe fn execute(&self, scope: &mut Scope<'_>) { + // SAFETY: // Before being popped, the `JobQueue` allocates and store a // `Future` in `self.fur_or_next` that should get passed here. unsafe { @@ -228,6 +247,10 @@ impl Job { } } +// SAFETY: +// The job's `stack` will only be accessed after acquiring a lock (in +// `Future`), while `prev` and `fut_or_next` are never accessed after being +// sent across threads. unsafe impl Send for Job {} #[derive(Debug)] @@ -257,6 +280,7 @@ impl Default for JobQueue { impl Drop for JobQueue { fn drop(&mut self) { + // SAFETY: // `self.sentinel` never gets written over, so it contains the original // `leak`ed `Box` that gets allocated in `JobQueue::default`. unsafe { @@ -270,13 +294,16 @@ impl JobQueue { self.len } + /// SAFETY: /// Any `Job` pushed onto the queue should alive at least until it gets /// popped. pub unsafe fn push_back(&mut self, job: &Job) { + // SAFETY: // The tail can either be the root `Box::leak`ed in the default // constructor or a `Job` that has been pushed previously and which is // still alive. let current_tail = unsafe { self.tail.as_ref() }; + // SAFETY: // This effectively casts the `Job`'s `fut_or_next` from `Future` to // `Future<()>` which casts the `Future`'s `Box` to a `Box<()>`. // @@ -294,13 +321,16 @@ impl JobQueue { self.tail = next_tail.into(); } + /// SAFETY: /// The last `Job` in the queue must still be alive. pub unsafe fn pop_back(&mut self) { + // SAFETY: // The tail can either be the root `Box::leak`ed in the default // constructor or a `Job` that has been pushed previously and which is // still alive. let current_tail = unsafe { self.tail.as_ref() }; if let Some(prev_tail) = current_tail.prev.get() { + // SAFETY: // `Job`'s `prev` pointer can only be set by `JobQueue::push_back` // to the previous tail which should still be alive or by // `JobQueue::pop_front` when it's set to `self.sentinel` which is @@ -316,12 +346,15 @@ impl JobQueue { } } + /// SAFETY: /// The first `Job` in the queue must still be alive. pub unsafe fn pop_front(&mut self) -> Option { + // SAFETY: // `self.sentinel` is alive for the entirety of `self`. let sentinel = unsafe { self.sentinel.as_ref() }; sentinel.fut_or_next.get().map(|next| { + // SAFETY: // `self.sentinel`'s `fut_or_next` pointer can only be set by // `JobQueue::push_back` or by `JobQueue::pop_front` when it's set // to a job that was previous set by `JobQueue::push_back` and @@ -331,6 +364,7 @@ impl JobQueue { if let Some(next) = head.fut_or_next.get() { sentinel.fut_or_next.set(Some(next.cast())); + // SAFETY: // `Job`'s `fut_or_next` pointer can only be set by // `JobQueue::push_back` or by `JobQueue::pop_front` when it's set // to a job that was previous set by `JobQueue::push_back` and @@ -345,6 +379,7 @@ impl JobQueue { self.tail = sentinel.into(); } + // SAFETY: // `self.sentinel`'s `fut_or_next` pointer can only be set by // `JobQueue::push_back` or by `JobQueue::pop_front` when it's set // to a job that was previous set by `JobQueue::push_back` and @@ -357,6 +392,7 @@ impl JobQueue { self.len -= 1; + // SAFETY: // `self.sentinel`'s `fut_or_next` pointer can only be set by // `JobQueue::push_back` or by `JobQueue::pop_front` when it's set // to a job that was previous set by `JobQueue::push_back` and diff --git a/src/lib.rs b/src/lib.rs index 069efd0..823ee83 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ #![deny(missing_docs)] #![deny(unsafe_op_in_unsafe_fn)] +#![deny(clippy::undocumented_unsafe_blocks)] //! # chili. Rust port of [Spice], a low-overhead parallelization library //! @@ -50,30 +51,54 @@ //! ``` use std::{ - collections::{btree_map::Entry, BTreeMap}, + cell::Cell, + collections::{btree_map::Entry, BTreeMap, HashMap}, num::NonZero, ops::{Deref, DerefMut}, panic, sync::{ atomic::{AtomicBool, Ordering}, - Arc, Barrier, Condvar, Mutex, + Arc, Barrier, Condvar, Mutex, Weak, }, thread::{self, JoinHandle}, - time::Duration, + time::{Duration, Instant}, }; mod job; use job::{Job, JobQueue, JobStack}; +#[derive(Debug)] +struct Heartbeat { + is_set: Weak, + last_heartbeat: Cell, +} + #[derive(Debug, Default)] struct LockContext { time: u64, is_stopping: bool, shared_jobs: BTreeMap, + heartbeats: HashMap, + heartbeat_index: u64, } impl LockContext { + pub fn new_heartbeat(&mut self) -> Arc { + let is_set = Arc::new(AtomicBool::new(true)); + let heartbeat = Heartbeat { + is_set: Arc::downgrade(&is_set), + last_heartbeat: Cell::new(Instant::now()), + }; + + let i = self.heartbeat_index; + self.heartbeats.insert(i, heartbeat); + + self.heartbeat_index = i.checked_add(1).unwrap(); + + is_set + } + pub fn pop_earliest_shared_job(&mut self) -> Option { self.shared_jobs .pop_first() @@ -85,13 +110,14 @@ impl LockContext { struct Context { lock: Mutex, job_is_ready: Condvar, - heartbeats: Vec, } -fn execute_worker(context: Arc, worker_index: usize, barrier: Arc) -> Option<()> { +fn execute_worker(context: Arc, barrier: Arc) -> Option<()> { let mut first_run = true; let mut job_queue = JobQueue::default(); + let mut scope = Scope::new_from_worker(context.clone(), &mut job_queue); + loop { let job = { let mut lock = context.lock.lock().unwrap(); @@ -99,14 +125,11 @@ fn execute_worker(context: Arc, worker_index: usize, barrier: Arc, worker_index: usize, barrier: Arc, heartbeat_interval: Duration) -> Option<()> { - let interval_between_workers = heartbeat_interval / context.heartbeats.len() as u32; - - let mut i = 0; loop { - if context.lock.lock().ok()?.is_stopping { - break; - } + let interval_between_workers = { + let mut lock = context.lock.lock().ok()?; - context.heartbeats.get(i)?.store(true, Ordering::Relaxed); + if lock.is_stopping { + break; + } - i = (i + 1) % context.heartbeats.len(); + let now = Instant::now(); + lock.heartbeats.retain(|_, h| { + h.is_set + .upgrade() + .inspect(|is_set| { + if now.duration_since(h.last_heartbeat.get()) >= heartbeat_interval { + is_set.store(true, Ordering::Relaxed); + h.last_heartbeat.set(now); + } + }) + .is_some() + }); + + heartbeat_interval / lock.heartbeats.len() as u32 + }; thread::sleep(interval_between_workers); } @@ -188,48 +223,49 @@ impl DerefMut for ThreadJobQueue<'_> { #[derive(Debug)] pub struct Scope<'s> { context: Arc, - worker_index: usize, job_queue: ThreadJobQueue<'s>, + heartbeat: Arc, join_count: u8, } impl<'s> Scope<'s> { - fn new_from_thread_pool(thread_pool: &'s mut ThreadPool) -> Self { - let worker_index = thread_pool.context.heartbeats.len() - 1; - - thread_pool.context.heartbeats[worker_index].store(true, Ordering::Relaxed); + fn new_from_thread_pool(thread_pool: &'s ThreadPool) -> Self { + let heartbeat = thread_pool.context.lock.lock().unwrap().new_heartbeat(); Self { context: thread_pool.context.clone(), - worker_index, job_queue: ThreadJobQueue::Current(JobQueue::default()), + heartbeat, join_count: 0, } } - fn new_from_worker( - context: Arc, - worker_index: usize, - job_queue: &'s mut JobQueue, - ) -> Self { + fn new_from_worker(context: Arc, job_queue: &'s mut JobQueue) -> Self { + let heartbeat = context.lock.lock().unwrap().new_heartbeat(); + Self { context, - worker_index, job_queue: ThreadJobQueue::Worker(job_queue), + heartbeat, join_count: 0, } } + fn heartbeat_id(&self) -> usize { + Arc::as_ptr(&self.heartbeat) as usize + } + fn wait_for_sent_job(&mut self, job: &Job) -> Option> { { let mut lock = self.context.lock.lock().unwrap(); if lock .shared_jobs - .get(&self.worker_index) + .get(&self.heartbeat_id()) .map(|(_, shared_job)| job.eq(shared_job)) .is_some() { - if let Some((_, job)) = lock.shared_jobs.remove(&self.worker_index) { + if let Some((_, job)) = lock.shared_jobs.remove(&self.heartbeat_id()) { + // SAFETY: // Since the `Future` has already been allocated when // popping from the queue, the `Job` needs manual dropping. unsafe { @@ -241,6 +277,7 @@ impl<'s> Scope<'s> { } } + // SAFETY: // For this `Job` to have crossed thread borders, it must have been // popped from the `JobQueue` and shared. while !unsafe { job.poll() } { @@ -250,6 +287,7 @@ impl<'s> Scope<'s> { }; if let Some(job) = job { + // SAFETY: // Any `Job` that was shared between threads is waited upon // before the `JobStack` exits scope. unsafe { @@ -260,6 +298,7 @@ impl<'s> Scope<'s> { } } + // SAFETY: // Any `Job` that was shared between threads is waited upon before the // `JobStack` exits scope. unsafe { job.wait() } @@ -270,7 +309,8 @@ impl<'s> Scope<'s> { let mut lock = self.context.lock.lock().unwrap(); let time = lock.time; - if let Entry::Vacant(e) = lock.shared_jobs.entry(self.worker_index) { + if let Entry::Vacant(e) = lock.shared_jobs.entry(self.heartbeat_id()) { + // SAFETY: // Any `Job` previously pushed onto the queue will be waited upon // and will be alive until that point. if let Some(job) = unsafe { self.job_queue.pop_front() } { @@ -281,7 +321,7 @@ impl<'s> Scope<'s> { } } - self.context.heartbeats[self.worker_index].store(false, Ordering::Relaxed); + self.heartbeat.store(false, Ordering::Relaxed); } fn join_seq(&mut self, a: A, b: B) -> (RA, RB) @@ -305,7 +345,7 @@ impl<'s> Scope<'s> { RB: Send, { let a = move |scope: &mut Scope<'_>| { - if scope.context.heartbeats[scope.worker_index].load(Ordering::Relaxed) { + if scope.heartbeat.load(Ordering::Relaxed) { scope.heartbeat(); } @@ -315,6 +355,7 @@ impl<'s> Scope<'s> { let stack = JobStack::new(a); let job = Job::new(&stack); + // SAFETY: // `job` is alive until the end of this scope. unsafe { self.job_queue.push_back(&job); @@ -323,12 +364,14 @@ impl<'s> Scope<'s> { let rb = b(self); if job.is_in_queue() { + // SAFETY: // `job` is alive until the end of this scope and there has been no // other pop up to this point. unsafe { self.job_queue.pop_back(); } + // SAFETY: // Since the `job` was popped from the back of the queue, it cannot // take the closure out of the `JobStack` anymore. // `JobStack::take_once` is thus called only once. @@ -337,6 +380,7 @@ impl<'s> Scope<'s> { let ra = match self.wait_for_sent_job(&job) { Some(Ok(val)) => val, Some(Err(e)) => panic::resume_unwind(e), + // SAFETY: // Since the `job` didn't have the chance to be actually // sent across threads, it cannot take the closure out of the // `JobStack` anymore. `JobStack::take_once` is thus called @@ -483,15 +527,14 @@ impl ThreadPool { let context = Arc::new(Context { lock: Mutex::new(LockContext::default()), job_is_ready: Condvar::new(), - heartbeats: (0..=thread_count).map(|_| AtomicBool::new(true)).collect(), }); let worker_handles = (0..thread_count) - .map(|i| { + .map(|_| { let context = context.clone(); let barrier = worker_barrier.clone(); thread::spawn(move || { - execute_worker(context, i, barrier); + execute_worker(context, barrier); }) }) .collect(); @@ -523,7 +566,7 @@ impl ThreadPool { /// /// assert_eq!(vals, [1; 2]); /// ``` - pub fn scope(&mut self) -> Scope<'_> { + pub fn scope(&self) -> Scope<'_> { Scope::new_from_thread_pool(self) } } @@ -549,6 +592,8 @@ impl Drop for ThreadPool { #[cfg(test)] mod tests { + use std::sync::atomic::AtomicU8; + use super::*; use thread::ThreadId; @@ -560,7 +605,7 @@ mod tests { #[test] fn join_basic() { - let mut threat_pool = ThreadPool::new().unwrap(); + let threat_pool = ThreadPool::new().unwrap(); let mut scope = threat_pool.scope(); let mut a = 0; @@ -573,7 +618,7 @@ mod tests { #[test] fn join_long() { - let mut threat_pool = ThreadPool::new().unwrap(); + let threat_pool = ThreadPool::new().unwrap(); fn increment(s: &mut Scope, slice: &mut [u32]) { match slice.len() { @@ -596,7 +641,7 @@ mod tests { #[test] fn join_very_long() { - let mut threat_pool = ThreadPool::new().unwrap(); + let threat_pool = ThreadPool::new().unwrap(); fn increment(s: &mut Scope, slice: &mut [u32]) { match slice.len() { @@ -620,7 +665,7 @@ mod tests { #[test] fn join_wait() { - let mut threat_pool = ThreadPool::with_config(Config { + let threat_pool = ThreadPool::with_config(Config { thread_count: Some(2), heartbeat_interval: Duration::from_micros(1), ..Default::default() @@ -655,7 +700,7 @@ mod tests { #[test] #[should_panic(expected = "panicked across threads")] fn join_panic() { - let mut threat_pool = ThreadPool::with_config(Config { + let threat_pool = ThreadPool::with_config(Config { thread_count: Some(2), heartbeat_interval: Duration::from_micros(1), }) @@ -710,4 +755,34 @@ mod tests { panic!("panicked across threads"); } } + + #[test] + fn concurrent_scopes() { + const NUM_THREADS: u8 = 128; + let threat_pool = ThreadPool::with_config(Config { + thread_count: Some(4), + ..Default::default() + }) + .unwrap(); + + let a = AtomicU8::new(0); + let b = AtomicU8::new(0); + + thread::scope(|s| { + for _ in 0..NUM_THREADS { + s.spawn(|| { + let mut scope = threat_pool.scope(); + scope.join( + |_| a.fetch_add(1, Ordering::Relaxed), + |_| b.fetch_add(1, Ordering::Relaxed), + ); + }); + } + }); + + assert_eq!(a.load(Ordering::Relaxed), NUM_THREADS); + assert_eq!(b.load(Ordering::Relaxed), NUM_THREADS); + + assert_eq!(threat_pool.context.lock.lock().unwrap().heartbeats.len(), 4); + } }