Skip to content

Commit

Permalink
New data APIs 9: cached range queries (#5755)
Browse files Browse the repository at this point in the history
Static-aware, key-less, component-based, cached range APIs.

```rust
let caches = re_query_cache2::Caches::new(&store);

// First, get the raw results for this query.
//
// They might or might not already be cached. We won't know for sure until we try to access
// each individual component's data below.
let results: CachedRangeResults = caches.range(
    &store,
    &query,
    &entity_path.into(),
    MyPoints::all_components().iter().copied(), // no generics!
);

// Then, grab the results for each individual components.
// * `get_required` returns an error if the component batch is missing
// * `get_or_empty` returns an empty set of results if the component if missing
// * `get` returns an option
//
// At this point we still don't know whether they are cached or not. That's the next step.
let all_points: &CachedRangeComponentResults = results.get_required(MyPoint::name())?;
let all_colors: &CachedRangeComponentResults = results.get_or_empty(MyColor::name());
let all_labels: &CachedRangeComponentResults = results.get_or_empty(MyLabel::name());

// Then comes the time to resolve/convert and deserialize the data.
// These steps have to be done together for efficiency reasons.
//
// That's when caching comes into play.
// If the data has already been accessed in the past, then this will just grab the
// pre-deserialized, pre-resolved/pre-converted result from the cache.
// Otherwise, this will trigger a deserialization and cache the result for next time.
let all_points = all_points.to_dense::<MyPoint>(&resolver);
let all_colors = all_colors.to_dense::<MyColor>(&resolver);
let all_labels = all_labels.to_dense::<MyLabel>(&resolver);

// The cache might not have been able to resolve and deserialize the entire dataset across all
// available timestamps.
//
// We can use the following APIs to check the status of the front and back sides of the data range.
//
// E.g. it is possible that the front-side of the range is still waiting for pending data while
// the back-side has been fully loaded.
assert!(matches!(
    all_points.status(),
    (PromiseResult::Ready(()), PromiseResult::Ready(()))
));

// Zip the results together using a stateful time-based join.
let all_frames = range_zip_1x2(
    all_points.range_indexed(),
    all_colors.range_indexed(),
    all_labels.range_indexed(),
);

// Then comes the time to resolve/convert and deserialize the data, _for each timestamp_.
// These steps have to be done together for efficiency reasons.
//
// Both the resolution and deserialization steps might fail, which is why this returns a `Result<Result<T>>`.
// Use `PromiseResult::flatten` to simplify it down to a single result.
eprintln!("results:");
for ((data_time, row_id), points, colors, labels) in all_frames {
    let colors = colors.unwrap_or(&[]);
    let color_default_fn = || {
        static DEFAULT: MyColor = MyColor(0xFF00FFFF);
        &DEFAULT
    };

    let labels = labels.unwrap_or(&[]).iter().cloned().map(Some);
    let label_default_fn = || None;

    // With the data now fully resolved/converted and deserialized, the joining logic can be
    // applied.
    //
    // In most cases this will be either a clamped zip, or no joining at all.

    let results = clamped_zip_1x2(points, colors, color_default_fn, labels, label_default_fn)
        .collect_vec();
    eprintln!("{data_time:?} @ {row_id}:\n    {results:?}");
}
```

---

Part of a PR series to completely revamp the data APIs in preparation
for the removal of instance keys and the introduction of promises:
- #5573
- #5574
- #5581
- #5605
- #5606
- #5633
- #5673
- #5679
- #5687
- #5755
- #5990
- #5992
- #5993 
- #5994
- #6035
- #6036
- #6037

Builds on top of the static data PR series:
- #5534
  • Loading branch information
teh-cmc authored Apr 26, 2024
1 parent c203d4e commit f42ad74
Show file tree
Hide file tree
Showing 18 changed files with 2,018 additions and 130 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions crates/re_data_store/src/store_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@ impl RangeQuery {
pub const fn new(timeline: Timeline, range: TimeRange) -> Self {
Self { timeline, range }
}

#[inline]
pub const fn everything(timeline: Timeline) -> Self {
Self {
timeline,
range: TimeRange::EVERYTHING,
}
}

#[inline]
pub fn timeline(&self) -> Timeline {
self.timeline
}

#[inline]
pub fn range(&self) -> TimeRange {
self.range
}
}

// --- Data store ---
Expand Down
6 changes: 3 additions & 3 deletions crates/re_query2/examples/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ fn main() -> anyhow::Result<()> {
// _component batch_ itself (that says nothing about its _instances_!).
//
// * `get_required` returns an error if the component batch is missing
// * `get_optional` returns an empty set of results if the component if missing
// * `get_or_empty` returns an empty set of results if the component if missing
// * `get` returns an option
let all_points: &RangeComponentResults = results.get_required(MyPoint::name())?;
let all_colors: &RangeComponentResults = results.get_optional(MyColor::name());
let all_labels: &RangeComponentResults = results.get_optional(MyLabel::name());
let all_colors: &RangeComponentResults = results.get_or_empty(MyColor::name());
let all_labels: &RangeComponentResults = results.get_or_empty(MyLabel::name());

let all_indexed_points = izip!(
all_points.iter_indices(),
Expand Down
17 changes: 16 additions & 1 deletion crates/re_query2/src/promise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ impl std::fmt::Display for PromiseId {
}
}

impl re_types_core::SizeBytes for PromiseId {
#[inline]
fn heap_size_bytes(&self) -> u64 {
self.0.heap_size_bytes()
}
}

impl PromiseId {
/// Create a new unique [`PromiseId`] based on the current time.
#[allow(clippy::new_without_default)]
Expand All @@ -39,6 +46,14 @@ pub struct Promise {

static_assertions::assert_eq_size!(Promise, Option<Promise>);

impl re_types_core::SizeBytes for Promise {
#[inline]
fn heap_size_bytes(&self) -> u64 {
let Self { id, source } = self;
id.heap_size_bytes() + source.heap_size_bytes()
}
}

impl Promise {
#[inline]
pub fn new(source: DataCell) -> Self {
Expand Down Expand Up @@ -75,7 +90,7 @@ impl PromiseResolver {
}

/// The result of resolving a [`Promise`] through a [`PromiseResolver`].
#[derive(Clone)]
#[derive(Debug, Clone)]
pub enum PromiseResult<T> {
/// The resolution process is still in progress.
///
Expand Down
22 changes: 14 additions & 8 deletions crates/re_query2/src/range/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{Promise, PromiseResolver, PromiseResult};
/// The data is neither deserialized, nor resolved/converted.
/// It it the raw [`DataCell`]s, straight from our datastore.
///
/// Use [`RangeResults::get`], [`RangeResults::get_required`] and [`RangeResults::get_optional`]
/// Use [`RangeResults::get`], [`RangeResults::get_required`] and [`RangeResults::get_or_empty`]
/// in order to access the raw results for each individual component.
#[derive(Default, Debug, Clone)]
pub struct RangeResults {
Expand Down Expand Up @@ -56,7 +56,7 @@ impl RangeResults {
///
/// Returns empty results if the component is not present.
#[inline]
pub fn get_optional(&self, component_name: impl Into<ComponentName>) -> &RangeComponentResults {
pub fn get_or_empty(&self, component_name: impl Into<ComponentName>) -> &RangeComponentResults {
let component_name = component_name.into();
if let Some(component) = self.components.get(&component_name) {
component
Expand All @@ -79,7 +79,10 @@ impl RangeResults {
.map(|(index, cell)| (index, Promise::new(cell)))
.unzip();

let results = RangeComponentResults { indices, cells };
let results = RangeComponentResults {
indices,
promises: cells,
};
results.sanity_check();

self.components.insert(component_name, results);
Expand All @@ -92,7 +95,7 @@ impl RangeResults {
#[derive(Debug, Clone)]
pub struct RangeComponentResults {
pub indices: Vec<(TimeInt, RowId)>,
pub cells: Vec<Promise>,
pub promises: Vec<Promise>,
}

impl Default for RangeComponentResults {
Expand All @@ -107,14 +110,17 @@ impl RangeComponentResults {
pub const fn empty() -> Self {
Self {
indices: Vec::new(),
cells: Vec::new(),
promises: Vec::new(),
}
}

/// No-op in release.
#[inline]
pub fn sanity_check(&self) {
let Self { indices, cells } = self;
let Self {
indices,
promises: cells,
} = self;
if cfg!(debug_assertions) {
assert_eq!(indices.len(), cells.len());
}
Expand All @@ -141,7 +147,7 @@ impl RangeComponentResults {
&self,
resolver: &PromiseResolver,
) -> Vec<PromiseResult<DeserializationResult<Vec<C>>>> {
self.cells
self.promises
.iter()
.map(|cell| {
resolver.resolve(cell).map(|cell| {
Expand Down Expand Up @@ -173,7 +179,7 @@ impl RangeComponentResults {
&self,
resolver: &PromiseResolver,
) -> Vec<PromiseResult<DeserializationResult<Vec<Option<C>>>>> {
self.cells
self.promises
.iter()
.map(|cell| {
resolver.resolve(cell).map(|cell| {
Expand Down
10 changes: 5 additions & 5 deletions crates/re_query2/tests/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ fn simple_range() -> anyhow::Result<()> {
);

let all_points = results.get_required(MyPoint::name())?;
let all_colors = results.get_optional(MyColor::name());
let all_colors = results.get_or_empty(MyColor::name());

let all_points = izip!(
all_points.iter_indices(),
Expand Down Expand Up @@ -131,7 +131,7 @@ fn simple_range() -> anyhow::Result<()> {
);

let all_points = results.get_required(MyPoint::name())?;
let all_colors = results.get_optional(MyColor::name());
let all_colors = results.get_or_empty(MyColor::name());

let all_points = izip!(
all_points.iter_indices(),
Expand Down Expand Up @@ -313,7 +313,7 @@ fn static_range() -> anyhow::Result<()> {
);

let all_points = results.get_required(MyPoint::name())?;
let all_colors = results.get_optional(MyColor::name());
let all_colors = results.get_or_empty(MyColor::name());

let all_points = izip!(
all_points.iter_indices(),
Expand Down Expand Up @@ -363,7 +363,7 @@ fn static_range() -> anyhow::Result<()> {
);

let all_points = results.get_required(MyPoint::name())?;
let all_colors = results.get_optional(MyColor::name());
let all_colors = results.get_or_empty(MyColor::name());

let all_points = izip!(
all_points.iter_indices(),
Expand Down Expand Up @@ -434,7 +434,7 @@ fn static_range() -> anyhow::Result<()> {
);

let all_points = results.get_required(MyPoint::name())?;
let all_colors = results.get_optional(MyColor::name());
let all_colors = results.get_or_empty(MyColor::name());

let all_points = izip!(
all_points.iter_indices(),
Expand Down
1 change: 1 addition & 0 deletions crates/re_query_cache2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ ahash.workspace = true
anyhow.workspace = true
backtrace.workspace = true
indent.workspace = true
indexmap.workspace = true
itertools.workspace = true
nohash-hasher.workspace = true
parking_lot.workspace = true
Expand Down
15 changes: 4 additions & 11 deletions crates/re_query_cache2/examples/latest_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,9 @@ fn main() -> anyhow::Result<()> {
// Both the resolution and deserialization steps might fail, which is why this returns a `Result<Result<T>>`.
// Use `PromiseResult::flatten` to simplify it down to a single result.
//
// A choice now has to be made regarding the nullability of the _component batch's instances_.
// Our IDL doesn't support nullable instances at the moment -- so for the foreseeable future you probably
// shouldn't be using anything but `iter_dense`.
//
// This is the step at which caching comes into play.
//
// If the data has already been accessed with the same nullability characteristics in the
// past, then this will just grab the pre-deserialized, pre-resolved/pre-converted result from
// the cache.
//
// If the data has already been accessed in the past, then this will just grab the pre-deserialized,
// pre-resolved/pre-converted result from the cache.
// Otherwise, this will trigger a deserialization and cache the result for next time.

let points = match points.iter_dense::<MyPoint>(&resolver).flatten() {
Expand All @@ -81,12 +74,12 @@ fn main() -> anyhow::Result<()> {
PromiseResult::Error(err) => return Err(err.into()),
};

let labels = match labels.iter_sparse::<MyLabel>(&resolver).flatten() {
let labels = match labels.iter_dense::<MyLabel>(&resolver).flatten() {
PromiseResult::Pending => {
// Handle the fact that the data isn't ready appropriately.
return Ok(());
}
PromiseResult::Ready(data) => data,
PromiseResult::Ready(data) => data.map(Some),
PromiseResult::Error(err) => return Err(err.into()),
};

Expand Down
Loading

0 comments on commit f42ad74

Please sign in to comment.