Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GC improvements 7: introduce time_budget GC setting #4401

Merged
merged 6 commits into from
Dec 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/re_arrow_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ once_cell.workspace = true
parking_lot.workspace = true
smallvec.workspace = true
thiserror.workspace = true
web-time.workspace = true

# Optional dependencies:
polars-core = { workspace = true, optional = true, features = [
Expand Down
2 changes: 2 additions & 0 deletions crates/re_arrow_store/benches/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ fn gc(c: &mut Criterion) {
purge_empty_tables: false,
dont_protect: Default::default(),
enable_batching: false,
time_budget: std::time::Duration::MAX,
});
stats_diff
});
Expand All @@ -317,6 +318,7 @@ fn gc(c: &mut Criterion) {
purge_empty_tables: false,
dont_protect: Default::default(),
enable_batching: false,
time_budget: std::time::Duration::MAX,
});
stats_diff
});
Expand Down
2 changes: 2 additions & 0 deletions crates/re_arrow_store/benches/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ fn plotting_dashboard(c: &mut Criterion) {
purge_empty_tables: false,
dont_protect: Default::default(),
enable_batching: false,
time_budget: std::time::Duration::MAX,
};

// NOTE: insert in multiple timelines to more closely match real world scenarios.
Expand Down Expand Up @@ -163,6 +164,7 @@ fn timeless_logs(c: &mut Criterion) {
purge_empty_tables: false,
dont_protect: Default::default(),
enable_batching: false,
time_budget: std::time::Duration::MAX,
};

let mut timegen = |_| TimePoint::timeless();
Expand Down
52 changes: 29 additions & 23 deletions crates/re_arrow_store/src/store_gc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::BTreeMap;
use std::{collections::BTreeMap, time::Duration};

use ahash::{HashMap, HashSet};
use web_time::Instant;

