From 884f2466a276b8a1b1b2be49baf145e85bffacc7 Mon Sep 17 00:00:00 2001
From: Li0k
Date: Thu, 18 Apr 2024 19:38:43 +0800
Subject: [PATCH] refactor(storage): extract compaction task helpers

Extract metrics_report_for_task, optimize_by_copy_block,
generate_splits_for_task and build_filter_key_extractor out of
compactor_runner::compact so the compaction entry point is easier to follow.

---
 .../src/hummock/compactor/compaction_utils.rs | 203 +++++++++++++++++-
 .../src/hummock/compactor/compactor_runner.rs | 160 +++++---------
 src/storage/src/hummock/compactor/mod.rs      |   3 +-
 3 files changed, 256 insertions(+), 110 deletions(-)

diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs
index 9594d7295a03e..bf0fdceeb19b3 100644
--- a/src/storage/src/hummock/compactor/compaction_utils.rs
+++ b/src/storage/src/hummock/compactor/compaction_utils.rs
@@ -26,9 +26,12 @@ use risingwave_hummock_sdk::key_range::KeyRange;
 use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
 use risingwave_hummock_sdk::table_stats::TableStatsMap;
 use risingwave_hummock_sdk::{can_concat, EpochWithGap, KeyComparator};
+use risingwave_pb::hummock::compact_task::TaskType;
 use risingwave_pb::hummock::{
-    compact_task, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo, TableSchema,
+    compact_task, BloomFilterType, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo,
+    TableSchema,
 };
+use thiserror_ext::AsReport;
 use tokio::time::Instant;
 
 pub use super::context::CompactorContext;
@@ -489,3 +492,201 @@ async fn check_result<
     }
     Ok(true)
 }
