From 1f79a61d501ffa8d0e5bb326f89f942c774de5a4 Mon Sep 17 00:00:00 2001
From: Clement Rey
Date: Wed, 3 Apr 2024 16:31:45 +0200
Subject: [PATCH] better faster stronger reentrancy support

---
 crates/re_query_cache2/src/range/query.rs   |   1 -
 crates/re_query_cache2/src/range/results.rs | 155 ++++++++++++--------
 2 files changed, 91 insertions(+), 65 deletions(-)

diff --git a/crates/re_query_cache2/src/range/query.rs b/crates/re_query_cache2/src/range/query.rs
index 78e30cc9ad18c..206759cff861e 100644
--- a/crates/re_query_cache2/src/range/query.rs
+++ b/crates/re_query_cache2/src/range/query.rs
@@ -163,7 +163,6 @@ impl RangeCache {
             pending_invalidation: _,
         } = self;
 
-        // A plain old `write()` (as opposed to a `try_write()`) here _should_ be fine.
         let mut per_data_time = per_data_time.write();
 
         if let Some(query_front) = per_data_time.compute_front_query(query) {
diff --git a/crates/re_query_cache2/src/range/results.rs b/crates/re_query_cache2/src/range/results.rs
index 4ecf08b1b5673..09bb4a9a83772 100644
--- a/crates/re_query_cache2/src/range/results.rs
+++ b/crates/re_query_cache2/src/range/results.rs
@@ -1,4 +1,5 @@
 use std::{
+    cell::RefCell,
     collections::VecDeque,
     ops::Range,
     sync::{Arc, OnceLock},
@@ -146,6 +147,20 @@ pub struct CachedRangeData<'a, T> {
 
     front_status: (TimeInt, PromiseResult<()>),
     back_status: (TimeInt, PromiseResult<()>),
+
+    /// Keeps track of reentrancy counts for the current thread.
+    ///
+    /// Used to detect and prevent potential deadlocks when using the cached APIs in work-stealing
+    /// environments such as Rayon.
+    reentering: &'static std::thread::LocalKey<RefCell<u32>>,
+}
+
+impl<'a, T> Drop for CachedRangeData<'a, T> {
+    #[inline]
+    fn drop(&mut self) {
+        self.reentering
+            .with_borrow_mut(|reentering| *reentering = reentering.saturating_sub(1));
+    }
 }
 
 impl<'a, T> CachedRangeData<'a, T> {
@@ -244,44 +259,49 @@ impl CachedRangeComponentResults {
     pub fn to_dense<C: Component>(&self, resolver: &PromiseResolver) -> CachedRangeData<'_, C> {
         // --- Step 1: try and upsert pending data (write lock) ---
 
-        // # Multithreading semantics
-        //
-        // There is only one situation where this `try_write()` might fail: there is another task that
-        // is already in the process of upserting that specific cache (e.g. a cloned space view).
-        //
-        // That task might be on the same thread (due to work-stealing), or a different one.
-        // Either way, we need to give up trying to upsert the cache in order to prevent a
-        // deadlock in case the other task is in fact running on the same thread.
-        //
-        // It's fine, though:
-        // - Best case scenario, the data we need is already cached.
-        // - Worst case scenario, the data will be missing for this current frame but is guaranteed
-        //   to be there for the next.
-        //
-        // Data invalidation happens at the per-archetype cache layer, so this won't return
-        // out-of-date data in either scenario.
-        //
-        // There is a lot of complexity we could add to make this whole process more efficient:
-        // keep track of failed queries in a queue so we don't rely on probabilities, keep track
-        // of the thread-local reentrancy state to skip this logic when it's not needed, return raw
-        // data when the lock is busy and the data isn't already cached, etc.
-        //
-        // In the end, this is a edge-case inherent to our current "immediate query" model that we
-        // already know we want -- and have to -- move away from: the extra complexity isn't worth it.
-        let mut results = self.0.try_write();
+        thread_local! {
+            /// Keeps track of reentrancy counts for the current thread.
+            ///
+            /// Used to detect and prevent potential deadlocks when using the cached APIs in work-stealing
+            /// environments such as Rayon.
+            static REENTERING: RefCell<u32> = const { RefCell::new(0) };
+        }
+
+        REENTERING.with_borrow_mut(|reentering| *reentering = reentering.saturating_add(1));
+
+        let mut results = if let Some(results) = self.0.try_write() {
+            // The lock was free to grab, nothing else to worry about.
+            Some(results)
+        } else {
+            REENTERING.with_borrow_mut(|reentering| {
+                if *reentering > 1 {
+                    // The lock is busy, and at least one of the lock holders is the current thread from a
+                    // previous stack frame.
+                    //
+                    // Return `None` so that we skip straight to the read-only part of the operation.
+                    // All the data will be there already, since the previous stack frame already
+                    // took care of upserting it.
+                    None
+                } else {
+                    // The lock is busy, but it is not held by the current thread.
+                    // Just block until it gets released.
+                    Some(self.0.write())
+                }
+            })
+        };
 
         if let Some(results) = &mut results {
+            // NOTE: This is just a lazy initialization of the underlying deque, because we
+            // just now finally know the expected type!
+            if results.cached_dense.is_none() {
+                results.cached_dense = Some(Box::new(FlatVecDeque::<C>::new()));
+            }
+
             if results.cached_sparse.is_some() {
                 re_log::error!(
                     "a component cannot be both dense and sparse -- try `to_sparse()` instead"
                 );
             } else {
-                // NOTE: This is just a lazy initialization of the underlying deque, because we
-                // just now finally know the expected type!
-                if results.cached_dense.is_none() {
-                    results.cached_dense = Some(Box::new(FlatVecDeque::<C>::new()));
-                }
-
                 if !results.promises_front.is_empty() {
                     let mut resolved_indices = Vec::with_capacity(results.promises_front.len());
                     let mut resolved_data = Vec::with_capacity(results.promises_front.len());
@@ -411,7 +431,7 @@ impl CachedRangeComponentResults {
             // work-stealing thread-pool and might swap a task on one thread with another task on the
             // same thread, where both tasks happen to query the same exact data (e.g. cloned space views).
             //
-            // See comment above for more details.
+            // See `REENTERING` comments above for more details.
             self.read_recursive()
         };
@@ -439,6 +459,7 @@ impl CachedRangeComponentResults {
             data,
             front_status,
             back_status,
+            reentering: &REENTERING,
         }
@@ -458,44 +479,49 @@ impl CachedRangeComponentResults {
     ) -> CachedRangeData<'_, Option<C>> {
         // --- Step 1: try and upsert pending data (write lock) ---
 
-        // # Multithreading semantics
-        //
-        // There is only one situation where this `try_write()` might fail: there is another task that
-        // is already in the process of upserting that specific cache (e.g. a cloned space view).
-        //
-        // That task might be on the same thread (due to work-stealing), or a different one.
-        // Either way, we need to give up trying to upsert the cache in order to prevent a
-        // deadlock in case the other task is in fact running on the same thread.
-        //
-        // It's fine, though:
-        // - Best case scenario, the data we need is already cached.
-        // - Worst case scenario, the data will be missing for this current frame but is guaranteed
-        //   to be there for the next.
-        //
-        // Data invalidation happens at the per-archetype cache layer, so this won't return
-        // out-of-date data in either scenario.
-        //
-        // There is a lot of complexity we could add to make this whole process more efficient:
-        // keep track of failed queries in a queue so we don't rely on probabilities, keep track
-        // of the thread-local reentrancy state to skip this logic when it's not needed, return raw
-        // data when the lock is busy and the data isn't already cached, etc.
-        //
-        // In the end, this is a edge-case inherent to our current "immediate query" model that we
-        // already know we want -- and have to -- move away from: the extra complexity isn't worth it.
-        let mut results = self.0.try_write();
+        thread_local! {
+            /// Keeps track of reentrancy counts for the current thread.
+            ///
+            /// Used to detect and prevent potential deadlocks when using the cached APIs in work-stealing
+            /// environments such as Rayon.
+            static REENTERING: RefCell<u32> = const { RefCell::new(0) };
+        }
+
+        REENTERING.with_borrow_mut(|reentering| *reentering = reentering.saturating_add(1));
+
+        let mut results = if let Some(results) = self.0.try_write() {
+            // The lock was free to grab, nothing else to worry about.
+            Some(results)
+        } else {
+            REENTERING.with_borrow_mut(|reentering| {
+                if *reentering > 1 {
+                    // The lock is busy, and at least one of the lock holders is the current thread from a
+                    // previous stack frame.
+                    //
+                    // Return `None` so that we skip straight to the read-only part of the operation.
+                    // All the data will be there already, since the previous stack frame already
+                    // took care of upserting it.
+                    None
+                } else {
+                    // The lock is busy, but it is not held by the current thread.
+                    // Just block until it gets released.
+                    Some(self.0.write())
+                }
+            })
+        };
 
         if let Some(results) = &mut results {
+            // NOTE: This is just a lazy initialization of the underlying deque, because we
+            // just now finally know the expected type!
+            if results.cached_sparse.is_none() {
+                results.cached_sparse = Some(Box::new(FlatVecDeque::<Option<C>>::new()));
+            }
+
             if results.cached_dense.is_some() {
                 re_log::error!(
                     "a component cannot be both dense and sparse -- try `to_dense()` instead"
                 );
             } else {
-                // NOTE: This is just a lazy initialization of the underlying deque, because we
-                // just now finally know the expected type!
-                if results.cached_sparse.is_none() {
-                    results.cached_sparse = Some(Box::new(FlatVecDeque::<Option<C>>::new()));
-                }
-
                 if !results.promises_front.is_empty() {
                     let mut resolved_indices = Vec::with_capacity(results.promises_front.len());
                     let mut resolved_data = Vec::with_capacity(results.promises_front.len());
@@ -625,7 +651,7 @@ impl CachedRangeComponentResults {
             // work-stealing thread-pool and might swap a task on one thread with another task on the
             // same thread, where both tasks happen to query the same exact data (e.g. cloned space views).
             //
-            // See comment above for more details.
+            // See `REENTERING` comments above for more details.
             self.read_recursive()
         };
@@ -653,6 +679,7 @@ impl CachedRangeComponentResults {
            data,
            front_status,
            back_status,
+           reentering: &REENTERING,
        }
    }
 }
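
For reference, here is a minimal, self-contained sketch of the reentrancy-guard pattern this patch introduces. It is illustrative only: it uses `std::sync::RwLock` where the actual code uses a `parking_lot` lock inside `CachedRangeComponentResults`, and the `Cache` type, its `fetch` method, and the `Vec<u64>` payload are hypothetical stand-ins for the cached query results.

use std::cell::RefCell;
use std::sync::RwLock;

thread_local! {
    /// Keeps track of reentrancy counts for the current thread.
    static REENTERING: RefCell<u32> = const { RefCell::new(0) };
}

/// Decrements the reentrancy count on every exit path, mirroring the
/// `Drop for CachedRangeData` impl added by the patch.
struct ReentrancyGuard;

impl Drop for ReentrancyGuard {
    fn drop(&mut self) {
        REENTERING.with_borrow_mut(|n| *n = n.saturating_sub(1));
    }
}

struct Cache(RwLock<Vec<u64>>);

impl Cache {
    /// Returns `true` if this call performed the upsert itself.
    fn fetch(&self, value: u64) -> bool {
        REENTERING.with_borrow_mut(|n| *n = n.saturating_add(1));
        let _guard = ReentrancyGuard;

        if let Ok(mut data) = self.0.try_write() {
            // The lock was free to grab: upsert as usual.
            data.push(value);
            true
        } else if REENTERING.with_borrow(|n| *n > 1) {
            // The lock is busy, and at least one of the holders is an earlier
            // stack frame of *this* thread (e.g. a work-stealing runtime
            // swapped tasks mid-query): skip the upsert instead of
            // deadlocking. The outer frame already took care of it.
            false
        } else {
            // The lock is busy, but held by another thread: safe to block.
            self.0.write().unwrap().push(value);
            true
        }
    }
}

fn main() {
    let cache = Cache(RwLock::new(Vec::new()));
    assert!(cache.fetch(42)); // uncontended: takes the `try_write` fast path
}

The counter is only ever compared against 1: all the pattern needs to know is whether an earlier stack frame of the same thread is already past the `try_write`, in which case blocking on `write()` would self-deadlock. The `Drop` guard guarantees the decrement on every exit path, which is why the patch hangs the counter off `CachedRangeData` rather than decrementing it inline.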