Skip to content

Commit

Permalink
ChunkStore: Add ability to protect specific time ranges from GC
Browse files Browse the repository at this point in the history
  • Loading branch information
emilk committed Oct 6, 2024
1 parent fb05fff commit 5ee00d5
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 19 deletions.
22 changes: 21 additions & 1 deletion crates/store/re_chunk_store/src/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use nohash_hasher::IntMap;
use web_time::Instant;

use re_chunk::{Chunk, ChunkId};
use re_log_types::{EntityPath, TimeInt, Timeline};
use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt, Timeline};
use re_types_core::{ComponentName, SizeBytes};

use crate::{
Expand Down Expand Up @@ -51,6 +51,9 @@ pub struct GarbageCollectionOptions {

/// How many component revisions to preserve on each timeline.
pub protect_latest: usize,

/// Do not remove any data within these time ranges.
pub protected_time_ranges: HashMap<Timeline, ResolvedTimeRange>,
}

impl GarbageCollectionOptions {
Expand All @@ -59,8 +62,21 @@ impl GarbageCollectionOptions {
target: GarbageCollectionTarget::Everything,
time_budget: std::time::Duration::MAX,
protect_latest: 0,
protected_time_ranges: Default::default(),
}
}

/// Returns `true` if the chunk must be kept, i.e. it cannot be removed.
///
/// A chunk is protected as soon as, for any timeline listed in
/// `protected_time_ranges`, the chunk has a time column on that timeline
/// whose time range intersects the protected range. Timelines the chunk
/// has no column for are ignored.
pub fn is_chunk_protected(&self, chunk: &Chunk) -> bool {
    self.protected_time_ranges
        .iter()
        .any(|(timeline, protected_range)| {
            chunk
                .timelines()
                .get(timeline)
                .is_some_and(|column| column.time_range().intersects(*protected_range))
        })
}
}

