diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index c0bf8100a0dac..d472430e34d7d 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -253,10 +253,6 @@ mod test { do_not_config_object_storage_lifecycle: None, backup_storage_url: None, backup_storage_directory: None, - object_store_streaming_read_timeout_ms: None, - object_store_streaming_upload_timeout_ms: None, - object_store_upload_timeout_ms: None, - object_store_read_timeout_ms: None, heap_profiling_dir: None, }, ), @@ -278,10 +274,6 @@ mod test { meta_file_cache_dir: None, async_stack_trace: None, heap_profiling_dir: None, - object_store_streaming_read_timeout_ms: None, - object_store_streaming_upload_timeout_ms: None, - object_store_upload_timeout_ms: None, - object_store_read_timeout_ms: None, }, ), frontend_opts: Some( diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 625164c7c87fa..2aec8200dfdcb 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -568,30 +568,6 @@ pub struct StorageConfig { #[serde(default = "default::storage::max_version_pinning_duration_sec")] pub max_version_pinning_duration_sec: u64, - #[serde(default = "default::storage::object_store_streaming_read_timeout_ms")] - pub object_store_streaming_read_timeout_ms: u64, - #[serde(default = "default::storage::object_store_streaming_upload_timeout_ms")] - pub object_store_streaming_upload_timeout_ms: u64, - #[serde(default = "default::storage::object_store_upload_timeout_ms")] - pub object_store_upload_timeout_ms: u64, - #[serde(default = "default::storage::object_store_read_timeout_ms")] - pub object_store_read_timeout_ms: u64, - - #[serde(default = "default::s3_objstore_config::object_store_keepalive_ms")] - pub object_store_keepalive_ms: Option, - #[serde(default = "default::s3_objstore_config::object_store_recv_buffer_size")] - pub object_store_recv_buffer_size: Option, - #[serde(default = "default::s3_objstore_config::object_store_send_buffer_size")] - pub object_store_send_buffer_size: Option, - #[serde(default = "default::s3_objstore_config::object_store_nodelay")] - pub object_store_nodelay: Option, - #[serde(default = "default::s3_objstore_config::object_store_req_retry_interval_ms")] - pub object_store_req_retry_interval_ms: u64, - #[serde(default = "default::s3_objstore_config::object_store_req_retry_max_delay_ms")] - pub object_store_req_retry_max_delay_ms: u64, - #[serde(default = "default::s3_objstore_config::object_store_req_retry_max_attempts")] - pub object_store_req_retry_max_attempts: usize, - #[serde(default = "default::storage::compactor_max_sst_key_count")] pub compactor_max_sst_key_count: u64, #[serde(default = "default::storage::compact_iter_recreate_timeout_ms")] @@ -608,6 +584,9 @@ pub struct StorageConfig { /// The spill threshold for mem table. #[serde(default = "default::storage::mem_table_spill_threshold")] pub mem_table_spill_threshold: usize, + + #[serde(default)] + pub object_store: ObjectStoreConfig, } #[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)] @@ -869,6 +848,41 @@ pub struct SystemConfig { pub pause_on_next_bootstrap: Option, } +/// The subsections `[storage.object_store]`. +#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)] +pub struct ObjectStoreConfig { + #[serde(default = "default::object_store_config::object_store_streaming_read_timeout_ms")] + pub object_store_streaming_read_timeout_ms: u64, + #[serde(default = "default::object_store_config::object_store_streaming_upload_timeout_ms")] + pub object_store_streaming_upload_timeout_ms: u64, + #[serde(default = "default::object_store_config::object_store_upload_timeout_ms")] + pub object_store_upload_timeout_ms: u64, + #[serde(default = "default::object_store_config::object_store_read_timeout_ms")] + pub object_store_read_timeout_ms: u64, + + #[serde(default)] + pub s3: S3ObjectStoreConfig, +} + +/// The subsections `[storage.object_store.s3]`. +#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)] +pub struct S3ObjectStoreConfig { + #[serde(default = "default::object_store_config::s3::object_store_keepalive_ms")] + pub object_store_keepalive_ms: Option, + #[serde(default = "default::object_store_config::s3::object_store_recv_buffer_size")] + pub object_store_recv_buffer_size: Option, + #[serde(default = "default::object_store_config::s3::object_store_send_buffer_size")] + pub object_store_send_buffer_size: Option, + #[serde(default = "default::object_store_config::s3::object_store_nodelay")] + pub object_store_nodelay: Option, + #[serde(default = "default::object_store_config::s3::object_store_req_retry_interval_ms")] + pub object_store_req_retry_interval_ms: u64, + #[serde(default = "default::object_store_config::s3::object_store_req_retry_max_delay_ms")] + pub object_store_req_retry_max_delay_ms: u64, + #[serde(default = "default::object_store_config::s3::object_store_req_retry_max_attempts")] + pub object_store_req_retry_max_attempts: usize, +} + impl SystemConfig { #![allow(deprecated)] pub fn into_init_system_params(self) -> SystemParams { @@ -1097,22 +1111,6 @@ pub mod default { 3 * 3600 } - pub fn object_store_streaming_read_timeout_ms() -> u64 { - 10 * 60 * 1000 - } - - pub fn object_store_streaming_upload_timeout_ms() -> u64 { - 10 * 60 * 1000 - } - - pub fn object_store_upload_timeout_ms() -> u64 { - 60 * 60 * 1000 - } - - pub fn object_store_read_timeout_ms() -> u64 { - 60 * 60 * 1000 - } - pub fn compactor_max_sst_key_count() -> u64 { 2 * 1024 * 1024 // 200w } @@ -1387,40 +1385,58 @@ pub mod default { } } - pub mod s3_objstore_config { - /// Retry config for compute node http timeout error. - const DEFAULT_RETRY_INTERVAL_MS: u64 = 20; - const DEFAULT_RETRY_MAX_DELAY_MS: u64 = 10 * 1000; - const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 8; - - const DEFAULT_KEEPALIVE_MS: u64 = 600 * 1000; // 10min - - pub fn object_store_keepalive_ms() -> Option { - Some(DEFAULT_KEEPALIVE_MS) // 10min + pub mod object_store_config { + pub fn object_store_streaming_read_timeout_ms() -> u64 { + 10 * 60 * 1000 } - pub fn object_store_recv_buffer_size() -> Option { - Some(1 << 21) // 2m + pub fn object_store_streaming_upload_timeout_ms() -> u64 { + 10 * 60 * 1000 } - pub fn object_store_send_buffer_size() -> Option { - None + pub fn object_store_upload_timeout_ms() -> u64 { + 60 * 60 * 1000 } - pub fn object_store_nodelay() -> Option { - Some(true) + pub fn object_store_read_timeout_ms() -> u64 { + 60 * 60 * 1000 } - pub fn object_store_req_retry_interval_ms() -> u64 { - DEFAULT_RETRY_INTERVAL_MS - } + pub mod s3 { + /// Retry config for compute node http timeout error. + const DEFAULT_RETRY_INTERVAL_MS: u64 = 20; + const DEFAULT_RETRY_MAX_DELAY_MS: u64 = 10 * 1000; + const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 8; - pub fn object_store_req_retry_max_delay_ms() -> u64 { - DEFAULT_RETRY_MAX_DELAY_MS // 10s - } + const DEFAULT_KEEPALIVE_MS: u64 = 600 * 1000; // 10min + + pub fn object_store_keepalive_ms() -> Option { + Some(DEFAULT_KEEPALIVE_MS) // 10min + } + + pub fn object_store_recv_buffer_size() -> Option { + Some(1 << 21) // 2m + } - pub fn object_store_req_retry_max_attempts() -> usize { - DEFAULT_RETRY_MAX_ATTEMPTS + pub fn object_store_send_buffer_size() -> Option { + None + } + + pub fn object_store_nodelay() -> Option { + Some(true) + } + + pub fn object_store_req_retry_interval_ms() -> u64 { + DEFAULT_RETRY_INTERVAL_MS + } + + pub fn object_store_req_retry_max_delay_ms() -> u64 { + DEFAULT_RETRY_MAX_DELAY_MS // 10s + } + + pub fn object_store_req_retry_max_attempts() -> usize { + DEFAULT_RETRY_MAX_ATTEMPTS + } } } } diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index 96110b487536e..1c6ab2dc7b49b 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -133,19 +133,6 @@ pub struct ComputeNodeOpts { #[clap(long, env = "RW_HEAP_PROFILING_DIR")] #[override_opts(path = server.heap_profiling.dir)] pub heap_profiling_dir: Option, - - #[clap(long, env = "RW_OBJECT_STORE_STREAMING_READ_TIMEOUT_MS", value_enum)] - #[override_opts(path = storage.object_store_streaming_read_timeout_ms)] - pub object_store_streaming_read_timeout_ms: Option, - #[clap(long, env = "RW_OBJECT_STORE_STREAMING_UPLOAD_TIMEOUT_MS", value_enum)] - #[override_opts(path = storage.object_store_streaming_upload_timeout_ms)] - pub object_store_streaming_upload_timeout_ms: Option, - #[clap(long, env = "RW_OBJECT_STORE_UPLOAD_TIMEOUT_MS", value_enum)] - #[override_opts(path = storage.object_store_upload_timeout_ms)] - pub object_store_upload_timeout_ms: Option, - #[clap(long, env = "RW_OBJECT_STORE_READ_TIMEOUT_MS", value_enum)] - #[override_opts(path = storage.object_store_read_timeout_ms)] - pub object_store_read_timeout_ms: Option, } #[derive(Copy, Clone, Debug, Default, ValueEnum, Serialize, Deserialize)] diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 63cfc0b2519ce..e48e47583688a 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -98,8 +98,7 @@ pub async fn compute_node_serve( info!("> version: {} ({})", RW_VERSION, GIT_SHA); // Initialize all the configs - let stream_config: Arc = - Arc::new(config.streaming.clone()); + let stream_config = Arc::new(config.streaming.clone()); let batch_config = Arc::new(config.batch.clone()); // Register to the cluster. We're not ready to serve until activate is called. diff --git a/src/config/example.toml b/src/config/example.toml index 77fa839aebc0c..b6a2b487fa84e 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -102,16 +102,6 @@ max_sub_compaction = 4 max_concurrent_compaction_task_number = 16 max_preload_wait_time_mill = 0 max_version_pinning_duration_sec = 10800 -object_store_streaming_read_timeout_ms = 600000 -object_store_streaming_upload_timeout_ms = 600000 -object_store_upload_timeout_ms = 3600000 -object_store_read_timeout_ms = 3600000 -object_store_keepalive_ms = 600000 -object_store_recv_buffer_size = 2097152 -object_store_nodelay = true -object_store_req_retry_interval_ms = 20 -object_store_req_retry_max_delay_ms = 10000 -object_store_req_retry_max_attempts = 8 compactor_max_sst_key_count = 2097152 compact_iter_recreate_timeout_ms = 600000 compactor_max_sst_size = 536870912 @@ -162,6 +152,20 @@ threshold = 0.5 recent_filter_layers = 6 recent_filter_rotate_interval_ms = 10000 +[storage.object_store] +object_store_streaming_read_timeout_ms = 600000 +object_store_streaming_upload_timeout_ms = 600000 +object_store_upload_timeout_ms = 3600000 +object_store_read_timeout_ms = 3600000 + +[storage.object_store.s3] +object_store_keepalive_ms = 600000 +object_store_recv_buffer_size = 2097152 +object_store_nodelay = true +object_store_req_retry_interval_ms = 20 +object_store_req_retry_max_delay_ms = 10000 +object_store_req_retry_max_attempts = 8 + [system] barrier_interval_ms = 1000 checkpoint_frequency = 1 diff --git a/src/ctl/src/common/hummock_service.rs b/src/ctl/src/common/hummock_service.rs index 5105927ff0a13..bd5d1c165333c 100644 --- a/src/ctl/src/common/hummock_service.rs +++ b/src/ctl/src/common/hummock_service.rs @@ -17,7 +17,8 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, bail, Result}; -use risingwave_object_store::object::parse_remote_object_store; +use risingwave_common::config::ObjectStoreConfig; +use risingwave_object_store::object::build_remote_object_store; use risingwave_rpc_client::MetaClient; use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient; use risingwave_storage::hummock::{FileCache, HummockStorage, SstableStore}; @@ -66,16 +67,15 @@ impl HummockServiceOpts { } Err(_) => { const MESSAGE: &str = "env variable `RW_HUMMOCK_URL` not found. - -For `./risedev d` use cases, please do the following. -* start the cluster with shared storage: - - consider adding `use: minio` in the risedev config, - - or directly use `./risedev d for-ctl` to start the cluster. -* use `./risedev ctl` to use risectl. - -For `./risedev apply-compose-deploy` users, -* `RW_HUMMOCK_URL` will be printed out when deploying. Please copy the bash exports to your console. -"; + For `./risedev d` use cases, please do the following. + * start the cluster with shared storage: + - consider adding `use: minio` in the risedev config, + - or directly use `./risedev d for-ctl` to start the cluster. + * use `./risedev ctl` to use risectl. + + For `./risedev apply-compose-deploy` users, + * `RW_HUMMOCK_URL` will be printed out when deploying. Please copy the bash exports to your console. + "; bail!(MESSAGE); } }; @@ -152,10 +152,11 @@ For `./risedev apply-compose-deploy` users, } pub async fn create_sstable_store(&self) -> Result> { - let object_store = parse_remote_object_store( + let object_store = build_remote_object_store( self.hummock_url.strip_prefix("hummock+").unwrap(), Arc::new(ObjectStoreMetrics::unused()), "Hummock", + ObjectStoreConfig::default(), ) .await; diff --git a/src/jni_core/src/hummock_iterator.rs b/src/jni_core/src/hummock_iterator.rs index 10221f65c0aab..7cb869a881cbf 100644 --- a/src/jni_core/src/hummock_iterator.rs +++ b/src/jni_core/src/hummock_iterator.rs @@ -17,14 +17,15 @@ use std::sync::Arc; use bytes::Bytes; use futures::TryStreamExt; use risingwave_common::catalog::ColumnDesc; +use risingwave_common::config::ObjectStoreConfig; use risingwave_common::hash::VirtualNode; use risingwave_common::row::OwnedRow; use risingwave_common::util::select_all; use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde; use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer}; use risingwave_hummock_sdk::key::{map_table_key_range, prefixed_range, TableKeyRange}; +use risingwave_object_store::object::build_remote_object_store; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; -use risingwave_object_store::object::parse_remote_object_store; use risingwave_pb::java_binding::key_range::Bound; use risingwave_pb::java_binding::{KeyRange, ReadPlan}; use risingwave_storage::error::{StorageError, StorageResult}; @@ -54,10 +55,11 @@ impl HummockJavaBindingIterator { pub async fn new(read_plan: ReadPlan) -> StorageResult { // Note(bugen): should we forward the implementation to the `StorageTable`? let object_store = Arc::new( - parse_remote_object_store( + build_remote_object_store( &read_plan.object_store_url, Arc::new(ObjectStoreMetrics::unused()), "Hummock", + ObjectStoreConfig::default(), ) .await, ); diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index b94031776eb67..2f080db760194 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -158,19 +158,6 @@ pub struct MetaNodeOpts { #[override_opts(path = system.backup_storage_directory)] backup_storage_directory: Option, - #[clap(long, env = "RW_OBJECT_STORE_STREAMING_READ_TIMEOUT_MS", value_enum)] - #[override_opts(path = storage.object_store_streaming_read_timeout_ms)] - pub object_store_streaming_read_timeout_ms: Option, - #[clap(long, env = "RW_OBJECT_STORE_STREAMING_UPLOAD_TIMEOUT_MS", value_enum)] - #[override_opts(path = storage.object_store_streaming_upload_timeout_ms)] - pub object_store_streaming_upload_timeout_ms: Option, - #[clap(long, env = "RW_OBJECT_STORE_UPLOAD_TIMEOUT_MS", value_enum)] - #[override_opts(path = storage.object_store_upload_timeout_ms)] - pub object_store_upload_timeout_ms: Option, - #[clap(long, env = "RW_OBJECT_STORE_READ_TIMEOUT_MS", value_enum)] - #[override_opts(path = storage.object_store_read_timeout_ms)] - pub object_store_read_timeout_ms: Option, - /// Enable heap profile dump when memory usage is high. #[clap(long, env = "RW_HEAP_PROFILING_DIR")] #[override_opts(path = server.heap_profiling.dir)] diff --git a/src/meta/src/backup_restore/backup_manager.rs b/src/meta/src/backup_restore/backup_manager.rs index 2e957cca0a9ba..516c7f8f6fd1e 100644 --- a/src/meta/src/backup_restore/backup_manager.rs +++ b/src/meta/src/backup_restore/backup_manager.rs @@ -21,9 +21,10 @@ use risingwave_backup::error::BackupError; use risingwave_backup::storage::{MetaSnapshotStorage, ObjectStoreMetaSnapshotStorage}; use risingwave_backup::{MetaBackupJobId, MetaSnapshotId, MetaSnapshotManifest}; use risingwave_common::bail; +use risingwave_common::config::ObjectStoreConfig; use risingwave_hummock_sdk::HummockSstableObjectId; +use risingwave_object_store::object::build_remote_object_store; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; -use risingwave_object_store::object::parse_remote_object_store; use risingwave_pb::backup_service::{BackupJobStatus, MetaBackupManifestId}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use tokio::task::JoinHandle; @@ -366,7 +367,15 @@ async fn create_snapshot_store( config: &StoreConfig, metric: Arc, ) -> MetaResult { - let object_store = Arc::new(parse_remote_object_store(&config.0, metric, "Meta Backup").await); + let object_store = Arc::new( + build_remote_object_store( + &config.0, + metric, + "Meta Backup", + ObjectStoreConfig::default(), + ) + .await, + ); let store = ObjectStoreMetaSnapshotStorage::new(&config.1, object_store).await?; Ok(store) } diff --git a/src/meta/src/backup_restore/restore.rs b/src/meta/src/backup_restore/restore.rs index 5474b43654017..d5ae118c3ade0 100644 --- a/src/meta/src/backup_restore/restore.rs +++ b/src/meta/src/backup_restore/restore.rs @@ -18,10 +18,10 @@ use risingwave_backup::error::{BackupError, BackupResult}; use risingwave_backup::meta_snapshot::Metadata; use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef}; use risingwave_backup::MetaSnapshotId; -use risingwave_common::config::MetaBackend; +use risingwave_common::config::{MetaBackend, ObjectStoreConfig}; use risingwave_hummock_sdk::version_checkpoint_path; +use risingwave_object_store::object::build_remote_object_store; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; -use risingwave_object_store::object::parse_remote_object_store; use risingwave_pb::hummock::{HummockVersion, HummockVersionCheckpoint}; use crate::backup_restore::restore_impl::v1::{LoaderV1, WriterModelV1ToMetaStoreV1}; @@ -74,10 +74,11 @@ async fn restore_hummock_version( hummock_version: &HummockVersion, ) -> BackupResult<()> { let object_store = Arc::new( - parse_remote_object_store( + build_remote_object_store( hummock_storage_url, Arc::new(ObjectStoreMetrics::unused()), "Version Checkpoint", + ObjectStoreConfig::default(), ) .await, ); diff --git a/src/meta/src/backup_restore/utils.rs b/src/meta/src/backup_restore/utils.rs index 83cd1095beb25..6b1549ba31389 100644 --- a/src/meta/src/backup_restore/utils.rs +++ b/src/meta/src/backup_restore/utils.rs @@ -18,9 +18,9 @@ use std::time::Duration; use etcd_client::ConnectOptions; use risingwave_backup::error::BackupResult; use risingwave_backup::storage::{MetaSnapshotStorageRef, ObjectStoreMetaSnapshotStorage}; -use risingwave_common::config::MetaBackend; +use risingwave_common::config::{MetaBackend, ObjectStoreConfig}; +use risingwave_object_store::object::build_remote_object_store; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; -use risingwave_object_store::object::parse_remote_object_store; use crate::backup_restore::RestoreOpts; use crate::controller::SqlMetaStore; @@ -82,10 +82,11 @@ pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult BackupResult { - let object_store = parse_remote_object_store( + let object_store = build_remote_object_store( &opts.backup_storage_url, Arc::new(ObjectStoreMetrics::unused()), "Meta Backup", + ObjectStoreConfig::default(), ) .await; let backup_store = diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index f8d45456d7182..fac5a27653e9e 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -29,6 +29,7 @@ use futures::{FutureExt, StreamExt}; use itertools::Itertools; use parking_lot::Mutex; use risingwave_common::config::default::compaction_config; +use risingwave_common::config::ObjectStoreConfig; use risingwave_common::monitor::rwlock::MonitoredRwLock; use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; use risingwave_common::util::{pending_on_none, select_all}; @@ -193,7 +194,7 @@ use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGro use risingwave_hummock_sdk::table_stats::{ add_prost_table_stats_map, purge_prost_table_stats, PbTableStatsMap, }; -use risingwave_object_store::object::{parse_remote_object_store, ObjectError, ObjectStoreRef}; +use risingwave_object_store::object::{build_remote_object_store, ObjectError, ObjectStoreRef}; use risingwave_pb::catalog::Table; use risingwave_pb::hummock::level_handler::RunningCompactTask; use risingwave_pb::hummock::version_update_payload::Payload; @@ -358,10 +359,11 @@ impl HummockManager { let state_store_dir: &str = sys_params.data_directory(); let deterministic_mode = env.opts.compaction_deterministic_test; let object_store = Arc::new( - parse_remote_object_store( + build_remote_object_store( state_store_url.strip_prefix("hummock+").unwrap_or("memory"), metrics.object_store_metric.clone(), "Version Checkpoint", + ObjectStoreConfig::default(), ) .await, ); diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index f205ee5001498..8507a75839cf2 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -24,7 +24,7 @@ use sea_orm::EntityTrait; use super::{SystemParamsManager, SystemParamsManagerRef}; use crate::controller::system_param::{SystemParamsController, SystemParamsControllerRef}; use crate::controller::SqlMetaStore; -use crate::manager::event_log::{start_event_log_manager, EventLogManger, EventLogMangerRef}; +use crate::manager::event_log::{start_event_log_manager, EventLogMangerRef}; use crate::manager::{ IdGeneratorManager, IdGeneratorManagerRef, IdleManager, IdleManagerRef, NotificationManager, NotificationManagerRef, @@ -386,6 +386,8 @@ impl MetaSrvEnv { } pub async fn for_test_opts(opts: Arc) -> Self { + use crate::manager::event_log::EventLogManger; + // change to sync after refactor `IdGeneratorManager::new` sync. let meta_store = MemStore::default().into_ref(); #[cfg(madsim)] diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 72a94ad626c12..f9f964a40c11a 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -29,6 +29,7 @@ pub mod s3; use await_tree::InstrumentAwait; use futures::stream::BoxStream; use futures::StreamExt; +use risingwave_common::config::ObjectStoreConfig; pub use s3::*; pub mod error; @@ -126,6 +127,11 @@ pub trait ObjectStore: Send + Sync { // 2MB 1 << 21 } + + fn config(&self) -> Option<&ObjectStoreConfig> { + // TODO: remove option + None + } } pub enum ObjectStoreImpl { @@ -253,41 +259,6 @@ impl ObjectStoreImpl { } } - pub fn set_opts( - &mut self, - streaming_read_timeout_ms: u64, - streaming_upload_timeout_ms: u64, - read_timeout_ms: u64, - upload_timeout_ms: u64, - ) { - match self { - ObjectStoreImpl::InMem(s) => { - s.set_opts( - streaming_read_timeout_ms, - streaming_upload_timeout_ms, - read_timeout_ms, - upload_timeout_ms, - ); - } - ObjectStoreImpl::Opendal(s) => { - s.set_opts( - streaming_read_timeout_ms, - streaming_upload_timeout_ms, - read_timeout_ms, - upload_timeout_ms, - ); - } - ObjectStoreImpl::S3(s) => { - s.set_opts( - streaming_read_timeout_ms, - streaming_upload_timeout_ms, - read_timeout_ms, - upload_timeout_ms, - ); - } - } - } - pub fn recv_buffer_size(&self) -> usize { match self { ObjectStoreImpl::InMem(store) => store.recv_buffer_size(), @@ -531,13 +502,28 @@ pub struct MonitoredObjectStore { /// - `failure-count` impl MonitoredObjectStore { pub fn new(store: OS, object_store_metrics: Arc) -> Self { - Self { - inner: store, - object_store_metrics, - streaming_read_timeout: None, - streaming_upload_timeout: None, - read_timeout: None, - upload_timeout: None, + if let Some(config) = store.config() { + Self { + object_store_metrics, + streaming_read_timeout: Some(Duration::from_millis( + config.object_store_streaming_read_timeout_ms, + )), + streaming_upload_timeout: Some(Duration::from_millis( + config.object_store_streaming_upload_timeout_ms, + )), + read_timeout: Some(Duration::from_millis(config.object_store_read_timeout_ms)), + upload_timeout: Some(Duration::from_millis(config.object_store_upload_timeout_ms)), + inner: store, + } + } else { + Self { + inner: store, + object_store_metrics, + streaming_read_timeout: None, + streaming_upload_timeout: None, + read_timeout: None, + upload_timeout: None, + } } } @@ -549,6 +535,10 @@ impl MonitoredObjectStore { &self.inner } + pub fn mut_inner(&mut self) -> &mut OS { + &mut self.inner + } + pub async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> { let operation_type = "upload"; self.object_store_metrics @@ -780,34 +770,23 @@ impl MonitoredObjectStore { res } - fn set_opts( - &mut self, - streaming_read_timeout_ms: u64, - streaming_upload_timeout_ms: u64, - read_timeout_ms: u64, - upload_timeout_ms: u64, - ) { - self.streaming_read_timeout = Some(Duration::from_millis(streaming_read_timeout_ms)); - self.streaming_upload_timeout = Some(Duration::from_millis(streaming_upload_timeout_ms)); - self.read_timeout = Some(Duration::from_millis(read_timeout_ms)); - self.upload_timeout = Some(Duration::from_millis(upload_timeout_ms)); - } - fn recv_buffer_size(&self) -> usize { self.inner.recv_buffer_size() } } -pub async fn parse_remote_object_store( +pub async fn build_remote_object_store( url: &str, metrics: Arc, ident: &str, + config: ObjectStoreConfig, ) -> ObjectStoreImpl { match url { s3 if s3.starts_with("s3://") => ObjectStoreImpl::S3( - S3ObjectStore::new( + S3ObjectStore::new_with_config( s3.strip_prefix("s3://").unwrap().to_string(), metrics.clone(), + config, ) .await .monitored(metrics), diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 3cca6cd2aef9c..f2158ca3f1cf4 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -43,7 +43,7 @@ use futures::future::{try_join_all, BoxFuture, FutureExt}; use futures::{stream, Stream, StreamExt, TryStreamExt}; use hyper::{Body, Response}; use itertools::Itertools; -use risingwave_common::config::default::s3_objstore_config; +use risingwave_common::config::ObjectStoreConfig; use risingwave_common::monitor::connection::monitor_connector; use risingwave_common::range::RangeBoundsExt; use tokio::task::JoinHandle; @@ -309,7 +309,7 @@ pub struct S3ObjectStore { /// For S3 specific metrics. metrics: Arc, - config: S3ObjectStoreConfig, + config: ObjectStoreConfig, } #[async_trait::async_trait] @@ -358,7 +358,7 @@ impl ObjectStore for S3ObjectStore { // retry if occurs AWS EC2 HTTP timeout error. let val = tokio_retry::RetryIf::spawn( - self.config.get_retry_strategy(), + self.get_retry_strategy(), || async { match self.obj_store_request(path, range.clone()).send().await { Ok(resp) => { @@ -438,7 +438,7 @@ impl ObjectStore for S3ObjectStore { // retry if occurs AWS EC2 HTTP timeout error. let resp = tokio_retry::RetryIf::spawn( - self.config.get_retry_strategy(), + self.get_retry_strategy(), || async { match self.obj_store_request(path, range.clone()).send().await { Ok(resp) => Ok(resp), @@ -532,35 +532,35 @@ impl ObjectStore for S3ObjectStore { } fn recv_buffer_size(&self) -> usize { - self.config.recv_buffer_size.unwrap_or(1 << 21) + self.config + .s3 + .object_store_recv_buffer_size + .unwrap_or(1 << 21) } -} -impl S3ObjectStore { - /// Creates an S3 object store from environment variable. - /// - /// See [AWS Docs](https://docs.aws.amazon.com/sdk-for-rust/latest/dg/credentials.html) on how to provide credentials and region from env variable. If you are running compute-node on EC2, no configuration is required. - pub async fn new(bucket: String, metrics: Arc) -> Self { - Self::new_with_config(bucket, metrics, S3ObjectStoreConfig::default()).await + fn config(&self) -> Option<&ObjectStoreConfig> { + Some(&self.config) } +} - pub fn new_http_client(config: &S3ObjectStoreConfig) -> impl HttpClient { +impl S3ObjectStore { + pub fn new_http_client(config: &ObjectStoreConfig) -> impl HttpClient { let mut http = hyper::client::HttpConnector::new(); // connection config - if let Some(keepalive_ms) = config.keepalive_ms.as_ref() { + if let Some(keepalive_ms) = config.s3.object_store_keepalive_ms.as_ref() { http.set_keepalive(Some(Duration::from_millis(*keepalive_ms))); } - if let Some(nodelay) = config.nodelay.as_ref() { + if let Some(nodelay) = config.s3.object_store_nodelay.as_ref() { http.set_nodelay(*nodelay); } - if let Some(recv_buffer_size) = config.recv_buffer_size.as_ref() { + if let Some(recv_buffer_size) = config.s3.object_store_recv_buffer_size.as_ref() { http.set_recv_buffer_size(Some(*recv_buffer_size)); } - if let Some(send_buffer_size) = config.send_buffer_size.as_ref() { + if let Some(send_buffer_size) = config.s3.object_store_send_buffer_size.as_ref() { http.set_send_buffer_size(Some(*send_buffer_size)); } @@ -577,10 +577,13 @@ impl S3ObjectStore { HyperClientBuilder::new().build(conn) } + /// Creates an S3 object store from environment variable. + /// + /// See [AWS Docs](https://docs.aws.amazon.com/sdk-for-rust/latest/dg/credentials.html) on how to provide credentials and region from env variable. If you are running compute-node on EC2, no configuration is required. pub async fn new_with_config( bucket: String, metrics: Arc, - config: S3ObjectStoreConfig, + config: ObjectStoreConfig, ) -> Self { let sdk_config_loader = aws_config::from_env() .retry_config(RetryConfig::standard().with_max_attempts(4)) @@ -595,7 +598,7 @@ impl S3ObjectStore { Err(_) => false, }; - let sdk_config: aws_config::SdkConfig = sdk_config_loader.load().await; + let sdk_config = sdk_config_loader.load().await; #[cfg(madsim)] let client = Client::new(&sdk_config); #[cfg(not(madsim))] @@ -640,13 +643,14 @@ impl S3ObjectStore { }; let (address, bucket) = rest.split_once('/').unwrap(); + let s3_object_store_config = ObjectStoreConfig::default(); #[cfg(madsim)] let builder = aws_sdk_s3::config::Builder::new(); #[cfg(not(madsim))] - let builder = + let builder: aws_sdk_s3::config::Builder = aws_sdk_s3::config::Builder::from(&aws_config::ConfigLoader::default().load().await) .force_path_style(true) - .http_client(Self::new_http_client(&S3ObjectStoreConfig::default())); + .http_client(Self::new_http_client(&s3_object_store_config)); let config = builder .region(Region::new("custom")) .endpoint_url(format!("{}{}", endpoint_prefix, address)) @@ -663,7 +667,7 @@ impl S3ObjectStore { bucket: bucket.to_string(), part_size: MINIO_PART_SIZE, metrics, - config: S3ObjectStoreConfig::default(), + config: s3_object_store_config, } } @@ -824,52 +828,17 @@ impl S3ObjectStore { false } -} - -pub struct S3ObjectStoreConfig { - pub keepalive_ms: Option, - pub recv_buffer_size: Option, - pub send_buffer_size: Option, - pub nodelay: Option, - pub req_retry_interval_ms: Option, - pub req_retry_max_delay_ms: Option, - pub req_retry_max_attempts: Option, -} - -impl Default for S3ObjectStoreConfig { - fn default() -> Self { - Self { - keepalive_ms: s3_objstore_config::object_store_keepalive_ms(), - recv_buffer_size: s3_objstore_config::object_store_recv_buffer_size(), - send_buffer_size: s3_objstore_config::object_store_send_buffer_size(), - nodelay: s3_objstore_config::object_store_nodelay(), - req_retry_interval_ms: Some(s3_objstore_config::object_store_req_retry_interval_ms()), - req_retry_max_delay_ms: Some(s3_objstore_config::object_store_req_retry_max_delay_ms()), - req_retry_max_attempts: Some(s3_objstore_config::object_store_req_retry_max_attempts()), - } - } -} - -impl S3ObjectStoreConfig { #[inline(always)] fn get_retry_strategy(&self) -> impl Iterator { - ExponentialBackoff::from_millis( - self.req_retry_interval_ms - .unwrap_or(s3_objstore_config::object_store_req_retry_interval_ms()), - ) - .max_delay(Duration::from_millis( - self.req_retry_max_delay_ms - .unwrap_or(s3_objstore_config::object_store_req_retry_max_delay_ms()), - )) - .take( - self.req_retry_max_attempts - .unwrap_or(s3_objstore_config::object_store_req_retry_max_attempts()), - ) - .map(jitter) + ExponentialBackoff::from_millis(self.config.s3.object_store_req_retry_interval_ms) + .max_delay(Duration::from_millis( + self.config.s3.object_store_req_retry_max_delay_ms, + )) + .take(self.config.s3.object_store_req_retry_max_attempts) + .map(jitter) } } - struct S3ObjectIter { buffer: VecDeque, client: Client, diff --git a/src/storage/benches/bench_multi_builder.rs b/src/storage/benches/bench_multi_builder.rs index e84072d88207d..e6cd0c687216b 100644 --- a/src/storage/benches/bench_multi_builder.rs +++ b/src/storage/benches/bench_multi_builder.rs @@ -23,6 +23,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use futures::future::try_join_all; use itertools::Itertools; use risingwave_common::catalog::TableId; +use risingwave_common::config::ObjectStoreConfig; use risingwave_hummock_sdk::key::{FullKey, UserKey}; use risingwave_object_store::object::{ObjectStore, ObjectStoreImpl, S3ObjectStore}; use risingwave_storage::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory}; @@ -131,9 +132,13 @@ fn bench_builder( let metrics = Arc::new(ObjectStoreMetrics::unused()); let object_store = runtime.block_on(async { - S3ObjectStore::new(bucket.to_string(), metrics.clone()) - .await - .monitored(metrics) + S3ObjectStore::new_with_config( + bucket.to_string(), + metrics.clone(), + ObjectStoreConfig::default(), + ) + .await + .monitored(metrics) }); let object_store = Arc::new(ObjectStoreImpl::S3(object_store)); let sstable_store = Arc::new(SstableStore::new( diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs index b269b2aec73d8..83b288b08a34c 100644 --- a/src/storage/compactor/src/lib.rs +++ b/src/storage/compactor/src/lib.rs @@ -82,19 +82,6 @@ pub struct CompactorOpts { #[override_opts(path = server.heap_profiling.dir)] pub heap_profiling_dir: Option, - #[clap(long, env = "RW_OBJECT_STORE_STREAMING_READ_TIMEOUT_MS", value_enum)] - #[override_opts(path = storage.object_store_streaming_read_timeout_ms)] - pub object_store_streaming_read_timeout_ms: Option, - #[clap(long, env = "RW_OBJECT_STORE_STREAMING_UPLOAD_TIMEOUT_MS", value_enum)] - #[override_opts(path = storage.object_store_streaming_upload_timeout_ms)] - pub object_store_streaming_upload_timeout_ms: Option, - #[clap(long, env = "RW_OBJECT_STORE_UPLOAD_TIMEOUT_MS", value_enum)] - #[override_opts(path = storage.object_store_upload_timeout_ms)] - pub object_store_upload_timeout_ms: Option, - #[clap(long, env = "RW_OBJECT_STORE_READ_TIMEOUT_MS", value_enum)] - #[override_opts(path = storage.object_store_read_timeout_ms)] - pub object_store_read_timeout_ms: Option, - #[clap(long, env = "RW_COMPACTOR_MODE", value_enum)] pub compactor_mode: Option, diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index 7480b51b1cc55..274d793ccf63a 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -32,8 +32,8 @@ use risingwave_common::{GIT_SHA, RW_VERSION}; use risingwave_common_heap_profiling::HeapProfiler; use risingwave_common_service::metrics_manager::MetricsManager; use risingwave_common_service::observer_manager::ObserverManager; +use risingwave_object_store::object::build_remote_object_store; use risingwave_object_store::object::object_metrics::GLOBAL_OBJECT_STORE_METRICS; -use risingwave_object_store::object::parse_remote_object_store; use risingwave_pb::common::WorkerType; use risingwave_pb::compactor::compactor_service_server::CompactorServiceServer; use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer; @@ -114,20 +114,16 @@ pub async fn prepare_start_parameters( assert!(compactor_memory_limit_bytes > min_compactor_memory_limit_bytes * 2); } - let mut object_store = parse_remote_object_store( + let object_store = build_remote_object_store( state_store_url .strip_prefix("hummock+") .expect("object store must be hummock for compactor server"), object_metrics, "Hummock", + config.storage.object_store.clone(), ) .await; - object_store.set_opts( - storage_opts.object_store_streaming_read_timeout_ms, - storage_opts.object_store_streaming_upload_timeout_ms, - storage_opts.object_store_read_timeout_ms, - storage_opts.object_store_upload_timeout_ms, - ); + let object_store = Arc::new(object_store); let sstable_store = Arc::new(SstableStore::for_compactor( object_store, diff --git a/src/storage/hummock_test/src/bin/replay/main.rs b/src/storage/hummock_test/src/bin/replay/main.rs index d64f7dfbaf478..517e338e1df74 100644 --- a/src/storage/hummock_test/src/bin/replay/main.rs +++ b/src/storage/hummock_test/src/bin/replay/main.rs @@ -28,7 +28,7 @@ use std::sync::Arc; use clap::Parser; use replay_impl::{get_replay_notification_client, GlobalReplayImpl}; use risingwave_common::config::{ - extract_storage_memory_config, load_config, NoOverride, StorageConfig, + extract_storage_memory_config, load_config, NoOverride, ObjectStoreConfig, StorageConfig, }; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_hummock_trace::{ @@ -36,7 +36,7 @@ use risingwave_hummock_trace::{ }; use risingwave_meta::hummock::test_utils::setup_compute_env; use risingwave_meta::hummock::MockHummockMetaClient; -use risingwave_object_store::object::parse_remote_object_store; +use risingwave_object_store::object::build_remote_object_store; use risingwave_storage::filter_key_extractor::{ FakeRemoteTableAccessor, RpcFilterKeyExtractorManager, }; @@ -99,8 +99,13 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result, + storage_opts: Arc, sstable_store: SstableStoreRef, ) -> CompactorContext { CompactorContext { - storage_opts: options, + storage_opts, sstable_store, compactor_metrics: Arc::new(CompactorMetrics::unused()), is_share_buffer_compact: false, diff --git a/src/storage/src/hummock/backup_reader.rs b/src/storage/src/hummock/backup_reader.rs index acca6454c2fec..8568ceb95e7b7 100644 --- a/src/storage/src/hummock/backup_reader.rs +++ b/src/storage/src/hummock/backup_reader.rs @@ -25,9 +25,10 @@ use risingwave_backup::error::BackupError; use risingwave_backup::meta_snapshot::{MetaSnapshot, Metadata}; use risingwave_backup::storage::{MetaSnapshotStorage, ObjectStoreMetaSnapshotStorage}; use risingwave_backup::{meta_snapshot_v1, MetaSnapshotId}; +use risingwave_common::config::ObjectStoreConfig; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; +use risingwave_object_store::object::build_remote_object_store; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; -use risingwave_object_store::object::parse_remote_object_store; use crate::error::{StorageError, StorageResult}; use crate::hummock::local_version::pinned_version::{PinVersionAction, PinnedVersion}; @@ -44,10 +45,11 @@ async fn create_snapshot_store( config: &StoreConfig, ) -> StorageResult { let backup_object_store = Arc::new( - parse_remote_object_store( + build_remote_object_store( &config.0, Arc::new(ObjectStoreMetrics::unused()), "Meta Backup", + ObjectStoreConfig::default(), ) .await, ); diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 9a520919e71a4..dcd840d19c875 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -408,6 +408,8 @@ pub async fn compact( (context.storage_opts.block_size_kb as u64) * (1 << 10), context .storage_opts + .object_store_config + .s3 .object_store_recv_buffer_size .unwrap_or(6 * 1024 * 1024) as u64, capacity as u64, diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index b010d608da119..cb0fe6763f4d7 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::config::{extract_storage_memory_config, RwConfig, StorageMemoryConfig}; +use risingwave_common::config::{ + extract_storage_memory_config, ObjectStoreConfig, RwConfig, StorageMemoryConfig, +}; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::system_param::system_params_for_test; @@ -122,7 +124,6 @@ pub struct StorageOpts { /// object store read timeout. pub object_store_read_timeout_ms: u64, - pub object_store_recv_buffer_size: Option, pub compactor_max_sst_key_count: u64, pub compactor_max_task_multiplier: f32, pub compactor_max_sst_size: u64, @@ -131,6 +132,8 @@ pub struct StorageOpts { pub max_preload_io_retry_times: usize, pub mem_table_spill_threshold: usize, + + pub object_store_config: ObjectStoreConfig, } impl Default for StorageOpts { @@ -231,22 +234,24 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt max_preload_wait_time_mill: c.storage.max_preload_wait_time_mill, object_store_streaming_read_timeout_ms: c .storage + .object_store .object_store_streaming_read_timeout_ms, compact_iter_recreate_timeout_ms: c.storage.compact_iter_recreate_timeout_ms, object_store_streaming_upload_timeout_ms: c .storage + .object_store .object_store_streaming_upload_timeout_ms, - object_store_read_timeout_ms: c.storage.object_store_read_timeout_ms, - object_store_upload_timeout_ms: c.storage.object_store_upload_timeout_ms, + object_store_read_timeout_ms: c.storage.object_store.object_store_read_timeout_ms, + object_store_upload_timeout_ms: c.storage.object_store.object_store_upload_timeout_ms, max_preload_io_retry_times: c.storage.max_preload_io_retry_times, backup_storage_url: p.backup_storage_url().to_string(), backup_storage_directory: p.backup_storage_directory().to_string(), - object_store_recv_buffer_size: c.storage.object_store_recv_buffer_size, compactor_max_sst_key_count: c.storage.compactor_max_sst_key_count, compactor_max_task_multiplier: c.storage.compactor_max_task_multiplier, compactor_max_sst_size: c.storage.compactor_max_sst_size, enable_fast_compaction: c.storage.enable_fast_compaction, mem_table_spill_threshold: c.storage.mem_table_spill_threshold, + object_store_config: c.storage.object_store.clone(), } } } diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index df8190b42fb74..7c2f521b30985 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -20,7 +20,7 @@ use std::time::Duration; use enum_as_inner::EnumAsInner; use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; use risingwave_common_service::observer_manager::RpcNotificationClient; -use risingwave_object_store::object::parse_remote_object_store; +use risingwave_object_store::object::build_remote_object_store; use crate::error::StorageResult; use crate::filter_key_extractor::{RemoteTableAccessor, RpcFilterKeyExtractorManager}; @@ -609,18 +609,13 @@ impl StateStoreImpl { let store = match s { hummock if hummock.starts_with("hummock+") => { - let mut object_store = parse_remote_object_store( + let object_store = build_remote_object_store( hummock.strip_prefix("hummock+").unwrap(), object_store_metrics.clone(), "Hummock", + opts.object_store_config.clone(), ) .await; - object_store.set_opts( - opts.object_store_streaming_read_timeout_ms, - opts.object_store_streaming_upload_timeout_ms, - opts.object_store_read_timeout_ms, - opts.object_store_upload_timeout_ms, - ); let sstable_store = Arc::new(SstableStore::new( Arc::new(object_store), diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index f6129ce60a84f..0adc359a42335 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -27,7 +27,9 @@ use rand::{RngCore, SeedableRng}; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::hummock::PROPERTIES_RETENTION_SECOND_KEY; use risingwave_common::catalog::TableId; -use risingwave_common::config::{extract_storage_memory_config, load_config, NoOverride, RwConfig}; +use risingwave_common::config::{ + extract_storage_memory_config, load_config, NoOverride, ObjectStoreConfig, RwConfig, +}; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::TableKey; use risingwave_hummock_test::get_notification_client_for_test; @@ -35,8 +37,8 @@ use risingwave_hummock_test::local_state_store_test_utils::LocalStateStoreTestEx use risingwave_meta::hummock::compaction::compaction_config::CompactionConfigBuilder; use risingwave_meta::hummock::test_utils::setup_compute_env_with_config; use risingwave_meta::hummock::MockHummockMetaClient; +use risingwave_object_store::object::build_remote_object_store; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; -use risingwave_object_store::object::parse_remote_object_store; use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable}; use risingwave_pb::hummock::{CompactionConfig, CompactionGroupInfo}; use risingwave_pb::meta::SystemParams; @@ -200,10 +202,11 @@ async fn compaction_test( let state_store_metrics = Arc::new(HummockStateStoreMetrics::unused()); let compactor_metrics = Arc::new(CompactorMetrics::unused()); let object_store_metrics = Arc::new(ObjectStoreMetrics::unused()); - let remote_object_store = parse_remote_object_store( + let remote_object_store = build_remote_object_store( state_store_type.strip_prefix("hummock+").unwrap(), object_store_metrics.clone(), "Hummock", + ObjectStoreConfig::default(), ) .await; let sstable_store = Arc::new(SstableStore::new(