+
+/// Returns true when the task qualifies for the fast "copy block" compaction
+/// path: a single-table dynamic task over blocked-filter SSTs with no
+/// tombstones or TTL, below the configured size and delete-ratio thresholds.
+pub fn optimize_by_copy_block(compact_task: &CompactTask, context: CompactorContext) -> bool {
+    let sstable_infos = compact_task
+        .input_ssts
+        .iter()
+        .flat_map(|level| level.table_infos.iter())
+        .filter(|table_info| {
+            let table_ids = &table_info.table_ids;
+            table_ids
+                .iter()
+                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
+        })
+        .cloned()
+        .collect_vec();
+    let compaction_size = sstable_infos
+        .iter()
+        .map(|table_info| table_info.file_size)
+        .sum::<u64>();
+
+    let all_ssts_are_blocked_filter = sstable_infos
+        .iter()
+        .all(|table_info| table_info.bloom_filter_kind() == BloomFilterType::Blocked);
+
+    let delete_key_count = sstable_infos
+        .iter()
+        .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
+        .sum::<u64>();
+    let total_key_count = sstable_infos
+        .iter()
+        .map(|table_info| table_info.total_key_count)
+        .sum::<u64>();
+
+    let has_tombstone = compact_task
+        .input_ssts
+        .iter()
+        .flat_map(|level| level.table_infos.iter())
+        .any(|sst| sst.range_tombstone_count > 0);
+    let has_ttl = compact_task
+        .table_options
+        .iter()
+        .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0));
+
+    let compact_table_ids: HashSet<u32> = HashSet::from_iter(
+        compact_task
+            .input_ssts
+            .iter()
+            .flat_map(|level| level.table_infos.iter())
+            .flat_map(|sst| sst.table_ids.clone()),
+    );
+    let single_table = compact_table_ids.len() == 1;
+
+    context.storage_opts.enable_fast_compaction
+        && all_ssts_are_blocked_filter
+        && !has_tombstone
+        && !has_ttl
+        && single_table
+        && compact_task.target_level > 0
+        && compact_task.input_ssts.len() == 2
+        && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
+        && delete_key_count * 100
+            < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
+        && compact_task.task_type() == TaskType::Dynamic
+}
+
+/// Fills `compact_task.splits` unless the fast "copy block" path applies.
+/// Returns false when split generation fails and the task should be aborted.
+pub async fn generate_splits_for_task(
+    compact_task: &mut CompactTask,
+    context: CompactorContext,
+) -> bool {
+    let sstable_infos = compact_task
+        .input_ssts
+        .iter()
+        .flat_map(|level| level.table_infos.iter())
+        .filter(|table_info| {
+            let table_ids = &table_info.table_ids;
+            table_ids
+                .iter()
+                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
+        })
+        .cloned()
+        .collect_vec();
+    let compaction_size = sstable_infos
+        .iter()
+        .map(|table_info| table_info.file_size)
+        .sum::<u64>();
+
+    let all_ssts_are_blocked_filter = sstable_infos
+        .iter()
+        .all(|table_info| table_info.bloom_filter_kind() == BloomFilterType::Blocked);
+
+    let delete_key_count = sstable_infos
+        .iter()
+        .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
+        .sum::<u64>();
+    let total_key_count = sstable_infos
+        .iter()
+        .map(|table_info| table_info.total_key_count)
+        .sum::<u64>();
+
+    let has_tombstone = compact_task
+        .input_ssts
+        .iter()
+        .flat_map(|level| level.table_infos.iter())
+        .any(|sst| sst.range_tombstone_count > 0);
+    let has_ttl = compact_task
+        .table_options
+        .iter()
+        .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0));
+
+    let mut compact_table_ids = compact_task
+        .input_ssts
+        .iter()
+        .flat_map(|level| level.table_infos.iter())
+        .flat_map(|sst| sst.table_ids.clone())
+        .collect_vec();
+    compact_table_ids.sort();
+    compact_table_ids.dedup();
+    let single_table = compact_table_ids.len() == 1;
+
+    let optimize_by_copy_block = context.storage_opts.enable_fast_compaction
+        && all_ssts_are_blocked_filter
+        && !has_tombstone
+        && !has_ttl
+        && single_table
+        && compact_task.target_level > 0
+        && compact_task.input_ssts.len() == 2
+        && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
+        && delete_key_count * 100
+            < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
+        && compact_task.task_type() == TaskType::Dynamic;
+
+    if !optimize_by_copy_block {
+        match generate_splits(&sstable_infos, compaction_size, context.clone()).await {
+            Ok(splits) => {
+                if !splits.is_empty() {
+                    compact_task.splits = splits;
+                }
+            }
+            Err(e) => {
+                tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
+                return false;
+            }
+        }
+    }
+
+    true
+}
+
+/// Reports per-level read metrics (bytes and SST counts) for the input SSTs
+/// of a compaction task.
+pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) {
+    let group_label = compact_task.compaction_group_id.to_string();
+    let cur_level_label = compact_task.input_ssts[0].level_idx.to_string();
+    let select_table_infos = compact_task
+        .input_ssts
+        .iter()
+        .filter(|level| level.level_idx != compact_task.target_level)
+        .flat_map(|level| level.table_infos.iter())
+        .collect_vec();
+    let target_table_infos = compact_task
+        .input_ssts
+        .iter()
+        .filter(|level| level.level_idx == compact_task.target_level)
+        .flat_map(|level| level.table_infos.iter())
+        .collect_vec();
+    let select_size = select_table_infos
+        .iter()
+        .map(|table| table.file_size)
+        .sum::<u64>();
+    context
+        .compactor_metrics
+        .compact_read_current_level
+        .with_label_values(&[&group_label, &cur_level_label])
+        .inc_by(select_size);
+    context
+        .compactor_metrics
+        .compact_read_sstn_current_level
+        .with_label_values(&[&group_label, &cur_level_label])
+        .inc_by(select_table_infos.len() as u64);
+
+    let target_level_read_bytes = target_table_infos.iter().map(|t| t.file_size).sum::<u64>();
+    let next_level_label = compact_task.target_level.to_string();
+    context
+        .compactor_metrics
+        .compact_read_next_level
+        .with_label_values(&[&group_label, next_level_label.as_str()])
+        .inc_by(target_level_read_bytes);
+    context
+        .compactor_metrics
+        .compact_read_sstn_next_level
+        .with_label_values(&[&group_label, next_level_label.as_str()])
+        .inc_by(target_table_infos.len() as u64);
+}
diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs
index 46ccaf0ebb94d..54b39f3d4a198 100644
--- a/src/storage/src/hummock/compactor/compactor_runner.rs
+++ b/src/storage/src/hummock/compactor/compactor_runner.rs
@@ -30,8 +30,8 @@ use risingwave_hummock_sdk::table_stats::{add_table_stats_map, TableStats, Table
 use risingwave_hummock_sdk::{
     can_concat, compact_task_output_to_string, HummockSstableObjectId, KeyComparator,
 };
-use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
-use risingwave_pb::hummock::{BloomFilterType, CompactTask, LevelType, SstableInfo};
+use risingwave_pb::hummock::compact_task::TaskStatus;
+use risingwave_pb::hummock::{CompactTask, LevelType, SstableInfo};
 use thiserror_ext::AsReport;
 use tokio::sync::oneshot::Receiver;
 
@@ -41,6 +41,7 @@ use super::{CompactionStatistics, TaskConfig};
 use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManager};
 use crate::hummock::compactor::compaction_utils::{
     build_multi_compaction_filter, estimate_task_output_capacity, generate_splits,
+    metrics_report_for_task, optimize_by_copy_block,
 };
 use crate::hummock::compactor::iterator::ConcatSstableIterator;
 use crate::hummock::compactor::task_progress::TaskProgressGuard;
