Skip to content

Commit

Permalink
GC improvements 5: Store{Diff,Event} optimizations (#4399)
Browse files Browse the repository at this point in the history
Optimize the creation of `StoreDiff`s and `StoreEvent`s, which turns out
to be a major cost in time series use cases, where it is common to
generate several million of those in any single GC run.

Once again some pretty significant wins.

### Benchmarks

Compared to `main`:
```
group                                                     gc_improvements_0                       gc_improvements_5
-----                                                     -----------------                       -----------------
.../plotting_dashboard/drop_at_least=0.3/bucketsz=1024    13.00  1084.0±4.47ms 54.1 KElem/sec     1.00     83.4±1.16ms 702.9 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=2048    25.37      2.1±0.02s 27.6 KElem/sec     1.00     83.7±0.61ms 700.0 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=256     5.55    465.8±2.50ms 125.8 KElem/sec    1.00     84.0±0.50ms 697.8 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=512     7.94    655.3±2.61ms 89.4 KElem/sec     1.00     82.5±1.33ms 710.0 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/default          8.02    652.8±4.12ms 89.8 KElem/sec     1.00     81.4±0.94ms 720.0 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=1024         35.87      2.4±0.05s 24.2 KElem/sec     1.00     67.5±2.21ms 867.5 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=2048         35.91      2.4±0.03s 24.1 KElem/sec     1.00     67.8±1.86ms 863.9 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=256          37.02      2.5±0.08s 23.5 KElem/sec     1.00     67.5±1.43ms 868.2 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=512          35.47      2.4±0.02s 24.5 KElem/sec     1.00     67.4±1.40ms 869.4 KElem/sec
.../timeless_logs/drop_at_least=0.3/default               36.00      2.4±0.03s 24.4 KElem/sec     1.00     66.8±0.85ms 877.3 KElem/sec
```

Compared to previous PR:
```
group                                                     gc_improvements_4                       gc_improvements_5
-----                                                     -----------------                       -----------------
.../plotting_dashboard/drop_at_least=0.3/bucketsz=1024    1.26    105.0±0.91ms 558.1 KElem/sec    1.00     83.4±1.16ms 702.9 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=2048    1.28    107.3±0.83ms 546.2 KElem/sec    1.00     83.7±0.61ms 700.0 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=256     1.27    106.3±0.74ms 551.3 KElem/sec    1.00     84.0±0.50ms 697.8 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=512     1.29    106.4±0.94ms 550.6 KElem/sec    1.00     82.5±1.33ms 710.0 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/default          1.26    102.9±0.75ms 569.4 KElem/sec    1.00     81.4±0.94ms 720.0 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=1024         1.00     65.3±0.81ms 897.6 KElem/sec    1.03     67.5±2.21ms 867.5 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=2048         1.00     64.9±1.07ms 903.2 KElem/sec    1.05     67.8±1.86ms 863.9 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=256          1.00     64.4±0.99ms 910.2 KElem/sec    1.05     67.5±1.43ms 868.2 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=512          1.00     64.6±1.08ms 906.9 KElem/sec    1.04     67.4±1.40ms 869.4 KElem/sec
.../timeless_logs/drop_at_least=0.3/default               1.00     65.3±1.29ms 897.3 KElem/sec    1.02     66.8±0.85ms 877.3 KElem/sec
```

---

Part of the GC improvements series:
- #4394
- #4395
- #4396
- #4397
- #4398
- #4399
- #4400
- #4401
  • Loading branch information
teh-cmc authored Dec 2, 2023
1 parent 1ce4563 commit 28d8336
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 77 deletions.
66 changes: 20 additions & 46 deletions crates/re_arrow_store/src/store_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,17 @@ pub struct StoreDiff {
/// one addition and (optionally) one deletion (in that order!).
pub row_id: RowId,

/// The [`TimePoint`] associated with that row.
/// The time data associated with that row.
///
/// Since insertions and deletions both work on a row-level basis, this is guaranteed to be the
/// same value for both the insertion and deletion events (if any).
pub timepoint: TimePoint,
///
/// This is not a [`TimePoint`] for performance reasons.
//
// NOTE: Empirical testing shows that a SmallVec isn't any better in the best case, and can be a
// significant performance drop at worst.
// pub times: SmallVec<[(Timeline, TimeInt); 5]>, // "5 timelines ought to be enough for anyone"
pub times: Vec<(Timeline, TimeInt)>,

/// The [`EntityPath`] associated with that row.
///
Expand All @@ -137,7 +143,7 @@ impl StoreDiff {
Self {
kind: StoreDiffKind::Addition,
row_id: row_id.into(),
timepoint: TimePoint::timeless(),
times: Default::default(),
entity_path: entity_path.into(),
cells: Default::default(),
}
Expand All @@ -148,75 +154,43 @@ impl StoreDiff {
Self {
kind: StoreDiffKind::Deletion,
row_id: row_id.into(),
timepoint: TimePoint::timeless(),
times: Default::default(),
entity_path: entity_path.into(),
cells: Default::default(),
}
}

#[inline]
pub fn at_timepoint(mut self, timepoint: impl Into<TimePoint>) -> StoreDiff {
self.timepoint = self.timepoint.union_max(&timepoint.into());
pub fn at_timepoint(&mut self, timepoint: impl Into<TimePoint>) -> &mut Self {
self.times.extend(timepoint.into());
self
}

#[inline]
pub fn at_timestamp(
mut self,
&mut self,
timeline: impl Into<Timeline>,
time: impl Into<TimeInt>,
) -> StoreDiff {
self.timepoint.insert(timeline.into(), time.into());
) -> &mut Self {
self.times.push((timeline.into(), time.into()));
self
}

#[inline]
pub fn with_cells(mut self, cells: impl IntoIterator<Item = DataCell>) -> Self {
pub fn with_cells(&mut self, cells: impl IntoIterator<Item = DataCell>) -> &mut Self {
self.cells
.extend(cells.into_iter().map(|cell| (cell.component_name(), cell)));
self
}

/// Returns the union of two [`StoreDiff`]s.
///
/// They must share the same [`RowId`], [`EntityPath`] and [`StoreDiffKind`].
#[inline]
pub fn union(&self, rhs: &Self) -> Option<Self> {
let Self {
kind: lhs_kind,
row_id: lhs_row_id,
timepoint: lhs_timepoint,
entity_path: lhs_entity_path,
cells: lhs_cells,
} = self;
let Self {
kind: rhs_kind,
row_id: rhs_row_id,
timepoint: rhs_timepoint,
entity_path: rhs_entity_path,
cells: rhs_cells,
} = rhs;

let same_kind = lhs_kind == rhs_kind;
let same_row_id = lhs_row_id == rhs_row_id;
let same_entity_path = lhs_entity_path == rhs_entity_path;

(same_kind && same_row_id && same_entity_path).then(|| Self {
kind: *lhs_kind,
row_id: *lhs_row_id,
timepoint: lhs_timepoint.clone().union_max(rhs_timepoint),
entity_path: lhs_entity_path.clone(),
cells: [lhs_cells.values(), rhs_cells.values()]
.into_iter()
.flatten()
.map(|cell| (cell.component_name(), cell.clone()))
.collect(),
})
pub fn timepoint(&self) -> TimePoint {
self.times.clone().into_iter().collect()
}

#[inline]
pub fn is_timeless(&self) -> bool {
self.timepoint.is_timeless()
self.times.is_empty()
}

/// `-1` for deletions, `+1` for additions.
Expand Down Expand Up @@ -297,7 +271,7 @@ mod tests {
if event.is_timeless() {
self.timeless += delta;
} else {
for (&timeline, &time) in &event.timepoint {
for &(timeline, time) in &event.times {
*self.timelines.entry(timeline).or_default() += delta;
*self.times.entry(time).or_default() += delta;
}
Expand Down
19 changes: 9 additions & 10 deletions crates/re_arrow_store/src/store_gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl DataStore {
table.try_drop_row(&self.cluster_cell_cache, row_id, time.as_i64());
if let Some(inner) = diff.as_mut() {
if let Some(removed) = removed {
diff = inner.union(&removed);
inner.times.extend(removed.times);
}
} else {
diff = removed;
Expand All @@ -258,7 +258,7 @@ impl DataStore {
table.try_drop_row(&self.cluster_cell_cache, row_id);
if let Some(inner) = diff.as_mut() {
if let Some(removed) = removed {
diff = inner.union(&removed);
inner.times.extend(removed.times);
}
} else {
diff = removed;
Expand Down Expand Up @@ -476,7 +476,7 @@ impl DataStore {
.entry(row_id)
.or_insert_with(|| StoreDiff::deletion(row_id, entity_path.clone()));

diff.timepoint.insert(bucket.timeline, time.into());
diff.times.push((bucket.timeline, time.into()));

for column in &mut inner.columns.values_mut() {
let cell = column[i].take();
Expand Down Expand Up @@ -657,10 +657,9 @@ impl IndexedBucketInner {
if let Some(inner) = diff.as_mut() {
inner.cells.insert(cell.component_name(), cell);
} else {
diff = StoreDiff::deletion(removed_row_id, ent_path.clone())
.at_timestamp(timeline, time)
.with_cells([cell])
.into();
let mut d = StoreDiff::deletion(removed_row_id, ent_path.clone());
d.at_timestamp(timeline, time).with_cells([cell]);
diff = Some(d);
}
}
}
Expand Down Expand Up @@ -752,9 +751,9 @@ impl PersistentIndexedTable {
if let Some(inner) = diff.as_mut() {
inner.cells.insert(cell.component_name(), cell);
} else {
diff = StoreDiff::deletion(removed_row_id, ent_path.clone())
.with_cells([cell])
.into();
let mut d = StoreDiff::deletion(removed_row_id, ent_path.clone());
d.cells.insert(cell.component_name(), cell);
diff = Some(d);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/re_arrow_store/src/store_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ impl DataStore {
}
}

let diff = StoreDiff::addition(*row_id, entity_path.clone())
.at_timepoint(timepoint.clone())
let mut diff = StoreDiff::addition(*row_id, entity_path.clone());
diff.at_timepoint(timepoint.clone())
.with_cells(cells.iter().cloned());

// TODO(#4220): should we fire for auto-generated data?
Expand Down
22 changes: 11 additions & 11 deletions crates/re_data_store/src/entity_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl CompactedStoreEvents {
event.delta().unsigned_abs();
}
} else {
for (&timeline, &time) in &event.timepoint {
for &(timeline, time) in &event.times {
let per_timeline = this.timeful.entry(event.entity_path.hash()).or_default();
let per_component = per_timeline.entry(timeline).or_default();
for component_name in event.cells.keys() {
Expand Down Expand Up @@ -205,7 +205,7 @@ impl EntityTree {
let leaf = self.create_subtrees_recursively(
event.diff.entity_path.as_slice(),
0,
&event.diff.timepoint,
&event.diff.times,
event.num_components() as _,
);

Expand Down Expand Up @@ -235,7 +235,7 @@ impl EntityTree {
pending_clears = self.flat_clears.clone().into_iter().collect_vec();
Default::default()
});
per_component.add(&store_diff.timepoint, 1);
per_component.add(&store_diff.times, 1);

// Is the newly added component under the influence of previously logged `Clear`
// component?
Expand Down Expand Up @@ -343,7 +343,7 @@ impl EntityTree {
next,
is_recursive,
store_diff.row_id,
store_diff.timepoint.clone(),
store_diff.timepoint(),
));
stack.extend(next.children.values_mut().collect::<Vec<&mut Self>>());
}
Expand All @@ -352,7 +352,7 @@ impl EntityTree {
self,
is_recursive,
store_diff.row_id,
store_diff.timepoint.clone(),
store_diff.timepoint(),
));
}

Expand Down Expand Up @@ -387,7 +387,7 @@ impl EntityTree {
.entry(component_path.entity_path().clone())
.or_default();

*timepoint = store_diff.timepoint.clone().union_max(timepoint);
*timepoint = store_diff.timepoint().union_max(timepoint);
component_paths.insert(component_path.clone());
}
}
Expand Down Expand Up @@ -433,7 +433,7 @@ impl EntityTree {
for event in filtered_events.iter().filter(|e| &e.entity_path == path) {
for component_name in event.cells.keys() {
if let Some(histo) = self.time_histograms_per_component.get_mut(component_name) {
histo.remove(&event.timepoint, 1);
histo.remove(&event.timepoint(), 1);
if histo.is_empty() {
self.time_histograms_per_component.remove(component_name);
}
Expand All @@ -442,7 +442,7 @@ impl EntityTree {
}

for event in &filtered_events {
recursive_time_histogram.remove(&event.timepoint, event.num_components() as _);
recursive_time_histogram.remove(&event.timepoint(), event.num_components() as _);
}

children.retain(|_, child| {
Expand All @@ -458,10 +458,10 @@ impl EntityTree {
&mut self,
full_path: &[EntityPathPart],
depth: usize,
timepoint: &TimePoint,
times: &[(Timeline, TimeInt)],
num_components: u32,
) -> &mut Self {
self.recursive_time_histogram.add(timepoint, num_components);
self.recursive_time_histogram.add(times, num_components);

match full_path.get(depth) {
None => {
Expand All @@ -473,7 +473,7 @@ impl EntityTree {
.or_insert_with(|| {
EntityTree::new(full_path[..depth + 1].into(), self.recursive_clears.clone())
})
.create_subtrees_recursively(full_path, depth + 1, timepoint, num_components),
.create_subtrees_recursively(full_path, depth + 1, times, num_components),
}
}

Expand Down
12 changes: 6 additions & 6 deletions crates/re_data_store/src/time_histogram_per_timeline.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::BTreeMap;

use re_arrow_store::{StoreEvent, StoreSubscriber};
use re_log_types::{TimePoint, Timeline};
use re_log_types::{TimeInt, TimePoint, Timeline};

// ---

Expand Down Expand Up @@ -51,8 +51,8 @@ impl TimeHistogramPerTimeline {
self.num_timeless_messages
}

pub fn add(&mut self, timepoint: &TimePoint, n: u32) {
if timepoint.is_timeless() {
pub fn add(&mut self, times: &[(Timeline, TimeInt)], n: u32) {
if times.is_empty() {
self.num_timeless_messages = self
.num_timeless_messages
.checked_add(n as u64)
Expand All @@ -61,11 +61,11 @@ impl TimeHistogramPerTimeline {
u64::MAX
});
} else {
for (timeline, time_value) in timepoint.iter() {
for &(timeline, time) in times {
self.times
.entry(*timeline)
.entry(timeline)
.or_default()
.increment(time_value.as_i64(), n);
.increment(time.as_i64(), n);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/re_data_store/src/times_per_timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl StoreSubscriber for TimesPerTimeline {
re_tracing::profile_function!(format!("num_events={}", events.len()));

for event in events {
for (&timeline, &time) in &event.timepoint {
for &(timeline, time) in &event.times {
let per_time = self.0.entry(timeline).or_default();
let count = per_time.entry(time).or_default();

Expand Down
2 changes: 1 addition & 1 deletion examples/rust/custom_store_subscriber/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl StoreSubscriber for TimeRangesPerEntity {

fn on_events(&mut self, events: &[StoreEvent]) {
for event in events {
for (&timeline, &time) in &event.timepoint {
for &(timeline, time) in &event.times {
// update counters
let per_timeline = self.times.entry(event.entity_path.clone()).or_default();
let per_time = per_timeline.entry(timeline).or_default();
Expand Down

0 comments on commit 28d8336

Please sign in to comment.