From d4577e73726e257e4670842a8d77220b9569ff6e Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Tue, 10 Oct 2023 11:53:17 +0800
Subject: [PATCH] feat(mito): add metrics to mito engine (#2556)

* feat: allow discarding a timer
* feat: flush metrics
* feat: flush bytes and region count metrics
* refactor: add as_str to get static string
* feat: add handle request elapsed metrics
* feat: add some write related metrics
* style: fix clippy
---
 Cargo.lock                            |  1 +
 src/common/telemetry/src/metric.rs    | 23 +++++++++++++-
 src/mito2/src/engine.rs               |  4 +++
 src/mito2/src/flush.rs                | 46 ++++++++++++++++++++++-----
 src/mito2/src/metrics.rs              | 33 ++++++++++++++++++-
 src/mito2/src/request.rs              |  4 +++
 src/mito2/src/worker/handle_close.rs  |  4 +++
 src/mito2/src/worker/handle_create.rs |  7 ++--
 src/mito2/src/worker/handle_drop.rs   |  4 +++
 src/mito2/src/worker/handle_flush.rs  |  8 +++--
 src/mito2/src/worker/handle_open.rs   |  4 +++
 src/mito2/src/worker/handle_write.rs  | 38 ++++++++++++++--------
 src/store-api/Cargo.toml              |  1 +
 src/store-api/src/region_request.rs   |  8 ++++-
 14 files changed, 155 insertions(+), 30 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index ec71fb7a3ec0..8456bab511e4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9520,6 +9520,7 @@ dependencies = [
  "serde",
  "serde_json",
  "snafu",
+ "strum 0.25.0",
  "tokio",
 ]

diff --git a/src/common/telemetry/src/metric.rs b/src/common/telemetry/src/metric.rs
index 5ff876a1f0c4..6c7b8f2c8e4e 100644
--- a/src/common/telemetry/src/metric.rs
+++ b/src/common/telemetry/src/metric.rs
@@ -14,6 +14,7 @@

 // metric stuffs, inspired by databend

+use std::fmt;
 use std::sync::{Arc, Once, RwLock};
 use std::time::{Duration, Instant};

@@ -63,6 +64,7 @@ pub fn try_handle() -> Option<PrometheusHandle> {
 pub struct Timer {
     start: Instant,
     histogram: Histogram,
+    observed: bool,
 }

 impl From<Histogram> for Timer {
@@ -71,12 +73,22 @@ impl From<Histogram> for Timer {
     }
 }

+impl fmt::Debug for Timer {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Timer")
+            .field("start", &self.start)
+            .field("observed", &self.observed)
+            .finish()
+    }
+}
+
 impl Timer {
     /// Creates a timer from given histogram.
     pub fn from_histogram(histogram: Histogram) -> Self {
         Self {
             start: Instant::now(),
             histogram,
+            observed: false,
         }
     }

@@ -85,6 +97,7 @@ impl Timer {
         Self {
             start: Instant::now(),
             histogram: register_histogram!(name),
+            observed: false,
         }
     }

@@ -93,6 +106,7 @@ impl Timer {
         Self {
             start: Instant::now(),
             histogram: register_histogram!(name, labels),
+            observed: false,
         }
     }

@@ -100,11 +114,18 @@ impl Timer {
     pub fn elapsed(&self) -> Duration {
         self.start.elapsed()
     }
+
+    /// Discards the timer result.
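
The `Timer` change above is a small RAII pattern: the timer records its elapsed time into the histogram when it is dropped, unless `discard` consumes it first and sets `observed` so the `Drop` impl skips recording. A minimal std-only sketch of the same idea, with a plain closure standing in for the `metrics::Histogram` sink:

```rust
use std::time::{Duration, Instant};

/// Records elapsed time on drop unless explicitly discarded.
struct ScopedTimer<F: FnMut(Duration)> {
    start: Instant,
    sink: F,        // stand-in for a histogram handle
    observed: bool, // true once the result is discarded
}

impl<F: FnMut(Duration)> ScopedTimer<F> {
    fn new(sink: F) -> Self {
        Self { start: Instant::now(), sink, observed: false }
    }

    /// Consumes the timer without recording, e.g. on an error path.
    fn discard(mut self) {
        self.observed = true;
        // `self` is dropped here; `Drop` sees `observed == true` and skips.
    }
}

impl<F: FnMut(Duration)> Drop for ScopedTimer<F> {
    fn drop(&mut self) {
        if !self.observed {
            (self.sink)(self.start.elapsed());
        }
    }
}

fn main() {
    let timer = ScopedTimer::new(|d| println!("recorded {d:?}"));
    // ... do some work; on failure we would call timer.discard() instead ...
    drop(timer); // records
}
```

Because `discard` takes `self` by value, a discarded timer can never be observed afterwards; the type system enforces the one-shot behavior.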
+    pub fn discard(mut self) {
+        self.observed = true;
+    }
 }

 impl Drop for Timer {
     fn drop(&mut self) {
-        self.histogram.record(self.elapsed())
+        if !self.observed {
+            self.histogram.record(self.elapsed())
+        }
     }
 }

diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index 7e82bc51a9ba..222e9919c61f 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -45,6 +45,7 @@ use async_trait::async_trait;
 use common_error::ext::BoxedError;
 use common_query::Output;
 use common_recordbatch::SendableRecordBatchStream;
+use common_telemetry::timer;
 use object_store::ObjectStore;
 use snafu::{OptionExt, ResultExt};
 use store_api::logstore::LogStore;
@@ -55,6 +56,7 @@ use store_api::storage::{RegionId, ScanRequest};

 use crate::config::MitoConfig;
 use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result};
+use crate::metrics::{HANDLE_REQUEST_ELAPSED, TYPE_LABEL};
 use crate::read::scan_region::{ScanRegion, Scanner};
 use crate::request::WorkerRequest;
 use crate::worker::WorkerGroup;
@@ -132,6 +134,8 @@ impl EngineInner {

     /// Handles [RegionRequest] and return its executed result.
     async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result<Output> {
+        let _timer = timer!(HANDLE_REQUEST_ELAPSED, &[(TYPE_LABEL, request.type_name())]);
+
         let (request, receiver) = WorkerRequest::try_from_region_request(region_id, request)?;
         self.workers.submit_to_worker(region_id, request).await?;

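
`handle_request` reuses a single `HANDLE_REQUEST_ELAPSED` histogram for every request kind and distinguishes kinds with a `type` label derived from the enum variant name (see `RegionRequest::type_name` at the end of this patch). A std-only sketch of that labeling scheme; the `LabeledHistogram` type is illustrative, not a GreptimeDB or `metrics`-crate API:

```rust
use std::collections::HashMap;
use std::time::Duration;

enum RegionRequest {
    Put,
    Delete,
    Flush,
}

impl RegionRequest {
    /// Variant name as a static string, usable as a metric label value.
    fn type_name(&self) -> &'static str {
        match self {
            RegionRequest::Put => "Put",
            RegionRequest::Delete => "Delete",
            RegionRequest::Flush => "Flush",
        }
    }
}

/// Toy labeled histogram: one series per label value.
#[derive(Default)]
struct LabeledHistogram {
    series: HashMap<&'static str, Vec<Duration>>,
}

impl LabeledHistogram {
    fn observe(&mut self, label: &'static str, elapsed: Duration) {
        self.series.entry(label).or_default().push(elapsed);
    }
}

fn main() {
    let mut hist = LabeledHistogram::default();
    let req = RegionRequest::Flush;
    hist.observe(req.type_name(), Duration::from_millis(12));
    println!("{} series recorded", hist.series.len());
}
```

One histogram with a label keeps the metric namespace small while still letting a dashboard break latency down per request type.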
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index 655a7389d7b0..cc1bf0544d01 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -19,10 +19,11 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;

 use common_query::Output;
-use common_telemetry::{error, info};
+use common_telemetry::{error, info, timer};
+use metrics::{counter, increment_counter};
 use snafu::ResultExt;
 use store_api::storage::RegionId;
-use strum::AsRefStr;
+use strum::IntoStaticStr;
 use tokio::sync::mpsc;

 use crate::access_layer::AccessLayerRef;
@@ -30,6 +31,10 @@ use crate::error::{
     Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
 };
 use crate::memtable::MemtableBuilderRef;
+use crate::metrics::{
+    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_ERRORS_TOTAL, FLUSH_REASON, FLUSH_REQUESTS_TOTAL,
+    TYPE_LABEL,
+};
 use crate::read::Source;
 use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
 use crate::request::{
@@ -114,8 +119,8 @@ impl WriteBufferManager for WriteBufferManagerImpl {
         let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
         if mutable_memtable_memory_usage > self.mutable_limit {
             info!(
-                "Engine should flush (over mutable limit), mutable_usage: {}, mutable_limit: {}.",
-                mutable_memtable_memory_usage, self.mutable_limit,
+                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
+                mutable_memtable_memory_usage, self.memory_usage(), self.mutable_limit, self.global_write_buffer_size,
             );
             return true;
         }
@@ -163,7 +168,7 @@ impl WriteBufferManager for WriteBufferManagerImpl {
 }

 /// Reason of a flush task.
-#[derive(Debug, AsRefStr)]
+#[derive(Debug, IntoStaticStr)]
 pub enum FlushReason {
     /// Other reasons.
     Others,
@@ -175,6 +180,13 @@ pub enum FlushReason {
     Alter,
 }

+impl FlushReason {
+    /// Get flush reason as static str.
+    fn as_str(&self) -> &'static str {
+        self.into()
+    }
+}
+
 /// Task to flush a region.
 pub(crate) struct RegionFlushTask {
     /// Region to flush.
@@ -232,6 +244,7 @@ impl RegionFlushTask {

     /// Runs the flush task.
     async fn do_flush(&mut self, version_data: VersionControlData) {
+        let timer = timer!(FLUSH_ELAPSED, &[(TYPE_LABEL, "total")]);
         self.listener.on_flush_begin(self.region_id).await;
         let worker_request = match self.flush_memtables(&version_data.version).await {
             Ok(file_metas) => {
@@ -251,6 +264,7 @@ impl RegionFlushTask {
                     memtables_to_remove,
                     senders: std::mem::take(&mut self.senders),
                     file_purger: self.file_purger.clone(),
+                    timer,
                 };
                 WorkerRequest::Background {
                     region_id: self.region_id,
@@ -259,6 +273,9 @@ impl RegionFlushTask {
             }
             Err(e) => {
                 error!(e; "Failed to flush region {}", self.region_id);
+                // Discard the timer.
+                timer.discard();
+
                 let err = Arc::new(e);
                 self.on_failure(err.clone());
                 WorkerRequest::Background {
@@ -272,6 +289,8 @@ impl RegionFlushTask {

     /// Flushes memtables to level 0 SSTs.
     async fn flush_memtables(&self, version: &VersionRef) -> Result<Vec<FileMeta>> {
+        let timer = timer!(FLUSH_ELAPSED, &[(TYPE_LABEL, "flush_memtables")]);
+
         // TODO(yingwen): Make it configurable.
         let mut write_opts = WriteOptions::default();
         if let Some(row_group_size) = self.row_group_size {
@@ -279,6 +298,7 @@ impl RegionFlushTask {
         }
         let memtables = version.memtables.immutables();
         let mut file_metas = Vec::with_capacity(memtables.len());
+        let mut flushed_bytes = 0;

         for mem in memtables {
             if mem.is_empty() {
@@ -297,6 +317,7 @@ impl RegionFlushTask {
                 continue;
             };

+            flushed_bytes += sst_info.file_size;
             file_metas.push(FileMeta {
                 region_id: version.metadata.region_id,
                 file_id,
@@ -306,12 +327,17 @@ impl RegionFlushTask {
             });
         }

+        if !file_metas.is_empty() {
+            counter!(FLUSH_BYTES_TOTAL, flushed_bytes);
+        }
+
         let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect();
         info!(
-            "Successfully flush memtables, region: {}, reason: {}, files: {:?}",
+            "Successfully flushed memtables, region: {}, reason: {}, files: {:?}, cost: {:?}",
             version.metadata.region_id,
-            self.reason.as_ref(),
-            file_ids
+            self.reason.as_str(),
+            file_ids,
+            timer.elapsed(),
         );

         Ok(file_metas)
@@ -366,6 +392,8 @@ impl FlushScheduler {
     ) -> Result<()> {
         debug_assert_eq!(region_id, task.region_id);

+        increment_counter!(FLUSH_REQUESTS_TOTAL, FLUSH_REASON => task.reason.as_str());
+
         let version = version_control.current().version;
         if version.memtables.mutable.is_empty() && version.memtables.immutables().is_empty() {
             debug_assert!(!self.region_status.contains_key(&region_id));
@@ -446,6 +474,8 @@ impl FlushScheduler {
     pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
         error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);

+        increment_counter!(FLUSH_ERRORS_TOTAL);
+
         // Remove this region.
         let Some(flush_status) = self.region_status.remove(&region_id) else {
             return;
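
Note how the `"total"` flush timer is handled: on success it is moved into the `FlushFinished` notification and only observed when that message is processed, while the error path calls `discard()`, so `FLUSH_ELAPSED` never mixes in failed flushes (failures are counted by `FLUSH_ERRORS_TOTAL` instead). A self-contained sketch of moving a record-on-drop timer through a success message; all names here are illustrative:

```rust
use std::time::Instant;

/// Minimal record-on-drop timer, as in the earlier sketch.
struct Timer {
    start: Instant,
    observed: bool,
}

impl Timer {
    fn start() -> Self {
        Self { start: Instant::now(), observed: false }
    }
    fn discard(mut self) {
        self.observed = true; // Drop will skip recording
    }
}

impl Drop for Timer {
    fn drop(&mut self) {
        if !self.observed {
            println!("flush took {:?}", self.start.elapsed()); // histogram stand-in
        }
    }
}

/// The success message owns the timer, so the elapsed time is recorded
/// only once the whole flush, including follow-up bookkeeping, is done.
struct FlushFinished {
    timer: Timer,
}

fn do_flush(succeed: bool) -> Option<FlushFinished> {
    let timer = Timer::start();
    if succeed {
        Some(FlushFinished { timer }) // recorded when the receiver drops it
    } else {
        timer.discard(); // failed flush: record nothing
        None
    }
}

fn main() {
    do_flush(false); // prints nothing
    do_flush(true);  // prints the elapsed time when the message is dropped
}
```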
diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs
index 26d71475277e..bd5920b3cbf5 100644
--- a/src/mito2/src/metrics.rs
+++ b/src/mito2/src/metrics.rs
@@ -13,4 +13,35 @@
 // limitations under the License.

 /// Global write buffer size in bytes.
-pub const WRITE_BUFFER_BYTES: &str = "storage.write_buffer_bytes";
+pub const WRITE_BUFFER_BYTES: &str = "mito.write_buffer_bytes";
+/// Type label.
+pub const TYPE_LABEL: &str = "type";
+/// Gauge for open regions.
+pub const REGION_COUNT: &str = "mito.region_count";
+/// Elapsed time to handle requests.
+pub const HANDLE_REQUEST_ELAPSED: &str = "mito.handle_request.elapsed";
+
+// ------ Flush related metrics
+/// Counter of scheduled flush requests.
+/// Note that the flush scheduler may merge some flush requests.
+pub const FLUSH_REQUESTS_TOTAL: &str = "mito.flush.requests_total";
+/// Reason label.
+pub const FLUSH_REASON: &str = "reason";
+/// Counter of failed flush jobs.
+pub const FLUSH_ERRORS_TOTAL: &str = "mito.flush.errors_total";
+/// Elapsed time of a flush job.
+pub const FLUSH_ELAPSED: &str = "mito.flush.elapsed";
+/// Counter of flushed bytes.
+pub const FLUSH_BYTES_TOTAL: &str = "mito.flush.bytes_total";
+// ------ End of flush related metrics
+
+// ------ Write related metrics
+/// Counter of stalled write requests.
+pub const WRITE_STALL_TOTAL: &str = "mito.write.stall_total";
+/// Counter of rejected write requests.
+pub const WRITE_REJECT_TOTAL: &str = "mito.write.reject_total";
+/// Elapsed time of each write stage.
+pub const WRITE_STAGE_ELAPSED: &str = "mito.write.stage_elapsed";
+/// Stage label.
+pub const STAGE_LABEL: &str = "stage";
+// ------ End of write related metrics

diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs
index ee4c77e11162..c5ca3a6159a1 100644
--- a/src/mito2/src/request.rs
+++ b/src/mito2/src/request.rs
@@ -25,6 +25,7 @@ use api::helper::{
 use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value};
 use common_query::Output;
 use common_query::Output::AffectedRows;
+use common_telemetry::metric::Timer;
 use common_telemetry::tracing::log::info;
 use common_telemetry::warn;
 use datatypes::prelude::DataType;
@@ -592,9 +593,12 @@ pub(crate) struct FlushFinished {
     pub(crate) senders: Vec,
     /// File purger for cleaning files on failure.
     pub(crate) file_purger: FilePurgerRef,
+    /// Flush timer.
+    pub(crate) timer: Timer,
 }

 impl FlushFinished {
+    /// Marks the flush job as successful and observes the timer.
     pub(crate) fn on_success(self) {
         for sender in self.senders {
             sender.send(Ok(Output::AffectedRows(0)));

diff --git a/src/mito2/src/worker/handle_close.rs b/src/mito2/src/worker/handle_close.rs
index 2d786dd9cb83..8020cc8b7a28 100644
--- a/src/mito2/src/worker/handle_close.rs
+++ b/src/mito2/src/worker/handle_close.rs
@@ -16,9 +16,11 @@

 use common_query::Output;
 use common_telemetry::info;
+use metrics::decrement_gauge;
 use store_api::storage::RegionId;

 use crate::error::Result;
+use crate::metrics::REGION_COUNT;
 use crate::worker::RegionWorkerLoop;

 impl<S> RegionWorkerLoop<S> {
@@ -38,6 +40,8 @@ impl<S> RegionWorkerLoop<S> {

         info!("Region {} closed", region_id);

+        decrement_gauge!(REGION_COUNT, 1.0);
+
         Ok(Output::AffectedRows(0))
     }
 }
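
The new `metrics.rs` keeps every metric name and label key in a `pub const`, under a `mito.<component>.<name>` naming scheme, so call sites in different modules cannot drift apart and accidentally create separate series from misspelled strings. A toy sketch of why centralizing the strings matters; the recorder type is illustrative:

```rust
use std::collections::HashMap;

// Shared constants, mirroring the style of src/mito2/src/metrics.rs.
pub const WRITE_STAGE_ELAPSED: &str = "mito.write.stage_elapsed";
pub const STAGE_LABEL: &str = "stage";

/// Toy recorder keyed by (metric name, label pair).
#[derive(Default)]
struct Recorder {
    values: HashMap<(String, String), f64>,
}

impl Recorder {
    fn record(&mut self, name: &str, label: (&str, &str), value: f64) {
        let key = (name.to_string(), format!("{}={}", label.0, label.1));
        *self.values.entry(key).or_insert(0.0) += value;
    }
}

fn main() {
    let mut r = Recorder::default();
    // Both call sites use the same constants, so they hit the same series.
    r.record(WRITE_STAGE_ELAPSED, (STAGE_LABEL, "write_wal"), 0.004);
    r.record(WRITE_STAGE_ELAPSED, (STAGE_LABEL, "write_wal"), 0.006);
    println!("{:?}", r.values);
}
```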
- info!("A new region created, region_id: {}", region.region_id); + info!("A new region created, region: {:?}", region.metadata()); - // TODO(yingwen): Metrics. + increment_gauge!(REGION_COUNT, 1.0); // Insert the MitoRegion into the RegionMap. self.regions.insert_region(Arc::new(region)); diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 0b0431180fd0..ce4d5cdd0a66 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -20,6 +20,7 @@ use common_query::Output; use common_telemetry::info; use common_telemetry::tracing::warn; use futures::TryStreamExt; +use metrics::decrement_gauge; use object_store::util::join_path; use object_store::{EntryMode, ObjectStore}; use snafu::ResultExt; @@ -27,6 +28,7 @@ use store_api::storage::RegionId; use tokio::time::sleep; use crate::error::{OpenDalSnafu, Result}; +use crate::metrics::REGION_COUNT; use crate::region::RegionMapRef; use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE}; @@ -62,6 +64,8 @@ impl RegionWorkerLoop { region_id ); + decrement_gauge!(REGION_COUNT, 1.0); + // detach a background task to delete the region dir let region_dir = region.access_layer.region_dir().to_owned(); let object_store = self.object_store.clone(); diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 3cea9b95e733..36a86555d8e0 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -186,8 +186,10 @@ impl RegionWorkerLoop { // Delete wal. info!( - "Region {} flush finished, tries to bump wal to {}", - region_id, request.flushed_entry_id + "Region {} flush finished, elapsed: {:?}, tries to bump wal to {}", + region_id, + request.timer.elapsed(), + request.flushed_entry_id ); if let Err(e) = self.wal.obsolete(region_id, request.flushed_entry_id).await { error!(e; "Failed to write wal, region: {}", region_id); @@ -195,7 +197,7 @@ impl RegionWorkerLoop { return; } - // Notifies waiters. + // Notifies waiters and observes the flush timer. request.on_success(); // Handle pending requests for the region. diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index cfeb43d84d98..18b226bacd7d 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use common_query::Output; use common_telemetry::info; +use metrics::increment_gauge; use object_store::util::join_path; use snafu::ResultExt; use store_api::logstore::LogStore; @@ -25,6 +26,7 @@ use store_api::region_request::RegionOpenRequest; use store_api::storage::RegionId; use crate::error::{OpenDalSnafu, RegionNotFoundSnafu, Result}; +use crate::metrics::REGION_COUNT; use crate::region::opener::RegionOpener; use crate::worker::handle_drop::remove_region_dir_once; use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE}; @@ -69,6 +71,8 @@ impl RegionWorkerLoop { info!("Region {} is opened", region_id); + increment_gauge!(REGION_COUNT, 1.0); + // Insert the MitoRegion into the RegionMap. 
diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs
index cfeb43d84d98..18b226bacd7d 100644
--- a/src/mito2/src/worker/handle_open.rs
+++ b/src/mito2/src/worker/handle_open.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;

 use common_query::Output;
 use common_telemetry::info;
+use metrics::increment_gauge;
 use object_store::util::join_path;
 use snafu::ResultExt;
 use store_api::logstore::LogStore;
@@ -25,6 +26,7 @@ use store_api::region_request::RegionOpenRequest;
 use store_api::storage::RegionId;

 use crate::error::{OpenDalSnafu, RegionNotFoundSnafu, Result};
+use crate::metrics::REGION_COUNT;
 use crate::region::opener::RegionOpener;
 use crate::worker::handle_drop::remove_region_dir_once;
 use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};
@@ -69,6 +71,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {

         info!("Region {} is opened", region_id);

+        increment_gauge!(REGION_COUNT, 1.0);
+
         // Insert the MitoRegion into the RegionMap.
         self.regions.insert_region(Arc::new(region));

diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs
index a450f8ddd7ab..de6b6a5c5ea0 100644
--- a/src/mito2/src/worker/handle_write.rs
+++ b/src/mito2/src/worker/handle_write.rs
@@ -17,11 +17,14 @@
 use std::collections::{hash_map, HashMap};
 use std::sync::Arc;

+use common_telemetry::timer;
+use metrics::counter;
 use store_api::logstore::LogStore;
 use store_api::metadata::RegionMetadata;
 use store_api::storage::RegionId;

 use crate::error::{RejectWriteSnafu, Result};
+use crate::metrics::{STAGE_LABEL, WRITE_REJECT_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL};
 use crate::region_write_ctx::RegionWriteCtx;
 use crate::request::{SenderWriteRequest, WriteRequest};
 use crate::worker::RegionWorkerLoop;
@@ -50,7 +53,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         }

         if self.write_buffer_manager.should_stall() && allow_stall {
-            // TODO(yingwen): stalled metrics.
+            counter!(WRITE_STALL_TOTAL, write_requests.len() as u64);
+
             self.stalled_requests.append(&mut write_requests);
             self.listener.on_write_stall();
             return;
@@ -59,23 +63,29 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         let mut region_ctxs = self.prepare_region_write_ctx(write_requests);

         // Write to WAL.
-        let mut wal_writer = self.wal.writer();
-        for region_ctx in region_ctxs.values_mut() {
-            if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) {
-                region_ctx.set_error(e);
+        {
+            let _timer = timer!(WRITE_STAGE_ELAPSED, &[(STAGE_LABEL, "write_wal")]);
+            let mut wal_writer = self.wal.writer();
+            for region_ctx in region_ctxs.values_mut() {
+                if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) {
+                    region_ctx.set_error(e);
+                }
             }
-        }
-        if let Err(e) = wal_writer.write_to_wal().await.map_err(Arc::new) {
-            // Failed to write wal.
-            for mut region_ctx in region_ctxs.into_values() {
-                region_ctx.set_error(e.clone());
+            if let Err(e) = wal_writer.write_to_wal().await.map_err(Arc::new) {
+                // Failed to write wal.
+                for mut region_ctx in region_ctxs.into_values() {
+                    region_ctx.set_error(e.clone());
+                }
+                return;
             }
-            return;
         }

         // Write to memtables.
-        for mut region_ctx in region_ctxs.into_values() {
-            region_ctx.write_memtable();
+        {
+            let _timer = timer!(WRITE_STAGE_ELAPSED, &[(STAGE_LABEL, "write_memtable")]);
+            for mut region_ctx in region_ctxs.into_values() {
+                region_ctx.write_memtable();
+            }
         }
     }
 }
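
The write path is timed per stage by opening an anonymous block, binding a `_timer` guard inside it, and letting the guard record when the block ends, so a single `WRITE_STAGE_ELAPSED` histogram with a `stage` label covers both the WAL and the memtable stage. A std-only sketch of the block-scoping technique; the `println!` sink stands in for the histogram:

```rust
use std::time::Instant;

/// Guard that reports one stage's elapsed time when it goes out of scope.
struct StageTimer {
    stage: &'static str,
    start: Instant,
}

impl StageTimer {
    fn new(stage: &'static str) -> Self {
        Self { stage, start: Instant::now() }
    }
}

impl Drop for StageTimer {
    fn drop(&mut self) {
        // Stand-in for a histogram labeled with ("stage", self.stage).
        println!("stage={} elapsed={:?}", self.stage, self.start.elapsed());
    }
}

fn handle_writes() {
    // Write to WAL.
    {
        let _timer = StageTimer::new("write_wal");
        // ... build WAL entries and flush them ...
    } // `_timer` drops here, so only the WAL work is measured.

    // Write to memtables.
    {
        let _timer = StageTimer::new("write_memtable");
        // ... apply entries to the memtables ...
    }
}

fn main() {
    handle_writes();
}
```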
@@ -148,6 +158,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {

 /// Send rejected error to all `write_requests`.
 fn reject_write_requests(write_requests: Vec<SenderWriteRequest>) {
+    counter!(WRITE_REJECT_TOTAL, write_requests.len() as u64);
+
     for req in write_requests {
         req.sender.send(
             RejectWriteSnafu {

diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml
index 3139f276622c..199a86351f13 100644
--- a/src/store-api/Cargo.toml
+++ b/src/store-api/Cargo.toml
@@ -21,6 +21,7 @@ futures.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 snafu.workspace = true
+strum.workspace = true

 [dev-dependencies]
 async-stream.workspace = true

diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs
index 26fe4cae9eef..9d608f2077da 100644
--- a/src/store-api/src/region_request.rs
+++ b/src/store-api/src/region_request.rs
@@ -19,6 +19,7 @@ use api::v1::add_column_location::LocationType;
 use api::v1::region::{alter_request, region_request, AlterRequest};
 use api::v1::{self, Rows, SemanticType};
 use snafu::{ensure, OptionExt};
+use strum::IntoStaticStr;

 use crate::metadata::{
     ColumnMetadata, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, MetadataError,
@@ -27,7 +28,7 @@ use crate::metadata::{
 use crate::path_utils::region_dir;
 use crate::storage::{ColumnId, RegionId, ScanRequest};

-#[derive(Debug)]
+#[derive(Debug, IntoStaticStr)]
 pub enum RegionRequest {
     // TODO: rename to InsertRequest
     Put(RegionPutRequest),
@@ -124,6 +125,11 @@ impl RegionRequest {
             )]),
         }
     }
+
+    /// Returns the type name of the request.
+    pub fn type_name(&self) -> &'static str {
+        self.into()
+    }
 }

 /// Request to put data into a region.
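
Deriving `strum::IntoStaticStr` is what makes `RegionRequest::type_name` and `FlushReason::as_str` cheap: the derive generates `From<&Enum> for &'static str` returning the variant name, so a label value needs no allocation. A minimal usage sketch (assumes `strum = { version = "0.25", features = ["derive"] }` in Cargo.toml):

```rust
use strum::IntoStaticStr;

/// The derive maps each variant, with or without fields, to its name.
#[derive(Debug, IntoStaticStr)]
enum Request {
    Put(u32), // the payload is ignored by IntoStaticStr
    Delete,
    Flush,
}

impl Request {
    /// Same pattern as `RegionRequest::type_name` in this patch.
    fn type_name(&self) -> &'static str {
        self.into()
    }
}

fn main() {
    let req = Request::Put(42);
    assert_eq!(req.type_name(), "Put");
    println!("type label = {}", req.type_name());
}
```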