@@ -304,46 +305,7 @@
 ) {
     let context = compactor_context.clone();
     let group_label = compact_task.compaction_group_id.to_string();
-    let cur_level_label = compact_task.input_ssts[0].level_idx.to_string();
-    let select_table_infos = compact_task
-        .input_ssts
-        .iter()
-        .filter(|level| level.level_idx != compact_task.target_level)
-        .flat_map(|level| level.table_infos.iter())
-        .collect_vec();
-    let target_table_infos = compact_task
-        .input_ssts
-        .iter()
-        .filter(|level| level.level_idx == compact_task.target_level)
-        .flat_map(|level| level.table_infos.iter())
-        .collect_vec();
-    let select_size = select_table_infos
-        .iter()
-        .map(|table| table.file_size)
-        .sum::<u64>();
-    context
-        .compactor_metrics
-        .compact_read_current_level
-        .with_label_values(&[&group_label, &cur_level_label])
-        .inc_by(select_size);
-    context
-        .compactor_metrics
-        .compact_read_sstn_current_level
-        .with_label_values(&[&group_label, &cur_level_label])
-        .inc_by(select_table_infos.len() as u64);
-
-    let target_level_read_bytes = target_table_infos.iter().map(|t| t.file_size).sum::<u64>();
-    let next_level_label = compact_task.target_level.to_string();
-    context
-        .compactor_metrics
-        .compact_read_next_level
-        .with_label_values(&[&group_label, next_level_label.as_str()])
-        .inc_by(target_level_read_bytes);
-    context
-        .compactor_metrics
-        .compact_read_sstn_next_level
-        .with_label_values(&[&group_label, next_level_label.as_str()])
-        .inc_by(target_table_infos.len() as u64);
+    metrics_report_for_task(&compact_task, &context);
 
     let timer = context
         .compactor_metrics
@@ -356,64 +318,34 @@
 
     let multi_filter = build_multi_compaction_filter(&compact_task);
 
-    let mut compact_table_ids = compact_task
-        .input_ssts
-        .iter()
-        .flat_map(|level| level.table_infos.iter())
-        .flat_map(|sst| sst.table_ids.clone())
-        .collect_vec();
-    compact_table_ids.sort();
-    compact_table_ids.dedup();
-    let single_table = compact_table_ids.len() == 1;
-
     let existing_table_ids: HashSet<u32> =
         HashSet::from_iter(compact_task.existing_table_ids.clone());
     let compact_table_ids = HashSet::from_iter(
-        compact_table_ids
-            .into_iter()
+        compact_task
+            .input_ssts
+            .iter()
+            .flat_map(|level| level.table_infos.iter())
+            .flat_map(|sst| sst.table_ids.clone())
             .filter(|table_id| existing_table_ids.contains(table_id)),
     );
-    let multi_filter_key_extractor = match filter_key_extractor_manager
-        .acquire(compact_table_ids.clone())
-        .await
+
+    let multi_filter_key_extractor = match build_filter_key_extractor(
+        &compact_task,
+        filter_key_extractor_manager.clone(),
+        &compact_table_ids,
+    )
+    .await
     {
-        Err(e) => {
-            tracing::error!(error = %e.as_report(), "Failed to fetch filter key extractor tables [{:?}], it may caused by some RPC error", compact_task.existing_table_ids);
+        Some(multi_filter_key_extractor) => multi_filter_key_extractor,
+        None => {
             let task_status = TaskStatus::ExecuteFailed;
             return (
                 compact_done(compact_task, context.clone(), vec![], task_status),
                 None,
             );
         }
-        Ok(extractor) => extractor,
     };
-    if let FilterKeyExtractorImpl::Multi(multi) = &multi_filter_key_extractor {
-        let found_tables = multi.get_existing_table_ids();
-        let removed_tables = compact_table_ids
-            .iter()
-            .filter(|table_id| !found_tables.contains(table_id))
-            .collect_vec();
-        if !removed_tables.is_empty() {
-            tracing::error!("Failed to fetch filter key extractor tables [{:?}. [{:?}] may be removed by meta-service. ", compact_table_ids, removed_tables);
-            let task_status = TaskStatus::ExecuteFailed;
-            return (
-                compact_done(compact_task, context.clone(), vec![], task_status),
-                None,
-            );
-        }
-    }
-
-    let multi_filter_key_extractor = Arc::new(multi_filter_key_extractor);
-    let has_tombstone = compact_task
-        .input_ssts
-        .iter()
-        .flat_map(|level| level.table_infos.iter())
-        .any(|sst| sst.range_tombstone_count > 0);
-    let has_ttl = compact_task
-        .table_options
-        .iter()
-        .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0));
 
     let mut task_status = TaskStatus::Success;
     // skip sst related to non-existent able_id to reduce io
     let sstable_infos = compact_task
