From c1f2cc1b5eead87f1181fe99786c8d0826a98468 Mon Sep 17 00:00:00 2001
From: Eric Fu <eric@singularity-data.com>
Date: Mon, 29 Jan 2024 23:07:03 +0800
Subject: [PATCH] Revert "feat(storage): seperate timeout between process and
 heartbeat (#14366)"

This reverts commit c9e3e92fcad6ce7b3e9058131953b43e6b74bf4e.
---
 src/common/src/config.rs                      | 21 ++----
 src/config/example.toml                       | 11 ++-
 src/meta/node/src/lib.rs                      | 23 ------
 src/meta/src/hummock/compactor_manager.rs     | 59 +++++++--------
 src/meta/src/hummock/manager/mod.rs           | 18 +++--
 src/meta/src/manager/env.rs                   |  2 -
 src/storage/benches/bench_compactor.rs        |  1 +
 .../src/hummock/compactor/compactor_runner.rs | 41 +++++++++--
 .../compactor/fast_compactor_runner.rs        |  8 ++-
 src/storage/src/hummock/compactor/iterator.rs | 71 ++-----------------
 src/storage/src/hummock/compactor/mod.rs      | 15 +++-
 .../src/hummock/compactor/task_progress.rs    | 39 ++--------
 .../src/hummock/sstable/multi_builder.rs      | 12 +++-
 13 files changed, 128 insertions(+), 193 deletions(-)

diff --git a/src/common/src/config.rs b/src/common/src/config.rs
index 49b0453334c17..78cb7146370de 100644
--- a/src/common/src/config.rs
+++ b/src/common/src/config.rs
@@ -294,15 +294,10 @@ pub struct MetaConfig {
     /// split it into a single group.
     pub min_table_split_write_throughput: u64,
 
-    // If the compaction task does not report heartbeat beyond the
-    // `compaction_task_max_heartbeat_interval_secs` interval, we will cancel the task
     #[serde(default = "default::meta::compaction_task_max_heartbeat_interval_secs")]
-    pub compaction_task_max_heartbeat_interval_secs: u64,
-
     // If the compaction task reports no progress within the
     // `compaction_task_max_heartbeat_interval_secs` interval, we will cancel the task
-    #[serde(default = "default::meta::compaction_task_max_progress_interval_secs")]
-    pub compaction_task_max_progress_interval_secs: u64,
+    pub compaction_task_max_heartbeat_interval_secs: u64,
 
     #[serde(default)]
     pub compaction_config: CompactionConfig,
@@ -1064,11 +1059,7 @@ pub mod default {
         }
 
         pub fn compaction_task_max_heartbeat_interval_secs() -> u64 {
-            30 // 30s
-        }
-
-        pub fn compaction_task_max_progress_interval_secs() -> u64 {
-            60 * 10 // 10min
+            60 // 1min
         }
 
         pub fn cut_table_size_limit() -> u64 {
@@ -1498,19 +1489,19 @@ pub mod default {
 
     pub mod object_store_config {
         pub fn object_store_streaming_read_timeout_ms() -> u64 {
-            8 * 60 * 1000
+            10 * 60 * 1000
         }
 
         pub fn object_store_streaming_upload_timeout_ms() -> u64 {
-            8 * 60 * 1000
+            10 * 60 * 1000
         }
 
         pub fn object_store_upload_timeout_ms() -> u64 {
-            8 * 60 * 1000
+            60 * 60 * 1000
         }
 
         pub fn object_store_read_timeout_ms() -> u64 {
-            8 * 60 * 1000
+            60 * 60 * 1000
         }
 
         pub mod s3 {
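
For reference, the restored field relies on serde's default-function mechanism: when a key is absent from the TOML, serde calls the function named in the attribute. A minimal sketch of the same pattern, assuming the serde and toml crates; the struct and function names are illustrative, not the real MetaConfig:

use serde::Deserialize;

// Illustrative stand-in for MetaConfig; not the real struct.
#[derive(Deserialize, Debug)]
struct MetaConfigSketch {
    #[serde(default = "default_heartbeat_interval_secs")]
    compaction_task_max_heartbeat_interval_secs: u64,
}

fn default_heartbeat_interval_secs() -> u64 {
    60 // 1min, matching the restored default above
}

fn main() {
    // With the key absent, serde falls back to the default function.
    let cfg: MetaConfigSketch = toml::from_str("").unwrap();
    assert_eq!(cfg.compaction_task_max_heartbeat_interval_secs, 60);
}
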
diff --git a/src/config/example.toml b/src/config/example.toml
index 21d13f81fbdcd..b2eef323c2d00 100644
--- a/src/config/example.toml
+++ b/src/config/example.toml
@@ -43,8 +43,7 @@ do_not_config_object_storage_lifecycle = false
 partition_vnode_count = 16
 table_write_throughput_threshold = 16777216
 min_table_split_write_throughput = 4194304
-compaction_task_max_heartbeat_interval_secs = 30
-compaction_task_max_progress_interval_secs = 600
+compaction_task_max_heartbeat_interval_secs = 60
 hybird_partition_vnode_count = 4
 event_log_enabled = true
 event_log_channel_max_size = 10
@@ -165,10 +164,10 @@ recent_filter_layers = 6
 recent_filter_rotate_interval_ms = 10000
 
 [storage.object_store]
-object_store_streaming_read_timeout_ms = 480000
-object_store_streaming_upload_timeout_ms = 480000
-object_store_upload_timeout_ms = 480000
-object_store_read_timeout_ms = 480000
+object_store_streaming_read_timeout_ms = 600000
+object_store_streaming_upload_timeout_ms = 600000
+object_store_upload_timeout_ms = 3600000
+object_store_read_timeout_ms = 3600000
 
 [storage.object_store.s3]
 object_store_keepalive_ms = 600000
diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs
index 46a9e94ed8616..f6aa1be0d08f5 100644
--- a/src/meta/node/src/lib.rs
+++ b/src/meta/node/src/lib.rs
@@ -248,28 +248,6 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
             ui_path: opts.dashboard_ui_path,
         };
 
-        const MIN_TIMEOUT_INTERVAL_SEC: u64 = 20;
-        let compaction_task_max_progress_interval_secs = {
-            config
-                .storage
-                .object_store
-                .object_store_read_timeout_ms
-                .max(config.storage.object_store.object_store_upload_timeout_ms)
-                .max(
-                    config
-                        .storage
-                        .object_store
-                        .object_store_streaming_read_timeout_ms,
-                )
-                .max(
-                    config
-                        .storage
-                        .object_store
-                        .object_store_streaming_upload_timeout_ms,
-                )
-                .max(config.meta.compaction_task_max_progress_interval_secs)
-        } + MIN_TIMEOUT_INTERVAL_SEC;
-
         let (mut join_handle, leader_lost_handle, shutdown_send) = rpc_serve(
             add_info,
             backend,
@@ -332,7 +310,6 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
                 compaction_task_max_heartbeat_interval_secs: config
                     .meta
                     .compaction_task_max_heartbeat_interval_secs,
-                compaction_task_max_progress_interval_secs,
                 compaction_config: Some(config.meta.compaction_config),
                 cut_table_size_limit: config.meta.cut_table_size_limit,
                 hybird_partition_vnode_count: config.meta.hybird_partition_vnode_count,
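
The block removed above derived `compaction_task_max_progress_interval_secs` as the largest object-store timeout plus a safety margin, so meta would not cancel a task that was merely waiting on a slow object-store call. A simplified sketch of that computation, normalized to seconds for readability (the removed code maxes the raw millisecond timeouts against the seconds-valued config directly); the struct is illustrative:

// Illustrative container for the four object-store timeouts, in seconds.
struct ObjectStoreTimeoutsSec {
    read: u64,
    upload: u64,
    streaming_read: u64,
    streaming_upload: u64,
}

// Lower-bound the progress interval by the largest timeout plus a margin,
// mirroring the computation removed above.
fn progress_interval_secs(t: &ObjectStoreTimeoutsSec, configured_secs: u64) -> u64 {
    const MIN_TIMEOUT_INTERVAL_SEC: u64 = 20;
    t.read
        .max(t.upload)
        .max(t.streaming_read)
        .max(t.streaming_upload)
        .max(configured_secs)
        + MIN_TIMEOUT_INTERVAL_SEC
}
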
diff --git a/src/meta/src/hummock/compactor_manager.rs b/src/meta/src/hummock/compactor_manager.rs
index d472fa20037a3..71cba056ca535 100644
--- a/src/meta/src/hummock/compactor_manager.rs
+++ b/src/meta/src/hummock/compactor_manager.rs
@@ -54,8 +54,6 @@ struct TaskHeartbeat {
     num_pending_write_io: u64,
     create_time: Instant,
     expire_at: u64,
-
-    update_at: u64,
 }
 
 impl Compactor {
@@ -118,8 +116,7 @@ impl Compactor {
 /// - 3. Cancelled: a task is reported as cancelled via `CompactStatus::report_compact_task`. It's
 ///   the final state.
 pub struct CompactorManagerInner {
-    pub task_expired_seconds: u64,
-    pub heartbeat_expired_seconds: u64,
+    pub task_expiry_seconds: u64,
     task_heartbeats: HashMap<HummockCompactionTaskId, TaskHeartbeat>,
 
     /// The outer lock is a RwLock, so we should still be able to modify each compactor
@@ -142,8 +139,7 @@ impl CompactorManagerInner {
                 .collect(),
         };
         let mut manager = Self {
-            task_expired_seconds: env.opts.compaction_task_max_progress_interval_secs,
-            heartbeat_expired_seconds: env.opts.compaction_task_max_heartbeat_interval_secs,
+            task_expiry_seconds: env.opts.compaction_task_max_heartbeat_interval_secs,
             task_heartbeats: Default::default(),
             compactor_map: Default::default(),
         };
@@ -157,8 +153,7 @@ impl CompactorManagerInner {
     /// Only used for unit test.
     pub fn for_test() -> Self {
         Self {
-            task_expired_seconds: 1,
-            heartbeat_expired_seconds: 1,
+            task_expiry_seconds: 1,
             task_heartbeats: Default::default(),
             compactor_map: Default::default(),
         }
@@ -244,18 +239,19 @@ impl CompactorManagerInner {
         ret
     }
 
-    pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
-        let heartbeat_expired_ts: u64 = SystemTime::now()
+    pub fn get_expired_tasks(&self, interval_sec: Option<u64>) -> Vec<CompactTask> {
+        let interval = interval_sec.unwrap_or(0);
+        let now: u64 = SystemTime::now()
             .duration_since(SystemTime::UNIX_EPOCH)
             .expect("Clock may have gone backwards")
             .as_secs()
-            - self.heartbeat_expired_seconds;
-        Self::get_heartbeat_expired_tasks_impl(&self.task_heartbeats, heartbeat_expired_ts)
+            - interval;
+        Self::get_heartbeat_expired_tasks(&self.task_heartbeats, now)
     }
 
-    fn get_heartbeat_expired_tasks_impl(
+    fn get_heartbeat_expired_tasks(
         task_heartbeats: &HashMap<HummockCompactionTaskId, TaskHeartbeat>,
-        heartbeat_expired_ts: u64,
+        now: u64,
     ) -> Vec<CompactTask> {
         let mut cancellable_tasks = vec![];
         const MAX_TASK_DURATION_SEC: u64 = 2700;
@@ -269,24 +265,22 @@ impl CompactorManagerInner {
             num_progress_key,
             num_pending_read_io,
             num_pending_write_io,
-            update_at,
         } in task_heartbeats.values()
         {
-            if *update_at < heartbeat_expired_ts {
+            if *expire_at < now {
+                // The task's heartbeat has expired; cancel it.
                 cancellable_tasks.push(task.clone());
             }
-
             let task_duration_too_long = create_time.elapsed().as_secs() > MAX_TASK_DURATION_SEC;
             if task_duration_too_long {
                 let compact_task_statistics = statistics_compact_task(task);
                 tracing::info!(
-                    "CompactionGroupId {} Task {} duration too long create_time {:?} expire_at {:?} num_ssts_sealed {} num_ssts_uploaded {} num_progress_key {} \
+                    "CompactionGroupId {} Task {} duration too long create_time {:?} num_ssts_sealed {} num_ssts_uploaded {} num_progress_key {} \
                         pending_read_io_count {} pending_write_io_count {} target_level {} \
                         base_level {} target_sub_level_id {} task_type {} compact_task_statistics {:?}",
                         task.compaction_group_id,
                         task.task_id,
                         create_time,
-                        expire_at,
                         num_ssts_sealed,
                         num_ssts_uploaded,
                         num_progress_key,
@@ -318,8 +312,7 @@ impl CompactorManagerInner {
                 num_pending_read_io: 0,
                 num_pending_write_io: 0,
                 create_time: Instant::now(),
-                expire_at: now + self.task_expired_seconds,
-                update_at: now,
+                expire_at: now + self.task_expiry_seconds,
             },
         );
     }
@@ -339,14 +332,12 @@ impl CompactorManagerInner {
         let mut cancel_tasks = vec![];
         for progress in progress_list {
             if let Some(task_ref) = self.task_heartbeats.get_mut(&progress.task_id) {
-                task_ref.update_at = now;
-
                 if task_ref.num_ssts_sealed < progress.num_ssts_sealed
                     || task_ref.num_ssts_uploaded < progress.num_ssts_uploaded
                     || task_ref.num_progress_key < progress.num_progress_key
                 {
-                    // Refresh the expired of the task as it is showing progress.
-                    task_ref.expire_at = now + self.task_expired_seconds;
+                    // Refresh the expiry of the task as it is showing progress.
+                    task_ref.expire_at = now + self.task_expiry_seconds;
                     task_ref.num_ssts_sealed = progress.num_ssts_sealed;
                     task_ref.num_ssts_uploaded = progress.num_ssts_uploaded;
                     task_ref.num_progress_key = progress.num_progress_key;
@@ -439,8 +430,8 @@ impl CompactorManager {
             .check_tasks_status(tasks, slow_task_duration)
     }
 
-    pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
-        self.inner.read().get_heartbeat_expired_tasks()
+    pub fn get_expired_tasks(&self, interval_sec: Option<u64>) -> Vec<CompactTask> {
+        self.inner.read().get_expired_tasks(interval_sec)
     }
 
     pub fn initiate_task_heartbeat(&self, task: CompactTask) {
@@ -506,7 +497,7 @@ mod tests {
             (env, context_id)
         };
 
-        // Restart. Set task_expired_seconds to 0 only to speed up test.
+        // Restart. Set task_expiry_seconds to 0 only to speed up test.
         let compactor_manager = CompactorManager::with_meta(env).await.unwrap();
         // Because task assignment exists.
         // Because compactor gRPC is not established yet.
@@ -515,11 +506,15 @@ mod tests {
 
         // Ensure task is expired.
         tokio::time::sleep(Duration::from_secs(2)).await;
-        let expired = compactor_manager.get_heartbeat_expired_tasks();
+        let expired = compactor_manager.get_expired_tasks(None);
         assert_eq!(expired.len(), 1);
 
         // Mimic no-op compaction heartbeat
-        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);
+        compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
+            task_id: expired[0].task_id,
+            ..Default::default()
+        }]);
+        assert_eq!(compactor_manager.get_expired_tasks(None).len(), 1);
 
         // Mimic compaction heartbeat with invalid task id
         compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
@@ -529,7 +524,7 @@ mod tests {
             num_progress_key: 100,
             ..Default::default()
         }]);
-        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);
+        assert_eq!(compactor_manager.get_expired_tasks(None).len(), 1);
 
         // Mimic effective compaction heartbeat
         compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
@@ -539,7 +534,7 @@ mod tests {
             num_progress_key: 100,
             ..Default::default()
         }]);
-        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 0);
+        assert_eq!(compactor_manager.get_expired_tasks(None).len(), 0);
 
         // Test add
         assert_eq!(compactor_manager.compactor_num(), 0);
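
The scheme restored above keeps a single deadline per task: `expire_at` is refreshed only when a heartbeat shows real progress, so a no-op heartbeat lets the deadline lapse. A minimal, self-contained sketch of that behavior; the types are illustrative stand-ins for `TaskHeartbeat` and `CompactorManagerInner`:

use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

// Illustrative stand-ins; not the real meta-side types.
struct Heartbeat {
    num_progress_key: u64,
    expire_at: u64,
}

struct ExpiryTracker {
    task_expiry_seconds: u64,
    heartbeats: HashMap<u64, Heartbeat>,
}

impl ExpiryTracker {
    fn now_secs() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs()
    }

    // Progress refreshes the deadline; a no-op heartbeat does not.
    fn update_heartbeat(&mut self, task_id: u64, num_progress_key: u64) {
        let expiry = self.task_expiry_seconds;
        if let Some(hb) = self.heartbeats.get_mut(&task_id) {
            if hb.num_progress_key < num_progress_key {
                hb.num_progress_key = num_progress_key;
                hb.expire_at = Self::now_secs() + expiry;
            }
        }
    }

    // `interval_sec` relaxes the check, like get_expired_tasks(Some(..)).
    fn expired_tasks(&self, interval_sec: Option<u64>) -> Vec<u64> {
        let now = Self::now_secs() - interval_sec.unwrap_or(0);
        self.heartbeats
            .iter()
            .filter(|(_, hb)| hb.expire_at < now)
            .map(|(id, _)| *id)
            .collect()
    }
}
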
diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs
index 717724823a190..48da66ef5e235 100644
--- a/src/meta/src/hummock/manager/mod.rs
+++ b/src/meta/src/hummock/manager/mod.rs
@@ -2186,7 +2186,7 @@ impl HummockManager {
                 GroupSplit,
                 CheckDeadTask,
                 Report,
-                CompactionHeartBeatExpiredCheck,
+                CompactionHeartBeat,
 
                 DynamicCompactionTrigger,
                 SpaceReclaimCompactionTrigger,
@@ -2218,7 +2218,7 @@ impl HummockManager {
                 .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
             compaction_heartbeat_interval.reset();
             let compaction_heartbeat_trigger = IntervalStream::new(compaction_heartbeat_interval)
-                .map(|_| HummockTimerEvent::CompactionHeartBeatExpiredCheck);
+                .map(|_| HummockTimerEvent::CompactionHeartBeat);
 
             let mut min_trigger_interval = tokio::time::interval(Duration::from_secs(
                 hummock_manager.env.opts.periodic_compaction_interval_sec,
@@ -2397,17 +2397,27 @@ impl HummockManager {
                                     }
                                 }
 
-                                HummockTimerEvent::CompactionHeartBeatExpiredCheck => {
+                                HummockTimerEvent::CompactionHeartBeat => {
                                     let compactor_manager =
                                         hummock_manager.compactor_manager.clone();
 
                                     // TODO: add metrics to track expired tasks
+                                    const INTERVAL_SEC: u64 = 30;
                                     // A task can be cancelled via two paths:
                                     // 1. the compactor heartbeat cancels the expired task based on
                                     // task progress (meta + compactor)
                                     // 2. meta periodically scans the tasks and cancels, on the meta
                                     // side, those that heartbeats no longer update
-                                    for task in compactor_manager.get_heartbeat_expired_tasks() {
+
+                                    // The interval relaxes the detection conditions on the meta
+                                    // side so that the compactor is, as far as possible, the one
+                                    // responsible for canceling its own tasks; meta acts only as a
+                                    // last resort, cleaning up tasks whose heartbeats have already
+                                    // expired on the compactor.
+
+                                    for task in
+                                        compactor_manager.get_expired_tasks(Some(INTERVAL_SEC))
+                                    {
                                         if let Err(e) = hummock_manager
                                             .cancel_compact_task(
                                                 task.task_id,
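
In loop form, path 2 above amounts to a periodic scan with `INTERVAL_SEC` of slack: the compactor normally cancels its own expired tasks, and meta only sweeps up stragglers. A minimal sketch assuming the tokio, parking_lot, and tracing crates; `ExpiryTracker` and `cancel_compact_task` are hypothetical stand-ins for the manager pieces:

use std::sync::Arc;
use std::time::Duration;

use parking_lot::Mutex;

// Hypothetical stand-in exposing the expiry check sketched earlier.
struct ExpiryTracker;
impl ExpiryTracker {
    fn expired_tasks(&self, _interval_sec: Option<u64>) -> Vec<u64> {
        vec![]
    }
}

// Hypothetical stand-in for the manager's cancel path.
async fn cancel_compact_task(task_id: u64) {
    tracing::info!("canceling expired compaction task {}", task_id);
}

async fn compaction_heartbeat_loop(tracker: Arc<Mutex<ExpiryTracker>>) {
    const INTERVAL_SEC: u64 = 30;
    let mut ticker = tokio::time::interval(Duration::from_secs(1));
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    loop {
        ticker.tick().await;
        // Relax the deadline by INTERVAL_SEC so the compactor gets first
        // chance; meta acts only as a last resort.
        let expired = tracker.lock().expired_tasks(Some(INTERVAL_SEC));
        for task_id in expired {
            cancel_compact_task(task_id).await;
        }
    }
}
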
diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs
index 3cc909fb28f26..00bc354b5f615 100644
--- a/src/meta/src/manager/env.rs
+++ b/src/meta/src/manager/env.rs
@@ -187,7 +187,6 @@ pub struct MetaOpts {
     pub min_table_split_write_throughput: u64,
 
     pub compaction_task_max_heartbeat_interval_secs: u64,
-    pub compaction_task_max_progress_interval_secs: u64,
     pub compaction_config: Option<CompactionConfig>,
 
     /// The size limit to split a state-table to independent sstable.
@@ -253,7 +252,6 @@ impl MetaOpts {
             do_not_config_object_storage_lifecycle: true,
             partition_vnode_count: 32,
             compaction_task_max_heartbeat_interval_secs: 0,
-            compaction_task_max_progress_interval_secs: 1,
             compaction_config: None,
             cut_table_size_limit: 1024 * 1024 * 1024,
             hybird_partition_vnode_count: 4,
diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs
index 1a5f8c94cd110..073c5b5119319 100644
--- a/src/storage/benches/bench_compactor.rs
+++ b/src/storage/benches/bench_compactor.rs
@@ -204,6 +204,7 @@ async fn compact<I: HummockIterator<Direction = Forward>>(iter: I, sstable_store
         Arc::new(CompactorMetrics::unused()),
         iter,
         DummyCompactionFilter,
+        None,
     )
     .await
     .unwrap();
diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs
index 49dd1b6b23abb..82ca243ed0836 100644
--- a/src/storage/src/hummock/compactor/compactor_runner.rs
+++ b/src/storage/src/hummock/compactor/compactor_runner.rs
@@ -32,7 +32,6 @@ use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
 use risingwave_pb::hummock::{BloomFilterType, CompactTask, LevelType};
 use tokio::sync::oneshot::Receiver;
 
-use super::iterator::MonitoredCompactorIterator;
 use super::task_progress::TaskProgress;
 use super::{check_compaction_result, CompactionStatistics, TaskConfig};
 use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManager};
@@ -232,10 +231,7 @@ impl CompactorRunner {
         // in https://github.com/risingwavelabs/risingwave/issues/13148
         Ok((
             SkipWatermarkIterator::from_safe_epoch_watermarks(
-                MonitoredCompactorIterator::new(
-                    UnorderedMergeIteratorInner::for_compactor(table_iters),
-                    task_progress.clone(),
-                ),
+                UnorderedMergeIteratorInner::for_compactor(table_iters),
                 &self.compact_task.table_watermarks,
             ),
             CompactionDeleteRangeIterator::new(del_iter),
@@ -710,6 +706,7 @@ pub async fn compact_and_build_sst<F>(
     compactor_metrics: Arc<CompactorMetrics>,
     mut iter: impl HummockIterator<Direction = Forward>,
     mut compaction_filter: impl CompactionFilter,
+    task_progress: Option<Arc<TaskProgress>>,
 ) -> HummockResult<CompactionStatistics>
 where
     F: TableBuilderFactory,
@@ -753,7 +750,18 @@ where
     let mut last_table_stats = TableStats::default();
     let mut last_table_id = None;
     let mut compaction_statistics = CompactionStatistics::default();
+    let mut progress_key_num: u64 = 0;
+    const PROGRESS_KEY_INTERVAL: u64 = 100;
     while iter.is_valid() {
+        progress_key_num += 1;
+
+        if let Some(task_progress) = task_progress.as_ref()
+            && progress_key_num >= PROGRESS_KEY_INTERVAL
+        {
+            task_progress.inc_progress_key(progress_key_num);
+            progress_key_num = 0;
+        }
+
         let mut iter_key = iter.key();
         compaction_statistics.iter_total_key_counts += 1;
 
@@ -799,6 +807,14 @@ where
                     })
                     .await?;
             }
+
+            progress_key_num += 1;
+            if let Some(task_progress) = task_progress.as_ref()
+                && progress_key_num >= PROGRESS_KEY_INTERVAL
+            {
+                task_progress.inc_progress_key(progress_key_num);
+                progress_key_num = 0;
+            }
         }
 
         let earliest_range_delete_which_can_see_iter_key = del_iter.earliest_delete_since(epoch);
@@ -902,8 +918,23 @@ where
                     event_key,
                 })
                 .await?;
+            progress_key_num += 1;
+            if let Some(task_progress) = task_progress.as_ref()
+                && progress_key_num >= PROGRESS_KEY_INTERVAL
+            {
+                task_progress.inc_progress_key(progress_key_num);
+                progress_key_num = 0;
+            }
         }
     }
+
+    if let Some(task_progress) = task_progress.as_ref()
+        && progress_key_num > 0
+    {
+        // Avoid losing the progress_key_num from the last partial interval
+        task_progress.inc_progress_key(progress_key_num);
+    }
+
     if let Some(last_table_id) = last_table_id.take() {
         table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
     }
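
The changes above batch progress reporting: keys are tallied in a local counter and flushed to the shared atomic every `PROGRESS_KEY_INTERVAL` keys, with one final flush for the tail. A condensed sketch of the same pattern; `Progress` is an illustrative stand-in for `TaskProgress`:

use std::sync::atomic::{AtomicU64, Ordering};

const PROGRESS_KEY_INTERVAL: u64 = 100;

// Illustrative stand-in for TaskProgress.
#[derive(Default)]
struct Progress {
    num_progress_key: AtomicU64,
}

fn process_keys(keys: impl Iterator<Item = u64>, progress: &Progress) {
    let mut progress_key_num: u64 = 0;
    for _key in keys {
        progress_key_num += 1;
        if progress_key_num >= PROGRESS_KEY_INTERVAL {
            // Flush a full batch to the shared counter.
            progress
                .num_progress_key
                .fetch_add(progress_key_num, Ordering::Relaxed);
            progress_key_num = 0;
        }
    }
    if progress_key_num > 0 {
        // Flush the last partial batch so no progress is lost.
        progress
            .num_progress_key
            .fetch_add(progress_key_num, Ordering::Relaxed);
    }
}
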
diff --git a/src/storage/src/hummock/compactor/fast_compactor_runner.rs b/src/storage/src/hummock/compactor/fast_compactor_runner.rs
index 828a9750aa41c..653a00f21b7c2 100644
--- a/src/storage/src/hummock/compactor/fast_compactor_runner.rs
+++ b/src/storage/src/hummock/compactor/fast_compactor_runner.rs
@@ -159,7 +159,9 @@ impl BlockStreamIterator {
 
 impl Drop for BlockStreamIterator {
     fn drop(&mut self) {
-        self.task_progress.dec_num_pending_read_io();
+        self.task_progress
+            .num_pending_read_io
+            .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
     }
 }
 
@@ -240,7 +242,9 @@ impl ConcatSstableIterator {
                 .await?;
             let stats_ptr = self.stats.remote_io_time.clone();
             let now = Instant::now();
-            self.task_progress.inc_num_pending_read_io();
+            self.task_progress
+                .num_pending_read_io
+                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
             let block_stream = self
                 .sstable_store
                 .get_stream_for_blocks(sstable.value().id, &sstable.value().meta.block_metas)
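
The inlined `fetch_add`/`fetch_sub` pairs above follow an RAII pattern: pending read IO is incremented when a block stream is opened and decremented in `Drop`, so the counter stays balanced on every exit path, including errors. A minimal sketch of that pattern with illustrative names:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Illustrative stand-in for TaskProgress.
#[derive(Default)]
struct ProgressSketch {
    num_pending_read_io: AtomicUsize,
}

struct BlockStreamGuard {
    task_progress: Arc<ProgressSketch>,
}

impl BlockStreamGuard {
    // Opening the stream counts as one pending read IO.
    fn open(task_progress: Arc<ProgressSketch>) -> Self {
        task_progress.num_pending_read_io.fetch_add(1, Ordering::SeqCst);
        Self { task_progress }
    }
}

impl Drop for BlockStreamGuard {
    // Dropping the stream releases the pending read IO, on any exit path.
    fn drop(&mut self) {
        self.task_progress
            .num_pending_read_io
            .fetch_sub(1, Ordering::SeqCst);
    }
}
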
diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs
index fd2a9b1086e6f..ec115c8ea50f1 100644
--- a/src/storage/src/hummock/compactor/iterator.rs
+++ b/src/storage/src/hummock/compactor/iterator.rs
@@ -34,8 +34,6 @@ use crate::hummock::value::HummockValue;
 use crate::hummock::{BlockHolder, BlockIterator, BlockMeta, HummockResult};
 use crate::monitor::StoreLocalStatistic;
 
-const PROGRESS_KEY_INTERVAL: usize = 100;
-
 /// Iterates over the KV-pairs of an SST while downloading it.
 pub struct SstableStreamIterator {
     sstable_store: SstableStoreRef,
@@ -254,7 +252,9 @@ impl SstableStreamIterator {
 
 impl Drop for SstableStreamIterator {
     fn drop(&mut self) {
-        self.task_progress.dec_num_pending_read_io()
+        self.task_progress
+            .num_pending_read_io
+            .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
     }
 }
 
@@ -396,7 +396,9 @@ impl ConcatSstableIterator {
             if start_index >= end_index {
                 found = false;
             } else {
-                self.task_progress.inc_num_pending_read_io();
+                self.task_progress
+                    .num_pending_read_io
+                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                 let mut sstable_iter = SstableStreamIterator::new(
                     sstable.value().meta.block_metas.clone(),
                     table_info.clone(),
@@ -488,67 +490,6 @@ impl HummockIterator for ConcatSstableIterator {
     }
 }
 
-pub struct MonitoredCompactorIterator<I> {
-    inner: I,
-    task_progress: Arc<TaskProgress>,
-
-    processed_key_num: usize,
-}
-
-impl<I: HummockIterator<Direction = Forward>> MonitoredCompactorIterator<I> {
-    pub fn new(inner: I, task_progress: Arc<TaskProgress>) -> Self {
-        Self {
-            inner,
-            task_progress,
-            processed_key_num: 0,
-        }
-    }
-}
-
-impl<I: HummockIterator<Direction = Forward>> HummockIterator for MonitoredCompactorIterator<I> {
-    type Direction = Forward;
-
-    async fn next(&mut self) -> HummockResult<()> {
-        self.inner.next().await?;
-        self.processed_key_num += 1;
-
-        if self.processed_key_num % PROGRESS_KEY_INTERVAL == 0 {
-            self.task_progress
-                .inc_progress_key(PROGRESS_KEY_INTERVAL as _);
-        }
-
-        Ok(())
-    }
-
-    fn key(&self) -> FullKey<&[u8]> {
-        self.inner.key()
-    }
-
-    fn value(&self) -> HummockValue<&[u8]> {
-        self.inner.value()
-    }
-
-    fn is_valid(&self) -> bool {
-        self.inner.is_valid()
-    }
-
-    async fn rewind(&mut self) -> HummockResult<()> {
-        self.processed_key_num = 0;
-        self.inner.rewind().await?;
-        Ok(())
-    }
-
-    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
-        self.processed_key_num = 0;
-        self.inner.seek(key).await?;
-        Ok(())
-    }
-
-    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
-        self.inner.collect_local_statistic(stats)
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::cmp::Ordering;
diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs
index 1c0c6f78c35ff..126477cf4d84b 100644
--- a/src/storage/src/hummock/compactor/mod.rs
+++ b/src/storage/src/hummock/compactor/mod.rs
@@ -226,7 +226,9 @@ impl Compactor {
             rets.push(ret);
             if let Some(tracker) = &task_progress {
                 tracker.inc_ssts_uploaded();
-                tracker.dec_num_pending_write_io();
+                tracker
+                    .num_pending_write_io
+                    .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
             }
             if is_share_buffer_compact {
                 metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
@@ -274,6 +276,7 @@ impl Compactor {
             self.context.compactor_metrics.clone(),
             iter,
             compaction_filter,
+            task_progress,
         )
         .verbose_instrument_await("compact_and_build_sst")
         .await?;
@@ -761,7 +764,15 @@ fn get_task_progress(
 ) -> Vec<CompactTaskProgress> {
     let mut progress_list = Vec::new();
     for (&task_id, progress) in &*task_progress.lock() {
-        progress_list.push(progress.snapshot(task_id));
+        progress_list.push(CompactTaskProgress {
+            task_id,
+            num_ssts_sealed: progress.num_ssts_sealed.load(Ordering::Relaxed),
+            num_ssts_uploaded: progress.num_ssts_uploaded.load(Ordering::Relaxed),
+            num_progress_key: progress.num_progress_key.load(Ordering::Relaxed),
+            num_pending_read_io: progress.num_pending_read_io.load(Ordering::Relaxed) as u64,
+            num_pending_write_io: progress.num_pending_write_io.load(Ordering::Relaxed) as u64,
+            ..Default::default()
+        });
     }
     progress_list
 }
diff --git a/src/storage/src/hummock/compactor/task_progress.rs b/src/storage/src/hummock/compactor/task_progress.rs
index 81413ae2a8d7d..2c85f150cabef 100644
--- a/src/storage/src/hummock/compactor/task_progress.rs
+++ b/src/storage/src/hummock/compactor/task_progress.rs
@@ -18,18 +18,17 @@ use std::sync::Arc;
 
 use parking_lot::Mutex;
 use risingwave_hummock_sdk::HummockCompactionTaskId;
-use risingwave_pb::hummock::CompactTaskProgress;
 
 pub type TaskProgressManagerRef = Arc<Mutex<HashMap<HummockCompactionTaskId, Arc<TaskProgress>>>>;
 
 /// The progress of a compaction task.
 #[derive(Default)]
 pub struct TaskProgress {
-    num_ssts_sealed: AtomicU32,
-    num_ssts_uploaded: AtomicU32,
-    num_progress_key: AtomicU64,
-    num_pending_read_io: AtomicUsize,
-    num_pending_write_io: AtomicUsize,
+    pub num_ssts_sealed: AtomicU32,
+    pub num_ssts_uploaded: AtomicU32,
+    pub num_progress_key: AtomicU64,
+    pub num_pending_read_io: AtomicUsize,
+    pub num_pending_write_io: AtomicUsize,
 }
 
 impl TaskProgress {
@@ -45,34 +44,6 @@ impl TaskProgress {
         self.num_progress_key
             .fetch_add(inc_key_num, Ordering::Relaxed);
     }
-
-    pub fn inc_num_pending_read_io(&self) {
-        self.num_pending_read_io.fetch_add(1, Ordering::SeqCst);
-    }
-
-    pub fn inc_num_pending_write_io(&self) {
-        self.num_pending_write_io.fetch_add(1, Ordering::SeqCst);
-    }
-
-    pub fn dec_num_pending_read_io(&self) {
-        self.num_pending_read_io.fetch_sub(1, Ordering::SeqCst);
-    }
-
-    pub fn dec_num_pending_write_io(&self) {
-        self.num_pending_write_io.fetch_sub(1, Ordering::SeqCst);
-    }
-
-    pub fn snapshot(&self, task_id: u64) -> CompactTaskProgress {
-        CompactTaskProgress {
-            task_id,
-            num_ssts_sealed: self.num_ssts_sealed.load(Ordering::Relaxed),
-            num_ssts_uploaded: self.num_ssts_uploaded.load(Ordering::Relaxed),
-            num_pending_read_io: self.num_pending_read_io.load(Ordering::Relaxed) as u64,
-            num_pending_write_io: self.num_pending_write_io.load(Ordering::Relaxed) as u64,
-            num_progress_key: self.num_progress_key.load(Ordering::Relaxed),
-            ..Default::default()
-        }
-    }
 }
 
 /// An RAII object that contains a [`TaskProgress`] and shares it with all the splits of the task.
diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs
index 786d3eec6a0ba..42a19866fc467 100644
--- a/src/storage/src/hummock/sstable/multi_builder.rs
+++ b/src/storage/src/hummock/sstable/multi_builder.rs
@@ -147,7 +147,9 @@ where
     ) -> HummockResult<bool> {
         if self.current_builder.is_none() {
             if let Some(progress) = &self.task_progress {
-                progress.inc_num_pending_write_io()
+                progress
+                    .num_pending_write_io
+                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
             }
             let builder = self.builder_factory.open_builder().await?;
             self.current_builder = Some(builder);
@@ -216,7 +218,9 @@ where
 
         if self.current_builder.is_none() {
             if let Some(progress) = &self.task_progress {
-                progress.inc_num_pending_write_io();
+                progress
+                    .num_pending_write_io
+                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
             }
             let mut builder = self.builder_factory.open_builder().await?;
             // If last_range_tombstone_epoch is not MAX, it means that we cut one range-tombstone to
@@ -311,7 +315,9 @@ where
             }
 
             if let Some(progress) = &self.task_progress {
-                progress.inc_num_pending_write_io();
+                progress
+                    .num_pending_write_io
+                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
             }
             let builder = self.builder_factory.open_builder().await?;
             self.current_builder = Some(builder);
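
As in the hunks above, the builder is opened lazily and each open counts as one pending write IO. A condensed sketch of that lazy-open pattern, with illustrative stand-ins for the builder and progress types:

use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative stand-ins for the SST builder and TaskProgress.
struct Builder;

#[derive(Default)]
struct ProgressSketch {
    num_pending_write_io: AtomicUsize,
}

struct MultiBuilderSketch<'a> {
    current_builder: Option<Builder>,
    task_progress: Option<&'a ProgressSketch>,
}

impl<'a> MultiBuilderSketch<'a> {
    // Open the builder on first use, counting the open as pending write IO.
    fn ensure_builder(&mut self) -> &mut Builder {
        if self.current_builder.is_none() {
            if let Some(progress) = self.task_progress {
                progress.num_pending_write_io.fetch_add(1, Ordering::SeqCst);
            }
            self.current_builder = Some(Builder);
        }
        self.current_builder.as_mut().unwrap()
    }
}
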