From a3778f4bf718104d6beb4d80101457c60fd777ea Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Tue, 10 Oct 2023 15:38:39 +0800 Subject: [PATCH] feat: add compaction metrics (#2560) * feat: add compaction metrics * feat: add compaction request total count * fix: CR comments --- src/mito2/src/compaction.rs | 13 ++++-- src/mito2/src/compaction/twcs.rs | 14 ++++++- src/mito2/src/metrics.rs | 15 ++++++- src/mito2/src/request.rs | 9 +++- src/mito2/src/worker/handle_compaction.rs | 50 +++++++++++++---------- 5 files changed, 72 insertions(+), 29 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 38327f10ca81..5d843fc373f5 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -20,8 +20,9 @@ mod twcs; use std::collections::HashMap; use std::sync::Arc; +use std::time::Instant; -use common_telemetry::{debug, error}; +use common_telemetry::{debug, error, timer}; pub use picker::CompactionPickerRef; use snafu::ResultExt; use store_api::storage::RegionId; @@ -32,6 +33,7 @@ use crate::compaction::twcs::TwcsPicker; use crate::error::{ CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result, }; +use crate::metrics::{COMPACTION_STAGE_ELAPSED, STAGE_LABEL}; use crate::region::options::CompactionOptions; use crate::region::version::{VersionControlRef, VersionRef}; use crate::request::{OptionOutputTx, OutputTx, WorkerRequest}; @@ -47,6 +49,8 @@ pub struct CompactionRequest { /// Waiters of the compaction request. pub(crate) waiters: Vec, pub(crate) file_purger: FilePurgerRef, + /// Start time of compaction task. + pub(crate) start_time: Instant, } impl CompactionRequest { @@ -175,11 +179,14 @@ impl CompactionScheduler { "Pick compaction strategy {:?} for region: {}", picker, region_id ); + + let pick_timer = timer!(COMPACTION_STAGE_ELAPSED, &[(STAGE_LABEL, "pick")]); let Some(mut task) = picker.pick(request) else { // Nothing to compact, remove it from the region status map. self.region_status.remove(®ion_id); return Ok(()); }; + drop(pick_timer); // Submit the compaction task. self.scheduler @@ -188,10 +195,8 @@ impl CompactionScheduler { })) .map_err(|e| { error!(e; "Failed to submit compaction request for region {}", region_id); - // If failed to submit the job, we need to remove the region from the scheduler. self.region_status.remove(®ion_id); - e }) } @@ -295,12 +300,14 @@ impl CompactionStatus { waiter: OptionOutputTx, ) -> CompactionRequest { let current_version = self.version_control.current().version; + let start_time = Instant::now(); let mut req = CompactionRequest { current_version, access_layer: self.access_layer.clone(), request_sender: request_sender.clone(), waiters: Vec::new(), file_purger: self.file_purger.clone(), + start_time, }; if let Some(pending) = self.pending_compaction.take() { diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 19b9085eba67..d9a586d0f631 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -15,14 +15,15 @@ use std::collections::BTreeMap; use std::fmt::{Debug, Formatter}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use common_base::readable_size::ReadableSize; use common_query::Output; -use common_telemetry::{debug, error, info}; +use common_telemetry::{debug, error, info, timer}; use common_time::timestamp::TimeUnit; use common_time::timestamp_millis::BucketAligned; use common_time::Timestamp; +use metrics::increment_counter; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; @@ -34,6 +35,7 @@ use crate::compaction::picker::{CompactionTask, Picker}; use crate::compaction::CompactionRequest; use crate::error; use crate::error::CompactRegionSnafu; +use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED, STAGE_LABEL}; use crate::request::{ BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest, }; @@ -118,6 +120,7 @@ impl Picker for TwcsPicker { request_sender, waiters, file_purger, + start_time, } = req; let region_metadata = current_version.metadata.clone(); @@ -170,6 +173,7 @@ impl Picker for TwcsPicker { request_sender, waiters, file_purger, + start_time, }; Some(Box::new(task)) } @@ -228,6 +232,8 @@ pub(crate) struct TwcsCompactionTask { pub(crate) request_sender: mpsc::Sender, /// Senders that are used to notify waiters waiting for pending compaction tasks. pub waiters: Vec, + /// Start time of compaction task + pub start_time: Instant, } impl Debug for TwcsCompactionTask { @@ -310,8 +316,10 @@ impl TwcsCompactionTask { async fn handle_compaction(&mut self) -> error::Result<(Vec, Vec)> { self.mark_files_compacting(true); + let merge_timer = timer!(COMPACTION_STAGE_ELAPSED, &[(STAGE_LABEL, "merge")]); let (output, mut compacted) = self.merge_ssts().await.map_err(|e| { error!(e; "Failed to compact region: {}", self.region_id); + merge_timer.discard(); e })?; compacted.extend(self.expired_ssts.iter().map(FileHandle::meta)); @@ -320,6 +328,7 @@ impl TwcsCompactionTask { /// Handles compaction failure, notifies all waiters. fn on_failure(&mut self, err: Arc) { + increment_counter!(COMPACTION_FAILURE_COUNT); for waiter in self.waiters.drain(..) { waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id: self.region_id, @@ -357,6 +366,7 @@ impl CompactionTask for TwcsCompactionTask { compaction_time_window: self .compaction_time_window .map(|seconds| Duration::from_secs(seconds as u64)), + start_time: self.start_time, }) } Err(e) => { diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 855ba9ddf642..4b32ec877e43 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +/// Stage label. +pub const STAGE_LABEL: &str = "stage"; + /// Global write buffer size in bytes. pub const WRITE_BUFFER_BYTES: &str = "mito.write_buffer_bytes"; /// Type label. @@ -42,8 +45,16 @@ pub const WRITE_STALL_TOTAL: &str = "mito.write.stall_total"; pub const WRITE_REJECT_TOTAL: &str = "mito.write.reject_total"; /// Elapsed time of each write stage. pub const WRITE_STAGE_ELAPSED: &str = "mito.write.stage_elapsed"; -/// Stage label. -pub const STAGE_LABEL: &str = "stage"; /// Counter of rows to write. pub const WRITE_ROWS_TOTAL: &str = "mito.write.rows_total"; // ------ End of write related metrics + +// Compaction metrics +/// Timer of different stages in compaction. +pub const COMPACTION_STAGE_ELAPSED: &str = "mito.compaction.stage_elapsed"; +/// Timer of whole compaction task. +pub const COMPACTION_ELAPSED_TOTAL: &str = "mito.compaction.total_elapsed"; +/// Counter of all requested compaction task. +pub const COMPACTION_REQUEST_COUNT: &str = "mito.compaction.requests_total"; +/// Counter of failed compaction task. +pub const COMPACTION_FAILURE_COUNT: &str = "mito.compaction.failure_total"; diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index c5ca3a6159a1..e2427b29481c 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use api::helper::{ is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_column_data_type, @@ -29,6 +29,7 @@ use common_telemetry::metric::Timer; use common_telemetry::tracing::log::info; use common_telemetry::warn; use datatypes::prelude::DataType; +use metrics::histogram; use prost::Message; use smallvec::SmallVec; use snafu::{ensure, OptionExt, ResultExt}; @@ -45,6 +46,7 @@ use crate::error::{ InvalidRequestSnafu, Result, }; use crate::memtable::MemtableId; +use crate::metrics::COMPACTION_ELAPSED_TOTAL; use crate::sst::file::FileMeta; use crate::sst::file_purger::{FilePurgerRef, PurgeRequest}; use crate::wal::EntryId; @@ -646,10 +648,15 @@ pub(crate) struct CompactionFinished { pub(crate) file_purger: FilePurgerRef, /// Inferred Compaction time window. pub(crate) compaction_time_window: Option, + /// Start time of compaction task. + pub(crate) start_time: Instant, } impl CompactionFinished { pub fn on_success(self) { + // only update compaction time on success + histogram!(COMPACTION_ELAPSED_TOTAL, self.start_time.elapsed()); + for sender in self.senders { sender.send(Ok(AffectedRows(0))); } diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 79ade0f4dc9f..26d81c23d77a 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_telemetry::{error, info}; +use common_telemetry::{error, info, timer}; +use metrics::increment_counter; use store_api::logstore::LogStore; use store_api::storage::RegionId; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; +use crate::metrics::{COMPACTION_REQUEST_COUNT, COMPACTION_STAGE_ELAPSED, STAGE_LABEL}; use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx}; use crate::worker::RegionWorkerLoop; @@ -30,7 +32,7 @@ impl RegionWorkerLoop { let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else { return; }; - + increment_counter!(COMPACTION_REQUEST_COUNT); if let Err(e) = self.compaction_scheduler.schedule_compaction( region.region_id, ®ion.version_control, @@ -57,27 +59,33 @@ impl RegionWorkerLoop { return; }; - // Write region edit to manifest. - let edit = RegionEdit { - files_to_add: std::mem::take(&mut request.compaction_outputs), - files_to_remove: std::mem::take(&mut request.compacted_files), - compaction_time_window: request.compaction_time_window, - flushed_entry_id: None, - flushed_sequence: None, - }; - let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); - if let Err(e) = region.manifest_manager.update(action_list).await { - error!(e; "Failed to update manifest, region: {}", region_id); - request.on_failure(e); - return; - } + { + let manifest_timer = + timer!(COMPACTION_STAGE_ELAPSED, &[(STAGE_LABEL, "write_manifest")]); + // Write region edit to manifest. + let edit = RegionEdit { + files_to_add: std::mem::take(&mut request.compaction_outputs), + files_to_remove: std::mem::take(&mut request.compacted_files), + compaction_time_window: request.compaction_time_window, + flushed_entry_id: None, + flushed_sequence: None, + }; + let action_list = + RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); + if let Err(e) = region.manifest_manager.update(action_list).await { + error!(e; "Failed to update manifest, region: {}", region_id); + manifest_timer.discard(); + request.on_failure(e); + return; + } - // Apply edit to region's version. - region - .version_control - .apply_edit(edit, &[], region.file_purger.clone()); + // Apply edit to region's version. + region + .version_control + .apply_edit(edit, &[], region.file_purger.clone()); + } + // compaction finished. request.on_success(); - // Schedule next compaction if necessary. self.compaction_scheduler.on_compaction_finished(region_id); }