diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index a3fde993a1f6..8ec0568c1bd5 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -361,6 +361,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { .developer .enable_check_task_level_overlap, enable_dropped_column_reclaim: config.meta.enable_dropped_column_reclaim, + object_store_config: config.storage.object_store, }, config.system.into_init_system_params(), ) diff --git a/src/meta/src/backup_restore/backup_manager.rs b/src/meta/src/backup_restore/backup_manager.rs index d6e90f0b5c7d..bcf23a71af23 100644 --- a/src/meta/src/backup_restore/backup_manager.rs +++ b/src/meta/src/backup_restore/backup_manager.rs @@ -88,8 +88,12 @@ impl BackupManager { store_dir: &str, ) -> MetaResult> { let store_config = (store_url.to_string(), store_dir.to_string()); - let store = - create_snapshot_store(&store_config, metrics.object_store_metric.clone()).await?; + let store = create_snapshot_store( + &store_config, + metrics.object_store_metric.clone(), + &env.opts.object_store_config, + ) + .await?; tracing::info!( "backup manager initialized: url={}, dir={}", store_config.0, @@ -161,8 +165,12 @@ impl BackupManager { } pub async fn set_store(&self, config: StoreConfig) -> MetaResult<()> { - let new_store = - create_snapshot_store(&config, self.meta_metrics.object_store_metric.clone()).await?; + let new_store = create_snapshot_store( + &config, + self.meta_metrics.object_store_metric.clone(), + &self.env.opts.object_store_config, + ) + .await?; tracing::info!( "new backup config is applied: url={}, dir={}", config.0, @@ -368,13 +376,14 @@ impl BackupWorker { async fn create_snapshot_store( config: &StoreConfig, metric: Arc, + object_store_config: &ObjectStoreConfig, ) -> MetaResult { let object_store = Arc::new( build_remote_object_store( &config.0, metric, "Meta Backup", - ObjectStoreConfig::default(), + object_store_config.clone(), ) .await, ); diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index a8843b1f24b8..28b44d8c70b2 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -33,7 +33,6 @@ use rand::prelude::SliceRandom; use rand::thread_rng; use risingwave_common::catalog::TableId; use risingwave_common::config::default::compaction_config; -use risingwave_common::config::ObjectStoreConfig; use risingwave_common::monitor::rwlock::MonitoredRwLock; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; @@ -365,7 +364,7 @@ impl HummockManager { let state_store_url = sys_params.state_store(); let state_store_dir: &str = sys_params.data_directory(); let deterministic_mode = env.opts.compaction_deterministic_test; - let mut object_store_config = ObjectStoreConfig::default(); + let mut object_store_config = env.opts.object_store_config.clone(); // For fs and hdfs object store, operations are not always atomic. // We should manually enable atomicity guarantee by setting the atomic_write_dir config when building services. object_store_config.set_atomic_write_dir(); diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 8f8adf59947e..506866e8d3c8 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -15,7 +15,7 @@ use std::ops::Deref; use std::sync::Arc; -use risingwave_common::config::{CompactionConfig, DefaultParallelism}; +use risingwave_common::config::{CompactionConfig, DefaultParallelism, ObjectStoreConfig}; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_meta_model_v2::prelude::Cluster; use risingwave_pb::meta::SystemParams; @@ -212,6 +212,7 @@ pub struct MetaOpts { /// l0 multi level picker whether to check the overlap accuracy between sub levels pub enable_check_task_level_overlap: bool, pub enable_dropped_column_reclaim: bool, + pub object_store_config: ObjectStoreConfig, } impl MetaOpts { @@ -267,6 +268,7 @@ impl MetaOpts { enable_trivial_move: true, enable_check_task_level_overlap: true, enable_dropped_column_reclaim: false, + object_store_config: ObjectStoreConfig::default(), } } } diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index c75447323c30..a8fe659550ae 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -792,6 +792,7 @@ pub async fn build_remote_object_store( ident: &str, config: ObjectStoreConfig, ) -> ObjectStoreImpl { + tracing::debug!(config=?config, "object store {ident}"); match url { s3 if s3.starts_with("s3://") => { if config.s3.developer.use_opendal { diff --git a/src/storage/src/hummock/backup_reader.rs b/src/storage/src/hummock/backup_reader.rs index 0b99970f26e7..cd0a231420c1 100644 --- a/src/storage/src/hummock/backup_reader.rs +++ b/src/storage/src/hummock/backup_reader.rs @@ -45,13 +45,14 @@ type VersionHolder = ( async fn create_snapshot_store( config: &StoreConfig, + object_store_config: &ObjectStoreConfig, ) -> StorageResult { let backup_object_store = Arc::new( build_remote_object_store( &config.0, Arc::new(ObjectStoreMetrics::unused()), "Meta Backup", - ObjectStoreConfig::default(), + object_store_config.clone(), ) .await, ); @@ -69,26 +70,38 @@ pub struct BackupReader { inflight_request: parking_lot::Mutex>, store: ArcSwap<(ObjectStoreMetaSnapshotStorage, StoreConfig)>, refresh_tx: tokio::sync::mpsc::UnboundedSender, + object_store_config: ObjectStoreConfig, } impl BackupReader { - pub async fn new(storage_url: &str, storage_directory: &str) -> StorageResult { + pub async fn new( + storage_url: &str, + storage_directory: &str, + object_store_config: &ObjectStoreConfig, + ) -> StorageResult { let config = (storage_url.to_string(), storage_directory.to_string()); - let store = create_snapshot_store(&config).await?; + let store = create_snapshot_store(&config, object_store_config).await?; tracing::info!( "backup reader is initialized: url={}, dir={}", config.0, config.1 ); - Ok(Self::with_store((store, config))) + Ok(Self::with_store( + (store, config), + object_store_config.clone(), + )) } - fn with_store(store: (ObjectStoreMetaSnapshotStorage, StoreConfig)) -> BackupReaderRef { + fn with_store( + store: (ObjectStoreMetaSnapshotStorage, StoreConfig), + object_store_config: ObjectStoreConfig, + ) -> BackupReaderRef { let (refresh_tx, refresh_rx) = tokio::sync::mpsc::unbounded_channel(); let instance = Arc::new(Self { store: ArcSwap::from_pointee(store), versions: Default::default(), inflight_request: Default::default(), + object_store_config, refresh_tx, }); tokio::spawn(Self::start_manifest_refresher(instance.clone(), refresh_rx)); @@ -96,14 +109,17 @@ impl BackupReader { } pub async fn unused() -> BackupReaderRef { - Self::with_store(( - risingwave_backup::storage::unused().await, - StoreConfig::default(), - )) + Self::with_store( + ( + risingwave_backup::storage::unused().await, + StoreConfig::default(), + ), + ObjectStoreConfig::default(), + ) } async fn set_store(&self, config: StoreConfig) -> StorageResult<()> { - let new_store = create_snapshot_store(&config).await?; + let new_store = create_snapshot_store(&config, &self.object_store_config).await?; tracing::info!( "backup reader is updated: url={}, dir={}", config.0, diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 29b95c314ef5..fd392a3e023c 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -150,6 +150,7 @@ impl HummockStorage { let backup_reader = BackupReader::new( &options.backup_storage_url, &options.backup_storage_directory, + &options.object_store_config, ) .await .map_err(HummockError::read_backup_error)?;