Skip to content

Commit

Permalink
implement some extra abstraction dark magic
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Apr 25, 2024
1 parent 56c3963 commit c39d2ce
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 51 deletions.
6 changes: 3 additions & 3 deletions crates/re_query_cache2/src/latest_at/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ pub struct CachedLatestAtComponentResults {
pub(crate) promise: Option<Promise>,

/// The resolved, converted, deserialized dense data.
pub(crate) cached_dense: OnceLock<Box<dyn ErasedFlatVecDeque + Send + Sync>>,
pub(crate) cached_dense: OnceLock<Arc<dyn ErasedFlatVecDeque + Send + Sync>>,
}

impl CachedLatestAtComponentResults {
Expand Down Expand Up @@ -274,9 +274,9 @@ impl CachedLatestAtComponentResults {
.map_err(|err| DeserializationError::DataCellError(err.to_string()))?;

#[allow(clippy::borrowed_box)]
let cached: &Box<dyn ErasedFlatVecDeque + Send + Sync> = self
let cached: &Arc<dyn ErasedFlatVecDeque + Send + Sync> = self
.cached_dense
.get_or_init(move || Box::new(FlatVecDeque::from(data)));
.get_or_init(move || Arc::new(FlatVecDeque::from(data)));

downcast(&**cached)
}
Expand Down
103 changes: 87 additions & 16 deletions crates/re_query_cache2/src/range/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use re_data_store::RangeQuery;
use re_log_types::{RowId, TimeInt, TimeRange};
use re_types_core::{Component, ComponentName, DeserializationError, SizeBytes};

use crate::{ErasedFlatVecDeque, FlatVecDeque, Promise, PromiseResolver, PromiseResult};
use crate::{
CachedLatestAtComponentResults, ErasedFlatVecDeque, FlatVecDeque, Promise, PromiseResolver,
PromiseResult,
};

// ---

Expand Down Expand Up @@ -98,6 +101,14 @@ impl CachedRangeResults {

// ---

thread_local! {
/// Keeps track of reentrancy counts for the current thread.
///
/// Used to detect and prevent potential deadlocks when using the cached APIs in work-stealing
/// environments such as Rayon.
static REENTERING: RefCell<u32> = const { RefCell::new(0) };
}

/// Lazily cached results for a particular component when using a cached range query.
#[derive(Debug)]
pub struct CachedRangeComponentResults {
Expand Down Expand Up @@ -156,16 +167,57 @@ impl std::ops::Deref for CachedRangeComponentResults {
}
}

/// Helper datastructure to make it possible to convert latest-at results into ranged results.
#[derive(Debug)]
enum Indices<'a> {
Owned(VecDeque<(TimeInt, RowId)>),
Cached(MappedRwLockReadGuard<'a, VecDeque<(TimeInt, RowId)>>),
}

impl<'a> std::ops::Deref for Indices<'a> {
type Target = VecDeque<(TimeInt, RowId)>;

#[inline]
fn deref(&self) -> &Self::Target {
match self {
Indices::Owned(data) => data,
Indices::Cached(data) => data,
}
}
}

/// Helper datastructure to make it possible to convert latest-at results into ranged results.
enum Data<'a, T> {
Owned(Arc<dyn ErasedFlatVecDeque + Send + Sync>),
Cached(MappedRwLockReadGuard<'a, FlatVecDeque<T>>),
}

impl<'a, T: 'static> std::ops::Deref for Data<'a, T> {
type Target = FlatVecDeque<T>;

#[inline]
fn deref(&self) -> &Self::Target {
match self {
Data::Owned(data) => {
// Unwrap: only way to instantiate a `Data` is via the `From` impl below which we
// fully control.
data.as_any().downcast_ref().unwrap()
}
Data::Cached(data) => data,
}
}
}

pub struct CachedRangeData<'a, T> {
// NOTE: Options so we can represent an empty result without having to somehow conjure a mutex
// guard out of thin air.
//
// TODO(Amanieu/parking_lot#289): we need two distinct mapped guards because it's
// impossible to return an owned type in a `parking_lot` guard.
// See <https://github.com/Amanieu/parking_lot/issues/289#issuecomment-1827545967>.
indices: Option<MappedRwLockReadGuard<'a, VecDeque<(TimeInt, RowId)>>>,
data: Option<MappedRwLockReadGuard<'a, FlatVecDeque<T>>>,
// indices: Option<MappedRwLockReadGuard<'a, VecDeque<(TimeInt, RowId)>>>,
indices: Option<Indices<'a>>,
data: Option<Data<'a, T>>,

time_range: TimeRange,
front_status: PromiseResult<()>,
Expand All @@ -178,6 +230,32 @@ pub struct CachedRangeData<'a, T> {
reentering: &'static std::thread::LocalKey<RefCell<u32>>,
}

