From e06435dcd8d0fd13abbd8cbf511ac25d60b55bc2 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Thu, 30 Nov 2023 14:39:06 +0100 Subject: [PATCH] introduce IndexedTable::uphold_indexing_invariants --- crates/re_arrow_store/src/store.rs | 32 +++++++++++++++++++++++ crates/re_arrow_store/src/store_gc.rs | 37 ++------------------------- 2 files changed, 34 insertions(+), 35 deletions(-) diff --git a/crates/re_arrow_store/src/store.rs b/crates/re_arrow_store/src/store.rs index 14d7df4ed8baf..5bd0f7bedd7bf 100644 --- a/crates/re_arrow_store/src/store.rs +++ b/crates/re_arrow_store/src/store.rs @@ -445,6 +445,38 @@ impl IndexedTable { buckets_size_bytes, } } + + /// Makes sure bucketing invariants are upheld, and takes necessary actions if not. + /// + /// Invariants are: + /// 1. There must always be at least one bucket alive. + /// 2. The first bucket must always have an _indexing time_ `-∞`. + pub(crate) fn uphold_indexing_invariants(&mut self) { + if self.buckets.is_empty() { + let Self { + timeline, + ent_path: _, + cluster_key, + buckets, + all_components: _, // keep the history on purpose + buckets_num_rows, + buckets_size_bytes, + } = self; + + let bucket = IndexedBucket::new(*cluster_key, *timeline); + let size_bytes = bucket.total_size_bytes(); + + *buckets = [(i64::MIN.into(), bucket)].into(); + *buckets_num_rows = 0; + *buckets_size_bytes = size_bytes; + } + + // NOTE: Make sure the first bucket is responsible for `-∞`, which might or might not be + // the case now if we've been moving buckets around. + if let Some((_, bucket)) = self.buckets.pop_first() { + self.buckets.insert(TimeInt::MIN, bucket); + } + } } /// An `IndexedBucket` holds a chunk of rows from an [`IndexedTable`] diff --git a/crates/re_arrow_store/src/store_gc.rs b/crates/re_arrow_store/src/store_gc.rs index 148479d976acb..4f0ec9d39c307 100644 --- a/crates/re_arrow_store/src/store_gc.rs +++ b/crates/re_arrow_store/src/store_gc.rs @@ -637,10 +637,6 @@ impl DataStore { false }); - // TODO(cmc): Hmm, this is dropping buckets but doesn't seem to handle the case where all - // buckets are removed (which is an illegal state). - // Doesn't seem to handle the case where the only bucket left isn't indexed at -inf either. - diffs.into_values() } } @@ -716,32 +712,7 @@ impl IndexedTable { self.buckets .retain(|bucket_time, _| !dropped_bucket_times.contains(bucket_time)); - if self.buckets.is_empty() { - let Self { - timeline, - ent_path: _, - cluster_key, - buckets, - all_components: _, // keep the history on purpose - buckets_num_rows, - buckets_size_bytes, - } = self; - - let bucket = IndexedBucket::new(*cluster_key, *timeline); - let size_bytes = bucket.total_size_bytes(); - - *buckets = [(i64::MIN.into(), bucket)].into(); - *buckets_num_rows = 0; - *buckets_size_bytes = size_bytes; - - return (diffs, dropped_num_bytes); - } - - // NOTE: Make sure the first bucket is responsible for `-∞`, which might or might not be - // the case now that we've been moving buckets around. - if let Some((_, bucket)) = self.buckets.pop_first() { - self.buckets.insert(TimeInt::MIN, bucket); - } + self.uphold_indexing_invariants(); self.buckets_num_rows -= dropped_num_rows; self.buckets_size_bytes -= dropped_num_bytes; @@ -794,11 +765,7 @@ impl IndexedTable { dropped_num_bytes = bucket_num_bytes; self.buckets.remove(&bucket_key); - // NOTE: Make sure the first bucket is responsible for `-∞`, which might or might not be - // the case now that we've been moving buckets around. - if let Some((_, bucket)) = self.buckets.pop_first() { - self.buckets.insert(TimeInt::MIN, bucket); - } + self.uphold_indexing_invariants(); } self.buckets_size_bytes -= dropped_num_bytes;