Improve performance of time panel #8224

Merged: 15 commits, Nov 28, 2024
3 changes: 3 additions & 0 deletions Cargo.lock
@@ -6271,13 +6271,16 @@ dependencies = [
"criterion",
"egui",
"itertools 0.13.0",
"nohash-hasher",
"once_cell",
"rand",
"re_chunk_store",
"re_context_menu",
"re_data_ui",
"re_entity_db",
"re_format",
"re_int_histogram",
"re_log",
"re_log_types",
"re_tracing",
"re_types",
49 changes: 27 additions & 22 deletions crates/store/re_chunk/src/chunk.rs
@@ -1,8 +1,6 @@
use std::{
collections::BTreeMap,
sync::atomic::{AtomicU64, Ordering},
};
use std::sync::atomic::{AtomicU64, Ordering};

use ahash::HashMap;
use arrow2::{
array::{
Array as Arrow2Array, ListArray as Arrow2ListArray, PrimitiveArray as Arrow2PrimitiveArray,
@@ -452,24 +450,31 @@ impl Chunk {

debug_assert!(!time_column.is_sorted());

self.components
.values()
.flat_map(move |list_array| {
izip!(
time_column.times(),
// Reminder: component columns are sparse, we must take a look at the validity bitmaps.
list_array.validity().map_or_else(
|| arrow2::Either::Left(std::iter::repeat(1).take(self.num_rows())),
|validity| arrow2::Either::Right(validity.iter().map(|b| b as u64)),
)
)
})
.fold(BTreeMap::default(), |mut acc, (time, is_valid)| {
*acc.entry(time).or_default() += is_valid;
acc
})
.into_iter()
.collect()
// NOTE: This is used on some very hot paths (time panel rendering).

let result_unordered =
self.components
.values()
.fold(HashMap::default(), |acc, list_array| {
if let Some(validity) = list_array.validity() {
time_column.times().zip(validity.iter()).fold(
acc,
|mut acc, (time, is_valid)| {
*acc.entry(time).or_default() += is_valid as u64;
acc
},
)
} else {
time_column.times().fold(acc, |mut acc, time| {
*acc.entry(time).or_default() += 1;
acc
})
}
});

let mut result = result_unordered.into_iter().collect_vec();
result.sort_by_key(|val| val.0);
result
}

/// The number of events in this chunk for the specified component.
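The rewrite above replaces per-element `BTreeMap` insertion with `HashMap` accumulation followed by a single sort. A minimal standalone sketch of that pattern, using std's `HashMap` for self-containment (the PR itself uses `ahash` and `itertools`; the function below is illustrative, not the real `Chunk` API):

```rust
use std::collections::HashMap;

/// Count events per timestamp, honoring an optional validity bitmap
/// (component columns are sparse, so invalid rows must not count).
fn count_events_per_time(times: &[i64], validity: Option<&[bool]>) -> Vec<(i64, u64)> {
    let mut acc: HashMap<i64, u64> = HashMap::new();
    match validity {
        Some(bits) => {
            for (&time, &valid) in times.iter().zip(bits) {
                *acc.entry(time).or_default() += valid as u64;
            }
        }
        None => {
            for &time in times {
                *acc.entry(time).or_default() += 1;
            }
        }
    }
    // One sort at the end is cheaper than keeping a BTreeMap ordered across
    // every insertion: each hash insert is O(1) instead of O(log n).
    let mut result: Vec<(i64, u64)> = acc.into_iter().collect();
    result.sort_unstable_by_key(|&(time, _)| time);
    result
}
```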
29 changes: 22 additions & 7 deletions crates/store/re_chunk_store/src/events.rs
@@ -80,6 +80,23 @@ impl ChunkStoreDiffKind {
}
}

/// Reports which [`Chunk`]s were merged into a new [`Chunk`] during a compaction.
#[derive(Debug, Clone)]
pub struct ChunkCompactionReport {
/// The chunks that were merged into a new chunk.
pub srcs: BTreeMap<ChunkId, Arc<Chunk>>,

/// The new chunk that was created as the result of the compaction.
pub new_chunk: Arc<Chunk>,
}

impl PartialEq for ChunkCompactionReport {
#[inline]
fn eq(&self, rhs: &Self) -> bool {
self.srcs.keys().eq(rhs.srcs.keys()) && self.new_chunk.id() == rhs.new_chunk.id()
}
}

/// Describes an atomic change in the Rerun [`ChunkStore`]: a chunk has been added or deleted.
///
/// From a query model standpoint, the [`ChunkStore`] _always_ operates one chunk at a time:
@@ -120,14 +137,15 @@ pub struct ChunkStoreDiff {
// deallocated.
pub chunk: Arc<Chunk>,

/// Reports which [`Chunk`]s were merged into a new [`ChunkId`] (srcs, dst) during a compaction.
/// Reports which [`Chunk`]s were merged into a new [`Chunk`] during a compaction.
///
/// This is only specified if an addition to the store triggered a compaction.
/// When that happens, it is guaranteed that [`ChunkStoreDiff::chunk`] will be present in the
/// set of source chunks below, since it was compacted on arrival.
///
/// A corollary to that is that the destination [`ChunkId`] must have never been seen before.
pub compacted: Option<(BTreeMap<ChunkId, Arc<Chunk>>, ChunkId)>,
/// A corollary to that is that the destination [`Chunk`] must have never been seen before,
/// i.e. its [`ChunkId`] must never have been seen before.
pub compacted: Option<ChunkCompactionReport>,
}

impl PartialEq for ChunkStoreDiff {
@@ -146,10 +164,7 @@ impl Eq for ChunkStoreDiff {}

impl ChunkStoreDiff {
#[inline]
pub fn addition(
chunk: Arc<Chunk>,
compacted: Option<(BTreeMap<ChunkId, Arc<Chunk>>, ChunkId)>,
) -> Self {
pub fn addition(chunk: Arc<Chunk>, compacted: Option<ChunkCompactionReport>) -> Self {
Self {
kind: ChunkStoreDiffKind::Addition,
chunk,
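Replacing the anonymous `(srcs, dst)` tuple with a named struct makes call sites self-documenting, and the `PartialEq` impl can compare cheaply by ID since chunks are keyed by their `ChunkId`. A hedged sketch of how a downstream subscriber might consume the report (the helper itself is illustrative; only the `ChunkCompactionReport` fields and `ChunkStoreDiff::compacted` come from the diff above):

```rust
use re_chunk::ChunkId;
use re_chunk_store::{ChunkCompactionReport, ChunkStoreDiff};

/// Illustrative helper: which chunks should a cache evict after this diff?
fn chunks_to_evict(diff: &ChunkStoreDiff) -> Vec<ChunkId> {
    match &diff.compacted {
        // Every source chunk was merged away into `new_chunk`, so any
        // per-chunk state keyed on the old IDs is now stale.
        Some(ChunkCompactionReport { srcs, new_chunk: _ }) => srcs.keys().copied().collect(),
        None => Vec::new(),
    }
}
```

This mirrors what `QueryCache` does in `crates/store/re_query/src/cache.rs` further down.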
4 changes: 3 additions & 1 deletion crates/store/re_chunk_store/src/lib.rs
@@ -29,7 +29,9 @@ pub use self::dataframe::{
IndexRange, IndexValue, QueryExpression, SparseFillStrategy, TimeColumnDescriptor,
TimeColumnSelector, ViewContentsSelector,
};
pub use self::events::{ChunkStoreDiff, ChunkStoreDiffKind, ChunkStoreEvent};
pub use self::events::{
ChunkCompactionReport, ChunkStoreDiff, ChunkStoreDiffKind, ChunkStoreEvent,
};
pub use self::gc::{GarbageCollectionOptions, GarbageCollectionTarget};
pub use self::stats::{ChunkStoreChunkStats, ChunkStoreStats};
pub use self::store::{
3 changes: 2 additions & 1 deletion crates/store/re_chunk_store/src/query.rs
@@ -810,7 +810,8 @@ impl ChunkStore {
query: &RangeQuery,
temporal_chunk_ids_per_times: impl Iterator<Item = &'a ChunkIdSetPerTime>,
) -> Vec<Arc<Chunk>> {
re_tracing::profile_function!();
// Too small & frequent for profiling scopes.
//re_tracing::profile_function!();

temporal_chunk_ids_per_times
.map(|temporal_chunk_ids_per_time| {
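Dropping the scope here is a measurement-overhead trade-off: this helper is called very frequently during time-panel rendering, so the bookkeeping of a per-call profiling scope can rival the work it measures. A self-contained toy (plain `std::time::Instant`, not `re_tracing`/puffin) illustrating the effect:

```rust
use std::time::{Duration, Instant};

fn main() {
    let n: u64 = 1_000_000;
    let mut acc = 0u64;

    // Un-instrumented hot loop.
    let start = Instant::now();
    for i in 0..n {
        acc = acc.wrapping_add(i);
    }
    let plain = start.elapsed();

    // "Instrumented" hot loop: two timestamps per iteration, roughly the
    // bookkeeping a per-call profiling scope performs.
    let start = Instant::now();
    let mut in_scope = Duration::ZERO;
    for i in 0..n {
        let t = Instant::now();
        acc = acc.wrapping_add(i);
        in_scope += t.elapsed();
    }
    let instrumented = start.elapsed();

    // The instrumented loop typically takes far longer even though the
    // measured work (`in_scope`) is tiny.
    println!("plain: {plain:?}, instrumented: {instrumented:?}, measured work: {in_scope:?}, acc: {acc}");
}
```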
6 changes: 4 additions & 2 deletions crates/store/re_chunk_store/src/writes.rs
@@ -322,9 +322,11 @@ impl ChunkStore {
.map(|diff| (diff.chunk.id(), diff.chunk)),
)
.collect();
let dst = chunk_or_compacted.id();

diff.compacted = Some((srcs, dst));
diff.compacted = Some(crate::ChunkCompactionReport {
srcs,
new_chunk: chunk_or_compacted.clone(),
});
}

(chunk_or_compacted, vec![diff])
28 changes: 20 additions & 8 deletions crates/store/re_query/src/cache.rs
@@ -8,7 +8,9 @@ use nohash_hasher::IntSet;
use parking_lot::RwLock;

use re_chunk::ChunkId;
use re_chunk_store::{ChunkStoreDiff, ChunkStoreEvent, ChunkStoreHandle, ChunkStoreSubscriber};
use re_chunk_store::{
ChunkCompactionReport, ChunkStoreDiff, ChunkStoreEvent, ChunkStoreHandle, ChunkStoreSubscriber,
};
use re_log_types::{EntityPath, ResolvedTimeRange, StoreId, TimeInt, Timeline};
use re_types_core::{components::ClearIsRecursive, Component as _, ComponentName};

@@ -315,11 +317,12 @@ impl ChunkStoreSubscriber for QueryCache {

compacted_events.insert(chunk.id());
// If a compaction was triggered, make sure to drop the original chunks too.
compacted_events.extend(
compacted
.iter()
.flat_map(|(compacted_chunks, _)| compacted_chunks.keys().copied()),
);
compacted_events.extend(compacted.iter().flat_map(
|ChunkCompactionReport {
srcs: compacted_chunks,
new_chunk: _,
}| compacted_chunks.keys().copied(),
));
}
}

Expand All @@ -336,7 +339,11 @@ impl ChunkStoreSubscriber for QueryCache {
let mut data_time_min = time_range.min();

// If a compaction was triggered, make sure to drop the original chunks too.
if let Some((compacted_chunks, _)) = compacted {
if let Some(ChunkCompactionReport {
srcs: compacted_chunks,
new_chunk: _,
}) = compacted
{
for chunk in compacted_chunks.values() {
let data_time_compacted = chunk
.time_range_per_component()
@@ -366,7 +373,12 @@
compacted_events.insert(chunk.id());
// If a compaction was triggered, make sure to drop the original chunks too.
compacted_events.extend(compacted.iter().flat_map(
|(compacted_chunks, _)| compacted_chunks.keys().copied(),
|ChunkCompactionReport {
srcs: compacted_chunks,
new_chunk: _,
}| {
compacted_chunks.keys().copied()
},
));
}
}
@@ -40,13 +40,13 @@ pub struct TransformComponentTracker {
}

impl TransformComponentTracker {
/// Accesses the spatial topology for a given store.
/// Accesses the transform component tracking data for a given store.
#[inline]
pub fn access<T>(store_id: &StoreId, f: impl FnOnce(&Self) -> T) -> Option<T> {
ChunkStore::with_subscriber_once(
TransformComponentTrackerStoreSubscriber::subscription_handle(),
move |susbcriber: &TransformComponentTrackerStoreSubscriber| {
susbcriber.per_store.get(store_id).map(f)
move |subscriber: &TransformComponentTrackerStoreSubscriber| {
subscriber.per_store.get(store_id).map(f)
},
)
.flatten()
3 changes: 3 additions & 0 deletions crates/viewer/re_time_panel/Cargo.toml
@@ -26,6 +26,7 @@ re_entity_db.workspace = true
re_format.workspace = true
re_int_histogram.workspace = true
re_log_types.workspace = true
re_log.workspace = true
re_tracing.workspace = true
re_types.workspace = true
re_ui.workspace = true
@@ -34,6 +35,8 @@ re_viewport_blueprint.workspace = true

egui.workspace = true
itertools.workspace = true
nohash-hasher.workspace = true
once_cell.workspace = true
serde.workspace = true
vec1.workspace = true

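Both new dependencies target the hot paths this PR optimizes: `nohash-hasher` skips redundant hashing when keys are already well-distributed integers (such as timestamps), and `once_cell` provides cheap lazily-initialized statics. A hedged sketch of the kind of usage these enable (illustrative code, not the actual time-panel internals):

```rust
use nohash_hasher::IntMap; // HashMap with an identity "hasher" for integer keys
use once_cell::sync::Lazy;

/// Integer timestamps are already unique and well-distributed, so a no-op
/// hasher avoids rehashing them on every lookup in a hot loop.
fn count_times(times: &[i64]) -> IntMap<i64, u64> {
    let mut counts = IntMap::default();
    for &t in times {
        *counts.entry(t).or_default() += 1;
    }
    counts
}

// `Lazy` gives a thread-safe "compute at most once" global, e.g. for a
// lookup table built on first use.
static POWERS_OF_TWO: Lazy<Vec<u64>> = Lazy::new(|| (0..16).map(|i| 1u64 << i).collect());

fn main() {
    let counts = count_times(&[10, 10, 20, 30]);
    assert_eq!(counts[&10], 2);
    assert_eq!(POWERS_OF_TWO[4], 16);
}
```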