Add ChunkStore::drop_time_range #7602

Merged 6 commits on Oct 7, 2024
12 changes: 12 additions & 0 deletions crates/store/re_chunk/src/chunk.rs
@@ -116,6 +116,18 @@ impl PartialEq for Chunk {
}

impl Chunk {
/// Returns a version of us with a new [`ChunkId`].
///
/// Reminder:
/// * The returned [`Chunk`] will re-use the exact same [`RowId`]s as `self`.
/// * Duplicated [`RowId`]s in the `ChunkStore` are undefined behavior.
#[must_use]
#[inline]
pub fn with_id(mut self, id: ChunkId) -> Self {
Member: I am almost tempted to mark this unsafe... Duplicated RowIds in the ChunkStore are straight-up UB.

Member: Added a reminder in the doc comment.

self.id = id;
self
}

/// Returns `true` if two [`Chunk`]s are similar, although not byte-for-byte equal.
///
/// In particular, this ignores chunk and row IDs, as well as temporal timestamps.
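For context (not part of the diff), a minimal sketch of the pattern the review exchange above and the WARNING notes added throughout this PR point at: methods that derive a new chunk keep the old ChunkId, so a caller that intends to re-insert the result into a ChunkStore should assign a fresh id first. The helper name and the `chunk` argument below are illustrative only.

use re_chunk::{Chunk, ChunkId};

// Hypothetical helper: split off the first half of a chunk and give it a fresh id,
// so that re-inserting the result into a ChunkStore never duplicates a ChunkId.
fn first_half_with_fresh_id(chunk: &Chunk) -> Chunk {
    chunk
        .row_sliced(0, chunk.num_rows() / 2)
        .with_id(ChunkId::new())
}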
3 changes: 3 additions & 0 deletions crates/store/re_chunk/src/shuffle.rs
@@ -111,6 +111,9 @@ impl Chunk {
/// The underlying arrow data will be copied and shuffled in memory in order to make it contiguous.
///
/// This is a no-op if the underlying timeline is already sorted appropriately (happy path).
///
/// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
#[must_use]
pub fn sorted_by_timeline_if_unsorted(&self, timeline: &Timeline) -> Self {
let mut chunk = self.clone();

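The same re-id step applies after the sort helper above (an illustrative fragment; `chunk` and `timeline` are assumed to already exist):

// Hypothetical fragment: sort a copy of `chunk` along `timeline`, then give the
// result a fresh ChunkId so it can safely coexist with the original in a store.
let sorted = chunk
    .sorted_by_timeline_if_unsorted(&timeline)
    .with_id(ChunkId::new());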
21 changes: 21 additions & 0 deletions crates/store/re_chunk/src/slice.rs
@@ -61,6 +61,9 @@ impl Chunk {
/// This cannot fail nor panic: `index` and `len` will be capped so that they cannot
/// run out of bounds.
/// This can result in an empty [`Chunk`] being returned if the slice is completely OOB.
///
/// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
#[must_use]
#[inline]
pub fn row_sliced(&self, index: usize, len: usize) -> Self {
re_tracing::profile_function!();
@@ -144,6 +147,9 @@ impl Chunk {
///
/// If `timeline` is not found within the [`Chunk`], the end result will be the same as the
/// current chunk but without any timeline column.
///
/// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
#[must_use]
#[inline]
pub fn timeline_sliced(&self, timeline: Timeline) -> Self {
let Self {
@@ -184,6 +190,9 @@ impl Chunk {
///
/// If `component_name` is not found within the [`Chunk`], the end result will be the same as the
/// current chunk but without any component column.
///
/// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
#[must_use]
#[inline]
pub fn component_sliced(&self, component_name: ComponentName) -> Self {
let Self {
@@ -224,6 +233,9 @@ impl Chunk {
///
/// If none of the selected timelines exist in the [`Chunk`], the end result will be the same as the
/// current chunk but without any timeline column.
///
/// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
#[must_use]
#[inline]
pub fn timelines_sliced(&self, timelines_to_keep: &IntSet<Timeline>) -> Self {
let Self {
@@ -264,6 +276,9 @@ impl Chunk {
///
/// If none of the `component_names` exist in the [`Chunk`], the end result will be the same as the
/// current chunk but without any component column.
///
/// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
#[must_use]
#[inline]
pub fn components_sliced(&self, component_names: &IntSet<ComponentName>) -> Self {
let Self {
@@ -306,6 +321,9 @@ impl Chunk {
///
/// If `component_name` doesn't exist in this [`Chunk`], or if it is already dense, this method
/// is a no-op.
///
/// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
#[must_use]
#[inline]
pub fn densified(&self, component_name_pov: ComponentName) -> Self {
let Self {
@@ -400,6 +418,9 @@ impl Chunk {
/// Empties the [`Chunk`] vertically.
///
/// The result is a new [`Chunk`] with the same columns but zero rows.
///
/// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
#[must_use]
#[inline]
pub fn emptied(&self) -> Self {
let Self {
114 changes: 114 additions & 0 deletions crates/store/re_chunk_store/src/drop_time_range.rs
@@ -0,0 +1,114 @@
use re_chunk::{ChunkId, Timeline};
use re_log_types::ResolvedTimeRange;

use crate::{ChunkStore, ChunkStoreEvent};

impl ChunkStore {
/// Drop all events that are in the given range on the given timeline.
///
/// Note that matching events will be dropped from all timelines they appear on.
///
/// Static chunks are unaffected.
///
/// Used to implement undo (erase the last event from the blueprint db).
pub fn drop_time_range(
&mut self,
timeline: &Timeline,
drop_range: ResolvedTimeRange,
) -> Vec<ChunkStoreEvent> {
re_tracing::profile_function!();

if drop_range.max() < drop_range.min() {
return Default::default();
}

// Prepare the changes:

let mut chunk_ids_to_drop = vec![];
let mut new_chunks = vec![];

for (chunk_id, chunk) in &self.chunks_per_chunk_id {
let Some(time_column) = chunk.timelines().get(timeline) else {
// static chunk, or chunk that doesn't overlap this timeline
continue; // keep it
};

let chunk_range = time_column.time_range();

if drop_range.contains_range(chunk_range) {
// The whole chunk should be dropped!
chunk_ids_to_drop.push(*chunk_id);
} else if drop_range.intersects(chunk_range) {
let chunk = chunk.sorted_by_timeline_if_unsorted(timeline);

let num_rows = chunk.num_rows();

// Get the sorted times:
#[allow(clippy::unwrap_used)] // We already know the chunk has the timeline
let time_column = chunk.timelines().get(timeline).unwrap();
let times = time_column.times_raw();

let drop_range_min = drop_range.min().as_i64();
let drop_range_max = drop_range.max().as_i64();

let min_idx = times.partition_point(|&time| time < drop_range_min);
let max_idx = times.partition_point(|&time| time <= drop_range_max);
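// After these two binary searches, `times[min_idx..max_idx]` is exactly the
// contiguous run of rows whose timestamps fall inside `drop_range` (inclusive
// on both ends).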

{
// Sanity check:
debug_assert!(min_idx <= max_idx);
debug_assert!(drop_range_min <= times[min_idx]);
if 0 < min_idx {
debug_assert!(times[min_idx - 1] < drop_range_min);
}
if max_idx < num_rows {
debug_assert!(drop_range_max < times[max_idx]);
if 0 < max_idx {
debug_assert!(times[max_idx - 1] <= drop_range_max);
}
}
}

if min_idx < max_idx {
chunk_ids_to_drop.push(*chunk_id);
if 0 < min_idx {
new_chunks.push(chunk.row_sliced(0, min_idx).with_id(ChunkId::new()));
}
if max_idx < num_rows {
new_chunks.push(
chunk
.row_sliced(max_idx, num_rows - max_idx)
.with_id(ChunkId::new()),
);
}
}
}
}

// ------------------
// Apply the changes:

let generation = self.generation();
let mut events: Vec<ChunkStoreEvent> = vec![];

for chunk_id in chunk_ids_to_drop {
for diff in self.remove_chunk(chunk_id) {
events.push(ChunkStoreEvent {
store_id: self.id.clone(),
store_generation: generation.clone(),
event_id: self
.event_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
diff,
});
}
}
for mut chunk in new_chunks {
chunk.sort_if_unsorted();
#[allow(clippy::unwrap_used)] // The chunk came from the store, so it should be fine
events.append(&mut self.insert_chunk(&chunk.into()).unwrap());
}

events
}
}
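A quick usage sketch of the new API (illustrative only, not part of the diff; the helper name and the concrete range are assumptions):

use re_chunk::Timeline;
use re_chunk_store::{ChunkStore, ChunkStoreEvent};
use re_log_types::ResolvedTimeRange;

// Hypothetical helper: erase everything with 10 <= time <= 20 on the given timeline.
// Chunks fully inside the range are removed; chunks that only partially overlap are
// split, and the surviving parts are re-inserted under fresh ChunkIds.
fn drop_example(store: &mut ChunkStore, timeline: &Timeline) -> Vec<ChunkStoreEvent> {
    store.drop_time_range(timeline, ResolvedTimeRange::new(10, 20))
}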
1 change: 1 addition & 0 deletions crates/store/re_chunk_store/src/lib.rs
@@ -15,6 +15,7 @@
//!

mod dataframe;
mod drop_time_range;
mod events;
mod gc;
mod query;
82 changes: 82 additions & 0 deletions crates/store/re_chunk_store/tests/drop_time_range.rs
@@ -0,0 +1,82 @@
// https://github.com/rust-lang/rust-clippy/issues/10011
#![cfg(test)]

use std::sync::Arc;

use re_chunk::{Chunk, RowId};
use re_chunk_store::{ChunkStore, ChunkStoreConfig};
use re_log_types::{example_components::MyColor, ResolvedTimeRange};
use re_log_types::{EntityPath, TimePoint, Timeline};
use re_types_core::Loggable as _;

#[test]
fn drop_time_range() -> anyhow::Result<()> {
re_log::setup_logging();

let entity_path = EntityPath::from("this/that");
let timeline = Timeline::new_sequence("timeline");
let data = MyColor::from_rgb(255, 0, 0);
let time_point_at = |time: i64| TimePoint::from([(timeline, time)]);

for config in [
ChunkStoreConfig::DEFAULT,
ChunkStoreConfig::COMPACTION_DISABLED,
] {
let mut store = ChunkStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
config,
);

let num_events = |store: &ChunkStore| {
store.num_temporal_events_for_component_on_timeline(
&timeline,
&entity_path,
MyColor::name(),
)
};

store.insert_chunk(&Arc::new(
Chunk::builder(entity_path.clone())
.with_component_batch(RowId::new(), time_point_at(0), &data)
.with_component_batch(RowId::new(), time_point_at(1), &data)
.with_component_batch(RowId::new(), time_point_at(2), &data)
.with_component_batch(RowId::new(), time_point_at(3), &data)
.build()?,
))?;

store.insert_chunk(&Arc::new(
Chunk::builder(entity_path.clone())
.with_component_batch(RowId::new(), time_point_at(2), &data)
.with_component_batch(RowId::new(), time_point_at(3), &data)
.with_component_batch(RowId::new(), time_point_at(4), &data)
.with_component_batch(RowId::new(), time_point_at(5), &data)
.build()?,
))?;

store.insert_chunk(&Arc::new(
Chunk::builder(entity_path.clone())
.with_component_batch(RowId::new(), time_point_at(4), &data)
.with_component_batch(RowId::new(), time_point_at(5), &data)
.with_component_batch(RowId::new(), time_point_at(6), &data)
.with_component_batch(RowId::new(), time_point_at(7), &data)
.build()?,
))?;

assert_eq!(num_events(&store), 12);

// Drop nothing:
store.drop_time_range(&timeline, ResolvedTimeRange::new(10, 100));
store.drop_time_range(&timeline, ResolvedTimeRange::new(-100, -10));
assert_eq!(num_events(&store), 12);

// Drop stuff from the middle of the first chunk, and the start of the second:
store.drop_time_range(&timeline, ResolvedTimeRange::new(1, 2));
assert_eq!(num_events(&store), 9);

// Drop a bunch in the middle (including all of middle chunk):
store.drop_time_range(&timeline, ResolvedTimeRange::new(2, 5));
assert_eq!(num_events(&store), 3);
}

Ok(())
}
13 changes: 13 additions & 0 deletions crates/store/re_entity_db/src/entity_db.rs
@@ -438,6 +438,19 @@ impl EntityDb {
store_events
}

/// Drop all events in the given time range from the given timeline.
///
/// Used to implement undo (erase the last event from the blueprint db).
pub fn drop_time_range(
&mut self,
timeline: &Timeline,
drop_range: ResolvedTimeRange,
) -> Vec<ChunkStoreEvent> {
let store_events = self.data_store.drop_time_range(timeline, drop_range);
self.on_store_deletions(&store_events);
store_events
}

/// Unconditionally drops all the data for a given [`EntityPath`].
///
/// This is _not_ recursive. Children of this entity will not be affected.
6 changes: 6 additions & 0 deletions crates/store/re_log_types/src/resolved_time_range.rs
@@ -89,6 +89,12 @@ impl ResolvedTimeRange {
self.min <= time && time <= self.max
}

/// Does this range fully contain the other?
#[inline]
pub fn contains_range(&self, other: Self) -> bool {
self.min <= other.min && other.max <= self.max
}

#[inline]
pub fn intersects(&self, other: Self) -> bool {
self.min <= other.max && self.max >= other.min
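A tiny illustration of the two range predicates (a sketch, assuming ResolvedTimeRange is Copy, which the by-value arguments suggest):

#[test]
fn range_predicates_sketch() {
    use re_log_types::ResolvedTimeRange;

    let outer = ResolvedTimeRange::new(0, 10);
    let inner = ResolvedTimeRange::new(3, 7);
    assert!(outer.contains_range(inner)); // inner lies entirely within outer
    assert!(outer.intersects(inner)); // any overlap counts as an intersection
    assert!(!inner.contains_range(outer)); // containment is not symmetric
}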
2 changes: 2 additions & 0 deletions crates/store/re_log_types/src/time_point/time_int.rs
@@ -118,6 +118,7 @@ impl TimeInt {

/// Always returns [`Self::STATIC`] for [`Self::STATIC`].
#[inline]
#[must_use]
pub fn inc(&self) -> Self {
match self.0 {
Some(t) => Self::new_temporal(t.get().saturating_add(1)),
@@ -127,6 +128,7 @@

/// Always returns [`Self::STATIC`] for [`Self::STATIC`].
#[inline]
#[must_use]
pub fn dec(&self) -> Self {
match self.0 {
Some(t) => Self::new_temporal(t.get().saturating_sub(1)),