diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction.rs
index b414f8afa85e0..065e9745f7dbb 100644
--- a/src/meta/src/hummock/manager/compaction.rs
+++ b/src/meta/src/hummock/manager/compaction.rs
@@ -12,20 +12,34 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashMap, HashSet};
 use std::sync::Arc;
 
 use function_name::named;
+use futures::future::Shared;
 use itertools::Itertools;
 use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
+use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
+use risingwave_pb::hummock::subscribe_compaction_event_request::{
+    self, Event as RequestEvent, PullTask,
+};
+use risingwave_pb::hummock::subscribe_compaction_event_response::{
+    Event as ResponseEvent, PullTaskAck,
+};
 use risingwave_pb::hummock::{CompactStatus as PbCompactStatus, CompactTaskAssignment};
+use thiserror_ext::AsReport;
+use tokio::sync::mpsc::UnboundedReceiver;
+use tokio::sync::oneshot::Receiver as OneShotReceiver;
 
 use crate::hummock::compaction::selector::level_selector::PickerInfo;
 use crate::hummock::compaction::selector::DynamicLevelSelectorCore;
-use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig};
-use crate::hummock::manager::read_lock;
+use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
+use crate::hummock::manager::{init_selectors, read_lock};
 use crate::hummock::HummockManager;
 
+const MAX_SKIP_TIMES: usize = 8;
+const MAX_REPORT_COUNT: usize = 16;
+
 #[derive(Default)]
 pub struct Compaction {
     /// Compaction task that is already assigned to a compactor
@@ -102,4 +116,144 @@ impl HummockManager {
         let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
         ctx.score_levels
     }
+
+    pub async fn handle_pull_task_event(
+        &self,
+        context_id: u32,
+        pull_task_count: usize,
+        compaction_selectors: &mut HashMap<TaskType, Box<dyn CompactionSelector>>,
+    ) {
+        assert_ne!(0, pull_task_count);
+        if let Some(compactor) = self.compactor_manager.get_compactor(context_id) {
+            let (groups, task_type) = self.auto_pick_compaction_groups_and_type().await;
+            if !groups.is_empty() {
+                let selector: &mut Box<dyn CompactionSelector> =
+                    compaction_selectors.get_mut(&task_type).unwrap();
+
+                let mut generated_task_count = 0;
+                let mut existed_groups = groups.clone();
+                let mut no_task_groups: HashSet<CompactionGroupId> = HashSet::default();
+                let mut failed_tasks = vec![];
+
+                while generated_task_count < pull_task_count && failed_tasks.is_empty() {
+                    let compact_ret = self
+                        .get_compact_tasks(
+                            existed_groups.clone(),
+                            pull_task_count - generated_task_count,
+                            selector,
+                        )
+                        .await;
+
+                    match compact_ret {
+                        Ok((compact_tasks, unschedule_groups)) => {
+                            if compact_tasks.is_empty() {
+                                break;
+                            }
+                            generated_task_count += compact_tasks.len();
+                            no_task_groups.extend(unschedule_groups);
+                            for task in compact_tasks {
+                                let task_id = task.task_id;
+                                if let Err(e) =
+                                    compactor.send_event(ResponseEvent::CompactTask(task))
+                                {
+                                    tracing::warn!(
+                                        error = %e.as_report(),
+                                        "Failed to send task {} to {}",
+                                        task_id,
+                                        compactor.context_id(),
+                                    );
+                                    failed_tasks.push(task_id);
+                                }
+                            }
+                            if !failed_tasks.is_empty() {
+                                self.compactor_manager.remove_compactor(context_id);
+                            }
+                            existed_groups.retain(|group_id| !no_task_groups.contains(group_id));
+                        }
+                        Err(err) => {
+                            tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
+                            break;
+                        }
+                    };
+                }
+                for group in no_task_groups {
+                    self.compaction_state.unschedule(group, task_type);
+                }
+                if let Err(err) = self
+                    .cancel_compact_tasks(failed_tasks, TaskStatus::SendFailCanceled)
+                    .await
+                {
+                    tracing::warn!(error = %err.as_report(), "Failed to cancel compaction task");
+                }
+            }
+
+            // ack to compactor
+            if let Err(e) = compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})) {
+                tracing::warn!(
+                    error = %e.as_report(),
+                    "Failed to send ack to {}",
+                    context_id,
+                );
+                self.compactor_manager.remove_compactor(context_id);
+            }
+        }
+    }
+
+    /// dedicated event runtime for CPU/IO bound event
+    pub async fn compact_task_dedicated_event_handler(
+        hummock_manager: Arc<HummockManager>,
+        mut rx: UnboundedReceiver<(u32, subscribe_compaction_event_request::Event)>,
+        shutdown_rx_shared: Shared<OneShotReceiver<()>>,
+    ) {
+        let mut compaction_selectors = init_selectors();
+
+        tokio::select! {
+            _ = shutdown_rx_shared => {}
+
+            _ = async {
+                while let Some((context_id, event)) = rx.recv().await {
+                    let mut report_events = vec![];
+                    let mut skip_times = 0;
+                    match event {
+                        RequestEvent::PullTask(PullTask { pull_task_count }) => {
+                            hummock_manager.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors).await;
+                        }
+
+                        RequestEvent::ReportTask(task) => {
+                            report_events.push(task);
+                        }
+
+                        _ => unreachable!(),
+                    }
+                    while let Ok((context_id, event)) = rx.try_recv() {
+                        match event {
+                            RequestEvent::PullTask(PullTask { pull_task_count }) => {
+                                hummock_manager.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors).await;
+                                if !report_events.is_empty() {
+                                    if skip_times > MAX_SKIP_TIMES {
+                                        break;
+                                    }
+                                    skip_times += 1;
+                                }
+                            }
+
+                            RequestEvent::ReportTask(task) => {
+                                report_events.push(task);
+                                if report_events.len() >= MAX_REPORT_COUNT {
+                                    break;
+                                }
+                            }
+                            _ => unreachable!(),
+                        }
+                    }
+                    if !report_events.is_empty() {
+                        if let Err(e) = hummock_manager.report_compact_tasks(report_events).await
+                        {
+                            tracing::error!(error = %e.as_report(), "report compact_task fail")
+                        }
+                    }
+                }
+            } => {}
+        }
+    }
 }
diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs
index e7e737405fc37..81a3044fa905d 100644
--- a/src/meta/src/hummock/manager/compaction_group_manager.rs
+++ b/src/meta/src/hummock/manager/compaction_group_manager.rs
@@ -30,13 +30,13 @@ use risingwave_pb::hummock::compact_task::TaskStatus;
 use risingwave_pb::hummock::group_delta::DeltaType;
 use risingwave_pb::hummock::hummock_version_delta::GroupDeltas;
 use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
+use risingwave_pb::hummock::subscribe_compaction_event_request::ReportTask;
 use risingwave_pb::hummock::{
     compact_task, CompactionConfig, CompactionGroupInfo, CompatibilityVersion, GroupConstruct,
     GroupDelta, GroupDestroy, GroupMetaChange, GroupTableChange,
 };
 use thiserror_ext::AsReport;
 use tokio::sync::{OnceCell, RwLock};
-use tracing::warn;
 
 use super::write_lock;
 use crate::controller::SqlMetaStore;
@@ -476,7 +476,7 @@ impl HummockManager {
             return Ok((parent_group_id, table_to_partition));
         }
         let table_ids = table_ids.iter().cloned().unique().collect_vec();
-        let mut compaction_guard = write_lock!(self, compaction).await;
+        let compaction_guard = write_lock!(self, compaction).await;
         let mut versioning_guard = write_lock!(self, versioning).await;
         let versioning = versioning_guard.deref_mut();
         let current_version = &versioning.current_version;
@@ -682,27 +682,19 @@ impl HummockManager {
                 }
             }
             if need_cancel {
-                canceled_tasks.push(task.clone());
+                canceled_tasks.push(ReportTask {
+                    task_id: task.task_id,
+                    task_status: TaskStatus::ManualCanceled as i32,
+                    table_stats_change: HashMap::default(),
+                    sorted_output_ssts: vec![],
+                });
             }
         }
 
-        for task in canceled_tasks {
-            if !self
-                .report_compact_task_impl(
-                    task.task_id,
-                    None,
-                    TaskStatus::ManualCanceled,
-                    vec![],
-                    &mut compaction_guard,
-                    None,
-                )
-                .await
-                .unwrap_or(false)
-            {
-                warn!("failed to cancel task-{}", task.task_id);
-            }
-        }
+        drop(compaction_guard);
+        self.report_compact_tasks(canceled_tasks).await?;
+
         // Don't trigger compactions if we enable deterministic compaction
         if !self.env.opts.compaction_deterministic_test {
             // commit_epoch may contains SSTs from any compaction group
diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs
index a9b4b04c35f49..3c68856dc6788 100644
--- a/src/meta/src/hummock/manager/gc.rs
+++ b/src/meta/src/hummock/manager/gc.rs
@@ -24,12 +24,11 @@ use itertools::Itertools;
 use risingwave_hummock_sdk::HummockSstableObjectId;
 use risingwave_pb::common::worker_node::State::Running;
 use risingwave_pb::common::WorkerType;
+use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
 use risingwave_pb::hummock::FullScanTask;
 
 use crate::hummock::error::{Error, Result};
-use crate::hummock::manager::{
-    commit_multi_var, create_trx_wrapper, read_lock, write_lock, ResponseEvent,
-};
+use crate::hummock::manager::{commit_multi_var, create_trx_wrapper, read_lock, write_lock};
 use crate::hummock::HummockManager;
 use crate::manager::MetadataManager;
 use crate::model::{BTreeMapTransaction, BTreeMapTransactionWrapper, ValTransaction};
diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs
index f1639ea37210c..77c9659789e9a 100644
--- a/src/meta/src/hummock/manager/mod.rs
+++ b/src/meta/src/hummock/manager/mod.rs
@@ -24,11 +24,13 @@ use arc_swap::ArcSwap;
 use bytes::Bytes;
 use fail::fail_point;
 use function_name::named;
-use futures::future::{Either, Shared};
+use futures::future::Either;
 use futures::stream::{BoxStream, FuturesUnordered};
 use futures::{FutureExt, StreamExt};
 use itertools::Itertools;
 use parking_lot::Mutex;
+use rand::prelude::SliceRandom;
+use rand::thread_rng;
 use risingwave_common::catalog::TableId;
 use risingwave_common::config::default::compaction_config;
 use risingwave_common::config::ObjectStoreConfig;
@@ -54,11 +56,9 @@ use risingwave_pb::hummock::compact_task::{self, TaskStatus, TaskType};
 use risingwave_pb::hummock::group_delta::DeltaType;
 use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config;
 use risingwave_pb::hummock::subscribe_compaction_event_request::{
-    self, Event as RequestEvent, HeartBeat, PullTask, ReportTask,
-};
-use risingwave_pb::hummock::subscribe_compaction_event_response::{
-    Event as ResponseEvent, PullTaskAck,
+    Event as RequestEvent, HeartBeat, ReportTask,
 };
+use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
 use risingwave_pb::hummock::{
     CompactTask, CompactTaskAssignment, CompactionConfig, GroupDelta, GroupMetaChange,
     HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot, HummockVersionStats,
@@ -69,7 +69,7 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation};
 use rw_futures_util::{pending_on_none, select_all};
 use thiserror_ext::AsReport;
 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
-use tokio::sync::oneshot::{Receiver as OneShotReceiver, Sender};
+use tokio::sync::oneshot::Sender;
 use tokio::sync::RwLockWriteGuard;
 use tokio::task::JoinHandle;
 use tokio_stream::wrappers::IntervalStream;
@@ -83,10 +83,10 @@ use crate::hummock::compaction::selector::{
 use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig};
 use crate::hummock::error::{Error, Result};
 use crate::hummock::metrics_utils::{
-    build_compact_task_level_type_metrics_label, trigger_delta_log_stats, trigger_local_table_stat,
-    trigger_lsm_stat, trigger_mv_stat, trigger_pin_unpin_snapshot_state,
-    trigger_pin_unpin_version_state, trigger_split_stat, trigger_sst_stat, trigger_version_stat,
-    trigger_write_stop_stats,
+    build_compact_task_level_type_metrics_label, get_or_create_local_table_stat,
+    trigger_delta_log_stats, trigger_local_table_stat, trigger_lsm_stat, trigger_mv_stat,
+    trigger_pin_unpin_snapshot_state, trigger_pin_unpin_version_state, trigger_split_stat,
+    trigger_sst_stat, trigger_version_stat, trigger_write_stop_stats,
 };
 use crate::hummock::sequence::next_compaction_task_id;
 use crate::hummock::{CompactorManagerRef, TASK_NORMAL};
@@ -878,243 +878,330 @@ impl HummockManager {
     }
 
     #[named]
-    pub async fn get_compact_task_impl(
+    pub async fn get_compact_tasks_impl(
         &self,
-        compaction_group_id: CompactionGroupId,
+        compaction_groups: Vec<CompactionGroupId>,
+        max_select_count: usize,
         selector: &mut Box<dyn CompactionSelector>,
-    ) -> Result<Option<CompactTask>> {
+    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
         // TODO: `get_all_table_options` will hold catalog_manager async lock, to avoid holding the
         // lock in compaction_guard, take out all table_options in advance there may be a
         // waste of resources here, need to add a more efficient filter in catalog_manager
+        let deterministic_mode = self.env.opts.compaction_deterministic_test;
         let all_table_id_to_option = self
             .metadata_manager
             .get_all_table_options()
             .await
             .map_err(|err| Error::MetaStore(err.into()))?;
-        let mut table_to_vnode_partition = match self
-            .group_to_table_vnode_partition
-            .read()
-            .get(&compaction_group_id)
-        {
-            Some(table_to_vnode_partition) => table_to_vnode_partition.clone(),
-            None => BTreeMap::default(),
-        };
 
         let mut compaction_guard = write_lock!(self, compaction).await;
         let compaction = compaction_guard.deref_mut();
-        let compaction_statuses = &mut compaction.compaction_statuses;
+        let mut versioning_guard = write_lock!(self, versioning).await;
+        let versioning = versioning_guard.deref_mut();
+        let mut current_version = versioning.current_version.clone();
 
         let start_time = Instant::now();
-        // StoredIdGenerator already implements ids pre-allocation by ID_PREALLOCATE_INTERVAL.
-        let task_id = next_compaction_task_id(&self.env).await?;
+        let max_committed_epoch = current_version.max_committed_epoch;
+        let watermark = versioning
+            .pinned_snapshots
+            .values()
+            .map(|v| v.minimal_pinned_snapshot)
+            .fold(max_committed_epoch, std::cmp::min);
+        let last_apply_version_id = current_version.id;
+
+        let mut compaction_statuses = create_trx_wrapper!(
+            self.sql_meta_store(),
+            BTreeMapTransactionWrapper,
+            BTreeMapTransaction::new(&mut compaction.compaction_statuses)
+        );
 
-        // When the last table of a compaction group is deleted, the compaction group (and its
-        // config) is destroyed as well. Then a compaction task for this group may come later and
-        // cannot find its config.
-        let group_config = match self
-            .compaction_group_manager
-            .read()
-            .await
-            .try_get_compaction_group_config(compaction_group_id)
-        {
-            Some(config) => config,
-            None => return Ok(None),
-        };
-        self.precheck_compaction_group(
-            compaction_group_id,
-            compaction_statuses,
-            &group_config.compaction_config,
-        )
-        .await?;
+        let mut compact_task_assignment = create_trx_wrapper!(
+            self.sql_meta_store(),
+            BTreeMapTransactionWrapper,
+            BTreeMapTransaction::new(&mut compaction.compact_task_assignment)
+        );
 
-        let mut compact_status = match compaction.compaction_statuses.get_mut(&compaction_group_id)
-        {
-            Some(c) => create_trx_wrapper!(
-                self.sql_meta_store(),
-                VarTransactionWrapper,
-                VarTransaction::new(c)
-            ),
-            None => {
-                return Ok(None);
+        let mut hummock_version_deltas = create_trx_wrapper!(
+            self.sql_meta_store(),
+            BTreeMapTransactionWrapper,
+            BTreeMapTransaction::new(&mut versioning.hummock_version_deltas)
+        );
+        let mut branched_ssts = create_trx_wrapper!(
+            self.sql_meta_store(),
+            BTreeMapTransactionWrapper,
+            BTreeMapTransaction::new(&mut versioning.branched_ssts)
+        );
+
+        let mut unschedule_groups = vec![];
+        let mut trivial_tasks = vec![];
+        let mut pick_tasks = vec![];
+        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
+            &self.env.opts,
+        ));
+        const MAX_TRIVIAL_MOVE_TASK_COUNT: usize = 256;
+        'outside: for compaction_group_id in compaction_groups {
+            if pick_tasks.len() >= max_select_count {
+                break;
             }
-        };
-        let (current_version, watermark) = {
-            let versioning_guard = read_lock!(self, versioning).await;
-            let max_committed_epoch = versioning_guard.current_version.max_committed_epoch;
-            let watermark = versioning_guard
-                .pinned_snapshots
-                .values()
-                .map(|v| v.minimal_pinned_snapshot)
-                .fold(max_committed_epoch, std::cmp::min);
-
-            (versioning_guard.current_version.clone(), watermark)
-        };
-        if current_version.levels.get(&compaction_group_id).is_none() {
-            // compaction group has been deleted.
-            return Ok(None);
-        }
 
-        let can_trivial_move = matches!(selector.task_type(), compact_task::TaskType::Dynamic)
-            || matches!(selector.task_type(), compact_task::TaskType::Emergency);
+            if current_version.levels.get(&compaction_group_id).is_none() {
+                continue;
+            }
 
-        let mut stats = LocalSelectorStatistic::default();
-        let member_table_ids = &current_version
-            .get_compaction_group_levels(compaction_group_id)
-            .member_table_ids;
-        let table_id_to_option: HashMap<u32, TableOption> = all_table_id_to_option
-            .into_iter()
-            .filter(|(table_id, _)| member_table_ids.contains(table_id))
-            .collect();
+            // When the last table of a compaction group is deleted, the compaction group (and its
+            // config) is destroyed as well. Then a compaction task for this group may come later and
+            // cannot find its config.
+            let group_config = match self
+                .compaction_group_manager
+                .read()
+                .await
+                .try_get_compaction_group_config(compaction_group_id)
+            {
+                Some(config) => config,
+                None => continue,
+            };
+            // StoredIdGenerator already implements ids pre-allocation by ID_PREALLOCATE_INTERVAL.
+            let task_id = next_compaction_task_id(&self.env).await?;
 
-        let compact_task = compact_status.get_compact_task(
-            current_version.get_compaction_group_levels(compaction_group_id),
-            task_id as HummockCompactionTaskId,
-            &group_config,
-            &mut stats,
-            selector,
-            table_id_to_option.clone(),
-            Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
-                &self.env.opts,
-            )),
-        );
-        stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
-        let compact_task = match compact_task {
-            None => {
-                return Ok(None);
+            if !compaction_statuses.contains_key(&compaction_group_id) {
+                // lazy initialize.
+                compaction_statuses.insert(
+                    compaction_group_id,
+                    CompactStatus::new(
+                        compaction_group_id,
+                        group_config.compaction_config.max_level,
+                    ),
+                );
             }
-            Some(task) => task,
-        };
+            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();
 
-        let target_level_id = compact_task.input.target_level as u32;
+            let can_trivial_move = matches!(selector.task_type(), TaskType::Dynamic)
+                || matches!(selector.task_type(), TaskType::Emergency);
 
-        let compression_algorithm = match compact_task.compression_algorithm.as_str() {
-            "Lz4" => 1,
-            "Zstd" => 2,
-            _ => 0,
-        };
-        let vnode_partition_count = compact_task.input.vnode_partition_count;
-        use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
-
-        let mut compact_task = CompactTask {
-            input_ssts: compact_task.input.input_levels,
-            splits: vec![risingwave_pb::hummock::KeyRange::inf()],
-            watermark,
-            sorted_output_ssts: vec![],
-            task_id,
-            target_level: target_level_id,
-            // only gc delete keys in last level because there may be older version in more bottom
-            // level.
-            gc_delete_keys: current_version
-                .get_compaction_group_levels(compaction_group_id)
-                .is_last_level(target_level_id),
-            base_level: compact_task.base_level as u32,
-            task_status: TaskStatus::Pending as i32,
-            compaction_group_id: group_config.group_id,
-            existing_table_ids: member_table_ids.clone(),
-            compression_algorithm,
-            target_file_size: compact_task.target_file_size,
-            table_options: table_id_to_option
-                .into_iter()
-                .filter_map(|(table_id, table_option)| {
-                    if member_table_ids.contains(&table_id) {
-                        return Some((table_id, TableOption::from(&table_option)));
-                    }
+            let mut stats = LocalSelectorStatistic::default();
+            let member_table_ids = current_version
+                .get_compaction_group_levels(compaction_group_id)
+                .member_table_ids
+                .clone();
 
-                    None
-                })
-                .collect(),
-            current_epoch_time: Epoch::now().0,
-            compaction_filter_mask: group_config.compaction_config.compaction_filter_mask,
-            target_sub_level_id: compact_task.input.target_sub_level_id,
-            task_type: compact_task.compaction_task_type as i32,
-            split_weight_by_vnode: compact_task.input.vnode_partition_count,
-            ..Default::default()
-        };
+            let mut table_id_to_option: HashMap<u32, TableOption> = HashMap::default();
 
-        let is_trivial_reclaim = CompactStatus::is_trivial_reclaim(&compact_task);
-        let is_trivial_move = CompactStatus::is_trivial_move_task(&compact_task);
+            for table_id in &member_table_ids {
+                if let Some(opts) = all_table_id_to_option.get(table_id) {
+                    table_id_to_option.insert(*table_id, *opts);
+                }
+            }
 
-        if is_trivial_reclaim {
-            compact_task.set_task_status(TaskStatus::Success);
-            self.report_compact_task_impl(
-                task_id,
-                Some(compact_task.clone()),
-                TaskStatus::Success,
-                vec![],
-                &mut compaction_guard,
-                None,
-            )
-            .await?;
-            tracing::debug!(
-                "TrivialReclaim for compaction group {}: remove {} sstables, cost time: {:?}",
-                compaction_group_id,
-                compact_task
-                    .input_ssts
-                    .iter()
-                    .map(|level| level.table_infos.len())
-                    .sum::<usize>(),
-                start_time.elapsed()
-            );
-        } else if is_trivial_move && can_trivial_move {
-            // this task has been finished and `trivial_move_task` does not need to be schedule.
-            compact_task.set_task_status(TaskStatus::Success);
-            compact_task
-                .sorted_output_ssts
-                .clone_from(&compact_task.input_ssts[0].table_infos);
-            self.report_compact_task_impl(
-                task_id,
-                Some(compact_task.clone()),
-                TaskStatus::Success,
-                compact_task.input_ssts[0].table_infos.clone(),
-                &mut compaction_guard,
-                None,
-            )
-            .await?;
+            let table_to_vnode_partition = match self
+                .group_to_table_vnode_partition
+                .read()
+                .get(&compaction_group_id)
+            {
+                Some(table_to_vnode_partition) => table_to_vnode_partition.clone(),
+                None => BTreeMap::default(),
+            };
 
-            tracing::debug!(
-                "TrivialMove for compaction group {}: pick up {} sstables in level {} to compact to target_level {} cost time: {:?} input {:?}",
-                compaction_group_id,
-                compact_task.input_ssts[0].table_infos.len(),
-                compact_task.input_ssts[0].level_idx,
-                compact_task.target_level,
-                start_time.elapsed(),
-                compact_task.input_ssts
-            );
-        } else {
-            table_to_vnode_partition
-                .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
-            if group_config.compaction_config.split_weight_by_vnode > 0 {
-                table_to_vnode_partition.clear();
-                for table_id in &compact_task.existing_table_ids {
-                    table_to_vnode_partition.insert(*table_id, vnode_partition_count);
+            while let Some(compact_task) = compact_status.get_compact_task(
+                current_version.get_compaction_group_levels(compaction_group_id),
+                task_id as HummockCompactionTaskId,
+                &group_config,
+                &mut stats,
+                selector,
+                table_id_to_option.clone(),
+                developer_config.clone(),
+            ) {
+                let target_level_id = compact_task.input.target_level as u32;
+
+                let compression_algorithm = match compact_task.compression_algorithm.as_str() {
+                    "Lz4" => 1,
+                    "Zstd" => 2,
+                    _ => 0,
+                };
+                let vnode_partition_count = compact_task.input.vnode_partition_count;
+                use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
+
+                let mut compact_task = CompactTask {
+                    input_ssts: compact_task.input.input_levels,
+                    splits: vec![risingwave_pb::hummock::KeyRange::inf()],
+                    watermark,
+                    sorted_output_ssts: vec![],
+                    task_id,
+                    target_level: target_level_id,
+                    // only gc delete keys in last level because there may be older version in more bottom
+                    // level.
+                    gc_delete_keys: current_version
+                        .get_compaction_group_levels(compaction_group_id)
+                        .is_last_level(target_level_id),
+                    base_level: compact_task.base_level as u32,
+                    task_status: TaskStatus::Pending as i32,
+                    compaction_group_id: group_config.group_id,
+                    existing_table_ids: member_table_ids.clone(),
+                    compression_algorithm,
+                    target_file_size: compact_task.target_file_size,
+                    table_options: table_id_to_option
+                        .iter()
+                        .map(|(table_id, table_option)| {
+                            (*table_id, TableOption::from(table_option))
+                        })
+                        .collect(),
+                    current_epoch_time: Epoch::now().0,
+                    compaction_filter_mask: group_config.compaction_config.compaction_filter_mask,
+                    target_sub_level_id: compact_task.input.target_sub_level_id,
+                    task_type: compact_task.compaction_task_type as i32,
+                    split_weight_by_vnode: vnode_partition_count,
+                    ..Default::default()
+                };
+
+                let is_trivial_reclaim = CompactStatus::is_trivial_reclaim(&compact_task);
+                let is_trivial_move = CompactStatus::is_trivial_move_task(&compact_task);
+                if is_trivial_reclaim || (is_trivial_move && can_trivial_move) {
+                    let log_label = if is_trivial_reclaim {
+                        "TrivialReclaim"
+                    } else {
+                        "TrivialMove"
+                    };
+                    let label = if is_trivial_reclaim {
+                        "trivial-space-reclaim"
+                    } else {
+                        "trivial-move"
+                    };
+
+                    tracing::debug!(
+                        "{} for compaction group {}: input: {:?}, cost time: {:?}",
+                        log_label,
+                        compact_task.compaction_group_id,
+                        compact_task.input_ssts,
+                        start_time.elapsed()
+                    );
+                    compact_task.set_task_status(TaskStatus::Success);
+                    compact_status.report_compact_task(&compact_task);
+                    if !is_trivial_reclaim {
+                        compact_task.sorted_output_ssts =
+                            compact_task.input_ssts[0].table_infos.clone();
+                    }
+                    self.metrics
+                        .compact_frequency
+                        .with_label_values(&[
+                            label,
+                            &compact_task.compaction_group_id.to_string(),
+                            selector.task_type().as_str_name(),
+                            "SUCCESS",
+                        ])
+                        .inc();
+                    let version_delta = gen_version_delta(
+                        &mut hummock_version_deltas,
+                        &mut branched_ssts,
+                        &current_version,
+                        &compact_task,
+                        deterministic_mode,
+                    );
+                    current_version.apply_version_delta(&version_delta);
+                    trivial_tasks.push(compact_task);
+                    if trivial_tasks.len() >= MAX_TRIVIAL_MOVE_TASK_COUNT {
+                        break 'outside;
+                    }
+                } else {
+                    if group_config.compaction_config.split_weight_by_vnode > 0 {
+                        for table_id in &compact_task.existing_table_ids {
+                            compact_task
+                                .table_vnode_partition
+                                .insert(*table_id, vnode_partition_count);
+                        }
+                    } else {
+                        compact_task.table_vnode_partition = table_to_vnode_partition.clone();
+                    }
+                    compact_task
+                        .table_vnode_partition
+                        .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
+                    compact_task.table_watermarks = current_version
+                        .safe_epoch_table_watermarks(&compact_task.existing_table_ids);
+
+                    if self.env.opts.enable_dropped_column_reclaim {
+                        // TODO: get all table schemas for all tables in once call to avoid acquiring lock and await.
+                        compact_task.table_schemas = match self.metadata_manager() {
+                            MetadataManager::V1(mgr) => mgr
+                                .catalog_manager
+                                .get_versioned_table_schemas(&compact_task.existing_table_ids)
+                                .await
+                                .into_iter()
+                                .map(|(table_id, column_ids)| {
+                                    (table_id, TableSchema { column_ids })
+                                })
+                                .collect(),
+                            MetadataManager::V2(_) => {
+                                // TODO #13952: support V2
+                                BTreeMap::default()
+                            }
+                        };
+                    }
+
+                    compact_task_assignment.insert(
+                        compact_task.task_id,
+                        CompactTaskAssignment {
+                            compact_task: Some(compact_task.clone()),
+                            context_id: META_NODE_ID, // deprecated
+                        },
+                    );
+
+                    pick_tasks.push(compact_task);
+                    break;
                 }
+
+                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
+                stats = LocalSelectorStatistic::default();
+            }
+            if pick_tasks
+                .last()
+                .map(|task| task.compaction_group_id != compaction_group_id)
+                .unwrap_or(true)
+            {
+                unschedule_groups.push(compaction_group_id);
             }
-            compact_task.table_vnode_partition = table_to_vnode_partition;
-            compact_task.table_watermarks =
-                current_version.safe_epoch_table_watermarks(&compact_task.existing_table_ids);
+            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
+        }
 
-            let mut compact_task_assignment = create_trx_wrapper!(
+        if !trivial_tasks.is_empty() {
+            commit_multi_var!(
+                self.env.meta_store(),
                 self.sql_meta_store(),
-                BTreeMapTransactionWrapper,
-                BTreeMapTransaction::new(&mut compaction.compact_task_assignment,)
-            );
-            compact_task_assignment.insert(
-                compact_task.task_id,
-                CompactTaskAssignment {
-                    compact_task: Some(compact_task.clone()),
-                    context_id: META_NODE_ID, // deprecated
-                },
-            );
+                compaction_statuses,
+                compact_task_assignment,
+                hummock_version_deltas
+            )?;
+            branched_ssts.commit_memory();
+            trigger_version_stat(&self.metrics, &current_version);
+            versioning.current_version = current_version;
+            trigger_delta_log_stats(&self.metrics, versioning.hummock_version_deltas.len());
+            self.notify_stats(&versioning.version_stats);
+            self.notify_version_deltas(versioning, last_apply_version_id);
+            self.metrics
+                .compact_task_batch_count
+                .with_label_values(&["batch_trivial_move"])
+                .observe(trivial_tasks.len() as f64);
+            drop(versioning_guard);
+        } else {
             // We are using a single transaction to ensure that each task has progress when it is
             // created.
+            drop(versioning_guard);
             commit_multi_var!(
                 self.env.meta_store(),
                 self.sql_meta_store(),
-                compact_status,
+                compaction_statuses,
                 compact_task_assignment
             )?;
+        }
+        drop(compaction_guard);
+
+        if !pick_tasks.is_empty() {
+            self.metrics
+                .compact_task_batch_count
+                .with_label_values(&["batch_get_compact_task"])
+                .observe(pick_tasks.len() as f64);
+        }
+
+        for compact_task in &mut pick_tasks {
+            let compaction_group_id = compact_task.compaction_group_id;
 
             // Initiate heartbeat for the task to track its progress.
             self.compactor_manager
@@ -1122,15 +1209,7 @@ impl HummockManager {
             // this task has been finished.
             compact_task.set_task_status(TaskStatus::Pending);
-
-            trigger_sst_stat(
-                &self.metrics,
-                compaction.compaction_statuses.get(&compaction_group_id),
-                &current_version,
-                compaction_group_id,
-            );
-
-            let compact_task_statistics = statistics_compact_task(&compact_task);
+            let compact_task_statistics = statistics_compact_task(compact_task);
 
             let level_type_label = build_compact_task_level_type_metrics_label(
                 compact_task.input_ssts[0].level_idx as usize,
@@ -1177,11 +1256,10 @@ impl HummockManager {
 
         #[cfg(test)]
         {
-            drop(compaction_guard);
            self.check_state_consistency().await;
         }
-
-        Ok(Some(compact_task))
+        pick_tasks.extend(trivial_tasks);
+        Ok((pick_tasks, unschedule_groups))
     }
 
     /// Cancels a compaction task no matter it's assigned or unassigned.
@@ -1189,60 +1267,68 @@ impl HummockManager {
         fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
             anyhow::anyhow!("failpoint metastore err")
         )));
-        self.cancel_compact_task_impl(task_id, task_status).await
+        let ret = self
+            .cancel_compact_task_impl(vec![task_id], task_status)
+            .await?;
+        Ok(ret[0])
+    }
+
+    pub async fn cancel_compact_tasks(
+        &self,
+        tasks: Vec<u64>,
+        task_status: TaskStatus,
+    ) -> Result<Vec<bool>> {
+        self.cancel_compact_task_impl(tasks, task_status).await
     }
 
-    #[named]
     pub async fn cancel_compact_task_impl(
        &self,
-        task_id: u64,
+        task_ids: Vec<u64>,
         task_status: TaskStatus,
-    ) -> Result<bool> {
+    ) -> Result<Vec<bool>> {
         assert!(CANCEL_STATUS_SET.contains(&task_status));
-        let mut compaction_guard = write_lock!(self, compaction).await;
-        let ret = self
-            .report_compact_task_impl(
+        let tasks = task_ids
+            .into_iter()
+            .map(|task_id| ReportTask {
                 task_id,
-                None,
-                task_status,
-                vec![],
-                &mut compaction_guard,
-                None,
-            )
-            .await?;
+                task_status: task_status as i32,
+                sorted_output_ssts: vec![],
+                table_stats_change: HashMap::default(),
+            })
+            .collect_vec();
+        let rets = self.report_compact_tasks(tasks).await?;
         #[cfg(test)]
         {
-            drop(compaction_guard);
             self.check_state_consistency().await;
         }
-        Ok(ret)
+        Ok(rets)
     }
 
-    // need mutex protect
-    async fn precheck_compaction_group(
+    pub async fn get_compact_tasks(
         &self,
-        compaction_group_id: CompactionGroupId,
-        compaction_statuses: &mut BTreeMap<CompactionGroupId, CompactStatus>,
-        compaction_config: &CompactionConfig,
-    ) -> Result<()> {
-        if !compaction_statuses.contains_key(&compaction_group_id) {
-            let mut compact_statuses = create_trx_wrapper!(
-                self.sql_meta_store(),
-                BTreeMapTransactionWrapper,
-                BTreeMapTransaction::new(compaction_statuses)
-            );
-            let new_compact_status = compact_statuses.new_entry_insert_txn(
-                compaction_group_id,
-                CompactStatus::new(compaction_group_id, compaction_config.max_level),
-            );
-            commit_multi_var!(
-                self.env.meta_store(),
-                self.sql_meta_store(),
-                new_compact_status
-            )?;
-        }
-
-        Ok(())
+        mut compaction_groups: Vec<CompactionGroupId>,
+        max_select_count: usize,
+        selector: &mut Box<dyn CompactionSelector>,
+    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
+        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
+            anyhow::anyhow!("failpoint metastore error")
+        )));
+        compaction_groups.shuffle(&mut thread_rng());
+        let (mut tasks, groups) = self
+            .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
+            .await?;
+        tasks.retain(|task| {
+            if task.task_status() == TaskStatus::Success {
+                debug_assert!(
+                    CompactStatus::is_trivial_reclaim(task)
+                        || CompactStatus::is_trivial_move_task(task)
+                );
+                false
+            } else {
+                true
+            }
+        });
+        Ok((tasks, groups))
     }
 
     pub async fn get_compact_task(
@@ -1254,34 +1340,18 @@ impl HummockManager {
             anyhow::anyhow!("failpoint metastore error")
         )));
 
-        while let Some(mut task) = self
-            .get_compact_task_impl(compaction_group_id, selector)
-            .await?
-        {
-            if let TaskStatus::Pending = task.task_status() {
-                if self.env.opts.enable_dropped_column_reclaim {
-                    task.table_schemas = match self.metadata_manager() {
-                        MetadataManager::V1(mgr) => mgr
-                            .catalog_manager
-                            .get_versioned_table_schemas(&task.existing_table_ids)
-                            .await
-                            .into_iter()
-                            .map(|(table_id, column_ids)| (table_id, TableSchema { column_ids }))
-                            .collect(),
-                        MetadataManager::V2(_) => {
-                            // TODO #13952: support V2
-                            BTreeMap::default()
-                        }
-                    }
-                }
+        let (normal_tasks, _) = self
+            .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
+            .await?;
+        for task in normal_tasks {
+            if task.task_status() != TaskStatus::Success {
                 return Ok(Some(task));
             }
-            assert!(
-                CompactStatus::is_trivial_move_task(&task)
-                    || CompactStatus::is_trivial_reclaim(&task)
+            debug_assert!(
+                CompactStatus::is_trivial_reclaim(&task)
+                    || CompactStatus::is_trivial_move_task(&task)
             );
         }
-
         Ok(None)
     }
 
@@ -1315,7 +1385,6 @@ impl HummockManager {
         false
     }
 
-    #[named]
     pub async fn report_compact_task(
         &self,
         task_id: u64,
@@ -1323,16 +1392,15 @@ impl HummockManager {
         sorted_output_ssts: Vec<SstableInfo>,
         table_stats_change: Option<PbTableStatsMap>,
     ) -> Result<bool> {
-        let mut guard = write_lock!(self, compaction).await;
-        self.report_compact_task_impl(
-            task_id,
-            None,
-            task_status,
-            sorted_output_ssts,
-            &mut guard,
-            table_stats_change,
-        )
-        .await
+        let rets = self
+            .report_compact_tasks(vec![ReportTask {
+                task_id,
+                task_status: task_status as i32,
+                sorted_output_ssts,
+                table_stats_change: table_stats_change.unwrap_or_default(),
+            }])
+            .await?;
+        Ok(rets[0])
     }
 
     /// Finishes or cancels a compaction task, according to `task_status`.
@@ -1343,17 +1411,10 @@ impl HummockManager {
     /// Return Ok(false) indicates either the task is not found,
     /// or the task is not owned by `context_id` when `context_id` is not None.
     #[named]
-    pub async fn report_compact_task_impl(
-        &self,
-        task_id: u64,
-        trivial_move_compact_task: Option<CompactTask>,
-        task_status: TaskStatus,
-        sorted_output_ssts: Vec<SstableInfo>,
-        compaction_guard: &mut RwLockWriteGuard<'_, Compaction>,
-        table_stats_change: Option<PbTableStatsMap>,
-    ) -> Result<bool> {
+    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
+        let mut guard = write_lock!(self, compaction).await;
         let deterministic_mode = self.env.opts.compaction_deterministic_test;
-        let compaction = compaction_guard.deref_mut();
+        let compaction = guard.deref_mut();
         let start_time = Instant::now();
         let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
         let mut compact_statuses = create_trx_wrapper!(
@@ -1361,43 +1422,57 @@ impl HummockManager {
             BTreeMapTransactionWrapper,
             BTreeMapTransaction::new(&mut compaction.compaction_statuses,)
         );
+        let mut rets = vec![false; report_tasks.len()];
         let mut compact_task_assignment = create_trx_wrapper!(
             self.sql_meta_store(),
             BTreeMapTransactionWrapper,
             BTreeMapTransaction::new(&mut compaction.compact_task_assignment,)
         );
-        // remove task_assignment
-        let mut compact_task = if let Some(input_task) = trivial_move_compact_task {
-            input_task
-        } else {
-            match compact_task_assignment.remove(task_id) {
-                Some(compact_task) => compact_task.compact_task.unwrap(),
-                None => {
-                    tracing::warn!("{}", format!("compact task {} not found", task_id));
-                    return Ok(false);
-                }
+        // The compaction task is finished.
+        let mut versioning_guard = write_lock!(self, versioning).await;
+        let versioning = versioning_guard.deref_mut();
+        let mut current_version = versioning.current_version.clone();
+        // purge stale compact_status
+        for group_id in original_keys {
+            if !current_version.levels.contains_key(&group_id) {
+                compact_statuses.remove(group_id);
             }
-        };
-
-        {
-            // apply result
-            compact_task.set_task_status(task_status);
-            compact_task.sorted_output_ssts = sorted_output_ssts;
         }
+        let mut tasks = vec![];
 
-        let is_trivial_reclaim = CompactStatus::is_trivial_reclaim(&compact_task);
-        let is_trivial_move = CompactStatus::is_trivial_move_task(&compact_task);
+        let mut hummock_version_deltas = create_trx_wrapper!(
+            self.sql_meta_store(),
+            BTreeMapTransactionWrapper,
+            BTreeMapTransaction::new(&mut versioning.hummock_version_deltas,)
+        );
+        let mut branched_ssts = create_trx_wrapper!(
+            self.sql_meta_store(),
+            BTreeMapTransactionWrapper,
+            BTreeMapTransaction::new(&mut versioning.branched_ssts)
+        );
 
-        {
-            // The compaction task is finished.
-            let mut versioning_guard = write_lock!(self, versioning).await;
-            let versioning = versioning_guard.deref_mut();
-            let mut current_version = versioning.current_version.clone();
-            // purge stale compact_status
-            for group_id in original_keys {
-                if !current_version.levels.contains_key(&group_id) {
-                    compact_statuses.remove(group_id);
+        let mut version_stats = create_trx_wrapper!(
+            self.sql_meta_store(),
+            VarTransactionWrapper,
+            VarTransaction::new(&mut versioning.version_stats)
+        );
+        let mut success_count = 0;
+        let last_version_id = current_version.id;
+        for (idx, task) in report_tasks.into_iter().enumerate() {
+            rets[idx] = true;
+            let mut compact_task = match compact_task_assignment.remove(task.task_id) {
+                Some(compact_task) => compact_task.compact_task.unwrap(),
+                None => {
+                    tracing::warn!("{}", format!("compact task {} not found", task.task_id));
+                    rets[idx] = false;
+                    continue;
                 }
+            };
+
+            {
+                // apply result
+                compact_task.task_status = task.task_status;
+                compact_task.sorted_output_ssts = task.sorted_output_ssts;
             }
 
             match compact_statuses.get_mut(compact_task.compaction_group_id) {
@@ -1409,10 +1484,6 @@ impl HummockManager {
                 }
             }
 
-            debug_assert!(
-                compact_task.task_status() != TaskStatus::Pending,
-                "report pending compaction task"
-            );
             let input_sst_ids: HashSet<u64> = compact_task
                 .input_ssts
                 .iter()
@@ -1426,7 +1497,7 @@ impl HummockManager {
             let is_success = if let TaskStatus::Success = compact_task.task_status() {
                 // if member_table_ids changes, the data of sstable may stale.
                 let is_expired =
-                    Self::is_compact_task_expired(&compact_task, &versioning.branched_ssts);
+                    Self::is_compact_task_expired(&compact_task, branched_ssts.tree_ref());
                 if is_expired {
                     compact_task.set_task_status(TaskStatus::InputOutdatedCanceled);
                     false
@@ -1450,16 +1521,7 @@ impl HummockManager {
                 false
             };
             if is_success {
-                let mut hummock_version_deltas = create_trx_wrapper!(
-                    self.sql_meta_store(),
-                    BTreeMapTransactionWrapper,
-                    BTreeMapTransaction::new(&mut versioning.hummock_version_deltas,)
-                );
-                let mut branched_ssts = create_trx_wrapper!(
-                    self.sql_meta_store(),
-                    BTreeMapTransactionWrapper,
-                    BTreeMapTransaction::new(&mut versioning.branched_ssts)
-                );
+                success_count += 1;
                 let version_delta = gen_version_delta(
                     &mut hummock_version_deltas,
                     &mut branched_ssts,
@@ -1467,12 +1529,6 @@ impl HummockManager {
                     &compact_task,
                     deterministic_mode,
                 );
-                let mut version_stats = create_trx_wrapper!(
-                    self.sql_meta_store(),
-                    VarTransactionWrapper,
-                    VarTransaction::new(&mut versioning.version_stats,)
-                );
-
                 // apply version delta before we persist this change. If it causes panic we can
                 // recover to a correct state after restarting meta-node.
                 current_version.apply_version_delta(&version_delta);
@@ -1480,100 +1536,103 @@ impl HummockManager {
                     self.metrics.version_stats.reset();
                     versioning.local_metrics.clear();
                 }
-                if let Some(table_stats_change) = &table_stats_change {
-                    add_prost_table_stats_map(&mut version_stats.table_stats, table_stats_change);
-                    trigger_local_table_stat(
-                        &self.metrics,
-                        &mut versioning.local_metrics,
-                        &version_stats,
-                        table_stats_change,
-                    );
-                }
-
-                commit_multi_var!(
-                    self.env.meta_store(),
-                    self.sql_meta_store(),
-                    compact_statuses,
-                    compact_task_assignment,
-                    hummock_version_deltas,
-                    version_stats
-                )?;
-                branched_ssts.commit_memory();
-
-                trigger_version_stat(&self.metrics, &current_version);
-                trigger_delta_log_stats(&self.metrics, versioning.hummock_version_deltas.len());
-                self.notify_stats(&versioning.version_stats);
-                versioning.current_version = current_version;
-
-                if !deterministic_mode {
-                    self.notify_last_version_delta(versioning);
-                }
-            } else {
-                // The compaction task is cancelled or failed.
-                commit_multi_var!(
-                    self.env.meta_store(),
-                    self.sql_meta_store(),
-                    compact_statuses,
-                    compact_task_assignment
-                )?;
+                add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
+                trigger_local_table_stat(
+                    &self.metrics,
+                    &mut versioning.local_metrics,
+                    &version_stats,
+                    &task.table_stats_change,
+                );
             }
+            tasks.push(compact_task);
         }
+        if success_count > 0 {
+            commit_multi_var!(
+                self.env.meta_store(),
+                self.sql_meta_store(),
+                compact_statuses,
+                compact_task_assignment,
+                hummock_version_deltas,
+                version_stats
+            )?;
+            branched_ssts.commit_memory();
 
-        let task_status = compact_task.task_status();
-        let task_status_label = task_status.as_str_name();
-        let task_type_label = compact_task.task_type().as_str_name();
+            trigger_version_stat(&self.metrics, &current_version);
+            trigger_delta_log_stats(&self.metrics, versioning.hummock_version_deltas.len());
+            self.notify_stats(&versioning.version_stats);
+            versioning.current_version = current_version;
 
-        let label = if is_trivial_reclaim {
-            "trivial-space-reclaim"
-        } else if is_trivial_move {
-            "trivial-move"
+            if !deterministic_mode {
+                self.notify_version_deltas(versioning, last_version_id);
+            }
+
+            self.metrics
+                .compact_task_batch_count
+                .with_label_values(&["batch_report_task"])
+                .observe(success_count as f64);
         } else {
+            // The compaction task is cancelled or failed.
+            commit_multi_var!(
+                self.env.meta_store(),
+                self.sql_meta_store(),
+                compact_statuses,
+                compact_task_assignment
+            )?;
+        }
+        let mut success_groups = vec![];
+        for compact_task in tasks {
+            let task_status = compact_task.task_status();
+            let task_status_label = task_status.as_str_name();
+            let task_type_label = compact_task.task_type().as_str_name();
+
             self.compactor_manager
                 .remove_task_heartbeat(compact_task.task_id);
-            "normal"
-        };
 
-        self.metrics
-            .compact_frequency
-            .with_label_values(&[
-                label,
-                &compact_task.compaction_group_id.to_string(),
-                task_type_label,
-                task_status_label,
-            ])
-            .inc();
+            self.metrics
+                .compact_frequency
+                .with_label_values(&[
+                    "normal",
+                    &compact_task.compaction_group_id.to_string(),
+                    task_type_label,
+                    task_status_label,
+                ])
+                .inc();
 
-        tracing::trace!(
-            "Reported compaction task. {}. cost time: {:?}",
-            compact_task_to_string(&compact_task),
-            start_time.elapsed(),
-        );
+            tracing::trace!(
+                "Reported compaction task. {}. cost time: {:?}",
+                compact_task_to_string(&compact_task),
+                start_time.elapsed(),
+            );
 
-        trigger_sst_stat(
-            &self.metrics,
-            compaction
-                .compaction_statuses
-                .get(&compact_task.compaction_group_id),
-            &read_lock!(self, versioning).await.current_version,
-            compact_task.compaction_group_id,
-        );
+            trigger_sst_stat(
+                &self.metrics,
+                compaction
+                    .compaction_statuses
+                    .get(&compact_task.compaction_group_id),
+                &versioning_guard.current_version,
+                compact_task.compaction_group_id,
+            );
 
-        if !deterministic_mode
-            && (matches!(compact_task.task_type(), compact_task::TaskType::Dynamic)
-                || matches!(compact_task.task_type(), compact_task::TaskType::Emergency))
-        {
-            // only try send Dynamic compaction
-            self.try_send_compaction_request(
-                compact_task.compaction_group_id,
-                compact_task::TaskType::Dynamic,
-            );
-        }
+            if !deterministic_mode
+                && (matches!(compact_task.task_type(), compact_task::TaskType::Dynamic)
+                    || matches!(compact_task.task_type(), compact_task::TaskType::Emergency))
+            {
+                // only try send Dynamic compaction
+                self.try_send_compaction_request(
+                    compact_task.compaction_group_id,
+                    compact_task::TaskType::Dynamic,
+                );
+            }
 
-        if task_status == TaskStatus::Success {
-            self.try_update_write_limits(&[compact_task.compaction_group_id])
-                .await;
-        }
+            if task_status == TaskStatus::Success {
+                success_groups.push(compact_task.compaction_group_id);
+            }
+        }
+        drop(versioning_guard);
+        if !success_groups.is_empty() {
+            self.try_update_write_limits(&success_groups).await;
+        }
 
-        Ok(true)
+        Ok(rets)
     }
 
     /// Caller should ensure `epoch` > `max_committed_epoch`
@@ -1840,6 +1899,21 @@ impl HummockManager {
             &version_stats,
             &table_stats_change,
         );
+        for (table_id, stats) in &table_stats_change {
+            if stats.total_key_size == 0
+                && stats.total_value_size == 0
+                && stats.total_key_count == 0
+            {
+                continue;
+            }
+            let stats_value = std::cmp::max(0, stats.total_key_size + stats.total_value_size);
+            let table_metrics = get_or_create_local_table_stat(
+                &self.metrics,
+                *table_id,
+                &mut versioning.local_metrics,
+            );
+            table_metrics.inc_write_throughput(stats_value as u64);
+        }
         commit_multi_var!(
             self.env.meta_store(),
             self.sql_meta_store(),
@@ -2249,20 +2323,28 @@ impl HummockManager {
         task_status: TaskStatus,
         sorted_output_ssts: Vec<SstableInfo>,
         table_stats_change: Option<PbTableStatsMap>,
-    ) -> Result<bool> {
-        let mut guard = write_lock!(self, compaction).await;
+    ) -> Result<()> {
+        if let Some(task) = compact_task {
+            let mut guard = write_lock!(self, compaction).await;
+            guard.compact_task_assignment.insert(
+                task_id,
+                CompactTaskAssignment {
+                    compact_task: Some(task),
+                    context_id: 0,
+                },
+            );
+        }
 
         // In the test, the contents of the compact task may have been modified directly, while the contents of compact_task_assignment were not modified.
         // So we pass the modified compact_task directly into the `report_compact_task_impl`
-        self.report_compact_task_impl(
+        self.report_compact_tasks(vec![ReportTask {
             task_id,
-            compact_task,
-            task_status,
+            task_status: task_status as i32,
             sorted_output_ssts,
-            &mut guard,
-            table_stats_change,
-        )
-        .await
+            table_stats_change: table_stats_change.unwrap_or_default(),
+        }])
+        .await?;
+        Ok(())
     }
 
     pub fn metadata_manager(&self) -> &MetadataManager {
@@ -2285,6 +2367,23 @@ impl HummockManager {
         );
     }
 
+    fn notify_version_deltas(&self, versioning: &Versioning, last_version_id: u64) {
+        let start_version_id = last_version_id + 1;
+        let version_deltas = versioning
+            .hummock_version_deltas
+            .range(start_version_id..)
+            .map(|(_, delta)| delta.to_protobuf())
+            .collect_vec();
+        self.env
+            .notification_manager()
+            .notify_hummock_without_version(
+                Operation::Add,
+                Info::HummockVersionDeltas(risingwave_pb::hummock::HummockVersionDeltas {
+                    version_deltas,
+                }),
+            );
+    }
+
     fn notify_stats(&self, stats: &HummockVersionStats) {
         self.env
             .notification_manager()
@@ -2997,8 +3096,6 @@ impl HummockManager {
     pub async fn auto_pick_compaction_group_and_type(
         &self,
     ) -> Option<(CompactionGroupId, compact_task::TaskType)> {
-        use rand::prelude::SliceRandom;
-        use rand::thread_rng;
         let mut compaction_group_ids = self.compaction_group_ids().await;
         compaction_group_ids.shuffle(&mut thread_rng());
 
@@ -3011,6 +3108,52 @@ impl HummockManager {
         None
     }
 
+    /// Returns all compaction group ids in a random order, along with a task type. If any group is
+    /// blocked by `write_limit`, a single-element array with `TaskType::Emergency` is returned.
+    /// If the picked groups have different task types, all groups whose type is `TaskType::Dynamic`
+    /// are returned when the first group is `TaskType::Dynamic`; otherwise only the single first
+    /// group with its own task type is returned.
+    #[named]
+    pub async fn auto_pick_compaction_groups_and_type(
+        &self,
+    ) -> (Vec<CompactionGroupId>, compact_task::TaskType) {
+        let versioning_guard = read_lock!(self, versioning).await;
+        let versioning = versioning_guard.deref();
+        let mut compaction_group_ids =
+            get_compaction_group_ids(&versioning.current_version).collect_vec();
+        compaction_group_ids.shuffle(&mut thread_rng());
+
+        let mut normal_groups = vec![];
+        for cg_id in compaction_group_ids {
+            if versioning.write_limit.contains_key(&cg_id) {
+                let enable_emergency_picker = match self
+                    .compaction_group_manager
+                    .read()
+                    .await
+                    .try_get_compaction_group_config(cg_id)
+                {
+                    Some(config) => config.compaction_config.enable_emergency_picker,
+                    None => {
+                        unreachable!("compaction-group {} not exist", cg_id)
+                    }
+                };
+
+                if enable_emergency_picker {
+                    if normal_groups.is_empty() {
+                        return (vec![cg_id], TaskType::Emergency);
+                    } else {
+                        break;
+                    }
+                }
+            }
+            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
+                if pick_type == TaskType::Dynamic {
+                    normal_groups.push(cg_id);
+                } else if normal_groups.is_empty() {
+                    return (vec![cg_id], pick_type);
+                }
+            }
+        }
+        (normal_groups, TaskType::Dynamic)
+    }
+
     async fn calculate_table_align_rule(
         &self,
         table_write_throughput: &HashMap<u32, VecDeque<u64>>,
@@ -3207,136 +3350,6 @@ impl HummockManager {
 
         Ok(())
     }
-
-    /// dedicated event runtime for CPU/IO bound event
-    #[named]
-    async fn compact_task_dedicated_event_handler(
-        hummock_manager: Arc<HummockManager>,
-        mut rx: UnboundedReceiver<(u32, subscribe_compaction_event_request::Event)>,
-        shutdown_rx_shared: Shared<OneShotReceiver<()>>,
-    ) {
-        let mut compaction_selectors = init_selectors();
-
-        tokio::select! {
-            _ = shutdown_rx_shared => {}
-
-            _ = async {
-                while let Some((context_id, event)) = rx.recv().await {
-                    match event {
-                        RequestEvent::PullTask(PullTask { pull_task_count }) => {
-                            assert_ne!(0, pull_task_count);
-                            if let Some(compactor) =
-                                hummock_manager.compactor_manager.get_compactor(context_id)
-                            {
-                                if let Some((group, task_type)) =
-                                    hummock_manager.auto_pick_compaction_group_and_type().await
-                                {
-                                    let selector: &mut Box<dyn CompactionSelector> = {
-                                        let versioning_guard =
-                                            read_lock!(hummock_manager, versioning).await;
-                                        let versioning = versioning_guard.deref();
-
-                                        if versioning.write_limit.contains_key(&group) {
-                                            let enable_emergency_picker = match hummock_manager
-                                                .compaction_group_manager
-                                                .read()
-                                                .await
-                                                .try_get_compaction_group_config(group)
-                                            {
-                                                Some(config) => {
-                                                    config.compaction_config.enable_emergency_picker
-                                                }
-                                                None => {
-                                                    unreachable!("compaction-group {} not exist", group)
-                                                }
-                                            };
-
-                                            if enable_emergency_picker {
-                                                compaction_selectors
-                                                    .get_mut(&TaskType::Emergency)
-                                                    .unwrap()
-                                            } else {
-                                                compaction_selectors.get_mut(&task_type).unwrap()
-                                            }
-                                        } else {
-                                            compaction_selectors.get_mut(&task_type).unwrap()
-                                        }
-                                    };
-
-                                    for _ in 0..pull_task_count {
-                                        let compact_task =
-                                            hummock_manager.get_compact_task(group, selector).await;
-
-                                        match compact_task {
-                                            Ok(Some(compact_task)) => {
-                                                let task_id = compact_task.task_id;
-                                                if let Err(e) = compactor.send_event(
-                                                    ResponseEvent::CompactTask(compact_task),
-                                                ) {
-                                                    tracing::warn!(
-                                                        error = %e.as_report(),
-                                                        "Failed to send task {} to {}",
-                                                        task_id,
-                                                        compactor.context_id(),
-                                                    );
-
-                                                    hummock_manager.compactor_manager.remove_compactor(context_id);
-                                                    break;
-                                                }
-                                            }
-                                            Ok(None) => {
-                                                // no compact_task to be picked
-                                                hummock_manager
-                                                    .compaction_state
-                                                    .unschedule(group, task_type);
-                                                break;
-                                            }
-                                            Err(err) => {
-                                                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
-                                                break;
-                                            }
-                                        };
-                                    }
-                                }
-
-                                // ack to compactor
-                                if let Err(e) =
-                                    compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {}))
-                                {
-                                    tracing::warn!(
-                                        error = %e.as_report(),
-                                        "Failed to send ask to {}",
-                                        context_id,
-                                    );
-                                    hummock_manager.compactor_manager.remove_compactor(context_id);
-                                }
-                            }
-                        }
-
-                        RequestEvent::ReportTask(ReportTask {
-                            task_id,
-                            task_status,
-                            sorted_output_ssts,
-                            table_stats_change,
-                        }) => {
-                            if let Err(e) = hummock_manager
-                                .report_compact_task(
-                                    task_id,
-                                    TaskStatus::try_from(task_status).unwrap(),
-                                    sorted_output_ssts,
-                                    Some(table_stats_change),
-                                )
-                                .await
-                            {
-                                tracing::error!(error = %e.as_report(), "report compact_tack fail")
-                            }
-                        }
-
-                        _ => unreachable!(),
-                    }
-                }
-            } => {}
-        }
-    }
 }
 
 // This structure describes how hummock handles sst switching in a compaction group. A better sst cut will result in better data alignment, which in turn will improve the efficiency of the compaction.
diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs
index ca0c3ab8b0ab2..3481bdb514e30 100644
--- a/src/meta/src/hummock/manager/tests.rs
+++ b/src/meta/src/hummock/manager/tests.rs
@@ -43,7 +43,6 @@ use crate::hummock::compaction::selector::{
     default_compaction_selector, CompactionSelector, ManualCompactionOption,
     SpaceReclaimCompactionSelector,
 };
-use crate::hummock::compaction::CompactStatus;
 use crate::hummock::error::Error;
 use crate::hummock::test_utils::*;
 use crate::hummock::{CommitEpochInfo, HummockManager, HummockManagerRef};
@@ -1695,11 +1694,12 @@ async fn test_split_compaction_group_trivial_expired() {
         .unwrap();
     let mut selector: Box<dyn CompactionSelector> =
         Box::<SpaceReclaimCompactionSelector>::default();
-    let reclaim_task = hummock_manager
-        .get_compact_task_impl(2, &mut selector)
+    let (mut normal_tasks, _unscheduled) = hummock_manager
+        .get_compact_tasks_impl(vec![2], 1, &mut selector)
         .await
-        .unwrap()
         .unwrap();
+    use crate::hummock::manager::CompactStatus;
+    let reclaim_task = normal_tasks.pop().unwrap();
     assert!(CompactStatus::is_trivial_reclaim(&reclaim_task));
 
     let current_version = hummock_manager.get_current_version().await;
diff --git a/src/meta/src/hummock/metrics_utils.rs b/src/meta/src/hummock/metrics_utils.rs
index 0ede18e7c559a..be25c5cf452b2 100644
--- a/src/meta/src/hummock/metrics_utils.rs
+++ b/src/meta/src/hummock/metrics_utils.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
 use itertools::{enumerate, Itertools};
+use prometheus::core::{AtomicU64, GenericCounter};
 use prometheus::IntGauge;
 use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
     object_size_map, BranchedSstInfo,
@@ -43,6 +44,50 @@ pub struct LocalTableMetrics {
     total_key_count: IntGauge,
     total_key_size: IntGauge,
     total_value_size: IntGauge,
+    write_throughput: GenericCounter<AtomicU64>,
+    cal_count: usize,
+    write_size: u64,
+}
+
+const MIN_FLUSH_COUNT: usize = 16;
+const MIN_FLUSH_DATA_SIZE: u64 = 128 * 1024 * 1024;
+
+impl LocalTableMetrics {
+    pub fn inc_write_throughput(&mut self, val: u64) {
+        self.write_size += val;
+        self.cal_count += 1;
+        if self.write_size > MIN_FLUSH_DATA_SIZE || self.cal_count > MIN_FLUSH_COUNT {
+            self.write_throughput.inc_by(self.write_size / 1024 / 1024);
+            self.write_size = 0;
+            self.cal_count = 0;
+        }
+    }
+}
+
+pub fn get_or_create_local_table_stat<'a>(
+    metrics: &MetaMetrics,
+    table_id: u32,
+    local_metrics: &'a mut HashMap<u32, LocalTableMetrics>,
+) -> &'a mut LocalTableMetrics {
+    local_metrics.entry(table_id).or_insert_with(|| {
+        let table_label = format!("{}", table_id);
+        LocalTableMetrics {
+            total_key_count: metrics
+                .version_stats
+                .with_label_values(&[&table_label, "total_key_count"]),
+            total_key_size: metrics
+                .version_stats
+                .with_label_values(&[&table_label, "total_key_size"]),
+            total_value_size: metrics
+                .version_stats
+                .with_label_values(&[&table_label, "total_value_size"]),
+            write_throughput: metrics
+                .table_write_throughput
+                .with_label_values(&[&table_label]),
+            cal_count: 0,
+            write_size: 0,
+        }
+    })
 }
 
 pub fn trigger_local_table_stat(
@@ -55,20 +100,7 @@ pub fn trigger_local_table_stat(
         if stats.total_key_size == 0 && stats.total_value_size == 0 && stats.total_key_count == 0 {
             continue;
         }
-        let table_metrics = local_metrics.entry(*table_id).or_insert_with(|| {
-            let table_label = format!("{}", table_id);
-            LocalTableMetrics {
-                total_key_count: metrics
-                    .version_stats
-                    .with_label_values(&[&table_label, "total_key_count"]),
-                total_key_size: metrics
-                    .version_stats
-                    .with_label_values(&[&table_label, "total_key_size"]),
-                total_value_size: metrics
-                    .version_stats
-                    .with_label_values(&[&table_label, "total_value_size"]),
-            }
-        });
+        let table_metrics = get_or_create_local_table_stat(metrics, *table_id, local_metrics);
         if let Some(table_stats) = version_stats.table_stats.get(table_id) {
             table_metrics
                 .total_key_count
diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs
index fdc313c622452..23898f6965ca1 100644
--- a/src/meta/src/hummock/test_utils.rs
+++ b/src/meta/src/hummock/test_utils.rs
@@ -118,7 +118,7 @@ pub async fn add_test_tables(
         assert_eq!(compactor.context_id(), context_id);
     }
 
-    let ret = hummock_manager
+    hummock_manager
         .report_compact_task_for_test(
             compact_task.task_id,
             Some(compact_task),
             TaskStatus::Success,
             vec![],
             None,
         )
         .await
         .unwrap();
-    assert!(ret);
     if temp_compactor {
         hummock_manager
             .compactor_manager_ref_for_test()
diff --git a/src/meta/src/rpc/metrics.rs b/src/meta/src/rpc/metrics.rs
index a41f230184c8b..cfe3c1f880773 100644
--- a/src/meta/src/rpc/metrics.rs
+++ b/src/meta/src/rpc/metrics.rs
@@ -152,6 +152,7 @@ pub struct MetaMetrics {
     pub l0_compact_level_count: HistogramVec,
     pub compact_task_size: HistogramVec,
     pub compact_task_file_count: HistogramVec,
+    pub compact_task_batch_count: HistogramVec,
     pub move_state_table_count: IntCounterVec,
     pub state_table_count: IntGaugeVec,
     pub branched_sst_count: IntGaugeVec,
@@ -579,6 +580,14 @@ impl MetaMetrics {
             registry
         )
         .unwrap();
+        let opts = histogram_opts!(
+            "storage_compact_task_batch_count",
+            "count of compact task batch",
+            exponential_buckets(1.0, 2.0, 8).unwrap()
+        );
+        let compact_task_batch_count =
+            register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();
+
         let table_write_throughput = register_int_counter_vec_with_registry!(
             "storage_commit_write_throughput",
             "The number of compactions from one level to another level that have been skipped.",
@@ -685,6 +694,7 @@ impl MetaMetrics {
             l0_compact_level_count,
             compact_task_size,
             compact_task_file_count,
+            compact_task_batch_count,
             table_write_throughput,
             move_state_table_count,
             state_table_count,
diff --git a/src/storage/hummock_sdk/src/compact.rs b/src/storage/hummock_sdk/src/compact.rs
index 33d895d2c4fcd..e9671df2d0637 100644
--- a/src/storage/hummock_sdk/src/compact.rs
+++ b/src/storage/hummock_sdk/src/compact.rs
@@ -14,6 +14,15 @@
 
 use risingwave_pb::hummock::{CompactTask, LevelType, SstableInfo};
 
+pub fn compact_task_output_to_string(compact_task: &CompactTask) -> String {
+    let mut s = String::default();
+    s.push_str("Compaction task output: \n");
+    for sst in &compact_task.sorted_output_ssts {
+        append_sstable_info_to_string(&mut s, sst);
+    }
+    s
+}
+
 pub fn compact_task_to_string(compact_task: &CompactTask) -> String {
     use std::fmt::Write;
 
@@ -80,10 +89,7 @@ pub fn compact_task_to_string(compact_task: &CompactTask) -> String {
             .collect();
         writeln!(s, "Level {:?} {:?} ", level_entry.level_idx, tables).unwrap();
     }
-    s.push_str("Compaction task output: \n");
-    for sst in &compact_task.sorted_output_ssts {
-        append_sstable_info_to_string(&mut s, sst);
-    }
+    s.push_str(&compact_task_output_to_string(compact_task));
     s
 }
diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs
index a229ef6e15f93..775e1ef8cf86d 100644
--- a/src/storage/src/hummock/compactor/compactor_runner.rs
+++ b/src/storage/src/hummock/compactor/compactor_runner.rs
@@ -27,7 +27,9 @@ use risingwave_hummock_sdk::compact::{
 use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
 use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
 use risingwave_hummock_sdk::table_stats::{add_table_stats_map, TableStats, TableStatsMap};
-use risingwave_hummock_sdk::{can_concat, HummockSstableObjectId, KeyComparator};
+use risingwave_hummock_sdk::{
+    can_concat, compact_task_output_to_string, HummockSstableObjectId, KeyComparator,
+};
 use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
 use risingwave_pb::hummock::{BloomFilterType, CompactTask, LevelType, SstableInfo};
 use thiserror_ext::AsReport;
@@ -520,13 +522,10 @@ pub async fn compact(
     ) * compact_task.splits.len() as u64;
 
     tracing::info!(
-        "Ready to handle compaction group {} task: {} compact_task_statistics {:?} target_level {} compression_algorithm {:?} table_ids {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {} input: {:?}",
-        compact_task.compaction_group_id,
+        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
         compact_task.task_id,
         compact_task_statistics,
-        compact_task.target_level,
         compact_task.compression_algorithm,
-        compact_task.existing_table_ids,
         parallelism,
         task_memory_capacity_with_parallelism,
         optimize_by_copy_block,
@@ -699,7 +698,7 @@ pub async fn compact(
     tracing::info!(
         "Finished compaction task in {:?}ms: {}",
         cost_time,
-        compact_task_to_string(&compact_task)
+        compact_task_output_to_string(&compact_task)
     );
     ((compact_task, table_stats), memory_detector)
 }
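
The heart of this patch is the drain-and-batch loop in the new `compact_task_dedicated_event_handler`: after one blocking `recv().await`, the loop opportunistically drains the channel with `try_recv()` so that report events can be committed as a single `report_compact_tasks` batch. A minimal, self-contained sketch of that pattern follows; the `Event` enum and `u64` task ids are simplified stand-ins for the actual meta-service types, not the real API.

use tokio::sync::mpsc::UnboundedReceiver;

const MAX_SKIP_TIMES: usize = 8;
const MAX_REPORT_COUNT: usize = 16;

enum Event {
    PullTask { pull_task_count: usize },
    ReportTask { task_id: u64 },
}

/// Drain whatever is already queued, accumulating report events into `reports`
/// so the caller can commit them in one batch.
fn drain_report_batch(rx: &mut UnboundedReceiver<Event>, reports: &mut Vec<u64>) {
    let mut skip_times = 0;
    while let Ok(event) = rx.try_recv() {
        match event {
            Event::PullTask { .. } => {
                // A pull request interleaved with pending reports: serve a few,
                // but flush once MAX_SKIP_TIMES pulls have cut in, so report
                // latency stays bounded.
                if !reports.is_empty() {
                    if skip_times > MAX_SKIP_TIMES {
                        break;
                    }
                    skip_times += 1;
                }
            }
            Event::ReportTask { task_id } => {
                reports.push(task_id);
                // Cap the batch so a single commit never grows without bound.
                if reports.len() >= MAX_REPORT_COUNT {
                    break;
                }
            }
        }
    }
}

Batching the reports is what lets `report_compact_tasks` take the compaction and versioning locks and run `commit_multi_var!` once per batch instead of once per task; the new `storage_compact_task_batch_count` histogram added in `MetaMetrics` observes the resulting batch sizes.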