diff --git a/Cargo.lock b/Cargo.lock
index 77df85d6dbf1..6640573a353f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5413,7 +5413,6 @@ dependencies = [
  "parking_lot 0.12.1",
  "percent-encoding",
  "pin-project",
- "prometheus",
  "quick-xml 0.29.0",
  "reqsign",
  "reqwest",
diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml
index b3bc70ed97fd..019430919521 100644
--- a/src/datanode/Cargo.toml
+++ b/src/datanode/Cargo.toml
@@ -4,9 +4,6 @@ version.workspace = true
 edition.workspace = true
 license.workspace = true
 
-[features]
-testing = []
-
 [dependencies]
 api.workspace = true
 arrow-flight.workspace = true
diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs
index 78528cf6ce6e..ff870315f0b7 100644
--- a/src/datanode/src/store.rs
+++ b/src/datanode/src/store.rs
@@ -66,23 +66,9 @@ pub(crate) async fn new_object_store(opts: &DatanodeOptions) -> Result<ObjectStore> {
diff --git a/src/object-store/src/layers/prometheus.rs b/src/object-store/src/layers/prometheus.rs
new file mode 100644
--- /dev/null
+++ b/src/object-store/src/layers/prometheus.rs
+// Copied from opendal's prometheus layer, with a tiny change to avoid a crash
+// in multi-threaded environments.
+
+use std::fmt::{Debug, Formatter};
+use std::io;
+use std::task::{Context, Poll};
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use common_telemetry::debug;
+use futures::FutureExt;
+use lazy_static::lazy_static;
+use opendal::raw::*;
+use opendal::ErrorKind;
+use prometheus::{
+    exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter_vec,
+    HistogramVec, IntCounterVec,
+};
+
+type Result<T> = std::result::Result<T, opendal::Error>;
+
+lazy_static! {
+    static ref REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!(
+        "opendal_requests_total",
+        "Total times of all kinds of operation being called",
+        &["scheme", "operation"],
+    )
+    .unwrap();
+    static ref REQUESTS_DURATION_SECONDS: HistogramVec = register_histogram_vec!(
+        histogram_opts!(
+            "opendal_requests_duration_seconds",
+            "Histogram of the time spent on specific operation",
+            exponential_buckets(0.01, 2.0, 16).unwrap()
+        ),
+        &["scheme", "operation"]
+    )
+    .unwrap();
+    static ref BYTES_TOTAL: HistogramVec = register_histogram_vec!(
+        histogram_opts!(
+            "opendal_bytes_total",
+            "Total size of sync or async Read/Write",
+            exponential_buckets(0.01, 2.0, 16).unwrap()
+        ),
+        &["scheme", "operation"]
+    )
+    .unwrap();
+}
+
+#[inline]
+fn increment_errors_total(op: Operation, kind: ErrorKind) {
+    debug!(
+        "Prometheus statistics metrics error, operation {} error {}",
+        op.into_static(),
+        kind.into_static()
+    );
+}
+
+/// Please refer to [prometheus](https://docs.rs/prometheus) for details about the underlying metric types.
+///
+/// # Prometheus Metrics
+///
+/// This section introduces the three metrics currently exported by this layer. They are essential for understanding the behavior and performance of the wrapped object store.
+///
+/// | Metric Name                       | Type      | Description                                        | Labels            |
+/// |-----------------------------------|-----------|----------------------------------------------------|-------------------|
+/// | opendal_requests_total            | Counter   | Total times of all kinds of operation being called | scheme, operation |
+/// | opendal_requests_duration_seconds | Histogram | Histogram of the time spent on specific operation  | scheme, operation |
+/// | opendal_bytes_total               | Histogram | Total size of sync or async Read/Write             | scheme, operation |
+///
+/// For a more detailed explanation of these metrics and how they are used, please refer to the [Prometheus documentation](https://prometheus.io/docs/introduction/overview/).
+///
+/// # Histogram Configuration
+///
+/// The buckets of these histograms are generated from `exponential_buckets(0.01, 2.0, 16)`,
+/// i.e. 16 buckets doubling from 0.01 up to 327.68.
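+///
+/// # Examples
+///
+/// A minimal usage sketch. The `services::Memory` builder is only an
+/// illustration; any opendal service builder can be wrapped the same way.
+///
+/// ```ignore
+/// use opendal::{services, Operator};
+///
+/// // Every call made through `op` updates the `opendal_*` metrics above.
+/// let op = Operator::new(services::Memory::default())?
+///     .layer(PrometheusMetricsLayer)
+///     .finish();
+/// ```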
+#[derive(Default, Debug, Clone)]
+pub struct PrometheusMetricsLayer;
+
+impl<A: Accessor> Layer<A> for PrometheusMetricsLayer {
+    type LayeredAccessor = PrometheusAccessor<A>;
+
+    fn layer(&self, inner: A) -> Self::LayeredAccessor {
+        let meta = inner.info();
+        let scheme = meta.scheme();
+
+        PrometheusAccessor {
+            inner,
+            scheme: scheme.to_string(),
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct PrometheusAccessor<A: Accessor> {
+    inner: A,
+    scheme: String,
+}
+
+impl<A: Accessor> Debug for PrometheusAccessor<A> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PrometheusAccessor")
+            .field("inner", &self.inner)
+            .finish_non_exhaustive()
+    }
+}
+
+#[async_trait]
+impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> {
+    type Inner = A;
+    type Reader = PrometheusMetricWrapper<A::Reader>;
+    type BlockingReader = PrometheusMetricWrapper<A::BlockingReader>;
+    type Writer = PrometheusMetricWrapper<A::Writer>;
+    type BlockingWriter = PrometheusMetricWrapper<A::BlockingWriter>;
+    type Pager = A::Pager;
+    type BlockingPager = A::BlockingPager;
+
+    fn inner(&self) -> &Self::Inner {
+        &self.inner
+    }
+
+    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::CreateDir.into_static()])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::CreateDir.into_static()])
+            .start_timer();
+        let create_res = self.inner.create_dir(path, args).await;
+
+        timer.observe_duration();
+        create_res.map_err(|e| {
+            increment_errors_total(Operation::CreateDir, e.kind());
+            e
+        })
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::Read.into_static()])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::Read.into_static()])
+            .start_timer();
+
+        let read_res = self
+            .inner
+            .read(path, args)
+            .map(|v| {
+                v.map(|(rp, r)| {
+                    (
+                        rp,
+                        PrometheusMetricWrapper::new(r, Operation::Read, &self.scheme),
+                    )
+                })
+            })
+            .await;
+        timer.observe_duration();
+        read_res.map_err(|e| {
+            increment_errors_total(Operation::Read, e.kind());
+            e
+        })
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::Write.into_static()])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::Write.into_static()])
+            .start_timer();
+
+        let write_res = self
+            .inner
+            .write(path, args)
+            .map(|v| {
+                v.map(|(rp, r)| {
+                    (
+                        rp,
+                        PrometheusMetricWrapper::new(r, Operation::Write, &self.scheme),
+                    )
+                })
+            })
+            .await;
+        timer.observe_duration();
+        write_res.map_err(|e| {
+            increment_errors_total(Operation::Write, e.kind());
+            e
+        })
+    }
+
+    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::Stat.into_static()])
+            .inc();
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::Stat.into_static()])
+            .start_timer();
+
+        let stat_res = self.inner.stat(path, args).await;
+        timer.observe_duration();
+        stat_res.map_err(|e| {
+            increment_errors_total(Operation::Stat, e.kind());
+            e
+        })
+    }
+
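+    // Each remaining operation follows the same pattern: bump the request
+    // counter, time the inner call with the duration histogram, and log the
+    // error kind if the call fails.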
+    async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::Delete.into_static()])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::Delete.into_static()])
+            .start_timer();
+
+        let delete_res = self.inner.delete(path, args).await;
+        timer.observe_duration();
+        delete_res.map_err(|e| {
+            increment_errors_total(Operation::Delete, e.kind());
+            e
+        })
+    }
+
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::List.into_static()])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::List.into_static()])
+            .start_timer();
+
+        let list_res = self.inner.list(path, args).await;
+
+        timer.observe_duration();
+        list_res.map_err(|e| {
+            increment_errors_total(Operation::List, e.kind());
+            e
+        })
+    }
+
+    async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::Batch.into_static()])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::Batch.into_static()])
+            .start_timer();
+        let result = self.inner.batch(args).await;
+
+        timer.observe_duration();
+        result.map_err(|e| {
+            increment_errors_total(Operation::Batch, e.kind());
+            e
+        })
+    }
+
+    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::Presign.into_static()])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::Presign.into_static()])
+            .start_timer();
+        let result = self.inner.presign(path, args).await;
+        timer.observe_duration();
+
+        result.map_err(|e| {
+            increment_errors_total(Operation::Presign, e.kind());
+            e
+        })
+    }
+
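+    // The blocking variants below mirror the async operations above; they are
+    // labeled with their own `Blocking*` operation names so synchronous and
+    // asynchronous traffic can be distinguished.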
+    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::BlockingCreateDir.into_static()])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::BlockingCreateDir.into_static()])
+            .start_timer();
+        let result = self.inner.blocking_create_dir(path, args);
+
+        timer.observe_duration();
+
+        result.map_err(|e| {
+            increment_errors_total(Operation::BlockingCreateDir, e.kind());
+            e
+        })
+    }
+
+    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()])
+            .inc();
+
+        // Both labels are required here; passing only the scheme would panic
+        // at runtime because the histogram is declared with two labels.
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()])
+            .start_timer();
+        let result = self.inner.blocking_read(path, args).map(|(rp, r)| {
+            (
+                rp,
+                PrometheusMetricWrapper::new(r, Operation::BlockingRead, &self.scheme),
+            )
+        });
+        timer.observe_duration();
+        result.map_err(|e| {
+            increment_errors_total(Operation::BlockingRead, e.kind());
+            e
+        })
+    }
+
+    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()])
+            .start_timer();
+        let result = self.inner.blocking_write(path, args).map(|(rp, r)| {
+            (
+                rp,
+                PrometheusMetricWrapper::new(r, Operation::BlockingWrite, &self.scheme),
+            )
+        });
+        timer.observe_duration();
+        result.map_err(|e| {
+            increment_errors_total(Operation::BlockingWrite, e.kind());
+            e
+        })
+    }
+
+    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::BlockingStat.into_static()])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::BlockingStat.into_static()])
+            .start_timer();
+        let result = self.inner.blocking_stat(path, args);
+        timer.observe_duration();
+        result.map_err(|e| {
+            increment_errors_total(Operation::BlockingStat, e.kind());
+            e
+        })
+    }
+
+    fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::BlockingDelete.into_static()])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::BlockingDelete.into_static()])
+            .start_timer();
+        let result = self.inner.blocking_delete(path, args);
+        timer.observe_duration();
+
+        result.map_err(|e| {
+            increment_errors_total(Operation::BlockingDelete, e.kind());
+            e
+        })
+    }
+
+    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::BlockingList.into_static()])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::BlockingList.into_static()])
+            .start_timer();
+        let result = self.inner.blocking_list(path, args);
+        timer.observe_duration();
+
+        result.map_err(|e| {
+            increment_errors_total(Operation::BlockingList, e.kind());
+            e
+        })
+    }
+}
+
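+/// Wraps an inner reader or writer so that the bytes moved by each read or
+/// write call are observed in the `opendal_bytes_total` histogram and every
+/// error is reported through `increment_errors_total`.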
+pub struct PrometheusMetricWrapper<R> {
+    inner: R,
+
+    op: Operation,
+    scheme: String,
+}
+
+impl<R> PrometheusMetricWrapper<R> {
+    fn new(inner: R, op: Operation, scheme: &str) -> Self {
+        Self {
+            inner,
+            op,
+            scheme: scheme.to_string(),
+        }
+    }
+}
+
+impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
+    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
+        self.inner.poll_read(cx, buf).map(|res| match res {
+            Ok(bytes) => {
+                BYTES_TOTAL
+                    .with_label_values(&[&self.scheme, Operation::Read.into_static()])
+                    .observe(bytes as f64);
+                Ok(bytes)
+            }
+            Err(e) => {
+                increment_errors_total(self.op, e.kind());
+                Err(e)
+            }
+        })
+    }
+
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll<Result<u64>> {
+        self.inner.poll_seek(cx, pos).map(|res| match res {
+            Ok(n) => Ok(n),
+            Err(e) => {
+                increment_errors_total(self.op, e.kind());
+                Err(e)
+            }
+        })
+    }
+
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        self.inner.poll_next(cx).map(|res| match res {
+            Some(Ok(bytes)) => {
+                BYTES_TOTAL
+                    .with_label_values(&[&self.scheme, Operation::Read.into_static()])
+                    .observe(bytes.len() as f64);
+                Some(Ok(bytes))
+            }
+            Some(Err(e)) => {
+                increment_errors_total(self.op, e.kind());
+                Some(Err(e))
+            }
+            None => None,
+        })
+    }
+}
+
+impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
+    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+        self.inner
+            .read(buf)
+            .map(|n| {
+                BYTES_TOTAL
+                    .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()])
+                    .observe(n as f64);
+                n
+            })
+            .map_err(|e| {
+                increment_errors_total(self.op, e.kind());
+                e
+            })
+    }
+
+    fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
+        self.inner.seek(pos).map_err(|err| {
+            increment_errors_total(self.op, err.kind());
+            err
+        })
+    }
+
+    fn next(&mut self) -> Option<Result<Bytes>> {
+        self.inner.next().map(|res| match res {
+            Ok(bytes) => {
+                BYTES_TOTAL
+                    .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()])
+                    .observe(bytes.len() as f64);
+                Ok(bytes)
+            }
+            Err(e) => {
+                increment_errors_total(self.op, e.kind());
+                Err(e)
+            }
+        })
+    }
+}
+
+#[async_trait]
+impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
+        self.inner
+            .poll_write(cx, bs)
+            .map_ok(|n| {
+                BYTES_TOTAL
+                    .with_label_values(&[&self.scheme, Operation::Write.into_static()])
+                    .observe(n as f64);
+                n
+            })
+            .map_err(|err| {
+                increment_errors_total(self.op, err.kind());
+                err
+            })
+    }
+
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        self.inner.poll_abort(cx).map_err(|err| {
+            increment_errors_total(self.op, err.kind());
+            err
+        })
+    }
+
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        self.inner.poll_close(cx).map_err(|err| {
+            increment_errors_total(self.op, err.kind());
+            err
+        })
+    }
+}
+
+impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        self.inner
+            .write(bs)
+            .map(|n| {
+                BYTES_TOTAL
+                    .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()])
+                    .observe(n as f64);
+                n
+            })
+            .map_err(|err| {
+                increment_errors_total(self.op, err.kind());
+                err
+            })
+    }
+
+    fn close(&mut self) -> Result<()> {
+        self.inner.close().map_err(|err| {
+            increment_errors_total(self.op, err.kind());
+            err
+        })
+    }
+}
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml
index bb613302f79d..20812d397fba 100644
--- a/tests-integration/Cargo.toml
+++ b/tests-integration/Cargo.toml
@@ -29,7 +29,7 @@ common-recordbatch.workspace = true
 common-runtime.workspace = true
 common-telemetry.workspace = true
 common-test-util.workspace = true
-datanode = { workspace = true, features = ["testing"] }
+datanode = { workspace = true }
 datatypes.workspace = true
 dotenv = "0.15"
 frontend = { workspace = true, features = ["testing"] }