From 3da8d1431981cd29b475114a193b700f8775ef28 Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 6 Feb 2024 17:01:32 +0800 Subject: [PATCH] fix(storage): fix compactor oom 0206 (#15023) --- .../hummock_test/src/compactor_tests.rs | 12 ++--- .../hummock_test/src/sync_point_tests.rs | 2 +- .../src/hummock/compactor/compactor_runner.rs | 45 +++++++++++++------ src/storage/src/hummock/compactor/mod.rs | 6 +-- src/storage/src/hummock/sstable/builder.rs | 3 +- 5 files changed, 42 insertions(+), 26 deletions(-) diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index a634bcafc6ff1..fc6debe8183c8 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -296,7 +296,7 @@ pub(crate) mod tests { compact_task.current_epoch_time = 0; let (_tx, rx) = tokio::sync::oneshot::channel(); - let (result_task, task_stats) = compact( + let ((result_task, task_stats), _) = compact( compact_ctx.clone(), compact_task.clone(), rx, @@ -452,7 +452,7 @@ pub(crate) mod tests { { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); - let (result_task, task_stats) = compact( + let ((result_task, task_stats), _) = compact( compact_ctx.clone(), compact_task.clone(), rx, @@ -784,7 +784,7 @@ pub(crate) mod tests { // 4. compact let (_tx, rx) = tokio::sync::oneshot::channel(); - let (result_task, task_stats) = compact( + let ((result_task, task_stats), _) = compact( compact_ctx, compact_task.clone(), rx, @@ -978,7 +978,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); - let (result_task, task_stats) = compact( + let ((result_task, task_stats), _) = compact( compact_ctx, compact_task.clone(), rx, @@ -1166,7 +1166,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); - let (result_task, task_stats) = compact( + let ((result_task, task_stats), _) = compact( compact_ctx, compact_task.clone(), rx, @@ -1336,7 +1336,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); - let (result_task, task_stats) = compact( + let ((result_task, task_stats), _) = compact( compact_ctx, compact_task.clone(), rx, diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs index b41d2d0bd8527..b70df9184e054 100644 --- a/src/storage/hummock_test/src/sync_point_tests.rs +++ b/src/storage/hummock_test/src/sync_point_tests.rs @@ -200,7 +200,7 @@ pub async fn compact_once( compact_task.compaction_filter_mask = compaction_filter_flag.bits(); // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); - let (result_task, task_stats) = compact( + let ((result_task, task_stats), _) = compact( compact_ctx, compact_task.clone(), rx, diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index ade78acc7ff3b..710a6fb3c4a65 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -49,6 +49,7 @@ use crate::hummock::iterator::{ Forward, ForwardMergeRangeIterator, HummockIterator, MergeIterator, SkipWatermarkIterator, }; use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory}; +use crate::hummock::utils::MemoryTracker; use crate::hummock::value::HummockValue; use crate::hummock::{ BlockedXor16FilterBuilder, CachePolicy, CompactionDeleteRangeIterator, CompressionAlgorithm, @@ -251,7 +252,10 @@ pub async fn compact( mut shutdown_rx: Receiver<()>, object_id_getter: Box, filter_key_extractor_manager: FilterKeyExtractorManager, -) -> (CompactTask, HashMap) { +) -> ( + (CompactTask, HashMap), + Option, +) { let context = compactor_context.clone(); let group_label = compact_task.compaction_group_id.to_string(); let cur_level_label = compact_task.input_ssts[0].level_idx.to_string(); @@ -330,7 +334,10 @@ pub async fn compact( Err(e) => { tracing::error!(error = %e.as_report(), "Failed to fetch filter key extractor tables [{:?}], it may caused by some RPC error", compact_task.existing_table_ids); let task_status = TaskStatus::ExecuteFailed; - return compact_done(compact_task, context.clone(), vec![], task_status); + return ( + compact_done(compact_task, context.clone(), vec![], task_status), + None, + ); } Ok(extractor) => extractor, }; @@ -344,7 +351,10 @@ pub async fn compact( if !removed_tables.is_empty() { tracing::error!("Failed to fetch filter key extractor tables [{:?}. [{:?}] may be removed by meta-service. ", compact_table_ids, removed_tables); let task_status = TaskStatus::ExecuteFailed; - return compact_done(compact_task, context.clone(), vec![], task_status); + return ( + compact_done(compact_task, context.clone(), vec![], task_status), + None, + ); } } @@ -410,7 +420,10 @@ pub async fn compact( Err(e) => { tracing::warn!(error = %e.as_report(), "Failed to generate_splits"); task_status = TaskStatus::ExecuteFailed; - return compact_done(compact_task, context.clone(), vec![], task_status); + return ( + compact_done(compact_task, context.clone(), vec![], task_status), + None, + ); } } } @@ -426,11 +439,14 @@ pub async fn compact( context.running_task_parallelism.load(Ordering::Relaxed), context.max_task_parallelism.load(Ordering::Relaxed), ); - return compact_done( - compact_task, - context.clone(), - vec![], - TaskStatus::NoAvailCpuResourceCanceled, + return ( + compact_done( + compact_task, + context.clone(), + vec![], + TaskStatus::NoAvailCpuResourceCanceled, + ), + None, ); } @@ -488,7 +504,10 @@ pub async fn compact( context.memory_limiter.quota() ); task_status = TaskStatus::NoAvailMemoryResourceCanceled; - return compact_done(compact_task, context.clone(), output_ssts, task_status); + return ( + compact_done(compact_task, context.clone(), output_ssts, task_status), + memory_detector, + ); } context.compactor_metrics.compact_task_pending_num.inc(); @@ -546,7 +565,7 @@ pub async fn compact( cost_time, compact_task_to_string(&compact_task) ); - return (compact_task, table_stats); + return ((compact_task, table_stats), memory_detector); } for (split_index, _) in compact_task.splits.iter().enumerate() { let filter = multi_filter.clone(); @@ -619,8 +638,6 @@ pub async fn compact( } } - drop(memory_detector); - if task_status != TaskStatus::Success { for abort_handle in abort_handles { abort_handle.abort(); @@ -641,7 +658,7 @@ pub async fn compact( cost_time, compact_task_to_string(&compact_task) ); - (compact_task, table_stats) + ((compact_task, table_stats), memory_detector) } /// Fills in the compact task and tries to report the task result to meta node. diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 33710e95d47d9..b71231ed162a0 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -464,7 +464,7 @@ pub fn start_compactor( let (tx, rx) = tokio::sync::oneshot::channel(); let task_id = compact_task.task_id; shutdown.lock().unwrap().insert(task_id, tx); - let (compact_task, table_stats) = match sstable_object_id_manager.add_watermark_object_id(None).await + let ((compact_task, table_stats), _memory_tracker) = match sstable_object_id_manager.add_watermark_object_id(None).await { Ok(tracker_id) => { let sstable_object_id_manager_clone = sstable_object_id_manager.clone(); @@ -480,7 +480,7 @@ pub fn start_compactor( tracing::warn!(error = %err.as_report(), "Failed to track pending SST object id"); let mut compact_task = compact_task; compact_task.set_task_status(TaskStatus::TrackSstObjectIdFailed); - (compact_task, HashMap::default()) + ((compact_task, HashMap::default()),None) } }; shutdown.lock().unwrap().remove(&task_id); @@ -669,7 +669,7 @@ pub fn start_shared_compactor( let task_id = compact_task.task_id; shutdown.lock().unwrap().insert(task_id, tx); - let (compact_task, table_stats) = compactor_runner::compact( + let ((compact_task, table_stats), _memory_tracker)= compactor_runner::compact( context.clone(), compact_task, rx, diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index 4a8573ea19ecf..4fe331f677321 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -356,8 +356,7 @@ impl SstableBuilder { if !self.block_builder.is_empty() { self.build_block().await?; } - } else if is_new_user_key - && self.block_builder.approximate_len() >= self.options.block_capacity + } else if self.block_builder.approximate_len() >= self.options.block_capacity && could_switch_block { self.build_block().await?;