diff --git a/trin-storage/src/versioned/id_indexed_v1/config.rs b/trin-storage/src/versioned/id_indexed_v1/config.rs index 560b099a5..1d01d6208 100644 --- a/trin-storage/src/versioned/id_indexed_v1/config.rs +++ b/trin-storage/src/versioned/id_indexed_v1/config.rs @@ -5,16 +5,12 @@ use ethportal_api::types::portal_wire::ProtocolId; use r2d2::Pool; use r2d2_sqlite::SqliteConnectionManager; -use crate::{ - versioned::{usage_stats::UsageStats, ContentType}, - DistanceFunction, PortalStorageConfig, BYTES_IN_MB_U64, -}; +use crate::{versioned::ContentType, DistanceFunction, PortalStorageConfig, BYTES_IN_MB_U64}; -/// The fraction of the storage capacity that we should aim for when pruning. -const TARGET_CAPACITY_FRACTION: f64 = 0.95; +use super::pruning_strategy::PruningConfig; /// The config for the IdIndexedV1Store -#[derive(Debug, Clone)] +#[derive(Clone, Debug)] pub struct IdIndexedV1StoreConfig { pub content_type: ContentType, pub network: ProtocolId, @@ -23,6 +19,7 @@ pub struct IdIndexedV1StoreConfig { pub storage_capacity_bytes: u64, pub sql_connection_pool: Pool, pub distance_fn: DistanceFunction, + pub pruning_config: PruningConfig, } impl IdIndexedV1StoreConfig { @@ -39,153 +36,8 @@ impl IdIndexedV1StoreConfig { storage_capacity_bytes: config.storage_capacity_mb * BYTES_IN_MB_U64, sql_connection_pool: config.sql_connection_pool, distance_fn: config.distance_fn, + // consider making this a parameter if we start using non-default value + pruning_config: PruningConfig::default(), } } - - /// The capacity that we aim for when pruning. - pub fn target_capacity_bytes(&self) -> u64 { - (self.storage_capacity_bytes as f64 * TARGET_CAPACITY_FRACTION).round() as u64 - } - - /// Returns the estimated number of items to delete to reach target capacity. It returns 0 if - /// already below target capacity. - pub fn estimate_to_delete_until_target(&self, usage_stats: &UsageStats) -> u64 { - self.estimated_target_capacity_count(usage_stats) - .map(|target_capacity_count| { - if usage_stats.entry_count > target_capacity_count { - usage_stats.entry_count - target_capacity_count - } else { - 0 - } - }) - .unwrap_or(0) - } - - fn estimated_target_capacity_count(&self, usage_stats: &UsageStats) -> Option { - usage_stats - .average_entry_size_bytes() - .map(|average_entry_size_bytes| { - (self.target_capacity_bytes() as f64 / average_entry_size_bytes).floor() as u64 - }) - } -} - -#[cfg(test)] -mod tests { - use std::path::PathBuf; - - use discv5::enr::NodeId; - use ethportal_api::types::portal_wire::ProtocolId; - use r2d2::Pool; - use r2d2_sqlite::SqliteConnectionManager; - use rstest::rstest; - - use crate::{versioned::ContentType, DistanceFunction}; - - use super::*; - - const STORAGE_CAPACITY_BYTES: u64 = 1000; - - fn create_config() -> IdIndexedV1StoreConfig { - IdIndexedV1StoreConfig { - content_type: ContentType::State, - network: ProtocolId::State, - node_id: NodeId::random(), - node_data_dir: PathBuf::default(), - storage_capacity_bytes: STORAGE_CAPACITY_BYTES, - sql_connection_pool: Pool::new(SqliteConnectionManager::memory()).unwrap(), - distance_fn: DistanceFunction::Xor, - } - } - - #[rstest] - #[case::no_usage(0, 0, false)] - #[case::low_usage(10, 100, false)] - #[case::just_below_target_capacity(89, 890, false)] - #[case::target_capacity(90, 900, false)] - #[case::between_target_and_pruning(92, 920, false)] - #[case::pruning(95, 950, false)] - #[case::between_pruning_and_full(97, 970, true)] - #[case::full(100, 1000, true)] - #[case::above_full(110, 1100, true)] - fn is_above_target_capacity( - #[case] entry_count: u64, - #[case] total_entry_size_bytes: u64, - #[case] expected: bool, - ) { - let config = create_config(); - let usage_stats = UsageStats::new(entry_count, total_entry_size_bytes); - - assert_eq!( - usage_stats.is_above(config.target_capacity_bytes()), - expected - ); - } - - #[test] - fn estimate_capacity_count_no_usage() { - let config = create_config(); - let usage_stats = UsageStats::default(); - assert_eq!( - config.estimated_target_capacity_count(&usage_stats), - None, - "testing estimated_target_capacity_count" - ); - } - - #[rstest] - #[case::low_usage_1(10, 100, 95)] - #[case::low_usage_2(20, 100, 190)] - #[case::low_usage_3(50, 100, 475)] - #[case::mid_usage_1(10, 500, 19)] - #[case::mid_usage_2(20, 500, 38)] - #[case::mid_usage_3(50, 500, 95)] - #[case::between_target_and_full_1(10, 970, 9)] - #[case::between_target_and_full_2(20, 970, 19)] - #[case::between_target_and_full_3(50, 970, 48)] - #[case::between_target_and_full_4(100, 970, 97)] - #[case::above_full_1(10, 1050, 9)] - #[case::above_full_2(20, 1050, 18)] - #[case::above_full_3(50, 1050, 45)] - fn estimate_target_capacity_count( - #[case] entry_count: u64, - #[case] total_entry_size_bytes: u64, - #[case] estimated_target_capacity_count: u64, - ) { - let config = create_config(); - let usage_stats = UsageStats::new(entry_count, total_entry_size_bytes); - assert_eq!( - config.estimated_target_capacity_count(&usage_stats), - Some(estimated_target_capacity_count), - "testing estimated_target_capacity_count" - ); - } - - #[rstest] - #[case::low_usage_1(10, 100, 0)] - #[case::low_usage_2(20, 100, 0)] - #[case::low_usage_3(50, 100, 0)] - #[case::mid_usage_1(10, 500, 0)] - #[case::mid_usage_2(25, 500, 0)] - #[case::mid_usage_3(50, 500, 0)] - #[case::between_target_and_full_1(10, 970, 1)] - #[case::between_target_and_full_2(20, 970, 1)] - #[case::between_target_and_full_3(50, 970, 2)] - #[case::between_target_and_full_4(100, 970, 3)] - #[case::above_full_1(10, 1050, 1)] - #[case::above_full_2(20, 1050, 2)] - #[case::above_full_3(50, 1050, 5)] - fn to_delete_until_target( - #[case] entry_count: u64, - #[case] total_entry_size_bytes: u64, - #[case] expected_to_delete_until_target: u64, - ) { - let config = create_config(); - let usage_stats = UsageStats::new(entry_count, total_entry_size_bytes); - - assert_eq!( - config.estimate_to_delete_until_target(&usage_stats), - expected_to_delete_until_target - ); - } } diff --git a/trin-storage/src/versioned/id_indexed_v1/mod.rs b/trin-storage/src/versioned/id_indexed_v1/mod.rs index 240eb0f97..d9cef0554 100644 --- a/trin-storage/src/versioned/id_indexed_v1/mod.rs +++ b/trin-storage/src/versioned/id_indexed_v1/mod.rs @@ -1,5 +1,6 @@ mod config; mod migration; +mod pruning_strategy; pub(super) mod sql; mod store; diff --git a/trin-storage/src/versioned/id_indexed_v1/pruning_strategy.rs b/trin-storage/src/versioned/id_indexed_v1/pruning_strategy.rs new file mode 100644 index 000000000..9b71c43dd --- /dev/null +++ b/trin-storage/src/versioned/id_indexed_v1/pruning_strategy.rs @@ -0,0 +1,364 @@ +use std::{fmt::Debug, ops::Range, time::Duration}; + +use tracing::debug; + +use crate::versioned::usage_stats::UsageStats; + +use super::IdIndexedV1StoreConfig; + +/// The configuration parameters used by [PruningStrategy]. +#[derive(Clone, Debug)] +pub struct PruningConfig { + /// The fraction of storage capacity that we aim for when pruning. + pub target_capacity_fraction: f64, + /// The fraction by which we increase/decrease the `max_pruning_count` when pruning duration is + /// outside `optimal_pruning_duration_range`. + /// + /// For example, let's assume that value is `0.1`. If pruning is too slow, the + /// `max_pruning_count` will decrease by 10%, while if pruning is too fast, the + /// `max_pruning_count` will increase by 10%. Note that increase and decrease don't cancel out. + pub max_pruning_count_change_fraction: f64, + /// The range of pruning durations that we consider optimal. + pub optimal_pruning_duration_range: Range, +} + +impl PruningConfig { + /// By default, we aim to prune down to 95% of storage capacity. + pub const DEFAULT_TARGET_CAPACITY_FRACTION: f64 = 0.95; + /// By default, we increase/decrease `max_pruning_count` by 20%. + pub const DEFAULT_CHANGE_FRACTION: f64 = 0.2; + /// By default, we consider optimal pruning duration between 0.1 and 0.3 seconds. + pub const DEFAULT_OPTIMAL_PRUNING_DURATION_RANGE: Range = + Duration::from_millis(100)..Duration::from_millis(300); + + pub fn new( + target_capacity_fraction: f64, + max_pruning_count_change_fraction: f64, + optimal_pruning_duration_range: Range, + ) -> Self { + if !(0.0..=1.0).contains(&target_capacity_fraction) { + panic!( + "Invalid pruning strategy parameters: target_capacity_fraction={}", + target_capacity_fraction + ) + } + if !(0.0..1.0).contains(&max_pruning_count_change_fraction) { + panic!( + "Invalid pruning strategy parameters: change_fraction={}", + max_pruning_count_change_fraction + ) + } + if optimal_pruning_duration_range.is_empty() { + panic!( + "Invalid pruning strategy parameters: optimal_pruning_duration_range: {:?}", + optimal_pruning_duration_range + ) + } + Self { + target_capacity_fraction, + max_pruning_count_change_fraction, + optimal_pruning_duration_range, + } + } +} + +impl Default for PruningConfig { + fn default() -> Self { + Self::new( + Self::DEFAULT_TARGET_CAPACITY_FRACTION, + Self::DEFAULT_CHANGE_FRACTION, + Self::DEFAULT_OPTIMAL_PRUNING_DURATION_RANGE, + ) + } +} + +/// The dynamic pruning strategy that adjusts the number of entries to prune based on duration. +/// +/// Ideally, we would want to prune down to `target_capacity_bytes`, but this would usually be too +/// slow. The `max_pruning_count` represents the maximum number of entries that we will prune at +/// one point, and it will be updated based on how long it takes in comparison to +/// `optimal_pruning_duration_range`: +/// - it will not change if pruning duration falls within `optimal_pruning_duration_range` +/// - it will increase by `max_pruning_count_change_fraction` if pruning duration is below the +/// `optimal_pruning_duration_range` +/// - it will decrease by `max_pruning_count_change_fraction` if pruning duration is above the +/// `optimal_pruning_duration_range` +pub struct PruningStrategy { + /// The store configuration. + config: IdIndexedV1StoreConfig, + /// The maximum number of entries to prune at the time. + max_pruning_count: u64, +} + +impl PruningStrategy { + /// The starting value for `max_pruning_count`. + pub const STARTING_MAX_PRUNING_COUNT: u64 = 100; + + pub fn new(config: IdIndexedV1StoreConfig) -> Self { + Self { + config, + max_pruning_count: Self::STARTING_MAX_PRUNING_COUNT, + } + } + + /// The capacity that we aim for when pruning. + pub fn target_capacity_bytes(&self) -> u64 { + (self.config.storage_capacity_bytes as f64 + * self.config.pruning_config.target_capacity_fraction) + .round() as u64 + } + + /// Returns `true` when used capacity is above target capacity. + pub fn is_usage_above_target_capacity(&self, usage_stats: &UsageStats) -> bool { + usage_stats.is_above(self.target_capacity_bytes()) + } + + /// Returns `true` when used capacity is above storage capacity. + pub fn should_prune(&self, usage_stats: &UsageStats) -> bool { + usage_stats.is_above(self.config.storage_capacity_bytes) + } + + /// Returns the number of entries to prune. + pub fn get_pruning_count(&self, usage_stats: &UsageStats) -> u64 { + if !self.should_prune(usage_stats) { + return 0; + } + + // If storage capacity is 0, prune everything. + if self.config.storage_capacity_bytes == 0 { + debug!( + Db = %self.config.content_type, + "Storage capacity is 0. Pruning everything ({})", + usage_stats.entry_count + ); + return usage_stats.entry_count; + } + + self.estimate_to_delete_until_target(usage_stats) + .min(self.max_pruning_count) + } + + /// Should be called after pruning in order to update `max_pruning_count` based on pruning + /// duration. + pub fn observe_pruning_duration(&mut self, pruning_duration: Duration) { + let pruning_config = &self.config.pruning_config; + let optimal_pruning_duration = &pruning_config.optimal_pruning_duration_range; + + let change_ratio = if pruning_duration < optimal_pruning_duration.start { + debug!( + Db = %self.config.content_type, + "Pruning was too fast. Increasing max_pruning_count", + ); + 1. + pruning_config.max_pruning_count_change_fraction + } else if pruning_duration > optimal_pruning_duration.end { + debug!( + Db = %self.config.content_type, + "Pruning was too slow. Decreasing max_pruning_count", + ); + 1. - pruning_config.max_pruning_count_change_fraction + } else { + // no change needed + return; + }; + + self.max_pruning_count = (change_ratio * self.max_pruning_count as f64).round() as u64; + self.max_pruning_count = self.max_pruning_count.max(1); // make sure it's at least one. + } + + /// Returns the estimated number of items to delete to reach target capacity. It returns 0 if + /// already below target capacity. + fn estimate_to_delete_until_target(&self, usage_stats: &UsageStats) -> u64 { + let Some(average_entry_size_bytes) = usage_stats.average_entry_size_bytes() else { + // Means that storage is empty and nothing can be deleted. + return 0; + }; + + // The estimated number of entries at the target capacity. + let estimated_target_capacity_count = + (self.target_capacity_bytes() as f64 / average_entry_size_bytes).floor() as u64; + if usage_stats.entry_count > estimated_target_capacity_count { + usage_stats.entry_count - estimated_target_capacity_count + } else { + 0 + } + } +} + +impl Debug for PruningStrategy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PruningStrategy") + .field("max_pruning_count", &self.max_pruning_count) + .finish() + } +} + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + + use discv5::enr::NodeId; + use ethportal_api::types::portal_wire::ProtocolId; + use r2d2::Pool; + use r2d2_sqlite::SqliteConnectionManager; + use rstest::rstest; + + use crate::{versioned::ContentType, DistanceFunction}; + + use super::*; + + const DEFAULT_STORAGE_CAPACITY_BYTES: u64 = 1_000_000; + + fn create_default_pruning_strategy() -> PruningStrategy { + create_pruning_strategy(DEFAULT_STORAGE_CAPACITY_BYTES) + } + + fn create_pruning_strategy(storage_capacity_bytes: u64) -> PruningStrategy { + let config = IdIndexedV1StoreConfig { + content_type: ContentType::State, + network: ProtocolId::State, + node_id: NodeId::random(), + node_data_dir: PathBuf::default(), + storage_capacity_bytes, + sql_connection_pool: Pool::new(SqliteConnectionManager::memory()).unwrap(), + distance_fn: DistanceFunction::Xor, + pruning_config: PruningConfig::default(), + }; + PruningStrategy::new(config) + } + + #[rstest] + #[case::no_usage(0, 0, false)] + #[case::low_usage(10, 100_000, false)] + #[case::just_below_target_capacity(89, 890_000, false)] + #[case::target_capacity(90, 900_000, false)] + #[case::between_target_and_pruning(92, 920_000, false)] + #[case::pruning(95, 950_000, false)] + #[case::between_pruning_and_full(97, 970_000, true)] + #[case::full(100, 1_000_000, true)] + #[case::above_full(110, 1_100_000, true)] + fn is_usage_above_target_capacity( + #[case] entry_count: u64, + #[case] total_entry_size_bytes: u64, + #[case] expected: bool, + ) { + let pruning_strategy = create_default_pruning_strategy(); + let usage_stats = UsageStats { + entry_count, + total_entry_size_bytes, + }; + + assert_eq!( + pruning_strategy.is_usage_above_target_capacity(&usage_stats), + expected + ); + } + + #[rstest] + #[case::low_usage(50, 100_000, false, 0)] + #[case::mid_usage(50, 500_000, false, 0)] + #[case::between_target_and_full(50, 970_000, false, 0)] + #[case::above_full_1(10, 1_050_000, true, 1)] + #[case::above_full_2(20, 1_050_000, true, 2)] + #[case::above_full_3(50, 1_050_000, true, 5)] + #[case::above_full_4(500, 1_050_000, true, 48)] + #[case::above_full_5(1000, 1_050_000, true, 96)] + #[case::above_full_6(2000, 1_050_000, true, 100)] + fn should_prune_and_pruning_count( + #[case] entry_count: u64, + #[case] total_entry_size_bytes: u64, + #[case] should_prune: bool, + #[case] pruning_count: u64, + ) { + let pruning_strategy = create_default_pruning_strategy(); + let usage_stats = UsageStats { + entry_count, + total_entry_size_bytes, + }; + assert_eq!( + pruning_strategy.should_prune(&usage_stats), + should_prune, + "testing should_prune" + ); + assert_eq!( + pruning_strategy.get_pruning_count(&usage_stats), + pruning_count, + "testing pruning_count" + ); + } + + #[rstest] + #[case::empty(0, 0, false, false, 0)] + #[case::few_entries(100, 20_000, true, true, 100)] + #[case::many_entries(10_000, 1_000_000, true, true, 10_000)] + fn zero_storage_capacity( + #[case] entry_count: u64, + #[case] total_entry_size_bytes: u64, + #[case] is_usage_above_target_capacity: bool, + #[case] should_prune: bool, + #[case] pruning_count: u64, + ) { + let pruning_strategy = create_pruning_strategy(/* storage_capacity_bytes= */ 0); + let usage_stats = UsageStats { + entry_count, + total_entry_size_bytes, + }; + assert_eq!( + pruning_strategy.is_usage_above_target_capacity(&usage_stats), + is_usage_above_target_capacity, + "testing is_usage_above_target_capacity" + ); + assert_eq!( + pruning_strategy.should_prune(&usage_stats), + should_prune, + "testing should_prune" + ); + assert_eq!( + pruning_strategy.get_pruning_count(&usage_stats), + pruning_count, + "testing pruning_count" + ); + } + + #[test] + fn observe_pruning_duration_optimal() { + let mut pruning_strategy = create_default_pruning_strategy(); + + assert_eq!(pruning_strategy.max_pruning_count, 100); + + // Slightly slower than lower optimal bound -> max_pruning_count shouldn't change + pruning_strategy.observe_pruning_duration( + PruningConfig::DEFAULT_OPTIMAL_PRUNING_DURATION_RANGE.start + Duration::from_millis(1), + ); + assert_eq!(pruning_strategy.max_pruning_count, 100); + + // Slightly faster than upper optimal bound -> max_pruning_count shouldn't change + pruning_strategy.observe_pruning_duration( + PruningConfig::DEFAULT_OPTIMAL_PRUNING_DURATION_RANGE.end - Duration::from_millis(1), + ); + assert_eq!(pruning_strategy.max_pruning_count, 100); + } + + #[test] + fn observe_pruning_duration_too_fast() { + let mut pruning_strategy = create_default_pruning_strategy(); + + assert_eq!(pruning_strategy.max_pruning_count, 100); + pruning_strategy.observe_pruning_duration( + PruningConfig::DEFAULT_OPTIMAL_PRUNING_DURATION_RANGE.start - Duration::from_millis(1), + ); + // max_pruning_count should have increased by DEFAULT_CHANGE_FRACTION (20%) + assert_eq!(pruning_strategy.max_pruning_count, 120); + } + + #[test] + fn observe_pruning_duration_too_slow() { + let mut pruning_strategy = create_default_pruning_strategy(); + + assert_eq!(pruning_strategy.max_pruning_count, 100); + pruning_strategy.observe_pruning_duration( + PruningConfig::DEFAULT_OPTIMAL_PRUNING_DURATION_RANGE.end + Duration::from_millis(1), + ); + // max_pruning_count should have decreased by DEFAULT_CHANGE_FRACTION (20%) + assert_eq!(pruning_strategy.max_pruning_count, 80); + } +} diff --git a/trin-storage/src/versioned/id_indexed_v1/store.rs b/trin-storage/src/versioned/id_indexed_v1/store.rs index c35b85683..9491f9891 100644 --- a/trin-storage/src/versioned/id_indexed_v1/store.rs +++ b/trin-storage/src/versioned/id_indexed_v1/store.rs @@ -1,3 +1,5 @@ +use std::time::Instant; + use ethportal_api::types::distance::Distance; use r2d2::Pool; use r2d2_sqlite::SqliteConnectionManager; @@ -5,7 +7,10 @@ use rusqlite::{named_params, types::Type, OptionalExtension}; use tracing::{debug, error, warn}; use trin_metrics::storage::StorageMetricsReporter; -use super::{migration::migrate_legacy_history_store, sql, IdIndexedV1StoreConfig}; +use super::{ + migration::migrate_legacy_history_store, pruning_strategy::PruningStrategy, sql, + IdIndexedV1StoreConfig, +}; use crate::{ error::ContentStoreError, utils::get_total_size_of_directory_in_bytes, @@ -13,13 +18,6 @@ use crate::{ ContentId, }; -/// The maximum number of entries to prune with a single query. -/// -/// The value is chosen based on observation that it takes 100-300 ms for such query on a 10GB -/// history storage db. Value can be adjusted based on future performance observations and it can -/// be moved to the `config` if different values are desired for different content types. -const MAX_TO_PRUNE_PER_QUERY: u64 = 100; - /// The result of looking for the farthest content. struct FarthestQueryResult { content_id: ContentId, @@ -47,6 +45,8 @@ pub struct IdIndexedV1Store { /// The maximum distance between `NodeId` and content id that store should keep. Updated /// dynamically after pruning to the farthest distance still stored. radius: Distance, + /// The strategy for deciding when and how much to prune. + pruning_strategy: PruningStrategy, /// The usage stats tracked manually. usage_stats: UsageStats, /// The Metrics for tracking performance. @@ -79,9 +79,12 @@ impl VersionedContentStore for IdIndexedV1Store { let protocol_id = config.network; + let pruning_strategy = PruningStrategy::new(config.clone()); + let mut store = Self { config, radius: Distance::MAX, + pruning_strategy, usage_stats: UsageStats::default(), metrics: StorageMetricsReporter::new(protocol_id), }; @@ -98,28 +101,37 @@ impl IdIndexedV1Store { self.init_usage_stats()?; - if self - .usage_stats - .is_above(self.config.storage_capacity_bytes) - { + if self.pruning_strategy.should_prune(&self.usage_stats) { debug!( Db = %self.config.content_type, - "Used capacity ({}) is above storage capacity ({}) -> Pruning", + "High storage usage ({}) -> Pruning", self.usage_stats.total_entry_size_bytes, - self.config.storage_capacity_bytes ); self.prune()?; } else if self - .usage_stats - .is_above(self.config.target_capacity_bytes()) + .pruning_strategy + .is_usage_above_target_capacity(&self.usage_stats) { + debug!( + Db = %self.config.content_type, + "Used capacity ({}) is above target capacity ({}) -> Using distance to farthest for radius", + self.usage_stats.total_entry_size_bytes, + self.pruning_strategy.target_capacity_bytes() + ); self.set_radius_to_farthest()?; + } else if self.config.storage_capacity_bytes == 0 { + debug!( + Db = %self.config.content_type, + "Storage capacity is 0 -> Using ZERO radius", + ); + self.radius = Distance::ZERO; + self.metrics.report_radius(self.radius); } else { debug!( Db = %self.config.content_type, "Used capacity ({}) is below target capacity ({}) -> Using MAX radius", self.usage_stats.total_entry_size_bytes, - self.config.target_capacity_bytes() + self.pruning_strategy.target_capacity_bytes() ); self.radius = Distance::MAX; self.metrics.report_radius(self.radius); @@ -260,10 +272,7 @@ impl IdIndexedV1Store { self.usage_stats.total_entry_size_bytes += content_size as u64; self.usage_stats.report_metrics(&self.metrics); - if self - .usage_stats - .is_above(self.config.storage_capacity_bytes) - { + if self.pruning_strategy.should_prune(&self.usage_stats) { self.prune()?; } @@ -409,8 +418,16 @@ impl IdIndexedV1Store { fn set_radius_to_farthest(&mut self) -> Result<(), ContentStoreError> { match self.lookup_farthest()? { None => { - error!(Db = %self.config.content_type, "Farthest not found!"); - self.radius = Distance::MAX; + if self.config.storage_capacity_bytes == 0 { + debug!( + Db = %self.config.content_type, + "Farthest not found and storage capacity is 0", + ); + self.radius = Distance::ZERO; + } else { + error!(Db = %self.config.content_type, "Farthest not found!"); + self.radius = Distance::MAX; + } } Some(farthest) => { self.radius = self.distance_to_content_id(&farthest.content_id); @@ -422,16 +439,13 @@ impl IdIndexedV1Store { /// Prunes database and updates `radius`. fn prune(&mut self) -> Result<(), ContentStoreError> { - if !self - .usage_stats - .is_above(self.config.target_capacity_bytes()) - { + if !self.pruning_strategy.should_prune(&self.usage_stats) { warn!(Db = %self.config.content_type, - "Pruning requested but we are below target capacity. Skipping"); + "Pruning requested but not needed. Skipping"); return Ok(()); } - let timer = self.metrics.start_process_timer("prune"); + let pruning_timer = self.metrics.start_process_timer("prune"); debug!(Db = %self.config.content_type, "Pruning start: count={} capacity={}", self.usage_stats.entry_count, @@ -441,26 +455,27 @@ impl IdIndexedV1Store { let conn = self.config.sql_connection_pool.get()?; let mut delete_query = conn.prepare(&sql::delete_farthest(&self.config.content_type))?; - while self - .usage_stats - .is_above(self.config.storage_capacity_bytes) - { - let to_delete = self - .config - .estimate_to_delete_until_target(&self.usage_stats) - .min(MAX_TO_PRUNE_PER_QUERY); + while self.pruning_strategy.should_prune(&self.usage_stats) { + let to_delete = self.pruning_strategy.get_pruning_count(&self.usage_stats); if to_delete == 0 { - error!(Db = %self.config.content_type, "Should delete 0. This is not expected to happen (we should be above storage capacity)."); + error!( + Db = %self.config.content_type, + "Entries to prune is 0. This is not supposed to happen (we should be above storage capacity)." + ); return Ok(()); } + let pruning_start_time = Instant::now(); let delete_timer = self.metrics.start_process_timer("prune_delete"); - let params = named_params! { ":limit": to_delete }; let deleted_content_sizes = delete_query - .query_map(params, |row| row.get("content_size"))? + .query_map(named_params! { ":limit": to_delete }, |row| { + row.get("content_size") + })? .collect::, rusqlite::Error>>()?; self.metrics.stop_process_timer(delete_timer); + self.pruning_strategy + .observe_pruning_duration(pruning_start_time.elapsed()); if to_delete != deleted_content_sizes.len() as u64 { error!(Db = %self.config.content_type, @@ -486,8 +501,7 @@ impl IdIndexedV1Store { self.usage_stats.entry_count, self.usage_stats.total_entry_size_bytes, ); - - self.metrics.stop_process_timer(timer); + self.metrics.stop_process_timer(pruning_timer); Ok(()) } } @@ -510,7 +524,10 @@ mod tests { use rand::Rng; use tempfile::TempDir; - use crate::{test_utils::generate_random_bytes, utils::setup_sql, DistanceFunction}; + use crate::{ + test_utils::generate_random_bytes, utils::setup_sql, + versioned::id_indexed_v1::pruning_strategy::PruningConfig, DistanceFunction, + }; use super::*; @@ -522,7 +539,7 @@ mod tests { // Storage capacity that stores 10000 items of default size const STORAGE_CAPACITY_10000_ITEMS: u64 = 10000 * CONTENT_DEFAULT_SIZE_BYTES; - fn create_config(temp_dir: &TempDir) -> IdIndexedV1StoreConfig { + fn create_config(temp_dir: &TempDir, storage_capacity_bytes: u64) -> IdIndexedV1StoreConfig { IdIndexedV1StoreConfig { content_type: ContentType::State, network: ProtocolId::State, @@ -530,7 +547,8 @@ mod tests { node_data_dir: temp_dir.path().to_path_buf(), distance_fn: DistanceFunction::Xor, sql_connection_pool: setup_sql(temp_dir.path()).unwrap(), - storage_capacity_bytes: STORAGE_CAPACITY_100_ITEMS, + storage_capacity_bytes, + pruning_config: PruningConfig::default(), } } @@ -582,7 +600,7 @@ mod tests { #[test] fn create_empty() -> Result<()> { let temp_dir = TempDir::new()?; - let config = create_config(&temp_dir); + let config = create_config(&temp_dir, STORAGE_CAPACITY_100_ITEMS); let store = IdIndexedV1Store::create(ContentType::State, config)?; assert_eq!(store.usage_stats.entry_count, 0); assert_eq!(store.usage_stats.total_entry_size_bytes, 0); @@ -593,7 +611,7 @@ mod tests { #[test] fn create_low_usage() -> Result<()> { let temp_dir = TempDir::new()?; - let config = create_config(&temp_dir); + let config = create_config(&temp_dir, STORAGE_CAPACITY_100_ITEMS); let item_count = 20; // 20% create_and_populate_table(&config, item_count)?; @@ -612,7 +630,7 @@ mod tests { #[test] fn create_half_full() -> Result<()> { let temp_dir = TempDir::new()?; - let config = create_config(&temp_dir); + let config = create_config(&temp_dir, STORAGE_CAPACITY_100_ITEMS); let item_count = 50; // 50% create_and_populate_table(&config, item_count)?; @@ -631,9 +649,10 @@ mod tests { #[test] fn create_at_target_capacity() -> Result<()> { let temp_dir = TempDir::new()?; - let config = create_config(&temp_dir); + let config = create_config(&temp_dir, STORAGE_CAPACITY_100_ITEMS); - let target_capacity_count = config.target_capacity_bytes() / CONTENT_DEFAULT_SIZE_BYTES; + let target_capacity_bytes = PruningStrategy::new(config.clone()).target_capacity_bytes(); + let target_capacity_count = target_capacity_bytes / CONTENT_DEFAULT_SIZE_BYTES; create_and_populate_table(&config, target_capacity_count)?; let store = IdIndexedV1Store::create(ContentType::State, config)?; @@ -649,10 +668,10 @@ mod tests { #[test] fn create_above_target_capacity() -> Result<()> { let temp_dir = TempDir::new()?; - let config = create_config(&temp_dir); + let config = create_config(&temp_dir, STORAGE_CAPACITY_100_ITEMS); - let above_target_capacity_count = - 1 + config.target_capacity_bytes() / CONTENT_DEFAULT_SIZE_BYTES; + let target_capacity_bytes = PruningStrategy::new(config.clone()).target_capacity_bytes(); + let above_target_capacity_count = 1 + target_capacity_bytes / CONTENT_DEFAULT_SIZE_BYTES; create_and_populate_table(&config, above_target_capacity_count)?; @@ -673,7 +692,7 @@ mod tests { #[test] fn create_at_full_capacity() -> Result<()> { let temp_dir = TempDir::new()?; - let config = create_config(&temp_dir); + let config = create_config(&temp_dir, STORAGE_CAPACITY_100_ITEMS); let full_capacity_count = config.storage_capacity_bytes / CONTENT_DEFAULT_SIZE_BYTES; @@ -696,7 +715,7 @@ mod tests { #[test] fn create_above_full_capacity() -> Result<()> { let temp_dir = TempDir::new()?; - let config = create_config(&temp_dir); + let config = create_config(&temp_dir, STORAGE_CAPACITY_100_ITEMS); let above_full_capacity_count = 10 + config.storage_capacity_bytes / CONTENT_DEFAULT_SIZE_BYTES; @@ -708,20 +727,47 @@ mod tests { // should prune until target capacity assert_eq!( store.usage_stats.entry_count, - config.target_capacity_bytes() / CONTENT_DEFAULT_SIZE_BYTES + store.pruning_strategy.target_capacity_bytes() / CONTENT_DEFAULT_SIZE_BYTES ); assert_eq!( store.usage_stats.total_entry_size_bytes, - config.target_capacity_bytes() + store.pruning_strategy.target_capacity_bytes() ); assert!(store.radius() < Distance::MAX); Ok(()) } + #[test] + fn create_zero_storage_empty() -> Result<()> { + let temp_dir = TempDir::new()?; + let config = create_config(&temp_dir, /* storage_capacity_bytes= */ 0); + + let store = IdIndexedV1Store::create(ContentType::State, config)?; + + assert_eq!(store.usage_stats, UsageStats::default()); + assert_eq!(store.radius(), Distance::ZERO); + Ok(()) + } + + #[test] + fn create_zero_storage_non_empty() -> Result<()> { + let temp_dir = TempDir::new()?; + let config = create_config(&temp_dir, /* storage_capacity_bytes= */ 0); + + // Add 1K entries, more than we would normally prune. + create_and_populate_table(&config, 1_000)?; + let store = IdIndexedV1Store::create(ContentType::State, config)?; + + // Check that db is empty and distance is ZERO. + assert_eq!(store.usage_stats, UsageStats::default()); + assert_eq!(store.radius(), Distance::ZERO); + Ok(()) + } + #[test] fn simple_insert_and_lookup() -> Result<()> { let temp_dir = TempDir::new()?; - let config = create_config(&temp_dir); + let config = create_config(&temp_dir, STORAGE_CAPACITY_100_ITEMS); // fill 50% of storage with 50 items, 1% each create_and_populate_table(&config, 50)?; @@ -751,7 +797,7 @@ mod tests { #[test] fn simple_insert_and_delete() -> Result<()> { let temp_dir = TempDir::new()?; - let config = create_config(&temp_dir); + let config = create_config(&temp_dir, STORAGE_CAPACITY_100_ITEMS); // fill 50% of storage with 50 items, 1% each create_and_populate_table(&config, 50)?; @@ -781,7 +827,7 @@ mod tests { #[test] fn prune_simple() -> Result<()> { let temp_dir = TempDir::new()?; - let config = create_config(&temp_dir); + let config = create_config(&temp_dir, STORAGE_CAPACITY_100_ITEMS); let mut store = IdIndexedV1Store::create(ContentType::State, config.clone())?; assert_eq!(store.radius(), Distance::MAX); @@ -809,7 +855,7 @@ mod tests { assert_eq!(store.usage_stats.entry_count, 95); assert_eq!( store.usage_stats.total_entry_size_bytes, - config.target_capacity_bytes() + store.pruning_strategy.target_capacity_bytes() ); assert!(store.radius() < Distance::MAX); assert!(store.radius().big_endian()[0] == 0xFF); @@ -843,7 +889,7 @@ mod tests { #[test] fn prune_different_sizes_elements() -> Result<()> { let temp_dir = TempDir::new()?; - let config = create_config(&temp_dir); + let config = create_config(&temp_dir, STORAGE_CAPACITY_100_ITEMS); // fill 50% of storage with 50 items, 1% each create_and_populate_table(&config, 50)?; @@ -895,7 +941,7 @@ mod tests { #[test] fn prune_with_one_large_item() -> Result<()> { let temp_dir = TempDir::new()?; - let config = create_config(&temp_dir); + let config = create_config(&temp_dir, STORAGE_CAPACITY_100_ITEMS); // fill 50% of storage with 50 items, 1% each create_and_populate_table(&config, 50)?; @@ -927,7 +973,10 @@ mod tests { // leaving us with 48 elements and between target and full capacity. assert_eq!(store.usage_stats.entry_count, 48); assert!(store.usage_stats.total_entry_size_bytes <= store.config.storage_capacity_bytes); - assert!(store.usage_stats.total_entry_size_bytes > store.config.target_capacity_bytes()); + assert!( + store.usage_stats.total_entry_size_bytes + > store.pruning_strategy.target_capacity_bytes() + ); // Check that the big_value_key is still stored assert!(store.has_content(&big_value_key.content_id().into())?); @@ -938,15 +987,7 @@ mod tests { #[test] fn prune_big_db() -> Result<()> { let temp_dir = TempDir::new()?; - let config = IdIndexedV1StoreConfig { - content_type: ContentType::State, - network: ProtocolId::State, - node_id: NodeId::random(), - node_data_dir: temp_dir.path().to_path_buf(), - distance_fn: DistanceFunction::Xor, - sql_connection_pool: setup_sql(temp_dir.path()).unwrap(), - storage_capacity_bytes: STORAGE_CAPACITY_10000_ITEMS, - }; + let config = create_config(&temp_dir, STORAGE_CAPACITY_10000_ITEMS); let mut store = IdIndexedV1Store::create(ContentType::State, config.clone())?; @@ -972,7 +1013,7 @@ mod tests { // we should have deleted exactly MAX_TO_PRUNE_PER_QUERY entries assert_eq!( store.usage_stats.entry_count, - 10_001 - MAX_TO_PRUNE_PER_QUERY + 10_001 - PruningStrategy::STARTING_MAX_PRUNING_COUNT ); // used capacity should be below storage capacity but above 99% @@ -985,15 +1026,7 @@ mod tests { #[test] fn prune_big_db_with_big_entry() -> Result<()> { let temp_dir = TempDir::new()?; - let config = IdIndexedV1StoreConfig { - content_type: ContentType::State, - network: ProtocolId::State, - node_id: NodeId::random(), - node_data_dir: temp_dir.path().to_path_buf(), - distance_fn: DistanceFunction::Xor, - sql_connection_pool: setup_sql(temp_dir.path()).unwrap(), - storage_capacity_bytes: STORAGE_CAPACITY_10000_ITEMS, - }; + let config = create_config(&temp_dir, STORAGE_CAPACITY_10000_ITEMS); let mut store = IdIndexedV1Store::create(ContentType::State, config.clone())?; @@ -1027,7 +1060,9 @@ mod tests { assert!(store.radius() < Distance::MAX); // we should have deleted more than MAX_TO_PRUNE_PER_QUERY entries - assert!(store.usage_stats.entry_count < 10_001 - MAX_TO_PRUNE_PER_QUERY); + assert!( + store.usage_stats.entry_count < 10_001 - PruningStrategy::STARTING_MAX_PRUNING_COUNT + ); // used capacity should not be above storage capacity assert!(store.usage_stats.total_entry_size_bytes <= STORAGE_CAPACITY_10000_ITEMS); @@ -1038,7 +1073,7 @@ mod tests { #[test] fn pagination_empty() -> Result<()> { let temp_dir = TempDir::new()?; - let config = create_config(&temp_dir); + let config = create_config(&temp_dir, STORAGE_CAPACITY_100_ITEMS); let store = IdIndexedV1Store::create(ContentType::State, config)?; assert_eq!( @@ -1054,7 +1089,7 @@ mod tests { #[test] fn pagination() -> Result<()> { let temp_dir = TempDir::new()?; - let config = create_config(&temp_dir); + let config = create_config(&temp_dir, STORAGE_CAPACITY_100_ITEMS); let mut store = IdIndexedV1Store::create(ContentType::State, config.clone())?; let entry_count = 12; diff --git a/trin-storage/src/versioned/usage_stats.rs b/trin-storage/src/versioned/usage_stats.rs index e388ed544..9733e94cf 100644 --- a/trin-storage/src/versioned/usage_stats.rs +++ b/trin-storage/src/versioned/usage_stats.rs @@ -17,7 +17,7 @@ impl UsageStats { } } - /// Returns the average entry size + /// Returns the average entry size, or `None` when empty. pub fn average_entry_size_bytes(&self) -> Option { if self.entry_count == 0 { Option::None