Commit

address PR comments
teh-cmc committed Dec 2, 2023
1 parent 5f1e3e2 commit 62e5e95
Showing 4 changed files with 56 additions and 48 deletions.
12 changes: 6 additions & 6 deletions crates/re_arrow_store/benches/gc.rs
@@ -113,10 +113,10 @@ fn plotting_dashboard(c: &mut Criterion) {
     for &num_rows_per_bucket in num_rows_per_bucket() {
         for &gc_batching in gc_batching() {
             group.bench_function(
-                if !gc_batching {
-                    format!("bucketsz={num_rows_per_bucket}")
-                } else {
+                if gc_batching {
                     format!("bucketsz={num_rows_per_bucket}/gc_batching=true")
+                } else {
+                    format!("bucketsz={num_rows_per_bucket}")
                 },
                 |b| {
                     let store = build_store(
@@ -193,10 +193,10 @@ fn timeless_logs(c: &mut Criterion) {
     for &num_rows_per_bucket in num_rows_per_bucket() {
         for &gc_batching in gc_batching() {
            group.bench_function(
-                if !gc_batching {
-                    format!("bucketsz={num_rows_per_bucket}")
-                } else {
+                if gc_batching {
                     format!("bucketsz={num_rows_per_bucket}/gc_batching=true")
+                } else {
+                    format!("bucketsz={num_rows_per_bucket}")
                 },
                 |b| {
                     let store = build_store(
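The two hunks above only flip the condition into positive form; the generated Criterion bench IDs are unchanged. A minimal standalone sketch of the naming scheme, assuming a made-up bucket size of 1024 and a hypothetical `bench_name` helper that is not part of the crate:

// Hypothetical helper mirroring the bench IDs built above.
fn bench_name(num_rows_per_bucket: u64, gc_batching: bool) -> String {
    if gc_batching {
        format!("bucketsz={num_rows_per_bucket}/gc_batching=true")
    } else {
        format!("bucketsz={num_rows_per_bucket}")
    }
}

fn main() {
    assert_eq!(bench_name(1024, false), "bucketsz=1024");
    assert_eq!(bench_name(1024, true), "bucketsz=1024/gc_batching=true");
}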
7 changes: 3 additions & 4 deletions crates/re_arrow_store/src/store.rs
@@ -438,7 +438,7 @@ impl IndexedTable {
         Self {
             timeline,
             ent_path,
-            buckets: [(i64::MIN.into(), bucket)].into(),
+            buckets: [(TimeInt::MIN, bucket)].into(),
             cluster_key,
             all_components: Default::default(),
             buckets_num_rows: 0,
@@ -466,14 +466,13 @@ impl IndexedTable {
             let bucket = IndexedBucket::new(*cluster_key, *timeline);
             let size_bytes = bucket.total_size_bytes();

-            *buckets = [(i64::MIN.into(), bucket)].into();
+            *buckets = [(TimeInt::MIN, bucket)].into();
             *buckets_num_rows = 0;
             *buckets_size_bytes = size_bytes;
         }
-
         // NOTE: Make sure the first bucket is responsible for `-∞`, which might or might not be
         // the case now if we've been moving buckets around.
-        if let Some((_, bucket)) = self.buckets.pop_first() {
+        else if let Some((_, bucket)) = self.buckets.pop_first() {
             self.buckets.insert(TimeInt::MIN, bucket);
         }
     }
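A standalone sketch of the invariant the new `else if` preserves, modeling `TimeInt` as its underlying `i64` and buckets as plain strings (illustrative names, not the crate's actual types): whenever buckets may have been moved around, the first entry is re-keyed so it is responsible for everything back to `-∞`.

use std::collections::BTreeMap;

fn main() {
    let mut buckets: BTreeMap<i64, &str> = BTreeMap::new();
    buckets.insert(42, "bucket-a"); // after GC, the first bucket may no longer cover -inf
    buckets.insert(100, "bucket-b");

    // Re-key the first bucket to i64::MIN (standing in for `TimeInt::MIN`) so that
    // queries reaching back to -inf always find a responsible bucket.
    if let Some((_, bucket)) = buckets.pop_first() {
        buckets.insert(i64::MIN, bucket);
    }

    assert_eq!(buckets.first_key_value(), Some((&i64::MIN, &"bucket-a")));
}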
19 changes: 13 additions & 6 deletions crates/re_arrow_store/src/store_gc.rs
@@ -215,8 +215,6 @@ impl DataStore {
     }

     /// Tries to drop _at least_ `num_bytes_to_drop` bytes of data from the store.
-    ///
-    /// Returns the list of `RowId`s that were purged from the store.
     fn gc_drop_at_least_num_bytes(
         &mut self,
         enable_batching: bool,
@@ -229,13 +227,15 @@
         let mut diffs = Vec::new();

         // The algorithm is straightforward:
-        // 1. Find the oldest `RowId` that is not protected
-        // 2. Find all tables that potentially hold data associated with that `RowId`
-        // 3. Drop the associated row and account for the space we got back
+        // 1. Accumulate a bunch of `RowId`s in ascending order, starting from the beginning of time.
+        // 2. Check if any `RowId` in the batch is protected, in which case the entire batch is
+        //    considered protected and cannot be dropped all at once.
+        // 3. Send the batch to `drop_batch` to handle the actual deletion.
+        // 4. Remove the dropped rows from the metadata registry.

         let batch_size = (self.config.indexed_bucket_num_rows as usize).saturating_mul(2);
         let batch_size = batch_size.clamp(64, 4096);
-        // let batch_size = 1;

         let mut batch: Vec<(TimePoint, (EntityPathHash, RowId))> = Vec::with_capacity(batch_size);
         let mut batch_is_protected = false;
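
For illustration, a self-contained sketch of the batch sizing and accumulation described in the comment above; the config value of 1024 rows per bucket is made up, and plain integers stand in for real `RowId`s:

fn main() {
    // Mirrors `(indexed_bucket_num_rows as usize).saturating_mul(2).clamp(64, 4096)`:
    // twice the bucket size, bounded to a sane range.
    let indexed_bucket_num_rows: u64 = 1024; // hypothetical config value
    let batch_size = (indexed_bucket_num_rows as usize)
        .saturating_mul(2)
        .clamp(64, 4096);
    assert_eq!(batch_size, 2048);

    // Accumulate row ids in ascending order and flush once the batch is full.
    let mut batch: Vec<u32> = Vec::with_capacity(batch_size);
    for row_id in 0..10_000u32 {
        batch.push(row_id);
        if batch.len() >= batch_size {
            // This is where the real code would hand the batch to `drop_batch`,
            // unless some row in it is protected.
            batch.clear();
        }
    }
}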

@@ -357,6 +357,13 @@ impl DataStore {
     ) -> Vec<StoreDiff> {
         let mut diffs = Vec::new();

+        // The algorithm is straightforward:
+        // 1. If the batch isn't protected, find and drop all buckets that are guaranteed to
+        //    contain only rows older than the ones in the batch.
+        // 2. Check how many bytes were dropped; continue if we haven't met our objective.
+        // 3. Fall back to deletion of individual rows.
+        // 4. Check how many bytes were dropped; continue if we haven't met our objective.
+
         // NOTE: The batch is already sorted by definition since it's extracted from the registry's btreemap.
         let max_row_id = batch.last().map(|(_, (_, row_id))| *row_id);

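A rough sketch of the control flow those four steps describe, using made-up types for buckets and rows rather than the store's actual ones (not the real `drop_batch` implementation):

struct Bucket {
    size_bytes: u64,
    rows: Vec<(u64 /* row id */, u64 /* row size */)>,
}

// Hypothetical two-phase deletion: whole buckets first, then individual rows.
fn drop_at_least(buckets: &mut Vec<Bucket>, num_bytes_to_drop: u64, batch_is_protected: bool) -> u64 {
    let mut dropped = 0;

    // 1. If the batch isn't protected, drop whole (old) buckets outright.
    if !batch_is_protected {
        while dropped < num_bytes_to_drop && buckets.len() > 1 {
            // Keep at least one bucket around (an assumption made for this sketch).
            let bucket = buckets.remove(0);
            dropped += bucket.size_bytes;
        }
        // 2. Objective met? Then no row-by-row work is needed.
        if dropped >= num_bytes_to_drop {
            return dropped;
        }
    }

    // 3. Fall back to dropping individual rows, oldest first.
    'outer: for bucket in buckets.iter_mut() {
        while let Some((_, row_size)) = bucket.rows.first().copied() {
            bucket.rows.remove(0);
            bucket.size_bytes -= row_size;
            dropped += row_size;
            // 4. Re-check the objective after every dropped row.
            if dropped >= num_bytes_to_drop {
                break 'outer;
            }
        }
    }

    dropped
}

fn main() {
    let mut buckets = vec![
        Bucket { size_bytes: 64, rows: vec![(0, 32), (1, 32)] },
        Bucket { size_bytes: 64, rows: vec![(2, 32), (3, 32)] },
    ];
    assert!(drop_at_least(&mut buckets, 96, false) >= 96);
}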
66 changes: 34 additions & 32 deletions crates/re_arrow_store/tests/correctness.rs
@@ -563,41 +563,43 @@ fn check_still_readable(_store: &DataStore) {
 // getting the confirmation that the row was really removed.
 #[test]
 fn gc_metadata_size() -> anyhow::Result<()> {
-    let mut store = DataStore::new(
-        re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
-        InstanceKey::name(),
-        Default::default(),
-    );
+    for enable_batching in [false, true] {
+        let mut store = DataStore::new(
+            re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
+            InstanceKey::name(),
+            Default::default(),
+        );

-    let point = MyPoint::new(1.0, 1.0);
+        let point = MyPoint::new(1.0, 1.0);

-    for _ in 0..3 {
-        let row = DataRow::from_component_batches(
-            RowId::random(),
-            TimePoint::timeless(),
-            "xxx".into(),
-            [&[point] as _],
-        )?;
-        store.insert_row(&row).unwrap();
-    }
+        for _ in 0..3 {
+            let row = DataRow::from_component_batches(
+                RowId::random(),
+                TimePoint::timeless(),
+                "xxx".into(),
+                [&[point] as _],
+            )?;
+            store.insert_row(&row).unwrap();
+        }

-    for _ in 0..2 {
-        _ = store.gc(&GarbageCollectionOptions {
-            target: re_arrow_store::GarbageCollectionTarget::DropAtLeastFraction(1.0),
-            gc_timeless: false,
-            protect_latest: 1,
-            purge_empty_tables: false,
-            dont_protect: Default::default(),
-            enable_batching: false,
-        });
-        _ = store.gc(&GarbageCollectionOptions {
-            target: re_arrow_store::GarbageCollectionTarget::DropAtLeastFraction(1.0),
-            gc_timeless: false,
-            protect_latest: 1,
-            purge_empty_tables: false,
-            dont_protect: Default::default(),
-            enable_batching: false,
-        });
+        for _ in 0..2 {
+            _ = store.gc(&GarbageCollectionOptions {
+                target: re_arrow_store::GarbageCollectionTarget::DropAtLeastFraction(1.0),
+                gc_timeless: false,
+                protect_latest: 1,
+                purge_empty_tables: false,
+                dont_protect: Default::default(),
+                enable_batching,
+            });
+            _ = store.gc(&GarbageCollectionOptions {
+                target: re_arrow_store::GarbageCollectionTarget::DropAtLeastFraction(1.0),
+                gc_timeless: false,
+                protect_latest: 1,
+                purge_empty_tables: false,
+                dont_protect: Default::default(),
+                enable_batching,
+            });
+        }
     }

     Ok(())