@@ -432,29 +364,8 @@
         .iter()
         .map(|table_info| table_info.file_size)
         .sum::<u64>();
-    let all_ssts_are_blocked_filter = sstable_infos
-        .iter()
-        .all(|table_info| table_info.bloom_filter_kind() == BloomFilterType::Blocked);
-    let delete_key_count = sstable_infos
-        .iter()
-        .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
-        .sum::<u64>();
-    let total_key_count = sstable_infos
-        .iter()
-        .map(|table_info| table_info.total_key_count)
-        .sum::<u64>();
-    let optimize_by_copy_block = context.storage_opts.enable_fast_compaction
-        && all_ssts_are_blocked_filter
-        && !has_tombstone
-        && !has_ttl
-        && single_table
-        && compact_task.target_level > 0
-        && compact_task.input_ssts.len() == 2
-        && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
-        && delete_key_count * 100
-            < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
-        && compact_task.task_type() == TaskType::Dynamic;
+    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, context.clone());
 
     if !optimize_by_copy_block {
         match generate_splits(&sstable_infos, compaction_size, context.clone()).await {
@@ -927,6 +838,39 @@ where
     Ok(compaction_statistics)
 }
 
+/// Acquires the multi filter key extractor for the task's tables; returns
+/// `None` when acquisition fails or some tables have been removed by the
+/// meta service, in which case the task should be marked as failed.
+async fn build_filter_key_extractor(
+    compact_task: &CompactTask,
+    filter_key_extractor_manager: FilterKeyExtractorManager,
+    compact_table_ids: &HashSet<u32>,
+) -> Option<Arc<FilterKeyExtractorImpl>> {
+    let multi_filter_key_extractor = match filter_key_extractor_manager
+        .acquire(compact_table_ids.clone())
+        .await
+    {
+        Err(e) => {
+            tracing::error!(error = %e.as_report(), "Failed to fetch filter key extractor tables [{:?}], it may be caused by an RPC error", compact_task.existing_table_ids);
+            return None;
+        }
+        Ok(extractor) => extractor,
+    };
+
+    if let FilterKeyExtractorImpl::Multi(multi) = &multi_filter_key_extractor {
+        let found_tables = multi.get_existing_table_ids();
+        let removed_tables = compact_table_ids
+            .iter()
+            .filter(|table_id| !found_tables.contains(table_id))
+            .collect_vec();
+        if !removed_tables.is_empty() {
+            tracing::error!("Failed to fetch filter key extractor tables [{:?}]. [{:?}] may be removed by meta-service.", compact_table_ids, removed_tables);
+            return None;
+        }
+    }
+
+    Some(Arc::new(multi_filter_key_extractor))
+}
+
 #[cfg(test)]
 pub mod tests {
     use risingwave_hummock_sdk::can_concat;
diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs
index c18828bc93e72..654581449b1d3 100644
--- a/src/storage/src/hummock/compactor/mod.rs
+++ b/src/storage/src/hummock/compactor/mod.rs
@@ -470,6 +470,7 @@ pub fn start_compactor(
                             sstable_object_id_manager.remove_watermark_object_id(tracker_id);
                         },
                     );
+
                     compactor_runner::compact(context.clone(), compact_task, rx, Box::new(sstable_object_id_manager.clone()), filter_key_extractor_manager.clone()).await
                 },
                 Err(err) => {
@@ -791,4 +792,4 @@ fn get_task_progress(
         progress_list.push(progress.snapshot(task_id));
     }
     progress_list
-}
+}
\ No newline at end of file