use nohash_hasher::IntMap;
use re_log_types::{
Expand Down Expand Up @@ -34,6 +35,17 @@ pub struct GarbageCollectionOptions {
/// What target threshold should the GC try to meet.
pub target: GarbageCollectionTarget,

/// How long the garbage collection is allowed to run for.
///
/// Trades off latency for throughput:
/// - A smaller `time_budget` will clear less data in a shorter amount of time, allowing for a
/// more responsive UI at the cost of more GC overhead and more frequent runs.
/// - A larger `time_budget` will clear more data in a longer amount of time, increasing the
/// chance of UI freeze frames but decreasing GC overhead and running less often.
///
/// The default is an unbounded time budget (i.e. throughput only).
pub time_budget: Duration,

/// Whether to also GC timeless data.
pub gc_timeless: bool,

Expand All @@ -56,6 +68,7 @@ impl GarbageCollectionOptions {
pub fn gc_everything() -> Self {
GarbageCollectionOptions {
target: GarbageCollectionTarget::Everything,
time_budget: std::time::Duration::MAX,
gc_timeless: true,
protect_latest: 0,
purge_empty_tables: true,
Expand Down Expand Up @@ -137,12 +150,7 @@ impl DataStore {
"starting GC"
);

self.gc_drop_at_least_num_bytes(
options.enable_batching,
num_bytes_to_drop,
options.gc_timeless,
&protected_rows,
)
self.gc_drop_at_least_num_bytes(options, num_bytes_to_drop, &protected_rows)
}
GarbageCollectionTarget::Everything => {
re_log::trace!(
Expand All @@ -154,12 +162,7 @@ impl DataStore {
"starting GC"
);

self.gc_drop_at_least_num_bytes(
options.enable_batching,
f64::INFINITY,
options.gc_timeless,
&protected_rows,
)
self.gc_drop_at_least_num_bytes(options, f64::INFINITY, &protected_rows)
}
};

Expand Down Expand Up @@ -216,9 +219,8 @@ impl DataStore {
/// Tries to drop _at least_ `num_bytes_to_drop` bytes of data from the store.
fn gc_drop_at_least_num_bytes(
&mut self,
enable_batching: bool,
options: &GarbageCollectionOptions,
mut num_bytes_to_drop: f64,
include_timeless: bool,
protected_rows: &HashSet<RowId>,
) -> Vec<StoreDiff> {
re_tracing::profile_function!();
Expand Down Expand Up @@ -247,6 +249,7 @@ impl DataStore {
..
} = self;

let now = Instant::now();
for (&row_id, (timepoint, entity_path_hash)) in &metadata_registry.registry {
if protected_rows.contains(&row_id) {
batch_is_protected = true;
Expand All @@ -259,12 +262,11 @@ impl DataStore {
}

let dropped = Self::drop_batch(
enable_batching,
options,
tables,
timeless_tables,
cluster_cell_cache,
*cluster_key,
include_timeless,
&mut num_bytes_to_drop,
&batch,
batch_is_protected,
Expand All @@ -290,7 +292,7 @@ impl DataStore {
diffs.push(dropped);
}

if num_bytes_to_drop <= 0.0 {
if now.elapsed() >= options.time_budget || num_bytes_to_drop <= 0.0 {
break;
}

Expand All @@ -301,12 +303,11 @@ impl DataStore {
// Handle leftovers.
{
let dropped = Self::drop_batch(
enable_batching,
options,
tables,
timeless_tables,
cluster_cell_cache,
*cluster_key,
include_timeless,
&mut num_bytes_to_drop,
&batch,
batch_is_protected,
Expand Down Expand Up @@ -344,16 +345,21 @@ impl DataStore {

#[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)]
fn drop_batch(
enable_batching: bool,
options: &GarbageCollectionOptions,
tables: &mut BTreeMap<(EntityPathHash, Timeline), IndexedTable>,
timeless_tables: &mut IntMap<EntityPathHash, PersistentIndexedTable>,
cluster_cell_cache: &ClusterCellCache,
cluster_key: ComponentName,
include_timeless: bool,
num_bytes_to_drop: &mut f64,
batch: &[(TimePoint, (EntityPathHash, RowId))],
batch_is_protected: bool,
) -> Vec<StoreDiff> {
let &GarbageCollectionOptions {
gc_timeless,
enable_batching,
..
} = options;

let mut diffs = Vec::new();

// The algorithm is straightforward:
Expand Down Expand Up @@ -425,7 +431,7 @@ impl DataStore {

// TODO(jleibs): This is a worst-case removal-order. Would be nice to collect all the rows
// first and then remove them in one pass.
if timepoint.is_timeless() && include_timeless {
if timepoint.is_timeless() && gc_timeless {
for table in timeless_tables.values_mut() {
// let deleted_comps = deleted.timeless.entry(ent_path.clone()_hash).or_default();
let (removed, num_bytes_removed) =
Expand Down
2 changes: 2 additions & 0 deletions crates/re_arrow_store/tests/correctness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,7 @@ fn gc_metadata_size() -> anyhow::Result<()> {
purge_empty_tables: false,
dont_protect: Default::default(),
enable_batching,
time_budget: std::time::Duration::MAX,
});
_ = store.gc(&GarbageCollectionOptions {
target: re_arrow_store::GarbageCollectionTarget::DropAtLeastFraction(1.0),
Expand All @@ -598,6 +599,7 @@ fn gc_metadata_size() -> anyhow::Result<()> {
purge_empty_tables: false,
dont_protect: Default::default(),
enable_batching,
time_budget: std::time::Duration::MAX,
});
}
}
Expand Down
4 changes: 4 additions & 0 deletions crates/re_arrow_store/tests/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,7 @@ fn gc_impl(store: &mut DataStore) {
purge_empty_tables: false,
dont_protect: Default::default(),
enable_batching: false,
time_budget: std::time::Duration::MAX,
});
for event in store_events {
assert!(store.get_msg_metadata(&event.row_id).is_none());
Expand Down Expand Up @@ -1013,6 +1014,7 @@ fn protected_gc_impl(store: &mut DataStore) {
purge_empty_tables: true,
dont_protect: Default::default(),
enable_batching: false,
time_budget: std::time::Duration::MAX,
});

let mut assert_latest_components = |frame_nr: TimeInt, rows: &[(ComponentName, &DataRow)]| {
Expand Down Expand Up @@ -1110,6 +1112,7 @@ fn protected_gc_clear_impl(store: &mut DataStore) {
purge_empty_tables: true,
dont_protect: Default::default(),
enable_batching: false,
time_budget: std::time::Duration::MAX,
});

let mut assert_latest_components = |frame_nr: TimeInt, rows: &[(ComponentName, &DataRow)]| {
Expand Down Expand Up @@ -1153,6 +1156,7 @@ fn protected_gc_clear_impl(store: &mut DataStore) {
purge_empty_tables: true,
dont_protect: Default::default(),
enable_batching: false,
time_budget: std::time::Duration::MAX,
});

// No rows should remain because the table should have been purged
Expand Down
5 changes: 5 additions & 0 deletions crates/re_data_store/src/store_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ const MAX_INSERT_ROW_ATTEMPTS: usize = 1_000;
/// See [`insert_row_with_retries`].
const DEFAULT_INSERT_ROW_STEP_SIZE: u64 = 100;

/// See [`GarbageCollectionOptions::time_budget`].
const DEFAULT_GC_TIME_BUDGET: std::time::Duration = std::time::Duration::from_micros(3500); // empirical

/// Inserts a [`DataRow`] into the [`DataStore`], retrying in case of duplicated `RowId`s.
///
/// Retries a maximum of `num_attempts` times if the row couldn't be inserted because of a
Expand Down Expand Up @@ -434,6 +437,7 @@ impl StoreDb {
.into_iter()
.collect(),
enable_batching: false,
time_budget: DEFAULT_GC_TIME_BUDGET,
});
}

Expand All @@ -451,6 +455,7 @@ impl StoreDb {
purge_empty_tables: false,
dont_protect: Default::default(),
enable_batching: false,
time_budget: DEFAULT_GC_TIME_BUDGET,
});
}

Expand Down
6 changes: 3 additions & 3 deletions scripts/clippy_wasm/clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ too-many-lines-threshold = 600 # TODO(emilk): decrease this

# https://rust-lang.github.io/rust-clippy/master/index.html#disallowed_methods
disallowed-methods = [
"std::time::Instant::now", # use `instant` crate instead for wasm/web compatibility
"std::time::Duration::elapsed", # use `instant` crate instead for wasm/web compatibility
"std::time::SystemTime::now", # use `instant` or `time` crates instead for wasm/web compatibility
"std::time::Instant::now", # use `web-time` crate instead for wasm/web compatibility
"std::time::Duration::elapsed", # use `web-time` crate instead for wasm/web compatibility
"std::time::SystemTime::now", # use `web-time` or `time` crates instead for wasm/web compatibility

# Cannot spawn threads on wasm:
"std::thread::spawn",
Expand Down
Loading