diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index bf0fdceeb19b3..555710a13d466 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -493,7 +493,7 @@ async fn check_result< Ok(true) } -pub fn optimize_by_copy_block(compact_task: &CompactTask, context: CompactorContext) -> bool { +pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorContext) -> bool { let sstable_infos = compact_task .input_ssts .iter() @@ -534,7 +534,7 @@ pub fn optimize_by_copy_block(compact_task: &CompactTask, context: CompactorCont .iter() .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0)); - let mut compact_table_ids: HashSet = HashSet::from_iter( + let compact_table_ids: HashSet = HashSet::from_iter( compact_task .input_ssts .iter() @@ -558,7 +558,8 @@ pub fn optimize_by_copy_block(compact_task: &CompactTask, context: CompactorCont pub async fn generate_splits_for_task( compact_task: &mut CompactTask, - context: CompactorContext, + context: &CompactorContext, + optimize_by_copy_block: bool, ) -> bool { let sstable_infos = compact_task .input_ssts @@ -577,51 +578,6 @@ pub async fn generate_splits_for_task( .map(|table_info| table_info.file_size) .sum::(); - let all_ssts_are_blocked_filter = sstable_infos - .iter() - .all(|table_info| table_info.bloom_filter_kind() == BloomFilterType::Blocked); - - let delete_key_count = sstable_infos - .iter() - .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count) - .sum::(); - let total_key_count = sstable_infos - .iter() - .map(|table_info| table_info.total_key_count) - .sum::(); - - let has_tombstone = compact_task - .input_ssts - .iter() - .flat_map(|level| level.table_infos.iter()) - .any(|sst| sst.range_tombstone_count > 0); - let has_ttl = compact_task - .table_options - .iter() - .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0)); - - let mut compact_table_ids = compact_task - .input_ssts - .iter() - .flat_map(|level| level.table_infos.iter()) - .flat_map(|sst| sst.table_ids.clone()) - .collect_vec(); - compact_table_ids.sort(); - compact_table_ids.dedup(); - let single_table = compact_table_ids.len() == 1; - - let optimize_by_copy_block = context.storage_opts.enable_fast_compaction - && all_ssts_are_blocked_filter - && !has_tombstone - && !has_ttl - && single_table - && compact_task.target_level > 0 - && compact_task.input_ssts.len() == 2 - && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size - && delete_key_count * 100 - < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count - && compact_task.task_type() == TaskType::Dynamic; - if !optimize_by_copy_block { match generate_splits(&sstable_infos, compaction_size, context.clone()).await { Ok(splits) => { @@ -633,18 +589,12 @@ pub async fn generate_splits_for_task( } Err(e) => { tracing::warn!(error = %e.as_report(), "Failed to generate_splits"); - // task_status = TaskStatus::ExecuteFailed; - // return ( - // compact_done(compact_task, context.clone(), vec![], task_status), - // None, - // ); - return false; } } } - return true; + true } pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) { diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 54b39f3d4a198..9d7f45cbd08d2 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -365,7 +365,7 @@ pub async fn compact( .map(|table_info| table_info.file_size) .sum::(); - let optimize_by_copy_block = optimize_by_copy_block(&compact_task, context.clone()); + let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context); if !optimize_by_copy_block { match generate_splits(&sstable_infos, compaction_size, context.clone()).await { @@ -618,7 +618,7 @@ pub async fn compact( } /// Fills in the compact task and tries to report the task result to meta node. -fn compact_done( +pub(crate) fn compact_done( mut compact_task: CompactTask, context: CompactorContext, output_ssts: Vec, diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 654581449b1d3..c81ed65d99cfc 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -80,7 +80,10 @@ use super::{ use crate::filter_key_extractor::{ FilterKeyExtractorImpl, FilterKeyExtractorManager, StaticFilterKeyExtractorManager, }; -use crate::hummock::compactor::compactor_runner::compact_and_build_sst; +use crate::hummock::compactor::compaction_utils::{ + generate_splits_for_task, optimize_by_copy_block, +}; +use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done}; use crate::hummock::iterator::{Forward, HummockIterator}; use crate::hummock::multi_builder::SplitTableOutput; use crate::hummock::vacuum::Vacuum; @@ -454,121 +457,215 @@ pub fn start_compactor( let meta_client = hummock_meta_client.clone(); let sstable_object_id_manager = sstable_object_id_manager.clone(); let filter_key_extractor_manager = filter_key_extractor_manager.clone(); - executor.spawn(async move { - match event { - ResponseEvent::CompactTask(compact_task) => { - let (tx, rx) = tokio::sync::oneshot::channel(); + + match event { + ResponseEvent::CompactTask(mut compact_task) => { + // let mut is_task_fail = false; + let optimize_by_copy_block = + optimize_by_copy_block(&compact_task, &context); + if !generate_splits_for_task( + &mut compact_task, + &context, + optimize_by_copy_block, + ) + .await + { + let (compact_task, table_stats) = compact_done( + compact_task, + context.clone(), + vec![], + TaskStatus::ExecuteFailed, + ); + if let Err(e) = + request_sender.send(SubscribeCompactionEventRequest { + event: Some(RequestEvent::ReportTask(ReportTask { + task_id: compact_task.task_id, + task_status: compact_task.task_status, + sorted_output_ssts: compact_task + .sorted_output_ssts + .clone(), + table_stats_change: to_prost_table_stats_map( + table_stats, + ), + })), + create_at: SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("Clock may have gone backwards") + .as_millis() + as u64, + }) + { let task_id = compact_task.task_id; - shutdown.lock().unwrap().insert(task_id, tx); - let ((compact_task, table_stats), _memory_tracker) = match sstable_object_id_manager.add_watermark_object_id(None).await + tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"); + } + + continue 'start_stream; + } + + let scopeguard = scopeguard::guard( + (compact_task.clone(), context.clone()), + |(compact_task, context)| { + context.running_task_parallelism.fetch_sub( + compact_task.splits.len() as u32, + Ordering::SeqCst, + ); + }, + ); + + executor.spawn(async move { + let _scopeguard = scopeguard; + let (tx, rx) = tokio::sync::oneshot::channel(); + let task_id = compact_task.task_id; + shutdown.lock().unwrap().insert(task_id, tx); + let ((compact_task, table_stats), _memory_tracker) = + match sstable_object_id_manager + .add_watermark_object_id(None) + .await { Ok(tracker_id) => { - let sstable_object_id_manager_clone = sstable_object_id_manager.clone(); + let sstable_object_id_manager_clone = + sstable_object_id_manager.clone(); let _guard = scopeguard::guard( (tracker_id, sstable_object_id_manager_clone), |(tracker_id, sstable_object_id_manager)| { - sstable_object_id_manager.remove_watermark_object_id(tracker_id); + sstable_object_id_manager + .remove_watermark_object_id(tracker_id); }, ); - compactor_runner::compact(context.clone(), compact_task, rx, Box::new(sstable_object_id_manager.clone()), filter_key_extractor_manager.clone()).await - }, + compactor_runner::compact( + context.clone(), + compact_task, + rx, + Box::new(sstable_object_id_manager.clone()), + filter_key_extractor_manager.clone(), + ) + .await + } Err(err) => { tracing::warn!(error = %err.as_report(), "Failed to track pending SST object id"); let mut compact_task = compact_task; - compact_task.set_task_status(TaskStatus::TrackSstObjectIdFailed); - ((compact_task, HashMap::default()),None) + compact_task.set_task_status( + TaskStatus::TrackSstObjectIdFailed, + ); + ((compact_task, HashMap::default()), None) } }; - shutdown.lock().unwrap().remove(&task_id); - - let enable_check_compaction_result = context.storage_opts.check_compaction_result; - let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status() == TaskStatus::Success; - if let Err(e) = request_sender.send(SubscribeCompactionEventRequest { - event: Some(RequestEvent::ReportTask( - ReportTask { - task_id: compact_task.task_id, - task_status: compact_task.task_status, - sorted_output_ssts: compact_task.sorted_output_ssts.clone(), - table_stats_change:to_prost_table_stats_map(table_stats), - } - )), + shutdown.lock().unwrap().remove(&task_id); + + if let Err(e) = + request_sender.send(SubscribeCompactionEventRequest { + event: Some(RequestEvent::ReportTask(ReportTask { + task_id: compact_task.task_id, + task_status: compact_task.task_status, + sorted_output_ssts: compact_task + .sorted_output_ssts + .clone(), + table_stats_change: to_prost_table_stats_map( + table_stats, + ), + })), create_at: SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .expect("Clock may have gone backwards") - .as_millis() as u64, - }) { - tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"); - } - if enable_check_compaction_result && need_check_task { - match check_compaction_result(&compact_task, context.clone()).await { - Err(e) => { - tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id); - }, - Ok(true) => (), - Ok(false) => { - panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task)); - } - } - } + .as_millis() + as u64, + }) + { + tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"); } - ResponseEvent::VacuumTask(vacuum_task) => { - match Vacuum::handle_vacuum_task( - context.sstable_store.clone(), - &vacuum_task.sstable_object_ids, - ) - .await{ - Ok(_) => { - Vacuum::report_vacuum_task(vacuum_task, meta_client).await; - } + + let enable_check_compaction_result = + context.storage_opts.check_compaction_result; + let need_check_task = !compact_task.sorted_output_ssts.is_empty() + && compact_task.task_status() == TaskStatus::Success; + + if enable_check_compaction_result && need_check_task { + match check_compaction_result(&compact_task, context.clone()) + .await + { Err(e) => { - tracing::warn!(error = %e.as_report(), "Failed to vacuum task") - } - } - } - ResponseEvent::FullScanTask(full_scan_task) => { - match Vacuum::handle_full_scan_task(full_scan_task, context.sstable_store.clone()).await { - Ok((object_ids, total_object_count, total_object_size)) => { - Vacuum::report_full_scan_task(object_ids, total_object_count, total_object_size, meta_client).await; + tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id); } - Err(e) => { - tracing::warn!(error = %e.as_report(), "Failed to iter object"); + Ok(true) => (), + Ok(false) => { + panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task)); } } } - ResponseEvent::ValidationTask(validation_task) => { - validate_ssts( - validation_task, - context.sstable_store.clone(), - ) - .await; + }); + } + ResponseEvent::VacuumTask(vacuum_task) => { + executor.spawn(async move { + match Vacuum::handle_vacuum_task( + context.sstable_store.clone(), + &vacuum_task.sstable_object_ids, + ) + .await + { + Ok(_) => { + Vacuum::report_vacuum_task(vacuum_task, meta_client).await; + } + Err(e) => { + tracing::warn!(error = %e.as_report(), "Failed to vacuum task") + } } - ResponseEvent::CancelCompactTask(cancel_compact_task) => { - if let Some(tx) = shutdown - .lock() - .unwrap() - .remove(&cancel_compact_task.task_id) - { - if tx.send(()).is_err() { - tracing::warn!( - "Cancellation of compaction task failed. task_id: {}", - cancel_compact_task.task_id - ); - } - } else { - tracing::warn!( - "Attempting to cancel non-existent compaction task. task_id: {}", - cancel_compact_task.task_id - ); + }); + } + ResponseEvent::FullScanTask(full_scan_task) => { + executor.spawn(async move { + match Vacuum::handle_full_scan_task( + full_scan_task, + context.sstable_store.clone(), + ) + .await + { + Ok((object_ids, total_object_count, total_object_size)) => { + Vacuum::report_full_scan_task( + object_ids, + total_object_count, + total_object_size, + meta_client, + ) + .await; + } + Err(e) => { + tracing::warn!(error = %e.as_report(), "Failed to iter object"); } } - - ResponseEvent::PullTaskAck(_pull_task_ack) => { - // set flag - pull_task_ack.store(true, Ordering::SeqCst); + }); + } + ResponseEvent::ValidationTask(validation_task) => { + executor.spawn(async move { + validate_ssts(validation_task, context.sstable_store.clone()) + .await; + }); + } + ResponseEvent::CancelCompactTask(cancel_compact_task) => { + if let Some(tx) = shutdown + .lock() + .unwrap() + .remove(&cancel_compact_task.task_id) + { + if tx.send(()).is_err() { + tracing::warn!( + "Cancellation of compaction task failed. task_id: {}", + cancel_compact_task.task_id + ); } + } else { + tracing::warn!( + "Attempting to cancel non-existent compaction task. task_id: {}", + cancel_compact_task.task_id + ); } - }); + } + + ResponseEvent::PullTaskAck(_pull_task_ack) => { + // set flag + pull_task_ack.store(true, Ordering::SeqCst); + } + } } Some(Err(e)) => { tracing::warn!("Failed to consume stream. {}", e.message()); @@ -792,4 +889,4 @@ fn get_task_progress( progress_list.push(progress.snapshot(task_id)); } progress_list -} \ No newline at end of file +}