Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: pass correct object store config to monitored object store for all backends #15260

Merged
merged 3 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 37 additions & 64 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,20 @@ pub trait ObjectStore: Send + Sync {
/// specified in the request is not found, it will be considered as successfully deleted.
async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()>;

fn monitored(self, metrics: Arc<ObjectStoreMetrics>) -> MonitoredObjectStore<Self>
fn monitored(
self,
metrics: Arc<ObjectStoreMetrics>,
config: ObjectStoreConfig,
) -> MonitoredObjectStore<Self>
where
Self: Sized,
{
MonitoredObjectStore::new(self, metrics)
MonitoredObjectStore::new(self, metrics, config)
}

async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter>;

fn store_media_type(&self) -> &'static str;

fn recv_buffer_size(&self) -> usize {
// 2MB
1 << 21
}

fn config(&self) -> Option<&ObjectStoreConfig> {
// TODO: remove option
None
}
}

pub enum ObjectStoreImpl {
Expand Down Expand Up @@ -280,16 +274,6 @@ impl ObjectStoreImpl {
ObjectStoreImpl::Sim(_) => true,
}
}

pub fn recv_buffer_size(&self) -> usize {
match self {
ObjectStoreImpl::InMem(store) => store.recv_buffer_size(),
ObjectStoreImpl::Opendal(store) => store.recv_buffer_size(),
ObjectStoreImpl::S3(store) => store.recv_buffer_size(),
#[cfg(madsim)]
ObjectStoreImpl::Sim(store) => store.recv_buffer_size(),
}
}
}

fn try_update_failure_metric<T>(
Expand Down Expand Up @@ -526,29 +510,22 @@ pub struct MonitoredObjectStore<OS: ObjectStore> {
/// - start `operation_latency` timer
/// - `failure-count`
impl<OS: ObjectStore> MonitoredObjectStore<OS> {
pub fn new(store: OS, object_store_metrics: Arc<ObjectStoreMetrics>) -> Self {
if let Some(config) = store.config() {
Self {
object_store_metrics,
streaming_read_timeout: Some(Duration::from_millis(
config.object_store_streaming_read_timeout_ms,
)),
streaming_upload_timeout: Some(Duration::from_millis(
config.object_store_streaming_upload_timeout_ms,
)),
read_timeout: Some(Duration::from_millis(config.object_store_read_timeout_ms)),
upload_timeout: Some(Duration::from_millis(config.object_store_upload_timeout_ms)),
inner: store,
}
} else {
Self {
inner: store,
object_store_metrics,
streaming_read_timeout: None,
streaming_upload_timeout: None,
read_timeout: None,
upload_timeout: None,
}
pub fn new(
store: OS,
object_store_metrics: Arc<ObjectStoreMetrics>,
config: ObjectStoreConfig,
) -> Self {
Self {
object_store_metrics,
streaming_read_timeout: Some(Duration::from_millis(
config.object_store_streaming_read_timeout_ms,
)),
streaming_upload_timeout: Some(Duration::from_millis(
config.object_store_streaming_upload_timeout_ms,
)),
read_timeout: Some(Duration::from_millis(config.object_store_read_timeout_ms)),
upload_timeout: Some(Duration::from_millis(config.object_store_upload_timeout_ms)),
inner: store,
}
}

Expand Down Expand Up @@ -800,10 +777,6 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
try_update_failure_metric(&self.object_store_metrics, &res, operation_type);
res
}

fn recv_buffer_size(&self) -> usize {
self.inner.recv_buffer_size()
}
}

/// Creates a new [`ObjectStore`] from the given `url`. Credentials are configured via environment
Expand All @@ -823,19 +796,19 @@ pub async fn build_remote_object_store(
let bucket = s3.strip_prefix("s3://").unwrap();

ObjectStoreImpl::Opendal(
OpendalObjectStore::new_s3_engine(bucket.to_string(), config)
OpendalObjectStore::new_s3_engine(bucket.to_string(), config.clone())
.unwrap()
.monitored(metrics),
.monitored(metrics, config),
)
} else {
ObjectStoreImpl::S3(
S3ObjectStore::new_with_config(
s3.strip_prefix("s3://").unwrap().to_string(),
metrics.clone(),
config,
config.clone(),
)
.await
.monitored(metrics),
.monitored(metrics, config),
)
}
}
Expand All @@ -855,7 +828,7 @@ pub async fn build_remote_object_store(
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_gcs_engine(bucket.to_string(), root.to_string())
.unwrap()
.monitored(metrics),
.monitored(metrics, config),
)
}
obs if obs.starts_with("obs://") => {
Expand All @@ -864,7 +837,7 @@ pub async fn build_remote_object_store(
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_obs_engine(bucket.to_string(), root.to_string())
.unwrap()
.monitored(metrics),
.monitored(metrics, config),
)
}

Expand All @@ -874,7 +847,7 @@ pub async fn build_remote_object_store(
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_oss_engine(bucket.to_string(), root.to_string())
.unwrap()
.monitored(metrics),
.monitored(metrics, config),
)
}
webhdfs if webhdfs.starts_with("webhdfs://") => {
Expand All @@ -883,7 +856,7 @@ pub async fn build_remote_object_store(
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_webhdfs_engine(namenode.to_string(), root.to_string())
.unwrap()
.monitored(metrics),
.monitored(metrics, config),
)
}
azblob if azblob.starts_with("azblob://") => {
Expand All @@ -892,15 +865,15 @@ pub async fn build_remote_object_store(
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_azblob_engine(container_name.to_string(), root.to_string())
.unwrap()
.monitored(metrics),
.monitored(metrics, config),
)
}
fs if fs.starts_with("fs://") => {
let fs = fs.strip_prefix("fs://").unwrap();
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_fs_engine(fs.to_string())
.unwrap()
.monitored(metrics),
.monitored(metrics, config),
)
}

