From fb05fffed75afd7dcaf45f5f57673560638a6e5c Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Sun, 6 Oct 2024 15:38:02 +0200 Subject: [PATCH] Add `ChunkStore::drop_time_range` --- .../re_chunk_store/src/drop_time_range.rs | 114 ++++++++++++++++++ crates/store/re_chunk_store/src/lib.rs | 1 + .../re_chunk_store/tests/drop_time_range.rs | 82 +++++++++++++ crates/store/re_entity_db/src/entity_db.rs | 13 ++ 4 files changed, 210 insertions(+) create mode 100644 crates/store/re_chunk_store/src/drop_time_range.rs create mode 100644 crates/store/re_chunk_store/tests/drop_time_range.rs diff --git a/crates/store/re_chunk_store/src/drop_time_range.rs b/crates/store/re_chunk_store/src/drop_time_range.rs new file mode 100644 index 0000000000000..7058425db9131 --- /dev/null +++ b/crates/store/re_chunk_store/src/drop_time_range.rs @@ -0,0 +1,114 @@ +use re_chunk::{ChunkId, Timeline}; +use re_log_types::ResolvedTimeRange; + +use crate::{ChunkStore, ChunkStoreEvent}; + +impl ChunkStore { + /// Drop all events that are in the given range on the given timeline. + /// + /// Note that matching events will be dropped from all timelines they appear on. + /// + /// Static chunks are unaffected. + /// + /// Used to implement undo (erase the last event from the blueprint db). + pub fn drop_time_range( + &mut self, + timeline: &Timeline, + drop_range: ResolvedTimeRange, + ) -> Vec { + re_tracing::profile_function!(); + + if drop_range.max() < drop_range.min() { + return Default::default(); + } + + // Prepare the changes: + + let mut chunk_ids_to_drop = vec![]; + let mut new_chunks = vec![]; + + for (chunk_id, chunk) in &self.chunks_per_chunk_id { + let Some(time_column) = chunk.timelines().get(timeline) else { + // static chunk, or chunk that doesn't overlap this timeline + continue; // keep it + }; + + let chunk_range = time_column.time_range(); + + if drop_range.contains_range(chunk_range) { + // The whole chunk should be dropped! + chunk_ids_to_drop.push(*chunk_id); + } else if drop_range.intersects(chunk_range) { + let chunk = chunk.sorted_by_timeline_if_unsorted(timeline); + + let num_rows = chunk.num_rows(); + + // Get the sorted times: + #[allow(clippy::unwrap_used)] // We already know the chunk has the timeline + let time_column = chunk.timelines().get(timeline).unwrap(); + let times = time_column.times_raw(); + + let drop_range_min = drop_range.min().as_i64(); + let drop_range_max = drop_range.max().as_i64(); + + let min_idx = times.partition_point(|&time| time < drop_range_min); + let max_idx = times.partition_point(|&time| time <= drop_range_max); + + { + // Sanity check: + debug_assert!(min_idx <= max_idx); + debug_assert!(drop_range_min <= times[min_idx]); + if 0 < min_idx { + debug_assert!(times[min_idx - 1] < drop_range_min); + } + if max_idx < num_rows { + debug_assert!(drop_range_max < times[max_idx]); + if 0 < max_idx { + debug_assert!(times[max_idx - 1] <= drop_range_max); + } + } + } + + if min_idx < max_idx { + chunk_ids_to_drop.push(*chunk_id); + if 0 < min_idx { + new_chunks.push(chunk.row_sliced(0, min_idx).with_id(ChunkId::new())); + } + if max_idx < num_rows { + new_chunks.push( + chunk + .row_sliced(max_idx, num_rows - max_idx) + .with_id(ChunkId::new()), + ); + } + } + } + } + + // ------------------ + // Apply the changes: + + let generation = self.generation(); + let mut events: Vec = vec![]; + + for chunk_id in chunk_ids_to_drop { + for diff in self.remove_chunk(chunk_id) { + events.push(ChunkStoreEvent { + store_id: self.id.clone(), + store_generation: generation.clone(), + event_id: self + .event_id + .fetch_add(1, std::sync::atomic::Ordering::Relaxed), + diff, + }); + } + } + for mut chunk in new_chunks { + chunk.sort_if_unsorted(); + #[allow(clippy::unwrap_used)] // The chunk came from the store, so it should be fine + events.append(&mut self.insert_chunk(&chunk.into()).unwrap()); + } + + events + } +} diff --git a/crates/store/re_chunk_store/src/lib.rs b/crates/store/re_chunk_store/src/lib.rs index 54f0df1f8e8a0..ddeb2e7bb87e2 100644 --- a/crates/store/re_chunk_store/src/lib.rs +++ b/crates/store/re_chunk_store/src/lib.rs @@ -15,6 +15,7 @@ //! mod dataframe; +mod drop_time_range; mod events; mod gc; mod query; diff --git a/crates/store/re_chunk_store/tests/drop_time_range.rs b/crates/store/re_chunk_store/tests/drop_time_range.rs new file mode 100644 index 0000000000000..2400dcac8127d --- /dev/null +++ b/crates/store/re_chunk_store/tests/drop_time_range.rs @@ -0,0 +1,82 @@ +// https://github.com/rust-lang/rust-clippy/issues/10011 +#![cfg(test)] + +use std::sync::Arc; + +use re_chunk::{Chunk, RowId}; +use re_chunk_store::{ChunkStore, ChunkStoreConfig}; +use re_log_types::{example_components::MyColor, ResolvedTimeRange}; +use re_log_types::{EntityPath, TimePoint, Timeline}; +use re_types_core::Loggable as _; + +#[test] +fn drop_time_range() -> anyhow::Result<()> { + re_log::setup_logging(); + + let entity_path = EntityPath::from("this/that"); + let timeline = Timeline::new_sequence("timeline"); + let data = MyColor::from_rgb(255, 0, 0); + let time_point_at = |time: i64| TimePoint::from([(timeline, time)]); + + for config in [ + ChunkStoreConfig::DEFAULT, + ChunkStoreConfig::COMPACTION_DISABLED, + ] { + let mut store = ChunkStore::new( + re_log_types::StoreId::random(re_log_types::StoreKind::Recording), + config, + ); + + let num_events = |store: &ChunkStore| { + store.num_temporal_events_for_component_on_timeline( + &timeline, + &entity_path, + MyColor::name(), + ) + }; + + store.insert_chunk(&Arc::new( + Chunk::builder(entity_path.clone()) + .with_component_batch(RowId::new(), time_point_at(0), &data) + .with_component_batch(RowId::new(), time_point_at(1), &data) + .with_component_batch(RowId::new(), time_point_at(2), &data) + .with_component_batch(RowId::new(), time_point_at(3), &data) + .build()?, + ))?; + + store.insert_chunk(&Arc::new( + Chunk::builder(entity_path.clone()) + .with_component_batch(RowId::new(), time_point_at(2), &data) + .with_component_batch(RowId::new(), time_point_at(3), &data) + .with_component_batch(RowId::new(), time_point_at(4), &data) + .with_component_batch(RowId::new(), time_point_at(5), &data) + .build()?, + ))?; + + store.insert_chunk(&Arc::new( + Chunk::builder(entity_path.clone()) + .with_component_batch(RowId::new(), time_point_at(4), &data) + .with_component_batch(RowId::new(), time_point_at(5), &data) + .with_component_batch(RowId::new(), time_point_at(6), &data) + .with_component_batch(RowId::new(), time_point_at(7), &data) + .build()?, + ))?; + + assert_eq!(num_events(&store), 12); + + // Drop nothing: + store.drop_time_range(&timeline, ResolvedTimeRange::new(10, 100)); + store.drop_time_range(&timeline, ResolvedTimeRange::new(-100, -10)); + assert_eq!(num_events(&store), 12); + + // Drop stuff from the middle of the first chunk, and the start of the second: + store.drop_time_range(&timeline, ResolvedTimeRange::new(1, 2)); + assert_eq!(num_events(&store), 9); + + // Drop a bunch in the middle (including all of middle chunk): + store.drop_time_range(&timeline, ResolvedTimeRange::new(2, 5)); + assert_eq!(num_events(&store), 3); + } + + Ok(()) +} diff --git a/crates/store/re_entity_db/src/entity_db.rs b/crates/store/re_entity_db/src/entity_db.rs index 76e3636b7c99c..48e75b5222dc2 100644 --- a/crates/store/re_entity_db/src/entity_db.rs +++ b/crates/store/re_entity_db/src/entity_db.rs @@ -438,6 +438,19 @@ impl EntityDb { store_events } + /// Drop all events in the given time range from the given timeline. + /// + /// Used to implement undo (erase the last event from the blueprint db). + pub fn drop_time_range( + &mut self, + timeline: &Timeline, + drop_range: ResolvedTimeRange, + ) -> Vec { + let store_events = self.data_store.drop_time_range(timeline, drop_range); + self.on_store_deletions(&store_events); + store_events + } + /// Unconditionally drops all the data for a given [`EntityPath`] . /// /// This is _not_ recursive. Children of this entity will not be affected.