fix: pass correct object store config to monitored object store for all backends (#15260)
hzxa21 authored and zwang28 committed Feb 26, 2024
1 parent 128b251 commit daef674
Showing 6 changed files with 70 additions and 80 deletions.
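At a glance, the change replaces the optional `ObjectStore::config()` hook (which only the S3 backend implemented) with an explicit `ObjectStoreConfig` argument on `ObjectStore::monitored` and `MonitoredObjectStore::new`, so the monitored wrapper gets real timeout settings for every backend. A minimal sketch of the new call-site shape, using the in-memory backend as in the test changes below; the `ObjectStore` trait import path is assumed, and this is illustrative rather than the exact production wiring:

use std::sync::Arc;

use risingwave_common::config::ObjectStoreConfig;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
// The `ObjectStore` trait import is an assumption; the trait must be in scope for `.monitored()`.
use risingwave_object_store::object::{InMemObjectStore, ObjectStore, ObjectStoreImpl};

fn monitored_in_mem_store() -> ObjectStoreImpl {
    let metrics = Arc::new(ObjectStoreMetrics::unused());
    // Before this commit: `.monitored(metrics)`, with the wrapper falling back to
    // `None` timeouts for every backend whose `config()` returned `None`.
    // After this commit: the config is passed explicitly, so read/upload/streaming
    // timeouts are populated for all backends, not just S3.
    let config = ObjectStoreConfig::default();
    ObjectStoreImpl::InMem(InMemObjectStore::new().monitored(metrics, config))
}

The same two-argument form appears at every construction site in the diff below (S3, MinIO, the OpenDAL engines, in-memory, and the madsim backend).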
123 changes: 57 additions & 66 deletions src/object_store/src/object/mod.rs
@@ -112,26 +112,20 @@ pub trait ObjectStore: Send + Sync {
/// specified in the request is not found, it will be considered as successfully deleted.
async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()>;

- fn monitored(self, metrics: Arc<ObjectStoreMetrics>) -> MonitoredObjectStore<Self>
+ fn monitored(
+ self,
+ metrics: Arc<ObjectStoreMetrics>,
+ config: ObjectStoreConfig,
+ ) -> MonitoredObjectStore<Self>
where
Self: Sized,
{
- MonitoredObjectStore::new(self, metrics)
+ MonitoredObjectStore::new(self, metrics, config)
}

async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter>;

fn store_media_type(&self) -> &'static str;

- fn recv_buffer_size(&self) -> usize {
- // 2MB
- 1 << 21
- }

- fn config(&self) -> Option<&ObjectStoreConfig> {
- // TODO: remove option
- None
- }
}

pub enum ObjectStoreImpl {
@@ -258,14 +252,6 @@ impl ObjectStoreImpl {
ObjectStoreImpl::S3(_) => true,
}
}

- pub fn recv_buffer_size(&self) -> usize {
- match self {
- ObjectStoreImpl::InMem(store) => store.recv_buffer_size(),
- ObjectStoreImpl::Opendal(store) => store.recv_buffer_size(),
- ObjectStoreImpl::S3(store) => store.recv_buffer_size(),
- }
- }
}

fn try_update_failure_metric<T>(
@@ -502,29 +488,22 @@ pub struct MonitoredObjectStore<OS: ObjectStore> {
/// - start `operation_latency` timer
/// - `failure-count`
impl<OS: ObjectStore> MonitoredObjectStore<OS> {
- pub fn new(store: OS, object_store_metrics: Arc<ObjectStoreMetrics>) -> Self {
- if let Some(config) = store.config() {
- Self {
- object_store_metrics,
- streaming_read_timeout: Some(Duration::from_millis(
- config.object_store_streaming_read_timeout_ms,
- )),
- streaming_upload_timeout: Some(Duration::from_millis(
- config.object_store_streaming_upload_timeout_ms,
- )),
- read_timeout: Some(Duration::from_millis(config.object_store_read_timeout_ms)),
- upload_timeout: Some(Duration::from_millis(config.object_store_upload_timeout_ms)),
- inner: store,
- }
- } else {
- Self {
- inner: store,
- object_store_metrics,
- streaming_read_timeout: None,
- streaming_upload_timeout: None,
- read_timeout: None,
- upload_timeout: None,
- }
+ pub fn new(
+ store: OS,
+ object_store_metrics: Arc<ObjectStoreMetrics>,
+ config: ObjectStoreConfig,
+ ) -> Self {
+ Self {
+ object_store_metrics,
+ streaming_read_timeout: Some(Duration::from_millis(
+ config.object_store_streaming_read_timeout_ms,
+ )),
+ streaming_upload_timeout: Some(Duration::from_millis(
+ config.object_store_streaming_upload_timeout_ms,
+ )),
+ read_timeout: Some(Duration::from_millis(config.object_store_read_timeout_ms)),
+ upload_timeout: Some(Duration::from_millis(config.object_store_upload_timeout_ms)),
+ inner: store,
+ }
}

@@ -776,10 +755,6 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
try_update_failure_metric(&self.object_store_metrics, &res, operation_type);
res
}

- fn recv_buffer_size(&self) -> usize {
- self.inner.recv_buffer_size()
- }
}

/// Creates a new [`ObjectStore`] from the given `url`. Credentials are configured via environment
@@ -794,15 +769,27 @@ pub async fn build_remote_object_store(
config: ObjectStoreConfig,
) -> ObjectStoreImpl {
match url {
s3 if s3.starts_with("s3://") => ObjectStoreImpl::S3(
S3ObjectStore::new_with_config(
s3.strip_prefix("s3://").unwrap().to_string(),
metrics.clone(),
config,
)
.await
.monitored(metrics),
),
s3 if s3.starts_with("s3://") => {
if std::env::var("RW_USE_OPENDAL_FOR_S3").is_ok() {
let bucket = s3.strip_prefix("s3://").unwrap();

ObjectStoreImpl::Opendal(
OpendalObjectStore::new_s3_engine(bucket.to_string(), config.clone())
.unwrap()
.monitored(metrics, config),
)
} else {
ObjectStoreImpl::S3(
S3ObjectStore::new_with_config(
s3.strip_prefix("s3://").unwrap().to_string(),
metrics.clone(),
config.clone(),
)
.await
.monitored(metrics, config),
)
}
}
#[cfg(feature = "hdfs-backend")]
hdfs if hdfs.starts_with("hdfs://") => {
let hdfs = hdfs.strip_prefix("hdfs://").unwrap();
@@ -819,7 +806,7 @@
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_gcs_engine(bucket.to_string(), root.to_string())
.unwrap()
- .monitored(metrics),
+ .monitored(metrics, config),
)
}
obs if obs.starts_with("obs://") => {
@@ -828,7 +815,7 @@
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_obs_engine(bucket.to_string(), root.to_string())
.unwrap()
- .monitored(metrics),
+ .monitored(metrics, config),
)
}

@@ -838,7 +825,7 @@
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_oss_engine(bucket.to_string(), root.to_string())
.unwrap()
- .monitored(metrics),
+ .monitored(metrics, config),
)
}
webhdfs if webhdfs.starts_with("webhdfs://") => {
@@ -847,7 +834,7 @@
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_webhdfs_engine(namenode.to_string(), root.to_string())
.unwrap()
- .monitored(metrics),
+ .monitored(metrics, config),
)
}
azblob if azblob.starts_with("azblob://") => {
@@ -856,15 +843,15 @@
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_azblob_engine(container_name.to_string(), root.to_string())
.unwrap()
- .monitored(metrics),
+ .monitored(metrics, config),
)
}
fs if fs.starts_with("fs://") => {
let fs = fs.strip_prefix("fs://").unwrap();
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_fs_engine(fs.to_string())
.unwrap()
- .monitored(metrics),
+ .monitored(metrics, config),
)
}

@@ -875,25 +862,29 @@
panic!("Passing s3-compatible is not supported, please modify the environment variable and pass in s3.");
}
minio if minio.starts_with("minio://") => ObjectStoreImpl::S3(
- S3ObjectStore::with_minio(minio, metrics.clone())
+ S3ObjectStore::with_minio(minio, metrics.clone(), config.clone())
.await
- .monitored(metrics),
+ .monitored(metrics, config),
),
"memory" => {
if ident == "Meta Backup" {
tracing::warn!("You're using in-memory remote object store for {}. This is not recommended for production environment.", ident);
} else {
tracing::warn!("You're using in-memory remote object store for {}. This should never be used in benchmarks and production environment.", ident);
}
- ObjectStoreImpl::InMem(InMemObjectStore::new().monitored(metrics))
+ ObjectStoreImpl::InMem(InMemObjectStore::new().monitored(metrics, config))
}
"memory-shared" => {
if ident == "Meta Backup" {
tracing::warn!("You're using shared in-memory remote object store for {}. This should never be used in production environment.", ident);
} else {
tracing::warn!("You're using shared in-memory remote object store for {}. This should never be used in benchmarks and production environment.", ident);
}
- ObjectStoreImpl::InMem(InMemObjectStore::shared().monitored(metrics))
+ ObjectStoreImpl::InMem(InMemObjectStore::shared().monitored(metrics, config))
}
+ #[cfg(madsim)]
+ sim if sim.starts_with("sim://") => {
+ ObjectStoreImpl::Sim(SimObjectStore::new(url).monitored(metrics, config))
+ }
other => {
unimplemented!(
11 changes: 0 additions & 11 deletions src/object_store/src/object/s3.rs
@@ -530,17 +530,6 @@ impl ObjectStore for S3ObjectStore {
fn store_media_type(&self) -> &'static str {
"s3"
}

- fn recv_buffer_size(&self) -> usize {
- self.config
- .s3
- .object_store_recv_buffer_size
- .unwrap_or(1 << 21)
- }

- fn config(&self) -> Option<&ObjectStoreConfig> {
- Some(&self.config)
- }
}

impl S3ObjectStore {
2 changes: 2 additions & 0 deletions src/storage/backup/src/storage.rs
@@ -16,6 +16,7 @@ use std::collections::HashSet;
use std::sync::Arc;

use itertools::Itertools;
+ use risingwave_common::config::ObjectStoreConfig;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::{
InMemObjectStore, MonitoredObjectStore, ObjectError, ObjectStoreImpl, ObjectStoreRef,
@@ -191,6 +192,7 @@ pub async fn unused() -> ObjectStoreMetaSnapshotStorage {
Arc::new(ObjectStoreImpl::InMem(MonitoredObjectStore::new(
InMemObjectStore::new(),
Arc::new(ObjectStoreMetrics::unused()),
+ ObjectStoreConfig::default(),
))),
)
.await
6 changes: 5 additions & 1 deletion src/storage/benches/bench_compactor.rs
@@ -19,6 +19,7 @@ use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion};
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::TableId;
+ use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
@@ -47,7 +48,10 @@ use risingwave_storage::hummock::{
use risingwave_storage::monitor::{CompactorMetrics, StoreLocalStatistic};

pub fn mock_sstable_store() -> SstableStoreRef {
- let store = InMemObjectStore::new().monitored(Arc::new(ObjectStoreMetrics::unused()));
+ let store = InMemObjectStore::new().monitored(
+ Arc::new(ObjectStoreMetrics::unused()),
+ ObjectStoreConfig::default(),
+ );
let store = Arc::new(ObjectStoreImpl::InMem(store));
let path = "test".to_string();
Arc::new(SstableStore::new(
2 changes: 1 addition & 1 deletion src/storage/benches/bench_multi_builder.rs
@@ -138,7 +138,7 @@ fn bench_builder(
ObjectStoreConfig::default(),
)
.await
- .monitored(metrics)
+ .monitored(metrics, ObjectStoreConfig::default())
});
let object_store = Arc::new(ObjectStoreImpl::S3(object_store));
let sstable_store = Arc::new(SstableStore::new(
6 changes: 5 additions & 1 deletion src/storage/src/hummock/iterator/test_utils.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
+ use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, HummockSstableObjectId};
use risingwave_object_store::object::{
@@ -53,7 +54,10 @@ pub const TEST_KEYS_COUNT: usize = 10;

pub fn mock_sstable_store() -> SstableStoreRef {
mock_sstable_store_with_object_store(Arc::new(ObjectStoreImpl::InMem(
- InMemObjectStore::new().monitored(Arc::new(ObjectStoreMetrics::unused())),
+ InMemObjectStore::new().monitored(
+ Arc::new(ObjectStoreMetrics::unused()),
+ ObjectStoreConfig::default(),
+ ),
)))
}

