Skip to content

Commit

Permalink
feat: add path prefix label to obejct storage metrics (#4277)
Browse files Browse the repository at this point in the history
* feat: add path prefix label to storage metrics

* refactor: return full path when the levels are less than 3

* refactor: align path label name with upstream

* refactor: better implementation of sub path

---------

Co-authored-by: Weny Xu <[email protected]>
  • Loading branch information
sunng87 and WenyXu authored Jul 5, 2024
1 parent 849e0b9 commit d9efa56
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 33 deletions.
139 changes: 106 additions & 33 deletions src/object-store/src/layers/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ use prometheus::{
Histogram, HistogramTimer, HistogramVec, IntCounterVec,
};

use crate::util::extract_parent_path;

type Result<T> = std::result::Result<T, opendal::Error>;

lazy_static! {
static ref REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!(
"opendal_requests_total",
"Total times of all kinds of operation being called",
&["scheme", "operation"],
&["scheme", "operation", "path"],
)
.unwrap();
static ref REQUESTS_DURATION_SECONDS: HistogramVec = register_histogram_vec!(
Expand All @@ -40,7 +42,7 @@ lazy_static! {
"Histogram of the time spent on specific operation",
exponential_buckets(0.01, 2.0, 16).unwrap()
),
&["scheme", "operation"]
&["scheme", "operation", "path"]
)
.unwrap();
static ref BYTES_TOTAL: HistogramVec = register_histogram_vec!(
Expand All @@ -49,7 +51,7 @@ lazy_static! {
"Total size of sync or async Read/Write",
exponential_buckets(0.01, 2.0, 16).unwrap()
),
&["scheme", "operation"]
&["scheme", "operation", "path"]
)
.unwrap();
}
Expand Down Expand Up @@ -126,12 +128,13 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
let path_label = extract_parent_path(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::CreateDir.into_static()])
.with_label_values(&[&self.scheme, Operation::CreateDir.into_static(), path_label])
.inc();

let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::CreateDir.into_static()])
.with_label_values(&[&self.scheme, Operation::CreateDir.into_static(), path_label])
.start_timer();
let create_res = self.inner.create_dir(path, args).await;

Expand All @@ -143,12 +146,13 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let path_label = extract_parent_path(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Read.into_static()])
.with_label_values(&[&self.scheme, Operation::Read.into_static(), path_label])
.inc();

let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::Read.into_static()])
.with_label_values(&[&self.scheme, Operation::Read.into_static(), path_label])
.start_timer();

let (rp, r) = self.inner.read(path, args).await.map_err(|e| {
Expand All @@ -161,19 +165,24 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
PrometheusMetricWrapper::new(
r,
Operation::Read,
BYTES_TOTAL.with_label_values(&[&self.scheme, Operation::Read.into_static()]),
BYTES_TOTAL.with_label_values(&[
&self.scheme,
Operation::Read.into_static(),
path_label,
]),
timer,
),
))
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let path_label = extract_parent_path(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Write.into_static()])
.with_label_values(&[&self.scheme, Operation::Write.into_static(), path_label])
.inc();

let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::Write.into_static()])
.with_label_values(&[&self.scheme, Operation::Write.into_static(), path_label])
.start_timer();

let (rp, r) = self.inner.write(path, args).await.map_err(|e| {
Expand All @@ -186,18 +195,23 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
PrometheusMetricWrapper::new(
r,
Operation::Write,
BYTES_TOTAL.with_label_values(&[&self.scheme, Operation::Write.into_static()]),
BYTES_TOTAL.with_label_values(&[
&self.scheme,
Operation::Write.into_static(),
path_label,
]),
timer,
),
))
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let path_label = extract_parent_path(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Stat.into_static()])
.with_label_values(&[&self.scheme, Operation::Stat.into_static(), path_label])
.inc();
let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::Stat.into_static()])
.with_label_values(&[&self.scheme, Operation::Stat.into_static(), path_label])
.start_timer();

let stat_res = self.inner.stat(path, args).await;
Expand All @@ -209,12 +223,13 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let path_label = extract_parent_path(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Delete.into_static()])
.with_label_values(&[&self.scheme, Operation::Delete.into_static(), path_label])
.inc();

let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::Delete.into_static()])
.with_label_values(&[&self.scheme, Operation::Delete.into_static(), path_label])
.start_timer();

let delete_res = self.inner.delete(path, args).await;
Expand All @@ -226,12 +241,13 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let path_label = extract_parent_path(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::List.into_static()])
.with_label_values(&[&self.scheme, Operation::List.into_static(), path_label])
.inc();

let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::List.into_static()])
.with_label_values(&[&self.scheme, Operation::List.into_static(), path_label])
.start_timer();

let list_res = self.inner.list(path, args).await;
Expand All @@ -245,11 +261,11 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {

async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Batch.into_static()])
.with_label_values(&[&self.scheme, Operation::Batch.into_static(), ""])
.inc();

let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::Batch.into_static()])
.with_label_values(&[&self.scheme, Operation::Batch.into_static(), ""])
.start_timer();
let result = self.inner.batch(args).await;

