Skip to content

Commit

Permalink
introduce IndexedTable::uphold_indexing_invariants
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Dec 2, 2023
1 parent df00660 commit 5f1e3e2
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 35 deletions.
32 changes: 32 additions & 0 deletions crates/re_arrow_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,38 @@ impl IndexedTable {
buckets_size_bytes,
}
}

/// Makes sure bucketing invariants are upheld, and takes necessary actions if not.
///
/// Invariants are:
/// 1. There must always be at least one bucket alive.
/// 2. The first bucket must always have an _indexing time_ `-∞`.
pub(crate) fn uphold_indexing_invariants(&mut self) {
if self.buckets.is_empty() {
let Self {
timeline,
ent_path: _,
cluster_key,
buckets,
all_components: _, // keep the history on purpose
buckets_num_rows,
buckets_size_bytes,
} = self;

let bucket = IndexedBucket::new(*cluster_key, *timeline);
let size_bytes = bucket.total_size_bytes();

*buckets = [(i64::MIN.into(), bucket)].into();
*buckets_num_rows = 0;
*buckets_size_bytes = size_bytes;
}

// NOTE: Make sure the first bucket is responsible for `-∞`, which might or might not be
// the case now if we've been moving buckets around.
if let Some((_, bucket)) = self.buckets.pop_first() {
self.buckets.insert(TimeInt::MIN, bucket);
}
}
}

/// An `IndexedBucket` holds a chunk of rows from an [`IndexedTable`]
Expand Down
37 changes: 2 additions & 35 deletions crates/re_arrow_store/src/store_gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,10 +637,6 @@ impl DataStore {
false
});

// TODO(cmc): Hmm, this is dropping buckets but doesn't seem to handle the case where all
// buckets are removed (which is an illegal state).
// Doesn't seem to handle the case where the only bucket left isn't indexed at -inf either.

diffs.into_values()
}
}
Expand Down Expand Up @@ -716,32 +712,7 @@ impl IndexedTable {
self.buckets
.retain(|bucket_time, _| !dropped_bucket_times.contains(bucket_time));

if self.buckets.is_empty() {
let Self {
timeline,
ent_path: _,
cluster_key,
buckets,
all_components: _, // keep the history on purpose
buckets_num_rows,
buckets_size_bytes,
} = self;

let bucket = IndexedBucket::new(*cluster_key, *timeline);
let size_bytes = bucket.total_size_bytes();

*buckets = [(i64::MIN.into(), bucket)].into();
*buckets_num_rows = 0;
*buckets_size_bytes = size_bytes;

return (diffs, dropped_num_bytes);
}

// NOTE: Make sure the first bucket is responsible for `-∞`, which might or might not be
// the case now that we've been moving buckets around.
if let Some((_, bucket)) = self.buckets.pop_first() {
self.buckets.insert(TimeInt::MIN, bucket);
}
self.uphold_indexing_invariants();

self.buckets_num_rows -= dropped_num_rows;
self.buckets_size_bytes -= dropped_num_bytes;
Expand Down Expand Up @@ -794,11 +765,7 @@ impl IndexedTable {
dropped_num_bytes = bucket_num_bytes;
self.buckets.remove(&bucket_key);

// NOTE: Make sure the first bucket is responsible for `-∞`, which might or might not be
// the case now that we've been moving buckets around.
if let Some((_, bucket)) = self.buckets.pop_first() {
self.buckets.insert(TimeInt::MIN, bucket);
}
self.uphold_indexing_invariants();
}

self.buckets_size_bytes -= dropped_num_bytes;
Expand Down

0 comments on commit 5f1e3e2

Please sign in to comment.