From 5a06a564785b667b13a46879d8d9b6d35822d683 Mon Sep 17 00:00:00 2001 From: "Zhanxiang (Patrick) Huang" Date: Mon, 26 Feb 2024 17:51:02 +0800 Subject: [PATCH] fix: pass correct object store config to monitored object store for all backends (#15260) --- src/object_store/src/object/mod.rs | 101 +++++++----------- src/object_store/src/object/s3.rs | 11 -- src/storage/backup/src/storage.rs | 2 + src/storage/benches/bench_compactor.rs | 7 +- src/storage/benches/bench_multi_builder.rs | 2 +- .../src/hummock/iterator/test_utils.rs | 7 +- 6 files changed, 50 insertions(+), 80 deletions(-) diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index d9ae0bc37b868..df330967d9679 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -118,26 +118,20 @@ pub trait ObjectStore: Send + Sync { /// specified in the request is not found, it will be considered as successfully deleted. async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()>; - fn monitored(self, metrics: Arc) -> MonitoredObjectStore + fn monitored( + self, + metrics: Arc, + config: ObjectStoreConfig, + ) -> MonitoredObjectStore where Self: Sized, { - MonitoredObjectStore::new(self, metrics) + MonitoredObjectStore::new(self, metrics, config) } async fn list(&self, prefix: &str) -> ObjectResult; fn store_media_type(&self) -> &'static str; - - fn recv_buffer_size(&self) -> usize { - // 2MB - 1 << 21 - } - - fn config(&self) -> Option<&ObjectStoreConfig> { - // TODO: remove option - None - } } pub enum ObjectStoreImpl { @@ -280,16 +274,6 @@ impl ObjectStoreImpl { ObjectStoreImpl::Sim(_) => true, } } - - pub fn recv_buffer_size(&self) -> usize { - match self { - ObjectStoreImpl::InMem(store) => store.recv_buffer_size(), - ObjectStoreImpl::Opendal(store) => store.recv_buffer_size(), - ObjectStoreImpl::S3(store) => store.recv_buffer_size(), - #[cfg(madsim)] - ObjectStoreImpl::Sim(store) => store.recv_buffer_size(), - } - } } fn try_update_failure_metric( @@ -526,29 +510,22 @@ pub struct MonitoredObjectStore { /// - start `operation_latency` timer /// - `failure-count` impl MonitoredObjectStore { - pub fn new(store: OS, object_store_metrics: Arc) -> Self { - if let Some(config) = store.config() { - Self { - object_store_metrics, - streaming_read_timeout: Some(Duration::from_millis( - config.object_store_streaming_read_timeout_ms, - )), - streaming_upload_timeout: Some(Duration::from_millis( - config.object_store_streaming_upload_timeout_ms, - )), - read_timeout: Some(Duration::from_millis(config.object_store_read_timeout_ms)), - upload_timeout: Some(Duration::from_millis(config.object_store_upload_timeout_ms)), - inner: store, - } - } else { - Self { - inner: store, - object_store_metrics, - streaming_read_timeout: None, - streaming_upload_timeout: None, - read_timeout: None, - upload_timeout: None, - } + pub fn new( + store: OS, + object_store_metrics: Arc, + config: ObjectStoreConfig, + ) -> Self { + Self { + object_store_metrics, + streaming_read_timeout: Some(Duration::from_millis( + config.object_store_streaming_read_timeout_ms, + )), + streaming_upload_timeout: Some(Duration::from_millis( + config.object_store_streaming_upload_timeout_ms, + )), + read_timeout: Some(Duration::from_millis(config.object_store_read_timeout_ms)), + upload_timeout: Some(Duration::from_millis(config.object_store_upload_timeout_ms)), + inner: store, } } @@ -800,10 +777,6 @@ impl MonitoredObjectStore { try_update_failure_metric(&self.object_store_metrics, &res, operation_type); res } - - fn recv_buffer_size(&self) -> usize { - self.inner.recv_buffer_size() - } } /// Creates a new [`ObjectStore`] from the given `url`. Credentials are configured via environment @@ -823,19 +796,19 @@ pub async fn build_remote_object_store( let bucket = s3.strip_prefix("s3://").unwrap(); ObjectStoreImpl::Opendal( - OpendalObjectStore::new_s3_engine(bucket.to_string(), config) + OpendalObjectStore::new_s3_engine(bucket.to_string(), config.clone()) .unwrap() - .monitored(metrics), + .monitored(metrics, config), ) } else { ObjectStoreImpl::S3( S3ObjectStore::new_with_config( s3.strip_prefix("s3://").unwrap().to_string(), metrics.clone(), - config, + config.clone(), ) .await - .monitored(metrics), + .monitored(metrics, config), ) } } @@ -855,7 +828,7 @@ pub async fn build_remote_object_store( ObjectStoreImpl::Opendal( OpendalObjectStore::new_gcs_engine(bucket.to_string(), root.to_string()) .unwrap() - .monitored(metrics), + .monitored(metrics, config), ) } obs if obs.starts_with("obs://") => { @@ -864,7 +837,7 @@ pub async fn build_remote_object_store( ObjectStoreImpl::Opendal( OpendalObjectStore::new_obs_engine(bucket.to_string(), root.to_string()) .unwrap() - .monitored(metrics), + .monitored(metrics, config), ) } @@ -874,7 +847,7 @@ pub async fn build_remote_object_store( ObjectStoreImpl::Opendal( OpendalObjectStore::new_oss_engine(bucket.to_string(), root.to_string()) .unwrap() - .monitored(metrics), + .monitored(metrics, config), ) } webhdfs if webhdfs.starts_with("webhdfs://") => { @@ -883,7 +856,7 @@ pub async fn build_remote_object_store( ObjectStoreImpl::Opendal( OpendalObjectStore::new_webhdfs_engine(namenode.to_string(), root.to_string()) .unwrap() - .monitored(metrics), + .monitored(metrics, config), ) } azblob if azblob.starts_with("azblob://") => { @@ -892,7 +865,7 @@ pub async fn build_remote_object_store( ObjectStoreImpl::Opendal( OpendalObjectStore::new_azblob_engine(container_name.to_string(), root.to_string()) .unwrap() - .monitored(metrics), + .monitored(metrics, config), ) } fs if fs.starts_with("fs://") => { @@ -900,7 +873,7 @@ pub async fn build_remote_object_store( ObjectStoreImpl::Opendal( OpendalObjectStore::new_fs_engine(fs.to_string()) .unwrap() - .monitored(metrics), + .monitored(metrics, config), ) } @@ -911,9 +884,9 @@ pub async fn build_remote_object_store( panic!("Passing s3-compatible is not supported, please modify the environment variable and pass in s3."); } minio if minio.starts_with("minio://") => ObjectStoreImpl::S3( - S3ObjectStore::with_minio(minio, metrics.clone(), config) + S3ObjectStore::with_minio(minio, metrics.clone(), config.clone()) .await - .monitored(metrics), + .monitored(metrics, config), ), "memory" => { if ident == "Meta Backup" { @@ -921,7 +894,7 @@ pub async fn build_remote_object_store( } else { tracing::warn!("You're using in-memory remote object store for {}. This should never be used in benchmarks and production environment.", ident); } - ObjectStoreImpl::InMem(InMemObjectStore::new().monitored(metrics)) + ObjectStoreImpl::InMem(InMemObjectStore::new().monitored(metrics, config)) } "memory-shared" => { if ident == "Meta Backup" { @@ -929,11 +902,11 @@ pub async fn build_remote_object_store( } else { tracing::warn!("You're using shared in-memory remote object store for {}. This should never be used in benchmarks and production environment.", ident); } - ObjectStoreImpl::InMem(InMemObjectStore::shared().monitored(metrics)) + ObjectStoreImpl::InMem(InMemObjectStore::shared().monitored(metrics, config)) } #[cfg(madsim)] sim if sim.starts_with("sim://") => { - ObjectStoreImpl::Sim(SimObjectStore::new(url).monitored(metrics)) + ObjectStoreImpl::Sim(SimObjectStore::new(url).monitored(metrics, config)) } other => { unimplemented!( diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 349d3b7142322..153d58fe3952d 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -532,17 +532,6 @@ impl ObjectStore for S3ObjectStore { fn store_media_type(&self) -> &'static str { "s3" } - - fn recv_buffer_size(&self) -> usize { - self.config - .s3 - .object_store_recv_buffer_size - .unwrap_or(1 << 21) - } - - fn config(&self) -> Option<&ObjectStoreConfig> { - Some(&self.config) - } } impl S3ObjectStore { diff --git a/src/storage/backup/src/storage.rs b/src/storage/backup/src/storage.rs index f7a1e5e61a35f..54d2eaa741c94 100644 --- a/src/storage/backup/src/storage.rs +++ b/src/storage/backup/src/storage.rs @@ -16,6 +16,7 @@ use std::collections::HashSet; use std::sync::Arc; use itertools::Itertools; +use risingwave_common::config::ObjectStoreConfig; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_object_store::object::{ InMemObjectStore, MonitoredObjectStore, ObjectError, ObjectStoreImpl, ObjectStoreRef, @@ -191,6 +192,7 @@ pub async fn unused() -> ObjectStoreMetaSnapshotStorage { Arc::new(ObjectStoreImpl::InMem(MonitoredObjectStore::new( InMemObjectStore::new(), Arc::new(ObjectStoreMetrics::unused()), + ObjectStoreConfig::default(), ))), ) .await diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index 5f9ee1c8e8524..c947bc86767f3 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -19,7 +19,7 @@ use criterion::async_executor::FuturesExecutor; use criterion::{criterion_group, criterion_main, Criterion}; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; -use risingwave_common::config::MetricLevel; +use risingwave_common::config::{MetricLevel, ObjectStoreConfig}; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::key_range::KeyRange; @@ -49,7 +49,10 @@ use risingwave_storage::monitor::{ }; pub fn mock_sstable_store() -> SstableStoreRef { - let store = InMemObjectStore::new().monitored(Arc::new(ObjectStoreMetrics::unused())); + let store = InMemObjectStore::new().monitored( + Arc::new(ObjectStoreMetrics::unused()), + ObjectStoreConfig::default(), + ); let store = Arc::new(ObjectStoreImpl::InMem(store)); let path = "test".to_string(); Arc::new(SstableStore::new(SstableStoreConfig { diff --git a/src/storage/benches/bench_multi_builder.rs b/src/storage/benches/bench_multi_builder.rs index 7d1abf67ec857..bdc288b4d6925 100644 --- a/src/storage/benches/bench_multi_builder.rs +++ b/src/storage/benches/bench_multi_builder.rs @@ -138,7 +138,7 @@ fn bench_builder( ObjectStoreConfig::default(), ) .await - .monitored(metrics) + .monitored(metrics, ObjectStoreConfig::default()) }); let object_store = Arc::new(ObjectStoreImpl::S3(object_store)); let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig { diff --git a/src/storage/src/hummock/iterator/test_utils.rs b/src/storage/src/hummock/iterator/test_utils.rs index e154098e7a0ae..62602074c601a 100644 --- a/src/storage/src/hummock/iterator/test_utils.rs +++ b/src/storage/src/hummock/iterator/test_utils.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use bytes::Bytes; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_common::config::MetricLevel; +use risingwave_common::config::{MetricLevel, ObjectStoreConfig}; use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey}; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, HummockSstableObjectId}; use risingwave_object_store::object::{ @@ -54,7 +54,10 @@ pub const TEST_KEYS_COUNT: usize = 10; pub fn mock_sstable_store() -> SstableStoreRef { mock_sstable_store_with_object_store(Arc::new(ObjectStoreImpl::InMem( - InMemObjectStore::new().monitored(Arc::new(ObjectStoreMetrics::unused())), + InMemObjectStore::new().monitored( + Arc::new(ObjectStoreMetrics::unused()), + ObjectStoreConfig::default(), + ), ))) }