From 9e967b61544ad0b8a7e147a7c9775cfa579f2288 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Thu, 30 Nov 2023 08:54:36 +0100 Subject: [PATCH 1/6] introduce time_budget GC option --- crates/re_arrow_store/src/store_gc.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/crates/re_arrow_store/src/store_gc.rs b/crates/re_arrow_store/src/store_gc.rs index a561e99bfc5c..e07b0068fdd0 100644 --- a/crates/re_arrow_store/src/store_gc.rs +++ b/crates/re_arrow_store/src/store_gc.rs @@ -1,4 +1,4 @@ -use std::collections::BTreeMap; +use std::{collections::BTreeMap, time::Duration}; use ahash::{HashMap, HashSet}; @@ -34,6 +34,17 @@ pub struct GarbageCollectionOptions { /// What target threshold should the GC try to meet. pub target: GarbageCollectionTarget, + /// How long the garbage collection in allowed to run for. + /// + /// Trades off latency for throughput: + /// - A smaller `time_budget` will clear less data in a shorter amount of time, allowing for a + /// more responsive UI at the cost of more GC overhead and more frequent runs. + /// - A larger `time_budget` will clear more data in a longer amount of time, increasing the + /// chance of UI freeze frames but decreasing GC overhead and running less often. + /// + /// The default is an unbounded time budget (i.e. throughput only). + pub time_budget: Duration, + /// Whether to also GC timeless data. pub gc_timeless: bool, @@ -56,6 +67,7 @@ impl GarbageCollectionOptions { pub fn gc_everything() -> Self { GarbageCollectionOptions { target: GarbageCollectionTarget::Everything, + time_budget: std::time::Duration::MAX, gc_timeless: true, protect_latest: 0, purge_empty_tables: true, From 0f24fa456e7b2731bd7160c093ea2619f504fd81 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Thu, 30 Nov 2023 08:54:50 +0100 Subject: [PATCH 2/6] propagate changes everywhere --- crates/re_arrow_store/benches/data_store.rs | 2 ++ crates/re_arrow_store/benches/gc.rs | 2 ++ crates/re_arrow_store/src/store_gc.rs | 34 ++++++++------------- crates/re_arrow_store/tests/correctness.rs | 2 ++ crates/re_arrow_store/tests/data_store.rs | 4 +++ crates/re_data_store/src/store_db.rs | 2 ++ 6 files changed, 25 insertions(+), 21 deletions(-) diff --git a/crates/re_arrow_store/benches/data_store.rs b/crates/re_arrow_store/benches/data_store.rs index e2ff39e483f1..e7074c684350 100644 --- a/crates/re_arrow_store/benches/data_store.rs +++ b/crates/re_arrow_store/benches/data_store.rs @@ -292,6 +292,7 @@ fn gc(c: &mut Criterion) { purge_empty_tables: false, dont_protect: Default::default(), enable_batching: false, + time_budget: std::time::Duration::MAX, }); stats_diff }); @@ -317,6 +318,7 @@ fn gc(c: &mut Criterion) { purge_empty_tables: false, dont_protect: Default::default(), enable_batching: false, + time_budget: std::time::Duration::MAX, }); stats_diff }); diff --git a/crates/re_arrow_store/benches/gc.rs b/crates/re_arrow_store/benches/gc.rs index 7f88804e8ed8..6700869972c9 100644 --- a/crates/re_arrow_store/benches/gc.rs +++ b/crates/re_arrow_store/benches/gc.rs @@ -75,6 +75,7 @@ fn plotting_dashboard(c: &mut Criterion) { purge_empty_tables: false, dont_protect: Default::default(), enable_batching: false, + time_budget: std::time::Duration::MAX, }; // NOTE: insert in multiple timelines to more closely match real world scenarios. @@ -163,6 +164,7 @@ fn timeless_logs(c: &mut Criterion) { purge_empty_tables: false, dont_protect: Default::default(), enable_batching: false, + time_budget: std::time::Duration::MAX, }; let mut timegen = |_| TimePoint::timeless(); diff --git a/crates/re_arrow_store/src/store_gc.rs b/crates/re_arrow_store/src/store_gc.rs index e07b0068fdd0..d4f292762f2c 100644 --- a/crates/re_arrow_store/src/store_gc.rs +++ b/crates/re_arrow_store/src/store_gc.rs @@ -149,12 +149,7 @@ impl DataStore { "starting GC" ); - self.gc_drop_at_least_num_bytes( - options.enable_batching, - num_bytes_to_drop, - options.gc_timeless, - &protected_rows, - ) + self.gc_drop_at_least_num_bytes(options, num_bytes_to_drop, &protected_rows) } GarbageCollectionTarget::Everything => { re_log::trace!( @@ -166,12 +161,7 @@ impl DataStore { "starting GC" ); - self.gc_drop_at_least_num_bytes( - options.enable_batching, - f64::INFINITY, - options.gc_timeless, - &protected_rows, - ) + self.gc_drop_at_least_num_bytes(options, f64::INFINITY, &protected_rows) } }; @@ -228,9 +218,8 @@ impl DataStore { /// Tries to drop _at least_ `num_bytes_to_drop` bytes of data from the store. fn gc_drop_at_least_num_bytes( &mut self, - enable_batching: bool, + options: &GarbageCollectionOptions, mut num_bytes_to_drop: f64, - include_timeless: bool, protected_rows: &HashSet, ) -> Vec { re_tracing::profile_function!(); @@ -271,12 +260,11 @@ impl DataStore { } let dropped = Self::drop_batch( - enable_batching, + options, tables, timeless_tables, cluster_cell_cache, *cluster_key, - include_timeless, &mut num_bytes_to_drop, &batch, batch_is_protected, @@ -313,12 +301,11 @@ impl DataStore { // Handle leftovers. { let dropped = Self::drop_batch( - enable_batching, + options, tables, timeless_tables, cluster_cell_cache, *cluster_key, - include_timeless, &mut num_bytes_to_drop, &batch, batch_is_protected, @@ -356,16 +343,21 @@ impl DataStore { #[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)] fn drop_batch( - enable_batching: bool, + options: &GarbageCollectionOptions, tables: &mut BTreeMap<(EntityPathHash, Timeline), IndexedTable>, timeless_tables: &mut IntMap, cluster_cell_cache: &ClusterCellCache, cluster_key: ComponentName, - include_timeless: bool, num_bytes_to_drop: &mut f64, batch: &[(TimePoint, (EntityPathHash, RowId))], batch_is_protected: bool, ) -> Vec { + let &GarbageCollectionOptions { + gc_timeless, + enable_batching, + .. + } = options; + let mut diffs = Vec::new(); // The algorithm is straightforward: @@ -437,7 +429,7 @@ impl DataStore { // TODO(jleibs): This is a worst-case removal-order. Would be nice to collect all the rows // first and then remove them in one pass. - if timepoint.is_timeless() && include_timeless { + if timepoint.is_timeless() && gc_timeless { for table in timeless_tables.values_mut() { // let deleted_comps = deleted.timeless.entry(ent_path.clone()_hash).or_default(); let (removed, num_bytes_removed) = diff --git a/crates/re_arrow_store/tests/correctness.rs b/crates/re_arrow_store/tests/correctness.rs index a456824d17f8..0d34b19795b2 100644 --- a/crates/re_arrow_store/tests/correctness.rs +++ b/crates/re_arrow_store/tests/correctness.rs @@ -590,6 +590,7 @@ fn gc_metadata_size() -> anyhow::Result<()> { purge_empty_tables: false, dont_protect: Default::default(), enable_batching, + time_budget: std::time::Duration::MAX, }); _ = store.gc(&GarbageCollectionOptions { target: re_arrow_store::GarbageCollectionTarget::DropAtLeastFraction(1.0), @@ -598,6 +599,7 @@ fn gc_metadata_size() -> anyhow::Result<()> { purge_empty_tables: false, dont_protect: Default::default(), enable_batching, + time_budget: std::time::Duration::MAX, }); } } diff --git a/crates/re_arrow_store/tests/data_store.rs b/crates/re_arrow_store/tests/data_store.rs index fe71ed7cf206..c2192520a111 100644 --- a/crates/re_arrow_store/tests/data_store.rs +++ b/crates/re_arrow_store/tests/data_store.rs @@ -933,6 +933,7 @@ fn gc_impl(store: &mut DataStore) { purge_empty_tables: false, dont_protect: Default::default(), enable_batching: false, + time_budget: std::time::Duration::MAX, }); for event in store_events { assert!(store.get_msg_metadata(&event.row_id).is_none()); @@ -1013,6 +1014,7 @@ fn protected_gc_impl(store: &mut DataStore) { purge_empty_tables: true, dont_protect: Default::default(), enable_batching: false, + time_budget: std::time::Duration::MAX, }); let mut assert_latest_components = |frame_nr: TimeInt, rows: &[(ComponentName, &DataRow)]| { @@ -1110,6 +1112,7 @@ fn protected_gc_clear_impl(store: &mut DataStore) { purge_empty_tables: true, dont_protect: Default::default(), enable_batching: false, + time_budget: std::time::Duration::MAX, }); let mut assert_latest_components = |frame_nr: TimeInt, rows: &[(ComponentName, &DataRow)]| { @@ -1153,6 +1156,7 @@ fn protected_gc_clear_impl(store: &mut DataStore) { purge_empty_tables: true, dont_protect: Default::default(), enable_batching: false, + time_budget: std::time::Duration::MAX, }); // No rows should remain because the table should have been purged diff --git a/crates/re_data_store/src/store_db.rs b/crates/re_data_store/src/store_db.rs index f1cbe400ce43..871628fddf4d 100644 --- a/crates/re_data_store/src/store_db.rs +++ b/crates/re_data_store/src/store_db.rs @@ -434,6 +434,7 @@ impl StoreDb { .into_iter() .collect(), enable_batching: false, + time_budget: std::time::Duration::MAX, }); } @@ -451,6 +452,7 @@ impl StoreDb { purge_empty_tables: false, dont_protect: Default::default(), enable_batching: false, + time_budget: std::time::Duration::MAX, }); } From e3d6267297ff577c7ac7036640660c8d6a69516a Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Thu, 30 Nov 2023 08:57:31 +0100 Subject: [PATCH 3/6] respect time budget in main GC loop --- crates/re_arrow_store/src/store_gc.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/re_arrow_store/src/store_gc.rs b/crates/re_arrow_store/src/store_gc.rs index d4f292762f2c..e55a141915fb 100644 --- a/crates/re_arrow_store/src/store_gc.rs +++ b/crates/re_arrow_store/src/store_gc.rs @@ -248,6 +248,7 @@ impl DataStore { .. } = self; + let now = std::time::Instant::now(); for (&row_id, (timepoint, entity_path_hash)) in &metadata_registry.registry { if protected_rows.contains(&row_id) { batch_is_protected = true; @@ -290,7 +291,7 @@ impl DataStore { diffs.push(dropped); } - if num_bytes_to_drop <= 0.0 { + if now.elapsed() >= options.time_budget || num_bytes_to_drop <= 0.0 { break; } From daae645c0b5e983fc25e6cc85a680900ac9756db Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Thu, 30 Nov 2023 09:02:24 +0100 Subject: [PATCH 4/6] enable 3.5ms budget in main app --- crates/re_data_store/src/store_db.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/re_data_store/src/store_db.rs b/crates/re_data_store/src/store_db.rs index 871628fddf4d..c6f03e698e77 100644 --- a/crates/re_data_store/src/store_db.rs +++ b/crates/re_data_store/src/store_db.rs @@ -59,6 +59,9 @@ const MAX_INSERT_ROW_ATTEMPTS: usize = 1_000; /// See [`insert_row_with_retries`]. const DEFAULT_INSERT_ROW_STEP_SIZE: u64 = 100; +/// See [`GarbageCollectionOptions::time_budget`]. +const GC_TIME_BUDGET: std::time::Duration = std::time::Duration::from_micros(3500); // empirical + /// Inserts a [`DataRow`] into the [`DataStore`], retrying in case of duplicated `RowId`s. /// /// Retries a maximum of `num_attempts` times if the row couldn't be inserted because of a @@ -434,7 +437,7 @@ impl StoreDb { .into_iter() .collect(), enable_batching: false, - time_budget: std::time::Duration::MAX, + time_budget: GC_TIME_BUDGET, }); } @@ -452,7 +455,7 @@ impl StoreDb { purge_empty_tables: false, dont_protect: Default::default(), enable_batching: false, - time_budget: std::time::Duration::MAX, + time_budget: GC_TIME_BUDGET, }); } From a2b7378c662ade1f2e4d33a60703caab88214dfc Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Sat, 2 Dec 2023 13:00:07 +0100 Subject: [PATCH 5/6] address pr comments --- crates/re_data_store/src/store_db.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/re_data_store/src/store_db.rs b/crates/re_data_store/src/store_db.rs index c6f03e698e77..bc9a919a072d 100644 --- a/crates/re_data_store/src/store_db.rs +++ b/crates/re_data_store/src/store_db.rs @@ -60,7 +60,7 @@ const MAX_INSERT_ROW_ATTEMPTS: usize = 1_000; const DEFAULT_INSERT_ROW_STEP_SIZE: u64 = 100; /// See [`GarbageCollectionOptions::time_budget`]. -const GC_TIME_BUDGET: std::time::Duration = std::time::Duration::from_micros(3500); // empirical +const DEFAULT_GC_TIME_BUDGET: std::time::Duration = std::time::Duration::from_micros(3500); // empirical /// Inserts a [`DataRow`] into the [`DataStore`], retrying in case of duplicated `RowId`s. /// @@ -437,7 +437,7 @@ impl StoreDb { .into_iter() .collect(), enable_batching: false, - time_budget: GC_TIME_BUDGET, + time_budget: DEFAULT_GC_TIME_BUDGET, }); } @@ -455,7 +455,7 @@ impl StoreDb { purge_empty_tables: false, dont_protect: Default::default(), enable_batching: false, - time_budget: GC_TIME_BUDGET, + time_budget: DEFAULT_GC_TIME_BUDGET, }); } From cedcf185065fd88f21392f85f90c7c36bb009a47 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Sat, 2 Dec 2023 13:17:01 +0100 Subject: [PATCH 6/6] :sparkles: the web :sparkles: --- Cargo.lock | 1 + crates/re_arrow_store/Cargo.toml | 1 + crates/re_arrow_store/src/store_gc.rs | 3 ++- scripts/clippy_wasm/clippy.toml | 6 +++--- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 88a3b672bebb..66cc52bb7ecc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4352,6 +4352,7 @@ dependencies = [ "smallvec", "thiserror", "tinyvec", + "web-time", ] [[package]] diff --git a/crates/re_arrow_store/Cargo.toml b/crates/re_arrow_store/Cargo.toml index f21e6a78a1f6..5a46c47a3c51 100644 --- a/crates/re_arrow_store/Cargo.toml +++ b/crates/re_arrow_store/Cargo.toml @@ -50,6 +50,7 @@ once_cell.workspace = true parking_lot.workspace = true smallvec.workspace = true thiserror.workspace = true +web-time.workspace = true # Optional dependencies: polars-core = { workspace = true, optional = true, features = [ diff --git a/crates/re_arrow_store/src/store_gc.rs b/crates/re_arrow_store/src/store_gc.rs index e55a141915fb..0a9724077870 100644 --- a/crates/re_arrow_store/src/store_gc.rs +++ b/crates/re_arrow_store/src/store_gc.rs @@ -1,6 +1,7 @@ use std::{collections::BTreeMap, time::Duration}; use ahash::{HashMap, HashSet}; +use web_time::Instant; use nohash_hasher::IntMap; use re_log_types::{ @@ -248,7 +249,7 @@ impl DataStore { .. } = self; - let now = std::time::Instant::now(); + let now = Instant::now(); for (&row_id, (timepoint, entity_path_hash)) in &metadata_registry.registry { if protected_rows.contains(&row_id) { batch_is_protected = true; diff --git a/scripts/clippy_wasm/clippy.toml b/scripts/clippy_wasm/clippy.toml index 3fee57b67a67..35c016649717 100644 --- a/scripts/clippy_wasm/clippy.toml +++ b/scripts/clippy_wasm/clippy.toml @@ -27,9 +27,9 @@ too-many-lines-threshold = 600 # TODO(emilk): decrease this # https://rust-lang.github.io/rust-clippy/master/index.html#disallowed_methods disallowed-methods = [ - "std::time::Instant::now", # use `instant` crate instead for wasm/web compatibility - "std::time::Duration::elapsed", # use `instant` crate instead for wasm/web compatibility - "std::time::SystemTime::now", # use `instant` or `time` crates instead for wasm/web compatibility + "std::time::Instant::now", # use `web-time` crate instead for wasm/web compatibility + "std::time::Duration::elapsed", # use `web-time` crate instead for wasm/web compatibility + "std::time::SystemTime::now", # use `web-time` or `time` crates instead for wasm/web compatibility # Cannot spawn threads on wasm: "std::thread::spawn",