diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 01973155589d3..984800f27eb0c 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -302,10 +302,15 @@ pub struct MetaConfig { /// split it to an single group. pub min_table_split_write_throughput: u64, + // If the compaction task does not report heartbeat beyond the + // `compaction_task_max_heartbeat_interval_secs` interval, we will cancel the task #[serde(default = "default::meta::compaction_task_max_heartbeat_interval_secs")] + pub compaction_task_max_heartbeat_interval_secs: u64, + // If the compaction task does not change in progress beyond the // `compaction_task_max_heartbeat_interval_secs` interval, we will cancel the task - pub compaction_task_max_heartbeat_interval_secs: u64, + #[serde(default = "default::meta::compaction_task_max_progress_interval_secs")] + pub compaction_task_max_progress_interval_secs: u64, #[serde(default)] pub compaction_config: CompactionConfig, @@ -1071,7 +1076,11 @@ pub mod default { } pub fn compaction_task_max_heartbeat_interval_secs() -> u64 { - 60 // 1min + 30 // 30s + } + + pub fn compaction_task_max_progress_interval_secs() -> u64 { + 60 * 10 // 10min } pub fn cut_table_size_limit() -> u64 { @@ -1501,19 +1510,19 @@ pub mod default { pub mod object_store_config { pub fn object_store_streaming_read_timeout_ms() -> u64 { - 10 * 60 * 1000 + 8 * 60 * 1000 } pub fn object_store_streaming_upload_timeout_ms() -> u64 { - 10 * 60 * 1000 + 8 * 60 * 1000 } pub fn object_store_upload_timeout_ms() -> u64 { - 60 * 60 * 1000 + 8 * 60 * 1000 } pub fn object_store_read_timeout_ms() -> u64 { - 60 * 60 * 1000 + 8 * 60 * 1000 } pub mod s3 { diff --git a/src/config/example.toml b/src/config/example.toml index b15a8aeb030f9..c9fe336b57b01 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -44,7 +44,8 @@ do_not_config_object_storage_lifecycle = false partition_vnode_count = 16 table_write_throughput_threshold = 16777216 min_table_split_write_throughput = 4194304 -compaction_task_max_heartbeat_interval_secs = 60 +compaction_task_max_heartbeat_interval_secs = 30 +compaction_task_max_progress_interval_secs = 600 hybird_partition_vnode_count = 4 event_log_enabled = true event_log_channel_max_size = 10 @@ -165,10 +166,10 @@ recent_filter_layers = 6 recent_filter_rotate_interval_ms = 10000 [storage.object_store] -object_store_streaming_read_timeout_ms = 600000 -object_store_streaming_upload_timeout_ms = 600000 -object_store_upload_timeout_ms = 3600000 -object_store_read_timeout_ms = 3600000 +object_store_streaming_read_timeout_ms = 480000 +object_store_streaming_upload_timeout_ms = 480000 +object_store_upload_timeout_ms = 480000 +object_store_read_timeout_ms = 480000 [storage.object_store.s3] object_store_keepalive_ms = 600000 diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 44dfad4965dc3..2989ceaa5f5c6 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -248,6 +248,28 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { ui_path: opts.dashboard_ui_path, }; + const MIN_TIMEOUT_INTERVAL_SEC: u64 = 20; + let compaction_task_max_progress_interval_secs = { + config + .storage + .object_store + .object_store_read_timeout_ms + .max(config.storage.object_store.object_store_upload_timeout_ms) + .max( + config + .storage + .object_store + .object_store_streaming_read_timeout_ms, + ) + .max( + config + .storage + .object_store + .object_store_streaming_upload_timeout_ms, + ) + .max(config.meta.compaction_task_max_progress_interval_secs) + } 
+ MIN_TIMEOUT_INTERVAL_SEC; + let (mut join_handle, leader_lost_handle, shutdown_send) = rpc_serve( add_info, backend, @@ -311,6 +333,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { compaction_task_max_heartbeat_interval_secs: config .meta .compaction_task_max_heartbeat_interval_secs, + compaction_task_max_progress_interval_secs, compaction_config: Some(config.meta.compaction_config), cut_table_size_limit: config.meta.cut_table_size_limit, hybird_partition_vnode_count: config.meta.hybird_partition_vnode_count, diff --git a/src/meta/src/hummock/compactor_manager.rs b/src/meta/src/hummock/compactor_manager.rs index 71cba056ca535..d472fa20037a3 100644 --- a/src/meta/src/hummock/compactor_manager.rs +++ b/src/meta/src/hummock/compactor_manager.rs @@ -54,6 +54,8 @@ struct TaskHeartbeat { num_pending_write_io: u64, create_time: Instant, expire_at: u64, + + update_at: u64, } impl Compactor { @@ -116,7 +118,8 @@ impl Compactor { /// - 3. Cancelled: a task is reported as cancelled via `CompactStatus::report_compact_task`. It's /// the final state. pub struct CompactorManagerInner { - pub task_expiry_seconds: u64, + pub task_expired_seconds: u64, + pub heartbeat_expired_seconds: u64, task_heartbeats: HashMap, /// The outer lock is a RwLock, so we should still be able to modify each compactor @@ -139,7 +142,8 @@ impl CompactorManagerInner { .collect(), }; let mut manager = Self { - task_expiry_seconds: env.opts.compaction_task_max_heartbeat_interval_secs, + task_expired_seconds: env.opts.compaction_task_max_progress_interval_secs, + heartbeat_expired_seconds: env.opts.compaction_task_max_heartbeat_interval_secs, task_heartbeats: Default::default(), compactor_map: Default::default(), }; @@ -153,7 +157,8 @@ impl CompactorManagerInner { /// Only used for unit test. 
pub fn for_test() -> Self { Self { - task_expiry_seconds: 1, + task_expired_seconds: 1, + heartbeat_expired_seconds: 1, task_heartbeats: Default::default(), compactor_map: Default::default(), } @@ -239,19 +244,18 @@ impl CompactorManagerInner { ret } - pub fn get_expired_tasks(&self, interval_sec: Option) -> Vec { - let interval = interval_sec.unwrap_or(0); - let now: u64 = SystemTime::now() + pub fn get_heartbeat_expired_tasks(&self) -> Vec { + let heartbeat_expired_ts: u64 = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .expect("Clock may have gone backwards") .as_secs() - - interval; - Self::get_heartbeat_expired_tasks(&self.task_heartbeats, now) + - self.heartbeat_expired_seconds; + Self::get_heartbeat_expired_tasks_impl(&self.task_heartbeats, heartbeat_expired_ts) } - fn get_heartbeat_expired_tasks( + fn get_heartbeat_expired_tasks_impl( task_heartbeats: &HashMap, - now: u64, + heartbeat_expired_ts: u64, ) -> Vec { let mut cancellable_tasks = vec![]; const MAX_TASK_DURATION_SEC: u64 = 2700; @@ -265,22 +269,24 @@ impl CompactorManagerInner { num_progress_key, num_pending_read_io, num_pending_write_io, + update_at, } in task_heartbeats.values() { - if *expire_at < now { - // task heartbeat expire + if *update_at < heartbeat_expired_ts { cancellable_tasks.push(task.clone()); } + let task_duration_too_long = create_time.elapsed().as_secs() > MAX_TASK_DURATION_SEC; if task_duration_too_long { let compact_task_statistics = statistics_compact_task(task); tracing::info!( - "CompactionGroupId {} Task {} duration too long create_time {:?} num_ssts_sealed {} num_ssts_uploaded {} num_progress_key {} \ + "CompactionGroupId {} Task {} duration too long create_time {:?} expire_at {:?} num_ssts_sealed {} num_ssts_uploaded {} num_progress_key {} \ pending_read_io_count {} pending_write_io_count {} target_level {} \ base_level {} target_sub_level_id {} task_type {} compact_task_statistics {:?}", task.compaction_group_id, task.task_id, create_time, + expire_at, num_ssts_sealed, num_ssts_uploaded, num_progress_key, @@ -312,7 +318,8 @@ impl CompactorManagerInner { num_pending_read_io: 0, num_pending_write_io: 0, create_time: Instant::now(), - expire_at: now + self.task_expiry_seconds, + expire_at: now + self.task_expired_seconds, + update_at: now, }, ); } @@ -332,12 +339,14 @@ impl CompactorManagerInner { let mut cancel_tasks = vec![]; for progress in progress_list { if let Some(task_ref) = self.task_heartbeats.get_mut(&progress.task_id) { + task_ref.update_at = now; + if task_ref.num_ssts_sealed < progress.num_ssts_sealed || task_ref.num_ssts_uploaded < progress.num_ssts_uploaded || task_ref.num_progress_key < progress.num_progress_key { - // Refresh the expiry of the task as it is showing progress. - task_ref.expire_at = now + self.task_expiry_seconds; + // Refresh the expired of the task as it is showing progress. + task_ref.expire_at = now + self.task_expired_seconds; task_ref.num_ssts_sealed = progress.num_ssts_sealed; task_ref.num_ssts_uploaded = progress.num_ssts_uploaded; task_ref.num_progress_key = progress.num_progress_key; @@ -430,8 +439,8 @@ impl CompactorManager { .check_tasks_status(tasks, slow_task_duration) } - pub fn get_expired_tasks(&self, interval_sec: Option) -> Vec { - self.inner.read().get_expired_tasks(interval_sec) + pub fn get_heartbeat_expired_tasks(&self) -> Vec { + self.inner.read().get_heartbeat_expired_tasks() } pub fn initiate_task_heartbeat(&self, task: CompactTask) { @@ -497,7 +506,7 @@ mod tests { (env, context_id) }; - // Restart. 
Set task_expiry_seconds to 0 only to speed up test. + // Restart. Set task_expired_seconds to 0 only to speed up test. let compactor_manager = CompactorManager::with_meta(env).await.unwrap(); // Because task assignment exists. // Because compactor gRPC is not established yet. @@ -506,15 +515,11 @@ mod tests { // Ensure task is expired. tokio::time::sleep(Duration::from_secs(2)).await; - let expired = compactor_manager.get_expired_tasks(None); + let expired = compactor_manager.get_heartbeat_expired_tasks(); assert_eq!(expired.len(), 1); // Mimic no-op compaction heartbeat - compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress { - task_id: expired[0].task_id, - ..Default::default() - }]); - assert_eq!(compactor_manager.get_expired_tasks(None).len(), 1); + assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1); // Mimic compaction heartbeat with invalid task id compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress { @@ -524,7 +529,7 @@ mod tests { num_progress_key: 100, ..Default::default() }]); - assert_eq!(compactor_manager.get_expired_tasks(None).len(), 1); + assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1); // Mimic effective compaction heartbeat compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress { @@ -534,7 +539,7 @@ mod tests { num_progress_key: 100, ..Default::default() }]); - assert_eq!(compactor_manager.get_expired_tasks(None).len(), 0); + assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 0); // Test add assert_eq!(compactor_manager.compactor_num(), 0); diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 765e83255ac4f..31973566ff75f 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -2192,7 +2192,7 @@ impl HummockManager { GroupSplit, CheckDeadTask, Report, - CompactionHeartBeat, + CompactionHeartBeatExpiredCheck, DynamicCompactionTrigger, SpaceReclaimCompactionTrigger, @@ -2224,7 +2224,7 @@ impl HummockManager { .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); compaction_heartbeat_interval.reset(); let compaction_heartbeat_trigger = IntervalStream::new(compaction_heartbeat_interval) - .map(|_| HummockTimerEvent::CompactionHeartBeat); + .map(|_| HummockTimerEvent::CompactionHeartBeatExpiredCheck); let mut min_trigger_interval = tokio::time::interval(Duration::from_secs( hummock_manager.env.opts.periodic_compaction_interval_sec, @@ -2403,27 +2403,17 @@ impl HummockManager { } } - HummockTimerEvent::CompactionHeartBeat => { + HummockTimerEvent::CompactionHeartBeatExpiredCheck => { let compactor_manager = hummock_manager.compactor_manager.clone(); // TODO: add metrics to track expired tasks - const INTERVAL_SEC: u64 = 30; // The cancel task has two paths // 1. compactor heartbeat cancels the expired task based on task // progress (meta + compactor) // 2. meta periodically scans the task and performs a cancel on // the meta side for tasks that are not updated by heartbeat - - // So the reason for setting Interval is to let compactor be - // responsible for canceling the corresponding task as much as - // possible by relaxing the conditions for detection on the meta - // side, and meta is just used as a last resort to clean up the - // tasks that compactor has expired. 
- - for task in - compactor_manager.get_expired_tasks(Some(INTERVAL_SEC)) - { + for task in compactor_manager.get_heartbeat_expired_tasks() { if let Err(e) = hummock_manager .cancel_compact_task( task.task_id, diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index dd25c39307905..2b687bc712841 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -188,6 +188,7 @@ pub struct MetaOpts { pub min_table_split_write_throughput: u64, pub compaction_task_max_heartbeat_interval_secs: u64, + pub compaction_task_max_progress_interval_secs: u64, pub compaction_config: Option, /// The size limit to split a state-table to independent sstable. @@ -254,6 +255,7 @@ impl MetaOpts { do_not_config_object_storage_lifecycle: true, partition_vnode_count: 32, compaction_task_max_heartbeat_interval_secs: 0, + compaction_task_max_progress_interval_secs: 1, compaction_config: None, cut_table_size_limit: 1024 * 1024 * 1024, hybird_partition_vnode_count: 4, diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index 073c5b5119319..1a5f8c94cd110 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -204,7 +204,6 @@ async fn compact>(iter: I, sstable_store Arc::new(CompactorMetrics::unused()), iter, DummyCompactionFilter, - None, ) .await .unwrap(); diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 82ca243ed0836..49dd1b6b23abb 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -32,6 +32,7 @@ use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType}; use risingwave_pb::hummock::{BloomFilterType, CompactTask, LevelType}; use tokio::sync::oneshot::Receiver; +use super::iterator::MonitoredCompactorIterator; use super::task_progress::TaskProgress; use super::{check_compaction_result, CompactionStatistics, TaskConfig}; use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManager}; @@ -231,7 +232,10 @@ impl CompactorRunner { // in https://github.com/risingwavelabs/risingwave/issues/13148 Ok(( SkipWatermarkIterator::from_safe_epoch_watermarks( - UnorderedMergeIteratorInner::for_compactor(table_iters), + MonitoredCompactorIterator::new( + UnorderedMergeIteratorInner::for_compactor(table_iters), + task_progress.clone(), + ), &self.compact_task.table_watermarks, ), CompactionDeleteRangeIterator::new(del_iter), @@ -706,7 +710,6 @@ pub async fn compact_and_build_sst( compactor_metrics: Arc, mut iter: impl HummockIterator, mut compaction_filter: impl CompactionFilter, - task_progress: Option>, ) -> HummockResult where F: TableBuilderFactory, @@ -750,18 +753,7 @@ where let mut last_table_stats = TableStats::default(); let mut last_table_id = None; let mut compaction_statistics = CompactionStatistics::default(); - let mut progress_key_num: u64 = 0; - const PROGRESS_KEY_INTERVAL: u64 = 100; while iter.is_valid() { - progress_key_num += 1; - - if let Some(task_progress) = task_progress.as_ref() - && progress_key_num >= PROGRESS_KEY_INTERVAL - { - task_progress.inc_progress_key(progress_key_num); - progress_key_num = 0; - } - let mut iter_key = iter.key(); compaction_statistics.iter_total_key_counts += 1; @@ -807,14 +799,6 @@ where }) .await?; } - - progress_key_num += 1; - if let Some(task_progress) = task_progress.as_ref() - && progress_key_num >= PROGRESS_KEY_INTERVAL - { - task_progress.inc_progress_key(progress_key_num); 
- progress_key_num = 0; - } } let earliest_range_delete_which_can_see_iter_key = del_iter.earliest_delete_since(epoch); @@ -918,23 +902,8 @@ where event_key, }) .await?; - progress_key_num += 1; - if let Some(task_progress) = task_progress.as_ref() - && progress_key_num >= PROGRESS_KEY_INTERVAL - { - task_progress.inc_progress_key(progress_key_num); - progress_key_num = 0; - } } } - - if let Some(task_progress) = task_progress.as_ref() - && progress_key_num > 0 - { - // Avoid losing the progress_key_num in the last Interval - task_progress.inc_progress_key(progress_key_num); - } - if let Some(last_table_id) = last_table_id.take() { table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats)); } diff --git a/src/storage/src/hummock/compactor/fast_compactor_runner.rs b/src/storage/src/hummock/compactor/fast_compactor_runner.rs index 653a00f21b7c2..828a9750aa41c 100644 --- a/src/storage/src/hummock/compactor/fast_compactor_runner.rs +++ b/src/storage/src/hummock/compactor/fast_compactor_runner.rs @@ -159,9 +159,7 @@ impl BlockStreamIterator { impl Drop for BlockStreamIterator { fn drop(&mut self) { - self.task_progress - .num_pending_read_io - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + self.task_progress.dec_num_pending_read_io(); } } @@ -242,9 +240,7 @@ impl ConcatSstableIterator { .await?; let stats_ptr = self.stats.remote_io_time.clone(); let now = Instant::now(); - self.task_progress - .num_pending_read_io - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + self.task_progress.inc_num_pending_read_io(); let block_stream = self .sstable_store .get_stream_for_blocks(sstable.value().id, &sstable.value().meta.block_metas) diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index ec115c8ea50f1..fd2a9b1086e6f 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -34,6 +34,8 @@ use crate::hummock::value::HummockValue; use crate::hummock::{BlockHolder, BlockIterator, BlockMeta, HummockResult}; use crate::monitor::StoreLocalStatistic; +const PROGRESS_KEY_INTERVAL: usize = 100; + /// Iterates over the KV-pairs of an SST while downloading it. 
pub struct SstableStreamIterator { sstable_store: SstableStoreRef, @@ -252,9 +254,7 @@ impl SstableStreamIterator { impl Drop for SstableStreamIterator { fn drop(&mut self) { - self.task_progress - .num_pending_read_io - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + self.task_progress.dec_num_pending_read_io() } } @@ -396,9 +396,7 @@ impl ConcatSstableIterator { if start_index >= end_index { found = false; } else { - self.task_progress - .num_pending_read_io - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + self.task_progress.inc_num_pending_read_io(); let mut sstable_iter = SstableStreamIterator::new( sstable.value().meta.block_metas.clone(), table_info.clone(), @@ -490,6 +488,67 @@ impl HummockIterator for ConcatSstableIterator { } } +pub struct MonitoredCompactorIterator { + inner: I, + task_progress: Arc, + + processed_key_num: usize, +} + +impl> MonitoredCompactorIterator { + pub fn new(inner: I, task_progress: Arc) -> Self { + Self { + inner, + task_progress, + processed_key_num: 0, + } + } +} + +impl> HummockIterator for MonitoredCompactorIterator { + type Direction = Forward; + + async fn next(&mut self) -> HummockResult<()> { + self.inner.next().await?; + self.processed_key_num += 1; + + if self.processed_key_num % PROGRESS_KEY_INTERVAL == 0 { + self.task_progress + .inc_progress_key(PROGRESS_KEY_INTERVAL as _); + } + + Ok(()) + } + + fn key(&self) -> FullKey<&[u8]> { + self.inner.key() + } + + fn value(&self) -> HummockValue<&[u8]> { + self.inner.value() + } + + fn is_valid(&self) -> bool { + self.inner.is_valid() + } + + async fn rewind(&mut self) -> HummockResult<()> { + self.processed_key_num = 0; + self.inner.rewind().await?; + Ok(()) + } + + async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> { + self.processed_key_num = 0; + self.inner.seek(key).await?; + Ok(()) + } + + fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { + self.inner.collect_local_statistic(stats) + } +} + #[cfg(test)] mod tests { use std::cmp::Ordering; diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 126477cf4d84b..1c0c6f78c35ff 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -226,9 +226,7 @@ impl Compactor { rets.push(ret); if let Some(tracker) = &task_progress { tracker.inc_ssts_uploaded(); - tracker - .num_pending_write_io - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + tracker.dec_num_pending_write_io(); } if is_share_buffer_compact { metrics.shared_buffer_to_sstable_size.observe(sst_size as _); @@ -276,7 +274,6 @@ impl Compactor { self.context.compactor_metrics.clone(), iter, compaction_filter, - task_progress, ) .verbose_instrument_await("compact_and_build_sst") .await?; @@ -764,15 +761,7 @@ fn get_task_progress( ) -> Vec { let mut progress_list = Vec::new(); for (&task_id, progress) in &*task_progress.lock() { - progress_list.push(CompactTaskProgress { - task_id, - num_ssts_sealed: progress.num_ssts_sealed.load(Ordering::Relaxed), - num_ssts_uploaded: progress.num_ssts_uploaded.load(Ordering::Relaxed), - num_progress_key: progress.num_progress_key.load(Ordering::Relaxed), - num_pending_read_io: progress.num_pending_read_io.load(Ordering::Relaxed) as u64, - num_pending_write_io: progress.num_pending_write_io.load(Ordering::Relaxed) as u64, - ..Default::default() - }); + progress_list.push(progress.snapshot(task_id)); } progress_list } diff --git a/src/storage/src/hummock/compactor/task_progress.rs 
b/src/storage/src/hummock/compactor/task_progress.rs index 2c85f150cabef..81413ae2a8d7d 100644 --- a/src/storage/src/hummock/compactor/task_progress.rs +++ b/src/storage/src/hummock/compactor/task_progress.rs @@ -18,17 +18,18 @@ use std::sync::Arc; use parking_lot::Mutex; use risingwave_hummock_sdk::HummockCompactionTaskId; +use risingwave_pb::hummock::CompactTaskProgress; pub type TaskProgressManagerRef = Arc>>>; /// The progress of a compaction task. #[derive(Default)] pub struct TaskProgress { - pub num_ssts_sealed: AtomicU32, - pub num_ssts_uploaded: AtomicU32, - pub num_progress_key: AtomicU64, - pub num_pending_read_io: AtomicUsize, - pub num_pending_write_io: AtomicUsize, + num_ssts_sealed: AtomicU32, + num_ssts_uploaded: AtomicU32, + num_progress_key: AtomicU64, + num_pending_read_io: AtomicUsize, + num_pending_write_io: AtomicUsize, } impl TaskProgress { @@ -44,6 +45,34 @@ impl TaskProgress { self.num_progress_key .fetch_add(inc_key_num, Ordering::Relaxed); } + + pub fn inc_num_pending_read_io(&self) { + self.num_pending_read_io.fetch_add(1, Ordering::SeqCst); + } + + pub fn inc_num_pending_write_io(&self) { + self.num_pending_write_io.fetch_add(1, Ordering::SeqCst); + } + + pub fn dec_num_pending_read_io(&self) { + self.num_pending_read_io.fetch_sub(1, Ordering::SeqCst); + } + + pub fn dec_num_pending_write_io(&self) { + self.num_pending_write_io.fetch_sub(1, Ordering::SeqCst); + } + + pub fn snapshot(&self, task_id: u64) -> CompactTaskProgress { + CompactTaskProgress { + task_id, + num_ssts_sealed: self.num_ssts_sealed.load(Ordering::Relaxed), + num_ssts_uploaded: self.num_ssts_uploaded.load(Ordering::Relaxed), + num_pending_read_io: self.num_pending_read_io.load(Ordering::Relaxed) as u64, + num_pending_write_io: self.num_pending_write_io.load(Ordering::Relaxed) as u64, + num_progress_key: self.num_progress_key.load(Ordering::Relaxed), + ..Default::default() + } + } } /// An RAII object that contains a [`TaskProgress`] and shares it to all the splits of the task. diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index 42a19866fc467..786d3eec6a0ba 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -147,9 +147,7 @@ where ) -> HummockResult { if self.current_builder.is_none() { if let Some(progress) = &self.task_progress { - progress - .num_pending_write_io - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + progress.inc_num_pending_write_io() } let builder = self.builder_factory.open_builder().await?; self.current_builder = Some(builder); @@ -218,9 +216,7 @@ where if self.current_builder.is_none() { if let Some(progress) = &self.task_progress { - progress - .num_pending_write_io - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + progress.inc_num_pending_write_io(); } let mut builder = self.builder_factory.open_builder().await?; // If last_range_tombstone_epoch is not MAX, it means that we cut one range-tombstone to @@ -315,9 +311,7 @@ where } if let Some(progress) = &self.task_progress { - progress - .num_pending_write_io - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + progress.inc_num_pending_write_io(); } let builder = self.builder_factory.open_builder().await?; self.current_builder = Some(builder);
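
A few hedged sketches of the behavior introduced above follow; any name called out as a stand-in is illustrative and not part of the patch.

First, the meta node clamps the effective progress deadline to at least the largest configured object-store timeout plus a 20-second margin (`MIN_TIMEOUT_INTERVAL_SEC`), so the stall deadline stays longer than any single pending object-store operation. A minimal sketch of that clamping, assuming every input has already been normalized to seconds; the helper function name is an assumption:

/// Extra slack added on top of the slowest object-store operation.
const MIN_TIMEOUT_INTERVAL_SEC: u64 = 20;

/// Effective progress deadline: never shorter than any object-store timeout,
/// so one slow read or upload cannot by itself get a task cancelled.
fn effective_progress_interval_secs(
    streaming_read_timeout_secs: u64,
    streaming_upload_timeout_secs: u64,
    upload_timeout_secs: u64,
    read_timeout_secs: u64,
    configured_progress_interval_secs: u64,
) -> u64 {
    streaming_read_timeout_secs
        .max(streaming_upload_timeout_secs)
        .max(upload_timeout_secs)
        .max(read_timeout_secs)
        .max(configured_progress_interval_secs)
        + MIN_TIMEOUT_INTERVAL_SEC
}

fn main() {
    // With the new 480s object-store defaults and the 600s progress default,
    // the configured value wins and gains the 20s margin.
    assert_eq!(effective_progress_interval_secs(480, 480, 480, 480, 600), 620);
}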
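
The split between `update_at` and `expire_at` in `TaskHeartbeat` is the core of the change: a task whose compactor stops reporting at all is cancelled after `compaction_task_max_heartbeat_interval_secs` (now defaulting to 30s), while a task that keeps heart-beating without advancing its counters is only cancelled after the longer `compaction_task_max_progress_interval_secs` (defaulting to 600s). A simplified sketch of that bookkeeping, with stand-in types instead of the real `TaskHeartbeat`/`CompactTaskProgress`:

use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

// Simplified stand-ins for TaskHeartbeat / CompactTaskProgress.
struct Heartbeat {
    num_progress_key: u64,
    update_at: u64, // last time any heartbeat was received
    expire_at: u64, // deadline refreshed only when progress advances
}

struct Progress {
    task_id: u64,
    num_progress_key: u64,
}

struct Manager {
    heartbeat_expired_seconds: u64, // compaction_task_max_heartbeat_interval_secs
    task_expired_seconds: u64,      // compaction_task_max_progress_interval_secs
    task_heartbeats: HashMap<u64, Heartbeat>,
}

fn now_secs() -> u64 {
    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
}

impl Manager {
    /// Record an incoming heartbeat; the progress deadline moves only when
    /// the reported counters actually advance.
    fn update_task_heartbeats(&mut self, progress_list: &[Progress]) -> Vec<u64> {
        let now = now_secs();
        let mut cancel_tasks = vec![];
        for progress in progress_list {
            if let Some(hb) = self.task_heartbeats.get_mut(&progress.task_id) {
                hb.update_at = now;
                if progress.num_progress_key > hb.num_progress_key {
                    hb.num_progress_key = progress.num_progress_key;
                    hb.expire_at = now + self.task_expired_seconds;
                } else if hb.expire_at < now {
                    // Heartbeats keep arriving but nothing has moved for too long.
                    cancel_tasks.push(progress.task_id);
                }
            }
        }
        cancel_tasks
    }

    /// Meta-side sweep: cancel tasks whose compactor went silent entirely.
    fn get_heartbeat_expired_tasks(&self) -> Vec<u64> {
        let deadline = now_secs() - self.heartbeat_expired_seconds;
        self.task_heartbeats
            .iter()
            .filter(|(_, hb)| hb.update_at < deadline)
            .map(|(id, _)| *id)
            .collect()
    }
}

fn main() {
    let mut mgr = Manager {
        heartbeat_expired_seconds: 30,
        task_expired_seconds: 600,
        task_heartbeats: HashMap::from([(
            1,
            Heartbeat { num_progress_key: 0, update_at: now_secs(), expire_at: now_secs() + 600 },
        )]),
    };
    // A heartbeat that shows progress refreshes the 600s deadline.
    let cancelled = mgr.update_task_heartbeats(&[Progress { task_id: 1, num_progress_key: 100 }]);
    assert!(cancelled.is_empty());
    // The meta-side sweep has nothing to cancel either.
    assert!(mgr.get_heartbeat_expired_tasks().is_empty());
}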
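
On the compactor side, per-key progress counting moves out of `compact_and_build_sst` and into the `MonitoredCompactorIterator` wrapper, and the periodic heartbeat report is now assembled through `TaskProgress::snapshot`. A rough sketch of how those two pieces interact, using a plain `Iterator` stand-in (`MonitoredIter`) instead of the real `HummockIterator` stack and a reduced `TaskProgress` that only models the progress-key counter:

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

const PROGRESS_KEY_INTERVAL: usize = 100;

// Reduced stand-in for TaskProgress.
#[derive(Default)]
struct TaskProgress {
    num_progress_key: AtomicU64,
}

impl TaskProgress {
    fn inc_progress_key(&self, n: u64) {
        self.num_progress_key.fetch_add(n, Ordering::Relaxed);
    }
    fn snapshot(&self, task_id: u64) -> (u64, u64) {
        (task_id, self.num_progress_key.load(Ordering::Relaxed))
    }
}

// Stand-in for MonitoredCompactorIterator over a plain Iterator.
struct MonitoredIter<I> {
    inner: I,
    task_progress: Arc<TaskProgress>,
    processed_key_num: usize,
}

impl<I: Iterator> Iterator for MonitoredIter<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        let item = self.inner.next()?;
        self.processed_key_num += 1;
        // Report in batches so the shared atomic is not hammered per key.
        if self.processed_key_num % PROGRESS_KEY_INTERVAL == 0 {
            self.task_progress.inc_progress_key(PROGRESS_KEY_INTERVAL as u64);
        }
        Some(item)
    }
}

fn main() {
    let progress = Arc::new(TaskProgress::default());
    let iter = MonitoredIter {
        inner: 0..1000u32,
        task_progress: progress.clone(),
        processed_key_num: 0,
    };
    let _consumed: u32 = iter.sum(); // drive the "compaction"
    // The heartbeat loop would periodically ship this snapshot to meta.
    let (task_id, keys) = progress.snapshot(42);
    assert_eq!((task_id, keys), (42, 1000));
    println!("task {task_id} reported {keys} progress keys");
}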
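
Finally, the pending-IO counters are no longer touched through raw atomics: `TaskProgress` exposes `inc_`/`dec_` helpers, and the decrement typically lives in a `Drop` impl (as in `BlockStreamIterator` and `SstableStreamIterator`), so the count cannot leak on an error path. A stand-alone sketch of that pattern; the `PendingReadGuard` type below is illustrative, not part of the patch:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[derive(Default)]
struct TaskProgress {
    num_pending_read_io: AtomicUsize,
}

impl TaskProgress {
    fn inc_num_pending_read_io(&self) {
        self.num_pending_read_io.fetch_add(1, Ordering::SeqCst);
    }
    fn dec_num_pending_read_io(&self) {
        self.num_pending_read_io.fetch_sub(1, Ordering::SeqCst);
    }
    fn pending_read_io(&self) -> usize {
        self.num_pending_read_io.load(Ordering::SeqCst)
    }
}

/// Counts as one pending read for as long as it is alive, mirroring how the
/// stream iterators decrement the counter in their Drop impls.
struct PendingReadGuard {
    task_progress: Arc<TaskProgress>,
}

impl PendingReadGuard {
    fn new(task_progress: Arc<TaskProgress>) -> Self {
        task_progress.inc_num_pending_read_io();
        Self { task_progress }
    }
}

impl Drop for PendingReadGuard {
    fn drop(&mut self) {
        self.task_progress.dec_num_pending_read_io();
    }
}

fn main() {
    let progress = Arc::new(TaskProgress::default());
    {
        let _read = PendingReadGuard::new(progress.clone());
        assert_eq!(progress.pending_read_io(), 1);
    } // guard dropped here, even on an early return or `?` error path
    assert_eq!(progress.pending_read_io(), 0);
}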