diff --git a/src/object-store/src/layers/prometheus.rs b/src/object-store/src/layers/prometheus.rs index 6cc71a3bebcf..080ace52bff6 100644 --- a/src/object-store/src/layers/prometheus.rs +++ b/src/object-store/src/layers/prometheus.rs @@ -25,13 +25,15 @@ use prometheus::{ Histogram, HistogramTimer, HistogramVec, IntCounterVec, }; +use crate::util::extract_parent_path; + type Result = std::result::Result; lazy_static! { static ref REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!( "opendal_requests_total", "Total times of all kinds of operation being called", - &["scheme", "operation"], + &["scheme", "operation", "path"], ) .unwrap(); static ref REQUESTS_DURATION_SECONDS: HistogramVec = register_histogram_vec!( @@ -40,7 +42,7 @@ lazy_static! { "Histogram of the time spent on specific operation", exponential_buckets(0.01, 2.0, 16).unwrap() ), - &["scheme", "operation"] + &["scheme", "operation", "path"] ) .unwrap(); static ref BYTES_TOTAL: HistogramVec = register_histogram_vec!( @@ -49,7 +51,7 @@ lazy_static! { "Total size of sync or async Read/Write", exponential_buckets(0.01, 2.0, 16).unwrap() ), - &["scheme", "operation"] + &["scheme", "operation", "path"] ) .unwrap(); } @@ -126,12 +128,13 @@ impl LayeredAccess for PrometheusAccess { } async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { + let path_label = extract_parent_path(path); REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::CreateDir.into_static()]) + .with_label_values(&[&self.scheme, Operation::CreateDir.into_static(), path_label]) .inc(); let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::CreateDir.into_static()]) + .with_label_values(&[&self.scheme, Operation::CreateDir.into_static(), path_label]) .start_timer(); let create_res = self.inner.create_dir(path, args).await; @@ -143,12 +146,13 @@ impl LayeredAccess for PrometheusAccess { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let path_label = extract_parent_path(path); REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::Read.into_static()]) + .with_label_values(&[&self.scheme, Operation::Read.into_static(), path_label]) .inc(); let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Read.into_static()]) + .with_label_values(&[&self.scheme, Operation::Read.into_static(), path_label]) .start_timer(); let (rp, r) = self.inner.read(path, args).await.map_err(|e| { @@ -161,19 +165,24 @@ impl LayeredAccess for PrometheusAccess { PrometheusMetricWrapper::new( r, Operation::Read, - BYTES_TOTAL.with_label_values(&[&self.scheme, Operation::Read.into_static()]), + BYTES_TOTAL.with_label_values(&[ + &self.scheme, + Operation::Read.into_static(), + path_label, + ]), timer, ), )) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let path_label = extract_parent_path(path); REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::Write.into_static()]) + .with_label_values(&[&self.scheme, Operation::Write.into_static(), path_label]) .inc(); let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Write.into_static()]) + .with_label_values(&[&self.scheme, Operation::Write.into_static(), path_label]) .start_timer(); let (rp, r) = self.inner.write(path, args).await.map_err(|e| { @@ -186,18 +195,23 @@ impl LayeredAccess for PrometheusAccess { PrometheusMetricWrapper::new( r, Operation::Write, - BYTES_TOTAL.with_label_values(&[&self.scheme, Operation::Write.into_static()]), + BYTES_TOTAL.with_label_values(&[ + &self.scheme, + Operation::Write.into_static(), + path_label, + ]), timer, ), )) } async fn stat(&self, path: &str, args: OpStat) -> Result { + let path_label = extract_parent_path(path); REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::Stat.into_static()]) + .with_label_values(&[&self.scheme, Operation::Stat.into_static(), path_label]) .inc(); let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Stat.into_static()]) + .with_label_values(&[&self.scheme, Operation::Stat.into_static(), path_label]) .start_timer(); let stat_res = self.inner.stat(path, args).await; @@ -209,12 +223,13 @@ impl LayeredAccess for PrometheusAccess { } async fn delete(&self, path: &str, args: OpDelete) -> Result { + let path_label = extract_parent_path(path); REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::Delete.into_static()]) + .with_label_values(&[&self.scheme, Operation::Delete.into_static(), path_label]) .inc(); let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Delete.into_static()]) + .with_label_values(&[&self.scheme, Operation::Delete.into_static(), path_label]) .start_timer(); let delete_res = self.inner.delete(path, args).await; @@ -226,12 +241,13 @@ impl LayeredAccess for PrometheusAccess { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + let path_label = extract_parent_path(path); REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::List.into_static()]) + .with_label_values(&[&self.scheme, Operation::List.into_static(), path_label]) .inc(); let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::List.into_static()]) + .with_label_values(&[&self.scheme, Operation::List.into_static(), path_label]) .start_timer(); let list_res = self.inner.list(path, args).await; @@ -245,11 +261,11 @@ impl LayeredAccess for PrometheusAccess { async fn batch(&self, args: OpBatch) -> Result { REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::Batch.into_static()]) + .with_label_values(&[&self.scheme, Operation::Batch.into_static(), ""]) .inc(); let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Batch.into_static()]) + .with_label_values(&[&self.scheme, Operation::Batch.into_static(), ""]) .start_timer(); let result = self.inner.batch(args).await; @@ -261,12 +277,13 @@ impl LayeredAccess for PrometheusAccess { } async fn presign(&self, path: &str, args: OpPresign) -> Result { + let path_label = extract_parent_path(path); REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::Presign.into_static()]) + .with_label_values(&[&self.scheme, Operation::Presign.into_static(), path_label]) .inc(); let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Presign.into_static()]) + .with_label_values(&[&self.scheme, Operation::Presign.into_static(), path_label]) .start_timer(); let result = self.inner.presign(path, args).await; timer.observe_duration(); @@ -278,12 +295,21 @@ impl LayeredAccess for PrometheusAccess { } fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { + let path_label = extract_parent_path(path); REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::BlockingCreateDir.into_static()]) + .with_label_values(&[ + &self.scheme, + Operation::BlockingCreateDir.into_static(), + path_label, + ]) .inc(); let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::BlockingCreateDir.into_static()]) + .with_label_values(&[ + &self.scheme, + Operation::BlockingCreateDir.into_static(), + path_label, + ]) .start_timer(); let result = self.inner.blocking_create_dir(path, args); @@ -296,12 +322,21 @@ impl LayeredAccess for PrometheusAccess { } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { + let path_label = extract_parent_path(path); REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) + .with_label_values(&[ + &self.scheme, + Operation::BlockingRead.into_static(), + path_label, + ]) .inc(); let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) + .with_label_values(&[ + &self.scheme, + Operation::BlockingRead.into_static(), + path_label, + ]) .start_timer(); self.inner @@ -315,6 +350,7 @@ impl LayeredAccess for PrometheusAccess { BYTES_TOTAL.with_label_values(&[ &self.scheme, Operation::BlockingRead.into_static(), + path_label, ]), timer, ), @@ -327,12 +363,21 @@ impl LayeredAccess for PrometheusAccess { } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { + let path_label = extract_parent_path(path); REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) + .with_label_values(&[ + &self.scheme, + Operation::BlockingWrite.into_static(), + path_label, + ]) .inc(); let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) + .with_label_values(&[ + &self.scheme, + Operation::BlockingWrite.into_static(), + path_label, + ]) .start_timer(); self.inner @@ -346,6 +391,7 @@ impl LayeredAccess for PrometheusAccess { BYTES_TOTAL.with_label_values(&[ &self.scheme, Operation::BlockingWrite.into_static(), + path_label, ]), timer, ), @@ -358,12 +404,21 @@ impl LayeredAccess for PrometheusAccess { } fn blocking_stat(&self, path: &str, args: OpStat) -> Result { + let path_label = extract_parent_path(path); REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::BlockingStat.into_static()]) + .with_label_values(&[ + &self.scheme, + Operation::BlockingStat.into_static(), + path_label, + ]) .inc(); let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::BlockingStat.into_static()]) + .with_label_values(&[ + &self.scheme, + Operation::BlockingStat.into_static(), + path_label, + ]) .start_timer(); let result = self.inner.blocking_stat(path, args); timer.observe_duration(); @@ -374,12 +429,21 @@ impl LayeredAccess for PrometheusAccess { } fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { + let path_label = extract_parent_path(path); REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::BlockingDelete.into_static()]) + .with_label_values(&[ + &self.scheme, + Operation::BlockingDelete.into_static(), + path_label, + ]) .inc(); let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::BlockingDelete.into_static()]) + .with_label_values(&[ + &self.scheme, + Operation::BlockingDelete.into_static(), + path_label, + ]) .start_timer(); let result = self.inner.blocking_delete(path, args); timer.observe_duration(); @@ -391,12 +455,21 @@ impl LayeredAccess for PrometheusAccess { } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { + let path_label = extract_parent_path(path); REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::BlockingList.into_static()]) + .with_label_values(&[ + &self.scheme, + Operation::BlockingList.into_static(), + path_label, + ]) .inc(); let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::BlockingList.into_static()]) + .with_label_values(&[ + &self.scheme, + Operation::BlockingList.into_static(), + path_label, + ]) .start_timer(); let result = self.inner.blocking_list(path, args); timer.observe_duration(); diff --git a/src/object-store/src/util.rs b/src/object-store/src/util.rs index 1ad9c47d51df..376e1941c589 100644 --- a/src/object-store/src/util.rs +++ b/src/object-store/src/util.rs @@ -123,6 +123,20 @@ pub fn normalize_path(path: &str) -> String { p } +// This logical tries to extract parent path from the object storage operation +// the function also relies on assumption that the region path is built from +// pattern `/catalog/schema/table_id/....` +// +// this implementation tries to extract at most 3 levels of parent path +pub(crate) fn extract_parent_path(path: &str) -> &str { + // split the path into `catalog`, `schema` and others + path.char_indices() + .filter(|&(_, c)| c == '/') + // we get the data/catalog/schema from path, split at the 3rd / + .nth(2) + .map_or(path, |(i, _)| &path[..i]) +} + /// Attaches instrument layers to the object store. pub fn with_instrument_layers(object_store: ObjectStore) -> ObjectStore { object_store @@ -179,4 +193,28 @@ mod tests { assert_eq!("/abc", join_path("//", "/abc")); assert_eq!("abc/def", join_path("abc/", "//def")); } + + #[test] + fn test_path_extraction() { + assert_eq!( + "data/greptime/public", + extract_parent_path("data/greptime/public/1024/1024_0000000000/") + ); + + assert_eq!( + "data/greptime/public", + extract_parent_path("data/greptime/public/1/") + ); + + assert_eq!( + "data/greptime/public", + extract_parent_path("data/greptime/public") + ); + + assert_eq!("data/greptime/", extract_parent_path("data/greptime/")); + + assert_eq!("data/", extract_parent_path("data/")); + + assert_eq!("/", extract_parent_path("/")); + } }