Skip to content

Commit

Permalink
chore: remove the object_store prefix in the object store config nami…
Browse files Browse the repository at this point in the history
…ng (#17290)
  • Loading branch information
hzxa21 committed Jun 19, 2024
1 parent 6e56ff3 commit 23365df
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 40 deletions.
160 changes: 136 additions & 24 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1029,8 +1029,12 @@ for_all_params!(define_system_config);
/// The subsections `[storage.object_store]`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct ObjectStoreConfig {
#[serde(default = "default::object_store_config::object_store_set_atomic_write_dir")]
pub object_store_set_atomic_write_dir: bool,
// alias is for backward compatibility
#[serde(
default = "default::object_store_config::set_atomic_write_dir",
alias = "object_store_set_atomic_write_dir"
)]
pub set_atomic_write_dir: bool,

/// Retry and timeout configuration
/// Description retry strategy driven by exponential back-off
Expand All @@ -1045,25 +1049,36 @@ pub struct ObjectStoreConfig {

impl ObjectStoreConfig {
pub fn set_atomic_write_dir(&mut self) {
self.object_store_set_atomic_write_dir = true;
self.set_atomic_write_dir = true;
}
}

/// The subsections `[storage.object_store.s3]`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct S3ObjectStoreConfig {
#[serde(default = "default::object_store_config::s3::object_store_keepalive_ms")]
pub object_store_keepalive_ms: Option<u64>,
#[serde(default = "default::object_store_config::s3::object_store_recv_buffer_size")]
pub object_store_recv_buffer_size: Option<usize>,
#[serde(default = "default::object_store_config::s3::object_store_send_buffer_size")]
pub object_store_send_buffer_size: Option<usize>,
#[serde(default = "default::object_store_config::s3::object_store_nodelay")]
pub object_store_nodelay: Option<bool>,
/// For backwards compatibility, users should use `S3ObjectStoreDeveloperConfig` instead.
// alias is for backward compatibility
#[serde(
default = "default::object_store_config::s3::keepalive_ms",
alias = "object_store_keepalive_ms"
)]
pub keepalive_ms: Option<u64>,
#[serde(
default = "default::object_store_config::s3::recv_buffer_size",
alias = "object_store_recv_buffer_size"
)]
pub recv_buffer_size: Option<usize>,
#[serde(
default = "default::object_store_config::s3::send_buffer_size",
alias = "object_store_send_buffer_size"
)]
pub send_buffer_size: Option<usize>,
#[serde(
default = "default::object_store_config::s3::developer::object_store_retry_unknown_service_error"
default = "default::object_store_config::s3::nodelay",
alias = "object_store_nodelay"
)]
pub nodelay: Option<bool>,
/// For backwards compatibility, users should use `S3ObjectStoreDeveloperConfig` instead.
#[serde(default = "default::object_store_config::s3::developer::retry_unknown_service_error")]
pub retry_unknown_service_error: bool,
#[serde(default = "default::object_store_config::s3::identity_resolution_timeout_s")]
pub identity_resolution_timeout_s: u64,
Expand All @@ -1076,15 +1091,17 @@ pub struct S3ObjectStoreConfig {
pub struct S3ObjectStoreDeveloperConfig {
/// Whether to retry s3 sdk error from which no error metadata is provided.
#[serde(
default = "default::object_store_config::s3::developer::object_store_retry_unknown_service_error"
default = "default::object_store_config::s3::developer::retry_unknown_service_error",
alias = "object_store_retry_unknown_service_error"
)]
pub object_store_retry_unknown_service_error: bool,
pub retry_unknown_service_error: bool,
/// An array of error codes that should be retried.
/// e.g. `["SlowDown", "TooManyRequests"]`
#[serde(
default = "default::object_store_config::s3::developer::object_store_retryable_service_error_codes"
default = "default::object_store_config::s3::developer::retryable_service_error_codes",
alias = "object_store_retryable_service_error_codes"
)]
pub object_store_retryable_service_error_codes: Vec<String>,
pub retryable_service_error_codes: Vec<String>,

