Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove the object_store prefix in the object store config naming #17290

Merged
merged 2 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 138 additions & 24 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1029,8 +1029,12 @@ for_all_params!(define_system_config);
/// The subsections `[storage.object_store]`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct ObjectStoreConfig {
#[serde(default = "default::object_store_config::object_store_set_atomic_write_dir")]
pub object_store_set_atomic_write_dir: bool,
// alias is for backward compatibility
#[serde(
default = "default::object_store_config::set_atomic_write_dir",
alias = "object_store_set_atomic_write_dir"
)]
pub set_atomic_write_dir: bool,

/// Retry and timeout configuration
/// Description retry strategy driven by exponential back-off
Expand All @@ -1045,25 +1049,36 @@ pub struct ObjectStoreConfig {

impl ObjectStoreConfig {
pub fn set_atomic_write_dir(&mut self) {
self.object_store_set_atomic_write_dir = true;
self.set_atomic_write_dir = true;
}
}

/// The subsections `[storage.object_store.s3]`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct S3ObjectStoreConfig {
#[serde(default = "default::object_store_config::s3::object_store_keepalive_ms")]
pub object_store_keepalive_ms: Option<u64>,
#[serde(default = "default::object_store_config::s3::object_store_recv_buffer_size")]
pub object_store_recv_buffer_size: Option<usize>,
#[serde(default = "default::object_store_config::s3::object_store_send_buffer_size")]
pub object_store_send_buffer_size: Option<usize>,
#[serde(default = "default::object_store_config::s3::object_store_nodelay")]
pub object_store_nodelay: Option<bool>,
/// For backwards compatibility, users should use `S3ObjectStoreDeveloperConfig` instead.
// alias is for backward compatibility
#[serde(
default = "default::object_store_config::s3::keepalive_ms",
alias = "object_store_keepalive_ms"
)]
pub keepalive_ms: Option<u64>,
#[serde(
default = "default::object_store_config::s3::recv_buffer_size",
alias = "object_store_recv_buffer_size"
)]
pub recv_buffer_size: Option<usize>,
#[serde(
default = "default::object_store_config::s3::send_buffer_size",
alias = "object_store_send_buffer_size"
)]
pub send_buffer_size: Option<usize>,
#[serde(
default = "default::object_store_config::s3::developer::object_store_retry_unknown_service_error"
default = "default::object_store_config::s3::nodelay",
alias = "object_store_nodelay"
)]
pub nodelay: Option<bool>,
/// For backwards compatibility, users should use `S3ObjectStoreDeveloperConfig` instead.
#[serde(default = "default::object_store_config::s3::developer::retry_unknown_service_error")]
pub retry_unknown_service_error: bool,
#[serde(default = "default::object_store_config::s3::identity_resolution_timeout_s")]
pub identity_resolution_timeout_s: u64,
Expand All @@ -1076,15 +1091,17 @@ pub struct S3ObjectStoreConfig {
pub struct S3ObjectStoreDeveloperConfig {
/// Whether to retry s3 sdk error from which no error metadata is provided.
#[serde(
default = "default::object_store_config::s3::developer::object_store_retry_unknown_service_error"
default = "default::object_store_config::s3::developer::retry_unknown_service_error",
alias = "object_store_retry_unknown_service_error"
)]
pub object_store_retry_unknown_service_error: bool,
pub retry_unknown_service_error: bool,
/// An array of error codes that should be retried.
/// e.g. `["SlowDown", "TooManyRequests"]`
#[serde(
default = "default::object_store_config::s3::developer::object_store_retryable_service_error_codes"
default = "default::object_store_config::s3::developer::retryable_service_error_codes",
alias = "object_store_retryable_service_error_codes"
)]
pub object_store_retryable_service_error_codes: Vec<String>,
pub retryable_service_error_codes: Vec<String>,

