From 5726cf81f4d3ec40a0ab4f6a0260126d7c5c6e6d Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 27 Mar 2024 16:40:22 +0800 Subject: [PATCH] refactor(storage): refactor compact iter recreate stream (#15919) --- src/common/src/config.rs | 9 ++++-- src/config/docs.md | 1 + src/config/example.toml | 1 + .../src/hummock/compactor/compaction_utils.rs | 7 ++--- .../src/hummock/compactor/compactor_runner.rs | 16 +++++------ src/storage/src/hummock/compactor/iterator.rs | 28 ++++++++++--------- src/storage/src/opts.rs | 3 ++ 7 files changed, 38 insertions(+), 27 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 1352eaf7be7b4..92db04cc722b5 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -688,6 +688,7 @@ pub struct StorageConfig { #[serde(default = "default::storage::compactor_max_sst_key_count")] pub compactor_max_sst_key_count: u64, + // DEPRECATED: This config will be deprecated in the future version, use `storage.compactor_iter_max_io_retry_times` instead. #[serde(default = "default::storage::compact_iter_recreate_timeout_ms")] pub compact_iter_recreate_timeout_ms: u64, #[serde(default = "default::storage::compactor_max_sst_size")] @@ -698,12 +699,12 @@ pub struct StorageConfig { pub check_compaction_result: bool, #[serde(default = "default::storage::max_preload_io_retry_times")] pub max_preload_io_retry_times: usize, - #[serde(default = "default::storage::compactor_fast_max_compact_delete_ratio")] pub compactor_fast_max_compact_delete_ratio: u32, - #[serde(default = "default::storage::compactor_fast_max_compact_task_size")] pub compactor_fast_max_compact_task_size: u64, + #[serde(default = "default::storage::compactor_iter_max_io_retry_times")] + pub compactor_iter_max_io_retry_times: usize, #[serde(default, flatten)] #[config_doc(omitted)] @@ -1329,6 +1330,10 @@ pub mod default { 10 * 60 * 1000 } + pub fn compactor_iter_max_io_retry_times() -> usize { + 8 + } + pub fn compactor_max_sst_size() -> u64 { 512 * 1024 * 1024 // 512m } diff --git a/src/config/docs.md b/src/config/docs.md index d0667829c04c5..8bfbe76c7d394 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -100,6 +100,7 @@ This page is automatically generated by `./risedev generate-example-config` | compact_iter_recreate_timeout_ms | | 600000 | | compactor_fast_max_compact_delete_ratio | | 40 | | compactor_fast_max_compact_task_size | | 2147483648 | +| compactor_iter_max_io_retry_times | | 8 | | compactor_max_sst_key_count | | 2097152 | | compactor_max_sst_size | | 536870912 | | compactor_max_task_multiplier | Compactor calculates the maximum number of tasks that can be executed on the node based on `worker_num` and `compactor_max_task_multiplier`. `max_pull_task_count` = `worker_num` * `compactor_max_task_multiplier` | 2.5 | diff --git a/src/config/example.toml b/src/config/example.toml index f0c4e2457bd9e..25c2f4b200f8a 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -133,6 +133,7 @@ check_compaction_result = false max_preload_io_retry_times = 3 compactor_fast_max_compact_delete_ratio = 40 compactor_fast_max_compact_task_size = 2147483648 +compactor_iter_max_io_retry_times = 8 mem_table_spill_threshold = 4194304 [storage.cache.block_cache_eviction] diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index 1424ebdd7f5e2..9594d7295a03e 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -348,7 +348,6 @@ pub async fn check_compaction_result( let mut table_iters = Vec::new(); let mut del_iter = ForwardMergeRangeIterator::default(); - let compact_io_retry_time = context.storage_opts.compact_iter_recreate_timeout_ms; for level in &compact_task.input_ssts { if level.table_infos.is_empty() { continue; @@ -365,7 +364,7 @@ pub async fn check_compaction_result( KeyRange::inf(), context.sstable_store.clone(), Arc::new(TaskProgress::default()), - compact_io_retry_time, + context.storage_opts.compactor_iter_max_io_retry_times, )); } else { let mut stats = StoreLocalStatistic::default(); @@ -381,7 +380,7 @@ pub async fn check_compaction_result( KeyRange::inf(), context.sstable_store.clone(), Arc::new(TaskProgress::default()), - compact_io_retry_time, + context.storage_opts.compactor_iter_max_io_retry_times, )); } } @@ -401,7 +400,7 @@ pub async fn check_compaction_result( KeyRange::inf(), context.sstable_store.clone(), Arc::new(TaskProgress::default()), - compact_io_retry_time, + context.storage_opts.compactor_iter_max_io_retry_times, ); let right_iter = UserIterator::new( SkipWatermarkIterator::from_safe_epoch_watermarks(iter, &compact_task.table_watermarks), diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 0bbe41e6649f8..c22d8eb5a30a9 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -154,12 +154,6 @@ impl CompactorRunner { task_progress: Arc, ) -> HummockResult> { let mut table_iters = Vec::new(); - let compact_io_retry_time = self - .compactor - .context - .storage_opts - .compact_iter_recreate_timeout_ms; - for level in &self.compact_task.input_ssts { if level.table_infos.is_empty() { continue; @@ -189,7 +183,10 @@ impl CompactorRunner { self.compactor.task_config.key_range.clone(), self.sstable_store.clone(), task_progress.clone(), - compact_io_retry_time, + self.compactor + .context + .storage_opts + .compactor_iter_max_io_retry_times, )); } else { for table_info in &level.table_infos { @@ -209,7 +206,10 @@ impl CompactorRunner { self.compactor.task_config.key_range.clone(), self.sstable_store.clone(), task_progress.clone(), - compact_io_retry_time, + self.compactor + .context + .storage_opts + .compactor_iter_max_io_retry_times, )); } } diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index 13fafe720be4c..f89d70a756486 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -55,8 +55,8 @@ pub struct SstableStreamIterator { sstable_info: SstableInfo, existing_table_ids: HashSet, task_progress: Arc, - io_retry_timeout_ms: u64, - create_time: Instant, + io_retry_times: usize, + max_io_retry_times: usize, } impl SstableStreamIterator { @@ -82,7 +82,7 @@ impl SstableStreamIterator { stats: &StoreLocalStatistic, task_progress: Arc, sstable_store: SstableStoreRef, - io_retry_timeout_ms: u64, + max_io_retry_times: usize, ) -> Self { Self { block_stream: None, @@ -92,10 +92,10 @@ impl SstableStreamIterator { stats_ptr: stats.remote_io_time.clone(), existing_table_ids, sstable_info, - create_time: Instant::now(), sstable_store, task_progress, - io_retry_timeout_ms, + io_retry_times: 0, + max_io_retry_times, } } @@ -178,13 +178,11 @@ impl SstableStreamIterator { } Ok(None) => break, Err(e) => { - if !e.is_object_error() - || self.create_time.elapsed().as_millis() as u64 - > self.io_retry_timeout_ms - { + if !e.is_object_error() || !self.need_recreate_io_stream() { return Err(e); } self.block_stream.take(); + self.io_retry_times += 1; fail_point!("create_stream_err"); } } @@ -250,6 +248,10 @@ impl SstableStreamIterator { self.sstable_info.table_ids ) } + + fn need_recreate_io_stream(&self) -> bool { + self.io_retry_times < self.max_io_retry_times + } } impl Drop for SstableStreamIterator { @@ -280,7 +282,7 @@ pub struct ConcatSstableIterator { stats: StoreLocalStatistic, task_progress: Arc, - io_retry_timeout_ms: u64, + max_io_retry_times: usize, } impl ConcatSstableIterator { @@ -293,7 +295,7 @@ impl ConcatSstableIterator { key_range: KeyRange, sstable_store: SstableStoreRef, task_progress: Arc, - io_retry_timeout_ms: u64, + max_io_retry_times: usize, ) -> Self { Self { key_range, @@ -304,7 +306,7 @@ impl ConcatSstableIterator { sstable_store, task_progress, stats: StoreLocalStatistic::default(), - io_retry_timeout_ms, + max_io_retry_times, } } @@ -405,7 +407,7 @@ impl ConcatSstableIterator { &self.stats, self.task_progress.clone(), self.sstable_store.clone(), - self.io_retry_timeout_ms, + self.max_io_retry_times, ); sstable_iter.seek(seek_key).await?; diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index eb4008da89942..43a73b42a8bee 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -68,6 +68,7 @@ pub struct StorageOpts { /// Capacity of sstable meta cache. pub compactor_memory_limit_mb: usize, /// compactor streaming iterator recreate timeout. + /// deprecated pub compact_iter_recreate_timeout_ms: u64, /// Number of SST ids fetched from meta per RPC pub sstable_id_remote_fetch_number: u32, @@ -77,6 +78,7 @@ pub struct StorageOpts { pub max_sub_compaction: u32, pub max_concurrent_compaction_task_number: u64, pub max_version_pinning_duration_sec: u64, + pub compactor_iter_max_io_retry_times: usize, pub data_file_cache_dir: String, pub data_file_cache_capacity_mb: usize, @@ -269,6 +271,7 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt .storage .compactor_fast_max_compact_delete_ratio, compactor_fast_max_compact_task_size: c.storage.compactor_fast_max_compact_task_size, + compactor_iter_max_io_retry_times: c.storage.compactor_iter_max_io_retry_times, } } }