diff --git a/Cargo.lock b/Cargo.lock index c23acf60636d..7f38d0d8b183 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -896,6 +896,18 @@ dependencies = [ "rand", ] +[[package]] +name = "backon" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d67782c3f868daa71d3533538e98a8e13713231969def7536e8039606fc46bf0" +dependencies = [ + "fastrand", + "futures-core", + "pin-project", + "tokio", +] + [[package]] name = "backon" version = "1.2.0" @@ -2252,7 +2264,7 @@ version = "0.12.0" dependencies = [ "async-stream", "async-trait", - "backon", + "backon 1.2.0", "common-base", "common-error", "common-macro", @@ -7469,13 +7481,13 @@ checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" [[package]] name = "opendal" -version = "0.50.2" +version = "0.49.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb28bb6c64e116ceaf8dd4e87099d3cfea4a58e85e62b104fef74c91afba0f44" +checksum = "9b04d09b9822c2f75a1d2fc513a2c1279c70e91e7407936fffdf6a6976ec530a" dependencies = [ "anyhow", "async-trait", - "backon", + "backon 0.4.4", "base64 0.22.1", "bytes", "chrono", @@ -7488,7 +7500,6 @@ dependencies = [ "md-5", "once_cell", "percent-encoding", - "prometheus", "quick-xml 0.36.2", "reqsign", "reqwest", @@ -9504,9 +9515,9 @@ dependencies = [ [[package]] name = "reqsign" -version = "0.16.1" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb0075a66c8bfbf4cc8b70dca166e722e1f55a3ea9250ecbb85f4d92a5f64149" +checksum = "03dd4ba7c3901dd43e6b8c7446a760d45bc1ea4301002e1a6fa48f97c3a796fa" dependencies = [ "anyhow", "async-trait", diff --git a/src/common/datasource/src/object_store/fs.rs b/src/common/datasource/src/object_store/fs.rs index 5ffbbfa3148a..f87311f517b7 100644 --- a/src/common/datasource/src/object_store/fs.rs +++ b/src/common/datasource/src/object_store/fs.rs @@ -27,7 +27,7 @@ pub fn build_fs_backend(root: &str) -> Result { DefaultLoggingInterceptor, )) .layer(object_store::layers::TracingLayer) - .layer(object_store::layers::build_prometheus_metrics_layer(true)) + .layer(object_store::layers::PrometheusMetricsLayer::new(true)) .finish(); Ok(object_store) } diff --git a/src/common/datasource/src/object_store/s3.rs b/src/common/datasource/src/object_store/s3.rs index 0d83eb7a98b8..e141621b899b 100644 --- a/src/common/datasource/src/object_store/s3.rs +++ b/src/common/datasource/src/object_store/s3.rs @@ -89,7 +89,7 @@ pub fn build_s3_backend( DefaultLoggingInterceptor, )) .layer(object_store::layers::TracingLayer) - .layer(object_store::layers::build_prometheus_metrics_layer(true)) + .layer(object_store::layers::PrometheusMetricsLayer::new(true)) .finish()) } diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index bf277a0e72e5..c2d15001fba3 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -544,7 +544,7 @@ mod tests { use common_test_util::temp_dir::create_temp_dir; use futures_util::future::BoxFuture; use futures_util::FutureExt; - use object_store::{EntryMode, ObjectStore}; + use object_store::ObjectStore; use tokio::sync::mpsc; use super::*; @@ -578,11 +578,7 @@ mod tests { ) { let dir = proc_path!(procedure_store, "{procedure_id}/"); let lister = object_store.list(&dir).await.unwrap(); - let mut files_in_dir: Vec<_> = lister - .into_iter() - .filter(|x| x.metadata().mode() == EntryMode::FILE) - .map(|de| de.name().to_string()) - .collect(); + let mut files_in_dir: Vec<_> 
= lister.into_iter().map(|de| de.name().to_string()).collect(); files_in_dir.sort_unstable(); assert_eq!(files, files_in_dir); } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 61a4eae12883..9fbd46e16009 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -193,14 +193,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to build http client"))] - BuildHttpClient { - #[snafu(implicit)] - location: Location, - #[snafu(source)] - error: reqwest::Error, - }, - #[snafu(display("Missing required field: {}", name))] MissingRequiredField { name: String, @@ -414,10 +406,9 @@ impl ErrorExt for Error { | MissingKvBackend { .. } | TomlFormat { .. } => StatusCode::InvalidArguments, - PayloadNotExist { .. } - | Unexpected { .. } - | WatchAsyncTaskChange { .. } - | BuildHttpClient { .. } => StatusCode::Unexpected, + PayloadNotExist { .. } | Unexpected { .. } | WatchAsyncTaskChange { .. } => { + StatusCode::Unexpected + } AsyncTaskExecute { source, .. } => source.status_code(), diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index 52a1cba982e1..c78afe448e0c 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -32,7 +32,7 @@ use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder, O use snafu::prelude::*; use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; -use crate::error::{self, BuildHttpClientSnafu, CreateDirSnafu, Result}; +use crate::error::{self, CreateDirSnafu, Result}; pub(crate) async fn new_raw_object_store( store: &ObjectStoreConfig, @@ -236,8 +236,7 @@ pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result builder.timeout(config.timeout) }; - let client = http_builder.build().context(BuildHttpClientSnafu)?; - Ok(HttpClient::with(client)) + HttpClient::build(http_builder).context(error::InitBackendSnafu) } struct PrintDetailedError; diff --git a/src/file-engine/src/manifest.rs b/src/file-engine/src/manifest.rs index 6bf5ee104ba2..6310c3ccb912 100644 --- a/src/file-engine/src/manifest.rs +++ b/src/file-engine/src/manifest.rs @@ -46,7 +46,7 @@ impl FileRegionManifest { pub async fn store(&self, region_dir: &str, object_store: &ObjectStore) -> Result<()> { let path = ®ion_manifest_path(region_dir); let exist = object_store - .exists(path) + .is_exist(path) .await .context(CheckObjectSnafu { path })?; ensure!(!exist, ManifestExistsSnafu { path }); diff --git a/src/file-engine/src/region.rs b/src/file-engine/src/region.rs index 673d352b1e63..a5af6822285e 100644 --- a/src/file-engine/src/region.rs +++ b/src/file-engine/src/region.rs @@ -130,7 +130,7 @@ mod tests { assert_eq!(region.metadata.primary_key, vec![1]); assert!(object_store - .exists("create_region_dir/manifest/_file_manifest") + .is_exist("create_region_dir/manifest/_file_manifest") .await .unwrap()); @@ -198,13 +198,13 @@ mod tests { .unwrap(); assert!(object_store - .exists("drop_region_dir/manifest/_file_manifest") + .is_exist("drop_region_dir/manifest/_file_manifest") .await .unwrap()); FileRegion::drop(®ion, &object_store).await.unwrap(); assert!(!object_store - .exists("drop_region_dir/manifest/_file_manifest") + .is_exist("drop_region_dir/manifest/_file_manifest") .await .unwrap()); diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index d0f8cf5028e6..c5f7a2b4a32c 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -313,12 +313,12 @@ mod test { let region_dir = "test_metric_region"; 
        // assert metadata region's dir
        let metadata_region_dir = join_dir(region_dir, METADATA_REGION_SUBDIR);
-        let exist = object_store.exists(&metadata_region_dir).await.unwrap();
+        let exist = object_store.is_exist(&metadata_region_dir).await.unwrap();
        assert!(exist);

        // assert data region's dir
        let data_region_dir = join_dir(region_dir, DATA_REGION_SUBDIR);
-        let exist = object_store.exists(&data_region_dir).await.unwrap();
+        let exist = object_store.is_exist(&data_region_dir).await.unwrap();
        assert!(exist);

        // check mito engine
diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs
index eb112530cad7..9e5742ca0410 100644
--- a/src/mito2/src/cache/file_cache.rs
+++ b/src/mito2/src/cache/file_cache.rs
@@ -286,7 +286,7 @@ impl FileCache {
     }

     async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
-        if self.local_store.exists(file_path).await? {
+        if self.local_store.is_exist(file_path).await? {
             Ok(Some(self.local_store.reader(file_path).await?))
         } else {
             Ok(None)
@@ -480,7 +480,7 @@ mod tests {
         cache.memory_index.run_pending_tasks().await;

         // The file also not exists.
-        assert!(!local_store.exists(&file_path).await.unwrap());
+        assert!(!local_store.is_exist(&file_path).await.unwrap());

         assert_eq!(0, cache.memory_index.weighted_size());
     }
diff --git a/src/mito2/src/engine/create_test.rs b/src/mito2/src/engine/create_test.rs
index 4bcc55934034..48b04dc86d91 100644
--- a/src/mito2/src/engine/create_test.rs
+++ b/src/mito2/src/engine/create_test.rs
@@ -192,12 +192,12 @@ async fn test_engine_create_with_custom_store() {
     assert!(object_store_manager
         .find("Gcs")
         .unwrap()
-        .exists(region_dir)
+        .is_exist(region_dir)
         .await
         .unwrap());
     assert!(!object_store_manager
         .default_object_store()
-        .exists(region_dir)
+        .is_exist(region_dir)
         .await
         .unwrap());
 }
diff --git a/src/mito2/src/engine/drop_test.rs b/src/mito2/src/engine/drop_test.rs
index 5d0c5afbf06e..7d719f778be9 100644
--- a/src/mito2/src/engine/drop_test.rs
+++ b/src/mito2/src/engine/drop_test.rs
@@ -71,7 +71,7 @@ async fn test_engine_drop_region() {
     assert!(!env
         .get_object_store()
         .unwrap()
-        .exists(&join_path(&region_dir, DROPPING_MARKER_FILE))
+        .is_exist(&join_path(&region_dir, DROPPING_MARKER_FILE))
         .await
         .unwrap());

@@ -93,7 +93,7 @@ async fn test_engine_drop_region() {
     listener.wait().await;

     let object_store = env.get_object_store().unwrap();
-    assert!(!object_store.exists(&region_dir).await.unwrap());
+    assert!(!object_store.is_exist(&region_dir).await.unwrap());
 }

 #[tokio::test]
@@ -167,13 +167,13 @@ async fn test_engine_drop_region_for_custom_store() {
     assert!(object_store_manager
         .find("Gcs")
         .unwrap()
-        .exists(&custom_region_dir)
+        .is_exist(&custom_region_dir)
         .await
         .unwrap());
     assert!(object_store_manager
         .find("default")
         .unwrap()
-        .exists(&global_region_dir)
+        .is_exist(&global_region_dir)
         .await
         .unwrap());

@@ -190,13 +190,13 @@ async fn test_engine_drop_region_for_custom_store() {
     assert!(!object_store_manager
         .find("Gcs")
         .unwrap()
-        .exists(&custom_region_dir)
+        .is_exist(&custom_region_dir)
         .await
         .unwrap());
     assert!(object_store_manager
         .find("default")
         .unwrap()
-        .exists(&global_region_dir)
+        .is_exist(&global_region_dir)
         .await
         .unwrap());
 }
diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs
index a3b51514c287..6752bbd04b12 100644
--- a/src/mito2/src/engine/open_test.rs
+++ b/src/mito2/src/engine/open_test.rs
@@ -228,13 +228,13 @@ async fn test_engine_region_open_with_custom_store() {
     let object_store_manager = env.get_object_store_manager().unwrap();
assert!(!object_store_manager .default_object_store() - .exists(region.access_layer.region_dir()) + .is_exist(region.access_layer.region_dir()) .await .unwrap()); assert!(object_store_manager .find("Gcs") .unwrap() - .exists(region.access_layer.region_dir()) + .is_exist(region.access_layer.region_dir()) .await .unwrap()); } diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index 6f2c92bc5e09..692f40422b17 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -84,7 +84,6 @@ async fn manager_without_checkpoint() { // check files let mut expected = vec![ - "/", "00000000000000000010.json", "00000000000000000009.json", "00000000000000000008.json", @@ -131,7 +130,6 @@ async fn manager_with_checkpoint_distance_1() { // check files let mut expected = vec![ - "/", "00000000000000000009.checkpoint", "00000000000000000010.checkpoint", "00000000000000000010.json", diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 81251c91a564..76c7a7150328 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -185,7 +185,7 @@ mod tests { scheduler.stop(true).await.unwrap(); - assert!(!object_store.exists(&path).await.unwrap()); + assert!(!object_store.is_exist(&path).await.unwrap()); } #[tokio::test] @@ -247,7 +247,7 @@ mod tests { scheduler.stop(true).await.unwrap(); - assert!(!object_store.exists(&path).await.unwrap()); - assert!(!object_store.exists(&index_path).await.unwrap()); + assert!(!object_store.is_exist(&path).await.unwrap()); + assert!(!object_store.is_exist(&index_path).await.unwrap()); } } diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 01eaf1765224..d4a13a134597 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -51,7 +51,7 @@ impl RegionWorkerLoop { // Check if this region is pending drop. And clean the entire dir if so. if !self.dropping_regions.is_region_exists(region_id) && object_store - .exists(&join_path(&request.region_dir, DROPPING_MARKER_FILE)) + .is_exist(&join_path(&request.region_dir, DROPPING_MARKER_FILE)) .await .context(OpenDalSnafu)? { diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index b82be7376a72..72e0e2bfbe46 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -17,9 +17,8 @@ futures.workspace = true lazy_static.workspace = true md5 = "0.7" moka = { workspace = true, features = ["future"] } -opendal = { version = "0.50", features = [ +opendal = { version = "0.49", features = [ "layers-tracing", - "layers-prometheus", "services-azblob", "services-fs", "services-gcs", diff --git a/src/object-store/src/layers.rs b/src/object-store/src/layers.rs index 20108ab63c52..b2145aa6b0e5 100644 --- a/src/object-store/src/layers.rs +++ b/src/object-store/src/layers.rs @@ -13,37 +13,8 @@ // limitations under the License. 
 mod lru_cache;
+mod prometheus;

 pub use lru_cache::*;
 pub use opendal::layers::*;
-pub use prometheus::build_prometheus_metrics_layer;
-
-mod prometheus {
-    use std::sync::{Mutex, OnceLock};
-
-    use opendal::layers::PrometheusLayer;
-
-    static PROMETHEUS_LAYER: OnceLock<Mutex<PrometheusLayer>> = OnceLock::new();
-
-    pub fn build_prometheus_metrics_layer(with_path_label: bool) -> PrometheusLayer {
-        PROMETHEUS_LAYER
-            .get_or_init(|| {
-                // This logical tries to extract parent path from the object storage operation
-                // the function also relies on assumption that the region path is built from
-                // pattern `/catalog/schema/table_id/....`
-                //
-                // We'll get the data/catalog/schema from path.
-                let path_level = if with_path_label { 3 } else { 0 };
-
-                let layer = PrometheusLayer::builder()
-                    .path_label(path_level)
-                    .register_default()
-                    .unwrap();
-
-                Mutex::new(layer)
-            })
-            .lock()
-            .unwrap()
-            .clone()
-    }
-}
+pub use prometheus::PrometheusMetricsLayer;
diff --git a/src/object-store/src/layers/lru_cache/read_cache.rs b/src/object-store/src/layers/lru_cache/read_cache.rs
index 874b17280d9c..f88b36784d15 100644
--- a/src/object-store/src/layers/lru_cache/read_cache.rs
+++ b/src/object-store/src/layers/lru_cache/read_cache.rs
@@ -156,12 +156,9 @@ impl ReadCache {
             let size = entry.metadata().content_length();
             OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
             OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
-            // ignore root path
-            if entry.path() != "/" {
-                self.mem_cache
-                    .insert(read_key.to_string(), ReadResult::Success(size as u32))
-                    .await;
-            }
+            self.mem_cache
+                .insert(read_key.to_string(), ReadResult::Success(size as u32))
+                .await;
         }

         Ok(self.cache_stat().await)
diff --git a/src/object-store/src/layers/prometheus.rs b/src/object-store/src/layers/prometheus.rs
new file mode 100644
index 000000000000..fef83a91468a
--- /dev/null
+++ b/src/object-store/src/layers/prometheus.rs
@@ -0,0 +1,584 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Code originally from opendal's `PrometheusLayer`, with a tiny change to avoid a crash in
+//! multi-threaded environments.
+
+use std::fmt::{Debug, Formatter};
+
+use common_telemetry::debug;
+use lazy_static::lazy_static;
+use opendal::raw::*;
+use opendal::{Buffer, ErrorKind};
+use prometheus::{
+    exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter_vec,
+    Histogram, HistogramTimer, HistogramVec, IntCounterVec,
+};
+
+use crate::util::extract_parent_path;
+
+type Result<T> = std::result::Result<T, opendal::Error>;
+
+lazy_static! {
+    static ref REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!(
+        "opendal_requests_total",
+        "Total times of all kinds of operation being called",
+        &["scheme", "operation", "path"],
+    )
+    .unwrap();
+    static ref REQUESTS_DURATION_SECONDS: HistogramVec = register_histogram_vec!(
+        histogram_opts!(
+            "opendal_requests_duration_seconds",
+            "Histogram of the time spent on specific operation",
+            exponential_buckets(0.01, 2.0, 16).unwrap()
+        ),
+        &["scheme", "operation", "path"]
+    )
+    .unwrap();
+    static ref BYTES_TOTAL: HistogramVec = register_histogram_vec!(
+        histogram_opts!(
+            "opendal_bytes_total",
+            "Total size of sync or async Read/Write",
+            exponential_buckets(0.01, 2.0, 16).unwrap()
+        ),
+        &["scheme", "operation", "path"]
+    )
+    .unwrap();
+}
+
+#[inline]
+fn increment_errors_total(op: Operation, kind: ErrorKind) {
+    debug!(
+        "Prometheus statistics metrics error, operation {} error {}",
+        op.into_static(),
+        kind.into_static()
+    );
+}
+
+/// A layer that exports Prometheus metrics for every operation; see
+/// [prometheus](https://docs.rs/prometheus) for the underlying metric types.
+///
+/// # Prometheus Metrics
+///
+/// This layer exports three metrics that are essential for understanding the behavior and
+/// performance of opendal.
+///
+/// | Metric Name                       | Type      | Description                                        | Labels                  |
+/// |-----------------------------------|-----------|----------------------------------------------------|-------------------------|
+/// | opendal_requests_total            | Counter   | Total times of all kinds of operation being called | scheme, operation, path |
+/// | opendal_requests_duration_seconds | Histogram | Histogram of the time spent on specific operation  | scheme, operation, path |
+/// | opendal_bytes_total               | Histogram | Total size of sync or async Read/Write             | scheme, operation, path |
+///
+/// For a more detailed explanation of these metrics and how they are used, please refer to the
+/// [Prometheus documentation](https://prometheus.io/docs/introduction/overview/).
+///
+/// # Histogram Configuration
+///
+/// The metric buckets for these histograms are automatically generated based on the
+/// `exponential_buckets(0.01, 2.0, 16)` configuration.
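+///
+/// # Example
+///
+/// A minimal sketch of attaching this layer (it mirrors `util::with_instrument_layers` in this
+/// diff); passing `true` enables the `path` label derived from the first three path segments:
+///
+/// ```ignore
+/// let object_store = object_store.layer(PrometheusMetricsLayer::new(true));
+/// ```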
+#[derive(Default, Debug, Clone)]
+pub struct PrometheusMetricsLayer {
+    pub path_label: bool,
+}
+
+impl PrometheusMetricsLayer {
+    pub fn new(path_label: bool) -> Self {
+        Self { path_label }
+    }
+}
+
+impl<A: Access> Layer<A> for PrometheusMetricsLayer {
+    type LayeredAccess = PrometheusAccess<A>;
+
+    fn layer(&self, inner: A) -> Self::LayeredAccess {
+        let meta = inner.info();
+        let scheme = meta.scheme();
+
+        PrometheusAccess {
+            inner,
+            scheme: scheme.to_string(),
+            path_label: self.path_label,
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct PrometheusAccess<A: Access> {
+    inner: A,
+    scheme: String,
+    path_label: bool,
+}
+
+impl<A: Access> PrometheusAccess<A> {
+    fn get_path_label<'a>(&self, path: &'a str) -> &'a str {
+        if self.path_label {
+            extract_parent_path(path)
+        } else {
+            ""
+        }
+    }
+}
+
+impl<A: Access> Debug for PrometheusAccess<A> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PrometheusAccessor")
+            .field("inner", &self.inner)
+            .finish_non_exhaustive()
+    }
+}
+
+impl<A: Access> LayeredAccess for PrometheusAccess<A> {
+    type Inner = A;
+    type Reader = PrometheusMetricWrapper<A::Reader>;
+    type BlockingReader = PrometheusMetricWrapper<A::BlockingReader>;
+    type Writer = PrometheusMetricWrapper<A::Writer>;
+    type BlockingWriter = PrometheusMetricWrapper<A::BlockingWriter>;
+    type Lister = A::Lister;
+    type BlockingLister = A::BlockingLister;
+
+    fn inner(&self) -> &Self::Inner {
+        &self.inner
+    }
+
+    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::CreateDir.into_static(), path_label])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::CreateDir.into_static(), path_label])
+            .start_timer();
+        let create_res = self.inner.create_dir(path, args).await;
+
+        timer.observe_duration();
+        create_res.inspect_err(|e| {
+            increment_errors_total(Operation::CreateDir, e.kind());
+        })
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::Read.into_static(), path_label])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::Read.into_static(), path_label])
+            .start_timer();
+
+        let (rp, r) = self.inner.read(path, args).await.inspect_err(|e| {
+            increment_errors_total(Operation::Read, e.kind());
+        })?;
+
+        Ok((
+            rp,
+            PrometheusMetricWrapper::new(
+                r,
+                Operation::Read,
+                BYTES_TOTAL.with_label_values(&[
+                    &self.scheme,
+                    Operation::Read.into_static(),
+                    path_label,
+                ]),
+                timer,
+            ),
+        ))
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::Write.into_static(), path_label])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::Write.into_static(), path_label])
+            .start_timer();
+
+        let (rp, r) = self.inner.write(path, args).await.inspect_err(|e| {
+            increment_errors_total(Operation::Write, e.kind());
+        })?;
+
+        Ok((
+            rp,
+            PrometheusMetricWrapper::new(
+                r,
+                Operation::Write,
+                BYTES_TOTAL.with_label_values(&[
+                    &self.scheme,
+                    Operation::Write.into_static(),
+                    path_label,
+                ]),
+                timer,
+            ),
+        ))
+    }
+
+    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::Stat.into_static(), path_label])
+            .inc();
+
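+        // Start the operation timer; `observe_duration` records it once the inner stat returns.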
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::Stat.into_static(), path_label])
+            .start_timer();
+
+        let stat_res = self.inner.stat(path, args).await;
+        timer.observe_duration();
+        stat_res.inspect_err(|e| {
+            increment_errors_total(Operation::Stat, e.kind());
+        })
+    }
+
+    async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::Delete.into_static(), path_label])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::Delete.into_static(), path_label])
+            .start_timer();
+
+        let delete_res = self.inner.delete(path, args).await;
+        timer.observe_duration();
+        delete_res.inspect_err(|e| {
+            increment_errors_total(Operation::Delete, e.kind());
+        })
+    }
+
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::List.into_static(), path_label])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::List.into_static(), path_label])
+            .start_timer();
+
+        let list_res = self.inner.list(path, args).await;
+
+        timer.observe_duration();
+        list_res.inspect_err(|e| {
+            increment_errors_total(Operation::List, e.kind());
+        })
+    }
+
+    async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::Batch.into_static(), ""])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::Batch.into_static(), ""])
+            .start_timer();
+        let result = self.inner.batch(args).await;
+
+        timer.observe_duration();
+        result.inspect_err(|e| {
+            increment_errors_total(Operation::Batch, e.kind());
+        })
+    }
+
+    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::Presign.into_static(), path_label])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::Presign.into_static(), path_label])
+            .start_timer();
+        let result = self.inner.presign(path, args).await;
+        timer.observe_duration();
+
+        result.inspect_err(|e| {
+            increment_errors_total(Operation::Presign, e.kind());
+        })
+    }
+
+    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingCreateDir.into_static(),
+                path_label,
+            ])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingCreateDir.into_static(),
+                path_label,
+            ])
+            .start_timer();
+        let result = self.inner.blocking_create_dir(path, args);
+
+        timer.observe_duration();
+
+        result.inspect_err(|e| {
+            increment_errors_total(Operation::BlockingCreateDir, e.kind());
+        })
+    }
+
+    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingRead.into_static(),
+                path_label,
+            ])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingRead.into_static(),
+                path_label,
+            ])
+            .start_timer();
+
+        self.inner
+            .blocking_read(path, args)
+            .map(|(rp, r)| {
+                (
+                    rp,
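+                    // Wrap the reader: read errors are counted by kind, and the duration
+                    // timer moved into the wrapper observes when the reader is dropped.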
+                    PrometheusMetricWrapper::new(
+                        r,
+                        Operation::BlockingRead,
+                        BYTES_TOTAL.with_label_values(&[
+                            &self.scheme,
+                            Operation::BlockingRead.into_static(),
+                            path_label,
+                        ]),
+                        timer,
+                    ),
+                )
+            })
+            .inspect_err(|e| {
+                increment_errors_total(Operation::BlockingRead, e.kind());
+            })
+    }
+
+    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingWrite.into_static(),
+                path_label,
+            ])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingWrite.into_static(),
+                path_label,
+            ])
+            .start_timer();
+
+        self.inner
+            .blocking_write(path, args)
+            .map(|(rp, r)| {
+                (
+                    rp,
+                    PrometheusMetricWrapper::new(
+                        r,
+                        Operation::BlockingWrite,
+                        BYTES_TOTAL.with_label_values(&[
+                            &self.scheme,
+                            Operation::BlockingWrite.into_static(),
+                            path_label,
+                        ]),
+                        timer,
+                    ),
+                )
+            })
+            .inspect_err(|e| {
+                increment_errors_total(Operation::BlockingWrite, e.kind());
+            })
+    }
+
+    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingStat.into_static(),
+                path_label,
+            ])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingStat.into_static(),
+                path_label,
+            ])
+            .start_timer();
+        let result = self.inner.blocking_stat(path, args);
+        timer.observe_duration();
+        result.inspect_err(|e| {
+            increment_errors_total(Operation::BlockingStat, e.kind());
+        })
+    }
+
+    fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingDelete.into_static(),
+                path_label,
+            ])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingDelete.into_static(),
+                path_label,
+            ])
+            .start_timer();
+        let result = self.inner.blocking_delete(path, args);
+        timer.observe_duration();
+
+        result.inspect_err(|e| {
+            increment_errors_total(Operation::BlockingDelete, e.kind());
+        })
+    }
+
+    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingList.into_static(),
+                path_label,
+            ])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingList.into_static(),
+                path_label,
+            ])
+            .start_timer();
+        let result = self.inner.blocking_list(path, args);
+        timer.observe_duration();
+
+        result.inspect_err(|e| {
+            increment_errors_total(Operation::BlockingList, e.kind());
+        })
+    }
+}
+
+pub struct PrometheusMetricWrapper<R> {
+    inner: R,
+
+    op: Operation,
+    bytes_counter: Histogram,
+    _requests_duration_timer: HistogramTimer,
+    bytes: u64,
+}
+
+impl<R> Drop for PrometheusMetricWrapper<R> {
+    fn drop(&mut self) {
+        self.bytes_counter.observe(self.bytes as f64);
+    }
+}
+
+impl<R> PrometheusMetricWrapper<R> {
+    fn new(
+        inner: R,
+        op: Operation,
+        bytes_counter: Histogram,
+        requests_duration_timer: HistogramTimer,
+    ) -> Self {
+        Self {
+            inner,
+            op,
+            bytes_counter,
+            _requests_duration_timer: requests_duration_timer,
+            bytes: 0,
+        }
+    }
+}
+
+impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
+    async fn read(&mut self) -> Result<Buffer> {
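+        // Delegate to the inner reader, tagging any error with the operation kind.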
+        self.inner.read().await.inspect_err(|err| {
+            increment_errors_total(self.op, err.kind());
+        })
+    }
+}
+
+impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
+    fn read(&mut self) -> opendal::Result<Buffer> {
+        self.inner.read().inspect_err(|err| {
+            increment_errors_total(self.op, err.kind());
+        })
+    }
+}
+
+impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
+        let bytes = bs.len();
+        match self.inner.write(bs).await {
+            Ok(_) => {
+                self.bytes += bytes as u64;
+                Ok(())
+            }
+            Err(err) => {
+                increment_errors_total(self.op, err.kind());
+                Err(err)
+            }
+        }
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        self.inner.close().await.inspect_err(|err| {
+            increment_errors_total(self.op, err.kind());
+        })
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        self.inner.abort().await.inspect_err(|err| {
+            increment_errors_total(self.op, err.kind());
+        })
+    }
+}
+
+impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
+        let bytes = bs.len();
+        self.inner
+            .write(bs)
+            .map(|_| {
+                self.bytes += bytes as u64;
+            })
+            .inspect_err(|err| {
+                increment_errors_total(self.op, err.kind());
+            })
+    }
+
+    fn close(&mut self) -> Result<()> {
+        self.inner.close().inspect_err(|err| {
+            increment_errors_total(self.op, err.kind());
+        })
+    }
+}
diff --git a/src/object-store/src/util.rs b/src/object-store/src/util.rs
index 271da33e853c..fc0a031ab953 100644
--- a/src/object-store/src/util.rs
+++ b/src/object-store/src/util.rs
@@ -15,12 +15,19 @@ use std::fmt::Display;

 use common_telemetry::{debug, error, trace};
+use futures::TryStreamExt;
 use opendal::layers::{LoggingInterceptor, LoggingLayer, TracingLayer};
 use opendal::raw::{AccessorInfo, Operation};
-use opendal::ErrorKind;
+use opendal::{Entry, ErrorKind, Lister};

+use crate::layers::PrometheusMetricsLayer;
 use crate::ObjectStore;

+/// Collect all entries from the [Lister].
+pub async fn collect(stream: Lister) -> Result<Vec<Entry>, opendal::Error> {
+    stream.try_collect::<Vec<_>>().await
+}
+
 /// Join two paths and normalize the output dir.
 ///
 /// The output dir is always ends with `/`. e.g.
@@ -120,12 +127,26 @@ pub fn normalize_path(path: &str) -> String {
     p
 }

+// This logic tries to extract the parent path from an object storage operation. It relies on
+// the assumption that the region path is built from the pattern `/catalog/schema/table_id/....`
+//
+// The implementation extracts at most the first three levels of the path.
+pub(crate) fn extract_parent_path(path: &str) -> &str {
+    // split the path into `catalog`, `schema` and others
+    path.char_indices()
+        .filter(|&(_, c)| c == '/')
+        // we get the data/catalog/schema from path, split at the 3rd /
+        .nth(2)
+        .map_or(path, |(i, _)| &path[..i])
+}
+
 /// Attaches instrument layers to the object store.
 pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore {
     object_store
         .layer(LoggingLayer::new(DefaultLoggingInterceptor))
         .layer(TracingLayer)
-        .layer(crate::layers::build_prometheus_metrics_layer(path_label))
+        .layer(PrometheusMetricsLayer::new(path_label))
 }

 static LOGGING_TARGET: &str = "opendal::services";
@@ -242,4 +263,28 @@ mod tests {
         assert_eq!("/abc", join_path("//", "/abc"));
         assert_eq!("abc/def", join_path("abc/", "//def"));
     }
+
+    #[test]
+    fn test_path_extraction() {
+        assert_eq!(
+            "data/greptime/public",
+            extract_parent_path("data/greptime/public/1024/1024_0000000000/")
+        );
+
+        assert_eq!(
+            "data/greptime/public",
+            extract_parent_path("data/greptime/public/1/")
+        );
+
+        assert_eq!(
+            "data/greptime/public",
+            extract_parent_path("data/greptime/public")
+        );
+
+        assert_eq!("data/greptime/", extract_parent_path("data/greptime/"));
+
+        assert_eq!("data/", extract_parent_path("data/"));
+
+        assert_eq!("/", extract_parent_path("/"));
+    }
 }
diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs
index 7e81b965fbed..497decffabfc 100644
--- a/src/object-store/tests/object_store_test.rs
+++ b/src/object-store/tests/object_store_test.rs
@@ -65,38 +65,23 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> {
     store.write(p3, "Hello, object3!").await?;

     // List objects
-    let entries = store
-        .list("/")
-        .await?
-        .into_iter()
-        .filter(|x| x.metadata().mode() == EntryMode::FILE)
-        .collect::<Vec<_>>();
+    let entries = store.list("/").await?;
     assert_eq!(3, entries.len());

     store.delete(p1).await?;
     store.delete(p3).await?;

     // List objects again
-    // Only o2 and root exist
-    let entries = store
-        .list("/")
-        .await?
-        .into_iter()
-        .filter(|x| x.metadata().mode() == EntryMode::FILE)
-        .collect::<Vec<_>>();
+    // Only o2 exists
+    let entries = store.list("/").await?;
     assert_eq!(1, entries.len());
-    assert_eq!(p2, entries[0].path());
+    assert_eq!(p2, entries.first().unwrap().path());

     let content = store.read(p2).await?;
     assert_eq!("Hello, object2!", String::from_utf8(content.to_vec())?);

     store.delete(p2).await?;
-    let entries = store
-        .list("/")
-        .await?
-        .into_iter()
-        .filter(|x| x.metadata().mode() == EntryMode::FILE)
-        .collect::<Vec<_>>();
+    let entries = store.list("/").await?;
     assert!(entries.is_empty());

     assert!(store.read(p1).await.is_err());
@@ -267,7 +252,7 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {

 async fn assert_lru_cache<C: Access>(cache_layer: &LruCacheLayer<C>, file_names: &[&str]) {
     for file_name in file_names {
-        assert!(cache_layer.contains_file(file_name).await, "{file_name}");
+        assert!(cache_layer.contains_file(file_name).await);
     }
 }

@@ -279,9 +264,7 @@ async fn assert_cache_files(
     let (_, mut lister) = store.list("/", OpList::default()).await?;
     let mut objects = vec![];
     while let Some(e) = lister.next().await?
{ - if e.mode() == EntryMode::FILE { - objects.push(e); - } + objects.push(e); } // compare the cache file with the expected cache file; ignore orders @@ -349,9 +332,9 @@ async fn test_object_store_cache_policy() -> Result<()> { assert_cache_files( &cache_store, &[ - "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-", - "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-", - "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-", + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14", + "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-14", + "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-14", ], &["Hello, object1!", "object2!", "Hello, object2!"], ) @@ -359,9 +342,9 @@ async fn test_object_store_cache_policy() -> Result<()> { assert_lru_cache( &cache_layer, &[ - "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-", - "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-", - "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-", + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14", + "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-14", + "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-14", ], ) .await; @@ -372,13 +355,13 @@ async fn test_object_store_cache_policy() -> Result<()> { assert_eq!(cache_layer.read_cache_stat().await, (1, 15)); assert_cache_files( &cache_store, - &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"], + &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14"], &["Hello, object1!"], ) .await?; assert_lru_cache( &cache_layer, - &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"], + &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14"], ) .await; @@ -405,8 +388,8 @@ async fn test_object_store_cache_policy() -> Result<()> { assert_cache_files( &cache_store, &[ - "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-", - "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14", "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", ], &["Hello, object1!", "Hello, object3!", "Hello"], @@ -415,8 +398,8 @@ async fn test_object_store_cache_policy() -> Result<()> { assert_lru_cache( &cache_layer, &[ - "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-", - "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14", "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", ], ) @@ -433,7 +416,7 @@ async fn test_object_store_cache_policy() -> Result<()> { &cache_store, &[ "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14", - "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14", "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", ], &["ello, object1!", "Hello, object3!", "Hello"], @@ -443,7 +426,7 @@ async fn test_object_store_cache_policy() -> Result<()> { &cache_layer, &[ "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14", - "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14", "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", ], ) @@ -465,7 +448,7 @@ async fn test_object_store_cache_policy() -> Result<()> { &cache_layer, &[ "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14", - "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14", "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", ], )
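
The `path` metric label used throughout the restored layer comes from `extract_parent_path` in
`src/object-store/src/util.rs` above. As a quick sanity check, the sketch below copies that
function verbatim out of this diff into a standalone program; `main` and its assertions are
illustrative scaffolding (mirroring `test_path_extraction`), not part of the change.

// Standalone sketch: `extract_parent_path` is copied verbatim from this diff;
// everything else here is hypothetical test scaffolding.
fn extract_parent_path(path: &str) -> &str {
    // Split at the 3rd `/`, so `data/catalog/schema/<table_id>/...` collapses
    // to `data/catalog/schema`; shorter paths are returned unchanged.
    path.char_indices()
        .filter(|&(_, c)| c == '/')
        .nth(2)
        .map_or(path, |(i, _)| &path[..i])
}

fn main() {
    // One label per `catalog/schema` keeps metric cardinality bounded even
    // though every region/table gets its own directory.
    assert_eq!(
        "data/greptime/public",
        extract_parent_path("data/greptime/public/1024/1024_0000000000/")
    );
    // Shorter paths fall through unchanged.
    assert_eq!("data/greptime/", extract_parent_path("data/greptime/"));
    assert_eq!("/", extract_parent_path("/"));
    println!("path-label extraction OK");
}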