diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 32adba09d2bb1..5c66d7a5bb6c6 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -112,26 +112,20 @@ pub trait ObjectStore: Send + Sync { /// specified in the request is not found, it will be considered as successfully deleted. async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()>; - fn monitored(self, metrics: Arc) -> MonitoredObjectStore + fn monitored( + self, + metrics: Arc, + config: ObjectStoreConfig, + ) -> MonitoredObjectStore where Self: Sized, { - MonitoredObjectStore::new(self, metrics) + MonitoredObjectStore::new(self, metrics, config) } async fn list(&self, prefix: &str) -> ObjectResult; fn store_media_type(&self) -> &'static str; - - fn recv_buffer_size(&self) -> usize { - // 2MB - 1 << 21 - } - - fn config(&self) -> Option<&ObjectStoreConfig> { - // TODO: remove option - None - } } pub enum ObjectStoreImpl { @@ -258,14 +252,6 @@ impl ObjectStoreImpl { ObjectStoreImpl::S3(_) => true, } } - - pub fn recv_buffer_size(&self) -> usize { - match self { - ObjectStoreImpl::InMem(store) => store.recv_buffer_size(), - ObjectStoreImpl::Opendal(store) => store.recv_buffer_size(), - ObjectStoreImpl::S3(store) => store.recv_buffer_size(), - } - } } fn try_update_failure_metric( @@ -502,29 +488,22 @@ pub struct MonitoredObjectStore { /// - start `operation_latency` timer /// - `failure-count` impl MonitoredObjectStore { - pub fn new(store: OS, object_store_metrics: Arc) -> Self { - if let Some(config) = store.config() { - Self { - object_store_metrics, - streaming_read_timeout: Some(Duration::from_millis( - config.object_store_streaming_read_timeout_ms, - )), - streaming_upload_timeout: Some(Duration::from_millis( - config.object_store_streaming_upload_timeout_ms, - )), - read_timeout: Some(Duration::from_millis(config.object_store_read_timeout_ms)), - upload_timeout: Some(Duration::from_millis(config.object_store_upload_timeout_ms)), - inner: store, - } - } else { - Self { - inner: store, - object_store_metrics, - streaming_read_timeout: None, - streaming_upload_timeout: None, - read_timeout: None, - upload_timeout: None, - } + pub fn new( + store: OS, + object_store_metrics: Arc, + config: ObjectStoreConfig, + ) -> Self { + Self { + object_store_metrics, + streaming_read_timeout: Some(Duration::from_millis( + config.object_store_streaming_read_timeout_ms, + )), + streaming_upload_timeout: Some(Duration::from_millis( + config.object_store_streaming_upload_timeout_ms, + )), + read_timeout: Some(Duration::from_millis(config.object_store_read_timeout_ms)), + upload_timeout: Some(Duration::from_millis(config.object_store_upload_timeout_ms)), + inner: store, } } @@ -776,10 +755,6 @@ impl MonitoredObjectStore { try_update_failure_metric(&self.object_store_metrics, &res, operation_type); res } - - fn recv_buffer_size(&self) -> usize { - self.inner.recv_buffer_size() - } } /// Creates a new [`ObjectStore`] from the given `url`. Credentials are configured via environment @@ -794,15 +769,27 @@ pub async fn build_remote_object_store( config: ObjectStoreConfig, ) -> ObjectStoreImpl { match url { - s3 if s3.starts_with("s3://") => ObjectStoreImpl::S3( - S3ObjectStore::new_with_config( - s3.strip_prefix("s3://").unwrap().to_string(), - metrics.clone(), - config, - ) - .await - .monitored(metrics), - ), + s3 if s3.starts_with("s3://") => { + if std::env::var("RW_USE_OPENDAL_FOR_S3").is_ok() { + let bucket = s3.strip_prefix("s3://").unwrap(); + + ObjectStoreImpl::Opendal( + OpendalObjectStore::new_s3_engine(bucket.to_string(), config.clone()) + .unwrap() + .monitored(metrics, config), + ) + } else { + ObjectStoreImpl::S3( + S3ObjectStore::new_with_config( + s3.strip_prefix("s3://").unwrap().to_string(), + metrics.clone(), + config.clone(), + ) + .await + .monitored(metrics, config), + ) + } + } #[cfg(feature = "hdfs-backend")] hdfs if hdfs.starts_with("hdfs://") => { let hdfs = hdfs.strip_prefix("hdfs://").unwrap(); @@ -819,7 +806,7 @@ pub async fn build_remote_object_store( ObjectStoreImpl::Opendal( OpendalObjectStore::new_gcs_engine(bucket.to_string(), root.to_string()) .unwrap() - .monitored(metrics), + .monitored(metrics, config), ) } obs if obs.starts_with("obs://") => { @@ -828,7 +815,7 @@ pub async fn build_remote_object_store( ObjectStoreImpl::Opendal( OpendalObjectStore::new_obs_engine(bucket.to_string(), root.to_string()) .unwrap() - .monitored(metrics), + .monitored(metrics, config), ) } @@ -838,7 +825,7 @@ pub async fn build_remote_object_store( ObjectStoreImpl::Opendal( OpendalObjectStore::new_oss_engine(bucket.to_string(), root.to_string()) .unwrap() - .monitored(metrics), + .monitored(metrics, config), ) } webhdfs if webhdfs.starts_with("webhdfs://") => { @@ -847,7 +834,7 @@ pub async fn build_remote_object_store( ObjectStoreImpl::Opendal( OpendalObjectStore::new_webhdfs_engine(namenode.to_string(), root.to_string()) .unwrap() - .monitored(metrics), + .monitored(metrics, config), ) } azblob if azblob.starts_with("azblob://") => { @@ -856,7 +843,7 @@ pub async fn build_remote_object_store( ObjectStoreImpl::Opendal( OpendalObjectStore::new_azblob_engine(container_name.to_string(), root.to_string()) .unwrap() - .monitored(metrics), + .monitored(metrics, config), ) } fs if fs.starts_with("fs://") => { @@ -864,7 +851,7 @@ pub async fn build_remote_object_store( ObjectStoreImpl::Opendal( OpendalObjectStore::new_fs_engine(fs.to_string()) .unwrap() - .monitored(metrics), + .monitored(metrics, config), ) } @@ -875,9 +862,9 @@ pub async fn build_remote_object_store( panic!("Passing s3-compatible is not supported, please modify the environment variable and pass in s3."); } minio if minio.starts_with("minio://") => ObjectStoreImpl::S3( - S3ObjectStore::with_minio(minio, metrics.clone()) + S3ObjectStore::with_minio(minio, metrics.clone(), config.clone()) .await - .monitored(metrics), + .monitored(metrics, config), ), "memory" => { if ident == "Meta Backup" { @@ -885,7 +872,7 @@ pub async fn build_remote_object_store( } else { tracing::warn!("You're using in-memory remote object store for {}. This should never be used in benchmarks and production environment.", ident); } - ObjectStoreImpl::InMem(InMemObjectStore::new().monitored(metrics)) + ObjectStoreImpl::InMem(InMemObjectStore::new().monitored(metrics, config)) } "memory-shared" => { if ident == "Meta Backup" { @@ -893,7 +880,11 @@ pub async fn build_remote_object_store( } else { tracing::warn!("You're using shared in-memory remote object store for {}. This should never be used in benchmarks and production environment.", ident); } - ObjectStoreImpl::InMem(InMemObjectStore::shared().monitored(metrics)) + ObjectStoreImpl::InMem(InMemObjectStore::shared().monitored(metrics, config)) + } + #[cfg(madsim)] + sim if sim.starts_with("sim://") => { + ObjectStoreImpl::Sim(SimObjectStore::new(url).monitored(metrics, config)) } other => { unimplemented!( diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 7ec004c786481..fcbd302149d06 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -530,17 +530,6 @@ impl ObjectStore for S3ObjectStore { fn store_media_type(&self) -> &'static str { "s3" } - - fn recv_buffer_size(&self) -> usize { - self.config - .s3 - .object_store_recv_buffer_size - .unwrap_or(1 << 21) - } - - fn config(&self) -> Option<&ObjectStoreConfig> { - Some(&self.config) - } } impl S3ObjectStore { diff --git a/src/storage/backup/src/storage.rs b/src/storage/backup/src/storage.rs index f7a1e5e61a35f..54d2eaa741c94 100644 --- a/src/storage/backup/src/storage.rs +++ b/src/storage/backup/src/storage.rs @@ -16,6 +16,7 @@ use std::collections::HashSet; use std::sync::Arc; use itertools::Itertools; +use risingwave_common::config::ObjectStoreConfig; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_object_store::object::{ InMemObjectStore, MonitoredObjectStore, ObjectError, ObjectStoreImpl, ObjectStoreRef, @@ -191,6 +192,7 @@ pub async fn unused() -> ObjectStoreMetaSnapshotStorage { Arc::new(ObjectStoreImpl::InMem(MonitoredObjectStore::new( InMemObjectStore::new(), Arc::new(ObjectStoreMetrics::unused()), + ObjectStoreConfig::default(), ))), ) .await diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index 2736fbc7d7a9e..7f64537b64908 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -19,6 +19,7 @@ use criterion::async_executor::FuturesExecutor; use criterion::{criterion_group, criterion_main, Criterion}; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; +use risingwave_common::config::{MetricLevel, ObjectStoreConfig}; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::key_range::KeyRange; @@ -47,7 +48,10 @@ use risingwave_storage::hummock::{ use risingwave_storage::monitor::{CompactorMetrics, StoreLocalStatistic}; pub fn mock_sstable_store() -> SstableStoreRef { - let store = InMemObjectStore::new().monitored(Arc::new(ObjectStoreMetrics::unused())); + let store = InMemObjectStore::new().monitored( + Arc::new(ObjectStoreMetrics::unused()), + ObjectStoreConfig::default(), + ); let store = Arc::new(ObjectStoreImpl::InMem(store)); let path = "test".to_string(); Arc::new(SstableStore::new( diff --git a/src/storage/benches/bench_multi_builder.rs b/src/storage/benches/bench_multi_builder.rs index 860567e2f5cf9..821dd7e32d0de 100644 --- a/src/storage/benches/bench_multi_builder.rs +++ b/src/storage/benches/bench_multi_builder.rs @@ -138,7 +138,7 @@ fn bench_builder( ObjectStoreConfig::default(), ) .await - .monitored(metrics) + .monitored(metrics, ObjectStoreConfig::default()) }); let object_store = Arc::new(ObjectStoreImpl::S3(object_store)); let sstable_store = Arc::new(SstableStore::new( diff --git a/src/storage/src/hummock/iterator/test_utils.rs b/src/storage/src/hummock/iterator/test_utils.rs index 584198c0f11c7..4581ae1469672 100644 --- a/src/storage/src/hummock/iterator/test_utils.rs +++ b/src/storage/src/hummock/iterator/test_utils.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use bytes::Bytes; use itertools::Itertools; use risingwave_common::catalog::TableId; +use risingwave_common::config::{MetricLevel, ObjectStoreConfig}; use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey}; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, HummockSstableObjectId}; use risingwave_object_store::object::{ @@ -53,7 +54,10 @@ pub const TEST_KEYS_COUNT: usize = 10; pub fn mock_sstable_store() -> SstableStoreRef { mock_sstable_store_with_object_store(Arc::new(ObjectStoreImpl::InMem( - InMemObjectStore::new().monitored(Arc::new(ObjectStoreMetrics::unused())), + InMemObjectStore::new().monitored( + Arc::new(ObjectStoreMetrics::unused()), + ObjectStoreConfig::default(), + ), ))) }