diff --git a/Cargo.lock b/Cargo.lock index 3175708dc900..da19fba00f29 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4547,6 +4547,7 @@ dependencies = [ "backtrace", "criterion", "indent", + "indexmap 2.1.0", "itertools 0.12.0", "mimalloc", "nohash-hasher", diff --git a/crates/re_data_store/src/store_read.rs b/crates/re_data_store/src/store_read.rs index 6700163ff668..ad83948579e4 100644 --- a/crates/re_data_store/src/store_read.rs +++ b/crates/re_data_store/src/store_read.rs @@ -87,6 +87,24 @@ impl RangeQuery { pub const fn new(timeline: Timeline, range: TimeRange) -> Self { Self { timeline, range } } + + #[inline] + pub const fn everything(timeline: Timeline) -> Self { + Self { + timeline, + range: TimeRange::EVERYTHING, + } + } + + #[inline] + pub fn timeline(&self) -> Timeline { + self.timeline + } + + #[inline] + pub fn range(&self) -> TimeRange { + self.range + } } // --- Data store --- diff --git a/crates/re_query2/examples/range.rs b/crates/re_query2/examples/range.rs index 5bb5d62503ad..e8c1a5227dc8 100644 --- a/crates/re_query2/examples/range.rs +++ b/crates/re_query2/examples/range.rs @@ -39,11 +39,11 @@ fn main() -> anyhow::Result<()> { // _component batch_ itself (that says nothing about its _instances_!). 
// // * `get_required` returns an error if the component batch is missing - // * `get_optional` returns an empty set of results if the component if missing + // * `get_or_empty` returns an empty set of results if the component is missing // * `get` returns an option let all_points: &RangeComponentResults = results.get_required(MyPoint::name())?; - let all_colors: &RangeComponentResults = results.get_optional(MyColor::name()); - let all_labels: &RangeComponentResults = results.get_optional(MyLabel::name()); + let all_colors: &RangeComponentResults = results.get_or_empty(MyColor::name()); + let all_labels: &RangeComponentResults = results.get_or_empty(MyLabel::name()); let all_indexed_points = izip!( all_points.iter_indices(), diff --git a/crates/re_query2/src/promise.rs b/crates/re_query2/src/promise.rs index a8f473c725a5..a6234e12f21f 100644 --- a/crates/re_query2/src/promise.rs +++ b/crates/re_query2/src/promise.rs @@ -14,6 +14,13 @@ impl std::fmt::Display for PromiseId { } } +impl re_types_core::SizeBytes for PromiseId { + #[inline] + fn heap_size_bytes(&self) -> u64 { + self.0.heap_size_bytes() + } +} + impl PromiseId { /// Create a new unique [`PromiseId`] based on the current time. #[allow(clippy::new_without_default)] pub fn new() -> Self { @@ -39,6 +46,14 @@ pub struct Promise { static_assertions::assert_eq_size!(Promise, Option); +impl re_types_core::SizeBytes for Promise { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { id, source } = self; + id.heap_size_bytes() + source.heap_size_bytes() + } +} + impl Promise { #[inline] pub fn new(source: DataCell) -> Self { @@ -75,7 +90,7 @@ impl PromiseResolver { } /// The result of resolving a [`Promise`] through a [`PromiseResolver`]. -#[derive(Clone)] +#[derive(Debug, Clone)] pub enum PromiseResult { /// The resolution process is still in progress. 
/// diff --git a/crates/re_query2/src/range/results.rs b/crates/re_query2/src/range/results.rs index 883101db0264..2cfe48751a7d 100644 --- a/crates/re_query2/src/range/results.rs +++ b/crates/re_query2/src/range/results.rs @@ -12,7 +12,7 @@ use crate::{Promise, PromiseResolver, PromiseResult}; /// The data is neither deserialized, nor resolved/converted. /// It it the raw [`DataCell`]s, straight from our datastore. /// -/// Use [`RangeResults::get`], [`RangeResults::get_required`] and [`RangeResults::get_optional`] +/// Use [`RangeResults::get`], [`RangeResults::get_required`] and [`RangeResults::get_or_empty`] /// in order to access the raw results for each individual component. #[derive(Default, Debug, Clone)] pub struct RangeResults { @@ -56,7 +56,7 @@ impl RangeResults { /// /// Returns empty results if the component is not present. #[inline] - pub fn get_optional(&self, component_name: impl Into) -> &RangeComponentResults { + pub fn get_or_empty(&self, component_name: impl Into) -> &RangeComponentResults { let component_name = component_name.into(); if let Some(component) = self.components.get(&component_name) { component @@ -79,7 +79,10 @@ impl RangeResults { .map(|(index, cell)| (index, Promise::new(cell))) .unzip(); - let results = RangeComponentResults { indices, cells }; + let results = RangeComponentResults { + indices, + promises: cells, + }; results.sanity_check(); self.components.insert(component_name, results); @@ -92,7 +95,7 @@ impl RangeResults { #[derive(Debug, Clone)] pub struct RangeComponentResults { pub indices: Vec<(TimeInt, RowId)>, - pub cells: Vec, + pub promises: Vec, } impl Default for RangeComponentResults { @@ -107,14 +110,17 @@ impl RangeComponentResults { pub const fn empty() -> Self { Self { indices: Vec::new(), - cells: Vec::new(), + promises: Vec::new(), } } /// No-op in release. 
#[inline] pub fn sanity_check(&self) { - let Self { indices, cells } = self; + let Self { + indices, + promises: cells, + } = self; if cfg!(debug_assertions) { assert_eq!(indices.len(), cells.len()); } @@ -141,7 +147,7 @@ impl RangeComponentResults { &self, resolver: &PromiseResolver, ) -> Vec>>> { - self.cells + self.promises .iter() .map(|cell| { resolver.resolve(cell).map(|cell| { @@ -173,7 +179,7 @@ impl RangeComponentResults { &self, resolver: &PromiseResolver, ) -> Vec>>>> { - self.cells + self.promises .iter() .map(|cell| { resolver.resolve(cell).map(|cell| { diff --git a/crates/re_query2/tests/range.rs b/crates/re_query2/tests/range.rs index 6f47f1fb1184..a000f588b311 100644 --- a/crates/re_query2/tests/range.rs +++ b/crates/re_query2/tests/range.rs @@ -68,7 +68,7 @@ fn simple_range() -> anyhow::Result<()> { ); let all_points = results.get_required(MyPoint::name())?; - let all_colors = results.get_optional(MyColor::name()); + let all_colors = results.get_or_empty(MyColor::name()); let all_points = izip!( all_points.iter_indices(), @@ -131,7 +131,7 @@ fn simple_range() -> anyhow::Result<()> { ); let all_points = results.get_required(MyPoint::name())?; - let all_colors = results.get_optional(MyColor::name()); + let all_colors = results.get_or_empty(MyColor::name()); let all_points = izip!( all_points.iter_indices(), @@ -313,7 +313,7 @@ fn static_range() -> anyhow::Result<()> { ); let all_points = results.get_required(MyPoint::name())?; - let all_colors = results.get_optional(MyColor::name()); + let all_colors = results.get_or_empty(MyColor::name()); let all_points = izip!( all_points.iter_indices(), @@ -363,7 +363,7 @@ fn static_range() -> anyhow::Result<()> { ); let all_points = results.get_required(MyPoint::name())?; - let all_colors = results.get_optional(MyColor::name()); + let all_colors = results.get_or_empty(MyColor::name()); let all_points = izip!( all_points.iter_indices(), @@ -434,7 +434,7 @@ fn static_range() -> anyhow::Result<()> { ); let 
all_points = results.get_required(MyPoint::name())?; - let all_colors = results.get_optional(MyColor::name()); + let all_colors = results.get_or_empty(MyColor::name()); let all_points = izip!( all_points.iter_indices(), diff --git a/crates/re_query_cache2/Cargo.toml b/crates/re_query_cache2/Cargo.toml index 61dc907792b3..b8b67820c4ee 100644 --- a/crates/re_query_cache2/Cargo.toml +++ b/crates/re_query_cache2/Cargo.toml @@ -46,6 +46,7 @@ ahash.workspace = true anyhow.workspace = true backtrace.workspace = true indent.workspace = true +indexmap.workspace = true itertools.workspace = true nohash-hasher.workspace = true parking_lot.workspace = true diff --git a/crates/re_query_cache2/examples/latest_at.rs b/crates/re_query_cache2/examples/latest_at.rs index e47f70fa08fa..c283aac83f0f 100644 --- a/crates/re_query_cache2/examples/latest_at.rs +++ b/crates/re_query_cache2/examples/latest_at.rs @@ -51,16 +51,9 @@ fn main() -> anyhow::Result<()> { // Both the resolution and deserialization steps might fail, which is why this returns a `Result>`. // Use `PromiseResult::flatten` to simplify it down to a single result. // - // A choice now has to be made regarding the nullability of the _component batch's instances_. - // Our IDL doesn't support nullable instances at the moment -- so for the foreseeable future you probably - // shouldn't be using anything but `iter_dense`. - // // This is the step at which caching comes into play. - // - // If the data has already been accessed with the same nullability characteristics in the - // past, then this will just grab the pre-deserialized, pre-resolved/pre-converted result from - // the cache. - // + // If the data has already been accessed in the past, then this will just grab the pre-deserialized, + // pre-resolved/pre-converted result from the cache. // Otherwise, this will trigger a deserialization and cache the result for next time. 
let points = match points.iter_dense::(&resolver).flatten() { @@ -81,12 +74,12 @@ fn main() -> anyhow::Result<()> { PromiseResult::Error(err) => return Err(err.into()), }; - let labels = match labels.iter_sparse::(&resolver).flatten() { + let labels = match labels.iter_dense::(&resolver).flatten() { PromiseResult::Pending => { // Handle the fact that the data isn't ready appropriately. return Ok(()); } - PromiseResult::Ready(data) => data, + PromiseResult::Ready(data) => data.map(Some), PromiseResult::Error(err) => return Err(err.into()), }; diff --git a/crates/re_query_cache2/examples/range.rs b/crates/re_query_cache2/examples/range.rs new file mode 100644 index 000000000000..45d46093beb0 --- /dev/null +++ b/crates/re_query_cache2/examples/range.rs @@ -0,0 +1,151 @@ +use itertools::Itertools; +use re_data_store::{DataStore, RangeQuery}; +use re_log_types::example_components::{MyColor, MyLabel, MyPoint, MyPoints}; +use re_log_types::{build_frame_nr, DataRow, RowId, TimeRange, TimeType, Timeline}; +use re_types_core::{Archetype as _, Loggable as _}; + +use re_query_cache2::{ + clamped_zip_1x2, range_zip_1x2, CachedRangeComponentResults, CachedRangeResults, + PromiseResolver, PromiseResult, +}; + +// --- + +fn main() -> anyhow::Result<()> { + let store = store()?; + eprintln!("store:\n{}", store.to_data_table()?); + + let resolver = PromiseResolver::default(); + + let entity_path = "points"; + let timeline = Timeline::new("frame_nr", TimeType::Sequence); + let query = RangeQuery::new(timeline, TimeRange::EVERYTHING); + eprintln!("query:{query:?}"); + + let caches = re_query_cache2::Caches::new(&store); + + // First, get the raw results for this query. + // + // They might or might not already be cached. We won't know for sure until we try to access + // each individual component's data below. + let results: CachedRangeResults = caches.range( + &store, + &query, + &entity_path.into(), + MyPoints::all_components().iter().copied(), // no generics! 
+ ); + + // Then, grab the results for each individual component. + // * `get_required` returns an error if the component batch is missing + // * `get_or_empty` returns an empty set of results if the component is missing + // * `get` returns an option + // + // At this point we still don't know whether they are cached or not. That's the next step. + let all_points: &CachedRangeComponentResults = results.get_required(MyPoint::name())?; + let all_colors: &CachedRangeComponentResults = results.get_or_empty(MyColor::name()); + let all_labels: &CachedRangeComponentResults = results.get_or_empty(MyLabel::name()); + + // Then comes the time to resolve/convert and deserialize the data. + // These steps have to be done together for efficiency reasons. + // + // That's when caching comes into play. + // If the data has already been accessed in the past, then this will just grab the + // pre-deserialized, pre-resolved/pre-converted result from the cache. + // Otherwise, this will trigger a deserialization and cache the result for next time. + let all_points = all_points.to_dense::(&resolver); + let all_colors = all_colors.to_dense::(&resolver); + let all_labels = all_labels.to_dense::(&resolver); + + // The cache might not have been able to resolve and deserialize the entire dataset across all + // available timestamps. + // + // We can use the following APIs to check the status of the front and back sides of the data range. + // + // E.g. it is possible that the front-side of the range is still waiting for pending data while + // the back-side has been fully loaded. + assert!(matches!( + all_points.status(), + (PromiseResult::Ready(()), PromiseResult::Ready(())) + )); + + // Zip the results together using a stateful time-based join. + let all_frames = range_zip_1x2( + all_points.range_indexed(), + all_colors.range_indexed(), + all_labels.range_indexed(), + ); + + // Then comes the time to resolve/convert and deserialize the data, _for each timestamp_. 
+ // These steps have to be done together for efficiency reasons. + // + // Both the resolution and deserialization steps might fail, which is why this returns a `Result>`. + // Use `PromiseResult::flatten` to simplify it down to a single result. + eprintln!("results:"); + for ((data_time, row_id), points, colors, labels) in all_frames { + let colors = colors.unwrap_or(&[]); + let color_default_fn = || { + static DEFAULT: MyColor = MyColor(0xFF00FFFF); + &DEFAULT + }; + + let labels = labels.unwrap_or(&[]).iter().cloned().map(Some); + let label_default_fn = || None; + + // With the data now fully resolved/converted and deserialized, the joining logic can be + // applied. + // + // In most cases this will be either a clamped zip, or no joining at all. + + let results = clamped_zip_1x2(points, colors, color_default_fn, labels, label_default_fn) + .collect_vec(); + eprintln!("{data_time:?} @ {row_id}:\n {results:?}"); + } + + Ok(()) +} + +// --- + +fn store() -> anyhow::Result { + let mut store = DataStore::new( + re_log_types::StoreId::random(re_log_types::StoreKind::Recording), + re_types::components::InstanceKey::name(), + Default::default(), + ); + + let entity_path = "points"; + + { + let timepoint = [build_frame_nr(123)]; + + let points = vec![MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)]; + let row = DataRow::from_cells1_sized(RowId::new(), entity_path, timepoint, 2, points)?; + store.insert_row(&row)?; + + let colors = vec![MyColor::from_rgb(255, 0, 0)]; + let row = DataRow::from_cells1_sized(RowId::new(), entity_path, timepoint, 1, colors)?; + store.insert_row(&row)?; + + let labels = vec![MyLabel("a".into()), MyLabel("b".into())]; + let row = DataRow::from_cells1_sized(RowId::new(), entity_path, timepoint, 2, labels)?; + store.insert_row(&row)?; + } + + { + let timepoint = [build_frame_nr(456)]; + + let colors = vec![MyColor::from_rgb(255, 0, 0), MyColor::from_rgb(0, 0, 255)]; + let row = DataRow::from_cells1_sized(RowId::new(), entity_path, timepoint, 1, 
colors)?; + store.insert_row(&row)?; + + let points = vec![ + MyPoint::new(10.0, 20.0), + MyPoint::new(30.0, 40.0), + MyPoint::new(50.0, 60.0), + ]; + let row = DataRow::from_cells1_sized(RowId::new(), entity_path, timepoint, 2, points)?; + store.insert_row(&row)?; + } + + Ok(store) +} diff --git a/crates/re_query_cache2/src/cache.rs b/crates/re_query_cache2/src/cache.rs index 959707eca1c5..6e5278a9f1e5 100644 --- a/crates/re_query_cache2/src/cache.rs +++ b/crates/re_query_cache2/src/cache.rs @@ -7,7 +7,7 @@ use re_data_store::{DataStore, StoreDiff, StoreEvent, StoreSubscriber, TimeInt}; use re_log_types::{EntityPath, StoreId, Timeline}; use re_types_core::ComponentName; -use crate::LatestAtCache; +use crate::{LatestAtCache, RangeCache}; // --- @@ -19,6 +19,20 @@ pub struct CacheKey { pub component_name: ComponentName, } +impl re_types_core::SizeBytes for CacheKey { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { + entity_path, + timeline, + component_name, + } = self; + entity_path.heap_size_bytes() + + timeline.heap_size_bytes() + + component_name.heap_size_bytes() + } +} + impl std::fmt::Debug for CacheKey { #[inline] fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -55,7 +69,10 @@ pub struct Caches { pub(crate) store_id: StoreId, // NOTE: `Arc` so we can cheaply free the top-level lock early when needed. - pub(crate) per_cache_key: RwLock>>>, + pub(crate) latest_at_per_cache_key: RwLock>>>, + + // NOTE: `Arc` so we can cheaply free the top-level lock early when needed. 
+ pub(crate) range_per_cache_key: RwLock>>>, } impl Caches { @@ -63,7 +80,8 @@ impl Caches { pub fn new(store: &DataStore) -> Self { Self { store_id: store.id().clone(), - per_cache_key: Default::default(), + latest_at_per_cache_key: Default::default(), + range_per_cache_key: Default::default(), } } } @@ -139,8 +157,9 @@ impl StoreSubscriber for Caches { } } - let caches = self.per_cache_key.write(); - // NOTE: Don't release the top-level lock -- even though this cannot happen yet with + let caches_latest_at = self.latest_at_per_cache_key.write(); + let caches_range = self.range_per_cache_key.write(); + // NOTE: Don't release the top-level locks -- even though this cannot happen yet with // our current macro-architecture, we want to prevent queries from concurrently // running while we're updating the invalidation flags. @@ -152,11 +171,17 @@ impl StoreSubscriber for Caches { // But since this pretty much never happens in practice, let's not go there until we // have metrics showing that show we need to. 
for (entity_path, component_name) in compacted.static_ { - for (key, cache) in caches.iter() { + for (key, cache) in caches_latest_at.iter() { if key.entity_path == entity_path && key.component_name == component_name { cache.write().pending_invalidations.insert(TimeInt::STATIC); } } + + for (key, cache) in caches_range.iter() { + if key.entity_path == entity_path && key.component_name == component_name { + cache.write().pending_invalidation = Some(TimeInt::STATIC); + } + } } } @@ -164,12 +189,19 @@ impl StoreSubscriber for Caches { re_tracing::profile_scope!("temporal"); for (key, times) in compacted.temporal { - if let Some(cache) = caches.get(&key) { + if let Some(cache) = caches_latest_at.get(&key) { cache .write() .pending_invalidations .extend(times.iter().copied()); } + + if let Some(cache) = caches_range.get(&key) { + let pending_invalidation = &mut cache.write().pending_invalidation; + let min_time = times.first().copied(); + *pending_invalidation = + Option::min(*pending_invalidation, min_time).or(min_time); + } } } } diff --git a/crates/re_query_cache2/src/latest_at/query.rs b/crates/re_query_cache2/src/latest_at/query.rs index 878dece4f81d..b2e9e931d2e3 100644 --- a/crates/re_query_cache2/src/latest_at/query.rs +++ b/crates/re_query_cache2/src/latest_at/query.rs @@ -36,7 +36,7 @@ impl Caches { for component_name in component_names { let key = CacheKey::new(entity_path.clone(), query.timeline(), component_name); let cache = Arc::clone( - self.per_cache_key + self.latest_at_per_cache_key .write() .entry(key.clone()) .or_insert_with(|| Arc::new(RwLock::new(LatestAtCache::new(key.clone())))), @@ -217,7 +217,6 @@ impl LatestAtCache { index: (data_time, row_id), promise: Some(Promise::new(cell)), cached_dense: Default::default(), - cached_sparse: Default::default(), cached_heap_size_bytes: AtomicU64::new(0), }); @@ -248,8 +247,6 @@ impl LatestAtCache { pending_invalidations, } = self; - let pending_invalidations = std::mem::take(pending_invalidations); - // 
First, remove any data indexed by a _query time_ that's more recent than the oldest // _data time_ that's been invalidated. // @@ -260,6 +257,27 @@ impl LatestAtCache { } // Second, remove any data indexed by _data time_, if it's been invalidated. - per_data_time.retain(|data_time, _| !pending_invalidations.contains(data_time)); + let mut dropped_data_times = Vec::new(); + per_data_time.retain(|data_time, _| { + if pending_invalidations.contains(data_time) { + dropped_data_times.push(*data_time); + false + } else { + true + } + }); + + // TODO(#5974): Because of non-deterministic ordering and parallelism and all things of that + // nature, it can happen that we try to handle pending invalidations before we even cached + // the associated data. + // + // If that happens, the data will be cached after we've invalidated *nothing*, and will stay + // there indefinitely since the cache doesn't have a dedicated GC yet. + // + // TL;DR: make sure to keep track of pending invalidations indefinitely as long as we + // haven't had the opportunity to actually invalidate the associated data. + for data_time in dropped_data_times { + pending_invalidations.remove(&data_time); + } } } diff --git a/crates/re_query_cache2/src/latest_at/results.rs b/crates/re_query_cache2/src/latest_at/results.rs index ec591ff97315..4597807f3ec5 100644 --- a/crates/re_query_cache2/src/latest_at/results.rs +++ b/crates/re_query_cache2/src/latest_at/results.rs @@ -131,9 +131,6 @@ pub struct CachedLatestAtComponentResults { /// The resolved, converted, deserialized dense data. pub(crate) cached_dense: OnceLock>, - /// The resolved, converted, deserialized sparse data. 
- pub(crate) cached_sparse: OnceLock>, - pub(crate) cached_heap_size_bytes: AtomicU64, } @@ -144,7 +141,6 @@ impl CachedLatestAtComponentResults { index: (TimeInt::STATIC, RowId::ZERO), promise: None, cached_dense: OnceLock::new(), - cached_sparse: OnceLock::new(), cached_heap_size_bytes: AtomicU64::new(0), } } @@ -177,8 +173,7 @@ impl std::fmt::Debug for CachedLatestAtComponentResults { let Self { index, promise: _, - cached_dense: _, // we can't, we don't know the type - cached_sparse: _, // we can't, we don't know the type + cached_dense: _, // we can't, we don't know the type cached_heap_size_bytes, } = self; @@ -242,42 +237,6 @@ impl CachedLatestAtComponentResults { self.to_dense(resolver) .map(|data| data.map(|data| data.iter())) } - - /// Returns the component data as a sparse vector. - /// - /// Returns an error if the component is missing or cannot be deserialized. - /// - /// Use [`PromiseResult::flatten`] to merge the results of resolving the promise and of - /// deserializing the data into a single one, if you don't need the extra flexibility. - #[inline] - pub fn to_sparse( - &self, - resolver: &PromiseResolver, - ) -> PromiseResult]>> { - if let Some(cell) = self.promise.as_ref() { - resolver - .resolve(cell) - .map(|cell| self.downcast_sparse::(&cell)) - } else { - // Manufactured empty result. - PromiseResult::Ready(Ok(&[])) - } - } - - /// Iterates over the component data, assuming it is sparse. - /// - /// Returns an error if the component is missing or cannot be deserialized. - /// - /// Use [`PromiseResult::flatten`] to merge the results of resolving the promise and of - /// deserializing the data into a single one, if you don't need the extra flexibility. 
- #[inline] - pub fn iter_sparse( - &self, - resolver: &PromiseResolver, - ) -> PromiseResult>>> { - self.to_sparse(resolver) - .map(|data| data.map(|data| data.iter().map(Option::as_ref))) - } } impl CachedLatestAtComponentResults { @@ -306,32 +265,6 @@ impl CachedLatestAtComponentResults { downcast(&**cached) } - - fn downcast_sparse(&self, cell: &DataCell) -> crate::Result<&[Option]> { - // `OnceLock::get` is non-blocking -- this is a best-effort fast path in case the - // data has already been computed. - // - // See next comment as to why we need this. - if let Some(cached) = self.cached_sparse.get() { - return downcast_opt(&**cached); - } - - // We have to do this outside of the callback in order to propagate errors. - // Hence the early exit check above. - let data = cell - .try_to_native_opt::() - .map_err(|err| DeserializationError::DataCellError(err.to_string()))?; - - #[allow(clippy::borrowed_box)] - let cached: &Box = - self.cached_sparse.get_or_init(move || { - self.cached_heap_size_bytes - .fetch_add(data.total_size_bytes(), Relaxed); - Box::new(FlatVecDeque::from(data)) - }); - - downcast_opt(&**cached) - } } fn downcast(cached: &(dyn ErasedFlatVecDeque + Send + Sync)) -> crate::Result<&[C]> { @@ -349,21 +282,3 @@ fn downcast(cached: &(dyn ErasedFlatVecDeque + Send + Sync)) -> cr // unwrap checked just above ^^^ Ok(cached.iter().next().unwrap()) } - -fn downcast_opt( - cached: &(dyn ErasedFlatVecDeque + Send + Sync), -) -> crate::Result<&[Option]> { - let cached = cached - .as_any() - .downcast_ref::>>() - .ok_or_else(|| QueryError::TypeMismatch { - actual: "".into(), - requested: C::name(), - })?; - - if cached.num_entries() != 1 { - return Err(anyhow::anyhow!("latest_at deque must be single entry").into()); - } - // unwrap checked just above ^^^ - Ok(cached.iter().next().unwrap()) -} diff --git a/crates/re_query_cache2/src/lib.rs b/crates/re_query_cache2/src/lib.rs index 58e3f4b46669..d8965db3cc81 100644 --- a/crates/re_query_cache2/src/lib.rs +++ 
b/crates/re_query_cache2/src/lib.rs @@ -3,18 +3,21 @@ mod cache; mod flat_vec_deque; mod latest_at; +mod range; pub use self::cache::{CacheKey, Caches}; pub use self::flat_vec_deque::{ErasedFlatVecDeque, FlatVecDeque}; pub use self::latest_at::{ CachedLatestAtComponentResults, CachedLatestAtMonoResult, CachedLatestAtResults, }; +pub use self::range::{CachedRangeComponentResults, CachedRangeData, CachedRangeResults}; pub(crate) use self::latest_at::LatestAtCache; +pub(crate) use self::range::{CachedRangeComponentResultsInner, RangeCache}; pub use re_query2::{ - clamped_zip::*, Promise, PromiseId, PromiseResolver, PromiseResult, QueryError, Result, - ToArchetype, + clamped_zip::*, range_zip::*, ExtraQueryHistory, Promise, PromiseId, PromiseResolver, + PromiseResult, QueryError, Result, ToArchetype, VisibleHistory, VisibleHistoryBoundary, }; pub mod external { diff --git a/crates/re_query_cache2/src/range/mod.rs b/crates/re_query_cache2/src/range/mod.rs new file mode 100644 index 000000000000..ffb93dd2f720 --- /dev/null +++ b/crates/re_query_cache2/src/range/mod.rs @@ -0,0 +1,8 @@ +mod query; +mod results; + +pub use self::query::RangeCache; +pub use self::results::{ + CachedRangeComponentResults, CachedRangeComponentResultsInner, CachedRangeData, + CachedRangeResults, +}; diff --git a/crates/re_query_cache2/src/range/query.rs b/crates/re_query_cache2/src/range/query.rs new file mode 100644 index 000000000000..f04664b96c25 --- /dev/null +++ b/crates/re_query_cache2/src/range/query.rs @@ -0,0 +1,393 @@ +use std::sync::Arc; + +use parking_lot::RwLock; + +use re_data_store::{DataStore, RangeQuery, TimeInt}; +use re_log_types::{EntityPath, TimeRange}; +use re_types_core::ComponentName; +use re_types_core::SizeBytes; + +use crate::{ + CacheKey, CachedRangeComponentResults, CachedRangeComponentResultsInner, CachedRangeResults, + Caches, Promise, +}; + +// --- + +impl Caches { + /// Queries for the given `component_names` using range semantics. 
+ /// + /// See [`CachedRangeResults`] for more information about how to handle the results. + /// + /// This is a cached API -- data will be lazily cached upon access. + pub fn range( + &self, + store: &DataStore, + query: &RangeQuery, + entity_path: &EntityPath, + component_names: impl IntoIterator, + ) -> CachedRangeResults { + re_tracing::profile_function!(entity_path.to_string()); + + let mut results = CachedRangeResults::new(query.clone()); + + for component_name in component_names { + let key = CacheKey::new(entity_path.clone(), query.timeline(), component_name); + + let cache = Arc::clone( + self.range_per_cache_key + .write() + .entry(key.clone()) + .or_insert_with(|| Arc::new(RwLock::new(RangeCache::new(key.clone())))), + ); + + let mut cache = cache.write(); + cache.handle_pending_invalidation(); + let cached = cache.range(store, query, entity_path, component_name); + results.add(component_name, cached); + } + + results + } +} + +// --- + +/// Caches the results of `Range` queries for a given [`CacheKey`]. +pub struct RangeCache { + /// For debugging purposes. + pub cache_key: CacheKey, + + /// All temporal data, organized by _data_ time. + /// + /// Query time is irrelevant for range queries. + pub per_data_time: CachedRangeComponentResults, + + /// Everything greater than or equal to this timestamp has been asynchronously invalidated. + /// + /// The next time this cache gets queried, it must remove any entry matching this criteria. + /// `None` indicates that there's no pending invalidation. + /// + /// Invalidation is deferred to query time because it is far more efficient that way: the frame + /// time effectively behaves as a natural micro-batching mechanism. 
+ pub pending_invalidation: Option, +} + +impl RangeCache { + #[inline] + pub fn new(cache_key: CacheKey) -> Self { + Self { + cache_key, + per_data_time: CachedRangeComponentResults::default(), + pending_invalidation: None, + } + } +} + +impl std::fmt::Debug for RangeCache { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + cache_key, + per_data_time, + pending_invalidation: _, + } = self; + + let mut strings = Vec::new(); + + let mut data_time_min = TimeInt::MAX; + let mut data_time_max = TimeInt::MIN; + + { + let per_data_time = per_data_time.read(); + + let per_data_time_indices = &per_data_time.indices; + if let Some(time_front) = per_data_time_indices.front().map(|(t, _)| *t) { + data_time_min = TimeInt::min(data_time_min, time_front); + } + if let Some(time_back) = per_data_time_indices.back().map(|(t, _)| *t) { + data_time_max = TimeInt::max(data_time_max, time_back); + } + } + + strings.push(format!( + "{} ({})", + cache_key + .timeline + .typ() + .format_range_utc(TimeRange::new(data_time_min, data_time_max)), + re_format::format_bytes(per_data_time.total_size_bytes() as _), + )); + + if strings.is_empty() { + return f.write_str(""); + } + + f.write_str(&strings.join("\n").replace("\n\n", "\n")) + } +} + +impl SizeBytes for RangeCache { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { + cache_key, + per_data_time, + pending_invalidation, + } = self; + + cache_key.heap_size_bytes() + + per_data_time.heap_size_bytes() + + pending_invalidation.heap_size_bytes() + } +} + +impl RangeCache { + /// Queries cached range data for a single component. 
+ pub fn range( + &mut self, + store: &DataStore, + query: &RangeQuery, + entity_path: &EntityPath, + component_name: ComponentName, + ) -> CachedRangeComponentResults { + re_tracing::profile_scope!("range", format!("{query:?}")); + + let RangeCache { + cache_key: _, + per_data_time, + pending_invalidation: _, + } = self; + + let mut per_data_time = per_data_time.write(); + + let query_front = per_data_time.compute_front_query(query); + if let Some(query_front) = query_front.as_ref() { + re_tracing::profile_scope!("front"); + + for (data_time, row_id, mut cells) in + store.range(query_front, entity_path, [component_name]) + { + // Soundness: + // * `cells[0]` is guaranteed to exist since we passed in `&[component_name]` + // * `cells[0]` is guaranteed to be non-null, otherwise this whole result would be null + let Some(cell) = cells[0].take() else { + debug_assert!(cells[0].is_some(), "unreachable: `cells[0]` is missing"); + continue; + }; + + per_data_time + .promises_front + .push(((data_time, row_id), Promise::new(cell))); + per_data_time + .promises_front + .sort_by_key(|(index, _)| *index); + } + } + + if let Some(query_back) = per_data_time.compute_back_query(query, query_front.as_ref()) { + re_tracing::profile_scope!("back"); + + for (data_time, row_id, mut cells) in store + .range(&query_back, entity_path, [component_name]) + // If there's static data to be found, the front query will take care of it already. 
+ .filter(|(data_time, _, _)| !data_time.is_static()) + { + // Soundness: + // * `cells[0]` is guaranteed to exist since we passed in `&[component_name]` + // * `cells[0]` is guaranteed to be non-null, otherwise this whole result would be null + let Some(cell) = cells[0].take() else { + debug_assert!(cells[0].is_some(), "unreachable: `cells[0]` is missing"); + continue; + }; + + per_data_time + .promises_back + .push(((data_time, row_id), Promise::new(cell))); + per_data_time.promises_back.sort_by_key(|(index, _)| *index); + } + } + + per_data_time.sanity_check(); + drop(per_data_time); + + self.per_data_time.clone_at(query.range()) + } + + pub fn handle_pending_invalidation(&mut self) { + re_tracing::profile_function!(); + + let Self { + cache_key: _, + per_data_time, + pending_invalidation, + } = self; + + let Some(pending_invalidation) = pending_invalidation.take() else { + return; + }; + + per_data_time.write().truncate_at_time(pending_invalidation); + } +} + +// --- + +impl CachedRangeComponentResultsInner { + /// How many _indices_ across this entire cache? + #[inline] + pub fn num_indices(&self) -> u64 { + self.indices.len() as _ + } + + /// How many _instances_ across this entire cache? + #[inline] + pub fn num_instances(&self) -> u64 { + self.cached_dense + .as_ref() + .map_or(0u64, |cached| cached.dyn_num_values() as _) + } + + /// Given a `query`, returns N reduced queries that are sufficient to fill the missing data + /// on both the front & back sides of the cache. + #[inline] + pub fn compute_queries(&self, query: &RangeQuery) -> impl Iterator { + let front = self.compute_front_query(query); + let back = self.compute_back_query(query, front.as_ref()); + front.into_iter().chain(back) + } + + /// Given a `query`, returns a reduced query that is sufficient to fill the missing data + /// on the front side of the cache, or `None` if all the necessary data is already + /// cached. 
+ pub fn compute_front_query(&self, query: &RangeQuery) -> Option { + let mut reduced_query = query.clone(); + + // If nothing has been cached already, then we just want to query everything. + if self.indices.is_empty() + && self.promises_front.is_empty() + && self.promises_back.is_empty() + { + return Some(reduced_query); + } + + // If the cache contains static data, then there's no point in querying anything else since + // static data overrides everything anyway. + if self + .indices + .front() + .map_or(false, |(data_time, _)| data_time.is_static()) + { + return None; + } + + // Otherwise, query for what's missing on the front-side of the cache, while making sure to + // take pending promises into account! + // + // Keep in mind: it is not possible for the cache to contain only part of a given + // timestamp. All entries for a given timestamp are loaded and invalidated atomically, + // whether it's promises or already resolved entries. + + let pending_front_min = self + .promises_front + .first() + .map_or(TimeInt::MAX.as_i64(), |((t, _), _)| { + t.as_i64().saturating_sub(1) + }); + + if let Some(time_range) = self.time_range() { + let time_range_min = i64::min( + time_range.min().as_i64().saturating_sub(1), + pending_front_min, + ); + reduced_query + .range + .set_max(i64::min(reduced_query.range.max().as_i64(), time_range_min)); + } else { + reduced_query.range.set_max(i64::min( + reduced_query.range.max().as_i64(), + pending_front_min, + )); + } + + if reduced_query.range.max() < reduced_query.range.min() { + return None; + } + + Some(reduced_query) + } + + /// Given a `query`, returns a reduced query that is sufficient to fill the missing data + /// on the back side of the cache, or `None` if all the necessary data is already + /// cached. 
+ pub fn compute_back_query( + &self, + query: &RangeQuery, + query_front: Option<&RangeQuery>, + ) -> Option { + let mut reduced_query = query.clone(); + + // If nothing has been cached already, then the front query is already going to take care + // of everything. + if self.indices.is_empty() + && self.promises_front.is_empty() + && self.promises_back.is_empty() + { + return None; + } + + // If the cache contains static data, then there's no point in querying anything else since + // static data overrides everything anyway. + if self + .indices + .front() + .map_or(false, |(data_time, _)| data_time.is_static()) + { + return None; + } + + // Otherwise, query for what's missing on the back-side of the cache, while making sure to + // take pending promises into account! + // + // Keep in mind: it is not possible for the cache to contain only part of a given + // timestamp. All entries for a given timestamp are loaded and invalidated atomically, + // whether it's promises or already resolved entries. + + let pending_back_max = self + .promises_back + .last() + .map_or(TimeInt::MIN.as_i64(), |((t, _), _)| { + t.as_i64().saturating_add(1) + }); + + if let Some(time_range) = self.time_range() { + let time_range_max = i64::max( + time_range.max().as_i64().saturating_add(1), + pending_back_max, + ); + reduced_query + .range + .set_min(i64::max(reduced_query.range.min().as_i64(), time_range_max)); + } else { + reduced_query.range.set_min(i64::max( + reduced_query.range.min().as_i64(), + pending_back_max, + )); + } + + // Back query should never overlap with the front query. + // Reminder: time ranges are all inclusive. 
+ if let Some(query_front) = query_front { + let front_max_plus_one = query_front.range().max().as_i64().saturating_add(1); + let back_min = reduced_query.range().min().as_i64(); + reduced_query + .range + .set_min(i64::max(back_min, front_max_plus_one)); + } + + if reduced_query.range.max() < reduced_query.range.min() { + return None; + } + + Some(reduced_query) + } +} diff --git a/crates/re_query_cache2/src/range/results.rs b/crates/re_query_cache2/src/range/results.rs new file mode 100644 index 000000000000..cfd804055fc0 --- /dev/null +++ b/crates/re_query_cache2/src/range/results.rs @@ -0,0 +1,748 @@ +use std::{ + cell::RefCell, + collections::VecDeque, + ops::Range, + sync::{Arc, OnceLock}, +}; + +use nohash_hasher::IntMap; + +use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use re_data_store::RangeQuery; +use re_log_types::{RowId, TimeInt, TimeRange}; +use re_types_core::{Component, ComponentName, DeserializationError, SizeBytes}; + +use crate::{ErasedFlatVecDeque, FlatVecDeque, Promise, PromiseResolver, PromiseResult}; + +// --- + +/// Cached results for a range query. +/// +/// The data is both deserialized and resolved/converted. +/// +/// Use [`CachedRangeResults::get`], [`CachedRangeResults::get_required`] and +/// [`CachedRangeResults::get_or_empty`] in order to access the results for each individual component. +#[derive(Debug)] +pub struct CachedRangeResults { + pub query: RangeQuery, + pub components: IntMap, +} + +impl CachedRangeResults { + #[inline] + pub(crate) fn new(query: RangeQuery) -> Self { + Self { + query, + components: Default::default(), + } + } + + #[inline] + pub fn contains(&self, component_name: impl Into) -> bool { + self.components.contains_key(&component_name.into()) + } + + /// Returns the [`CachedRangeComponentResults`] for the specified [`Component`]. 
+ #[inline] + pub fn get( + &self, + component_name: impl Into, + ) -> Option<&CachedRangeComponentResults> { + self.components.get(&component_name.into()) + } + + /// Returns the [`CachedRangeComponentResults`] for the specified [`Component`]. + /// + /// Returns an error if the component is not present. + #[inline] + pub fn get_required( + &self, + component_name: impl Into, + ) -> crate::Result<&CachedRangeComponentResults> { + let component_name = component_name.into(); + if let Some(component) = self.components.get(&component_name) { + Ok(component) + } else { + Err(DeserializationError::MissingComponent { + component: component_name, + backtrace: ::backtrace::Backtrace::new_unresolved(), + } + .into()) + } + } + + /// Returns the [`CachedRangeComponentResults`] for the specified [`Component`]. + /// + /// Returns empty results if the component is not present. + #[inline] + pub fn get_or_empty( + &self, + component_name: impl Into, + ) -> &CachedRangeComponentResults { + let component_name = component_name.into(); + if let Some(component) = self.components.get(&component_name) { + component + } else { + CachedRangeComponentResults::empty() + } + } +} + +impl CachedRangeResults { + #[doc(hidden)] + #[inline] + pub fn add(&mut self, component_name: ComponentName, cached: CachedRangeComponentResults) { + self.components.insert(component_name, cached); + } +} + +// --- + +/// Lazily cached results for a particular component when using a cached range query. +#[derive(Debug)] +pub struct CachedRangeComponentResults { + /// The [`TimeRange`] of the query that was used in order to retrieve these results in the + /// first place. + /// + /// The "original" copy in the cache just stores [`TimeRange::EMPTY`]. It's meaningless. + pub(crate) time_range: TimeRange, + + pub(crate) inner: Arc>, +} + +impl CachedRangeComponentResults { + /// Clones the results while making sure to stamp them with the [`TimeRange`] of the associated query. 
+ #[inline] + pub(crate) fn clone_at(&self, time_range: TimeRange) -> Self { + Self { + time_range, + inner: self.inner.clone(), + } + } +} + +impl CachedRangeComponentResults { + #[inline] + pub fn empty() -> &'static Self { + static EMPTY: OnceLock = OnceLock::new(); + EMPTY.get_or_init(CachedRangeComponentResults::default) + } +} + +impl re_types_core::SizeBytes for CachedRangeComponentResults { + #[inline] + fn heap_size_bytes(&self) -> u64 { + // NOTE: it's all on the heap past this point. + self.inner.read_recursive().total_size_bytes() + } +} + +impl Default for CachedRangeComponentResults { + #[inline] + fn default() -> Self { + Self { + time_range: TimeRange::EMPTY, + inner: Arc::new(RwLock::new(CachedRangeComponentResultsInner::empty())), + } + } +} + +impl std::ops::Deref for CachedRangeComponentResults { + type Target = RwLock; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +#[derive(Debug)] +pub struct CachedRangeData<'a, T> { + // NOTE: Options so we can represent an empty result without having to somehow conjure a mutex + // guard out of thin air. + // + // TODO(Amanieu/parking_lot#289): we need two distinct mapped guards because it's + // impossible to return an owned type in a `parking_lot` guard. + // See . + indices: Option>>, + data: Option>>, + + time_range: TimeRange, + front_status: PromiseResult<()>, + back_status: PromiseResult<()>, + + /// Keeps track of reentrancy counts for the current thread. + /// + /// Used to detect and prevent potential deadlocks when using the cached APIs in work-stealing + /// environments such as Rayon. + reentering: &'static std::thread::LocalKey>, +} + +impl<'a, T> Drop for CachedRangeData<'a, T> { + #[inline] + fn drop(&mut self) { + self.reentering + .with_borrow_mut(|reentering| *reentering = reentering.saturating_sub(1)); + } +} + +impl<'a, T> CachedRangeData<'a, T> { + /// Returns the current status on both ends of the range. + /// + /// E.g. 
it is possible that the front-side of the range is still waiting for pending data while + /// the back-side has been fully loaded. + #[inline] + pub fn status(&self) -> (PromiseResult<()>, PromiseResult<()>) { + (self.front_status.clone(), self.back_status.clone()) + } + + #[inline] + pub fn range_indices( + &self, + entry_range: Range, + ) -> impl Iterator { + match self.indices.as_ref() { + Some(indices) => itertools::Either::Left(indices.range(entry_range)), + None => itertools::Either::Right(std::iter::empty()), + } + } + + #[inline] + pub fn range_data(&self, entry_range: Range) -> impl Iterator { + match self.data.as_ref() { + Some(indices) => itertools::Either::Left(indices.range(entry_range)), + None => itertools::Either::Right(std::iter::empty()), + } + } + + /// Range both the indices and data by zipping them together. + /// + /// Useful for time-based joins (`range_zip`). + #[inline] + pub fn range_indexed(&self) -> impl Iterator { + let entry_range = self.entry_range(); + itertools::izip!( + self.range_indices(entry_range.clone()), + self.range_data(entry_range) + ) + } + + /// Returns the index range that corresponds to the specified `time_range`. + /// + /// Use the returned range with one of the range iteration methods: + /// - [`Self::range_indices`] + /// - [`Self::range_data`] + /// - [`Self::range_indexed`] + /// + /// Make sure that the bucket hasn't been modified in-between! + /// + /// This is `O(2*log(n))`, so make sure to clone the returned range rather than calling this + /// multiple times. + #[inline] + pub fn entry_range(&self) -> Range { + let Some(indices) = self.indices.as_ref() else { + return 0..0; + }; + + // If there's any static data cached, make sure to look for it explicitly. + // + // Remember: `TimeRange`s can never contain `TimeInt::STATIC`. 
+ let static_override = if matches!(indices.front(), Some((TimeInt::STATIC, _))) { + TimeInt::STATIC + } else { + TimeInt::MAX + }; + + let start_index = indices.partition_point(|(data_time, _)| { + *data_time < TimeInt::min(self.time_range.min(), static_override) + }); + let end_index = indices.partition_point(|(data_time, _)| { + *data_time <= TimeInt::min(self.time_range.max(), static_override) + }); + + start_index..end_index + } +} + +impl CachedRangeComponentResults { + /// Returns the component data as a dense vector. + /// + /// Returns an error if the component is missing or cannot be deserialized. + /// + /// Use [`PromiseResult::flatten`] to merge the results of resolving the promise and of + /// deserializing the data into a single one, if you don't need the extra flexibility. + #[inline] + pub fn to_dense(&self, resolver: &PromiseResolver) -> CachedRangeData<'_, C> { + // It's tracing the deserialization of an entire range query at once -- it's fine. + re_tracing::profile_function!(); + + // --- Step 1: try and upsert pending data (write lock) --- + + thread_local! { + /// Keeps track of reentrancy counts for the current thread. + /// + /// Used to detect and prevent potential deadlocks when using the cached APIs in work-stealing + /// environments such as Rayon. + static REENTERING: RefCell = const { RefCell::new(0) }; + } + + REENTERING.with_borrow_mut(|reentering| *reentering = reentering.saturating_add(1)); + + // Manufactured empty result. + if self.time_range == TimeRange::EMPTY { + return CachedRangeData { + indices: None, + data: None, + time_range: TimeRange::EMPTY, + front_status: PromiseResult::Ready(()), + back_status: PromiseResult::Ready(()), + reentering: &REENTERING, + }; + } + + let mut results = if let Some(results) = self.inner.try_write() { + // The lock was free to grab, nothing else to worry about. 
+ Some(results) + } else { + REENTERING.with_borrow_mut(|reentering| { + if *reentering > 1 { + // The lock is busy, and at least one of the lock holders is the current thread from a + // previous stack frame. + // + // Return `None` so that we skip straight to the read-only part of the operation. + // All the data will be there already, since the previous stack frame already + // took care of upserting it. + None + } else { + // The lock is busy, but it is not held by the current thread. + // Just block until it gets released. + Some(self.inner.write()) + } + }) + }; + + if let Some(results) = &mut results { + // NOTE: This is just a lazy initialization of the underlying deque, because we + // just now finally know the expected type! + if results.cached_dense.is_none() { + results.cached_dense = Some(Box::new(FlatVecDeque::::new())); + } + + if !results.promises_front.is_empty() { + re_tracing::profile_scope!("front"); + + let mut resolved_indices = Vec::with_capacity(results.promises_front.len()); + let mut resolved_data = Vec::with_capacity(results.promises_front.len()); + + // Pop the promises from the end so that if we encounter one that has yet to be + // resolved, we can stop right there and know we have a contiguous range of data + // available up to that point in time. + // + // Reminder: promises are sorted in ascending index order. 
+ while let Some(((data_time, row_id), promise)) = results.promises_front.pop() { + let data = match resolver.resolve(&promise) { + PromiseResult::Pending => { + results.front_status = (data_time, PromiseResult::Pending); + break; + } + PromiseResult::Error(err) => { + results.front_status = (data_time, PromiseResult::Error(err)); + break; + } + PromiseResult::Ready(cell) => { + results.front_status = (data_time, PromiseResult::Ready(())); + match cell + .try_to_native::() + .map_err(|err| DeserializationError::DataCellError(err.to_string())) + { + Ok(data) => data, + Err(err) => { + re_log::error!(%err, component=%C::name(), "data deserialization failed -- skipping"); + continue; + } + } + } + }; + + resolved_indices.push((data_time, row_id)); + resolved_data.push(data); + } + + // We resolved the promises in reversed order, so reverse the results back. + resolved_indices.reverse(); + resolved_data.reverse(); + + let results_indices = std::mem::take(&mut results.indices); + results.indices = resolved_indices + .into_iter() + .chain(results_indices) + .collect(); + + let resolved_data = FlatVecDeque::from_vecs(resolved_data); + // Unwraps: the deque is created when entering this function -- we know it's there + // and we know its type. + let cached_dense = results + .cached_dense + .as_mut() + .unwrap() + .as_any_mut() + .downcast_mut::>() + .unwrap(); + cached_dense.push_front_deque(resolved_data); + } + + if !results.promises_back.is_empty() { + re_tracing::profile_scope!("back"); + + let mut resolved_indices = Vec::with_capacity(results.promises_back.len()); + let mut resolved_data = Vec::with_capacity(results.promises_back.len()); + + // Reverse the promises first so we can pop() from the back. + // It's fine, this is a one-time operation in the successful case, and it's extremely fast to do. + // See below why. + // + // Reminder: promises are sorted in ascending index order. 
+ results.promises_back.reverse(); + + // Pop the promises from the end so that if we encounter one that has yet to be + // resolved, we can stop right there and know we have a contiguous range of data + // available up to that point in time. + while let Some(((data_time, index), promise)) = results.promises_back.pop() { + let data = match resolver.resolve(&promise) { + PromiseResult::Pending => { + results.back_status = (data_time, PromiseResult::Pending); + break; + } + PromiseResult::Error(err) => { + results.back_status = (data_time, PromiseResult::Error(err)); + break; + } + PromiseResult::Ready(cell) => { + results.front_status = (data_time, PromiseResult::Ready(())); + match cell + .try_to_native::() + .map_err(|err| DeserializationError::DataCellError(err.to_string())) + { + Ok(data) => data, + Err(err) => { + re_log::error!(%err, "data deserialization failed -- skipping"); + continue; + } + } + } + }; + + resolved_indices.push((data_time, index)); + resolved_data.push(data); + } + + // Reverse our reversal. + results.promises_back.reverse(); + + results.indices.extend(resolved_indices); + + let resolved_data = FlatVecDeque::from_vecs(resolved_data); + // Unwraps: the deque is created when entering this function -- we know it's there + // and we know its type. + let cached_dense = results + .cached_dense + .as_mut() + .unwrap() + .as_any_mut() + .downcast_mut::>() + .unwrap(); + cached_dense.push_back_deque(resolved_data); + } + + results.sanity_check(); + } + + // --- Step 2: fetch cached data (read lock) --- + + let results = if let Some(results) = results { + RwLockWriteGuard::downgrade(results) + } else { + // # Multithreading semantics + // + // We need the reentrant lock because query contexts (i.e. space views) generally run on a + // work-stealing thread-pool and might swap a task on one thread with another task on the + // same thread, where both tasks happen to query the same exact data (e.g. cloned space views). 
+ // + // See `REENTERING` comments above for more details. + self.read_recursive() + }; + + let front_status = { + let (results_front_time, results_front_status) = &results.front_status; + let query_front_time = self.time_range.min(); + if query_front_time < *results_front_time { + // If the query covers a larger time span on its front-side than the resulting data, then + // we should forward the status of the resulting data so the caller can know why it's + // been cropped off. + results_front_status.clone() + } else { + PromiseResult::Ready(()) + } + }; + let back_status = { + let (results_back_time, results_back_status) = &results.back_status; + let query_back_time = self.time_range.max(); + if query_back_time > *results_back_time { + // If the query covers a larger time span on its back-side than the resulting data, then + // we should forward the status of the resulting data so the caller can know why it's + // been cropped off. + results_back_status.clone() + } else { + PromiseResult::Ready(()) + } + }; + + // TODO(Amanieu/parking_lot#289): we need two distinct mapped guards because it's + // impossible to return an owned type in a `parking_lot` guard. + // See . + let indices = RwLockReadGuard::map(results, |results| &results.indices); + let data = RwLockReadGuard::map(self.inner.read_recursive(), |results| { + // Unwraps: the data is created when entering this function -- we know it's there + // and we know its type. + results + .cached_dense + .as_ref() + .unwrap() + .as_any() + .downcast_ref::>() + .unwrap() + }); + + CachedRangeData { + indices: Some(indices), + data: Some(data), + time_range: self.time_range, + front_status, + back_status, + reentering: &REENTERING, + } + } +} + +// --- + +/// Lazily cached results for a particular component when using a cached range query. 
+pub struct CachedRangeComponentResultsInner {
+    pub(crate) indices: VecDeque<(TimeInt, RowId)>,
+
+    /// All the pending promises that must be resolved in order to fill the missing data on the
+    /// front-side of the ringbuffer (i.e. further back in time).
+    ///
+    /// Always sorted in ascending index order ([`TimeInt`] + [`RowId`] pair).
+    pub(crate) promises_front: Vec<((TimeInt, RowId), Promise)>,
+
+    /// All the pending promises that must be resolved in order to fill the missing data on the
+    /// back-side of the ringbuffer (i.e. the most recent data).
+    ///
+    /// Always sorted in ascending index order ([`TimeInt`] + [`RowId`] pair).
+    pub(crate) promises_back: Vec<((TimeInt, RowId), Promise)>,
+
+    /// Keeps track of the status of the data on the front-side of the cache.
+    pub(crate) front_status: (TimeInt, PromiseResult<()>),
+
+    /// Keeps track of the status of the data on the back-side of the cache.
+    pub(crate) back_status: (TimeInt, PromiseResult<()>),
+
+    /// The resolved, converted, deserialized dense data.
+    ///
+    /// This has to be an `Option` because we have no way of initializing the underlying trait object
+    /// until we know the actual native type that the caller expects. 
+ pub(crate) cached_dense: Option>, +} + +impl SizeBytes for CachedRangeComponentResultsInner { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { + indices, + promises_front, + promises_back, + front_status: _, + back_status: _, + cached_dense, + } = self; + + indices.heap_size_bytes() + + promises_front.heap_size_bytes() + + promises_back.heap_size_bytes() + + cached_dense + .as_ref() + .map_or(0, |data| data.dyn_total_size_bytes()) + } +} + +impl std::fmt::Debug for CachedRangeComponentResultsInner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + indices, + promises_front: _, + promises_back: _, + front_status: _, + back_status: _, + cached_dense: _, // we can't, we don't know the type + } = self; + + if indices.is_empty() { + f.write_str("") + } else { + // Unwrap: checked above. + let index_start = indices.front().unwrap(); + let index_end = indices.back().unwrap(); + f.write_fmt(format_args!( + "[{:?}#{} .. {:?}#{}] {}", + index_start.0, + index_start.1, + index_end.0, + index_end.1, + re_format::format_bytes(self.total_size_bytes() as _) + )) + } + } +} + +impl CachedRangeComponentResultsInner { + #[inline] + pub const fn empty() -> Self { + Self { + indices: VecDeque::new(), + promises_front: Vec::new(), + promises_back: Vec::new(), + front_status: (TimeInt::MIN, PromiseResult::Ready(())), + back_status: (TimeInt::MAX, PromiseResult::Ready(())), + cached_dense: None, + } + } + + /// No-op in release. 
+ #[inline] + pub fn sanity_check(&self) { + if !cfg!(debug_assertions) { + return; + } + + let Self { + indices, + promises_front, + promises_back, + front_status: _, + back_status: _, + cached_dense, + } = self; + + assert!( + promises_front.windows(2).all(|promises| { + let index_left = promises[0].0; + let index_right = promises[1].0; + index_left <= index_right + }), + "front promises must always be sorted in ascending index order" + ); + if let (Some(p_index), Some(i_index)) = ( + promises_front.last().map(|(index, _)| index), + indices.front(), + ) { + assert!( + p_index < i_index, + "the rightmost front promise must have an index smaller than the leftmost data index ({p_index:?} < {i_index:?})", + ); + } + + assert!( + promises_back.windows(2).all(|promises| { + let index_left = promises[0].0; + let index_right = promises[1].0; + index_left <= index_right + }), + "back promises must always be sorted in ascending index order" + ); + if let (Some(p_index), Some(i_index)) = + (promises_back.last().map(|(index, _)| index), indices.back()) + { + assert!( + i_index < p_index, + "the leftmost back promise must have an index larger than the rightmost data index ({i_index:?} < {p_index:?})", + ); + } + + if let Some(dense) = cached_dense.as_ref() { + assert_eq!(indices.len(), dense.dyn_num_entries()); + } + } + + /// Returns the time range covered by the cached data. + /// + /// Reminder: [`TimeInt::STATIC`] is never included in [`TimeRange`]s. 
+    #[inline]
+    pub fn time_range(&self) -> Option<TimeRange> {
+        let first_time = self.indices.front().map(|(t, _)| *t)?;
+        let last_time = self.indices.back().map(|(t, _)| *t)?;
+        Some(TimeRange::new(first_time, last_time))
+    }
+
+    #[inline]
+    pub fn contains_data_time(&self, data_time: TimeInt) -> bool {
+        let first_time = self.indices.front().map_or(&TimeInt::MAX, |(t, _)| t);
+        let last_time = self.indices.back().map_or(&TimeInt::MIN, |(t, _)| t);
+        *first_time <= data_time && data_time <= *last_time
+    }
+
+    /// Removes everything from the bucket that corresponds to a time equal to or greater than the
+    /// specified `threshold`.
+    ///
+    /// Also drops pending promises in that range and resets the front/back statuses accordingly.
+    #[inline]
+    pub fn truncate_at_time(&mut self, threshold: TimeInt) {
+        re_tracing::profile_function!();
+
+        let time_range = self.time_range();
+
+        let Self {
+            indices,
+            promises_front,
+            promises_back,
+            front_status,
+            back_status,
+            cached_dense,
+        } = self;
+
+        if front_status.0 >= threshold {
+            let time_min = time_range.map_or(TimeInt::MIN, |range| range.min());
+            *front_status = (time_min, PromiseResult::Ready(()));
+        }
+        if back_status.0 >= threshold {
+            let time_max = time_range.map_or(TimeInt::MAX, |range| range.max());
+            *back_status = (time_max, PromiseResult::Ready(()));
+        }
+
+        // NOTE: promises are kept ascendingly sorted by index
+        {
+            let threshold_idx =
+                promises_front.partition_point(|((data_time, _), _)| *data_time < threshold);
+            promises_front.truncate(threshold_idx);
+
+            let threshold_idx =
+                promises_back.partition_point(|((data_time, _), _)| *data_time < threshold);
+            promises_back.truncate(threshold_idx);
+        }
+
+        let threshold_idx = indices.partition_point(|(data_time, _)| data_time < &threshold);
+        {
+            indices.truncate(threshold_idx);
+            if let Some(data) = cached_dense {
+                data.dyn_truncate(threshold_idx);
+            }
+        }
+
+        self.sanity_check();
+    }
+
+    #[inline]
+    pub fn clear(&mut self) {
+        *self = Self::empty();
+    }
+}
diff --git a/crates/re_query_cache2/tests/latest_at.rs 
b/crates/re_query_cache2/tests/latest_at.rs index 4af2ba0b8f12..016f1c3d5c80 100644 --- a/crates/re_query_cache2/tests/latest_at.rs +++ b/crates/re_query_cache2/tests/latest_at.rs @@ -494,7 +494,7 @@ fn query_and_compare( let cached_colors = cached.get_or_empty(MyColor::name()); let cached_color_data = cached_colors - .to_sparse::(&resolver) + .to_dense::(&resolver) .flatten() .unwrap(); @@ -513,7 +513,7 @@ fn query_and_compare( let expected_colors = expected.get_or_empty(MyColor::name()); let expected_color_data = expected_colors - .to_sparse::(&resolver) + .to_dense::(&resolver) .flatten() .unwrap(); diff --git a/crates/re_query_cache2/tests/range.rs b/crates/re_query_cache2/tests/range.rs new file mode 100644 index 000000000000..d06a7a8d8b9a --- /dev/null +++ b/crates/re_query_cache2/tests/range.rs @@ -0,0 +1,586 @@ +use itertools::{izip, Itertools as _}; + +use re_data_store::{DataStore, RangeQuery, StoreSubscriber as _, TimeInt, TimeRange}; +use re_log_types::{ + build_frame_nr, + example_components::{MyColor, MyPoint, MyPoints}, + DataRow, EntityPath, RowId, TimePoint, +}; +use re_query_cache2::{Caches, PromiseResolver, PromiseResult}; +use re_types::{components::InstanceKey, Archetype}; +use re_types_core::Loggable as _; + +// --- + +#[test] +fn simple_range() -> anyhow::Result<()> { + let mut store = DataStore::new( + re_log_types::StoreId::random(re_log_types::StoreKind::Recording), + InstanceKey::name(), + Default::default(), + ); + let mut caches = Caches::new(&store); + + let entity_path: EntityPath = "point".into(); + + let timepoint1 = [build_frame_nr(123)]; + { + let points = vec![MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)]; + let row = + DataRow::from_cells1_sized(RowId::new(), entity_path.clone(), timepoint1, 2, points)?; + insert_and_react(&mut store, &mut caches, &row); + + let colors = vec![MyColor::from_rgb(255, 0, 0)]; + let row = + DataRow::from_cells1_sized(RowId::new(), entity_path.clone(), timepoint1, 1, colors)?; + 
insert_and_react(&mut store, &mut caches, &row); + } + + let timepoint2 = [build_frame_nr(223)]; + { + let colors = vec![MyColor::from_rgb(255, 0, 0)]; + let row = + DataRow::from_cells1_sized(RowId::new(), entity_path.clone(), timepoint2, 1, colors)?; + insert_and_react(&mut store, &mut caches, &row); + } + + let timepoint3 = [build_frame_nr(323)]; + { + let points = vec![MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)]; + let row = + DataRow::from_cells1_sized(RowId::new(), entity_path.clone(), timepoint3, 2, points)?; + insert_and_react(&mut store, &mut caches, &row); + } + + // --- First test: `(timepoint1, timepoint3]` --- + + let query = re_data_store::RangeQuery::new( + timepoint1[0].0, + TimeRange::new(timepoint1[0].1.as_i64() + 1, timepoint3[0].1), + ); + + query_and_compare(&caches, &store, &query, &entity_path); + + // --- Second test: `[timepoint1, timepoint3]` --- + + let query = re_data_store::RangeQuery::new( + timepoint1[0].0, + TimeRange::new(timepoint1[0].1, timepoint3[0].1), + ); + + query_and_compare(&caches, &store, &query, &entity_path); + + Ok(()) +} + +#[test] +fn static_range() { + let mut store = DataStore::new( + re_log_types::StoreId::random(re_log_types::StoreKind::Recording), + InstanceKey::name(), + Default::default(), + ); + let mut caches = Caches::new(&store); + + let entity_path: EntityPath = "point".into(); + + let timepoint1 = [build_frame_nr(123)]; + { + let positions = vec![MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)]; + let row = + DataRow::from_cells1_sized(RowId::new(), entity_path.clone(), timepoint1, 2, positions) + .unwrap(); + insert_and_react(&mut store, &mut caches, &row); + + let colors = vec![MyColor::from_rgb(255, 0, 0)]; + let row = DataRow::from_cells1_sized( + RowId::new(), + entity_path.clone(), + timepoint1, + 1, + colors.clone(), + ) + .unwrap(); + insert_and_react(&mut store, &mut caches, &row); + + // Insert statically too! 
+ let row = DataRow::from_cells1_sized( + RowId::new(), + entity_path.clone(), + TimePoint::default(), + 1, + colors, + ) + .unwrap(); + insert_and_react(&mut store, &mut caches, &row); + } + + let timepoint2 = [build_frame_nr(223)]; + { + let colors = vec![MyColor::from_rgb(255, 0, 0)]; + let row = DataRow::from_cells1_sized( + RowId::new(), + entity_path.clone(), + timepoint2, + 1, + colors.clone(), + ) + .unwrap(); + insert_and_react(&mut store, &mut caches, &row); + + // Insert statically too! + let row = DataRow::from_cells1_sized( + RowId::new(), + entity_path.clone(), + TimePoint::default(), + 1, + colors, + ) + .unwrap(); + insert_and_react(&mut store, &mut caches, &row); + } + + let timepoint3 = [build_frame_nr(323)]; + { + // Create some Positions with implicit instances + let positions = vec![MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)]; + let row = + DataRow::from_cells1_sized(RowId::new(), entity_path.clone(), timepoint3, 2, positions) + .unwrap(); + insert_and_react(&mut store, &mut caches, &row); + } + + // --- First test: `(timepoint1, timepoint3]` --- + + let query = re_data_store::RangeQuery::new( + timepoint1[0].0, + TimeRange::new(timepoint1[0].1.as_i64() + 1, timepoint3[0].1), + ); + + query_and_compare(&caches, &store, &query, &entity_path); + + // --- Second test: `[timepoint1, timepoint3]` --- + + // The inclusion of `timepoint1` means latest-at semantics will fall back to timeless data! 
+ + let query = re_data_store::RangeQuery::new( + timepoint1[0].0, + TimeRange::new(timepoint1[0].1, timepoint3[0].1), + ); + + query_and_compare(&caches, &store, &query, &entity_path); + + // --- Third test: `[-inf, +inf]` --- + + let query = + re_data_store::RangeQuery::new(timepoint1[0].0, TimeRange::new(TimeInt::MIN, TimeInt::MAX)); + + query_and_compare(&caches, &store, &query, &entity_path); +} + +#[test] +fn simple_splatted_range() { + let mut store = DataStore::new( + re_log_types::StoreId::random(re_log_types::StoreKind::Recording), + InstanceKey::name(), + Default::default(), + ); + let mut caches = Caches::new(&store); + + let entity_path: EntityPath = "point".into(); + + let timepoint1 = [build_frame_nr(123)]; + { + let positions = vec![MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)]; + let row = + DataRow::from_cells1_sized(RowId::new(), entity_path.clone(), timepoint1, 2, positions) + .unwrap(); + insert_and_react(&mut store, &mut caches, &row); + + // Assign one of them a color with an explicit instance + let colors = vec![MyColor::from_rgb(255, 0, 0)]; + let row = + DataRow::from_cells1_sized(RowId::new(), entity_path.clone(), timepoint1, 1, colors) + .unwrap(); + insert_and_react(&mut store, &mut caches, &row); + } + + let timepoint2 = [build_frame_nr(223)]; + { + let colors = vec![MyColor::from_rgb(0, 255, 0)]; + let row = + DataRow::from_cells1_sized(RowId::new(), entity_path.clone(), timepoint2, 1, colors) + .unwrap(); + insert_and_react(&mut store, &mut caches, &row); + } + + let timepoint3 = [build_frame_nr(323)]; + { + let positions = vec![MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)]; + let row = + DataRow::from_cells1_sized(RowId::new(), entity_path.clone(), timepoint3, 2, positions) + .unwrap(); + insert_and_react(&mut store, &mut caches, &row); + } + + // --- First test: `(timepoint1, timepoint3]` --- + + let query = re_data_store::RangeQuery::new( + timepoint1[0].0, + TimeRange::new(timepoint1[0].1.as_i64() + 1, timepoint3[0].1), + 
); + + query_and_compare(&caches, &store, &query, &entity_path); + + // --- Second test: `[timepoint1, timepoint3]` --- + + let query = re_data_store::RangeQuery::new( + timepoint1[0].0, + TimeRange::new(timepoint1[0].1, timepoint3[0].1), + ); + + query_and_compare(&caches, &store, &query, &entity_path); +} + +#[test] +fn invalidation() { + let entity_path = "point"; + + let test_invalidation = |query: RangeQuery, + present_data_timepoint: TimePoint, + past_data_timepoint: TimePoint, + future_data_timepoint: TimePoint| { + let mut store = DataStore::new( + re_log_types::StoreId::random(re_log_types::StoreKind::Recording), + InstanceKey::name(), + Default::default(), + ); + let mut caches = Caches::new(&store); + + let positions = vec![MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)]; + let row = DataRow::from_cells1_sized( + RowId::new(), + entity_path, + present_data_timepoint.clone(), + 2, + positions, + ) + .unwrap(); + insert_and_react(&mut store, &mut caches, &row); + + let colors = vec![MyColor::from_rgb(1, 2, 3)]; + let row = DataRow::from_cells1_sized( + RowId::new(), + entity_path, + present_data_timepoint.clone(), + 1, + colors, + ) + .unwrap(); + insert_and_react(&mut store, &mut caches, &row); + + query_and_compare(&caches, &store, &query, &entity_path.into()); + + // --- Modify present --- + + // Modify the PoV component + let positions = vec![MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)]; + let row = DataRow::from_cells1_sized( + RowId::new(), + entity_path, + present_data_timepoint.clone(), + 2, + positions, + ) + .unwrap(); + insert_and_react(&mut store, &mut caches, &row); + + query_and_compare(&caches, &store, &query, &entity_path.into()); + + // Modify the optional component + let colors = vec![MyColor::from_rgb(4, 5, 6), MyColor::from_rgb(7, 8, 9)]; + let row = DataRow::from_cells1_sized( + RowId::new(), + entity_path, + present_data_timepoint, + 2, + colors, + ) + .unwrap(); + insert_and_react(&mut store, &mut caches, &row); + + 
query_and_compare(&caches, &store, &query, &entity_path.into()); + + // --- Modify past --- + + // Modify the PoV component + let positions = vec![MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)]; + let row = DataRow::from_cells1_sized( + RowId::new(), + entity_path, + past_data_timepoint.clone(), + 2, + positions, + ) + .unwrap(); + insert_and_react(&mut store, &mut caches, &row); + + query_and_compare(&caches, &store, &query, &entity_path.into()); + + // Modify the optional component + let colors = vec![MyColor::from_rgb(10, 11, 12), MyColor::from_rgb(13, 14, 15)]; + let row = DataRow::from_cells1_sized( + RowId::new(), + entity_path, + past_data_timepoint.clone(), + 2, + colors, + ) + .unwrap(); + insert_and_react(&mut store, &mut caches, &row); + + query_and_compare(&caches, &store, &query, &entity_path.into()); + + // --- Modify future --- + + // Modify the PoV component + let positions = vec![MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)]; + let row = DataRow::from_cells1_sized( + RowId::new(), + entity_path, + future_data_timepoint.clone(), + 2, + positions, + ) + .unwrap(); + insert_and_react(&mut store, &mut caches, &row); + + query_and_compare(&caches, &store, &query, &entity_path.into()); + + // Modify the optional component + let colors = vec![MyColor::from_rgb(16, 17, 18)]; + let row = + DataRow::from_cells1_sized(RowId::new(), entity_path, future_data_timepoint, 1, colors) + .unwrap(); + insert_and_react(&mut store, &mut caches, &row); + + query_and_compare(&caches, &store, &query, &entity_path.into()); + }; + + let timeless = TimePoint::default(); + let frame_122 = build_frame_nr(122); + let frame_123 = build_frame_nr(123); + let frame_124 = build_frame_nr(124); + + test_invalidation( + RangeQuery::new(frame_123.0, TimeRange::EVERYTHING), + [frame_123].into(), + [frame_122].into(), + [frame_124].into(), + ); + + test_invalidation( + RangeQuery::new(frame_123.0, TimeRange::EVERYTHING), + [frame_123].into(), + timeless, + 
[frame_124].into(), + ); +} + +// Test the following scenario: +// ```py +// rr.log("points", rr.Points3D([1, 2, 3]), static=True) +// +// # Do first query here: LatestAt(+inf) +// # Expected: points=[[1,2,3]] colors=[] +// +// rr.set_time(2) +// rr.log_components("points", rr.components.MyColor(0xFF0000)) +// +// # Do second query here: LatestAt(+inf) +// # Expected: points=[[1,2,3]] colors=[0xFF0000] +// +// rr.set_time(3) +// rr.log_components("points", rr.components.MyColor(0x0000FF)) +// +// # Do third query here: LatestAt(+inf) +// # Expected: points=[[1,2,3]] colors=[0x0000FF] +// +// rr.set_time(3) +// rr.log_components("points", rr.components.MyColor(0x00FF00)) +// +// # Do fourth query here: LatestAt(+inf) +// # Expected: points=[[1,2,3]] colors=[0x00FF00] +// ``` +#[test] +fn invalidation_of_future_optionals() { + let mut store = DataStore::new( + re_log_types::StoreId::random(re_log_types::StoreKind::Recording), + InstanceKey::name(), + Default::default(), + ); + let mut caches = Caches::new(&store); + + let entity_path = "points"; + + let timeless = TimePoint::default(); + let frame2 = [build_frame_nr(2)]; + let frame3 = [build_frame_nr(3)]; + + let query = re_data_store::RangeQuery::new(frame2[0].0, TimeRange::EVERYTHING); + + let positions = vec![MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)]; + let row = + DataRow::from_cells1_sized(RowId::new(), entity_path, timeless, 2, positions).unwrap(); + insert_and_react(&mut store, &mut caches, &row); + + query_and_compare(&caches, &store, &query, &entity_path.into()); + + let colors = vec![MyColor::from_rgb(255, 0, 0)]; + let row = DataRow::from_cells1_sized(RowId::new(), entity_path, frame2, 1, colors).unwrap(); + insert_and_react(&mut store, &mut caches, &row); + + query_and_compare(&caches, &store, &query, &entity_path.into()); + + let colors = vec![MyColor::from_rgb(0, 0, 255)]; + let row = DataRow::from_cells1_sized(RowId::new(), entity_path, frame3, 1, colors).unwrap(); + insert_and_react(&mut store, 
&mut caches, &row); + + query_and_compare(&caches, &store, &query, &entity_path.into()); + + let colors = vec![MyColor::from_rgb(0, 255, 0)]; + let row = DataRow::from_cells1_sized(RowId::new(), entity_path, frame3, 1, colors).unwrap(); + insert_and_react(&mut store, &mut caches, &row); + + query_and_compare(&caches, &store, &query, &entity_path.into()); +} + +#[test] +fn invalidation_static() { + let mut store = DataStore::new( + re_log_types::StoreId::random(re_log_types::StoreKind::Recording), + InstanceKey::name(), + Default::default(), + ); + let mut caches = Caches::new(&store); + + let entity_path = "points"; + + let timeless = TimePoint::default(); + + let frame0 = [build_frame_nr(TimeInt::ZERO)]; + let query = re_data_store::RangeQuery::new(frame0[0].0, TimeRange::EVERYTHING); + + let positions = vec![MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)]; + let row = DataRow::from_cells1_sized(RowId::new(), entity_path, timeless.clone(), 2, positions) + .unwrap(); + insert_and_react(&mut store, &mut caches, &row); + + query_and_compare(&caches, &store, &query, &entity_path.into()); + + let colors = vec![MyColor::from_rgb(255, 0, 0)]; + let row = + DataRow::from_cells1_sized(RowId::new(), entity_path, timeless.clone(), 1, colors).unwrap(); + insert_and_react(&mut store, &mut caches, &row); + + query_and_compare(&caches, &store, &query, &entity_path.into()); + + let colors = vec![MyColor::from_rgb(0, 0, 255)]; + let row = DataRow::from_cells1_sized(RowId::new(), entity_path, timeless, 1, colors).unwrap(); + insert_and_react(&mut store, &mut caches, &row); + + query_and_compare(&caches, &store, &query, &entity_path.into()); +} + +// --- + +fn insert_and_react(store: &mut DataStore, caches: &mut Caches, row: &DataRow) { + caches.on_events(&[store.insert_row(row).unwrap()]); +} + +fn query_and_compare( + caches: &Caches, + store: &DataStore, + query: &RangeQuery, + entity_path: &EntityPath, +) { + re_log::setup_logging(); + + let resolver = 
PromiseResolver::default(); + + for _ in 0..3 { + let cached = caches.range( + store, + query, + entity_path, + MyPoints::all_components().iter().copied(), + ); + + let cached_all_points = cached + .get_required(MyPoint::name()) + .unwrap() + .to_dense::(&resolver); + assert!(matches!( + cached_all_points.status(), + (PromiseResult::Ready(()), PromiseResult::Ready(())), + )); + let cached_all_points_indexed = cached_all_points.range_indexed(); + + let cached_all_colors = cached + .get_or_empty(MyColor::name()) + .to_dense::(&resolver); + assert!(matches!( + cached_all_colors.status(), + (PromiseResult::Ready(()), PromiseResult::Ready(())), + )); + let cached_all_colors_indexed = cached_all_colors.range_indexed(); + + let expected = re_query2::range( + store, + query, + entity_path, + MyPoints::all_components().iter().copied(), + ); + + let expected_all_points = expected.get_required(MyPoint::name()).unwrap(); + let expected_all_points_indices = expected_all_points.indices(); + let expected_all_points_data = expected_all_points + .to_dense::(&resolver) + .into_iter() + .map(|batch| batch.flatten().unwrap()) + .collect_vec(); + let expected_all_points_indexed = + izip!(expected_all_points_indices, expected_all_points_data); + + let expected_all_colors = expected.get_or_empty(MyColor::name()); + let expected_all_colors_indices = expected_all_colors.indices(); + let expected_all_colors_data = expected_all_colors + .to_dense::(&resolver) + .into_iter() + .map(|batch| batch.flatten().unwrap()) + .collect_vec(); + let expected_all_colors_indexed = + izip!(expected_all_colors_indices, expected_all_colors_data); + + eprintln!("{query:?}"); + eprintln!("{}", store.to_data_table().unwrap()); + + similar_asserts::assert_eq!( + expected_all_points_indexed + .map(|(index, data)| (*index, data)) + .collect_vec(), + cached_all_points_indexed + .map(|(index, data)| (*index, data.to_vec())) + .collect_vec(), + ); + + similar_asserts::assert_eq!( + expected_all_colors_indexed + 
.map(|(index, data)| (*index, data)) + .collect_vec(), + cached_all_colors_indexed + .map(|(index, data)| (*index, data.to_vec())) + .collect_vec(), + ); + } +}