Skip to content

Commit

Permalink
Add ChunkStore::drop_time_range
Browse files Browse the repository at this point in the history
  • Loading branch information
emilk committed Oct 6, 2024
1 parent 933fc8f commit fb05fff
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 0 deletions.
114 changes: 114 additions & 0 deletions crates/store/re_chunk_store/src/drop_time_range.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use re_chunk::{ChunkId, Timeline};
use re_log_types::ResolvedTimeRange;

use crate::{ChunkStore, ChunkStoreEvent};

impl ChunkStore {
/// Drop all events that are in the given range on the given timeline.
///
/// Note that matching events will be dropped from all timelines they appear on.
///
/// Static chunks are unaffected.
///
/// Used to implement undo (erase the last event from the blueprint db).
pub fn drop_time_range(
&mut self,
timeline: &Timeline,
drop_range: ResolvedTimeRange,
) -> Vec<ChunkStoreEvent> {
re_tracing::profile_function!();

if drop_range.max() < drop_range.min() {
return Default::default();
}

// Prepare the changes:

let mut chunk_ids_to_drop = vec![];
let mut new_chunks = vec![];

for (chunk_id, chunk) in &self.chunks_per_chunk_id {
let Some(time_column) = chunk.timelines().get(timeline) else {
// static chunk, or chunk that doesn't overlap this timeline
continue; // keep it
};

let chunk_range = time_column.time_range();

if drop_range.contains_range(chunk_range) {
// The whole chunk should be dropped!
chunk_ids_to_drop.push(*chunk_id);
} else if drop_range.intersects(chunk_range) {
let chunk = chunk.sorted_by_timeline_if_unsorted(timeline);

let num_rows = chunk.num_rows();

// Get the sorted times:
#[allow(clippy::unwrap_used)] // We already know the chunk has the timeline
let time_column = chunk.timelines().get(timeline).unwrap();
let times = time_column.times_raw();

let drop_range_min = drop_range.min().as_i64();
let drop_range_max = drop_range.max().as_i64();

let min_idx = times.partition_point(|&time| time < drop_range_min);
let max_idx = times.partition_point(|&time| time <= drop_range_max);

{
// Sanity check:
debug_assert!(min_idx <= max_idx);
debug_assert!(drop_range_min <= times[min_idx]);
if 0 < min_idx {
debug_assert!(times[min_idx - 1] < drop_range_min);
}
if max_idx < num_rows {
debug_assert!(drop_range_max < times[max_idx]);
if 0 < max_idx {
debug_assert!(times[max_idx - 1] <= drop_range_max);
}
}
}

if min_idx < max_idx {
chunk_ids_to_drop.push(*chunk_id);
if 0 < min_idx {
new_chunks.push(chunk.row_sliced(0, min_idx).with_id(ChunkId::new()));
}
if max_idx < num_rows {
new_chunks.push(
chunk
.row_sliced(max_idx, num_rows - max_idx)
.with_id(ChunkId::new()),
);
}
}
}
}

// ------------------
// Apply the changes:

let generation = self.generation();
let mut events: Vec<ChunkStoreEvent> = vec![];

for chunk_id in chunk_ids_to_drop {
for diff in self.remove_chunk(chunk_id) {
events.push(ChunkStoreEvent {
store_id: self.id.clone(),
store_generation: generation.clone(),
event_id: self
.event_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
diff,
});
}
}
for mut chunk in new_chunks {
chunk.sort_if_unsorted();
#[allow(clippy::unwrap_used)] // The chunk came from the store, so it should be fine
events.append(&mut self.insert_chunk(&chunk.into()).unwrap());
}

events
}
}
1 change: 1 addition & 0 deletions crates/store/re_chunk_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//!
mod dataframe;
mod drop_time_range;
mod events;
mod gc;
mod query;
Expand Down
82 changes: 82 additions & 0 deletions crates/store/re_chunk_store/tests/drop_time_range.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// https://github.com/rust-lang/rust-clippy/issues/10011
#![cfg(test)]

use std::sync::Arc;

use re_chunk::{Chunk, RowId};
use re_chunk_store::{ChunkStore, ChunkStoreConfig};
use re_log_types::{example_components::MyColor, ResolvedTimeRange};
use re_log_types::{EntityPath, TimePoint, Timeline};
use re_types_core::Loggable as _;

#[test]
fn drop_time_range() -> anyhow::Result<()> {
re_log::setup_logging();

let entity_path = EntityPath::from("this/that");
let timeline = Timeline::new_sequence("timeline");
let data = MyColor::from_rgb(255, 0, 0);
let time_point_at = |time: i64| TimePoint::from([(timeline, time)]);

for config in [
ChunkStoreConfig::DEFAULT,
ChunkStoreConfig::COMPACTION_DISABLED,
] {
let mut store = ChunkStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
config,
);

let num_events = |store: &ChunkStore| {
store.num_temporal_events_for_component_on_timeline(
&timeline,
&entity_path,
MyColor::name(),
)
};

store.insert_chunk(&Arc::new(
Chunk::builder(entity_path.clone())
.with_component_batch(RowId::new(), time_point_at(0), &data)
.with_component_batch(RowId::new(), time_point_at(1), &data)
.with_component_batch(RowId::new(), time_point_at(2), &data)
.with_component_batch(RowId::new(), time_point_at(3), &data)
.build()?,
))?;

store.insert_chunk(&Arc::new(
Chunk::builder(entity_path.clone())
.with_component_batch(RowId::new(), time_point_at(2), &data)
.with_component_batch(RowId::new(), time_point_at(3), &data)
.with_component_batch(RowId::new(), time_point_at(4), &data)
.with_component_batch(RowId::new(), time_point_at(5), &data)
.build()?,
))?;

store.insert_chunk(&Arc::new(
Chunk::builder(entity_path.clone())
.with_component_batch(RowId::new(), time_point_at(4), &data)
.with_component_batch(RowId::new(), time_point_at(5), &data)
.with_component_batch(RowId::new(), time_point_at(6), &data)
.with_component_batch(RowId::new(), time_point_at(7), &data)
.build()?,
))?;

assert_eq!(num_events(&store), 12);

// Drop nothing:
store.drop_time_range(&timeline, ResolvedTimeRange::new(10, 100));
store.drop_time_range(&timeline, ResolvedTimeRange::new(-100, -10));
assert_eq!(num_events(&store), 12);

// Drop stuff from the middle of the first chunk, and the start of the second:
store.drop_time_range(&timeline, ResolvedTimeRange::new(1, 2));
assert_eq!(num_events(&store), 9);

// Drop a bunch in the middle (including all of middle chunk):
store.drop_time_range(&timeline, ResolvedTimeRange::new(2, 5));
assert_eq!(num_events(&store), 3);
}

Ok(())
}
13 changes: 13 additions & 0 deletions crates/store/re_entity_db/src/entity_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,19 @@ impl EntityDb {
store_events
}

/// Drop all events in the given time range from the given timeline.
///
/// Used to implement undo (erase the last event from the blueprint db).
pub fn drop_time_range(
&mut self,
timeline: &Timeline,
drop_range: ResolvedTimeRange,
) -> Vec<ChunkStoreEvent> {
let store_events = self.data_store.drop_time_range(timeline, drop_range);
self.on_store_deletions(&store_events);
store_events
}

/// Unconditionally drops all the data for a given [`EntityPath`] .
///
/// This is _not_ recursive. Children of this entity will not be affected.
Expand Down

0 comments on commit fb05fff

Please sign in to comment.