impl<'a, C: Component> CachedRangeData<'a, C> {
/// Useful to abstract over latest-at and ranged results.
#[inline]
pub fn from_latest_at(
resolver: &PromiseResolver,
results: &'a CachedLatestAtComponentResults,
) -> Self {
let CachedLatestAtComponentResults {
index,
promise: _,
cached_dense,
} = results;

let status = results.to_dense::<C>(resolver).map(|_| ());

Self {
indices: Some(Indices::Owned(vec![*index].into())),
data: cached_dense.get().map(|data| Data::Owned(Arc::clone(data))),
time_range: TimeRange::new(index.0, index.0),
front_status: status.clone(),
back_status: status,
reentering: &REENTERING,
}
}
}

impl<'a, T> Drop for CachedRangeData<'a, T> {
#[inline]
fn drop(&mut self) {
Expand All @@ -186,7 +264,7 @@ impl<'a, T> Drop for CachedRangeData<'a, T> {
}
}

impl<'a, T> CachedRangeData<'a, T> {
impl<'a, T: 'static> CachedRangeData<'a, T> {
/// Returns the current status on both ends of the range.
///
/// E.g. it is possible that the front-side of the range is still waiting for pending data while
Expand All @@ -201,10 +279,11 @@ impl<'a, T> CachedRangeData<'a, T> {
&self,
entry_range: Range<usize>,
) -> impl Iterator<Item = &(TimeInt, RowId)> {
match self.indices.as_ref() {
let indices = match self.indices.as_ref() {
Some(indices) => itertools::Either::Left(indices.range(entry_range)),
None => itertools::Either::Right(std::iter::empty()),
}
};
indices
}

#[inline]
Expand Down Expand Up @@ -278,14 +357,6 @@ impl CachedRangeComponentResults {

// --- Step 1: try and upsert pending data (write lock) ---

thread_local! {
/// Keeps track of reentrancy counts for the current thread.
///
/// Used to detect and prevent potential deadlocks when using the cached APIs in work-stealing
/// environments such as Rayon.
static REENTERING: RefCell<u32> = const { RefCell::new(0) };
}

REENTERING.with_borrow_mut(|reentering| *reentering = reentering.saturating_add(1));

// Manufactured empty result.
Expand Down Expand Up @@ -514,8 +585,8 @@ impl CachedRangeComponentResults {
});

CachedRangeData {
indices: Some(indices),
data: Some(data),
indices: Some(Indices::Cached(indices)),
data: Some(Data::Cached(data)),
time_range: self.time_range,
front_status,
back_status,
Expand Down
103 changes: 71 additions & 32 deletions crates/re_space_view_spatial/src/visualizers/results_ext.rs
Original file line number Diff line number Diff line change
@@ -1,74 +1,112 @@
use re_query_cache2::{
CachedLatestAtResults, CachedRangeData, CachedRangeResults, PromiseResolver, PromiseResult,
CachedLatestAtResults, CachedRangeData, CachedRangeResults, CachedResults, PromiseResolver,
PromiseResult,
};
use re_types::Component;

// ---

pub trait CachedLatestAtResultsExt {
/// Extension traits to abstract query result handling for all spatial space views.
///
/// Also turns all results into range results, so that views only have to worry about the ranged
/// case.
pub trait CachedRangeResultsExt {
fn get_dense<'a, C: Component>(
&'a self,
resolver: &PromiseResolver,
) -> Option<re_query_cache2::Result<&'a [C]>>;
) -> Option<re_query_cache2::Result<CachedRangeData<'a, C>>>;

fn get_or_empty_dense<'a, C: Component>(
&'a self,
resolver: &PromiseResolver,
) -> re_query_cache2::Result<&'a [C]>;
) -> re_query_cache2::Result<CachedRangeData<'a, C>>;
}

impl CachedLatestAtResultsExt for CachedLatestAtResults {
#[inline]
impl CachedRangeResultsExt for CachedResults {
fn get_dense<'a, C: Component>(
&'a self,
resolver: &PromiseResolver,
) -> Option<re_query_cache2::Result<&'a [C]>> {
let results = self.get(C::name())?;
// TODO(#5607): what should happen if the promise is still pending?
Some(match results.to_dense(resolver).flatten() {
PromiseResult::Pending => Ok(&[]),
PromiseResult::Error(err) => Err(re_query_cache2::QueryError::Other(err.into())),
PromiseResult::Ready(data) => Ok(data),
})
) -> Option<re_query_cache2::Result<CachedRangeData<'a, C>>> {
match self {
CachedResults::LatestAt(_, results) => results.get_dense(resolver),
CachedResults::Range(_, results) => results.get_dense(resolver),
}
}

