Skip to content

Commit

Permalink
New data APIs 10: stats and debug tools for new caches (#5990)
Browse files Browse the repository at this point in the history
Title.

The new cache being natively component-based makes things much smoother
than before.

---

Part of a PR series to completely revamp the data APIs in preparation
for the removal of instance keys and the introduction of promises:
- #5573
- #5574
- #5581
- #5605
- #5606
- #5633
- #5673
- #5679
- #5687
- #5755
- #5990
- #5992
- #5993 
- #5994
- #6035
- #6036
- #6037

Builds on top of the static data PR series:
- #5534
  • Loading branch information
teh-cmc authored Apr 26, 2024
1 parent f42ad74 commit 1cd97aa
Show file tree
Hide file tree
Showing 13 changed files with 322 additions and 149 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 54 additions & 4 deletions crates/re_query_cache2/src/cache.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::{collections::BTreeSet, sync::Arc};
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
};

use ahash::{HashMap, HashSet};
use parking_lot::RwLock;

use re_data_store::{DataStore, StoreDiff, StoreEvent, StoreSubscriber, TimeInt};
use re_log_types::{EntityPath, StoreId, Timeline};
use re_log_types::{EntityPath, StoreId, TimeRange, Timeline};
use re_types_core::ComponentName;

use crate::{LatestAtCache, RangeCache};
Expand Down Expand Up @@ -63,7 +66,6 @@ impl CacheKey {
}
}

#[derive(Debug)]
pub struct Caches {
/// The [`StoreId`] of the associated [`DataStore`].
pub(crate) store_id: StoreId,
Expand All @@ -75,6 +77,54 @@ pub struct Caches {
pub(crate) range_per_cache_key: RwLock<HashMap<CacheKey, Arc<RwLock<RangeCache>>>>,
}

impl std::fmt::Debug for Caches {
    /// Renders a human-readable dump of every per-key cache, grouped by query kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            store_id,
            latest_at_per_cache_key,
            range_per_cache_key,
        } = self;

        let mut lines = Vec::new();

        // --- Latest-at caches ---
        lines.push(format!("[LatestAt @ {store_id}]"));
        {
            let caches = latest_at_per_cache_key.read();
            // Re-collect into a `BTreeMap` so the dump is deterministically ordered by key.
            let ordered: BTreeMap<_, _> = caches.iter().collect();

            for (cache_key, cache) in &ordered {
                let cache = cache.read();
                // Earliest pending invalidation, rendered as an open-ended time range.
                let pending = cache.pending_invalidations.first().map(|&t| {
                    cache_key
                        .timeline
                        .format_time_range_utc(&TimeRange::new(t, TimeInt::MAX))
                });
                lines.push(format!(" [{cache_key:?} (pending_invalidation_min={pending:?})]"));
                lines.push(indent::indent_all_by(4, format!("{cache:?}")));
            }
        }

        // --- Range caches ---
        lines.push(format!("[Range @ {store_id}]"));
        {
            let caches = range_per_cache_key.read();
            let ordered: BTreeMap<_, _> = caches.iter().collect();

            for (cache_key, cache) in &ordered {
                let cache = cache.read();
                let pending = cache.pending_invalidation.map(|t| {
                    cache_key
                        .timeline
                        .format_time_range_utc(&TimeRange::new(t, TimeInt::MAX))
                });
                lines.push(format!(" [{cache_key:?} (pending_invalidation_min={pending:?})]"));
                lines.push(indent::indent_all_by(4, format!("{cache:?}")));
            }
        }

        // Collapse accidental blank lines so nested `Debug` output stays compact.
        f.write_str(&lines.join("\n").replace("\n\n", "\n"))
    }
}