Expand All @@ -261,12 +277,13 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
let path_label = extract_parent_path(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Presign.into_static()])
.with_label_values(&[&self.scheme, Operation::Presign.into_static(), path_label])
.inc();

let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::Presign.into_static()])
.with_label_values(&[&self.scheme, Operation::Presign.into_static(), path_label])
.start_timer();
let result = self.inner.presign(path, args).await;
timer.observe_duration();
Expand All @@ -278,12 +295,21 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
let path_label = extract_parent_path(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::BlockingCreateDir.into_static()])
.with_label_values(&[
&self.scheme,
Operation::BlockingCreateDir.into_static(),
path_label,
])
.inc();

let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::BlockingCreateDir.into_static()])
.with_label_values(&[
&self.scheme,
Operation::BlockingCreateDir.into_static(),
path_label,
])
.start_timer();
let result = self.inner.blocking_create_dir(path, args);

Expand All @@ -296,12 +322,21 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
let path_label = extract_parent_path(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()])
.with_label_values(&[
&self.scheme,
Operation::BlockingRead.into_static(),
path_label,
])
.inc();

let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()])
.with_label_values(&[
&self.scheme,
Operation::BlockingRead.into_static(),
path_label,
])
.start_timer();

self.inner
Expand All @@ -315,6 +350,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
BYTES_TOTAL.with_label_values(&[
&self.scheme,
Operation::BlockingRead.into_static(),
path_label,
]),
timer,
),
Expand All @@ -327,12 +363,21 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
let path_label = extract_parent_path(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()])
.with_label_values(&[
&self.scheme,
Operation::BlockingWrite.into_static(),
path_label,
])
.inc();

let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()])
.with_label_values(&[
&self.scheme,
Operation::BlockingWrite.into_static(),
path_label,
])
.start_timer();

self.inner
Expand All @@ -346,6 +391,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
BYTES_TOTAL.with_label_values(&[
&self.scheme,
Operation::BlockingWrite.into_static(),
path_label,
]),
timer,
),
Expand All @@ -358,12 +404,21 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let path_label = extract_parent_path(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::BlockingStat.into_static()])
.with_label_values(&[
&self.scheme,
Operation::BlockingStat.into_static(),
path_label,
])
.inc();

let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::BlockingStat.into_static()])
.with_label_values(&[
&self.scheme,
Operation::BlockingStat.into_static(),
path_label,
])
.start_timer();
let result = self.inner.blocking_stat(path, args);
timer.observe_duration();
Expand All @@ -374,12 +429,21 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let path_label = extract_parent_path(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::BlockingDelete.into_static()])
.with_label_values(&[
&self.scheme,
Operation::BlockingDelete.into_static(),
path_label,
])
.inc();

let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::BlockingDelete.into_static()])
.with_label_values(&[
&self.scheme,
Operation::BlockingDelete.into_static(),
path_label,
])
.start_timer();
let result = self.inner.blocking_delete(path, args);
timer.observe_duration();
Expand All @@ -391,12 +455,21 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
let path_label = extract_parent_path(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::BlockingList.into_static()])
.with_label_values(&[
&self.scheme,
Operation::BlockingList.into_static(),
path_label,
])
.inc();

let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::BlockingList.into_static()])
.with_label_values(&[
&self.scheme,
Operation::BlockingList.into_static(),
path_label,
])
.start_timer();
let result = self.inner.blocking_list(path, args);
timer.observe_duration();
Expand Down
38 changes: 38 additions & 0 deletions src/object-store/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,20 @@ pub fn normalize_path(path: &str) -> String {
p
}

// This logical tries to extract parent path from the object storage operation
// the function also relies on assumption that the region path is built from
// pattern `<data|index>/catalog/schema/table_id/....`
//
// this implementation tries to extract at most 3 levels of parent path
pub(crate) fn extract_parent_path(path: &str) -> &str {
// split the path into `catalog`, `schema` and others
path.char_indices()
.filter(|&(_, c)| c == '/')
// we get the data/catalog/schema from path, split at the 3rd /
.nth(2)
.map_or(path, |(i, _)| &path[..i])
}

/// Attaches instrument layers to the object store.
pub fn with_instrument_layers(object_store: ObjectStore) -> ObjectStore {
object_store
Expand Down Expand Up @@ -179,4 +193,28 @@ mod tests {
assert_eq!("/abc", join_path("//", "/abc"));
assert_eq!("abc/def", join_path("abc/", "//def"));
}

#[test]
fn test_path_extraction() {
assert_eq!(
"data/greptime/public",
extract_parent_path("data/greptime/public/1024/1024_0000000000/")
);

assert_eq!(
"data/greptime/public",
extract_parent_path("data/greptime/public/1/")
);

assert_eq!(
"data/greptime/public",
extract_parent_path("data/greptime/public")
);

assert_eq!("data/greptime/", extract_parent_path("data/greptime/"));

assert_eq!("data/", extract_parent_path("data/"));

assert_eq!("/", extract_parent_path("/"));
}
}

0 comments on commit d9efa56

Please sign in to comment.