#[serde(default = "default::object_store_config::s3::developer::use_opendal")]
pub use_opendal: bool,
Expand Down Expand Up @@ -1904,7 +1921,7 @@ pub mod default {
const DEFAULT_REQ_BACKOFF_MAX_DELAY_MS: u64 = 10 * 1000; // 10s
const DEFAULT_REQ_MAX_RETRY_ATTEMPTS: usize = 3;

pub fn object_store_set_atomic_write_dir() -> bool {
pub fn set_atomic_write_dir() -> bool {
false
}

Expand Down Expand Up @@ -1992,19 +2009,19 @@ pub mod default {

const DEFAULT_KEEPALIVE_MS: u64 = 600 * 1000; // 10min

pub fn object_store_keepalive_ms() -> Option<u64> {
pub fn keepalive_ms() -> Option<u64> {
Some(DEFAULT_KEEPALIVE_MS) // 10min
}

pub fn object_store_recv_buffer_size() -> Option<usize> {
pub fn recv_buffer_size() -> Option<usize> {
Some(1 << 21) // 2m
}

pub fn object_store_send_buffer_size() -> Option<usize> {
pub fn send_buffer_size() -> Option<usize> {
None
}

pub fn object_store_nodelay() -> Option<bool> {
pub fn nodelay() -> Option<bool> {
Some(true)
}

Expand All @@ -2017,11 +2034,11 @@ pub mod default {

const RW_USE_OPENDAL_FOR_S3: &str = "RW_USE_OPENDAL_FOR_S3";

pub fn object_store_retry_unknown_service_error() -> bool {
pub fn retry_unknown_service_error() -> bool {
false
}

pub fn object_store_retryable_service_error_codes() -> Vec<String> {
pub fn retryable_service_error_codes() -> Vec<String> {
vec!["SlowDown".into(), "TooManyRequests".into()]
}

Expand Down Expand Up @@ -2336,4 +2353,101 @@ mod tests {
}
}
}

#[test]
fn test_object_store_configs_backward_compatibility() {
// Define configs with the old name and make sure it still works
{
let config: RwConfig = toml::from_str(
r#"
[storage.object_store]
object_store_set_atomic_write_dir = true

[storage.object_store.s3]
object_store_keepalive_ms = 1
object_store_send_buffer_size = 1
object_store_recv_buffer_size = 1
object_store_nodelay = false

[storage.object_store.s3.developer]
object_store_retry_unknown_service_error = true
object_store_retryable_service_error_codes = ['dummy']


"#,
)
.unwrap();

assert_eq!(config.storage.object_store.set_atomic_write_dir, true);
assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
assert_eq!(
config
.storage
.object_store
.s3
.developer
.retry_unknown_service_error,
true
);
assert_eq!(
config
.storage
.object_store
.s3
.developer
.retryable_service_error_codes,
vec!["dummy".to_string()]
);
}

// Define configs with the new name and make sure it works
{
let config: RwConfig = toml::from_str(
r#"
[storage.object_store]
set_atomic_write_dir = true

[storage.object_store.s3]
keepalive_ms = 1
send_buffer_size = 1
recv_buffer_size = 1
nodelay = false

[storage.object_store.s3.developer]
retry_unknown_service_error = true
retryable_service_error_codes = ['dummy']


"#,
)
.unwrap();

assert_eq!(config.storage.object_store.set_atomic_write_dir, true);
assert_eq!(config.storage.object_store.s3.keepalive_ms, Some(1));
assert_eq!(config.storage.object_store.s3.send_buffer_size, Some(1));
assert_eq!(config.storage.object_store.s3.recv_buffer_size, Some(1));
assert_eq!(config.storage.object_store.s3.nodelay, Some(false));
assert_eq!(
config
.storage
.object_store
.s3
.developer
.retry_unknown_service_error,
true
);
assert_eq!(
config
.storage
.object_store
.s3
.developer
.retryable_service_error_codes,
vec!["dummy".to_string()]
);
}
}
}
12 changes: 6 additions & 6 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ recent_filter_layers = 6
recent_filter_rotate_interval_ms = 10000

[storage.object_store]
object_store_set_atomic_write_dir = false
set_atomic_write_dir = false

[storage.object_store.retry]
req_backoff_interval_ms = 1000
Expand All @@ -214,15 +214,15 @@ list_attempt_timeout_ms = 600000
list_retry_attempts = 3

[storage.object_store.s3]
object_store_keepalive_ms = 600000
object_store_recv_buffer_size = 2097152
object_store_nodelay = true
keepalive_ms = 600000
recv_buffer_size = 2097152
nodelay = true
retry_unknown_service_error = false
identity_resolution_timeout_s = 5

[storage.object_store.s3.developer]
object_store_retry_unknown_service_error = false
object_store_retryable_service_error_codes = ["SlowDown", "TooManyRequests"]
retry_unknown_service_error = false
retryable_service_error_codes = ["SlowDown", "TooManyRequests"]
use_opendal = false

[system]
Expand Down
2 changes: 1 addition & 1 deletion src/object_store/src/object/opendal_engine/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl OpendalObjectStore {
// Create fs backend builder.
let mut builder = Fs::default();
builder.root(&root);
if config.object_store_set_atomic_write_dir {
if config.set_atomic_write_dir {
let atomic_write_dir = format!("{}/{}", root, ATOMIC_WRITE_DIR);
builder.atomic_write_dir(&atomic_write_dir);
}
Expand Down
4 changes: 2 additions & 2 deletions src/object_store/src/object/opendal_engine/opendal_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ impl OpendalObjectStore {
pub fn new_http_client(config: &ObjectStoreConfig) -> ObjectResult<HttpClient> {
let mut client_builder = reqwest::ClientBuilder::new();

if let Some(keepalive_ms) = config.s3.object_store_keepalive_ms.as_ref() {
if let Some(keepalive_ms) = config.s3.keepalive_ms.as_ref() {
client_builder = client_builder.tcp_keepalive(Duration::from_millis(*keepalive_ms));
}

if let Some(nodelay) = config.s3.object_store_nodelay.as_ref() {
if let Some(nodelay) = config.s3.nodelay.as_ref() {
client_builder = client_builder.tcp_nodelay(*nodelay);
}

Expand Down
12 changes: 6 additions & 6 deletions src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,19 +618,19 @@ impl S3ObjectStore {
let mut http = hyper::client::HttpConnector::new();

// connection config
if let Some(keepalive_ms) = config.s3.object_store_keepalive_ms.as_ref() {
if let Some(keepalive_ms) = config.s3.keepalive_ms.as_ref() {
http.set_keepalive(Some(Duration::from_millis(*keepalive_ms)));
}

if let Some(nodelay) = config.s3.object_store_nodelay.as_ref() {
if let Some(nodelay) = config.s3.nodelay.as_ref() {
http.set_nodelay(*nodelay);
}

if let Some(recv_buffer_size) = config.s3.object_store_recv_buffer_size.as_ref() {
if let Some(recv_buffer_size) = config.s3.recv_buffer_size.as_ref() {
http.set_recv_buffer_size(Some(*recv_buffer_size));
}

if let Some(send_buffer_size) = config.s3.object_store_send_buffer_size.as_ref() {
if let Some(send_buffer_size) = config.s3.send_buffer_size.as_ref() {
http.set_send_buffer_size(Some(*send_buffer_size));
}

Expand Down Expand Up @@ -1043,7 +1043,7 @@ where
Some(SdkError::ServiceError(e)) => {
let retry = match e.err().code() {
None => {
if config.s3.developer.object_store_retry_unknown_service_error
if config.s3.developer.retry_unknown_service_error
|| config.s3.retry_unknown_service_error
{
tracing::warn!(target: "unknown_service_error", "{e:?} occurs, retry S3 get_object request.");
Expand All @@ -1056,7 +1056,7 @@ where
if config
.s3
.developer
.object_store_retryable_service_error_codes
.retryable_service_error_codes
.iter()
.any(|s| s.as_str().eq_ignore_ascii_case(code))
{
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ pub async fn compact(
.storage_opts
.object_store_config
.s3
.object_store_recv_buffer_size
.recv_buffer_size
.unwrap_or(6 * 1024 * 1024) as u64,
capacity as u64,
) * compact_task.splits.len() as u64;
Expand Down
Loading