diff --git a/src/common/src/config.rs b/src/common/src/config.rs index d8bb65e90b062..0dc6b48d2d8da 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1029,8 +1029,12 @@ for_all_params!(define_system_config); /// The subsections `[storage.object_store]`. #[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)] pub struct ObjectStoreConfig { - #[serde(default = "default::object_store_config::object_store_set_atomic_write_dir")] - pub object_store_set_atomic_write_dir: bool, + // alias is for backward compatibility + #[serde( + default = "default::object_store_config::set_atomic_write_dir", + alias = "object_store_set_atomic_write_dir" + )] + pub set_atomic_write_dir: bool, /// Retry and timeout configuration /// Description retry strategy driven by exponential back-off @@ -1045,25 +1049,36 @@ pub struct ObjectStoreConfig { impl ObjectStoreConfig { pub fn set_atomic_write_dir(&mut self) { - self.object_store_set_atomic_write_dir = true; + self.set_atomic_write_dir = true; } } /// The subsections `[storage.object_store.s3]`. #[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)] pub struct S3ObjectStoreConfig { - #[serde(default = "default::object_store_config::s3::object_store_keepalive_ms")] - pub object_store_keepalive_ms: Option, - #[serde(default = "default::object_store_config::s3::object_store_recv_buffer_size")] - pub object_store_recv_buffer_size: Option, - #[serde(default = "default::object_store_config::s3::object_store_send_buffer_size")] - pub object_store_send_buffer_size: Option, - #[serde(default = "default::object_store_config::s3::object_store_nodelay")] - pub object_store_nodelay: Option, - /// For backwards compatibility, users should use `S3ObjectStoreDeveloperConfig` instead. + // alias is for backward compatibility + #[serde( + default = "default::object_store_config::s3::keepalive_ms", + alias = "object_store_keepalive_ms" + )] + pub keepalive_ms: Option, + #[serde( + default = "default::object_store_config::s3::recv_buffer_size", + alias = "object_store_recv_buffer_size" + )] + pub recv_buffer_size: Option, + #[serde( + default = "default::object_store_config::s3::send_buffer_size", + alias = "object_store_send_buffer_size" + )] + pub send_buffer_size: Option, #[serde( - default = "default::object_store_config::s3::developer::object_store_retry_unknown_service_error" + default = "default::object_store_config::s3::nodelay", + alias = "object_store_nodelay" )] + pub nodelay: Option, + /// For backwards compatibility, users should use `S3ObjectStoreDeveloperConfig` instead. + #[serde(default = "default::object_store_config::s3::developer::retry_unknown_service_error")] pub retry_unknown_service_error: bool, #[serde(default = "default::object_store_config::s3::identity_resolution_timeout_s")] pub identity_resolution_timeout_s: u64, @@ -1076,15 +1091,17 @@ pub struct S3ObjectStoreConfig { pub struct S3ObjectStoreDeveloperConfig { /// Whether to retry s3 sdk error from which no error metadata is provided. #[serde( - default = "default::object_store_config::s3::developer::object_store_retry_unknown_service_error" + default = "default::object_store_config::s3::developer::retry_unknown_service_error", + alias = "object_store_retry_unknown_service_error" )] - pub object_store_retry_unknown_service_error: bool, + pub retry_unknown_service_error: bool, /// An array of error codes that should be retried. /// e.g. `["SlowDown", "TooManyRequests"]` #[serde( - default = "default::object_store_config::s3::developer::object_store_retryable_service_error_codes" + default = "default::object_store_config::s3::developer::retryable_service_error_codes", + alias = "object_store_retryable_service_error_codes" )] - pub object_store_retryable_service_error_codes: Vec, + pub retryable_service_error_codes: Vec, #[serde(default = "default::object_store_config::s3::developer::use_opendal")] pub use_opendal: bool, @@ -1904,7 +1921,7 @@ pub mod default { const DEFAULT_REQ_BACKOFF_MAX_DELAY_MS: u64 = 10 * 1000; // 10s const DEFAULT_REQ_MAX_RETRY_ATTEMPTS: usize = 3; - pub fn object_store_set_atomic_write_dir() -> bool { + pub fn set_atomic_write_dir() -> bool { false } @@ -1992,19 +2009,19 @@ pub mod default { const DEFAULT_KEEPALIVE_MS: u64 = 600 * 1000; // 10min - pub fn object_store_keepalive_ms() -> Option { + pub fn keepalive_ms() -> Option { Some(DEFAULT_KEEPALIVE_MS) // 10min } - pub fn object_store_recv_buffer_size() -> Option { + pub fn recv_buffer_size() -> Option { Some(1 << 21) // 2m } - pub fn object_store_send_buffer_size() -> Option { + pub fn send_buffer_size() -> Option { None } - pub fn object_store_nodelay() -> Option { + pub fn nodelay() -> Option { Some(true) } @@ -2017,11 +2034,11 @@ pub mod default { const RW_USE_OPENDAL_FOR_S3: &str = "RW_USE_OPENDAL_FOR_S3"; - pub fn object_store_retry_unknown_service_error() -> bool { + pub fn retry_unknown_service_error() -> bool { false } - pub fn object_store_retryable_service_error_codes() -> Vec { + pub fn retryable_service_error_codes() -> Vec { vec!["SlowDown".into(), "TooManyRequests".into()] } @@ -2336,4 +2353,99 @@ mod tests { } } } + + #[test] + fn test_object_store_configs_backward_compatibility() { + // Define configs with the old name and make sure it still works + { + let config: RwConfig = toml::from_str( + r#" + [storage.object_store] + object_store_set_atomic_write_dir = true + + [storage.object_store.s3] + object_store_keepalive_ms = 1 + object_store_send_buffer_size = 1 + object_store_recv_buffer_size = 1 + object_store_nodelay = false + + [storage.object_store.s3.developer] + object_store_retry_unknown_service_error = true + object_store_retryable_service_error_codes = ['dummy'] + + + "#, + ) + .unwrap(); + + assert!(config.storage.object_store.set_atomic_write_dir); + assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1)); + assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1)); + assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1)); + assert_eq!(config.storage.object_store.s3.nodelay, Some(false)); + assert!( + config + .storage + .object_store + .s3 + .developer + .retry_unknown_service_error + ); + assert_eq!( + config + .storage + .object_store + .s3 + .developer + .retryable_service_error_codes, + vec!["dummy".to_string()] + ); + } + + // Define configs with the new name and make sure it works + { + let config: RwConfig = toml::from_str( + r#" + [storage.object_store] + set_atomic_write_dir = true + + [storage.object_store.s3] + keepalive_ms = 1 + send_buffer_size = 1 + recv_buffer_size = 1 + nodelay = false + + [storage.object_store.s3.developer] + retry_unknown_service_error = true + retryable_service_error_codes = ['dummy'] + + + "#, + ) + .unwrap(); + + assert!(config.storage.object_store.set_atomic_write_dir); + assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1)); + assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1)); + assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1)); + assert_eq!(config.storage.object_store.s3.nodelay, Some(false)); + assert!( + config + .storage + .object_store + .s3 + .developer + .retry_unknown_service_error + ); + assert_eq!( + config + .storage + .object_store + .s3 + .developer + .retryable_service_error_codes, + vec!["dummy".to_string()] + ); + } + } } diff --git a/src/config/example.toml b/src/config/example.toml index b35590c85059b..27bbea13ade15 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -190,7 +190,7 @@ recent_filter_layers = 6 recent_filter_rotate_interval_ms = 10000 [storage.object_store] -object_store_set_atomic_write_dir = false +set_atomic_write_dir = false [storage.object_store.retry] req_backoff_interval_ms = 1000 @@ -214,15 +214,15 @@ list_attempt_timeout_ms = 600000 list_retry_attempts = 3 [storage.object_store.s3] -object_store_keepalive_ms = 600000 -object_store_recv_buffer_size = 2097152 -object_store_nodelay = true +keepalive_ms = 600000 +recv_buffer_size = 2097152 +nodelay = true retry_unknown_service_error = false identity_resolution_timeout_s = 5 [storage.object_store.s3.developer] -object_store_retry_unknown_service_error = false -object_store_retryable_service_error_codes = ["SlowDown", "TooManyRequests"] +retry_unknown_service_error = false +retryable_service_error_codes = ["SlowDown", "TooManyRequests"] use_opendal = false [system] diff --git a/src/object_store/src/object/opendal_engine/fs.rs b/src/object_store/src/object/opendal_engine/fs.rs index 4f2715d6ccfb8..ecb1131f0def8 100644 --- a/src/object_store/src/object/opendal_engine/fs.rs +++ b/src/object_store/src/object/opendal_engine/fs.rs @@ -29,7 +29,7 @@ impl OpendalObjectStore { // Create fs backend builder. let mut builder = Fs::default(); builder.root(&root); - if config.object_store_set_atomic_write_dir { + if config.set_atomic_write_dir { let atomic_write_dir = format!("{}/{}", root, ATOMIC_WRITE_DIR); builder.atomic_write_dir(&atomic_write_dir); } diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index 7a51cbb36955f..e86a209f4f3fa 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -95,11 +95,11 @@ impl OpendalObjectStore { pub fn new_http_client(config: &ObjectStoreConfig) -> ObjectResult { let mut client_builder = reqwest::ClientBuilder::new(); - if let Some(keepalive_ms) = config.s3.object_store_keepalive_ms.as_ref() { + if let Some(keepalive_ms) = config.s3.keepalive_ms.as_ref() { client_builder = client_builder.tcp_keepalive(Duration::from_millis(*keepalive_ms)); } - if let Some(nodelay) = config.s3.object_store_nodelay.as_ref() { + if let Some(nodelay) = config.s3.nodelay.as_ref() { client_builder = client_builder.tcp_nodelay(*nodelay); } diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 6c72ced36563d..077d4179e06c9 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -617,19 +617,19 @@ impl S3ObjectStore { let mut http = hyper::client::HttpConnector::new(); // connection config - if let Some(keepalive_ms) = config.s3.object_store_keepalive_ms.as_ref() { + if let Some(keepalive_ms) = config.s3.keepalive_ms.as_ref() { http.set_keepalive(Some(Duration::from_millis(*keepalive_ms))); } - if let Some(nodelay) = config.s3.object_store_nodelay.as_ref() { + if let Some(nodelay) = config.s3.nodelay.as_ref() { http.set_nodelay(*nodelay); } - if let Some(recv_buffer_size) = config.s3.object_store_recv_buffer_size.as_ref() { + if let Some(recv_buffer_size) = config.s3.recv_buffer_size.as_ref() { http.set_recv_buffer_size(Some(*recv_buffer_size)); } - if let Some(send_buffer_size) = config.s3.object_store_send_buffer_size.as_ref() { + if let Some(send_buffer_size) = config.s3.send_buffer_size.as_ref() { http.set_send_buffer_size(Some(*send_buffer_size)); } @@ -1042,7 +1042,7 @@ where Some(SdkError::ServiceError(e)) => { let retry = match e.err().code() { None => { - if config.s3.developer.object_store_retry_unknown_service_error + if config.s3.developer.retry_unknown_service_error || config.s3.retry_unknown_service_error { tracing::warn!(target: "unknown_service_error", "{e:?} occurs, retry S3 get_object request."); @@ -1055,7 +1055,7 @@ where if config .s3 .developer - .object_store_retryable_service_error_codes + .retryable_service_error_codes .iter() .any(|s| s.as_str().eq_ignore_ascii_case(code)) { diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 41a55518158de..0336f6f542fa4 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -380,7 +380,7 @@ pub async fn compact( .storage_opts .object_store_config .s3 - .object_store_recv_buffer_size + .recv_buffer_size .unwrap_or(6 * 1024 * 1024) as u64, capacity as u64, ) * compact_task.splits.len() as u64;