#[serde(default = "default::object_store_config::s3::developer::use_opendal")]
pub use_opendal: bool,
Expand Down Expand Up @@ -1904,7 +1921,7 @@ pub mod default {
const DEFAULT_REQ_BACKOFF_MAX_DELAY_MS: u64 = 10 * 1000; // 10s
const DEFAULT_REQ_MAX_RETRY_ATTEMPTS: usize = 3;

pub fn object_store_set_atomic_write_dir() -> bool {
pub fn set_atomic_write_dir() -> bool {
false
}

Expand Down Expand Up @@ -1992,19 +2009,19 @@ pub mod default {

const DEFAULT_KEEPALIVE_MS: u64 = 600 * 1000; // 10min

pub fn object_store_keepalive_ms() -> Option<u64> {
pub fn keepalive_ms() -> Option<u64> {
Some(DEFAULT_KEEPALIVE_MS) // 10min
}

pub fn object_store_recv_buffer_size() -> Option<usize> {
pub fn recv_buffer_size() -> Option<usize> {
Some(1 << 21) // 2m
}

pub fn object_store_send_buffer_size() -> Option<usize> {
pub fn send_buffer_size() -> Option<usize> {
None
}

pub fn object_store_nodelay() -> Option<bool> {
pub fn nodelay() -> Option<bool> {
Some(true)
}

Expand All @@ -2017,11 +2034,11 @@ pub mod default {

const RW_USE_OPENDAL_FOR_S3: &str = "RW_USE_OPENDAL_FOR_S3";

pub fn object_store_retry_unknown_service_error() -> bool {
pub fn retry_unknown_service_error() -> bool {
false
}

pub fn object_store_retryable_service_error_codes() -> Vec<String> {
pub fn retryable_service_error_codes() -> Vec<String> {
vec!["SlowDown".into(), "TooManyRequests".into()]
}

Expand Down Expand Up @@ -2336,4 +2353,99 @@ mod tests {
}
}
}

#[test]
fn test_object_store_configs_backward_compatibility() {
// Define configs with the old name and make sure it still works
{
let config: RwConfig = toml::from_str(
r#"
[storage.object_store]
object_store_set_atomic_write_dir = true
[storage.object_store.s3]
object_store_keepalive_ms = 1
object_store_send_buffer_size = 1
object_store_recv_buffer_size = 1
object_store_nodelay = false
[storage.object_store.s3.developer]
object_store_retry_unknown_service_error = true
object_store_retryable_service_error_codes = ['dummy']
"#,
)
.unwrap();

assert!(config.storage.object_store.set_atomic_write_dir);
assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
assert!(
config
.storage
.object_store
.s3
.developer
.retry_unknown_service_error
);
assert_eq!(
config
.storage
.object_store
.s3
.developer
.retryable_service_error_codes,
vec!["dummy".to_string()]
);
}

// Define configs with the new name and make sure it works
{
let config: RwConfig = toml::from_str(
r#"
[storage.object_store]
set_atomic_write_dir = true
[storage.object_store.s3]
keepalive_ms = 1
send_buffer_size = 1
recv_buffer_size = 1
nodelay = false
[storage.object_store.s3.developer]
retry_unknown_service_error = true
retryable_service_error_codes = ['dummy']
"#,
)
.unwrap();

assert!(config.storage.object_store.set_atomic_write_dir);
assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
assert!(
config
.storage
.object_store
.s3
.developer
.retry_unknown_service_error
);
assert_eq!(
config
.storage
.object_store
.s3
.developer
.retryable_service_error_codes,
vec!["dummy".to_string()]
);
}
}
}
12 changes: 6 additions & 6 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ recent_filter_layers = 6
recent_filter_rotate_interval_ms = 10000

[storage.object_store]
object_store_set_atomic_write_dir = false
set_atomic_write_dir = false

[storage.object_store.retry]
req_backoff_interval_ms = 1000
Expand All @@ -214,15 +214,15 @@ list_attempt_timeout_ms = 600000
list_retry_attempts = 3

[storage.object_store.s3]
object_store_keepalive_ms = 600000
object_store_recv_buffer_size = 2097152
object_store_nodelay = true
keepalive_ms = 600000
recv_buffer_size = 2097152
nodelay = true
retry_unknown_service_error = false
identity_resolution_timeout_s = 5

[storage.object_store.s3.developer]
object_store_retry_unknown_service_error = false
object_store_retryable_service_error_codes = ["SlowDown", "TooManyRequests"]
retry_unknown_service_error = false
retryable_service_error_codes = ["SlowDown", "TooManyRequests"]
use_opendal = false

[system]
Expand Down
2 changes: 1 addition & 1 deletion src/object_store/src/object/opendal_engine/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl OpendalObjectStore {
// Create fs backend builder.
let mut builder = Fs::default();
builder.root(&root);
if config.object_store_set_atomic_write_dir {
if config.set_atomic_write_dir {
let atomic_write_dir = format!("{}/{}", root, ATOMIC_WRITE_DIR);
builder.atomic_write_dir(&atomic_write_dir);
}
Expand Down
4 changes: 2 additions & 2 deletions src/object_store/src/object/opendal_engine/opendal_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ impl OpendalObjectStore {
pub fn new_http_client(config: &ObjectStoreConfig) -> ObjectResult<HttpClient> {
let mut client_builder = reqwest::ClientBuilder::new();

if let Some(keepalive_ms) = config.s3.object_store_keepalive_ms.as_ref() {
if let Some(keepalive_ms) = config.s3.keepalive_ms.as_ref() {
client_builder = client_builder.tcp_keepalive(Duration::from_millis(*keepalive_ms));
}

if let Some(nodelay) = config.s3.object_store_nodelay.as_ref() {
if let Some(nodelay) = config.s3.nodelay.as_ref() {
client_builder = client_builder.tcp_nodelay(*nodelay);
}

Expand Down
12 changes: 6 additions & 6 deletions src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,19 +617,19 @@ impl S3ObjectStore {
let mut http = hyper::client::HttpConnector::new();

// connection config
if let Some(keepalive_ms) = config.s3.object_store_keepalive_ms.as_ref() {
if let Some(keepalive_ms) = config.s3.keepalive_ms.as_ref() {
http.set_keepalive(Some(Duration::from_millis(*keepalive_ms)));
}

if let Some(nodelay) = config.s3.object_store_nodelay.as_ref() {
if let Some(nodelay) = config.s3.nodelay.as_ref() {
http.set_nodelay(*nodelay);
}

if let Some(recv_buffer_size) = config.s3.object_store_recv_buffer_size.as_ref() {
if let Some(recv_buffer_size) = config.s3.recv_buffer_size.as_ref() {
http.set_recv_buffer_size(Some(*recv_buffer_size));
}

if let Some(send_buffer_size) = config.s3.object_store_send_buffer_size.as_ref() {
if let Some(send_buffer_size) = config.s3.send_buffer_size.as_ref() {
http.set_send_buffer_size(Some(*send_buffer_size));
}

Expand Down Expand Up @@ -1042,7 +1042,7 @@ where
Some(SdkError::ServiceError(e)) => {
let retry = match e.err().code() {
None => {
if config.s3.developer.object_store_retry_unknown_service_error
if config.s3.developer.retry_unknown_service_error
|| config.s3.retry_unknown_service_error
{
tracing::warn!(target: "unknown_service_error", "{e:?} occurs, retry S3 get_object request.");
Expand All @@ -1055,7 +1055,7 @@ where
if config
.s3
.developer
.object_store_retryable_service_error_codes
.retryable_service_error_codes
.iter()
.any(|s| s.as_str().eq_ignore_ascii_case(code))
{
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ pub async fn compact(
.storage_opts
.object_store_config
.s3
.object_store_recv_buffer_size
.recv_buffer_size
.unwrap_or(6 * 1024 * 1024) as u64,
capacity as u64,
) * compact_task.splits.len() as u64;
Expand Down

0 comments on commit 23365df

Please sign in to comment.