impl Caches {
#[inline]
pub fn new(store: &DataStore) -> Self {
Expand Down Expand Up @@ -164,7 +214,7 @@ impl StoreSubscriber for Caches {
// running while we're updating the invalidation flags.

{
re_tracing::profile_scope!("timeless");
re_tracing::profile_scope!("static");

// TODO(cmc): This is horribly stupid and slow and can easily be made faster by adding
// yet another layer of caching indirection.
Expand Down
100 changes: 100 additions & 0 deletions crates/re_query_cache2/src/cache_stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use std::collections::BTreeMap;

use re_log_types::TimeRange;
use re_types_core::SizeBytes as _;

use crate::{CacheKey, Caches};

// ---

/// Stats for all primary caches.
///
/// Fetch them via [`Caches::stats`].
#[derive(Default, Debug, Clone)]
pub struct CachesStats {
    /// Per-[`CacheKey`] stats for all latest-at caches.
    pub latest_at: BTreeMap<CacheKey, CachedComponentStats>,

    /// Per-[`CacheKey`] stats for all range caches, paired with the time range
    /// covered by the cached data (`None` if nothing is cached yet — TODO confirm).
    pub range: BTreeMap<CacheKey, (Option<TimeRange>, CachedComponentStats)>,
}

impl CachesStats {
    /// Total memory footprint of all primary caches, in bytes.
    #[inline]
    pub fn total_size_bytes(&self) -> u64 {
        re_tracing::profile_function!();

        let Self { latest_at, range } = self;

        // Single pass over both maps: latest-at stats first, then range stats.
        latest_at
            .values()
            .map(|stats| stats.total_size_bytes)
            .chain(range.values().map(|(_time_range, stats)| stats.total_size_bytes))
            .sum()
    }
}

/// Stats for a cached component.
#[derive(Default, Debug, Clone)]
pub struct CachedComponentStats {
    /// Number of cached entries (data times for latest-at caches, indices for range caches).
    pub total_indices: u64,

    /// Total number of component instances summed across all cached entries.
    pub total_instances: u64,

    /// Total memory footprint of the cached data, in bytes.
    pub total_size_bytes: u64,
}

impl Caches {
    /// Computes the stats for all primary caches.
    ///
    /// Briefly takes a recursive read lock on each top-level cache mapping, clones it
    /// (a map of `Arc`s, so cheap), and then computes the stats lock-free at the top
    /// level so concurrent queries are blocked as little as possible.
    pub fn stats(&self) -> CachesStats {
        re_tracing::profile_function!();

        let latest_at = {
            // NOTE: the read guard is a temporary dropped at the end of this statement.
            let latest_at = self.latest_at_per_cache_key.read_recursive().clone();
            // Implicitly releasing top-level cache mappings -- concurrent queries can run once again.

            latest_at
                .iter()
                .map(|(key, cache)| {
                    // Per-cache lock: held only while summing this one cache's stats.
                    let cache = cache.read_recursive();
                    (
                        key.clone(),
                        CachedComponentStats {
                            // One entry per cached data time.
                            total_indices: cache.per_data_time.len() as _,
                            total_instances: cache
                                .per_data_time
                                .values()
                                .map(|results| results.num_instances())
                                .sum(),
                            total_size_bytes: cache.total_size_bytes(),
                        },
                    )
                })
                .collect()
        };

        let range = {
            // Same clone-then-release dance as for the latest-at caches above.
            let range = self.range_per_cache_key.read_recursive().clone();
            // Implicitly releasing top-level cache mappings -- concurrent queries can run once again.

            range
                .iter()
                .map(|(key, cache)| {
                    // Two nested locks: the cache itself, then its inner per-data-time storage.
                    let cache = cache.read_recursive();
                    let cache = cache.per_data_time.read_recursive();
                    (
                        key.clone(),
                        (
                            // Time range covered by the cached data (presumably `None`
                            // when empty — TODO confirm against `time_range()`).
                            cache.time_range(),
                            CachedComponentStats {
                                total_indices: cache.indices.len() as _,
                                total_instances: cache.num_instances(),
                                total_size_bytes: cache.total_size_bytes(),
                            },
                        ),
                    )
                })
                .collect()
        };

        CachesStats { latest_at, range }
    }
}
66 changes: 49 additions & 17 deletions crates/re_query_cache2/src/latest_at/query.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::collections::BTreeSet;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::Relaxed;
use std::{collections::BTreeMap, sync::Arc};

use ahash::HashMap;

use indexmap::IndexMap;
use itertools::Itertools;
use parking_lot::RwLock;

use re_data_store::{DataStore, LatestAtQuery, TimeInt};
use re_log_types::EntityPath;
use re_query2::Promise;
Expand Down Expand Up @@ -108,22 +107,44 @@ impl std::fmt::Debug for LatestAtCache {

let mut strings = Vec::new();

let data_times_per_bucket: HashMap<_, _> = per_data_time
struct StatsPerBucket {
query_times: BTreeSet<TimeInt>,
data_time: TimeInt,
total_size_bytes: u64,
}

let mut buckets: IndexMap<_, _> = per_data_time
.iter()
.map(|(time, bucket)| (Arc::as_ptr(bucket), *time))
.map(|(&data_time, bucket)| {
(
Arc::as_ptr(bucket),
StatsPerBucket {
query_times: Default::default(),
data_time,
total_size_bytes: bucket.total_size_bytes(),
},
)
})
.collect();

for (query_time, bucket) in per_query_time {
let query_time = cache_key.timeline.typ().format_utc(*query_time);
let data_time = data_times_per_bucket.get(&Arc::as_ptr(bucket)).map_or_else(
|| "MISSING?!".to_owned(),
|t| cache_key.timeline.typ().format_utc(*t),
);
for (&query_time, bucket) in per_query_time {
if let Some(bucket) = buckets.get_mut(&Arc::as_ptr(bucket)) {
bucket.query_times.insert(query_time);
}
}

for bucket in buckets.values() {
strings.push(format!(
"query_time={query_time} -> data_time={data_time} ({})",
re_format::format_bytes(bucket.cached_heap_size_bytes.load(Relaxed) as _),
"query_times=[{}] -> data_time={:?} ({})",
bucket
.query_times
.iter()
.map(|t| cache_key.timeline.typ().format_utc(*t))
.collect_vec()
.join(", "),
bucket.data_time.as_i64(),
re_format::format_bytes(bucket.total_size_bytes as _),
));
strings.push(indent::indent_all_by(2, format!("{bucket:?}")));
}

if strings.is_empty() {
Expand All @@ -148,7 +169,19 @@ impl SizeBytes for LatestAtCache {
.keys()
.map(|k| k.total_size_bytes())
.sum::<u64>();
let per_data_time = per_data_time.total_size_bytes();
// NOTE: per query time buckets are just pointers, don't count them.

let per_data_time_keys = per_data_time
.keys()
.map(|k| k.total_size_bytes())
.sum::<u64>();
let per_data_time_values = per_data_time
.values()
// NOTE: make sure to dereference the Arc, else this will account for zero (assumed amortized!)
.map(|arc| (**arc).total_size_bytes())
.sum::<u64>();

let per_data_time = per_data_time_keys + per_data_time_values;
let pending_invalidations = pending_invalidations.total_size_bytes();

per_query_time + per_data_time + pending_invalidations
Expand Down Expand Up @@ -217,7 +250,6 @@ impl LatestAtCache {
index: (data_time, row_id),
promise: Some(Promise::new(cell)),
cached_dense: Default::default(),
cached_heap_size_bytes: AtomicU64::new(0),
});

// Slowest path: this is a complete cache miss.
Expand Down
Loading

0 comments on commit 1cd97aa

Please sign in to comment.