diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index a8661dab77371..12b11ba60eddb 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -2612,22 +2612,7 @@ impl HummockManager { // progress (meta + compactor) // 2. meta periodically scans the task and performs a cancel on // the meta side for tasks that are not updated by heartbeat - // for task in compactor_manager.get_heartbeat_expired_tasks() { - // if let Err(e) = hummock_manager - // .cancel_compact_task( - // task.task_id, - // TaskStatus::HeartbeatCanceled, - // ) - // .await - // { - // tracing::error!( - // task_id = task.task_id, - // error = %e.as_report(), - // "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat - // until we can successfully report its status", - // ); - // } - // } + let expired_tasks: Vec = compactor_manager .get_heartbeat_expired_tasks() .into_iter() @@ -3048,38 +3033,6 @@ impl HummockManager { context_id ); } - - // for task in cancel_tasks { - // tracing::info!( - // "Task cancel with group_id {} task_id {} with context_id {} has expired due to lack of visible progress", - // task.compaction_group_id, - // task.task_id, - // context_id, - // ); - - // if let Err(e) = - // hummock_manager - // .cancel_compact_task(task.task_id, TaskStatus::HeartbeatProgressCanceled) - // .await - // { - // tracing::error!( - // task_id = task.task_id, - // error = %e.as_report(), - // "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat - // until we can successfully report its status." - // ); - // } - - // // Forcefully cancel the task so that it terminates - // // early on the compactor - // // node. - // let _ = compactor.cancel_task(task.task_id); - // tracing::info!( - // "CancelTask operation for task_id {} has been sent to node with context_id {}", - // context_id, - // task.task_id - // ); - // } } else { // Determine the validity of the compactor streaming rpc. When the compactor no longer exists in the manager, the stream will be removed. // Tip: Connectivity to the compactor will be determined through the `send_event` operation. When send fails, it will be removed from the manager diff --git a/src/storage/hummock_sdk/src/compact.rs b/src/storage/hummock_sdk/src/compact.rs index 0fb76d1c9b4a8..d6824ab646c43 100644 --- a/src/storage/hummock_sdk/src/compact.rs +++ b/src/storage/hummock_sdk/src/compact.rs @@ -185,7 +185,6 @@ pub fn estimate_memory_for_compact_task( block_size: u64, recv_buffer_size: u64, sst_capacity: u64, - support_streaming_upload: bool, ) -> u64 { let mut result = 0; // When building the SstableStreamIterator, sstable_syncable will fetch the SstableMeta and seek @@ -223,13 +222,9 @@ pub fn estimate_memory_for_compact_task( // output // builder will maintain SstableInfo + block_builder(block) + writer (block to vec) let estimated_meta_size = sst_capacity * task_max_sst_meta_ratio / 100; - // if support_streaming_upload { - // result += estimated_meta_size + 2 * block_size - // } else { - // result += estimated_meta_size + sst_capacity; // Use sst_capacity to avoid BatchUploader - // // memory bursts. - // } + // FIXME: sst_capacity is the upper bound of the memory usage of the streaming sstable uploader + // A more reasonable memory limit method needs to be adopted, this is just a temporary fix. result += estimated_meta_size + sst_capacity; result diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 05b3d7dd182d9..41a55518158de 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -383,7 +383,6 @@ pub async fn compact( .object_store_recv_buffer_size .unwrap_or(6 * 1024 * 1024) as u64, capacity as u64, - context.sstable_store.store().support_streaming_upload(), ) * compact_task.splits.len() as u64; tracing::info!( diff --git a/src/storage/src/hummock/compactor/fast_compactor_runner.rs b/src/storage/src/hummock/compactor/fast_compactor_runner.rs index 7a30a6fd2ef65..5eb057392b436 100644 --- a/src/storage/src/hummock/compactor/fast_compactor_runner.rs +++ b/src/storage/src/hummock/compactor/fast_compactor_runner.rs @@ -505,17 +505,26 @@ impl CompactorRunner { ); let statistic = self.executor.take_statistics(); - let outputs = self.executor.builder.finish().await?; - let ssts = Compactor::report_progress( + let output_ssts = self + .executor + .builder + .finish() + .await? + .into_iter() + .map(|split_table_output| split_table_output.sst_info) + .collect(); + Compactor::report_progress( self.metrics.clone(), Some(self.executor.task_progress.clone()), - outputs, + &output_ssts, false, - ) - .await?; - let sst_infos = ssts.iter().map(|sst| sst.sst_info.clone()).collect_vec(); + ); + let sst_infos = output_ssts + .iter() + .map(|sst| sst.sst_info.clone()) + .collect_vec(); assert!(can_concat(&sst_infos)); - Ok((ssts, statistic)) + Ok((output_ssts, statistic)) } } diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index a94c2b51e5017..a10fdaf48f860 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -86,8 +86,8 @@ use crate::hummock::iterator::{Forward, HummockIterator}; use crate::hummock::multi_builder::SplitTableOutput; use crate::hummock::vacuum::Vacuum; use crate::hummock::{ - validate_ssts, BlockedXor16FilterBuilder, FilterBuilder, HummockError, - SharedComapctorObjectIdManager, SstableWriterFactory, UnifiedSstableWriterFactory, + validate_ssts, BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, + SstableWriterFactory, UnifiedSstableWriterFactory, }; use crate::monitor::CompactorMetrics; @@ -175,13 +175,17 @@ impl Compactor { compact_timer.observe_duration(); - let ssts = Self::report_progress( + let ssts = split_table_outputs + .into_iter() + .map(|x| x.sst_info) + .collect(); + + Self::report_progress( self.context.compactor_metrics.clone(), task_progress, - split_table_outputs, + &ssts, self.context.is_share_buffer_compact, - ) - .await?; + ); self.context .compactor_metrics @@ -204,27 +208,14 @@ impl Compactor { Ok((ssts, table_stats_map)) } - pub async fn report_progress( + pub fn report_progress( metrics: Arc, task_progress: Option>, - split_table_outputs: Vec, + ssts: &Vec, is_share_buffer_compact: bool, - ) -> HummockResult> { - let mut ssts = Vec::with_capacity(split_table_outputs.len()); - // let mut rets = vec![]; - - for SplitTableOutput { - sst_info, - // upload_join_handle, - } in split_table_outputs - { + ) { + for sst_info in ssts { let sst_size = sst_info.file_size(); - ssts.push(sst_info); - // let ret = upload_join_handle - // .verbose_instrument_await("upload") - // .await - // .map_err(HummockError::sstable_upload_error); - // rets.push(ret); if let Some(tracker) = &task_progress { tracker.inc_ssts_uploaded(); tracker.dec_num_pending_write_io(); @@ -235,10 +226,6 @@ impl Compactor { metrics.compaction_upload_sst_counts.inc(); } } - // for ret in rets { - // ret??; - // } - Ok(ssts) } async fn compact_key_range_impl( diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index ec6ff4d52833c..c09d04868591c 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -45,7 +45,6 @@ pub trait TableBuilderFactory { pub struct SplitTableOutput { pub sst_info: LocalSstableInfo, - // pub upload_join_handle: UploadJoinHandle, } /// A wrapper for [`SstableBuilder`] which automatically split key-value pairs into multiple tables, @@ -304,7 +303,6 @@ where } } - // TEST join upload handle before push builder_output .writer_output .verbose_instrument_await("upload") @@ -312,7 +310,6 @@ where .map_err(HummockError::sstable_upload_error)??; self.sst_outputs.push(SplitTableOutput { - // upload_join_handle: builder_output.writer_output, sst_info: builder_output.sst_info, }); }