diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs
index 9f1c10a0c15bf..24a2f06e5be08 100644
--- a/src/compute/src/server.rs
+++ b/src/compute/src/server.rs
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 use std::net::SocketAddr;
-use std::sync::atomic::AtomicU32;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -215,12 +214,6 @@ pub async fn compute_node_serve(
     ));
     let compaction_executor = Arc::new(CompactionExecutor::new(Some(1)));
-    let max_task_parallelism = Arc::new(AtomicU32::new(
-        (compaction_executor.worker_num() as f32
-            * storage_opts.compactor_max_task_multiplier)
-            .ceil() as u32,
-    ));
-
     let compactor_context = CompactorContext {
         storage_opts,
         sstable_store: storage.sstable_store(),
@@ -233,8 +226,6 @@ pub async fn compute_node_serve(
         await_tree_reg: await_tree_config
             .clone()
             .map(new_compaction_await_tree_reg_ref),
-        running_task_parallelism: Arc::new(AtomicU32::new(0)),
-        max_task_parallelism,
     };
 
     let (handle, shutdown_sender) = start_compactor(
diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs
index 95ee16ca3862c..914729c277a9a 100644
--- a/src/storage/compactor/src/server.rs
+++ b/src/storage/compactor/src/server.rs
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 use std::net::SocketAddr;
-use std::sync::atomic::AtomicU32;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -242,10 +241,6 @@ pub async fn compactor_serve(
     let compaction_executor = Arc::new(CompactionExecutor::new(
         opts.compaction_worker_threads_number,
     ));
-    let max_task_parallelism = Arc::new(AtomicU32::new(
-        (compaction_executor.worker_num() as f32 * storage_opts.compactor_max_task_multiplier)
-            .ceil() as u32,
-    ));
 
     let compactor_context = CompactorContext {
         storage_opts,
@@ -257,8 +252,6 @@ pub async fn compactor_serve(
         task_progress_manager: Default::default(),
         await_tree_reg: await_tree_reg.clone(),
-        running_task_parallelism: Arc::new(AtomicU32::new(0)),
-        max_task_parallelism,
     };
     let mut sub_tasks = vec![
         MetaClient::start_heartbeat_loop(
@@ -378,10 +371,6 @@ pub async fn shared_compactor_serve(
     let compaction_executor = Arc::new(CompactionExecutor::new(
         opts.compaction_worker_threads_number,
     ));
-    let max_task_parallelism = Arc::new(AtomicU32::new(
-        (compaction_executor.worker_num() as f32 * storage_opts.compactor_max_task_multiplier)
-            .ceil() as u32,
-    ));
     let compactor_context = CompactorContext {
         storage_opts,
         sstable_store,
@@ -391,8 +380,6 @@ pub async fn shared_compactor_serve(
         memory_limiter,
         task_progress_manager: Default::default(),
         await_tree_reg,
-        running_task_parallelism: Arc::new(AtomicU32::new(0)),
-        max_task_parallelism,
     };
     let join_handle = tokio::spawn(async move {
         tonic::transport::Server::builder()
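Each deleted block above computed the same cap inline: the amount of compaction parallelism a node may run, derived from the executor's worker count and `compactor_max_task_multiplier`. A minimal standalone sketch of that formula (the worker counts and multipliers below are made-up example values, not defaults from the codebase):

```rust
/// Cap on concurrent compaction-task parallelism:
/// ceil(worker_num * compactor_max_task_multiplier).
fn max_task_parallelism(worker_num: usize, multiplier: f32) -> u32 {
    (worker_num as f32 * multiplier).ceil() as u32
}

fn main() {
    // 4 workers with a multiplier of 2.5 allow ceil(10.0) = 10 units of parallelism.
    assert_eq!(max_task_parallelism(4, 2.5), 10);
    // A fractional product rounds up: ceil(4 * 0.6) = 3.
    assert_eq!(max_task_parallelism(4, 0.6), 3);
}
```

After this PR, that computation appears exactly once, in `start_compactor` (see the `mod.rs` hunk below), instead of being duplicated at every `CompactorContext` construction site.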
diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs
index 7b8da8c92f077..5462d0d49b506 100644
--- a/src/storage/hummock_test/src/compactor_tests.rs
+++ b/src/storage/hummock_test/src/compactor_tests.rs
@@ -17,7 +17,6 @@ pub(crate) mod tests {
 
     use std::collections::{BTreeMap, BTreeSet, VecDeque};
     use std::ops::Bound;
-    use std::sync::atomic::AtomicU32;
    use std::sync::Arc;
 
     use bytes::{BufMut, Bytes, BytesMut};
@@ -200,12 +199,6 @@ pub(crate) mod tests {
         storage_opts: Arc<StorageOpts>,
         sstable_store: SstableStoreRef,
     ) -> CompactorContext {
-        let compaction_executor = Arc::new(CompactionExecutor::new(Some(1)));
-        let max_task_parallelism = Arc::new(AtomicU32::new(
-            (compaction_executor.worker_num() as f32 * storage_opts.compactor_max_task_multiplier)
-                .ceil() as u32,
-        ));
-
         CompactorContext {
             storage_opts,
             sstable_store,
@@ -215,8 +208,6 @@ pub(crate) mod tests {
             memory_limiter: MemoryLimiter::unlimit(),
             task_progress_manager: Default::default(),
             await_tree_reg: None,
-            running_task_parallelism: Arc::new(AtomicU32::new(0)),
-            max_task_parallelism,
         }
     }
 
diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs
index 9594d7295a03e..d0e5fe93c62ee 100644
--- a/src/storage/src/hummock/compactor/compaction_utils.rs
+++ b/src/storage/src/hummock/compactor/compaction_utils.rs
@@ -26,8 +26,10 @@ use risingwave_hummock_sdk::key_range::KeyRange;
 use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
 use risingwave_hummock_sdk::table_stats::TableStatsMap;
 use risingwave_hummock_sdk::{can_concat, EpochWithGap, KeyComparator};
+use risingwave_pb::hummock::compact_task::TaskType;
 use risingwave_pb::hummock::{
-    compact_task, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo, TableSchema,
+    compact_task, BloomFilterType, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo,
+    TableSchema,
 };
 use tokio::time::Instant;
 
@@ -172,24 +174,19 @@ pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter {
     multi_filter
 }
 
-const MAX_FILE_COUNT: usize = 32;
-
 fn generate_splits_fast(
     sstable_infos: &Vec<SstableInfo>,
     compaction_size: u64,
-    context: CompactorContext,
-) -> HummockResult<Vec<KeyRange_vec>> {
+    context: &CompactorContext,
+) -> Vec<KeyRange_vec> {
     let worker_num = context.compaction_executor.worker_num();
     let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
-    let parallelism = (compaction_size + parallel_compact_size - 1) / parallel_compact_size;
-
-    let parallelism = std::cmp::min(
+    let parallelism = calculate_task_parallelism_impl(
         worker_num,
-        std::cmp::min(
-            parallelism as usize,
-            context.storage_opts.max_sub_compaction as usize,
-        ),
+        parallel_compact_size,
+        compaction_size,
+        context.storage_opts.max_sub_compaction,
     );
     let mut indexes = vec![];
     for sst in sstable_infos {
     indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.as_ref(), b.as_ref()));
     indexes.dedup();
     if indexes.len() <= parallelism {
-        return Ok(vec![]);
+        return vec![];
     }
+
     let mut splits = vec![];
     splits.push(KeyRange_vec::new(vec![], vec![]));
     let parallel_key_count = indexes.len() / parallelism;
     let mut last_split_key_count = 0;
     for key in indexes {
         }
         last_split_key_count += 1;
     }
-    Ok(splits)
+
+    splits
 }
 
 pub async fn generate_splits(
     sstable_infos: &Vec<SstableInfo>,
     compaction_size: u64,
-    context: CompactorContext,
+    context: &CompactorContext,
 ) -> HummockResult<Vec<KeyRange_vec>> {
+    const MAX_FILE_COUNT: usize = 32;
     let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
     if compaction_size > parallel_compact_size {
         if sstable_infos.len() > MAX_FILE_COUNT {
-            return generate_splits_fast(sstable_infos, compaction_size, context);
+            return Ok(generate_splits_fast(
+                sstable_infos,
+                compaction_size,
+                context,
+            ));
         }
         let mut indexes = vec![];
         // preload the meta and get the smallest key to split sub_compaction
         let mut splits = vec![];
         splits.push(KeyRange_vec::new(vec![], vec![]));
 
-        let worker_num = context.compaction_executor.worker_num();
-
-        let parallelism = std::cmp::min(
-            worker_num as u64,
-            std::cmp::min(
-                indexes.len() as u64,
-                context.storage_opts.max_sub_compaction as u64,
-            ),
+        let parallelism = calculate_task_parallelism_impl(
+            context.compaction_executor.worker_num(),
+            parallel_compact_size,
+            compaction_size,
+            context.storage_opts.max_sub_compaction,
         );
+
         let sub_compaction_data_size =
-            std::cmp::max(compaction_size / parallelism, parallel_compact_size);
-        let parallelism = compaction_size / sub_compaction_data_size;
+            std::cmp::max(compaction_size / parallelism as u64, parallel_compact_size);
 
         if parallelism > 1 {
             let mut last_buffer_size = 0;
@@ -489,3 +490,185 @@ async fn check_result<
     }
     Ok(true)
 }
+
+pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorContext) -> bool {
+    let sstable_infos = compact_task
+        .input_ssts
+        .iter()
+        .flat_map(|level| level.table_infos.iter())
+        .filter(|table_info| {
+            let table_ids = &table_info.table_ids;
+            table_ids
+                .iter()
+                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
+        })
+        .cloned()
+        .collect_vec();
+    let compaction_size = sstable_infos
+        .iter()
+        .map(|table_info| table_info.file_size)
+        .sum::<u64>();
+
+    let all_ssts_are_blocked_filter = sstable_infos
+        .iter()
+        .all(|table_info| table_info.bloom_filter_kind() == BloomFilterType::Blocked);
+
+    let delete_key_count = sstable_infos
+        .iter()
+        .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
+        .sum::<u64>();
+    let total_key_count = sstable_infos
+        .iter()
+        .map(|table_info| table_info.total_key_count)
+        .sum::<u64>();
+
+    let has_tombstone = compact_task
+        .input_ssts
+        .iter()
+        .flat_map(|level| level.table_infos.iter())
+        .any(|sst| sst.range_tombstone_count > 0);
+    let has_ttl = compact_task
+        .table_options
+        .iter()
+        .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0));
+
+    let compact_table_ids: HashSet<u32> = HashSet::from_iter(
+        compact_task
+            .input_ssts
+            .iter()
+            .flat_map(|level| level.table_infos.iter())
+            .flat_map(|sst| sst.table_ids.clone()),
+    );
+    let single_table = compact_table_ids.len() == 1;
+
+    context.storage_opts.enable_fast_compaction
+        && all_ssts_are_blocked_filter
+        && !has_tombstone
+        && !has_ttl
+        && single_table
+        && compact_task.target_level > 0
+        && compact_task.input_ssts.len() == 2
+        && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
+        && delete_key_count * 100
+            < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64
+                * total_key_count
+        && compact_task.task_type() == TaskType::Dynamic
+}
+
+pub async fn generate_splits_for_task(
+    compact_task: &mut CompactTask,
+    context: &CompactorContext,
+    optimize_by_copy_block: bool,
+) -> HummockResult<()> {
+    let sstable_infos = compact_task
+        .input_ssts
+        .iter()
+        .flat_map(|level| level.table_infos.iter())
+        .filter(|table_info| {
+            let table_ids = &table_info.table_ids;
+            table_ids
+                .iter()
+                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
+        })
+        .cloned()
+        .collect_vec();
+    let compaction_size = sstable_infos
+        .iter()
+        .map(|table_info| table_info.file_size)
+        .sum::<u64>();
+
+    if !optimize_by_copy_block {
+        let splits = generate_splits(&sstable_infos, compaction_size, context).await?;
+        if !splits.is_empty() {
+            compact_task.splits = splits;
+        }
+        return Ok(());
+    }
+
+    Ok(())
+}
+
+pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) {
+    let group_label = compact_task.compaction_group_id.to_string();
+    let cur_level_label = compact_task.input_ssts[0].level_idx.to_string();
+    let select_table_infos = compact_task
+        .input_ssts
+        .iter()
+        .filter(|level| level.level_idx != compact_task.target_level)
+        .flat_map(|level| level.table_infos.iter())
+        .collect_vec();
+    let target_table_infos = compact_task
+        .input_ssts
+        .iter()
+        .filter(|level| level.level_idx == compact_task.target_level)
+        .flat_map(|level| level.table_infos.iter())
+        .collect_vec();
+    let select_size = select_table_infos
+        .iter()
+        .map(|table| table.file_size)
+        .sum::<u64>();
+    context
+        .compactor_metrics
+        .compact_read_current_level
+        .with_label_values(&[&group_label, &cur_level_label])
+        .inc_by(select_size);
+    context
+        .compactor_metrics
+        .compact_read_sstn_current_level
+        .with_label_values(&[&group_label, &cur_level_label])
+        .inc_by(select_table_infos.len() as u64);
+
+    let target_level_read_bytes = target_table_infos.iter().map(|t| t.file_size).sum::<u64>();
+    let next_level_label = compact_task.target_level.to_string();
+    context
+        .compactor_metrics
+        .compact_read_next_level
+        .with_label_values(&[&group_label, next_level_label.as_str()])
+        .inc_by(target_level_read_bytes);
+    context
+        .compactor_metrics
+        .compact_read_sstn_next_level
+        .with_label_values(&[&group_label, next_level_label.as_str()])
+        .inc_by(target_table_infos.len() as u64);
+}
+
+pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &CompactorContext) -> usize {
+    let optimize_by_copy_block = optimize_by_copy_block(compact_task, context);
+
+    if optimize_by_copy_block {
+        return 1;
+    }
+
+    let sstable_infos = compact_task
+        .input_ssts
+        .iter()
+        .flat_map(|level| level.table_infos.iter())
+        .filter(|table_info| {
+            let table_ids = &table_info.table_ids;
+            table_ids
+                .iter()
+                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
+        })
+        .cloned()
+        .collect_vec();
+    let compaction_size = sstable_infos
+        .iter()
+        .map(|table_info| table_info.file_size)
+        .sum::<u64>();
+    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
+    calculate_task_parallelism_impl(
+        context.compaction_executor.worker_num(),
+        parallel_compact_size,
+        compaction_size,
+        context.storage_opts.max_sub_compaction,
+    )
+}
+
+pub fn calculate_task_parallelism_impl(
+    worker_num: usize,
+    parallel_compact_size: u64,
+    compaction_size: u64,
+    max_sub_compaction: u32,
+) -> usize {
+    let parallelism = (compaction_size + parallel_compact_size - 1) / parallel_compact_size;
+    worker_num.min(parallelism.min(max_sub_compaction as u64) as usize)
+}
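For intuition, `calculate_task_parallelism_impl` is a ceiling division (how many `parallel_compact_size`-sized pieces the task contains) clamped by both the worker count and the `max_sub_compaction` limit. A self-contained sketch with assumed example values (a 1280 MB task, a 512 MB sub-compaction target, 8 workers, and a cap of 4 — none of these are defaults from the codebase):

```rust
fn calculate_task_parallelism_impl(
    worker_num: usize,
    parallel_compact_size: u64,
    compaction_size: u64,
    max_sub_compaction: u32,
) -> usize {
    // Ceiling division: number of parallel_compact_size-sized pieces in the task.
    let parallelism = (compaction_size + parallel_compact_size - 1) / parallel_compact_size;
    // Clamp by both the executor's worker threads and the configured sub-compaction cap.
    worker_num.min(parallelism.min(max_sub_compaction as u64) as usize)
}

fn main() {
    let mb: u64 = 1 << 20;
    // ceil(1280 / 512) = 3 pieces; 3 is under both the worker count (8)
    // and max_sub_compaction (4), so the task runs with parallelism 3.
    assert_eq!(calculate_task_parallelism_impl(8, 512 * mb, 1280 * mb, 4), 3);
    // A huge task is still capped: ceil(10240 / 512) = 20, clamped to 4.
    assert_eq!(calculate_task_parallelism_impl(8, 512 * mb, 10240 * mb, 4), 4);
}
```

Centralizing this in one function lets `generate_splits_fast`, `generate_splits`, and the new `calculate_task_parallelism` all agree on the split count, which is what makes the up-front quota check in `start_compactor` possible.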
diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs
index f1a28a5e2b93a..2a950ea39a4b9 100644
--- a/src/storage/src/hummock/compactor/compactor_runner.rs
+++ b/src/storage/src/hummock/compactor/compactor_runner.rs
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 use std::collections::{BinaryHeap, HashMap, HashSet};
-use std::sync::atomic::Ordering;
 use std::sync::Arc;
 
 use await_tree::InstrumentAwait;
@@ -30,8 +29,8 @@ use risingwave_hummock_sdk::table_stats::{add_table_stats_map, TableStats, TableStatsMap};
 use risingwave_hummock_sdk::{
     can_concat, compact_task_output_to_string, HummockSstableObjectId, KeyComparator,
 };
-use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
-use risingwave_pb::hummock::{BloomFilterType, CompactTask, LevelType, SstableInfo};
+use risingwave_pb::hummock::compact_task::TaskStatus;
+use risingwave_pb::hummock::{CompactTask, LevelType, SstableInfo};
 use thiserror_ext::AsReport;
 use tokio::sync::oneshot::Receiver;
 
@@ -40,7 +39,8 @@ use super::task_progress::TaskProgress;
 use super::{CompactionStatistics, TaskConfig};
 use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManager};
 use crate::hummock::compactor::compaction_utils::{
-    build_multi_compaction_filter, estimate_task_output_capacity, generate_splits,
+    build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task,
+    metrics_report_for_task, optimize_by_copy_block,
 };
 use crate::hummock::compactor::iterator::ConcatSstableIterator;
 use crate::hummock::compactor::task_progress::TaskProgressGuard;
@@ -304,46 +304,7 @@ pub async fn compact(
 ) {
     let context = compactor_context.clone();
     let group_label = compact_task.compaction_group_id.to_string();
-    let cur_level_label = compact_task.input_ssts[0].level_idx.to_string();
-    let select_table_infos = compact_task
-        .input_ssts
-        .iter()
-        .filter(|level| level.level_idx != compact_task.target_level)
-        .flat_map(|level| level.table_infos.iter())
-        .collect_vec();
-    let target_table_infos = compact_task
-        .input_ssts
-        .iter()
-        .filter(|level| level.level_idx == compact_task.target_level)
-        .flat_map(|level| level.table_infos.iter())
-        .collect_vec();
-    let select_size = select_table_infos
-        .iter()
-        .map(|table| table.file_size)
-        .sum::<u64>();
-    context
-        .compactor_metrics
-        .compact_read_current_level
-        .with_label_values(&[&group_label, &cur_level_label])
-        .inc_by(select_size);
-    context
-        .compactor_metrics
-        .compact_read_sstn_current_level
-        .with_label_values(&[&group_label, &cur_level_label])
-        .inc_by(select_table_infos.len() as u64);
-
-    let target_level_read_bytes = target_table_infos.iter().map(|t| t.file_size).sum::<u64>();
-    let next_level_label = compact_task.target_level.to_string();
-    context
-        .compactor_metrics
-        .compact_read_next_level
-        .with_label_values(&[&group_label, next_level_label.as_str()])
-        .inc_by(target_level_read_bytes);
-    context
-        .compactor_metrics
-        .compact_read_sstn_next_level
-        .with_label_values(&[&group_label, next_level_label.as_str()])
-        .inc_by(target_table_infos.len() as u64);
+    metrics_report_for_task(&compact_task, &context);
 
     let timer = context
         .compactor_metrics
@@ -356,151 +317,52 @@ pub async fn compact(
 
     let multi_filter = build_multi_compaction_filter(&compact_task);
 
-    let mut compact_table_ids = compact_task
-        .input_ssts
-        .iter()
-        .flat_map(|level| level.table_infos.iter())
-        .flat_map(|sst| sst.table_ids.clone())
-        .collect_vec();
-    compact_table_ids.sort();
-    compact_table_ids.dedup();
-    let single_table = compact_table_ids.len() == 1;
-
     let existing_table_ids: HashSet<u32> =
         HashSet::from_iter(compact_task.existing_table_ids.clone());
     let compact_table_ids = HashSet::from_iter(
-        compact_table_ids
-            .into_iter()
+        compact_task
+            .input_ssts
+            .iter()
+            .flat_map(|level| level.table_infos.iter())
+            .flat_map(|sst| sst.table_ids.clone())
             .filter(|table_id| existing_table_ids.contains(table_id)),
     );
-    let multi_filter_key_extractor = match filter_key_extractor_manager
-        .acquire(compact_table_ids.clone())
-        .await
+
+    let multi_filter_key_extractor = match build_filter_key_extractor(
+        &compact_task,
+        filter_key_extractor_manager,
+        &compact_table_ids,
+    )
+    .await
     {
-        Err(e) => {
-            tracing::error!(error = %e.as_report(), "Failed to fetch filter key extractor tables [{:?}], it may caused by some RPC error", compact_task.existing_table_ids);
+        Some(multi_filter_key_extractor) => multi_filter_key_extractor,
+        None => {
             let task_status = TaskStatus::ExecuteFailed;
             return (
                 compact_done(compact_task, context.clone(), vec![], task_status),
                 None,
             );
         }
-        Ok(extractor) => extractor,
     };
-    if let FilterKeyExtractorImpl::Multi(multi) = &multi_filter_key_extractor {
-        let found_tables = multi.get_existing_table_ids();
-        let removed_tables = compact_table_ids
-            .iter()
-            .filter(|table_id| !found_tables.contains(table_id))
-            .collect_vec();
-        if !removed_tables.is_empty() {
-            tracing::error!("Failed to fetch filter key extractor tables [{:?}. [{:?}] may be removed by meta-service. ", compact_table_ids, removed_tables);
-            let task_status = TaskStatus::ExecuteFailed;
-            return (
-                compact_done(compact_task, context.clone(), vec![], task_status),
-                None,
-            );
-        }
-    }
-
-    let multi_filter_key_extractor = Arc::new(multi_filter_key_extractor);
-    let has_tombstone = compact_task
-        .input_ssts
-        .iter()
-        .flat_map(|level| level.table_infos.iter())
-        .any(|sst| sst.range_tombstone_count > 0);
-    let has_ttl = compact_task
-        .table_options
-        .iter()
-        .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0));
 
     let mut task_status = TaskStatus::Success;
-    // skip sst related to non-existent able_id to reduce io
-    let sstable_infos = compact_task
-        .input_ssts
-        .iter()
-        .flat_map(|level| level.table_infos.iter())
-        .filter(|table_info| {
-            let table_ids = &table_info.table_ids;
-            table_ids
-                .iter()
-                .any(|table_id| existing_table_ids.contains(table_id))
-        })
-        .cloned()
-        .collect_vec();
-    let compaction_size = sstable_infos
-        .iter()
-        .map(|table_info| table_info.file_size)
-        .sum::<u64>();
-    let all_ssts_are_blocked_filter = sstable_infos
-        .iter()
-        .all(|table_info| table_info.bloom_filter_kind() == BloomFilterType::Blocked);
+    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);
 
-    let delete_key_count = sstable_infos
-        .iter()
-        .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
-        .sum::<u64>();
-    let total_key_count = sstable_infos
-        .iter()
-        .map(|table_info| table_info.total_key_count)
-        .sum::<u64>();
-    let optimize_by_copy_block = context.storage_opts.enable_fast_compaction
-        && all_ssts_are_blocked_filter
-        && !has_tombstone
-        && !has_ttl
-        && single_table
-        && compact_task.target_level > 0
-        && compact_task.input_ssts.len() == 2
-        && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
-        && delete_key_count * 100
-            < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64
-                * total_key_count
-        && compact_task.task_type() == TaskType::Dynamic;
-
-    if !optimize_by_copy_block {
-        match generate_splits(&sstable_infos, compaction_size, context.clone()).await {
-            Ok(splits) => {
-                if !splits.is_empty() {
-                    compact_task.splits = splits;
-                }
-            }
-            Err(e) => {
-                tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
-                task_status = TaskStatus::ExecuteFailed;
-                return (
-                    compact_done(compact_task, context.clone(), vec![], task_status),
-                    None,
-                );
-            }
-        }
-    }
-
-    let compact_task_statistics = statistics_compact_task(&compact_task);
-    // Number of splits (key ranges) is equal to number of compaction tasks
-    let parallelism = compact_task.splits.len();
-    assert_ne!(parallelism, 0, "splits cannot be empty");
-    if !context.acquire_task_quota(parallelism as u32) {
-        tracing::warn!(
-            "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
-            compact_task.task_id,
-            parallelism,
-            context.running_task_parallelism.load(Ordering::Relaxed),
-            context.max_task_parallelism.load(Ordering::Relaxed),
-        );
-        return (
-            compact_done(
-                compact_task,
-                context.clone(),
-                vec![],
-                TaskStatus::NoAvailCpuResourceCanceled,
-            ),
-            None,
-        );
-    }
-
-    let _release_quota_guard =
-        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
-            context.release_task_quota(parallelism as u32);
-        });
-
+    if let Err(e) =
+        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
+    {
+        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
+        task_status = TaskStatus::ExecuteFailed;
+        return (
+            compact_done(compact_task, context.clone(), vec![], task_status),
+            None,
+        );
+    }
+
+    let compact_task_statistics = statistics_compact_task(&compact_task);
+    // Number of splits (key ranges) is equal to number of compaction tasks
+    let parallelism = compact_task.splits.len();
+    assert_ne!(parallelism, 0, "splits cannot be empty");
     let mut output_ssts = Vec::with_capacity(parallelism);
     let mut compaction_futures = vec![];
     let mut abort_handles = vec![];
@@ -707,7 +569,7 @@ pub async fn compact(
 }
 
 /// Fills in the compact task and tries to report the task result to meta node.
-fn compact_done(
+pub(crate) fn compact_done(
     mut compact_task: CompactTask,
     context: CompactorContext,
     output_ssts: Vec<LocalSstableInfo>,
@@ -931,6 +793,39 @@ where
     Ok(compaction_statistics)
 }
 
+async fn build_filter_key_extractor(
+    compact_task: &CompactTask,
+    filter_key_extractor_manager: FilterKeyExtractorManager,
+    compact_table_ids: &HashSet<u32>,
+) -> Option<Arc<FilterKeyExtractorImpl>> {
+    let multi_filter_key_extractor = match filter_key_extractor_manager
+        .acquire(compact_table_ids.clone())
+        .await
+    {
+        Err(e) => {
+            tracing::error!(error = %e.as_report(), "Failed to fetch filter key extractor tables [{:?}], it may be caused by an RPC error", compact_task.existing_table_ids);
+            return None;
+        }
+        Ok(extractor) => extractor,
+    };
+
+    if let FilterKeyExtractorImpl::Multi(multi) = &multi_filter_key_extractor {
+        let found_tables = multi.get_existing_table_ids();
+        let removed_tables = compact_table_ids
+            .iter()
+            .filter(|table_id| !found_tables.contains(table_id))
+            .collect_vec();
+        if !removed_tables.is_empty() {
+            tracing::error!("Failed to fetch filter key extractor tables [{:?}]. [{:?}] may have been removed by the meta service.", compact_table_ids, removed_tables);
+            return None;
+        }
+    }
+
+    let multi_filter_key_extractor = Arc::new(multi_filter_key_extractor);
+
+    Some(multi_filter_key_extractor)
+}
+
 #[cfg(test)]
 pub mod tests {
     use risingwave_hummock_sdk::can_concat;
diff --git a/src/storage/src/hummock/compactor/context.rs b/src/storage/src/hummock/compactor/context.rs
index 4525dcdc773fb..e8295effc25ed 100644
--- a/src/storage/src/hummock/compactor/context.rs
+++ b/src/storage/src/hummock/compactor/context.rs
@@ -12,11 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::Arc;
 
-use more_asserts::assert_ge;
-
 use super::task_progress::TaskProgressManagerRef;
 use crate::hummock::compactor::CompactionExecutor;
 use crate::hummock::sstable_store::SstableStoreRef;
@@ -65,10 +62,6 @@ pub struct CompactorContext {
     pub task_progress_manager: TaskProgressManagerRef,
 
     pub await_tree_reg: Option<CompactionAwaitTreeRegRef>,
-
-    pub running_task_parallelism: Arc<AtomicU32>,
-
-    pub max_task_parallelism: Arc<AtomicU32>,
 }
 
 impl CompactorContext {
@@ -97,56 +90,6 @@ impl CompactorContext {
             memory_limiter: MemoryLimiter::unlimit(),
             task_progress_manager: Default::default(),
             await_tree_reg,
-            running_task_parallelism: Arc::new(AtomicU32::new(0)),
-            max_task_parallelism: Arc::new(AtomicU32::new(u32::MAX)),
-        }
-    }
-
-    pub fn acquire_task_quota(&self, parallelism: u32) -> bool {
-        let mut running_u32 = self.running_task_parallelism.load(Ordering::SeqCst);
-        let max_u32 = self.max_task_parallelism.load(Ordering::SeqCst);
-
-        while parallelism + running_u32 <= max_u32 {
-            match self.running_task_parallelism.compare_exchange(
-                running_u32,
-                running_u32 + parallelism,
-                Ordering::SeqCst,
-                Ordering::SeqCst,
-            ) {
-                Ok(_) => {
-                    return true;
-                }
-                Err(old_running_u32) => {
-                    running_u32 = old_running_u32;
-                }
-            }
-        }
-
-        false
-    }
-
-    pub fn release_task_quota(&self, parallelism: u32) {
-        let prev = self
-            .running_task_parallelism
-            .fetch_sub(parallelism, Ordering::SeqCst);
-
-        assert_ge!(
-            prev,
-            parallelism,
-            "running {} parallelism {}",
-            prev,
-            parallelism
-        );
-    }
-
-    pub fn get_free_quota(&self) -> u32 {
-        let running_u32 = self.running_task_parallelism.load(Ordering::SeqCst);
-        let max_u32 = self.max_task_parallelism.load(Ordering::SeqCst);
-
-        if max_u32 > running_u32 {
-            max_u32 - running_u32
-        } else {
-            0
-        }
-    }
 }
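The deleted `acquire_task_quota` used a compare-exchange loop because any thread holding the shared `CompactorContext` could race to reserve quota; after this PR the counter is only touched from the single event loop in `start_compactor`, so plain `fetch_add`/`fetch_sub` suffice. A standalone sketch of the removed pattern (the function name and the numeric values are illustrative, not the crate's API):

```rust
use std::sync::atomic::{AtomicU32, Ordering};

/// Try to reserve `parallelism` units out of `max`, lock-free.
/// Mirrors the CAS loop deleted from `CompactorContext`.
fn acquire(running: &AtomicU32, max: u32, parallelism: u32) -> bool {
    let mut current = running.load(Ordering::SeqCst);
    while current + parallelism <= max {
        match running.compare_exchange(
            current,
            current + parallelism,
            Ordering::SeqCst,
            Ordering::SeqCst,
        ) {
            Ok(_) => return true,
            // Another thread won the race; retry with the value it observed.
            Err(observed) => current = observed,
        }
    }
    false
}

fn main() {
    let running = AtomicU32::new(0);
    assert!(acquire(&running, 10, 8)); // 0 + 8 <= 10: reserved
    assert!(!acquire(&running, 10, 4)); // 8 + 4 > 10: rejected
    running.fetch_sub(8, Ordering::SeqCst); // release, as the new single-consumer code does
    assert!(acquire(&running, 10, 4));
}
```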
diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs
index c18828bc93e72..be9a3ffad0385 100644
--- a/src/storage/src/hummock/compactor/mod.rs
+++ b/src/storage/src/hummock/compactor/mod.rs
@@ -20,7 +20,7 @@ use risingwave_pb::hummock::report_compaction_task_request::{
     Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
     ReportTask as ReportSharedTask,
 };
-use risingwave_pb::hummock::{ReportFullScanTaskRequest, ReportVacuumTaskRequest};
+use risingwave_pb::hummock::{CompactTask, ReportFullScanTaskRequest, ReportVacuumTaskRequest};
 use risingwave_rpc_client::GrpcCompactorProxyClient;
 use thiserror_ext::AsReport;
 use tokio::sync::mpsc;
@@ -35,7 +35,7 @@ pub(super) mod task_progress;
 
 use std::collections::{HashMap, VecDeque};
 use std::marker::PhantomData;
-use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, SystemTime};
 
@@ -51,7 +51,7 @@ pub use context::{
 use futures::{pin_mut, StreamExt};
 pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
 use more_asserts::assert_ge;
-use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
+use risingwave_hummock_sdk::table_stats::{to_prost_table_stats_map, TableStatsMap};
 use risingwave_hummock_sdk::{compact_task_to_string, HummockCompactionTaskId, LocalSstableInfo};
 use risingwave_pb::hummock::compact_task::TaskStatus;
 use risingwave_pb::hummock::subscribe_compaction_event_request::{
@@ -80,7 +80,8 @@ use super::{
 use crate::filter_key_extractor::{
     FilterKeyExtractorImpl, FilterKeyExtractorManager, StaticFilterKeyExtractorManager,
 };
-use crate::hummock::compactor::compactor_runner::compact_and_build_sst;
+use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
+use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
 use crate::hummock::iterator::{Forward, HummockIterator};
 use crate::hummock::multi_builder::SplitTableOutput;
 use crate::hummock::vacuum::Vacuum;
@@ -299,14 +300,14 @@ pub fn start_compactor(
     let stream_retry_interval = Duration::from_secs(30);
     let task_progress = compactor_context.task_progress_manager.clone();
     let periodic_event_update_interval = Duration::from_millis(1000);
-    let pull_task_ack = Arc::new(AtomicBool::new(true));
+
+    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
+        * compactor_context.storage_opts.compactor_max_task_multiplier)
+        .ceil() as u32;
+    let running_task_parallelism = Arc::new(AtomicU32::new(0));
+
     const MAX_PULL_TASK_COUNT: u32 = 4;
-    let max_pull_task_count = std::cmp::min(
-        compactor_context
-            .max_task_parallelism
-            .load(Ordering::SeqCst),
-        MAX_PULL_TASK_COUNT,
-    );
+    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
 
     assert_ge!(
         compactor_context.storage_opts.compactor_max_task_multiplier,
@@ -321,7 +322,8 @@ pub fn start_compactor(
     // This outer loop is to recreate stream.
     'start_stream: loop {
         // reset state
-        pull_task_ack.store(true, Ordering::SeqCst);
+        let mut pull_task_ack = true;
         tokio::select! {
             // Wait for interval.
             _ = min_interval.tick() => {},
@@ -365,7 +367,7 @@ pub fn start_compactor(
                     event_loop_iteration_now = Instant::now();
                 }
 
-                let pull_task_ack = pull_task_ack.clone();
+                let running_task_parallelism = running_task_parallelism.clone();
                 let request_sender = request_sender.clone();
                 let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let mut pending_pull_task_count = 0;
-                        if pull_task_ack.load(Ordering::SeqCst) {
+                        if pull_task_ack {
                            // TODO: Compute parallelism on meta side
-                            pending_pull_task_count = compactor_context.get_free_quota().max(max_pull_task_count);
+                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).max(max_pull_task_count);
 
                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                                    // re subscribe stream
                                    continue 'start_stream;
                                } else {
-                                    pull_task_ack.store(false, Ordering::SeqCst);
+                                    pull_task_ack = false;
                                }
                            }
                        }
 
                        tracing::info!(
-                            running_parallelism_count = %compactor_context.running_task_parallelism.load(Ordering::Relaxed),
-                            pull_task_ack = %pull_task_ack.load(Ordering::Relaxed),
+                            running_parallelism_count = %running_task_parallelism.load(Ordering::SeqCst),
+                            pull_task_ack = %pull_task_ack,
                            pending_pull_task_count = %pending_pull_task_count
                        );
 
@@ -433,6 +435,28 @@ pub fn start_compactor(
                    }
                };
 
+                fn send_report_task_event(
+                    compact_task: &CompactTask,
+                    table_stats: TableStatsMap,
+                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
+                ) {
+                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
+                        event: Some(RequestEvent::ReportTask(ReportTask {
+                            task_id: compact_task.task_id,
+                            task_status: compact_task.task_status,
+                            sorted_output_ssts: compact_task.sorted_output_ssts.clone(),
+                            table_stats_change: to_prost_table_stats_map(table_stats),
+                        })),
+                        create_at: SystemTime::now()
+                            .duration_since(std::time::UNIX_EPOCH)
+                            .expect("Clock may have gone backwards")
+                            .as_millis() as u64,
+                    }) {
+                        let task_id = compact_task.task_id;
+                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
+                    }
+                }
+
                match event {
                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
                        let event = match event {
                        let meta_client = hummock_meta_client.clone();
                        let sstable_object_id_manager = sstable_object_id_manager.clone();
                        let filter_key_extractor_manager = filter_key_extractor_manager.clone();
-                        executor.spawn(async move {
-                            match event {
-                                ResponseEvent::CompactTask(compact_task) => {
-                                    let (tx, rx) = tokio::sync::oneshot::channel();
-                                    let task_id = compact_task.task_id;
-                                    shutdown.lock().unwrap().insert(task_id, tx);
-                                    let ((compact_task, table_stats), _memory_tracker) = match sstable_object_id_manager.add_watermark_object_id(None).await
+
+                        match event {
+                            ResponseEvent::CompactTask(compact_task) => {
+                                let parallelism =
+                                    calculate_task_parallelism(&compact_task, &context);
+
+                                assert_ne!(parallelism, 0, "splits cannot be empty");
+
+                                if (max_task_parallelism
+                                    - running_task_parallelism.load(Ordering::SeqCst))
+                                    < parallelism as u32
+                                {
+                                    tracing::warn!(
+                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
+                                        compact_task.task_id,
+                                        parallelism,
+                                        running_task_parallelism.load(Ordering::Relaxed),
+                                        max_task_parallelism,
+                                    );
+                                    let (compact_task, table_stats) = compact_done(
+                                        compact_task,
+                                        context.clone(),
+                                        vec![],
+                                        TaskStatus::NoAvailCpuResourceCanceled,
+                                    );
+
+                                    send_report_task_event(
+                                        &compact_task,
+                                        table_stats,
+                                        &request_sender,
+                                    );
+
+                                    continue 'start_stream;
+                                }
+
+                                running_task_parallelism
+                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
+                                executor.spawn(async move {
+                                    let (tx, rx) = tokio::sync::oneshot::channel();
+                                    let task_id = compact_task.task_id;
+                                    shutdown.lock().unwrap().insert(task_id, tx);
+                                    let ((compact_task, table_stats), _memory_tracker) =
+                                        match sstable_object_id_manager
+                                            .add_watermark_object_id(None)
+                                            .await
                                        {
                                            Ok(tracker_id) => {
-                                                let sstable_object_id_manager_clone = sstable_object_id_manager.clone();
+                                                let sstable_object_id_manager_clone =
+                                                    sstable_object_id_manager.clone();
                                                let _guard = scopeguard::guard(
                                                    (tracker_id, sstable_object_id_manager_clone),
                                                    |(tracker_id, sstable_object_id_manager)| {
-                                                        sstable_object_id_manager.remove_watermark_object_id(tracker_id);
+                                                        sstable_object_id_manager
+                                                            .remove_watermark_object_id(tracker_id);
                                                    },
                                                );
-                                                compactor_runner::compact(context.clone(), compact_task, rx, Box::new(sstable_object_id_manager.clone()), filter_key_extractor_manager.clone()).await
-                                            },
+
+                                                compactor_runner::compact(
+                                                    context.clone(),
+                                                    compact_task,
+                                                    rx,
+                                                    Box::new(sstable_object_id_manager.clone()),
+                                                    filter_key_extractor_manager.clone(),
+                                                )
+                                                .await
+                                            }
                                            Err(err) => {
                                                tracing::warn!(error = %err.as_report(), "Failed to track pending SST object id");
                                                let mut compact_task = compact_task;
-                                                compact_task.set_task_status(TaskStatus::TrackSstObjectIdFailed);
-                                                ((compact_task, HashMap::default()),None)
+                                                compact_task.set_task_status(
+                                                    TaskStatus::TrackSstObjectIdFailed,
+                                                );
+                                                ((compact_task, HashMap::default()), None)
                                            }
                                        };
-                                    shutdown.lock().unwrap().remove(&task_id);
-
-                                    let enable_check_compaction_result = context.storage_opts.check_compaction_result;
-                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status() == TaskStatus::Success;
-                                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
-                                        event: Some(RequestEvent::ReportTask(
-                                            ReportTask {
-                                                task_id: compact_task.task_id,
-                                                task_status: compact_task.task_status,
-                                                sorted_output_ssts: compact_task.sorted_output_ssts.clone(),
-                                                table_stats_change: to_prost_table_stats_map(table_stats),
-                                            }
-                                        )),
-                                        create_at: SystemTime::now()
-                                            .duration_since(std::time::UNIX_EPOCH)
-                                            .expect("Clock may have gone backwards")
-                                            .as_millis() as u64,
-                                    }) {
-                                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
-                                    }
-                                    if enable_check_compaction_result && need_check_task {
-                                        match check_compaction_result(&compact_task, context.clone()).await {
-                                            Err(e) => {
-                                                tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", compact_task.task_id);
-                                            },
-                                            Ok(true) => (),
-                                            Ok(false) => {
-                                                panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
-                                            }
-                                        }
-                                    }
-                                }
-                                ResponseEvent::VacuumTask(vacuum_task) => {
-                                    match Vacuum::handle_vacuum_task(
-                                        context.sstable_store.clone(),
-                                        &vacuum_task.sstable_object_ids,
-                                    )
-                                    .await {
-                                        Ok(_) => {
-                                            Vacuum::report_vacuum_task(vacuum_task, meta_client).await;
-                                        }
-                                        Err(e) => {
-                                            tracing::warn!(error = %e.as_report(), "Failed to vacuum task")
-                                        }
-                                    }
-                                }
+                                    shutdown.lock().unwrap().remove(&task_id);
+                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);
+
+                                    send_report_task_event(
+                                        &compact_task,
+                                        table_stats,
+                                        &request_sender,
+                                    );
+
+                                    let enable_check_compaction_result =
+                                        context.storage_opts.check_compaction_result;
+                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status() == TaskStatus::Success;
+
+                                    if enable_check_compaction_result && need_check_task {
+                                        match check_compaction_result(&compact_task, context.clone())
+                                            .await
+                                        {
+                                            Err(e) => {
+                                                tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", compact_task.task_id);
+                                            }
+                                            Ok(true) => (),
+                                            Ok(false) => {
+                                                panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
+                                            }
+                                        }
+                                    }
+                                });
+                            }
+                            ResponseEvent::VacuumTask(vacuum_task) => {
+                                executor.spawn(async move {
+                                    match Vacuum::handle_vacuum_task(
+                                        context.sstable_store.clone(),
+                                        &vacuum_task.sstable_object_ids,
+                                    )
+                                    .await
+                                    {
+                                        Ok(_) => {
+                                            Vacuum::report_vacuum_task(vacuum_task, meta_client).await;
+                                        }
+                                        Err(e) => {
+                                            tracing::warn!(error = %e.as_report(), "Failed to vacuum task")
+                                        }
+                                    }
+                                });
+                            }
-                                ResponseEvent::FullScanTask(full_scan_task) => {
-                                    match Vacuum::handle_full_scan_task(full_scan_task, context.sstable_store.clone()).await {
-                                        Ok((object_ids, total_object_count, total_object_size)) => {
-                                            Vacuum::report_full_scan_task(object_ids, total_object_count, total_object_size, meta_client).await;
-                                        }
-                                        Err(e) => {
-                                            tracing::warn!(error = %e.as_report(), "Failed to iter object");
-                                        }
-                                    }
-                                }
-                                ResponseEvent::ValidationTask(validation_task) => {
-                                    validate_ssts(
-                                        validation_task,
-                                        context.sstable_store.clone(),
-                                    )
-                                    .await;
-                                }
-                                ResponseEvent::CancelCompactTask(cancel_compact_task) => {
-                                    if let Some(tx) = shutdown
-                                        .lock()
-                                        .unwrap()
-                                        .remove(&cancel_compact_task.task_id)
-                                    {
-                                        if tx.send(()).is_err() {
-                                            tracing::warn!(
-                                                "Cancellation of compaction task failed. task_id: {}",
-                                                cancel_compact_task.task_id
-                                            );
-                                        }
-                                    } else {
-                                        tracing::warn!(
-                                            "Attempting to cancel non-existent compaction task. task_id: {}",
-                                            cancel_compact_task.task_id
-                                        );
-                                    }
-                                }
-
-                                ResponseEvent::PullTaskAck(_pull_task_ack) => {
-                                    // set flag
-                                    pull_task_ack.store(true, Ordering::SeqCst);
-                                }
-                            }
-                        });
+                            ResponseEvent::FullScanTask(full_scan_task) => {
+                                executor.spawn(async move {
+                                    match Vacuum::handle_full_scan_task(
+                                        full_scan_task,
+                                        context.sstable_store.clone(),
+                                    )
+                                    .await
+                                    {
+                                        Ok((object_ids, total_object_count, total_object_size)) => {
+                                            Vacuum::report_full_scan_task(
+                                                object_ids,
+                                                total_object_count,
+                                                total_object_size,
+                                                meta_client,
+                                            )
+                                            .await;
+                                        }
+                                        Err(e) => {
+                                            tracing::warn!(error = %e.as_report(), "Failed to iter object");
+                                        }
+                                    }
+                                });
+                            }
+                            ResponseEvent::ValidationTask(validation_task) => {
+                                executor.spawn(async move {
+                                    validate_ssts(validation_task, context.sstable_store.clone())
+                                        .await;
+                                });
+                            }
+                            ResponseEvent::CancelCompactTask(cancel_compact_task) => {
+                                if let Some(tx) = shutdown
+                                    .lock()
+                                    .unwrap()
+                                    .remove(&cancel_compact_task.task_id)
+                                {
+                                    if tx.send(()).is_err() {
+                                        tracing::warn!(
+                                            "Cancellation of compaction task failed. task_id: {}",
+                                            cancel_compact_task.task_id
+                                        );
+                                    }
+                                } else {
+                                    tracing::warn!(
+                                        "Attempting to cancel non-existent compaction task. task_id: {}",
+                                        cancel_compact_task.task_id
+                                    );
+                                }
+                            }
+
+                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
+                                // set flag
+                                pull_task_ack = true;
+                            }
+                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
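The `CancelCompactTask` arm above relies on a shared map of `oneshot` senders: the spawned task holds the receiver, and the event loop cancels it by sending `()` through (or simply dropping) the matching sender. A self-contained sketch of that pattern (the task id, timeout, and map layout are illustrative only):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // task_id -> cancellation handle, shared between the event loop and workers.
    let shutdown: Arc<Mutex<HashMap<u64, tokio::sync::oneshot::Sender<()>>>> =
        Arc::new(Mutex::new(HashMap::new()));

    let (tx, mut rx) = tokio::sync::oneshot::channel();
    shutdown.lock().unwrap().insert(42, tx);

    let worker = tokio::spawn(async move {
        tokio::select! {
            // The worker races its real work against the cancellation signal.
            _ = &mut rx => "cancelled",
            _ = tokio::time::sleep(Duration::from_secs(60)) => "finished",
        }
    });

    // Later, a CancelCompactTask event for task 42 arrives:
    if let Some(tx) = shutdown.lock().unwrap().remove(&42) {
        let _ = tx.send(());
    }
    assert_eq!(worker.await.unwrap(), "cancelled");
}
```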
{}", e.message()); diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 341e89a58af89..6ba652c0c72d8 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -15,7 +15,6 @@ use std::future::Future; use std::ops::{Bound, RangeBounds}; use std::pin::{pin, Pin}; -use std::sync::atomic::AtomicU32; use std::sync::Arc; use std::time::{Duration, SystemTime}; @@ -600,24 +599,15 @@ fn run_compactor_thread( ) { let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager(filter_key_extractor_manager); - - let compaction_executor = Arc::new(CompactionExecutor::new(Some(1))); - let max_task_parallelism = Arc::new(AtomicU32::new( - (compaction_executor.worker_num() as f32 * storage_opts.compactor_max_task_multiplier) - .ceil() as u32, - )); let compactor_context = CompactorContext { storage_opts, sstable_store, compactor_metrics, is_share_buffer_compact: false, compaction_executor: Arc::new(CompactionExecutor::new(None)), - memory_limiter: MemoryLimiter::unlimit(), task_progress_manager: Default::default(), await_tree_reg: None, - running_task_parallelism: Arc::new(AtomicU32::new(0)), - max_task_parallelism, }; start_compactor(