impl std::fmt::Display for GarbageCollectionTarget {
Expand Down Expand Up @@ -269,6 +285,10 @@ impl ChunkStore {
.filter(|chunk_id| !protected_chunk_ids.contains(chunk_id))
{
if let Some(chunk) = self.chunks_per_chunk_id.get(chunk_id) {
if options.is_chunk_protected(chunk) {
continue;
}

// NOTE: Do _NOT_ use `chunk.total_size_bytes` as it is sitting behind an Arc
// and would count as amortized (i.e. 0 bytes).
num_bytes_to_drop -= <Chunk as SizeBytes>::total_size_bytes(chunk) as f64;
Expand Down
104 changes: 87 additions & 17 deletions crates/store/re_chunk_store/tests/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use re_chunk_store::{
use re_log_types::{
build_frame_nr, build_log_time,
example_components::{MyColor, MyIndex, MyPoint},
EntityPath, Time, TimeType, Timeline,
EntityPath, ResolvedTimeRange, Time, TimeType, Timeline,
};
use re_types::testing::build_some_large_structs;
use re_types_core::Loggable as _;
Expand Down Expand Up @@ -51,11 +51,7 @@ fn simple() -> anyhow::Result<()> {

let mut store = ChunkStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
ChunkStoreConfig {
chunk_max_bytes: 0,
chunk_max_rows: 0,
..ChunkStoreConfig::DEFAULT
},
ChunkStoreConfig::COMPACTION_DISABLED,
);

for _ in 0..2 {
Expand All @@ -82,8 +78,7 @@ fn simple() -> anyhow::Result<()> {

let (_store_events, stats_diff) = store.gc(&GarbageCollectionOptions {
target: GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0),
protect_latest: 0,
time_budget: std::time::Duration::MAX,
..GarbageCollectionOptions::gc_everything()
});

// NOTE: only temporal data gets purged!
Expand Down Expand Up @@ -174,9 +169,8 @@ fn simple_static() -> anyhow::Result<()> {
store.insert_chunk(&Arc::new(chunk2_static))?;

store.gc(&GarbageCollectionOptions {
target: GarbageCollectionTarget::Everything,
protect_latest: 1,
time_budget: std::time::Duration::MAX,
..GarbageCollectionOptions::gc_everything()
});

let assert_latest_components = |frame_nr: TimeInt, rows: &[(ComponentName, RowId)]| {
Expand Down Expand Up @@ -215,11 +209,7 @@ fn protected() -> anyhow::Result<()> {

let mut store = ChunkStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
ChunkStoreConfig {
chunk_max_bytes: 0,
chunk_max_rows: 0,
..ChunkStoreConfig::DEFAULT
},
ChunkStoreConfig::COMPACTION_DISABLED,
);

let entity_path = EntityPath::from("this/that");
Expand Down Expand Up @@ -267,9 +257,8 @@ fn protected() -> anyhow::Result<()> {
store.insert_chunk(&Arc::new(chunk4))?;

store.gc(&GarbageCollectionOptions {
target: GarbageCollectionTarget::Everything,
protect_latest: 1,
time_budget: std::time::Duration::MAX,
..GarbageCollectionOptions::gc_everything()
});

let assert_latest_components = |frame_nr: TimeInt, rows: &[(ComponentName, Option<RowId>)]| {
Expand Down Expand Up @@ -329,6 +318,87 @@ fn protected() -> anyhow::Result<()> {
Ok(())
}

/// Checks that `GarbageCollectionOptions::protected_time_ranges` shields chunks from GC.
///
/// Four single-row chunks are inserted for the same entity, one per frame (1 through 4).
/// The store is then garbage-collected three times with `gc_everything()` options plus a
/// progressively narrower protected range, and each pass is expected to drop exactly the
/// chunk that has just fallen outside of the protected range.
#[test]
fn protected_time_ranges() -> anyhow::Result<()> {
re_log::setup_logging();

// NOTE(review): the `Arc::ptr_eq` assertions at the end need the store to hand back the very
// same `Arc`s that were inserted; presumably that is why compaction is disabled — confirm.
let mut store = ChunkStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
ChunkStoreConfig::COMPACTION_DISABLED,
);

let entity_path = EntityPath::from("this/that");

// One distinct time per chunk, so every chunk covers exactly one frame.
let frame1 = TimeInt::new_temporal(1);
let frame2 = TimeInt::new_temporal(2);
let frame3 = TimeInt::new_temporal(3);
let frame4 = TimeInt::new_temporal(4);

// Chunk 1 @ frame 1: indices + colors.
let row_id1 = RowId::new();
let (indices1, colors1) = (MyIndex::from_iter(0..3), MyColor::from_iter(0..3));
let chunk1 = Chunk::builder(entity_path.clone())
.with_component_batches(
row_id1,
[build_frame_nr(frame1)],
[&indices1 as _, &colors1 as _],
)
.build()?;

// Chunk 2 @ frame 2: indices (reused from chunk 1) + points.
let row_id2 = RowId::new();
let points2 = MyPoint::from_iter(0..3);
let chunk2 = Chunk::builder(entity_path.clone())
.with_component_batches(
row_id2,
[build_frame_nr(frame2)],
[&indices1 as _, &points2 as _],
)
.build()?;

// Chunk 3 @ frame 3: points only.
let row_id3 = RowId::new();
let points3 = MyPoint::from_iter(0..10);
let chunk3 = Chunk::builder(entity_path.clone())
.with_component_batches(row_id3, [build_frame_nr(frame3)], [&points3 as _])
.build()?;

// Chunk 4 @ frame 4: colors only.
let row_id4 = RowId::new();
let colors4 = MyColor::from_iter(0..5);
let chunk4 = Chunk::builder(entity_path.clone())
.with_component_batches(row_id4, [build_frame_nr(frame4)], [&colors4 as _])
.build()?;

// Keep our own handles to the `Arc`s so that dropped chunks can be identified by pointer below.
let chunk1 = Arc::new(chunk1);
let chunk2 = Arc::new(chunk2);
let chunk3 = Arc::new(chunk3);
let chunk4 = Arc::new(chunk4);

store.insert_chunk(&chunk1)?;
store.insert_chunk(&chunk2)?;
store.insert_chunk(&chunk3)?;
store.insert_chunk(&chunk4)?;

// Builds "GC everything" options that protect `time_range` on the `frame_nr` sequence timeline.
// NOTE(review): presumably this is the same timeline that `build_frame_nr` logs to — confirm.
fn protect_time_range(time_range: ResolvedTimeRange) -> GarbageCollectionOptions {
GarbageCollectionOptions {
protected_time_ranges: [(Timeline::new_sequence("frame_nr"), time_range)]
.into_iter()
.collect(),
..GarbageCollectionOptions::gc_everything()
}
}

// Pass 1: frames 1 through 4 are all protected, so nothing may be dropped.
// (The expectations in this test rely on both bounds of the range being inclusive.)
let (events, _) = store.gc(&protect_time_range(ResolvedTimeRange::new(1, 4)));
assert_eq!(events.len(), 0);

// Pass 2: frame 1 is no longer protected, so only `chunk1` gets dropped.
let (events, _) = store.gc(&protect_time_range(ResolvedTimeRange::new(2, 4)));
assert_eq!(events.len(), 1);
assert!(Arc::ptr_eq(&events[0].diff.chunk, &chunk1));

// Pass 3: `chunk1` is already gone; frame 4 is now unprotected, so only `chunk4` gets dropped.
let (events, _) = store.gc(&protect_time_range(ResolvedTimeRange::new(2, 3)));
assert_eq!(events.len(), 1);
assert!(Arc::ptr_eq(&events[0].diff.chunk, &chunk4));

Ok(())
}

// ---

#[test]
Expand Down
10 changes: 9 additions & 1 deletion crates/store/re_entity_db/src/entity_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,9 @@ impl EntityDb {

self.gc(&GarbageCollectionOptions {
target: GarbageCollectionTarget::Everything,
protect_latest: 1, // TODO(jleibs): Bump this after we have an undo buffer
protect_latest: 1,
time_budget: DEFAULT_GC_TIME_BUDGET,
protected_time_ranges: Default::default(), // TODO(#3135): Use this for undo buffer
})
}

Expand All @@ -409,6 +410,13 @@ impl EntityDb {
target: GarbageCollectionTarget::DropAtLeastFraction(fraction_to_purge as _),
protect_latest: 1,
time_budget: DEFAULT_GC_TIME_BUDGET,

// TODO(emilk): we could protect the data that is currently being viewed
// (e.g. when paused in the live camera example).
// To be perfect it would need margins (because of latest-at), i.e. we would need to know
// exactly how far back the latest-at is of each component at the current time…
// …but maybe it doesn't have to be perfect.
protected_time_ranges: Default::default(),
});

if store_events.is_empty() {
Expand Down

0 comments on commit 5ee00d5

Please sign in to comment.