From 1331a6656fcbd6afc32f4811af85f3d3500b2c3b Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Thu, 7 Mar 2024 18:44:22 +0800 Subject: [PATCH 01/12] batch get Signed-off-by: Little-Wallace --- src/meta/src/hummock/manager/compaction.rs | 157 +- .../manager/compaction_group_manager.rs | 30 +- src/meta/src/hummock/manager/gc.rs | 5 +- src/meta/src/hummock/manager/mod.rs | 1274 ++++++++--------- src/meta/src/hummock/manager/tests.rs | 6 +- src/meta/src/hummock/test_utils.rs | 3 +- src/meta/src/rpc/metrics.rs | 10 + .../src/hummock/compactor/compactor_runner.rs | 3 +- 8 files changed, 809 insertions(+), 679 deletions(-) diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction.rs index b414f8afa85e0..fe4e7c73467e6 100644 --- a/src/meta/src/hummock/manager/compaction.rs +++ b/src/meta/src/hummock/manager/compaction.rs @@ -12,20 +12,34 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use function_name::named; +use futures::future::Shared; use itertools::Itertools; use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId}; +use risingwave_pb::hummock::compact_task::TaskType; +use risingwave_pb::hummock::subscribe_compaction_event_request::{ + self, Event as RequestEvent, PullTask, +}; +use risingwave_pb::hummock::subscribe_compaction_event_response::{ + Event as ResponseEvent, PullTaskAck, +}; use risingwave_pb::hummock::{CompactStatus as PbCompactStatus, CompactTaskAssignment}; +use thiserror_ext::AsReport; +use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::oneshot::Receiver as OneShotReceiver; use crate::hummock::compaction::selector::level_selector::PickerInfo; use crate::hummock::compaction::selector::DynamicLevelSelectorCore; -use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig}; -use crate::hummock::manager::read_lock; +use 
crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector}; +use crate::hummock::manager::{init_selectors, read_lock}; use crate::hummock::HummockManager; +const MAX_SKIP_TIMES: usize = 8; +const MAX_REPORT_COUNT: usize = 16; + #[derive(Default)] pub struct Compaction { /// Compaction task that is already assigned to a compactor @@ -102,4 +116,141 @@ impl HummockManager { let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers); ctx.score_levels } + + pub async fn handle_pull_task_event( + &self, + context_id: u32, + pull_task_count: usize, + compaction_selectors: &mut HashMap>, + ) { + assert_ne!(0, pull_task_count); + if let Some(compactor) = self.compactor_manager.get_compactor(context_id) { + let (groups, task_type) = self.auto_pick_compaction_groups_and_type().await; + if !groups.is_empty() { + let selector: &mut Box = + compaction_selectors.get_mut(&task_type).unwrap(); + + let mut generated_task_count = 0; + let mut existed_groups = groups.clone(); + let mut meet_error = false; + + while generated_task_count < pull_task_count && !meet_error { + let compact_ret = self + .get_compact_tasks( + std::mem::take(&mut existed_groups), + pull_task_count - generated_task_count, + selector, + ) + .await; + + match compact_ret { + Ok((compact_tasks, trivial_tasks)) => { + if compact_tasks.is_empty() { + break; + } + generated_task_count += compact_tasks.len(); + for task in trivial_tasks { + existed_groups.push(task.compaction_group_id); + } + for task in compact_tasks { + let task_id = task.task_id; + existed_groups.push(task.compaction_group_id); + if let Err(e) = + compactor.send_event(ResponseEvent::CompactTask(task)) + { + tracing::warn!( + error = %e.as_report(), + "Failed to send task {} to {}", + task_id, + compactor.context_id(), + ); + self.compactor_manager.remove_compactor(context_id); + meet_error = true; + break; + } + } + } + Err(err) => { + tracing::warn!(error = %err.as_report(), "Failed to get 
compaction task"); + break; + } + }; + } + if generated_task_count < pull_task_count && !meet_error { + // no compact_task to be picked + for group in groups { + self.compaction_state.unschedule(group, task_type); + } + } + } + + // ack to compactor + if let Err(e) = compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})) { + tracing::warn!( + error = %e.as_report(), + "Failed to send ask to {}", + context_id, + ); + self.compactor_manager.remove_compactor(context_id); + } + } + } + + /// dedicated event runtime for CPU/IO bound event + pub async fn compact_task_dedicated_event_handler( + hummock_manager: Arc, + mut rx: UnboundedReceiver<(u32, subscribe_compaction_event_request::Event)>, + shutdown_rx_shared: Shared>, + ) { + let mut compaction_selectors = init_selectors(); + + tokio::select! { + _ = shutdown_rx_shared => {} + + _ = async { + while let Some((context_id, event)) = rx.recv().await { + let mut report_events = vec![]; + let mut skip_times = 0; + match event { + RequestEvent::PullTask(PullTask { pull_task_count }) => { + hummock_manager.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors).await; + } + + RequestEvent::ReportTask(task) => { + report_events.push(task); + } + + _ => unreachable!(), + } + while let Ok((context_id, event)) = rx.try_recv() { + match event { + RequestEvent::PullTask(PullTask { pull_task_count }) => { + hummock_manager.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors).await; + if !report_events.is_empty() { + if skip_times > MAX_SKIP_TIMES { + break; + } + skip_times += 1; + } + } + + RequestEvent::ReportTask(task) => { + report_events.push(task); + if report_events.len() >= MAX_REPORT_COUNT { + break; + } + } + _ => unreachable!(), + } + } + if !report_events.is_empty() { + if let Err(e) = hummock_manager.report_compact_tasks(report_events).await + { + tracing::error!(error = %e.as_report(), "report compact_tack fail") + } + } + } + } => {} + } + 
} } diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 5665ef5bc9973..98d77aecad838 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -30,13 +30,13 @@ use risingwave_pb::hummock::compact_task::TaskStatus; use risingwave_pb::hummock::group_delta::DeltaType; use risingwave_pb::hummock::hummock_version_delta::GroupDeltas; use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig; +use risingwave_pb::hummock::subscribe_compaction_event_request::ReportTask; use risingwave_pb::hummock::{ compact_task, CompactionConfig, CompactionGroupInfo, CompatibilityVersion, GroupConstruct, GroupDelta, GroupDestroy, GroupMetaChange, GroupTableChange, }; use thiserror_ext::AsReport; use tokio::sync::{OnceCell, RwLock}; -use tracing::warn; use super::write_lock; use crate::controller::SqlMetaStore; @@ -485,7 +485,7 @@ impl HummockManager { return Ok((parent_group_id, table_to_partition)); } let table_ids = table_ids.iter().cloned().unique().collect_vec(); - let mut compaction_guard = write_lock!(self, compaction).await; + let compaction_guard = write_lock!(self, compaction).await; let mut versioning_guard = write_lock!(self, versioning).await; let versioning = versioning_guard.deref_mut(); let current_version = &versioning.current_version; @@ -691,27 +691,19 @@ impl HummockManager { } } if need_cancel { - canceled_tasks.push(task.clone()); + canceled_tasks.push(ReportTask { + task_id: task.task_id, + task_status: TaskStatus::ManualCanceled as i32, + table_stats_change: HashMap::default(), + sorted_output_ssts: vec![], + }); } } } - for task in canceled_tasks { - if !self - .report_compact_task_impl( - task.task_id, - None, - TaskStatus::ManualCanceled, - vec![], - &mut compaction_guard, - None, - ) - .await - .unwrap_or(false) - { - warn!("failed to cancel task-{}", 
task.task_id); - } - } + drop(compaction_guard); + self.report_compact_tasks(canceled_tasks).await?; + // Don't trigger compactions if we enable deterministic compaction if !self.env.opts.compaction_deterministic_test { // commit_epoch may contains SSTs from any compaction group diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index a9b4b04c35f49..3c68856dc6788 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -24,12 +24,11 @@ use itertools::Itertools; use risingwave_hummock_sdk::HummockSstableObjectId; use risingwave_pb::common::worker_node::State::Running; use risingwave_pb::common::WorkerType; +use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent; use risingwave_pb::hummock::FullScanTask; use crate::hummock::error::{Error, Result}; -use crate::hummock::manager::{ - commit_multi_var, create_trx_wrapper, read_lock, write_lock, ResponseEvent, -}; +use crate::hummock::manager::{commit_multi_var, create_trx_wrapper, read_lock, write_lock}; use crate::hummock::HummockManager; use crate::manager::MetadataManager; use crate::model::{BTreeMapTransaction, BTreeMapTransactionWrapper, ValTransaction}; diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index b33365868e681..79e920bd774c1 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -24,7 +24,7 @@ use arc_swap::ArcSwap; use bytes::Bytes; use fail::fail_point; use function_name::named; -use futures::future::{Either, Shared}; +use futures::future::Either; use futures::stream::{BoxStream, FuturesUnordered}; use futures::{FutureExt, StreamExt}; use itertools::Itertools; @@ -54,11 +54,9 @@ use risingwave_pb::hummock::compact_task::{self, TaskStatus, TaskType}; use risingwave_pb::hummock::group_delta::DeltaType; use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config; use 
risingwave_pb::hummock::subscribe_compaction_event_request::{ - self, Event as RequestEvent, HeartBeat, PullTask, ReportTask, -}; -use risingwave_pb::hummock::subscribe_compaction_event_response::{ - Event as ResponseEvent, PullTaskAck, + Event as RequestEvent, HeartBeat, ReportTask, }; +use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent; use risingwave_pb::hummock::{ CompactTask, CompactTaskAssignment, CompactionConfig, GroupDelta, HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot, HummockVersionStats, IntraLevelDelta, @@ -68,7 +66,7 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation}; use rw_futures_util::{pending_on_none, select_all}; use thiserror_ext::AsReport; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio::sync::oneshot::{Receiver as OneShotReceiver, Sender}; +use tokio::sync::oneshot::Sender; use tokio::sync::RwLockWriteGuard; use tokio::task::JoinHandle; use tokio_stream::wrappers::IntervalStream; @@ -852,312 +850,6 @@ impl HummockManager { Ok(()) } - #[named] - pub async fn get_compact_task_impl( - &self, - compaction_group_id: CompactionGroupId, - selector: &mut Box, - ) -> Result> { - // TODO: `get_all_table_options` will hold catalog_manager async lock, to avoid holding the - // lock in compaction_guard, take out all table_options in advance there may be a - // waste of resources here, need to add a more efficient filter in catalog_manager - let all_table_id_to_option = self - .metadata_manager - .get_all_table_options() - .await - .map_err(|err| Error::MetaStore(err.into()))?; - let mut table_to_vnode_partition = match self - .group_to_table_vnode_partition - .read() - .get(&compaction_group_id) - { - Some(table_to_vnode_partition) => table_to_vnode_partition.clone(), - None => BTreeMap::default(), - }; - - let mut compaction_guard = write_lock!(self, compaction).await; - let compaction = compaction_guard.deref_mut(); - let compaction_statuses = 
&mut compaction.compaction_statuses; - - let start_time = Instant::now(); - // StoredIdGenerator already implements ids pre-allocation by ID_PREALLOCATE_INTERVAL. - let task_id = next_compaction_task_id(&self.env).await?; - - // When the last table of a compaction group is deleted, the compaction group (and its - // config) is destroyed as well. Then a compaction task for this group may come later and - // cannot find its config. - let group_config = match self - .compaction_group_manager - .read() - .await - .try_get_compaction_group_config(compaction_group_id) - { - Some(config) => config, - None => return Ok(None), - }; - self.precheck_compaction_group( - compaction_group_id, - compaction_statuses, - &group_config.compaction_config, - ) - .await?; - - let mut compact_status = match compaction.compaction_statuses.get_mut(&compaction_group_id) - { - Some(c) => create_trx_wrapper!( - self.sql_meta_store(), - VarTransactionWrapper, - VarTransaction::new(c) - ), - None => { - return Ok(None); - } - }; - let (current_version, watermark) = { - let versioning_guard = read_lock!(self, versioning).await; - let max_committed_epoch = versioning_guard.current_version.max_committed_epoch; - let watermark = versioning_guard - .pinned_snapshots - .values() - .map(|v| v.minimal_pinned_snapshot) - .fold(max_committed_epoch, std::cmp::min); - - (versioning_guard.current_version.clone(), watermark) - }; - if current_version.levels.get(&compaction_group_id).is_none() { - // compaction group has been deleted. 
- return Ok(None); - } - - let can_trivial_move = matches!(selector.task_type(), compact_task::TaskType::Dynamic) - || matches!(selector.task_type(), compact_task::TaskType::Emergency); - - let mut stats = LocalSelectorStatistic::default(); - let member_table_ids = ¤t_version - .get_compaction_group_levels(compaction_group_id) - .member_table_ids; - let table_id_to_option: HashMap = all_table_id_to_option - .into_iter() - .filter(|(table_id, _)| member_table_ids.contains(table_id)) - .collect(); - - let compact_task = compact_status.get_compact_task( - current_version.get_compaction_group_levels(compaction_group_id), - task_id as HummockCompactionTaskId, - &group_config, - &mut stats, - selector, - table_id_to_option.clone(), - Arc::new(CompactionDeveloperConfig::new_from_meta_opts( - &self.env.opts, - )), - ); - stats.report_to_metrics(compaction_group_id, self.metrics.as_ref()); - let compact_task = match compact_task { - None => { - return Ok(None); - } - Some(task) => task, - }; - - let target_level_id = compact_task.input.target_level as u32; - - let compression_algorithm = match compact_task.compression_algorithm.as_str() { - "Lz4" => 1, - "Zstd" => 2, - _ => 0, - }; - let vnode_partition_count = compact_task.input.vnode_partition_count; - use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; - - let mut compact_task = CompactTask { - input_ssts: compact_task.input.input_levels, - splits: vec![risingwave_pb::hummock::KeyRange::inf()], - watermark, - sorted_output_ssts: vec![], - task_id, - target_level: target_level_id, - // only gc delete keys in last level because there may be older version in more bottom - // level. 
- gc_delete_keys: current_version - .get_compaction_group_levels(compaction_group_id) - .is_last_level(target_level_id), - base_level: compact_task.base_level as u32, - task_status: TaskStatus::Pending as i32, - compaction_group_id: group_config.group_id, - existing_table_ids: member_table_ids.clone(), - compression_algorithm, - target_file_size: compact_task.target_file_size, - table_options: table_id_to_option - .into_iter() - .filter_map(|(table_id, table_option)| { - if member_table_ids.contains(&table_id) { - return Some((table_id, TableOption::from(&table_option))); - } - - None - }) - .collect(), - current_epoch_time: Epoch::now().0, - compaction_filter_mask: group_config.compaction_config.compaction_filter_mask, - target_sub_level_id: compact_task.input.target_sub_level_id, - task_type: compact_task.compaction_task_type as i32, - split_weight_by_vnode: compact_task.input.vnode_partition_count, - ..Default::default() - }; - - let is_trivial_reclaim = CompactStatus::is_trivial_reclaim(&compact_task); - let is_trivial_move = CompactStatus::is_trivial_move_task(&compact_task); - - if is_trivial_reclaim { - compact_task.set_task_status(TaskStatus::Success); - self.report_compact_task_impl( - task_id, - Some(compact_task.clone()), - TaskStatus::Success, - vec![], - &mut compaction_guard, - None, - ) - .await?; - tracing::debug!( - "TrivialReclaim for compaction group {}: remove {} sstables, cost time: {:?}", - compaction_group_id, - compact_task - .input_ssts - .iter() - .map(|level| level.table_infos.len()) - .sum::(), - start_time.elapsed() - ); - } else if is_trivial_move && can_trivial_move { - // this task has been finished and `trivial_move_task` does not need to be schedule. 
- compact_task.set_task_status(TaskStatus::Success); - compact_task.sorted_output_ssts = compact_task.input_ssts[0].table_infos.clone(); - self.report_compact_task_impl( - task_id, - Some(compact_task.clone()), - TaskStatus::Success, - compact_task.input_ssts[0].table_infos.clone(), - &mut compaction_guard, - None, - ) - .await?; - - tracing::debug!( - "TrivialMove for compaction group {}: pick up {} sstables in level {} to compact to target_level {} cost time: {:?} input {:?}", - compaction_group_id, - compact_task.input_ssts[0].table_infos.len(), - compact_task.input_ssts[0].level_idx, - compact_task.target_level, - start_time.elapsed(), - compact_task.input_ssts - ); - } else { - table_to_vnode_partition - .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id)); - if group_config.compaction_config.split_weight_by_vnode > 0 { - for table_id in &compact_task.existing_table_ids { - table_to_vnode_partition - .entry(*table_id) - .or_insert(vnode_partition_count); - } - } - - compact_task.table_vnode_partition = table_to_vnode_partition; - compact_task.table_watermarks = - current_version.safe_epoch_table_watermarks(&compact_task.existing_table_ids); - - let mut compact_task_assignment = create_trx_wrapper!( - self.sql_meta_store(), - BTreeMapTransactionWrapper, - BTreeMapTransaction::new(&mut compaction.compact_task_assignment,) - ); - compact_task_assignment.insert( - compact_task.task_id, - CompactTaskAssignment { - compact_task: Some(compact_task.clone()), - context_id: META_NODE_ID, // deprecated - }, - ); - - // We are using a single transaction to ensure that each task has progress when it is - // created. - commit_multi_var!( - self.env.meta_store(), - self.sql_meta_store(), - compact_status, - compact_task_assignment - )?; - - // Initiate heartbeat for the task to track its progress. - self.compactor_manager - .initiate_task_heartbeat(compact_task.clone()); - - // this task has been finished. 
- compact_task.set_task_status(TaskStatus::Pending); - - trigger_sst_stat( - &self.metrics, - compaction.compaction_statuses.get(&compaction_group_id), - ¤t_version, - compaction_group_id, - ); - - let compact_task_statistics = statistics_compact_task(&compact_task); - - let level_type_label = build_compact_task_level_type_metrics_label( - compact_task.input_ssts[0].level_idx as usize, - compact_task.input_ssts.last().unwrap().level_idx as usize, - ); - - let level_count = compact_task.input_ssts.len(); - if compact_task.input_ssts[0].level_idx == 0 { - self.metrics - .l0_compact_level_count - .with_label_values(&[&compaction_group_id.to_string(), &level_type_label]) - .observe(level_count as _); - } - - self.metrics - .compact_task_size - .with_label_values(&[&compaction_group_id.to_string(), &level_type_label]) - .observe(compact_task_statistics.total_file_size as _); - - self.metrics - .compact_task_size - .with_label_values(&[ - &compaction_group_id.to_string(), - &format!("{} uncompressed", level_type_label), - ]) - .observe(compact_task_statistics.total_uncompressed_file_size as _); - - self.metrics - .compact_task_file_count - .with_label_values(&[&compaction_group_id.to_string(), &level_type_label]) - .observe(compact_task_statistics.total_file_count as _); - - tracing::trace!( - "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}", - compaction_group_id, - level_count, - compact_task.input_ssts[0].level_type().as_str_name(), - compact_task.input_ssts[0].level_idx, - compact_task.target_level, - start_time.elapsed(), - compact_task_statistics - ); - } - - #[cfg(test)] - { - drop(compaction_guard); - self.check_state_consistency().await; - } - - Ok(Some(compact_task)) - } - /// Cancels a compaction task no matter it's assigned or unassigned. 
pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result { fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore( @@ -1166,57 +858,54 @@ impl HummockManager { self.cancel_compact_task_impl(task_id, task_status).await } - #[named] pub async fn cancel_compact_task_impl( &self, task_id: u64, task_status: TaskStatus, ) -> Result { assert!(CANCEL_STATUS_SET.contains(&task_status)); - let mut compaction_guard = write_lock!(self, compaction).await; - let ret = self - .report_compact_task_impl( + let rets = self + .report_compact_tasks(vec![ReportTask { task_id, - None, - task_status, - vec![], - &mut compaction_guard, - None, - ) + task_status: task_status as i32, + sorted_output_ssts: vec![], + table_stats_change: HashMap::default(), + }]) .await?; #[cfg(test)] { - drop(compaction_guard); self.check_state_consistency().await; } - Ok(ret) + Ok(rets[0]) } - // need mutex protect - async fn precheck_compaction_group( + pub async fn get_compact_tasks( &self, - compaction_group_id: CompactionGroupId, - compaction_statuses: &mut BTreeMap, - compaction_config: &CompactionConfig, - ) -> Result<()> { - if !compaction_statuses.contains_key(&compaction_group_id) { - let mut compact_statuses = create_trx_wrapper!( - self.sql_meta_store(), - BTreeMapTransactionWrapper, - BTreeMapTransaction::new(compaction_statuses) - ); - let new_compact_status = compact_statuses.new_entry_insert_txn( - compaction_group_id, - CompactStatus::new(compaction_group_id, compaction_config.max_level), - ); - commit_multi_var!( - self.env.meta_store(), - self.sql_meta_store(), - new_compact_status - )?; + mut compaction_groups: Vec, + max_task_count: usize, + selector: &mut Box, + ) -> Result<(Vec, Vec)> { + fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore( + anyhow::anyhow!("failpoint metastore error") + ))); + loop { + let (normal_tasks, trivial_tasks) = self + .get_compact_tasks_impl( + std::mem::take(&mut compaction_groups), + max_task_count, + 
selector, + ) + .await?; + if !normal_tasks.is_empty() { + return Ok((normal_tasks, trivial_tasks)); + } else if trivial_tasks.is_empty() { + return Ok((vec![], vec![])); + } + // only select groups which could generate more tasks. + for t in &trivial_tasks { + compaction_groups.push(t.compaction_group_id); + } } - - Ok(()) } pub async fn get_compact_task( @@ -1228,20 +917,16 @@ impl HummockManager { anyhow::anyhow!("failpoint metastore error") ))); - while let Some(task) = self - .get_compact_task_impl(compaction_group_id, selector) - .await? - { - if let TaskStatus::Pending = task.task_status() { - return Ok(Some(task)); + loop { + let (mut normal_tasks, trivial_tasks) = self + .get_compact_tasks_impl(vec![compaction_group_id], 1, selector) + .await?; + if !normal_tasks.is_empty() { + return Ok(normal_tasks.pop()); + } else if trivial_tasks.is_empty() { + return Ok(None); } - assert!( - CompactStatus::is_trivial_move_task(&task) - || CompactStatus::is_trivial_reclaim(&task) - ); } - - Ok(None) } pub async fn manual_get_compact_task( @@ -1274,7 +959,6 @@ impl HummockManager { false } - #[named] pub async fn report_compact_task( &self, task_id: u64, @@ -1282,16 +966,15 @@ impl HummockManager { sorted_output_ssts: Vec, table_stats_change: Option, ) -> Result { - let mut guard = write_lock!(self, compaction).await; - self.report_compact_task_impl( - task_id, - None, - task_status, - sorted_output_ssts, - &mut guard, - table_stats_change, - ) - .await + let rets = self + .report_compact_tasks(vec![ReportTask { + task_id, + task_status: task_status as i32, + sorted_output_ssts, + table_stats_change: table_stats_change.unwrap_or_default(), + }]) + .await?; + Ok(rets[0]) } /// Finishes or cancels a compaction task, according to `task_status`. @@ -1302,17 +985,10 @@ impl HummockManager { /// Return Ok(false) indicates either the task is not found, /// or the task is not owned by `context_id` when `context_id` is not None. 
#[named] - pub async fn report_compact_task_impl( - &self, - task_id: u64, - trivial_move_compact_task: Option, - task_status: TaskStatus, - sorted_output_ssts: Vec, - compaction_guard: &mut RwLockWriteGuard<'_, Compaction>, - table_stats_change: Option, - ) -> Result { + pub async fn report_compact_tasks(&self, report_tasks: Vec) -> Result> { + let mut guard = write_lock!(self, compaction).await; let deterministic_mode = self.env.opts.compaction_deterministic_test; - let compaction = compaction_guard.deref_mut(); + let compaction = guard.deref_mut(); let start_time = Instant::now(); let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec(); let mut compact_statuses = create_trx_wrapper!( @@ -1320,43 +996,57 @@ impl HummockManager { BTreeMapTransactionWrapper, BTreeMapTransaction::new(&mut compaction.compaction_statuses,) ); + let mut rets = vec![false; report_tasks.len()]; let mut compact_task_assignment = create_trx_wrapper!( self.sql_meta_store(), BTreeMapTransactionWrapper, BTreeMapTransaction::new(&mut compaction.compact_task_assignment,) ); - // remove task_assignment - let mut compact_task = if let Some(input_task) = trivial_move_compact_task { - input_task - } else { - match compact_task_assignment.remove(task_id) { - Some(compact_task) => compact_task.compact_task.unwrap(), - None => { - tracing::warn!("{}", format!("compact task {} not found", task_id)); - return Ok(false); - } + // The compaction task is finished. 
+ let mut versioning_guard = write_lock!(self, versioning).await; + let versioning = versioning_guard.deref_mut(); + let mut current_version = versioning.current_version.clone(); + // purge stale compact_status + for group_id in original_keys { + if !current_version.levels.contains_key(&group_id) { + compact_statuses.remove(group_id); } - }; - - { - // apply result - compact_task.set_task_status(task_status); - compact_task.sorted_output_ssts = sorted_output_ssts; } + let mut tasks = vec![]; - let is_trivial_reclaim = CompactStatus::is_trivial_reclaim(&compact_task); - let is_trivial_move = CompactStatus::is_trivial_move_task(&compact_task); + let mut hummock_version_deltas = create_trx_wrapper!( + self.sql_meta_store(), + BTreeMapTransactionWrapper, + BTreeMapTransaction::new(&mut versioning.hummock_version_deltas,) + ); + let mut branched_ssts = create_trx_wrapper!( + self.sql_meta_store(), + BTreeMapTransactionWrapper, + BTreeMapTransaction::new(&mut versioning.branched_ssts) + ); - { - // The compaction task is finished. 
- let mut versioning_guard = write_lock!(self, versioning).await; - let versioning = versioning_guard.deref_mut(); - let mut current_version = versioning.current_version.clone(); - // purge stale compact_status - for group_id in original_keys { - if !current_version.levels.contains_key(&group_id) { - compact_statuses.remove(group_id); + let mut version_stats = create_trx_wrapper!( + self.sql_meta_store(), + VarTransactionWrapper, + VarTransaction::new(&mut versioning.version_stats) + ); + let mut success_count = 0; + let last_version_id = current_version.id; + for (idx, task) in report_tasks.into_iter().enumerate() { + rets[idx] = true; + let mut compact_task = match compact_task_assignment.remove(task.task_id) { + Some(compact_task) => compact_task.compact_task.unwrap(), + None => { + tracing::warn!("{}", format!("compact task {} not found", task.task_id)); + rets[idx] = false; + continue; } + }; + + { + // apply result + compact_task.task_status = task.task_status; + compact_task.sorted_output_ssts = task.sorted_output_ssts; } match compact_statuses.get_mut(compact_task.compaction_group_id) { @@ -1368,10 +1058,6 @@ impl HummockManager { } } - debug_assert!( - compact_task.task_status() != TaskStatus::Pending, - "report pending compaction task" - ); let input_sst_ids: HashSet = compact_task .input_ssts .iter() @@ -1385,7 +1071,7 @@ impl HummockManager { let is_success = if let TaskStatus::Success = compact_task.task_status() { // if member_table_ids changes, the data of sstable may stale. 
let is_expired = - Self::is_compact_task_expired(&compact_task, &versioning.branched_ssts); + Self::is_compact_task_expired(&compact_task, branched_ssts.tree_ref()); if is_expired { compact_task.set_task_status(TaskStatus::InputOutdatedCanceled); false @@ -1409,16 +1095,7 @@ impl HummockManager { false }; if is_success { - let mut hummock_version_deltas = create_trx_wrapper!( - self.sql_meta_store(), - BTreeMapTransactionWrapper, - BTreeMapTransaction::new(&mut versioning.hummock_version_deltas,) - ); - let mut branched_ssts = create_trx_wrapper!( - self.sql_meta_store(), - BTreeMapTransactionWrapper, - BTreeMapTransaction::new(&mut versioning.branched_ssts) - ); + success_count += 1; let version_delta = gen_version_delta( &mut hummock_version_deltas, &mut branched_ssts, @@ -1426,12 +1103,6 @@ impl HummockManager { &compact_task, deterministic_mode, ); - let mut version_stats = create_trx_wrapper!( - self.sql_meta_store(), - VarTransactionWrapper, - VarTransaction::new(&mut versioning.version_stats,) - ); - // apply version delta before we persist this change. If it causes panic we can // recover to a correct state after restarting meta-node. 
current_version.apply_version_delta(&version_delta); @@ -1439,100 +1110,103 @@ impl HummockManager { self.metrics.version_stats.reset(); versioning.local_metrics.clear(); } - if let Some(table_stats_change) = &table_stats_change { - add_prost_table_stats_map(&mut version_stats.table_stats, table_stats_change); - trigger_local_table_stat( - &self.metrics, - &mut versioning.local_metrics, - &version_stats, - table_stats_change, - ); - } - commit_multi_var!( - self.env.meta_store(), - self.sql_meta_store(), - compact_statuses, - compact_task_assignment, - hummock_version_deltas, - version_stats - )?; - branched_ssts.commit_memory(); - - trigger_version_stat(&self.metrics, ¤t_version); - trigger_delta_log_stats(&self.metrics, versioning.hummock_version_deltas.len()); - self.notify_stats(&versioning.version_stats); - versioning.current_version = current_version; - - if !deterministic_mode { - self.notify_last_version_delta(versioning); - } - } else { - // The compaction task is cancelled or failed. 
- commit_multi_var!( - self.env.meta_store(), - self.sql_meta_store(), - compact_statuses, - compact_task_assignment - )?; + add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change); + trigger_local_table_stat( + &self.metrics, + &mut versioning.local_metrics, + &version_stats, + &task.table_stats_change, + ); } + tasks.push(compact_task); } + if success_count > 0 { + commit_multi_var!( + self.env.meta_store(), + self.sql_meta_store(), + compact_statuses, + compact_task_assignment, + hummock_version_deltas, + version_stats + )?; + branched_ssts.commit_memory(); - let task_status = compact_task.task_status(); - let task_status_label = task_status.as_str_name(); - let task_type_label = compact_task.task_type().as_str_name(); + trigger_version_stat(&self.metrics, ¤t_version); + trigger_delta_log_stats(&self.metrics, versioning.hummock_version_deltas.len()); + self.notify_stats(&versioning.version_stats); + versioning.current_version = current_version; + + if !deterministic_mode { + self.notify_version_deltas(versioning, last_version_id); + } - let label = if is_trivial_reclaim { - "trivial-space-reclaim" - } else if is_trivial_move { - "trivial-move" + self.metrics + .compact_task_batch_count + .with_label_values(&["batch_report_task"]) + .observe(success_count as f64); } else { + // The compaction task is cancelled or failed. 
+ commit_multi_var!( + self.env.meta_store(), + self.sql_meta_store(), + compact_statuses, + compact_task_assignment + )?; + } + let mut success_groups = vec![]; + for compact_task in tasks { + let task_status = compact_task.task_status(); + let task_status_label = task_status.as_str_name(); + let task_type_label = compact_task.task_type().as_str_name(); + self.compactor_manager .remove_task_heartbeat(compact_task.task_id); - "normal" - }; - - self.metrics - .compact_frequency - .with_label_values(&[ - label, - &compact_task.compaction_group_id.to_string(), - task_type_label, - task_status_label, - ]) - .inc(); - tracing::trace!( - "Reported compaction task. {}. cost time: {:?}", - compact_task_to_string(&compact_task), - start_time.elapsed(), - ); + self.metrics + .compact_frequency + .with_label_values(&[ + "normal", + &compact_task.compaction_group_id.to_string(), + task_type_label, + task_status_label, + ]) + .inc(); - trigger_sst_stat( - &self.metrics, - compaction - .compaction_statuses - .get(&compact_task.compaction_group_id), - &read_lock!(self, versioning).await.current_version, - compact_task.compaction_group_id, - ); + tracing::trace!( + "Reported compaction task. {}. 
cost time: {:?}", + compact_task_to_string(&compact_task), + start_time.elapsed(), + ); - if !deterministic_mode - && (matches!(compact_task.task_type(), compact_task::TaskType::Dynamic) - || matches!(compact_task.task_type(), compact_task::TaskType::Emergency)) - { - // only try send Dynamic compaction - self.try_send_compaction_request( + trigger_sst_stat( + &self.metrics, + compaction + .compaction_statuses + .get(&compact_task.compaction_group_id), + &versioning_guard.current_version, compact_task.compaction_group_id, - compact_task::TaskType::Dynamic, ); - } - if task_status == TaskStatus::Success { - self.try_update_write_limits(&[compact_task.compaction_group_id]) - .await; - } + if !deterministic_mode + && (matches!(compact_task.task_type(), compact_task::TaskType::Dynamic) + || matches!(compact_task.task_type(), compact_task::TaskType::Emergency)) + { + // only try send Dynamic compaction + self.try_send_compaction_request( + compact_task.compaction_group_id, + compact_task::TaskType::Dynamic, + ); + } - Ok(true) + if task_status == TaskStatus::Success { + success_groups.push(compact_task.compaction_group_id); + } + } + drop(versioning_guard); + if !success_groups.is_empty() { + self.try_update_write_limits(&success_groups).await; + } + Ok(rets) } /// Caller should ensure `epoch` > `max_committed_epoch` @@ -2139,20 +1813,28 @@ impl HummockManager { task_status: TaskStatus, sorted_output_ssts: Vec, table_stats_change: Option, - ) -> Result { - let mut guard = write_lock!(self, compaction).await; + ) -> Result<()> { + if let Some(task) = compact_task { + let mut guard = write_lock!(self, compaction).await; + guard.compact_task_assignment.insert( + task_id, + CompactTaskAssignment { + compact_task: Some(task), + context_id: 0, + }, + ); + } // In the test, the contents of the compact task may have been modified directly, while the contents of compact_task_assignment were not modified. 
// So we pass the modified compact_task directly into the `report_compact_task_impl` - self.report_compact_task_impl( + self.report_compact_tasks(vec![ReportTask { task_id, - compact_task, - task_status, + task_status: task_status as i32, sorted_output_ssts, - &mut guard, - table_stats_change, - ) - .await + table_stats_change: table_stats_change.unwrap_or_default(), + }]) + .await?; + Ok(()) } pub fn metadata_manager(&self) -> &MetadataManager { @@ -2175,12 +1857,394 @@ impl HummockManager { ); } + fn notify_version_deltas(&self, versioning: &Versioning, last_version_id: u64) { + let start_version_id = last_version_id + 1; + let version_deltas = versioning + .hummock_version_deltas + .range(start_version_id..) + .map(|(_, delta)| delta.to_protobuf()) + .collect_vec(); + self.env + .notification_manager() + .notify_hummock_without_version( + Operation::Add, + Info::HummockVersionDeltas(risingwave_pb::hummock::HummockVersionDeltas { + version_deltas, + }), + ); + } + fn notify_stats(&self, stats: &HummockVersionStats) { self.env .notification_manager() .notify_frontend_without_version(Operation::Update, Info::HummockStats(stats.clone())); } + #[named] + pub async fn get_compact_tasks_impl( + &self, + compaction_groups: Vec, + max_task_count: usize, + selector: &mut Box, + ) -> crate::hummock::error::Result<(Vec, Vec)> { + // TODO: `get_all_table_options` will hold catalog_manager async lock, to avoid holding the + // lock in compaction_guard, take out all table_options in advance there may be a + // waste of resources here, need to add a more efficient filter in catalog_manager + let deterministic_mode = self.env.opts.compaction_deterministic_test; + let all_table_id_to_option = self + .metadata_manager + .get_all_table_options() + .await + .map_err(|err| Error::MetaStore(err.into()))?; + + let mut compaction_guard = write_lock!(self, compaction).await; + let compaction = compaction_guard.deref_mut(); + + let start_time = Instant::now(); + + let mut 
compaction_statuses = create_trx_wrapper!( + self.sql_meta_store(), + BTreeMapTransactionWrapper, + BTreeMapTransaction::new(&mut compaction.compaction_statuses) + ); + + let mut compact_task_assignment = create_trx_wrapper!( + self.sql_meta_store(), + BTreeMapTransactionWrapper, + BTreeMapTransaction::new(&mut compaction.compact_task_assignment) + ); + + let (current_version, watermark) = { + let versioning_guard = read_lock!(self, versioning).await; + let max_committed_epoch = versioning_guard.current_version.max_committed_epoch; + let watermark = versioning_guard + .pinned_snapshots + .values() + .map(|v| v.minimal_pinned_snapshot) + .fold(max_committed_epoch, std::cmp::min); + + (versioning_guard.current_version.clone(), watermark) + }; + let mut trivial_tasks = vec![]; + let mut pick_tasks = vec![]; + for compaction_group_id in compaction_groups { + if current_version.levels.get(&compaction_group_id).is_none() { + continue; + } + if pick_tasks.len() >= max_task_count { + break; + } + // StoredIdGenerator already implements ids pre-allocation by ID_PREALLOCATE_INTERVAL. + let task_id = next_compaction_task_id(&self.env).await?; + + // When the last table of a compaction group is deleted, the compaction group (and its + // config) is destroyed as well. Then a compaction task for this group may come later and + // cannot find its config. 
+ let group_config = match self + .compaction_group_manager + .read() + .await + .try_get_compaction_group_config(compaction_group_id) + { + Some(config) => config, + None => continue, + }; + if !compaction_statuses.contains_key(&compaction_group_id) { + compaction_statuses.insert( + compaction_group_id, + CompactStatus::new( + compaction_group_id, + group_config.compaction_config.max_level, + ), + ); + } + let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap(); + + let can_trivial_move = matches!(selector.task_type(), TaskType::Dynamic) + || matches!(selector.task_type(), TaskType::Emergency); + + let mut stats = LocalSelectorStatistic::default(); + let member_table_ids = ¤t_version + .get_compaction_group_levels(compaction_group_id) + .member_table_ids; + + let mut table_id_to_option: HashMap = HashMap::default(); + + for table_id in member_table_ids { + if let Some(opts) = all_table_id_to_option.get(table_id) { + table_id_to_option.insert(*table_id, *opts); + } + } + + let compact_task = compact_status.get_compact_task( + current_version.get_compaction_group_levels(compaction_group_id), + task_id as HummockCompactionTaskId, + &group_config, + &mut stats, + selector, + table_id_to_option.clone(), + Arc::new(CompactionDeveloperConfig::new_from_meta_opts( + &self.env.opts, + )), + ); + + stats.report_to_metrics(compaction_group_id, self.metrics.as_ref()); + let compact_task = match compact_task { + None => { + continue; + } + Some(task) => task, + }; + let target_level_id = compact_task.input.target_level as u32; + + let compression_algorithm = match compact_task.compression_algorithm.as_str() { + "Lz4" => 1, + "Zstd" => 2, + _ => 0, + }; + let vnode_partition_count = compact_task.input.vnode_partition_count; + use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; + + let mut compact_task = CompactTask { + input_ssts: compact_task.input.input_levels, + splits: vec![risingwave_pb::hummock::KeyRange::inf()], + watermark, + 
sorted_output_ssts: vec![], + task_id, + target_level: target_level_id, + // only gc delete keys in last level because there may be older version in more bottom + // level. + gc_delete_keys: current_version + .get_compaction_group_levels(compaction_group_id) + .is_last_level(target_level_id), + base_level: compact_task.base_level as u32, + task_status: TaskStatus::Pending as i32, + compaction_group_id: group_config.group_id, + existing_table_ids: member_table_ids.clone(), + compression_algorithm, + target_file_size: compact_task.target_file_size, + table_options: table_id_to_option + .into_iter() + .filter_map(|(table_id, table_option)| { + if member_table_ids.contains(&table_id) { + return Some((table_id, TableOption::from(&table_option))); + } + + None + }) + .collect(), + current_epoch_time: Epoch::now().0, + compaction_filter_mask: group_config.compaction_config.compaction_filter_mask, + target_sub_level_id: compact_task.input.target_sub_level_id, + task_type: compact_task.compaction_task_type as i32, + split_weight_by_vnode: compact_task.input.vnode_partition_count, + ..Default::default() + }; + + let is_trivial_reclaim = CompactStatus::is_trivial_reclaim(&compact_task); + let is_trivial_move = CompactStatus::is_trivial_move_task(&compact_task); + if is_trivial_reclaim { + tracing::debug!( + "TrivialReclaim for compaction group {}: remove {} sstables, cost time: {:?}", + compaction_group_id, + compact_task + .input_ssts + .iter() + .map(|level| level.table_infos.len()) + .sum::(), + start_time.elapsed() + ); + compact_task.set_task_status(TaskStatus::Success); + compact_status.report_compact_task(&compact_task); + trivial_tasks.push(compact_task); + } else if is_trivial_move && can_trivial_move { + tracing::debug!( + "TrivialMove for compaction group {}: pick up {} sstables in level {} to compact to target_level {} cost time: {:?} input {:?}", + compaction_group_id, + compact_task.input_ssts[0].table_infos.len(), + compact_task.input_ssts[0].level_idx, + 
compact_task.target_level, + start_time.elapsed(), + compact_task.input_ssts + ); + // this task has been finished and `trivial_move_task` does not need to be schedule. + compact_task.set_task_status(TaskStatus::Success); + compact_task.sorted_output_ssts = compact_task.input_ssts[0].table_infos.clone(); + compact_status.report_compact_task(&compact_task); + trivial_tasks.push(compact_task); + } else { + let mut table_to_vnode_partition = match self + .group_to_table_vnode_partition + .read() + .get(&compaction_group_id) + { + Some(table_to_vnode_partition) => table_to_vnode_partition.clone(), + None => BTreeMap::default(), + }; + table_to_vnode_partition + .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id)); + + if group_config.compaction_config.split_weight_by_vnode > 0 { + for table_id in &compact_task.existing_table_ids { + compact_task + .table_vnode_partition + .entry(*table_id) + .or_insert(vnode_partition_count); + } + } + compact_task.table_watermarks = + current_version.safe_epoch_table_watermarks(&compact_task.existing_table_ids); + + compact_task_assignment.insert( + compact_task.task_id, + CompactTaskAssignment { + compact_task: Some(compact_task.clone()), + context_id: META_NODE_ID, // deprecated + }, + ); + + pick_tasks.push(compact_task); + } + } + + if !trivial_tasks.is_empty() { + let mut versioning_guard = write_lock!(self, versioning).await; + let versioning = versioning_guard.deref_mut(); + let mut hummock_version_deltas = create_trx_wrapper!( + self.sql_meta_store(), + BTreeMapTransactionWrapper, + BTreeMapTransaction::new(&mut versioning.hummock_version_deltas,) + ); + let mut branched_ssts = create_trx_wrapper!( + self.sql_meta_store(), + BTreeMapTransactionWrapper, + BTreeMapTransaction::new(&mut versioning.branched_ssts) + ); + let mut current_version = versioning.current_version.clone(); + let last_apply_version_id = current_version.id; + for compact_task in &trivial_tasks { + let version_delta = gen_version_delta( + 
&mut hummock_version_deltas, + &mut branched_ssts, + ¤t_version, + compact_task, + deterministic_mode, + ); + current_version.apply_version_delta(&version_delta); + } + commit_multi_var!( + self.env.meta_store(), + self.sql_meta_store(), + compaction_statuses, + compact_task_assignment, + hummock_version_deltas + )?; + branched_ssts.commit_memory(); + + trigger_version_stat(&self.metrics, ¤t_version); + trigger_delta_log_stats(&self.metrics, versioning.hummock_version_deltas.len()); + self.notify_stats(&versioning.version_stats); + versioning.current_version = current_version; + self.notify_version_deltas(versioning, last_apply_version_id); + self.metrics + .compact_frequency + .with_label_values(&[ + "trivial-space-reclaim", + "several_group", + selector.task_type().as_str_name(), + "SUCCESS", + ]) + .inc_by(trivial_tasks.len() as u64); + self.metrics + .compact_task_batch_count + .with_label_values(&["batch_trivial_move"]) + .observe(trivial_tasks.len() as f64); + } else { + // We are using a single transaction to ensure that each task has progress when it is + // created. + commit_multi_var!( + self.env.meta_store(), + self.sql_meta_store(), + compaction_statuses, + compact_task_assignment + )?; + } + if !pick_tasks.is_empty() { + self.metrics + .compact_task_batch_count + .with_label_values(&["batch_get_compact_task"]) + .observe(pick_tasks.len() as f64); + } + + for compact_task in &mut pick_tasks { + let compaction_group_id = compact_task.compaction_group_id; + + // Initiate heartbeat for the task to track its progress. + self.compactor_manager + .initiate_task_heartbeat(compact_task.clone()); + + // this task has been finished. 
+ compact_task.set_task_status(TaskStatus::Pending); + + trigger_sst_stat( + &self.metrics, + compaction.compaction_statuses.get(&compaction_group_id), + ¤t_version, + compaction_group_id, + ); + let compact_task_statistics = statistics_compact_task(compact_task); + + let level_type_label = build_compact_task_level_type_metrics_label( + compact_task.input_ssts[0].level_idx as usize, + compact_task.input_ssts.last().unwrap().level_idx as usize, + ); + + let level_count = compact_task.input_ssts.len(); + if compact_task.input_ssts[0].level_idx == 0 { + self.metrics + .l0_compact_level_count + .with_label_values(&[&compaction_group_id.to_string(), &level_type_label]) + .observe(level_count as _); + } + + self.metrics + .compact_task_size + .with_label_values(&[&compaction_group_id.to_string(), &level_type_label]) + .observe(compact_task_statistics.total_file_size as _); + + self.metrics + .compact_task_size + .with_label_values(&[ + &compaction_group_id.to_string(), + &format!("{} uncompressed", level_type_label), + ]) + .observe(compact_task_statistics.total_uncompressed_file_size as _); + + self.metrics + .compact_task_file_count + .with_label_values(&[&compaction_group_id.to_string(), &level_type_label]) + .observe(compact_task_statistics.total_file_count as _); + + tracing::trace!( + "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. 
cost time: {:?} compact_task_statistics {:?}", + compaction_group_id, + level_count, + compact_task.input_ssts[0].level_type().as_str_name(), + compact_task.input_ssts[0].level_idx, + compact_task.target_level, + start_time.elapsed(), + compact_task_statistics + ); + } + + #[cfg(test)] + { + drop(compaction_guard); + self.check_state_consistency().await; + } + Ok((pick_tasks, trivial_tasks)) + } + #[named] pub fn hummock_timer_task(hummock_manager: Arc) -> (JoinHandle<()>, Sender<()>) { use futures::{FutureExt, StreamExt}; @@ -2901,6 +2965,52 @@ impl HummockManager { None } + #[named] + pub async fn auto_pick_compaction_groups_and_type( + &self, + ) -> (Vec, compact_task::TaskType) { + use rand::prelude::SliceRandom; + use rand::thread_rng; + let mut compaction_group_ids = self.compaction_group_ids().await; + compaction_group_ids.shuffle(&mut thread_rng()); + + let versioning_guard = read_lock!(self, versioning).await; + let versioning = versioning_guard.deref(); + + let mut normal_groups = vec![]; + for cg_id in compaction_group_ids { + if versioning.write_limit.contains_key(&cg_id) { + let enable_emergency_picker = match self + .compaction_group_manager + .read() + .await + .try_get_compaction_group_config(cg_id) + { + Some(config) => config.compaction_config.enable_emergency_picker, + None => { + unreachable!("compaction-group {} not exist", cg_id) + } + }; + + if enable_emergency_picker { + if normal_groups.is_empty() { + return (vec![cg_id], TaskType::Emergency); + } else { + break; + } + } + } + if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) { + if pick_type == TaskType::Dynamic { + normal_groups.push(cg_id); + } else { + return (vec![cg_id], pick_type); + } + } + } + (normal_groups, TaskType::Dynamic) + } + async fn calculate_table_align_rule( &self, table_write_throughput: &HashMap>, @@ -3097,136 +3207,6 @@ impl HummockManager { Ok(()) } - - /// dedicated event runtime for CPU/IO bound event - #[named] - async fn 
compact_task_dedicated_event_handler( - hummock_manager: Arc, - mut rx: UnboundedReceiver<(u32, subscribe_compaction_event_request::Event)>, - shutdown_rx_shared: Shared>, - ) { - let mut compaction_selectors = init_selectors(); - - tokio::select! { - _ = shutdown_rx_shared => {} - - _ = async { - while let Some((context_id, event)) = rx.recv().await { - match event { - RequestEvent::PullTask(PullTask { pull_task_count }) => { - assert_ne!(0, pull_task_count); - if let Some(compactor) = - hummock_manager.compactor_manager.get_compactor(context_id) - { - if let Some((group, task_type)) = - hummock_manager.auto_pick_compaction_group_and_type().await - { - let selector: &mut Box = { - let versioning_guard = - read_lock!(hummock_manager, versioning).await; - let versioning = versioning_guard.deref(); - - if versioning.write_limit.contains_key(&group) { - let enable_emergency_picker = match hummock_manager - .compaction_group_manager - .read() - .await - .try_get_compaction_group_config(group) - { - Some(config) => { - config.compaction_config.enable_emergency_picker - } - None => { - unreachable!("compaction-group {} not exist", group) - } - }; - - if enable_emergency_picker { - compaction_selectors - .get_mut(&TaskType::Emergency) - .unwrap() - } else { - compaction_selectors.get_mut(&task_type).unwrap() - } - } else { - compaction_selectors.get_mut(&task_type).unwrap() - } - }; - for _ in 0..pull_task_count { - let compact_task = - hummock_manager.get_compact_task(group, selector).await; - - match compact_task { - Ok(Some(compact_task)) => { - let task_id = compact_task.task_id; - if let Err(e) = compactor.send_event( - ResponseEvent::CompactTask(compact_task), - ) { - tracing::warn!( - error = %e.as_report(), - "Failed to send task {} to {}", - task_id, - compactor.context_id(), - ); - - hummock_manager.compactor_manager.remove_compactor(context_id); - break; - } - } - Ok(None) => { - // no compact_task to be picked - hummock_manager - .compaction_state - 
.unschedule(group, task_type); - break; - } - Err(err) => { - tracing::warn!(error = %err.as_report(), "Failed to get compaction task"); - break; - } - }; - } - } - - // ack to compactor - if let Err(e) = - compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})) - { - tracing::warn!( - error = %e.as_report(), - "Failed to send ask to {}", - context_id, - ); - hummock_manager.compactor_manager.remove_compactor(context_id); - } - } - } - - RequestEvent::ReportTask(ReportTask { - task_id, - task_status, - sorted_output_ssts, - table_stats_change, - }) => { - if let Err(e) = hummock_manager - .report_compact_task( - task_id, - TaskStatus::try_from(task_status).unwrap(), - sorted_output_ssts, - Some(table_stats_change), - ) - .await - { - tracing::error!(error = %e.as_report(), "report compact_tack fail") - } - } - - _ => unreachable!(), - } - } - } => {} - } - } } // This structure describes how hummock handles sst switching in a compaction group. A better sst cut will result in better data alignment, which in turn will improve the efficiency of the compaction. 
diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 33eeeb54ba62a..041ea63c1281e 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -1682,11 +1682,11 @@ async fn test_split_compaction_group_trivial_expired() { .unwrap(); let mut selector: Box = Box::::default(); - let reclaim_task = hummock_manager - .get_compact_task_impl(2, &mut selector) + let (_, mut reclaim_tasks) = hummock_manager + .get_compact_tasks_impl(vec![2], 1, &mut selector) .await - .unwrap() .unwrap(); + let reclaim_task = reclaim_tasks.pop().unwrap(); assert!(CompactStatus::is_trivial_reclaim(&reclaim_task)); let current_version = hummock_manager.get_current_version().await; diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs index 5aa7e3172442f..aeb59be110bbd 100644 --- a/src/meta/src/hummock/test_utils.rs +++ b/src/meta/src/hummock/test_utils.rs @@ -114,7 +114,7 @@ pub async fn add_test_tables( assert_eq!(compactor.context_id(), context_id); } - let ret = hummock_manager + hummock_manager .report_compact_task_for_test( compact_task.task_id, Some(compact_task), @@ -124,7 +124,6 @@ pub async fn add_test_tables( ) .await .unwrap(); - assert!(ret); if temp_compactor { hummock_manager .compactor_manager_ref_for_test() diff --git a/src/meta/src/rpc/metrics.rs b/src/meta/src/rpc/metrics.rs index d2dca20f98edf..7c9a8a9fb1d5e 100644 --- a/src/meta/src/rpc/metrics.rs +++ b/src/meta/src/rpc/metrics.rs @@ -150,6 +150,7 @@ pub struct MetaMetrics { pub l0_compact_level_count: HistogramVec, pub compact_task_size: HistogramVec, pub compact_task_file_count: HistogramVec, + pub compact_task_batch_count: HistogramVec, pub move_state_table_count: IntCounterVec, pub state_table_count: IntGaugeVec, pub branched_sst_count: IntGaugeVec, @@ -571,6 +572,14 @@ impl MetaMetrics { registry ) .unwrap(); + let opts = histogram_opts!( + "storage_compact_task_batch_count", + "count of compact task batch", 
+ exponential_buckets(1.0, 2.0, 8).unwrap() + ); + let compact_task_batch_count = + register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap(); + let table_write_throughput = register_int_counter_vec_with_registry!( "storage_commit_write_throughput", "The number of compactions from one level to another level that have been skipped.", @@ -676,6 +685,7 @@ impl MetaMetrics { l0_compact_level_count, compact_task_size, compact_task_file_count, + compact_task_batch_count, table_write_throughput, move_state_table_count, state_table_count, diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 441480d66a6b7..5df01e5779fa3 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -477,7 +477,7 @@ pub async fn compact( ) * compact_task.splits.len() as u64; tracing::info!( - "Ready to handle compaction group {} task: {} compact_task_statistics {:?} target_level {} compression_algorithm {:?} table_ids {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {} input: {:?}", + "Ready to handle compaction group {} task: {} compact_task_statistics {:?} target_level {} compression_algorithm {:?} table_ids {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}", compact_task.compaction_group_id, compact_task.task_id, compact_task_statistics, @@ -487,7 +487,6 @@ pub async fn compact( parallelism, task_memory_capacity_with_parallelism, optimize_by_copy_block, - compact_task_to_string(&compact_task), ); // If the task does not have enough memory, it should cancel the task and let the meta From e2dc747d18e1ffb28a01f0bcb63870a6cc7508fe Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Thu, 14 Mar 2024 14:31:28 +0800 Subject: [PATCH 02/12] apply task in once loop Signed-off-by: Little-Wallace --- src/meta/src/hummock/manager/compaction.rs | 22 +- 
src/meta/src/hummock/manager/mod.rs | 328 +++++++++++++++++++-- src/meta/src/hummock/manager/tests.rs | 9 +- 3 files changed, 314 insertions(+), 45 deletions(-) diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction.rs index fe4e7c73467e6..b41c05fba868e 100644 --- a/src/meta/src/hummock/manager/compaction.rs +++ b/src/meta/src/hummock/manager/compaction.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::Arc; use function_name::named; @@ -133,28 +133,23 @@ impl HummockManager { let mut generated_task_count = 0; let mut existed_groups = groups.clone(); let mut meet_error = false; + let mut wait_compact_groups = HashSet::default(); while generated_task_count < pull_task_count && !meet_error { let compact_ret = self - .get_compact_tasks( - std::mem::take(&mut existed_groups), - pull_task_count - generated_task_count, - selector, - ) + .get_compact_tasks(std::mem::take(&mut existed_groups), selector) .await; match compact_ret { - Ok((compact_tasks, trivial_tasks)) => { + Ok(compact_tasks) => { if compact_tasks.is_empty() { break; } generated_task_count += compact_tasks.len(); - for task in trivial_tasks { - existed_groups.push(task.compaction_group_id); - } for task in compact_tasks { let task_id = task.task_id; existed_groups.push(task.compaction_group_id); + wait_compact_groups.insert(task.compaction_group_id); if let Err(e) = compactor.send_event(ResponseEvent::CompactTask(task)) { @@ -181,6 +176,13 @@ impl HummockManager { for group in groups { self.compaction_state.unschedule(group, task_type); } + } else if !meet_error { + for group in groups { + if wait_compact_groups.contains(&group) { + continue; + } + self.compaction_state.unschedule(group, task_type); + } } } diff --git a/src/meta/src/hummock/manager/mod.rs 
b/src/meta/src/hummock/manager/mod.rs index 79e920bd774c1..7fa615d0bc07b 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -850,6 +850,296 @@ impl HummockManager { Ok(()) } + #[named] + pub async fn get_compact_tasks_impl( + &self, + compaction_groups: Vec, + selector: &mut Box, + ) -> crate::hummock::error::Result> { + // TODO: `get_all_table_options` will hold catalog_manager async lock, to avoid holding the + // lock in compaction_guard, take out all table_options in advance there may be a + // waste of resources here, need to add a more efficient filter in catalog_manager + let _timer = start_measure_real_process_timer!(self); + let deterministic_mode = self.env.opts.compaction_deterministic_test; + let all_table_id_to_option = self + .metadata_manager + .get_all_table_options() + .await + .map_err(|err| Error::MetaStore(err.into()))?; + + let mut compaction_guard = write_lock!(self, compaction).await; + let compaction = compaction_guard.deref_mut(); + let mut versioning_guard = write_lock!(self, versioning).await; + let versioning = versioning_guard.deref_mut(); + let mut hummock_version_deltas = + BTreeMapTransaction::new(&mut versioning.hummock_version_deltas); + let mut branched_ssts = BTreeMapTransaction::new(&mut versioning.branched_ssts); + let mut current_version = versioning.current_version.clone(); + let start_time = Instant::now(); + + let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses); + + let mut compact_task_assignment = + BTreeMapTransaction::new(&mut compaction.compact_task_assignment); + + let max_committed_epoch = current_version.max_committed_epoch; + let watermark = versioning + .pinned_snapshots + .values() + .map(|v| v.minimal_pinned_snapshot) + .fold(max_committed_epoch, std::cmp::min); + let last_apply_version_id = current_version.id; + let mut trivial_tasks = vec![]; + let mut pick_tasks = vec![]; + for compaction_group_id in compaction_groups { + if 
current_version.levels.get(&compaction_group_id).is_none() { + continue; + } + + // StoredIdGenerator already implements ids pre-allocation by ID_PREALLOCATE_INTERVAL. + let task_id = self + .env + .id_gen_manager() + .generate::<{ IdCategory::HummockCompactionTask }>() + .await?; + + // When the last table of a compaction group is deleted, the compaction group (and its + // config) is destroyed as well. Then a compaction task for this group may come later and + // cannot find its config. + let group_config = match self + .compaction_group_manager + .read() + .await + .try_get_compaction_group_config(compaction_group_id) + { + Some(config) => config, + None => continue, + }; + if !compaction_statuses.contains_key(&compaction_group_id) { + compaction_statuses.insert( + compaction_group_id, + CompactStatus::new( + compaction_group_id, + group_config.compaction_config.max_level, + ), + ); + } + + let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap(); + + let can_trivial_move = matches!(selector.task_type(), TaskType::Dynamic) + || matches!(selector.task_type(), TaskType::Emergency); + + let mut stats = LocalSelectorStatistic::default(); + let member_table_ids = current_version + .get_compaction_group_levels(compaction_group_id) + .member_table_ids.clone(); + + let mut table_id_to_option: HashMap = HashMap::default(); + + for table_id in &member_table_ids { + if let Some(opts) = all_table_id_to_option.get(table_id) { + table_id_to_option.insert(*table_id, *opts); + } + } + while let Some(mut compact_task) = compact_status.get_compact_task( + current_version.get_compaction_group_levels(compaction_group_id), + task_id as HummockCompactionTaskId, + &group_config, + &mut stats, + selector, + table_id_to_option.clone(), + ) { + compact_task.watermark = watermark; + compact_task.existing_table_ids = member_table_ids.clone(); + + let is_trivial_reclaim = CompactStatus::is_trivial_reclaim(&compact_task); + let is_trivial_move = 
CompactStatus::is_trivial_move_task(&compact_task); + + if is_trivial_reclaim || (is_trivial_move && can_trivial_move) { + let log_label = if is_trivial_reclaim { + "TrivialReclaim" + } else { + "TrivialMove" + }; + let label = if is_trivial_reclaim { + "trivial-space-reclaim" + } else { + "trivial-move" + }; + tracing::debug!("{} for compaction group {}: input: {:?}, cost time: {:?}",log_label, compaction_group_id, compact_task.input_ssts,start_time.elapsed()); + compact_task.set_task_status(TaskStatus::Success); + compact_status.report_compact_task(&compact_task); + if !is_trivial_reclaim { + compact_task.sorted_output_ssts = compact_task.input_ssts[0].table_infos.clone(); + } + self.metrics + .compact_frequency + .with_label_values(&[ + label, + &compact_task.compaction_group_id.to_string(), + selector.task_type().as_str_name(), + "SUCCESS", + ]); + let version_delta = gen_version_delta( + &mut hummock_version_deltas, + &mut branched_ssts, + ¤t_version, + &compact_task, + deterministic_mode, + ); + current_version.apply_version_delta(&version_delta); + trivial_tasks.push(compact_task); + } else { + compact_task.table_options = table_id_to_option + .into_iter() + .filter_map(|(table_id, table_option)| { + if compact_task.existing_table_ids.contains(&table_id) { + return Some((table_id, TableOption::from(&table_option))); + } + + None + }) + .collect(); + compact_task.current_epoch_time = Epoch::now().0; + compact_task.compaction_filter_mask = + group_config.compaction_config.compaction_filter_mask; + let mut table_to_vnode_partition = match self + .group_to_table_vnode_partition + .read() + .get(&compaction_group_id) + { + Some(table_to_vnode_partition) => table_to_vnode_partition.clone(), + None => BTreeMap::default(), + }; + table_to_vnode_partition + .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id)); + compact_task.table_watermarks = + current_version.safe_epoch_table_watermarks(&compact_task.existing_table_ids); + + 
compact_task_assignment.insert( + compact_task.task_id, + CompactTaskAssignment { + compact_task: Some(compact_task.clone()), + context_id: META_NODE_ID, // deprecated + }, + ); + + pick_tasks.push(compact_task); + break; + } + stats = LocalSelectorStatistic::default(); + } + stats.report_to_metrics(compaction_group_id, self.metrics.as_ref()); + } + + if !trivial_tasks.is_empty() { + commit_multi_var!( + self, + None, + Transaction::default(), + compaction_statuses, + compact_task_assignment, + hummock_version_deltas + )?; + branched_ssts.commit_memory(); + + trigger_version_stat(&self.metrics, ¤t_version); + trigger_delta_log_stats(&self.metrics, versioning.hummock_version_deltas.len()); + self.notify_stats(&versioning.version_stats); + versioning.current_version = current_version; + self.notify_version_deltas(versioning, last_apply_version_id); + self.metrics + .compact_task_batch_count + .with_label_values(&["batch_trivial_move"]) + .observe(trivial_tasks.len() as f64); + } else { + // We are using a single transaction to ensure that each task has progress when it is + // created. + commit_multi_var!( + self, + None, + Transaction::default(), + compaction_statuses, + compact_task_assignment + )?; + } + if !pick_tasks.is_empty() { + self.metrics + .compact_task_batch_count + .with_label_values(&["batch_get_compact_task"]) + .observe(pick_tasks.len() as f64); + } + + for compact_task in &mut pick_tasks { + let compaction_group_id = compact_task.compaction_group_id; + + // Initiate heartbeat for the task to track its progress. + self.compactor_manager + .initiate_task_heartbeat(compact_task.clone()); + + // this task has been finished. 
+ compact_task.set_task_status(TaskStatus::Pending); + + trigger_sst_stat( + &self.metrics, + compaction.compaction_statuses.get(&compaction_group_id), + &versioning.current_version, + compaction_group_id, + ); + let compact_task_statistics = statistics_compact_task(compact_task); + + let level_type_label = build_compact_task_level_type_metrics_label( + compact_task.input_ssts[0].level_idx as usize, + compact_task.input_ssts.last().unwrap().level_idx as usize, + ); + + let level_count = compact_task.input_ssts.len(); + if compact_task.input_ssts[0].level_idx == 0 { + self.metrics + .l0_compact_level_count + .with_label_values(&[&compaction_group_id.to_string(), &level_type_label]) + .observe(level_count as _); + } + + self.metrics + .compact_task_size + .with_label_values(&[&compaction_group_id.to_string(), &level_type_label]) + .observe(compact_task_statistics.total_file_size as _); + + self.metrics + .compact_task_size + .with_label_values(&[ + &compaction_group_id.to_string(), + &format!("{} uncompressed", level_type_label), + ]) + .observe(compact_task_statistics.total_uncompressed_file_size as _); + + self.metrics + .compact_task_file_count + .with_label_values(&[&compaction_group_id.to_string(), &level_type_label]) + .observe(compact_task_statistics.total_file_count as _); + + tracing::trace!( + "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}", + compaction_group_id, + level_count, + compact_task.input_ssts[0].level_type().as_str_name(), + compact_task.input_ssts[0].level_idx, + compact_task.target_level, + start_time.elapsed(), + compact_task_statistics + ); + } + + #[cfg(test)] + { + drop(compaction_guard); + self.check_state_consistency().await; + } + Ok(pick_tasks) + } + /// Cancels a compaction task no matter it's assigned or unassigned. 
pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result { fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore( @@ -881,31 +1171,13 @@ impl HummockManager { pub async fn get_compact_tasks( &self, - mut compaction_groups: Vec, - max_task_count: usize, + compaction_groups: Vec, selector: &mut Box, - ) -> Result<(Vec, Vec)> { + ) -> Result> { fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore( anyhow::anyhow!("failpoint metastore error") ))); - loop { - let (normal_tasks, trivial_tasks) = self - .get_compact_tasks_impl( - std::mem::take(&mut compaction_groups), - max_task_count, - selector, - ) - .await?; - if !normal_tasks.is_empty() { - return Ok((normal_tasks, trivial_tasks)); - } else if trivial_tasks.is_empty() { - return Ok((vec![], vec![])); - } - // only select groups which could generate more tasks. - for t in &trivial_tasks { - compaction_groups.push(t.compaction_group_id); - } - } + self.get_compact_tasks_impl(compaction_groups, selector).await } pub async fn get_compact_task( @@ -917,16 +1189,10 @@ impl HummockManager { anyhow::anyhow!("failpoint metastore error") ))); - loop { - let (mut normal_tasks, trivial_tasks) = self - .get_compact_tasks_impl(vec![compaction_group_id], 1, selector) - .await?; - if !normal_tasks.is_empty() { - return Ok(normal_tasks.pop()); - } else if trivial_tasks.is_empty() { - return Ok(None); - } - } + let mut normal_tasks = self + .get_compact_tasks_impl(vec![compaction_group_id], selector) + .await?; + Ok(normal_tasks.pop()) } pub async fn manual_get_compact_task( diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 041ea63c1281e..54f1663e8524a 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -1682,12 +1682,13 @@ async fn test_split_compaction_group_trivial_expired() { .unwrap(); let mut selector: Box = Box::::default(); - let (_, mut reclaim_tasks) = hummock_manager - 
.get_compact_tasks_impl(vec![2], 1, &mut selector) + let normal_tasks = hummock_manager + .get_compact_tasks_impl(vec![2], &mut selector) .await .unwrap(); - let reclaim_task = reclaim_tasks.pop().unwrap(); - assert!(CompactStatus::is_trivial_reclaim(&reclaim_task)); + assert!(normal_tasks.is_empty()); + // let reclaim_task = normal_tasks.pop().unwrap(); + // assert!(CompactStatus::is_trivial_reclaim(&reclaim_task)); let current_version = hummock_manager.get_current_version().await; let new_group_id = current_version.levels.keys().max().cloned().unwrap(); From 69e1aa5a5c72ebedb3313768c4379e6bc8af414d Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Thu, 14 Mar 2024 15:43:55 +0800 Subject: [PATCH 03/12] fix type Signed-off-by: Little-Wallace --- src/meta/src/hummock/manager/compaction.rs | 2 +- src/meta/src/hummock/manager/mod.rs | 524 +++++---------------- 2 files changed, 110 insertions(+), 416 deletions(-) diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction.rs index b41c05fba868e..a81281f567ce8 100644 --- a/src/meta/src/hummock/manager/compaction.rs +++ b/src/meta/src/hummock/manager/compaction.rs @@ -133,7 +133,7 @@ impl HummockManager { let mut generated_task_count = 0; let mut existed_groups = groups.clone(); let mut meet_error = false; - let mut wait_compact_groups = HashSet::default(); + let mut wait_compact_groups: HashSet = HashSet::default(); while generated_task_count < pull_task_count && !meet_error { let compact_ret = self diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 7fa615d0bc07b..e6ecb5bb2652b 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -859,7 +859,6 @@ impl HummockManager { // TODO: `get_all_table_options` will hold catalog_manager async lock, to avoid holding the // lock in compaction_guard, take out all table_options in advance there may be a // waste of resources here, need to add a more efficient 
filter in catalog_manager - let _timer = start_measure_real_process_timer!(self); let deterministic_mode = self.env.opts.compaction_deterministic_test; let all_table_id_to_option = self .metadata_manager @@ -871,17 +870,9 @@ impl HummockManager { let compaction = compaction_guard.deref_mut(); let mut versioning_guard = write_lock!(self, versioning).await; let versioning = versioning_guard.deref_mut(); - let mut hummock_version_deltas = - BTreeMapTransaction::new(&mut versioning.hummock_version_deltas); - let mut branched_ssts = BTreeMapTransaction::new(&mut versioning.branched_ssts); + let mut current_version = versioning.current_version.clone(); let start_time = Instant::now(); - - let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses); - - let mut compact_task_assignment = - BTreeMapTransaction::new(&mut compaction.compact_task_assignment); - let max_committed_epoch = current_version.max_committed_epoch; let watermark = versioning .pinned_snapshots @@ -889,19 +880,41 @@ impl HummockManager { .map(|v| v.minimal_pinned_snapshot) .fold(max_committed_epoch, std::cmp::min); let last_apply_version_id = current_version.id; + + let mut compaction_statuses = create_trx_wrapper!( + self.sql_meta_store(), + BTreeMapTransactionWrapper, + BTreeMapTransaction::new(&mut compaction.compaction_statuses) + ); + + let mut compact_task_assignment = create_trx_wrapper!( + self.sql_meta_store(), + BTreeMapTransactionWrapper, + BTreeMapTransaction::new(&mut compaction.compact_task_assignment) + ); + + let mut hummock_version_deltas = create_trx_wrapper!( + self.sql_meta_store(), + BTreeMapTransactionWrapper, + BTreeMapTransaction::new(&mut versioning.hummock_version_deltas,) + ); + let mut branched_ssts = create_trx_wrapper!( + self.sql_meta_store(), + BTreeMapTransactionWrapper, + BTreeMapTransaction::new(&mut versioning.branched_ssts) + ); + let mut trivial_tasks = vec![]; let mut pick_tasks = vec![]; + let developer_config = 
Arc::new(CompactionDeveloperConfig::new_from_meta_opts( + &self.env.opts, + )); for compaction_group_id in compaction_groups { if current_version.levels.get(&compaction_group_id).is_none() { continue; } - // StoredIdGenerator already implements ids pre-allocation by ID_PREALLOCATE_INTERVAL. - let task_id = self - .env - .id_gen_manager() - .generate::<{ IdCategory::HummockCompactionTask }>() - .await?; + let task_id = next_compaction_task_id(&self.env).await?; // When the last table of a compaction group is deleted, the compaction group (and its // config) is destroyed as well. Then a compaction task for this group may come later and @@ -924,7 +937,6 @@ impl HummockManager { ), ); } - let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap(); let can_trivial_move = matches!(selector.task_type(), TaskType::Dynamic) @@ -933,7 +945,8 @@ impl HummockManager { let mut stats = LocalSelectorStatistic::default(); let member_table_ids = current_version .get_compaction_group_levels(compaction_group_id) - .member_table_ids.clone(); + .member_table_ids + .clone(); let mut table_id_to_option: HashMap = HashMap::default(); @@ -942,36 +955,84 @@ impl HummockManager { table_id_to_option.insert(*table_id, *opts); } } - while let Some(mut compact_task) = compact_status.get_compact_task( + + while let Some(compact_task) = compact_status.get_compact_task( current_version.get_compaction_group_levels(compaction_group_id), task_id as HummockCompactionTaskId, &group_config, &mut stats, selector, table_id_to_option.clone(), + developer_config.clone(), ) { - compact_task.watermark = watermark; - compact_task.existing_table_ids = member_table_ids.clone(); + let target_level_id = compact_task.input.target_level as u32; + + let compression_algorithm = match compact_task.compression_algorithm.as_str() { + "Lz4" => 1, + "Zstd" => 2, + _ => 0, + }; + let vnode_partition_count = compact_task.input.vnode_partition_count; + use 
risingwave_hummock_sdk::prost_key_range::KeyRangeExt; + + let mut compact_task = CompactTask { + input_ssts: compact_task.input.input_levels, + splits: vec![risingwave_pb::hummock::KeyRange::inf()], + watermark, + sorted_output_ssts: vec![], + task_id, + target_level: target_level_id, + // only gc delete keys in last level because there may be older version in more bottom + // level. + gc_delete_keys: current_version + .get_compaction_group_levels(compaction_group_id) + .is_last_level(target_level_id), + base_level: compact_task.base_level as u32, + task_status: TaskStatus::Pending as i32, + compaction_group_id: group_config.group_id, + existing_table_ids: member_table_ids.clone(), + compression_algorithm, + target_file_size: compact_task.target_file_size, + table_options: table_id_to_option + .iter() + .map(|(table_id, table_option)| { + (*table_id, TableOption::from(table_option)) + }) + .collect(), + current_epoch_time: Epoch::now().0, + compaction_filter_mask: group_config.compaction_config.compaction_filter_mask, + target_sub_level_id: compact_task.input.target_sub_level_id, + task_type: compact_task.compaction_task_type as i32, + split_weight_by_vnode: compact_task.input.vnode_partition_count, + ..Default::default() + }; let is_trivial_reclaim = CompactStatus::is_trivial_reclaim(&compact_task); let is_trivial_move = CompactStatus::is_trivial_move_task(&compact_task); - if is_trivial_reclaim || (is_trivial_move && can_trivial_move) { - let log_label = if is_trivial_reclaim { + let log_tabel = if is_trivial_reclaim { "TrivialReclaim" } else { "TrivialMove" }; let label = if is_trivial_reclaim { "trivial-space-reclaim" - } else { + } else { "trivial-move" }; - tracing::debug!("{} for compaction group {}: input: {:?}, cost time: {:?}",log_label, compaction_group_id, compact_task.input_ssts,start_time.elapsed()); + + tracing::debug!( + "{} for compaction group {}: input: {:?}, cost time: {:?}", + log_tabel, + compact_task.compaction_group_id, + 
compact_task.input_ssts, + start_time.elapsed() + ); compact_task.set_task_status(TaskStatus::Success); compact_status.report_compact_task(&compact_task); if !is_trivial_reclaim { - compact_task.sorted_output_ssts = compact_task.input_ssts[0].table_infos.clone(); + compact_task.sorted_output_ssts = + compact_task.input_ssts[0].table_infos.clone(); } self.metrics .compact_frequency @@ -980,7 +1041,8 @@ impl HummockManager { &compact_task.compaction_group_id.to_string(), selector.task_type().as_str_name(), "SUCCESS", - ]); + ]) + .inc(); let version_delta = gen_version_delta( &mut hummock_version_deltas, &mut branched_ssts, @@ -991,19 +1053,6 @@ impl HummockManager { current_version.apply_version_delta(&version_delta); trivial_tasks.push(compact_task); } else { - compact_task.table_options = table_id_to_option - .into_iter() - .filter_map(|(table_id, table_option)| { - if compact_task.existing_table_ids.contains(&table_id) { - return Some((table_id, TableOption::from(&table_option))); - } - - None - }) - .collect(); - compact_task.current_epoch_time = Epoch::now().0; - compact_task.compaction_filter_mask = - group_config.compaction_config.compaction_filter_mask; let mut table_to_vnode_partition = match self .group_to_table_vnode_partition .read() @@ -1014,8 +1063,17 @@ impl HummockManager { }; table_to_vnode_partition .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id)); - compact_task.table_watermarks = - current_version.safe_epoch_table_watermarks(&compact_task.existing_table_ids); + + if group_config.compaction_config.split_weight_by_vnode > 0 { + for table_id in &compact_task.existing_table_ids { + compact_task + .table_vnode_partition + .entry(*table_id) + .or_insert(vnode_partition_count); + } + } + compact_task.table_watermarks = current_version + .safe_epoch_table_watermarks(&compact_task.existing_table_ids); compact_task_assignment.insert( compact_task.task_id, @@ -1028,16 +1086,17 @@ impl HummockManager { pick_tasks.push(compact_task); 
break; } + stats = LocalSelectorStatistic::default(); } + stats.report_to_metrics(compaction_group_id, self.metrics.as_ref()); } if !trivial_tasks.is_empty() { commit_multi_var!( - self, - None, - Transaction::default(), + self.env.meta_store(), + self.sql_meta_store(), compaction_statuses, compact_task_assignment, hummock_version_deltas @@ -1045,9 +1104,9 @@ impl HummockManager { branched_ssts.commit_memory(); trigger_version_stat(&self.metrics, &current_version); + versioning.current_version = current_version; trigger_delta_log_stats(&self.metrics, versioning.hummock_version_deltas.len()); self.notify_stats(&versioning.version_stats); - versioning.current_version = current_version; self.notify_version_deltas(versioning, last_apply_version_id); self.metrics .compact_task_batch_count @@ -1057,9 +1116,8 @@ impl HummockManager { // We are using a single transaction to ensure that each task has progress when it is // created. commit_multi_var!( - self, - None, - Transaction::default(), + self.env.meta_store(), + self.sql_meta_store(), compaction_statuses, compact_task_assignment )?; @@ -1177,7 +1235,8 @@ impl HummockManager { fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore( anyhow::anyhow!("failpoint metastore error") ))); - self.get_compact_tasks_impl(compaction_groups, selector).await + self.get_compact_tasks_impl(compaction_groups, selector) + .await } pub async fn get_compact_task( @@ -2146,371 +2205,6 @@ impl HummockManager { .notify_frontend_without_version(Operation::Update, Info::HummockStats(stats.clone())); } - #[named] - pub async fn get_compact_tasks_impl( - &self, - compaction_groups: Vec, - max_task_count: usize, - selector: &mut Box, - ) -> crate::hummock::error::Result<(Vec, Vec)> { - // TODO: `get_all_table_options` will hold catalog_manager async lock, to avoid holding the - // lock in compaction_guard, take out all table_options in advance there may be a - // waste of resources here, need to add a more efficient filter in catalog_manager - let 
deterministic_mode = self.env.opts.compaction_deterministic_test; - let all_table_id_to_option = self - .metadata_manager - .get_all_table_options() - .await - .map_err(|err| Error::MetaStore(err.into()))?; - - let mut compaction_guard = write_lock!(self, compaction).await; - let compaction = compaction_guard.deref_mut(); - - let start_time = Instant::now(); - - let mut compaction_statuses = create_trx_wrapper!( - self.sql_meta_store(), - BTreeMapTransactionWrapper, - BTreeMapTransaction::new(&mut compaction.compaction_statuses) - ); - - let mut compact_task_assignment = create_trx_wrapper!( - self.sql_meta_store(), - BTreeMapTransactionWrapper, - BTreeMapTransaction::new(&mut compaction.compact_task_assignment) - ); - - let (current_version, watermark) = { - let versioning_guard = read_lock!(self, versioning).await; - let max_committed_epoch = versioning_guard.current_version.max_committed_epoch; - let watermark = versioning_guard - .pinned_snapshots - .values() - .map(|v| v.minimal_pinned_snapshot) - .fold(max_committed_epoch, std::cmp::min); - - (versioning_guard.current_version.clone(), watermark) - }; - let mut trivial_tasks = vec![]; - let mut pick_tasks = vec![]; - for compaction_group_id in compaction_groups { - if current_version.levels.get(&compaction_group_id).is_none() { - continue; - } - if pick_tasks.len() >= max_task_count { - break; - } - // StoredIdGenerator already implements ids pre-allocation by ID_PREALLOCATE_INTERVAL. - let task_id = next_compaction_task_id(&self.env).await?; - - // When the last table of a compaction group is deleted, the compaction group (and its - // config) is destroyed as well. Then a compaction task for this group may come later and - // cannot find its config. 
- let group_config = match self - .compaction_group_manager - .read() - .await - .try_get_compaction_group_config(compaction_group_id) - { - Some(config) => config, - None => continue, - }; - if !compaction_statuses.contains_key(&compaction_group_id) { - compaction_statuses.insert( - compaction_group_id, - CompactStatus::new( - compaction_group_id, - group_config.compaction_config.max_level, - ), - ); - } - let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap(); - - let can_trivial_move = matches!(selector.task_type(), TaskType::Dynamic) - || matches!(selector.task_type(), TaskType::Emergency); - - let mut stats = LocalSelectorStatistic::default(); - let member_table_ids = &current_version - .get_compaction_group_levels(compaction_group_id) - .member_table_ids; - - let mut table_id_to_option: HashMap = HashMap::default(); - - for table_id in member_table_ids { - if let Some(opts) = all_table_id_to_option.get(table_id) { - table_id_to_option.insert(*table_id, *opts); - } - } - - let compact_task = compact_status.get_compact_task( - current_version.get_compaction_group_levels(compaction_group_id), - task_id as HummockCompactionTaskId, - &group_config, - &mut stats, - selector, - table_id_to_option.clone(), - Arc::new(CompactionDeveloperConfig::new_from_meta_opts( - &self.env.opts, - )), - ); - - stats.report_to_metrics(compaction_group_id, self.metrics.as_ref()); - let compact_task = match compact_task { - None => { - continue; - } - Some(task) => task, - }; - let target_level_id = compact_task.input.target_level as u32; - - let compression_algorithm = match compact_task.compression_algorithm.as_str() { - "Lz4" => 1, - "Zstd" => 2, - _ => 0, - }; - let vnode_partition_count = compact_task.input.vnode_partition_count; - use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; - - let mut compact_task = CompactTask { - input_ssts: compact_task.input.input_levels, - splits: vec![risingwave_pb::hummock::KeyRange::inf()], - watermark, - 
sorted_output_ssts: vec![], - task_id, - target_level: target_level_id, - // only gc delete keys in last level because there may be older version in more bottom - // level. - gc_delete_keys: current_version - .get_compaction_group_levels(compaction_group_id) - .is_last_level(target_level_id), - base_level: compact_task.base_level as u32, - task_status: TaskStatus::Pending as i32, - compaction_group_id: group_config.group_id, - existing_table_ids: member_table_ids.clone(), - compression_algorithm, - target_file_size: compact_task.target_file_size, - table_options: table_id_to_option - .into_iter() - .filter_map(|(table_id, table_option)| { - if member_table_ids.contains(&table_id) { - return Some((table_id, TableOption::from(&table_option))); - } - - None - }) - .collect(), - current_epoch_time: Epoch::now().0, - compaction_filter_mask: group_config.compaction_config.compaction_filter_mask, - target_sub_level_id: compact_task.input.target_sub_level_id, - task_type: compact_task.compaction_task_type as i32, - split_weight_by_vnode: compact_task.input.vnode_partition_count, - ..Default::default() - }; - - let is_trivial_reclaim = CompactStatus::is_trivial_reclaim(&compact_task); - let is_trivial_move = CompactStatus::is_trivial_move_task(&compact_task); - if is_trivial_reclaim { - tracing::debug!( - "TrivialReclaim for compaction group {}: remove {} sstables, cost time: {:?}", - compaction_group_id, - compact_task - .input_ssts - .iter() - .map(|level| level.table_infos.len()) - .sum::(), - start_time.elapsed() - ); - compact_task.set_task_status(TaskStatus::Success); - compact_status.report_compact_task(&compact_task); - trivial_tasks.push(compact_task); - } else if is_trivial_move && can_trivial_move { - tracing::debug!( - "TrivialMove for compaction group {}: pick up {} sstables in level {} to compact to target_level {} cost time: {:?} input {:?}", - compaction_group_id, - compact_task.input_ssts[0].table_infos.len(), - compact_task.input_ssts[0].level_idx, - 
compact_task.target_level, - start_time.elapsed(), - compact_task.input_ssts - ); - // this task has been finished and `trivial_move_task` does not need to be schedule. - compact_task.set_task_status(TaskStatus::Success); - compact_task.sorted_output_ssts = compact_task.input_ssts[0].table_infos.clone(); - compact_status.report_compact_task(&compact_task); - trivial_tasks.push(compact_task); - } else { - let mut table_to_vnode_partition = match self - .group_to_table_vnode_partition - .read() - .get(&compaction_group_id) - { - Some(table_to_vnode_partition) => table_to_vnode_partition.clone(), - None => BTreeMap::default(), - }; - table_to_vnode_partition - .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id)); - - if group_config.compaction_config.split_weight_by_vnode > 0 { - for table_id in &compact_task.existing_table_ids { - compact_task - .table_vnode_partition - .entry(*table_id) - .or_insert(vnode_partition_count); - } - } - compact_task.table_watermarks = - current_version.safe_epoch_table_watermarks(&compact_task.existing_table_ids); - - compact_task_assignment.insert( - compact_task.task_id, - CompactTaskAssignment { - compact_task: Some(compact_task.clone()), - context_id: META_NODE_ID, // deprecated - }, - ); - - pick_tasks.push(compact_task); - } - } - - if !trivial_tasks.is_empty() { - let mut versioning_guard = write_lock!(self, versioning).await; - let versioning = versioning_guard.deref_mut(); - let mut hummock_version_deltas = create_trx_wrapper!( - self.sql_meta_store(), - BTreeMapTransactionWrapper, - BTreeMapTransaction::new(&mut versioning.hummock_version_deltas,) - ); - let mut branched_ssts = create_trx_wrapper!( - self.sql_meta_store(), - BTreeMapTransactionWrapper, - BTreeMapTransaction::new(&mut versioning.branched_ssts) - ); - let mut current_version = versioning.current_version.clone(); - let last_apply_version_id = current_version.id; - for compact_task in &trivial_tasks { - let version_delta = gen_version_delta( - 
&mut hummock_version_deltas, - &mut branched_ssts, - &current_version, - compact_task, - deterministic_mode, - ); - current_version.apply_version_delta(&version_delta); - } - commit_multi_var!( - self.env.meta_store(), - self.sql_meta_store(), - compaction_statuses, - compact_task_assignment, - hummock_version_deltas - )?; - branched_ssts.commit_memory(); - - trigger_version_stat(&self.metrics, &current_version); - trigger_delta_log_stats(&self.metrics, versioning.hummock_version_deltas.len()); - self.notify_stats(&versioning.version_stats); - versioning.current_version = current_version; - self.notify_version_deltas(versioning, last_apply_version_id); - self.metrics - .compact_frequency - .with_label_values(&[ - "trivial-space-reclaim", - "several_group", - selector.task_type().as_str_name(), - "SUCCESS", - ]) - .inc_by(trivial_tasks.len() as u64); - self.metrics - .compact_task_batch_count - .with_label_values(&["batch_trivial_move"]) - .observe(trivial_tasks.len() as f64); - } else { - // We are using a single transaction to ensure that each task has progress when it is - // created. - commit_multi_var!( - self.env.meta_store(), - self.sql_meta_store(), - compaction_statuses, - compact_task_assignment - )?; - } - if !pick_tasks.is_empty() { - self.metrics - .compact_task_batch_count - .with_label_values(&["batch_get_compact_task"]) - .observe(pick_tasks.len() as f64); - } - - for compact_task in &mut pick_tasks { - let compaction_group_id = compact_task.compaction_group_id; - - // Initiate heartbeat for the task to track its progress. - self.compactor_manager - .initiate_task_heartbeat(compact_task.clone()); - - // this task has been finished. 
- compact_task.set_task_status(TaskStatus::Pending); - - trigger_sst_stat( - &self.metrics, - compaction.compaction_statuses.get(&compaction_group_id), - &current_version, - compaction_group_id, - ); - let compact_task_statistics = statistics_compact_task(compact_task); - - let level_type_label = build_compact_task_level_type_metrics_label( - compact_task.input_ssts[0].level_idx as usize, - compact_task.input_ssts.last().unwrap().level_idx as usize, - ); - - let level_count = compact_task.input_ssts.len(); - if compact_task.input_ssts[0].level_idx == 0 { - self.metrics - .l0_compact_level_count - .with_label_values(&[&compaction_group_id.to_string(), &level_type_label]) - .observe(level_count as _); - } - - self.metrics - .compact_task_size - .with_label_values(&[&compaction_group_id.to_string(), &level_type_label]) - .observe(compact_task_statistics.total_file_size as _); - - self.metrics - .compact_task_size - .with_label_values(&[ - &compaction_group_id.to_string(), - &format!("{} uncompressed", level_type_label), - ]) - .observe(compact_task_statistics.total_uncompressed_file_size as _); - - self.metrics - .compact_task_file_count - .with_label_values(&[&compaction_group_id.to_string(), &level_type_label]) - .observe(compact_task_statistics.total_file_count as _); - - tracing::trace!( - "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. 
cost time: {:?} compact_task_statistics {:?}", - compaction_group_id, - level_count, - compact_task.input_ssts[0].level_type().as_str_name(), - compact_task.input_ssts[0].level_idx, - compact_task.target_level, - start_time.elapsed(), - compact_task_statistics - ); - } - - #[cfg(test)] - { - drop(compaction_guard); - self.check_state_consistency().await; - } - Ok((pick_tasks, trivial_tasks)) - } - #[named] pub fn hummock_timer_task(hummock_manager: Arc) -> (JoinHandle<()>, Sender<()>) { use futures::{FutureExt, StreamExt}; From 910339d9069e1208f30e191ab7463a962f65e5ee Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Thu, 14 Mar 2024 23:05:21 +0800 Subject: [PATCH 04/12] support table schema Signed-off-by: Little-Wallace --- src/meta/src/hummock/manager/mod.rs | 28 ++++++++++++++++++++++----- src/meta/src/hummock/manager/tests.rs | 1 - 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 2685535db4702..1d5e3d55db987 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -1007,14 +1007,14 @@ impl HummockManager { compaction_filter_mask: group_config.compaction_config.compaction_filter_mask, target_sub_level_id: compact_task.input.target_sub_level_id, task_type: compact_task.compaction_task_type as i32, - split_weight_by_vnode: compact_task.input.vnode_partition_count, + split_weight_by_vnode, ..Default::default() }; let is_trivial_reclaim = CompactStatus::is_trivial_reclaim(&compact_task); let is_trivial_move = CompactStatus::is_trivial_move_task(&compact_task); if is_trivial_reclaim || (is_trivial_move && can_trivial_move) { - let log_tabel = if is_trivial_reclaim { + let log_label = if is_trivial_reclaim { "TrivialReclaim" } else { "TrivialMove" @@ -1027,7 +1027,7 @@ impl HummockManager { tracing::debug!( "{} for compaction group {}: input: {:?}, cost time: {:?}", - log_tabel, + log_label, compact_task.compaction_group_id, 
compact_task.input_ssts, start_time.elapsed() @@ -1072,13 +1072,30 @@ impl HummockManager { for table_id in &compact_task.existing_table_ids { compact_task .table_vnode_partition - .entry(*table_id) - .or_insert(vnode_partition_count); + .insert(*table_id, vnode_partition_count); } } compact_task.table_watermarks = current_version .safe_epoch_table_watermarks(&compact_task.existing_table_ids); + if self.env.opts.enable_dropped_column_reclaim { + compact_task.table_schemas = match self.metadata_manager() { + MetadataManager::V1(mgr) => mgr + .catalog_manager + .get_versioned_table_schemas(&compact_task.existing_table_ids) + .await + .into_iter() + .map(|(table_id, column_ids)| { + (table_id, TableSchema { column_ids }) + }) + .collect(), + MetadataManager::V2(_) => { + // TODO #13952: support V2 + BTreeMap::default() + } + }; + } + compact_task_assignment.insert( compact_task.task_id, CompactTaskAssignment { @@ -1196,6 +1213,7 @@ impl HummockManager { #[cfg(test)] { + drop(versioning_guard); drop(compaction_guard); self.check_state_consistency().await; } diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 98fd10a92f062..ed4605d9842ae 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -43,7 +43,6 @@ use crate::hummock::compaction::selector::{ default_compaction_selector, CompactionSelector, ManualCompactionOption, SpaceReclaimCompactionSelector, }; -use crate::hummock::compaction::CompactStatus; use crate::hummock::error::Error; use crate::hummock::test_utils::*; use crate::hummock::{CommitEpochInfo, HummockManager, HummockManagerRef}; From 645f758e0aa87a720a384ecbdb7a443ee059d2f8 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 26 Mar 2024 16:23:35 +0800 Subject: [PATCH 05/12] fix metrics Signed-off-by: Little-Wallace --- src/meta/src/hummock/manager/mod.rs | 25 ++++++++--- src/meta/src/hummock/metrics_utils.rs | 60 ++++++++++++++++++++------- 2 files changed, 66 
insertions(+), 19 deletions(-) diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 0dc5cbb19bf2d..778e0a30721e2 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -80,10 +80,10 @@ use crate::hummock::compaction::selector::{ use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig}; use crate::hummock::error::{Error, Result}; use crate::hummock::metrics_utils::{ - build_compact_task_level_type_metrics_label, trigger_delta_log_stats, trigger_local_table_stat, - trigger_lsm_stat, trigger_mv_stat, trigger_pin_unpin_snapshot_state, - trigger_pin_unpin_version_state, trigger_split_stat, trigger_sst_stat, trigger_version_stat, - trigger_write_stop_stats, + build_compact_task_level_type_metrics_label, get_or_create_local_table_stat, + trigger_delta_log_stats, trigger_local_table_stat, trigger_lsm_stat, trigger_mv_stat, + trigger_pin_unpin_snapshot_state, trigger_pin_unpin_version_state, trigger_split_stat, + trigger_sst_stat, trigger_version_stat, trigger_write_stop_stats, }; use crate::hummock::sequence::next_compaction_task_id; use crate::hummock::{CompactorManagerRef, TASK_NORMAL}; @@ -1007,7 +1007,7 @@ impl HummockManager { compaction_filter_mask: group_config.compaction_config.compaction_filter_mask, target_sub_level_id: compact_task.input.target_sub_level_id, task_type: compact_task.compaction_task_type as i32, - split_weight_by_vnode, + split_weight_by_vnode: vnode_partition_count, ..Default::default() }; @@ -1751,6 +1751,21 @@ impl HummockManager { &version_stats, &table_stats_change, ); + for (table_id, stats) in &table_stats_change { + if stats.total_key_size == 0 + && stats.total_value_size == 0 + && stats.total_key_count == 0 + { + continue; + } + let stats_value = std::cmp::max(0, stats.total_key_size + stats.total_value_size); + let table_metrics = get_or_create_local_table_stat( + &self.metrics, + *table_id, + &mut versioning.local_metrics, + ); + 
table_metrics.inc_write_throughput(stats_value as u64); + } commit_multi_var!( self.env.meta_store(), self.sql_meta_store(), diff --git a/src/meta/src/hummock/metrics_utils.rs b/src/meta/src/hummock/metrics_utils.rs index 0ede18e7c559a..be25c5cf452b2 100644 --- a/src/meta/src/hummock/metrics_utils.rs +++ b/src/meta/src/hummock/metrics_utils.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use itertools::{enumerate, Itertools}; +use prometheus::core::{AtomicU64, GenericCounter}; use prometheus::IntGauge; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ object_size_map, BranchedSstInfo, @@ -43,6 +44,50 @@ pub struct LocalTableMetrics { total_key_count: IntGauge, total_key_size: IntGauge, total_value_size: IntGauge, + write_throughput: GenericCounter, + cal_count: usize, + write_size: u64, +} + +const MIN_FLUSH_COUNT: usize = 16; +const MIN_FLUSH_DATA_SIZE: u64 = 128 * 1024 * 1024; + +impl LocalTableMetrics { + pub fn inc_write_throughput(&mut self, val: u64) { + self.write_size += val; + self.cal_count += 1; + if self.write_size > MIN_FLUSH_DATA_SIZE || self.cal_count > MIN_FLUSH_COUNT { + self.write_throughput.inc_by(self.write_size / 1024 / 1024); + self.write_size = 0; + self.cal_count = 0; + } + } +} + +pub fn get_or_create_local_table_stat<'a>( + metrics: &MetaMetrics, + table_id: u32, + local_metrics: &'a mut HashMap, +) -> &'a mut LocalTableMetrics { + local_metrics.entry(table_id).or_insert_with(|| { + let table_label = format!("{}", table_id); + LocalTableMetrics { + total_key_count: metrics + .version_stats + .with_label_values(&[&table_label, "total_key_count"]), + total_key_size: metrics + .version_stats + .with_label_values(&[&table_label, "total_key_size"]), + total_value_size: metrics + .version_stats + .with_label_values(&[&table_label, "total_value_size"]), + write_throughput: metrics + .table_write_throughput + .with_label_values(&[&table_label]), + cal_count: 0, + write_size: 0, + } + }) } pub fn 
trigger_local_table_stat( @@ -55,20 +100,7 @@ pub fn trigger_local_table_stat( if stats.total_key_size == 0 && stats.total_value_size == 0 && stats.total_key_count == 0 { continue; } - let table_metrics = local_metrics.entry(*table_id).or_insert_with(|| { - let table_label = format!("{}", table_id); - LocalTableMetrics { - total_key_count: metrics - .version_stats - .with_label_values(&[&table_label, "total_key_count"]), - total_key_size: metrics - .version_stats - .with_label_values(&[&table_label, "total_key_size"]), - total_value_size: metrics - .version_stats - .with_label_values(&[&table_label, "total_value_size"]), - } - }); + let table_metrics = get_or_create_local_table_stat(metrics, *table_id, local_metrics); if let Some(table_stats) = version_stats.table_stats.get(table_id) { table_metrics .total_key_count From 7c2bf52e68cf069f9363e9d922b3740767000c65 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 27 Mar 2024 17:01:52 +0800 Subject: [PATCH 06/12] limit loop count Signed-off-by: Little-Wallace --- src/meta/src/hummock/manager/compaction.rs | 8 +++++++- src/meta/src/hummock/manager/mod.rs | 14 ++++++++++---- src/meta/src/hummock/manager/tests.rs | 7 ++++--- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction.rs index a81281f567ce8..989a25aab028b 100644 --- a/src/meta/src/hummock/manager/compaction.rs +++ b/src/meta/src/hummock/manager/compaction.rs @@ -141,11 +141,15 @@ impl HummockManager { .await; match compact_ret { - Ok(compact_tasks) => { + Ok((compact_tasks, trivial_tasks)) => { if compact_tasks.is_empty() { break; } generated_task_count += compact_tasks.len(); + for task in trivial_tasks { + existed_groups.push(task.compaction_group_id); + wait_compact_groups.insert(task.compaction_group_id); + } for task in compact_tasks { let task_id = task.task_id; existed_groups.push(task.compaction_group_id); @@ -164,6 +168,8 @@ impl HummockManager { break; 
} } + existed_groups.sort(); + existed_groups.dedup(); } Err(err) => { tracing::warn!(error = %err.as_report(), "Failed to get compaction task"); diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 778e0a30721e2..dd50b800253e1 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -859,7 +859,7 @@ impl HummockManager { &self, compaction_groups: Vec, selector: &mut Box, - ) -> crate::hummock::error::Result> { + ) -> crate::hummock::error::Result<(Vec, Vec)> { // TODO: `get_all_table_options` will hold catalog_manager async lock, to avoid holding the // lock in compaction_guard, take out all table_options in advance there may be a // waste of resources here, need to add a more efficient filter in catalog_manager @@ -913,6 +913,7 @@ impl HummockManager { let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts( &self.env.opts, )); + const MAX_TRIVIAL_MOVE_TASK_COUNT: usize = 256; for compaction_group_id in compaction_groups { if current_version.levels.get(&compaction_group_id).is_none() { continue; @@ -960,6 +961,7 @@ impl HummockManager { } } + let mut trivial_move_count = 0; while let Some(compact_task) = compact_status.get_compact_task( current_version.get_compaction_group_levels(compaction_group_id), task_id as HummockCompactionTaskId, @@ -1056,6 +1058,10 @@ impl HummockManager { ); current_version.apply_version_delta(&version_delta); trivial_tasks.push(compact_task); + trivial_move_count += 1; + if trivial_move_count >= MAX_TRIVIAL_MOVE_TASK_COUNT { + break; + } } else { let mut table_to_vnode_partition = match self .group_to_table_vnode_partition @@ -1217,7 +1223,7 @@ impl HummockManager { drop(compaction_guard); self.check_state_consistency().await; } - Ok(pick_tasks) + Ok((pick_tasks, trivial_tasks)) } /// Cancels a compaction task no matter it's assigned or unassigned. 
@@ -1253,7 +1259,7 @@ impl HummockManager { &self, compaction_groups: Vec, selector: &mut Box, - ) -> Result> { + ) -> Result<(Vec, Vec)> { fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore( anyhow::anyhow!("failpoint metastore error") ))); @@ -1270,7 +1276,7 @@ impl HummockManager { anyhow::anyhow!("failpoint metastore error") ))); - let mut normal_tasks = self + let (mut normal_tasks, _) = self .get_compact_tasks_impl(vec![compaction_group_id], selector) .await?; Ok(normal_tasks.pop()) diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index ed4605d9842ae..ffed5f8af2b68 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -43,6 +43,7 @@ use crate::hummock::compaction::selector::{ default_compaction_selector, CompactionSelector, ManualCompactionOption, SpaceReclaimCompactionSelector, }; +use crate::hummock::compaction::CompactStatus; use crate::hummock::error::Error; use crate::hummock::test_utils::*; use crate::hummock::{CommitEpochInfo, HummockManager, HummockManagerRef}; @@ -1694,13 +1695,13 @@ async fn test_split_compaction_group_trivial_expired() { .unwrap(); let mut selector: Box = Box::::default(); - let normal_tasks = hummock_manager + let (normal_tasks, mut trivial_task) = hummock_manager .get_compact_tasks_impl(vec![2], &mut selector) .await .unwrap(); assert!(normal_tasks.is_empty()); - // let reclaim_task = normal_tasks.pop().unwrap(); - // assert!(CompactStatus::is_trivial_reclaim(&reclaim_task)); + let reclaim_task = trivial_task.pop().unwrap(); + assert!(CompactStatus::is_trivial_reclaim(&reclaim_task)); let current_version = hummock_manager.get_current_version().await; let new_group_id = current_version.levels.keys().max().cloned().unwrap(); From d4b28d74a7356aecbc344f4203dbe1ce88a36c1e Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Thu, 28 Mar 2024 20:07:18 +0800 Subject: [PATCH 07/12] address comment Signed-off-by: Little-Wallace --- 
src/meta/src/hummock/manager/compaction.rs | 34 +++----- src/meta/src/hummock/manager/mod.rs | 92 +++++++++++++++------- src/meta/src/hummock/manager/tests.rs | 9 +-- 3 files changed, 79 insertions(+), 56 deletions(-) diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction.rs index 989a25aab028b..9d19576179747 100644 --- a/src/meta/src/hummock/manager/compaction.rs +++ b/src/meta/src/hummock/manager/compaction.rs @@ -133,27 +133,26 @@ impl HummockManager { let mut generated_task_count = 0; let mut existed_groups = groups.clone(); let mut meet_error = false; - let mut wait_compact_groups: HashSet = HashSet::default(); + let mut no_task_groups: HashSet = HashSet::default(); while generated_task_count < pull_task_count && !meet_error { let compact_ret = self - .get_compact_tasks(std::mem::take(&mut existed_groups), selector) + .get_compact_tasks( + existed_groups.clone(), + pull_task_count - generated_task_count, + selector, + ) .await; match compact_ret { - Ok((compact_tasks, trivial_tasks)) => { + Ok((compact_tasks, unschedule_groups)) => { if compact_tasks.is_empty() { break; } generated_task_count += compact_tasks.len(); - for task in trivial_tasks { - existed_groups.push(task.compaction_group_id); - wait_compact_groups.insert(task.compaction_group_id); - } + no_task_groups.extend(unschedule_groups); for task in compact_tasks { let task_id = task.task_id; - existed_groups.push(task.compaction_group_id); - wait_compact_groups.insert(task.compaction_group_id); if let Err(e) = compactor.send_event(ResponseEvent::CompactTask(task)) { @@ -168,8 +167,7 @@ impl HummockManager { break; } } - existed_groups.sort(); - existed_groups.dedup(); + existed_groups.retain(|group_id| !no_task_groups.contains(group_id)); } Err(err) => { tracing::warn!(error = %err.as_report(), "Failed to get compaction task"); @@ -177,18 +175,8 @@ impl HummockManager { } }; } - if generated_task_count < pull_task_count && !meet_error { - // no compact_task 
to be picked - for group in groups { - self.compaction_state.unschedule(group, task_type); - } - } else if !meet_error { - for group in groups { - if wait_compact_groups.contains(&group) { - continue; - } - self.compaction_state.unschedule(group, task_type); - } + for group in no_task_groups { + self.compaction_state.unschedule(group, task_type); } } diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index e0e6d3b4dde16..5276611b4542b 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -879,8 +879,9 @@ impl HummockManager { pub async fn get_compact_tasks_impl( &self, compaction_groups: Vec, + max_select_count: usize, selector: &mut Box, - ) -> crate::hummock::error::Result<(Vec, Vec)> { + ) -> Result<(Vec, Vec)> { // TODO: `get_all_table_options` will hold catalog_manager async lock, to avoid holding the // lock in compaction_guard, take out all table_options in advance there may be a // waste of resources here, need to add a more efficient filter in catalog_manager @@ -921,7 +922,7 @@ impl HummockManager { let mut hummock_version_deltas = create_trx_wrapper!( self.sql_meta_store(), BTreeMapTransactionWrapper, - BTreeMapTransaction::new(&mut versioning.hummock_version_deltas,) + BTreeMapTransaction::new(&mut versioning.hummock_version_deltas) ); let mut branched_ssts = create_trx_wrapper!( self.sql_meta_store(), @@ -929,18 +930,21 @@ impl HummockManager { BTreeMapTransaction::new(&mut versioning.branched_ssts) ); + let mut unschedule_groups = vec![]; let mut trivial_tasks = vec![]; let mut pick_tasks = vec![]; let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts( &self.env.opts, )); const MAX_TRIVIAL_MOVE_TASK_COUNT: usize = 256; - for compaction_group_id in compaction_groups { + 'outside: for compaction_group_id in compaction_groups { + if pick_tasks.len() >= max_select_count { + break; + } + if current_version.levels.get(&compaction_group_id).is_none() { 
continue; } - // StoredIdGenerator already implements ids pre-allocation by ID_PREALLOCATE_INTERVAL. - let task_id = next_compaction_task_id(&self.env).await?; // When the last table of a compaction group is deleted, the compaction group (and its // config) is destroyed as well. Then a compaction task for this group may come later and @@ -954,7 +958,11 @@ impl HummockManager { Some(config) => config, None => continue, }; + // StoredIdGenerator already implements ids pre-allocation by ID_PREALLOCATE_INTERVAL. + let task_id = next_compaction_task_id(&self.env).await?; + if !compaction_statuses.contains_key(&compaction_group_id) { + // lazy initialize. compaction_statuses.insert( compaction_group_id, CompactStatus::new( @@ -982,7 +990,6 @@ impl HummockManager { } } - let mut trivial_move_count = 0; while let Some(compact_task) = compact_status.get_compact_task( current_version.get_compaction_group_levels(compaction_group_id), task_id as HummockCompactionTaskId, @@ -1079,9 +1086,8 @@ impl HummockManager { ); current_version.apply_version_delta(&version_delta); trivial_tasks.push(compact_task); - trivial_move_count += 1; - if trivial_move_count >= MAX_TRIVIAL_MOVE_TASK_COUNT { - break; + if trivial_tasks.len() >= MAX_TRIVIAL_MOVE_TASK_COUNT { + break 'outside; } } else { let mut table_to_vnode_partition = match self @@ -1106,6 +1112,7 @@ impl HummockManager { .safe_epoch_table_watermarks(&compact_task.existing_table_ids); if self.env.opts.enable_dropped_column_reclaim { + // TODO: get all table schemas for all tables in once call to avoid acquiring lock and await. 
compact_task.table_schemas = match self.metadata_manager() { MetadataManager::V1(mgr) => mgr .catalog_manager @@ -1135,8 +1142,16 @@ impl HummockManager { break; } + stats.report_to_metrics(compaction_group_id, self.metrics.as_ref()); stats = LocalSelectorStatistic::default(); } + if pick_tasks + .last() + .map(|task| task.compaction_group_id != compaction_group_id) + .unwrap_or(true) + { + unschedule_groups.push(compaction_group_id); + } stats.report_to_metrics(compaction_group_id, self.metrics.as_ref()); } @@ -1160,9 +1175,11 @@ impl HummockManager { .compact_task_batch_count .with_label_values(&["batch_trivial_move"]) .observe(trivial_tasks.len() as f64); + drop(versioning_guard); } else { // We are using a single transaction to ensure that each task has progress when it is // created. + drop(versioning_guard); commit_multi_var!( self.env.meta_store(), self.sql_meta_store(), @@ -1170,6 +1187,7 @@ impl HummockManager { compact_task_assignment )?; } + drop(compaction_guard); if !pick_tasks.is_empty() { self.metrics .compact_task_batch_count @@ -1186,13 +1204,6 @@ impl HummockManager { // this task has been finished. compact_task.set_task_status(TaskStatus::Pending); - - trigger_sst_stat( - &self.metrics, - compaction.compaction_statuses.get(&compaction_group_id), - &versioning.current_version, - compaction_group_id, - ); let compact_task_statistics = statistics_compact_task(compact_task); let level_type_label = build_compact_task_level_type_metrics_label( @@ -1240,11 +1251,10 @@ impl HummockManager { #[cfg(test)] { - drop(versioning_guard); - drop(compaction_guard); self.check_state_consistency().await; } - Ok((pick_tasks, trivial_tasks)) + pick_tasks.extend(trivial_tasks); + Ok((pick_tasks, unschedule_groups)) } /// Cancels a compaction task no matter it's assigned or unassigned. 
@@ -1279,13 +1289,27 @@ impl HummockManager { pub async fn get_compact_tasks( &self, compaction_groups: Vec, + max_select_count: usize, selector: &mut Box, - ) -> Result<(Vec, Vec)> { + ) -> Result<(Vec, Vec)> { fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore( anyhow::anyhow!("failpoint metastore error") ))); - self.get_compact_tasks_impl(compaction_groups, selector) - .await + let (mut tasks, groups) = self + .get_compact_tasks_impl(compaction_groups, max_select_count, selector) + .await?; + tasks.retain(|task| { + if task.task_status() == TaskStatus::Success { + debug_assert!( + CompactStatus::is_trivial_reclaim(task) + || CompactStatus::is_trivial_move_task(task) + ); + false + } else { + true + } + }); + Ok((tasks, groups)) } pub async fn get_compact_task( @@ -1297,10 +1321,19 @@ impl HummockManager { anyhow::anyhow!("failpoint metastore error") ))); - let (mut normal_tasks, _) = self - .get_compact_tasks_impl(vec![compaction_group_id], selector) + let (normal_tasks, _) = self + .get_compact_tasks_impl(vec![compaction_group_id], 1, selector) .await?; - Ok(normal_tasks.pop()) + for task in normal_tasks { + if task.task_status() != TaskStatus::Success { + return Ok(Some(task)); + } + debug_assert!( + CompactStatus::is_trivial_reclaim(&task) + || CompactStatus::is_trivial_move_task(&task) + ); + } + Ok(None) } pub async fn manual_get_compact_task( @@ -3058,17 +3091,20 @@ impl HummockManager { None } + /// This method will return all compaction group id in a random order and task type. If there are any group block by `write_limit`, it will return a single array with `TaskType::Emergency`. + /// If these groups get different task-type, it will return all group id with `TaskType::Dynamic` if the first group get `TaskType::Dynamic`, otherwise it will return the single group with other task type. 
#[named] pub async fn auto_pick_compaction_groups_and_type( &self, ) -> (Vec, compact_task::TaskType) { use rand::prelude::SliceRandom; use rand::thread_rng; - let mut compaction_group_ids = self.compaction_group_ids().await; - compaction_group_ids.shuffle(&mut thread_rng()); let versioning_guard = read_lock!(self, versioning).await; let versioning = versioning_guard.deref(); + let mut compaction_group_ids = + get_compaction_group_ids(&versioning.current_version).collect_vec(); + compaction_group_ids.shuffle(&mut thread_rng()); let mut normal_groups = vec![]; for cg_id in compaction_group_ids { @@ -3096,7 +3132,7 @@ impl HummockManager { if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) { if pick_type == TaskType::Dynamic { normal_groups.push(cg_id); - } else { + } else if normal_groups.is_empty() { return (vec![cg_id], pick_type); } } diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index ffed5f8af2b68..3481bdb514e30 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -43,7 +43,6 @@ use crate::hummock::compaction::selector::{ default_compaction_selector, CompactionSelector, ManualCompactionOption, SpaceReclaimCompactionSelector, }; -use crate::hummock::compaction::CompactStatus; use crate::hummock::error::Error; use crate::hummock::test_utils::*; use crate::hummock::{CommitEpochInfo, HummockManager, HummockManagerRef}; @@ -1695,12 +1694,12 @@ async fn test_split_compaction_group_trivial_expired() { .unwrap(); let mut selector: Box = Box::::default(); - let (normal_tasks, mut trivial_task) = hummock_manager - .get_compact_tasks_impl(vec![2], &mut selector) + let (mut normal_tasks, _unscheduled) = hummock_manager + .get_compact_tasks_impl(vec![2], 1, &mut selector) .await .unwrap(); - assert!(normal_tasks.is_empty()); - let reclaim_task = trivial_task.pop().unwrap(); + use crate::hummock::manager::CompactStatus; + let reclaim_task = normal_tasks.pop().unwrap(); 
assert!(CompactStatus::is_trivial_reclaim(&reclaim_task)); let current_version = hummock_manager.get_current_version().await; From 264cb7f8d2c592edad76939f6781ac9dfc44b1fb Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 2 Apr 2024 12:42:09 +0800 Subject: [PATCH 08/12] fix vnode partition Signed-off-by: Little-Wallace --- src/meta/src/hummock/manager/mod.rs | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 5276611b4542b..bdad670ba1d43 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -29,6 +29,7 @@ use futures::stream::{BoxStream, FuturesUnordered}; use futures::{FutureExt, StreamExt}; use itertools::Itertools; use parking_lot::Mutex; +use rand::thread_rng; use risingwave_common::catalog::TableId; use risingwave_common::config::default::compaction_config; use risingwave_common::config::ObjectStoreConfig; @@ -990,6 +991,15 @@ impl HummockManager { } } + let mut table_to_vnode_partition = match self + .group_to_table_vnode_partition + .read() + .get(&compaction_group_id) + { + Some(table_to_vnode_partition) => table_to_vnode_partition.clone(), + None => BTreeMap::default(), + }; + while let Some(compact_task) = compact_status.get_compact_task( current_version.get_compaction_group_levels(compaction_group_id), task_id as HummockCompactionTaskId, @@ -1090,24 +1100,18 @@ impl HummockManager { break 'outside; } } else { - let mut table_to_vnode_partition = match self - .group_to_table_vnode_partition - .read() - .get(&compaction_group_id) - { - Some(table_to_vnode_partition) => table_to_vnode_partition.clone(), - None => BTreeMap::default(), - }; - table_to_vnode_partition - .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id)); - if group_config.compaction_config.split_weight_by_vnode > 0 { for table_id in &compact_task.existing_table_ids { compact_task 
.table_vnode_partition .insert(*table_id, vnode_partition_count); } + } else { + compact_task.table_vnode_partition = table_to_vnode_partition.clone(); } + compact_task + .table_vnode_partition + .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id)); compact_task.table_watermarks = current_version .safe_epoch_table_watermarks(&compact_task.existing_table_ids); @@ -1288,13 +1292,14 @@ impl HummockManager { pub async fn get_compact_tasks( &self, - compaction_groups: Vec, + mut compaction_groups: Vec, max_select_count: usize, selector: &mut Box, ) -> Result<(Vec, Vec)> { fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore( anyhow::anyhow!("failpoint metastore error") ))); + compaction_groups.shuffle(&mut thread_rng()); let (mut tasks, groups) = self .get_compact_tasks_impl(compaction_groups, max_select_count, selector) .await?; From f45011e7cd4cbf359d61c5b5e61a4318d2d1291c Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 2 Apr 2024 14:32:01 +0800 Subject: [PATCH 09/12] batch cancel Signed-off-by: Little-Wallace --- src/meta/src/hummock/manager/compaction.rs | 19 +++++++---- src/meta/src/hummock/manager/mod.rs | 38 ++++++++++++++-------- 2 files changed, 37 insertions(+), 20 deletions(-) diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction.rs index 9d19576179747..065e9745f7dbb 100644 --- a/src/meta/src/hummock/manager/compaction.rs +++ b/src/meta/src/hummock/manager/compaction.rs @@ -19,7 +19,7 @@ use function_name::named; use futures::future::Shared; use itertools::Itertools; use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId}; -use risingwave_pb::hummock::compact_task::TaskType; +use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType}; use risingwave_pb::hummock::subscribe_compaction_event_request::{ self, Event as RequestEvent, PullTask, }; @@ -132,10 +132,10 @@ impl HummockManager { let mut generated_task_count = 0; let mut existed_groups = groups.clone(); 
- let mut meet_error = false; let mut no_task_groups: HashSet = HashSet::default(); + let mut failed_tasks = vec![]; - while generated_task_count < pull_task_count && !meet_error { + while generated_task_count < pull_task_count && failed_tasks.is_empty() { let compact_ret = self .get_compact_tasks( existed_groups.clone(), @@ -162,11 +162,12 @@ impl HummockManager { task_id, compactor.context_id(), ); - self.compactor_manager.remove_compactor(context_id); - meet_error = true; - break; + failed_tasks.push(task_id); } } + if !failed_tasks.is_empty() { + self.compactor_manager.remove_compactor(context_id); + } existed_groups.retain(|group_id| !no_task_groups.contains(group_id)); } Err(err) => { @@ -178,6 +179,12 @@ impl HummockManager { for group in no_task_groups { self.compaction_state.unschedule(group, task_type); } + if let Err(err) = self + .cancel_compact_tasks(failed_tasks, TaskStatus::SendFailCanceled) + .await + { + tracing::warn!(error = %err.as_report(), "Failed to cancel compaction task"); + } } // ack to compactor diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index bdad670ba1d43..44f8d72003d36 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -29,6 +29,7 @@ use futures::stream::{BoxStream, FuturesUnordered}; use futures::{FutureExt, StreamExt}; use itertools::Itertools; use parking_lot::Mutex; +use rand::prelude::SliceRandom; use rand::thread_rng; use risingwave_common::catalog::TableId; use risingwave_common::config::default::compaction_config; @@ -991,7 +992,7 @@ impl HummockManager { } } - let mut table_to_vnode_partition = match self + let table_to_vnode_partition = match self .group_to_table_vnode_partition .read() .get(&compaction_group_id) @@ -1266,28 +1267,41 @@ impl HummockManager { fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore( anyhow::anyhow!("failpoint metastore err") ))); - self.cancel_compact_task_impl(task_id, task_status).await + let ret = 
self + .cancel_compact_task_impl(vec![task_id], task_status) + .await?; + Ok(ret[0]) + } + + pub async fn cancel_compact_tasks( + &self, + tasks: Vec, + task_status: TaskStatus, + ) -> Result> { + self.cancel_compact_task_impl(tasks, task_status).await } pub async fn cancel_compact_task_impl( &self, - task_id: u64, + task_ids: Vec, task_status: TaskStatus, - ) -> Result { + ) -> Result> { assert!(CANCEL_STATUS_SET.contains(&task_status)); - let rets = self - .report_compact_tasks(vec![ReportTask { + let tasks = task_ids + .into_iter() + .map(|task_id| ReportTask { task_id, task_status: task_status as i32, sorted_output_ssts: vec![], table_stats_change: HashMap::default(), - }]) - .await?; + }) + .collect_vec(); + let rets = self.report_compact_tasks(tasks).await?; #[cfg(test)] { self.check_state_consistency().await; } - Ok(rets[0]) + Ok(rets) } pub async fn get_compact_tasks( @@ -1506,6 +1520,7 @@ impl HummockManager { } else { false }; + rets[idx] = is_success; if is_success { success_count += 1; let version_delta = gen_version_delta( @@ -3082,8 +3097,6 @@ impl HummockManager { pub async fn auto_pick_compaction_group_and_type( &self, ) -> Option<(CompactionGroupId, compact_task::TaskType)> { - use rand::prelude::SliceRandom; - use rand::thread_rng; let mut compaction_group_ids = self.compaction_group_ids().await; compaction_group_ids.shuffle(&mut thread_rng()); @@ -3102,9 +3115,6 @@ impl HummockManager { pub async fn auto_pick_compaction_groups_and_type( &self, ) -> (Vec, compact_task::TaskType) { - use rand::prelude::SliceRandom; - use rand::thread_rng; - let versioning_guard = read_lock!(self, versioning).await; let versioning = versioning_guard.deref(); let mut compaction_group_ids = From a8fc6f407e6139940c11ff81e2af539fe351fd4e Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 2 Apr 2024 15:41:09 +0800 Subject: [PATCH 10/12] fix test Signed-off-by: Little-Wallace --- src/meta/src/hummock/manager/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git 
a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 44f8d72003d36..77c9659789e9a 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -1520,7 +1520,6 @@ impl HummockManager { } else { false }; - rets[idx] = is_success; if is_success { success_count += 1; let version_delta = gen_version_delta( From 73f3a79ddf8ddc3d6f7ead8216594e9a185e9b88 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 2 Apr 2024 19:52:50 +0800 Subject: [PATCH 11/12] revert log Signed-off-by: Little-Wallace --- src/storage/src/hummock/compactor/compactor_runner.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index d9c7c9ee1bbd8..ab3c34c1d85b3 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -520,16 +520,14 @@ pub async fn compact( ) * compact_task.splits.len() as u64; tracing::info!( - "Ready to handle compaction group {} task: {} compact_task_statistics {:?} target_level {} compression_algorithm {:?} table_ids {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}", - compact_task.compaction_group_id, + "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {:?}", compact_task.task_id, compact_task_statistics, - compact_task.target_level, compact_task.compression_algorithm, - compact_task.existing_table_ids, parallelism, task_memory_capacity_with_parallelism, optimize_by_copy_block, + compact_task_to_string(&compact_task), ); // If the task does not have enough memory, it should cancel the task and let the meta From 840b5122670f3b36875586ef475d7e46c33de1a5 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 2 Apr 2024 20:13:47 +0800 Subject: [PATCH 12/12] do not 
print input sst Signed-off-by: Little-Wallace --- src/storage/hummock_sdk/src/compact.rs | 14 ++++++++++---- .../src/hummock/compactor/compactor_runner.rs | 8 +++++--- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/src/storage/hummock_sdk/src/compact.rs b/src/storage/hummock_sdk/src/compact.rs index 33d895d2c4fcd..e9671df2d0637 100644 --- a/src/storage/hummock_sdk/src/compact.rs +++ b/src/storage/hummock_sdk/src/compact.rs @@ -14,6 +14,15 @@ use risingwave_pb::hummock::{CompactTask, LevelType, SstableInfo}; +pub fn compact_task_output_to_string(compact_task: &CompactTask) -> String { + let mut s = String::default(); + s.push_str("Compaction task output: \n"); + for sst in &compact_task.sorted_output_ssts { + append_sstable_info_to_string(&mut s, sst); + } + s +} + pub fn compact_task_to_string(compact_task: &CompactTask) -> String { use std::fmt::Write; @@ -80,10 +89,7 @@ pub fn compact_task_to_string(compact_task: &CompactTask) -> String { .collect(); writeln!(s, "Level {:?} {:?} ", level_entry.level_idx, tables).unwrap(); } - s.push_str("Compaction task output: \n"); - for sst in &compact_task.sorted_output_ssts { - append_sstable_info_to_string(&mut s, sst); - } + s.push_str(&compact_task_output_to_string(compact_task)); s } diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index ab3c34c1d85b3..775e1ef8cf86d 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -27,7 +27,9 @@ use risingwave_hummock_sdk::compact::{ use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker}; use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon}; use risingwave_hummock_sdk::table_stats::{add_table_stats_map, TableStats, TableStatsMap}; -use risingwave_hummock_sdk::{can_concat, HummockSstableObjectId, KeyComparator}; +use risingwave_hummock_sdk::{ + can_concat, compact_task_output_to_string, 
HummockSstableObjectId, KeyComparator, +}; use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType}; use risingwave_pb::hummock::{BloomFilterType, CompactTask, LevelType, SstableInfo}; use thiserror_ext::AsReport; @@ -520,7 +522,7 @@ pub async fn compact( ) * compact_task.splits.len() as u64; tracing::info!( - "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {:?}", + "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}", compact_task.task_id, compact_task_statistics, compact_task.compression_algorithm, @@ -696,7 +698,7 @@ pub async fn compact( tracing::info!( "Finished compaction task in {:?}ms: {}", cost_time, - compact_task_to_string(&compact_task) + compact_task_output_to_string(&compact_task) ); ((compact_task, table_stats), memory_detector) }