#[inline]
fn get_or_empty_dense<'a, C: Component>(
&'a self,
resolver: &PromiseResolver,
) -> re_query_cache2::Result<&'a [C]> {
let results = self.get_or_empty(C::name());
// TODO(#5607): what should happen if the promise is still pending?
match results.to_dense(resolver).flatten() {
PromiseResult::Pending => Ok(&[]),
PromiseResult::Error(err) => Err(re_query_cache2::QueryError::Other(err.into())),
PromiseResult::Ready(data) => Ok(data),
) -> re_query_cache2::Result<CachedRangeData<'a, C>> {
match self {
CachedResults::LatestAt(_, results) => results.get_or_empty_dense(resolver),
CachedResults::Range(_, results) => results.get_or_empty_dense(resolver),
}
}
}

pub trait CachedRangeResultsExt {
impl CachedRangeResultsExt for CachedRangeResults {
#[inline]
fn get_dense<'a, C: Component>(
&'a self,
resolver: &PromiseResolver,
) -> Option<re_query_cache2::Result<CachedRangeData<'a, C>>>;
) -> Option<re_query_cache2::Result<CachedRangeData<'a, C>>> {
let results = self.get(C::name())?.to_dense(resolver);

// TODO(#5607): what should happen if the promise is still pending?
let (front_status, back_status) = results.status();
match front_status {
PromiseResult::Error(err) => {
return Some(Err(re_query_cache2::QueryError::Other(err.into())))
}
PromiseResult::Pending | PromiseResult::Ready(_) => {}
}
match back_status {
PromiseResult::Error(err) => {
return Some(Err(re_query_cache2::QueryError::Other(err.into())))
}
PromiseResult::Pending | PromiseResult::Ready(_) => {}
}

Some(Ok(results))
}

#[inline]
fn get_or_empty_dense<'a, C: Component>(
&'a self,
resolver: &PromiseResolver,
) -> re_query_cache2::Result<CachedRangeData<'a, C>>;
) -> re_query_cache2::Result<CachedRangeData<'a, C>> {
let results = self.get_or_empty(C::name()).to_dense(resolver);

// TODO(#5607): what should happen if the promise is still pending?
let (front_status, back_status) = results.status();
match front_status {
PromiseResult::Error(err) => {
return Err(re_query_cache2::QueryError::Other(err.into()))
}
PromiseResult::Pending | PromiseResult::Ready(_) => {}
}
match back_status {
PromiseResult::Error(err) => {
return Err(re_query_cache2::QueryError::Other(err.into()))
}
PromiseResult::Pending | PromiseResult::Ready(_) => {}
}

Ok(results)
}
}

impl CachedRangeResultsExt for CachedRangeResults {
impl CachedRangeResultsExt for CachedLatestAtResults {
#[inline]
fn get_dense<'a, C: Component>(
&'a self,
resolver: &PromiseResolver,
) -> Option<re_query_cache2::Result<CachedRangeData<'a, C>>> {
let results = self.get(C::name())?.to_dense(resolver);
let results = self.get(C::name())?;
let data = CachedRangeData::from_latest_at(resolver, results);

// TODO(#5607): what should happen if the promise is still pending?
let (front_status, back_status) = results.status();
let (front_status, back_status) = data.status();
match front_status {
PromiseResult::Error(err) => {
return Some(Err(re_query_cache2::QueryError::Other(err.into())))
Expand All @@ -82,18 +120,19 @@ impl CachedRangeResultsExt for CachedRangeResults {
PromiseResult::Pending | PromiseResult::Ready(_) => {}
}

Some(Ok(results))
Some(Ok(data))
}

#[inline]
fn get_or_empty_dense<'a, C: Component>(
&'a self,
resolver: &PromiseResolver,
) -> re_query_cache2::Result<CachedRangeData<'a, C>> {
let results = self.get_or_empty(C::name()).to_dense(resolver);
let results = self.get_or_empty(C::name());
let data = CachedRangeData::from_latest_at(resolver, results);

// TODO(#5607): what should happen if the promise is still pending?
let (front_status, back_status) = results.status();
let (front_status, back_status) = data.status();
match front_status {
PromiseResult::Error(err) => {
return Err(re_query_cache2::QueryError::Other(err.into()))
Expand All @@ -107,6 +146,6 @@ impl CachedRangeResultsExt for CachedRangeResults {
PromiseResult::Pending | PromiseResult::Ready(_) => {}
}

Ok(results)
Ok(data)
}
}

0 comments on commit c39d2ce

Please sign in to comment.