feat(mito): add metrics to mito engine (#2556)
* feat: allow discarding a timer

* feat: flush metrics

* feat: flush bytes and region count metrics

* refactor: add as_str to get static string

* feat: add handle request elapsed metrics

* feat: add some write related metrics

* style: fix clippy
evenyag authored Oct 10, 2023
1 parent 88f2667 commit d4577e7
Showing 14 changed files with 155 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


23 changes: 22 additions & 1 deletion src/common/telemetry/src/metric.rs
@@ -14,6 +14,7 @@

// metric stuff, inspired by databend

use std::fmt;
use std::sync::{Arc, Once, RwLock};
use std::time::{Duration, Instant};

@@ -63,6 +64,7 @@ pub fn try_handle() -> Option<PrometheusHandle> {
pub struct Timer {
start: Instant,
histogram: Histogram,
observed: bool,
}

impl From<Histogram> for Timer {
@@ -71,12 +73,22 @@ impl From<Histogram> for Timer {
}
}

impl fmt::Debug for Timer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Timer")
.field("start", &self.start)
.field("observed", &self.observed)
.finish()
}
}

impl Timer {
/// Creates a timer from the given histogram.
pub fn from_histogram(histogram: Histogram) -> Self {
Self {
start: Instant::now(),
histogram,
observed: false,
}
}

@@ -85,6 +97,7 @@ impl Timer {
Self {
start: Instant::now(),
histogram: register_histogram!(name),
observed: false,
}
}

@@ -93,18 +106,26 @@ impl Timer {
Self {
start: Instant::now(),
histogram: register_histogram!(name, labels),
observed: false,
}
}

/// Returns the elapsed duration since this timer was created.
pub fn elapsed(&self) -> Duration {
self.start.elapsed()
}

/// Discards the timer without recording its elapsed time.
pub fn discard(mut self) {
self.observed = true;
}
}

impl Drop for Timer {
fn drop(&mut self) {
self.histogram.record(self.elapsed())
if !self.observed {
self.histogram.record(self.elapsed())
}
}
}

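The `observed` flag and the new `discard` method let a caller consume a `Timer` without recording anything, while the default drop path still records the elapsed time. A minimal usage sketch, assuming the `metrics` crate macros already used in this file; the function name, metric name, and failure flag below are illustrative, not part of this commit:

```rust
use common_telemetry::metric::Timer;
use metrics::register_histogram;

fn timed_job(failed: bool) {
    // Recording starts when the timer is created from a histogram.
    let timer = Timer::from_histogram(register_histogram!("example.job.elapsed"));

    if failed {
        // `discard` consumes the timer and marks it observed, so nothing is
        // recorded for failed runs.
        timer.discard();
        return;
    }

    // Dropping the timer at the end of the scope records the elapsed time
    // into the histogram.
}
```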
4 changes: 4 additions & 0 deletions src/mito2/src/engine.rs
@@ -45,6 +45,7 @@ use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::timer;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
@@ -55,6 +56,7 @@ use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result};
use crate::metrics::{HANDLE_REQUEST_ELAPSED, TYPE_LABEL};
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::request::WorkerRequest;
use crate::worker::WorkerGroup;
@@ -132,6 +134,8 @@ impl EngineInner {

/// Handles a [RegionRequest] and returns its execution result.
async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result<Output> {
let _timer = timer!(HANDLE_REQUEST_ELAPSED, &[(TYPE_LABEL, request.type_name())]);

let (request, receiver) = WorkerRequest::try_from_region_request(region_id, request)?;
self.workers.submit_to_worker(region_id, request).await?;

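`handle_request` now wraps every request in a labeled timer, so latencies are split by request type within a single histogram. A self-contained sketch of the same pattern; the constants mirror those added in src/mito2/src/metrics.rs, and the literal "create" label stands in for `request.type_name()`:

```rust
use common_telemetry::timer;

// Mirrors the constants added in src/mito2/src/metrics.rs.
const HANDLE_REQUEST_ELAPSED: &str = "mito.handle_request.elapsed";
const TYPE_LABEL: &str = "type";

fn handle_request_example() {
    // Records into the `mito.handle_request.elapsed` histogram, labeled with
    // the request type, when `_timer` is dropped at the end of this scope.
    let _timer = timer!(HANDLE_REQUEST_ELAPSED, &[(TYPE_LABEL, "create")]);

    // ... handle the request ...
}
```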
46 changes: 38 additions & 8 deletions src/mito2/src/flush.rs
@@ -19,17 +19,22 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use common_query::Output;
use common_telemetry::{error, info};
use common_telemetry::{error, info, timer};
use metrics::{counter, increment_counter};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::AsRefStr;
use strum::IntoStaticStr;
use tokio::sync::mpsc;

use crate::access_layer::AccessLayerRef;
use crate::error::{
Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::memtable::MemtableBuilderRef;
use crate::metrics::{
FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_ERRORS_TOTAL, FLUSH_REASON, FLUSH_REQUESTS_TOTAL,
TYPE_LABEL,
};
use crate::read::Source;
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::request::{
@@ -114,8 +119,8 @@ impl WriteBufferManager for WriteBufferManagerImpl {
let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
if mutable_memtable_memory_usage > self.mutable_limit {
info!(
"Engine should flush (over mutable limit), mutable_usage: {}, mutable_limit: {}.",
mutable_memtable_memory_usage, self.mutable_limit,
"Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
mutable_memtable_memory_usage, self.memory_usage(), self.mutable_limit, self.global_write_buffer_size,
);
return true;
}
@@ -163,7 +168,7 @@ impl WriteBufferManager for WriteBufferManagerImpl {
}

/// Reason of a flush task.
#[derive(Debug, AsRefStr)]
#[derive(Debug, IntoStaticStr)]
pub enum FlushReason {
/// Other reasons.
Others,
@@ -175,6 +180,13 @@ pub enum FlushReason {
Alter,
}

impl FlushReason {
/// Gets the flush reason as a static string.
fn as_str(&self) -> &'static str {
self.into()
}
}

/// Task to flush a region.
pub(crate) struct RegionFlushTask {
/// Region to flush.
@@ -232,6 +244,7 @@ impl RegionFlushTask {

/// Runs the flush task.
async fn do_flush(&mut self, version_data: VersionControlData) {
let timer = timer!(FLUSH_ELAPSED, &[(TYPE_LABEL, "total")]);
self.listener.on_flush_begin(self.region_id).await;
let worker_request = match self.flush_memtables(&version_data.version).await {
Ok(file_metas) => {
@@ -251,6 +264,7 @@
memtables_to_remove,
senders: std::mem::take(&mut self.senders),
file_purger: self.file_purger.clone(),
timer,
};
WorkerRequest::Background {
region_id: self.region_id,
@@ -259,6 +273,9 @@
}
Err(e) => {
error!(e; "Failed to flush region {}", self.region_id);
// Discard the timer.
timer.discard();

let err = Arc::new(e);
self.on_failure(err.clone());
WorkerRequest::Background {
@@ -272,13 +289,16 @@

/// Flushes memtables to level 0 SSTs.
async fn flush_memtables(&self, version: &VersionRef) -> Result<Vec<FileMeta>> {
let timer = timer!(FLUSH_ELAPSED, &[(TYPE_LABEL, "flush_memtables")]);

// TODO(yingwen): Make it configurable.
let mut write_opts = WriteOptions::default();
if let Some(row_group_size) = self.row_group_size {
write_opts.row_group_size = row_group_size;
}
let memtables = version.memtables.immutables();
let mut file_metas = Vec::with_capacity(memtables.len());
let mut flushed_bytes = 0;

for mem in memtables {
if mem.is_empty() {
@@ -297,6 +317,7 @@
continue;
};

flushed_bytes += sst_info.file_size;
file_metas.push(FileMeta {
region_id: version.metadata.region_id,
file_id,
@@ -306,12 +327,17 @@
});
}

if !file_metas.is_empty() {
counter!(FLUSH_BYTES_TOTAL, flushed_bytes);
}

let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect();
info!(
"Successfully flush memtables, region: {}, reason: {}, files: {:?}",
"Successfully flush memtables, region: {}, reason: {}, files: {:?}, cost: {:?}",
version.metadata.region_id,
self.reason.as_ref(),
file_ids
self.reason.as_str(),
file_ids,
timer.elapsed(),
);

Ok(file_metas)
@@ -366,6 +392,8 @@ impl FlushScheduler {
) -> Result<()> {
debug_assert_eq!(region_id, task.region_id);

increment_counter!(FLUSH_REQUESTS_TOTAL, FLUSH_REASON => task.reason.as_str());

let version = version_control.current().version;
if version.memtables.mutable.is_empty() && version.memtables.immutables().is_empty() {
debug_assert!(!self.region_status.contains_key(&region_id));
@@ -446,6 +474,8 @@
pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);

increment_counter!(FLUSH_ERRORS_TOTAL);

// Remove this region.
let Some(flush_status) = self.region_status.remove(&region_id) else {
return;
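Switching `FlushReason` from `AsRefStr` to `IntoStaticStr` lets `as_str` return a `&'static str`, presumably so the value can be passed directly as a metric label in `increment_counter!`. A self-contained sketch of that conversion (variants abbreviated to the two visible in the diff; the reason string is just the variant name):

```rust
use strum::IntoStaticStr;

#[derive(Debug, IntoStaticStr)]
enum FlushReason {
    Others,
    Alter,
}

impl FlushReason {
    /// Returns the variant name with a 'static lifetime.
    fn as_str(&self) -> &'static str {
        // `IntoStaticStr` derives `From<&FlushReason> for &'static str`.
        self.into()
    }
}

fn main() {
    assert_eq!(FlushReason::Alter.as_str(), "Alter");
    assert_eq!(FlushReason::Others.as_str(), "Others");
}
```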
33 changes: 32 additions & 1 deletion src/mito2/src/metrics.rs
@@ -13,4 +13,35 @@
// limitations under the License.

/// Global write buffer size in bytes.
pub const WRITE_BUFFER_BYTES: &str = "storage.write_buffer_bytes";
pub const WRITE_BUFFER_BYTES: &str = "mito.write_buffer_bytes";
/// Type label.
pub const TYPE_LABEL: &str = "type";
/// Gauge of open regions.
pub const REGION_COUNT: &str = "mito.region_count";
/// Elapsed time to handle requests.
pub const HANDLE_REQUEST_ELAPSED: &str = "mito.handle_request.elapsed";

// ------ Flush related metrics
/// Counter of scheduled flush requests.
/// Note that the flush scheduler may merge some flush requests.
pub const FLUSH_REQUESTS_TOTAL: &str = "mito.flush.requests_total";
/// Reason to flush.
pub const FLUSH_REASON: &str = "reason";
/// Counter of failed flush jobs.
pub const FLUSH_ERRORS_TOTAL: &str = "mito.flush.errors_total";
/// Elapsed time of a flush job.
pub const FLUSH_ELAPSED: &str = "mito.flush.elapsed";
/// Histogram of flushed bytes.
pub const FLUSH_BYTES_TOTAL: &str = "mito.flush.bytes_total";
// ------ End of flush related metrics

// ------ Write related metrics
/// Counter of stalled write requests.
pub const WRITE_STALL_TOTAL: &str = "mito.write.stall_total";
/// Counter of rejected write requests.
pub const WRITE_REJECT_TOTAL: &str = "mito.write.reject_total";
/// Elapsed time of each write stage.
pub const WRITE_STAGE_ELAPSED: &str = "mito.write.stage_elapsed";
/// Stage label.
pub const STAGE_LABEL: &str = "stage";
// ------ End of write related metrics
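A sketch of how the new constants are consumed elsewhere in this commit via the `metrics` crate macros; the constants are copied here so the snippet is self-contained, and the label value and byte count are illustrative:

```rust
use metrics::{counter, decrement_gauge, increment_counter, increment_gauge};

// Copies of the constants defined above.
const REGION_COUNT: &str = "mito.region_count";
const FLUSH_REQUESTS_TOTAL: &str = "mito.flush.requests_total";
const FLUSH_REASON: &str = "reason";
const FLUSH_BYTES_TOTAL: &str = "mito.flush.bytes_total";

fn record_examples() {
    // Gauge of open regions: raised on create/open, lowered on close/drop.
    increment_gauge!(REGION_COUNT, 1.0);
    decrement_gauge!(REGION_COUNT, 1.0);

    // Counter of scheduled flush requests, labeled with the flush reason.
    increment_counter!(FLUSH_REQUESTS_TOTAL, FLUSH_REASON => "Alter");

    // Counter of bytes written by flush jobs.
    counter!(FLUSH_BYTES_TOTAL, 4096u64);
}
```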
4 changes: 4 additions & 0 deletions src/mito2/src/request.rs
@@ -25,6 +25,7 @@ use api::helper::{
use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value};
use common_query::Output;
use common_query::Output::AffectedRows;
use common_telemetry::metric::Timer;
use common_telemetry::tracing::log::info;
use common_telemetry::warn;
use datatypes::prelude::DataType;
@@ -592,9 +593,12 @@ pub(crate) struct FlushFinished {
pub(crate) senders: Vec<OutputTx>,
/// File purger for cleaning files on failure.
pub(crate) file_purger: FilePurgerRef,
/// Flush timer.
pub(crate) timer: Timer,
}

impl FlushFinished {
/// Marks the flush job as successful and observes the timer.
pub(crate) fn on_success(self) {
for sender in self.senders {
sender.send(Ok(Output::AffectedRows(0)));
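`FlushFinished` now owns the flush timer, so the "total" flush duration is observed only when the finish notification is handled. A rough sketch of that ownership pattern, built only on the `Timer` API shown above; the struct, method names, and the failure path are illustrative, not taken from the diff:

```rust
use common_telemetry::metric::Timer;

struct Finished {
    timer: Timer,
}

impl Finished {
    fn on_success(self) {
        // Dropping `self` drops the timer, recording the total elapsed time.
    }

    fn on_failure(self) {
        // A failed job skips recording so it does not skew the histogram.
        self.timer.discard();
    }
}
```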
4 changes: 4 additions & 0 deletions src/mito2/src/worker/handle_close.rs
@@ -16,9 +16,11 @@
use common_query::Output;
use common_telemetry::info;
use metrics::decrement_gauge;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::metrics::REGION_COUNT;
use crate::worker::RegionWorkerLoop;

impl<S> RegionWorkerLoop<S> {
@@ -38,6 +40,8 @@ impl<S> RegionWorkerLoop<S> {

info!("Region {} closed", region_id);

decrement_gauge!(REGION_COUNT, 1.0);

Ok(Output::AffectedRows(0))
}
}
7 changes: 4 additions & 3 deletions src/mito2/src/worker/handle_create.rs
@@ -18,13 +18,15 @@ use std::sync::Arc;

use common_query::Output;
use common_telemetry::info;
use metrics::increment_gauge;
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataBuilder;
use store_api::region_request::RegionCreateRequest;
use store_api::storage::RegionId;

use crate::error::{InvalidMetadataSnafu, Result};
use crate::metrics::REGION_COUNT;
use crate::region::opener::{check_recovered_region, RegionOpener};
use crate::worker::RegionWorkerLoop;

@@ -69,10 +71,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.create_or_open(&self.config, &self.wal)
.await?;

// TODO(yingwen): Custom the Debug format for the metadata and also print it.
info!("A new region created, region_id: {}", region.region_id);
info!("A new region created, region: {:?}", region.metadata());

// TODO(yingwen): Metrics.
increment_gauge!(REGION_COUNT, 1.0);

// Insert the MitoRegion into the RegionMap.
self.regions.insert_region(Arc::new(region));
4 changes: 4 additions & 0 deletions src/mito2/src/worker/handle_drop.rs
@@ -20,13 +20,15 @@ use common_query::Output;
use common_telemetry::info;
use common_telemetry::tracing::warn;
use futures::TryStreamExt;
use metrics::decrement_gauge;
use object_store::util::join_path;
use object_store::{EntryMode, ObjectStore};
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::time::sleep;

use crate::error::{OpenDalSnafu, Result};
use crate::metrics::REGION_COUNT;
use crate::region::RegionMapRef;
use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};

@@ -62,6 +64,8 @@ impl<S> RegionWorkerLoop<S> {
region_id
);

decrement_gauge!(REGION_COUNT, 1.0);

// detach a background task to delete the region dir
let region_dir = region.access_layer.region_dir().to_owned();
let object_store = self.object_store.clone();