Expand All @@ -911,29 +884,29 @@ pub async fn build_remote_object_store(
panic!("Passing s3-compatible is not supported, please modify the environment variable and pass in s3.");
}
minio if minio.starts_with("minio://") => ObjectStoreImpl::S3(
S3ObjectStore::with_minio(minio, metrics.clone(), config)
S3ObjectStore::with_minio(minio, metrics.clone(), config.clone())
.await
.monitored(metrics),
.monitored(metrics, config),
),
"memory" => {
if ident == "Meta Backup" {
tracing::warn!("You're using in-memory remote object store for {}. This is not recommended for production environment.", ident);
} else {
tracing::warn!("You're using in-memory remote object store for {}. This should never be used in benchmarks and production environment.", ident);
}
ObjectStoreImpl::InMem(InMemObjectStore::new().monitored(metrics))
ObjectStoreImpl::InMem(InMemObjectStore::new().monitored(metrics, config))
}
"memory-shared" => {
if ident == "Meta Backup" {
tracing::warn!("You're using shared in-memory remote object store for {}. This should never be used in production environment.", ident);
} else {
tracing::warn!("You're using shared in-memory remote object store for {}. This should never be used in benchmarks and production environment.", ident);
}
ObjectStoreImpl::InMem(InMemObjectStore::shared().monitored(metrics))
ObjectStoreImpl::InMem(InMemObjectStore::shared().monitored(metrics, config))
}
#[cfg(madsim)]
sim if sim.starts_with("sim://") => {
ObjectStoreImpl::Sim(SimObjectStore::new(url).monitored(metrics))
ObjectStoreImpl::Sim(SimObjectStore::new(url).monitored(metrics, config))
}
other => {
unimplemented!(
Expand Down
11 changes: 0 additions & 11 deletions src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,17 +532,6 @@ impl ObjectStore for S3ObjectStore {
fn store_media_type(&self) -> &'static str {
"s3"
}

fn recv_buffer_size(&self) -> usize {
self.config
.s3
.object_store_recv_buffer_size
.unwrap_or(1 << 21)
}

fn config(&self) -> Option<&ObjectStoreConfig> {
Some(&self.config)
}
}

impl S3ObjectStore {
Expand Down
2 changes: 2 additions & 0 deletions src/storage/backup/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashSet;
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::{
InMemObjectStore, MonitoredObjectStore, ObjectError, ObjectStoreImpl, ObjectStoreRef,
Expand Down Expand Up @@ -191,6 +192,7 @@ pub async fn unused() -> ObjectStoreMetaSnapshotStorage {
Arc::new(ObjectStoreImpl::InMem(MonitoredObjectStore::new(
InMemObjectStore::new(),
Arc::new(ObjectStoreMetrics::unused()),
ObjectStoreConfig::default(),
))),
)
.await
Expand Down
7 changes: 5 additions & 2 deletions src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion};
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::TableId;
use risingwave_common::config::MetricLevel;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
Expand Down Expand Up @@ -49,7 +49,10 @@ use risingwave_storage::monitor::{
};

pub fn mock_sstable_store() -> SstableStoreRef {
let store = InMemObjectStore::new().monitored(Arc::new(ObjectStoreMetrics::unused()));
let store = InMemObjectStore::new().monitored(
Arc::new(ObjectStoreMetrics::unused()),
ObjectStoreConfig::default(),
);
let store = Arc::new(ObjectStoreImpl::InMem(store));
let path = "test".to_string();
Arc::new(SstableStore::new(SstableStoreConfig {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/benches/bench_multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ fn bench_builder(
ObjectStoreConfig::default(),
)
.await
.monitored(metrics)
.monitored(metrics, ObjectStoreConfig::default())
});
let object_store = Arc::new(ObjectStoreImpl::S3(object_store));
let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
Expand Down
7 changes: 5 additions & 2 deletions src/storage/src/hummock/iterator/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;
use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::config::MetricLevel;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, HummockSstableObjectId};
use risingwave_object_store::object::{
Expand Down Expand Up @@ -54,7 +54,10 @@ pub const TEST_KEYS_COUNT: usize = 10;

pub fn mock_sstable_store() -> SstableStoreRef {
mock_sstable_store_with_object_store(Arc::new(ObjectStoreImpl::InMem(
InMemObjectStore::new().monitored(Arc::new(ObjectStoreMetrics::unused())),
InMemObjectStore::new().monitored(
Arc::new(ObjectStoreMetrics::unused()),
ObjectStoreConfig::default(),
),
)))
}

Expand Down
Loading