Skip to content

Commit

Permalink
refactor(object_store): refactor timeout and retry of object store in…
Browse files Browse the repository at this point in the history
…terface (#16231)
  • Loading branch information
Li0k authored May 8, 2024
1 parent ce7522d commit b4c9058
Show file tree
Hide file tree
Showing 38 changed files with 988 additions and 591 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

7 changes: 1 addition & 6 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,12 +622,7 @@ def section_object_storage(outer_panels):
"",
[
panels.target(
f"sum(irate({metric('aws_sdk_retry_counts')}[$__rate_interval])) by ({NODE_LABEL}, {COMPONENT_LABEL}, type)",
"{{type}} - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
panels.target(
f"sum(irate({metric('s3_read_request_retry_count')}[$__rate_interval])) by ({NODE_LABEL}, {COMPONENT_LABEL}, type)",
f"sum(rate({metric('object_store_request_retry_count')}[$__rate_interval])) by ({NODE_LABEL}, {COMPONENT_LABEL}, type)",
"{{type}} - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

186 changes: 146 additions & 40 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -976,17 +976,12 @@ for_all_params!(define_system_config);
/// The subsections `[storage.object_store]`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct ObjectStoreConfig {
#[serde(default = "default::object_store_config::object_store_streaming_read_timeout_ms")]
pub object_store_streaming_read_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_streaming_upload_timeout_ms")]
pub object_store_streaming_upload_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_upload_timeout_ms")]
pub object_store_upload_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_read_timeout_ms")]
pub object_store_read_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_set_atomic_write_dir")]
pub object_store_set_atomic_write_dir: bool,

#[serde(default)]
pub retry: ObjectStoreRetryConfig,

#[serde(default)]
pub s3: S3ObjectStoreConfig,
}
Expand All @@ -1008,12 +1003,6 @@ pub struct S3ObjectStoreConfig {
pub object_store_send_buffer_size: Option<usize>,
#[serde(default = "default::object_store_config::s3::object_store_nodelay")]
pub object_store_nodelay: Option<bool>,
#[serde(default = "default::object_store_config::s3::object_store_req_retry_interval_ms")]
pub object_store_req_retry_interval_ms: u64,
#[serde(default = "default::object_store_config::s3::object_store_req_retry_max_delay_ms")]
pub object_store_req_retry_max_delay_ms: u64,
#[serde(default = "default::object_store_config::s3::object_store_req_retry_max_attempts")]
pub object_store_req_retry_max_attempts: usize,
/// For backwards compatibility, users should use `S3ObjectStoreDeveloperConfig` instead.
#[serde(
default = "default::object_store_config::s3::developer::object_store_retry_unknown_service_error"
Expand Down Expand Up @@ -1044,6 +1033,72 @@ pub struct S3ObjectStoreDeveloperConfig {
pub use_opendal: bool,
}

#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct ObjectStoreRetryConfig {
#[serde(default = "default::object_store_config::object_store_req_backoff_interval_ms")]
pub req_backoff_interval_ms: u64,
#[serde(default = "default::object_store_config::object_store_req_backoff_max_delay_ms")]
pub req_backoff_max_delay_ms: u64,
#[serde(default = "default::object_store_config::object_store_req_backoff_factor")]
pub req_backoff_factor: u64,

// upload
#[serde(default = "default::object_store_config::object_store_upload_attempt_timeout_ms")]
pub upload_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_upload_retry_attempts")]
pub upload_retry_attempts: usize,

// streaming_upload_init + streaming_upload
#[serde(
default = "default::object_store_config::object_store_streaming_upload_attempt_timeout_ms"
)]
pub streaming_upload_attempt_timeout_ms: u64,
#[serde(
default = "default::object_store_config::object_store_streaming_upload_retry_attempts"
)]
pub streaming_upload_retry_attempts: usize,

// read
#[serde(default = "default::object_store_config::object_store_read_attempt_timeout_ms")]
pub read_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_read_retry_attempts")]
pub read_retry_attempts: usize,

// streaming_read_init + streaming_read
#[serde(
default = "default::object_store_config::object_store_streaming_read_attempt_timeout_ms"
)]
pub streaming_read_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_streaming_read_retry_attempts")]
pub streaming_read_retry_attempts: usize,

// metadata
#[serde(default = "default::object_store_config::object_store_metadata_attempt_timeout_ms")]
pub metadata_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_metadata_retry_attempts")]
pub metadata_retry_attempts: usize,

// delete
#[serde(default = "default::object_store_config::object_store_delete_attempt_timeout_ms")]
pub delete_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_delete_retry_attempts")]
pub delete_retry_attempts: usize,

// delete_object
#[serde(
default = "default::object_store_config::object_store_delete_objects_attempt_timeout_ms"
)]
pub delete_objects_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_delete_objects_retry_attempts")]
pub delete_objects_retry_attempts: usize,

// list
#[serde(default = "default::object_store_config::object_store_list_attempt_timeout_ms")]
pub list_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_list_retry_attempts")]
pub list_retry_attempts: usize,
}

impl SystemConfig {
#![allow(deprecated)]
pub fn into_init_system_params(self) -> SystemParams {
Expand Down Expand Up @@ -1683,31 +1738,94 @@ pub mod default {
}

pub mod object_store_config {
pub fn object_store_streaming_read_timeout_ms() -> u64 {
8 * 60 * 1000
const DEFAULT_REQ_BACKOFF_INTERVAL_MS: u64 = 1000; // 1s
const DEFAULT_REQ_BACKOFF_MAX_DELAY_MS: u64 = 10 * 1000; // 10s
const DEFAULT_REQ_MAX_RETRY_ATTEMPTS: usize = 3;

pub fn object_store_set_atomic_write_dir() -> bool {
false
}

pub fn object_store_streaming_upload_timeout_ms() -> u64 {
8 * 60 * 1000
pub fn object_store_req_backoff_interval_ms() -> u64 {
DEFAULT_REQ_BACKOFF_INTERVAL_MS
}

pub fn object_store_upload_timeout_ms() -> u64 {
8 * 60 * 1000
pub fn object_store_req_backoff_max_delay_ms() -> u64 {
DEFAULT_REQ_BACKOFF_MAX_DELAY_MS // 10s
}

pub fn object_store_read_timeout_ms() -> u64 {
8 * 60 * 1000
pub fn object_store_req_backoff_factor() -> u64 {
2
}

pub fn object_store_set_atomic_write_dir() -> bool {
false
pub fn object_store_upload_attempt_timeout_ms() -> u64 {
8 * 1000 // 8s
}

pub fn object_store_upload_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

// init + upload_part + finish
pub fn object_store_streaming_upload_attempt_timeout_ms() -> u64 {
5 * 1000 // 5s
}

pub fn object_store_streaming_upload_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

// tips: depend on block_size
pub fn object_store_read_attempt_timeout_ms() -> u64 {
8 * 1000 // 8s
}

pub fn object_store_read_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

pub fn object_store_streaming_read_attempt_timeout_ms() -> u64 {
3 * 1000 // 3s
}

pub fn object_store_streaming_read_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

pub fn object_store_metadata_attempt_timeout_ms() -> u64 {
60 * 1000 // 1min
}

pub fn object_store_metadata_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

pub fn object_store_delete_attempt_timeout_ms() -> u64 {
5 * 1000
}

pub fn object_store_delete_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

// tips: depend on batch size
pub fn object_store_delete_objects_attempt_timeout_ms() -> u64 {
5 * 1000
}

pub fn object_store_delete_objects_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

pub fn object_store_list_attempt_timeout_ms() -> u64 {
10 * 60 * 1000
}

pub fn object_store_list_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

pub mod s3 {
/// Retry config for compute node http timeout error.
const DEFAULT_RETRY_INTERVAL_MS: u64 = 20;
const DEFAULT_RETRY_MAX_DELAY_MS: u64 = 10 * 1000;
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 8;
const DEFAULT_IDENTITY_RESOLUTION_TIMEOUT_S: u64 = 5;

const DEFAULT_KEEPALIVE_MS: u64 = 600 * 1000; // 10min
Expand All @@ -1728,18 +1846,6 @@ pub mod default {
Some(true)
}

pub fn object_store_req_retry_interval_ms() -> u64 {
DEFAULT_RETRY_INTERVAL_MS
}

pub fn object_store_req_retry_max_delay_ms() -> u64 {
DEFAULT_RETRY_MAX_DELAY_MS // 10s
}

pub fn object_store_req_retry_max_attempts() -> usize {
DEFAULT_RETRY_MAX_ATTEMPTS
}

pub fn identity_resolution_timeout_s() -> u64 {
DEFAULT_IDENTITY_RESOLUTION_TIMEOUT_S
}
Expand Down
28 changes: 21 additions & 7 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -185,19 +185,33 @@ recent_filter_layers = 6
recent_filter_rotate_interval_ms = 10000

[storage.object_store]
object_store_streaming_read_timeout_ms = 480000
object_store_streaming_upload_timeout_ms = 480000
object_store_upload_timeout_ms = 480000
object_store_read_timeout_ms = 480000
object_store_set_atomic_write_dir = false

[storage.object_store.retry]
req_backoff_interval_ms = 1000
req_backoff_max_delay_ms = 10000
req_backoff_factor = 2
upload_attempt_timeout_ms = 8000
upload_retry_attempts = 3
streaming_upload_attempt_timeout_ms = 5000
streaming_upload_retry_attempts = 3
read_attempt_timeout_ms = 8000
read_retry_attempts = 3
streaming_read_attempt_timeout_ms = 3000
streaming_read_retry_attempts = 3
metadata_attempt_timeout_ms = 60000
metadata_retry_attempts = 3
delete_attempt_timeout_ms = 5000
delete_retry_attempts = 3
delete_objects_attempt_timeout_ms = 5000
delete_objects_retry_attempts = 3
list_attempt_timeout_ms = 600000
list_retry_attempts = 3

[storage.object_store.s3]
object_store_keepalive_ms = 600000
object_store_recv_buffer_size = 2097152
object_store_nodelay = true
object_store_req_retry_interval_ms = 20
object_store_req_retry_max_delay_ms = 10000
object_store_req_retry_max_attempts = 8
retry_unknown_service_error = false
identity_resolution_timeout_s = 5

Expand Down
4 changes: 3 additions & 1 deletion src/connector/src/sink/snowflake_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use anyhow::{anyhow, Context};
Expand Down Expand Up @@ -192,13 +193,14 @@ impl SnowflakeS3Client {
aws_secret_access_key: String,
aws_region: String,
) -> Result<Self> {
// FIXME: we should use the `ObjectStoreConfig` instead of default
// just use default configuration here for opendal s3 engine
let config = ObjectStoreConfig::default();

// create the s3 engine for streaming upload to the intermediate s3 bucket
let opendal_s3_engine = OpendalObjectStore::new_s3_engine_with_credentials(
&s3_bucket,
config,
Arc::new(config),
&aws_access_key_id,
&aws_secret_access_key,
&aws_region,
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/common/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl HummockServiceOpts {
self.hummock_url.strip_prefix("hummock+").unwrap(),
Arc::new(ObjectStoreMetrics::unused()),
"Hummock",
ObjectStoreConfig::default(),
Arc::new(ObjectStoreConfig::default()),
)
.await;

Expand Down
2 changes: 1 addition & 1 deletion src/jni_core/src/hummock_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl HummockJavaBindingIterator {
&read_plan.object_store_url,
Arc::new(ObjectStoreMetrics::unused()),
"Hummock",
ObjectStoreConfig::default(),
Arc::new(ObjectStoreConfig::default()),
)
.await,
);
Expand Down
39 changes: 20 additions & 19 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,25 +256,26 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {

const MIN_TIMEOUT_INTERVAL_SEC: u64 = 20;
let compaction_task_max_progress_interval_secs = {
(config
.storage
.object_store
.object_store_read_timeout_ms
.max(config.storage.object_store.object_store_upload_timeout_ms)
.max(
config
.storage
.object_store
.object_store_streaming_read_timeout_ms,
)
.max(
config
.storage
.object_store
.object_store_streaming_upload_timeout_ms,
)
.max(config.meta.compaction_task_max_progress_interval_secs * 1000))
/ 1000
let retry_config = &config.storage.object_store.retry;
let max_streming_read_timeout_ms = (retry_config.streaming_read_attempt_timeout_ms
+ retry_config.req_backoff_max_delay_ms)
* retry_config.streaming_read_retry_attempts as u64;
let max_streaming_upload_timeout_ms = (retry_config
.streaming_upload_attempt_timeout_ms
+ retry_config.req_backoff_max_delay_ms)
* retry_config.streaming_upload_retry_attempts as u64;
let max_upload_timeout_ms = (retry_config.upload_attempt_timeout_ms
+ retry_config.req_backoff_max_delay_ms)
* retry_config.upload_retry_attempts as u64;
let max_read_timeout_ms = (retry_config.read_attempt_timeout_ms
+ retry_config.req_backoff_max_delay_ms)
* retry_config.read_retry_attempts as u64;
let max_timeout_ms = max_streming_read_timeout_ms
.max(max_upload_timeout_ms)
.max(max_streaming_upload_timeout_ms)
.max(max_read_timeout_ms)
.max(config.meta.compaction_task_max_progress_interval_secs * 1000);
max_timeout_ms / 1000
} + MIN_TIMEOUT_INTERVAL_SEC;

let (mut join_handle, leader_lost_handle, shutdown_send) = rpc_serve(
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/backup_restore/backup_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ async fn create_snapshot_store(
&config.0,
metric,
"Meta Backup",
object_store_config.clone(),
Arc::new(object_store_config.clone()),
)
.await,
);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/backup_restore/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async fn restore_hummock_version(
hummock_storage_url,
Arc::new(ObjectStoreMetrics::unused()),
"Version Checkpoint",
ObjectStoreConfig::default(),
Arc::new(ObjectStoreConfig::default()),
)
.await,
);
Expand Down
Loading

0 comments on commit b4c9058

Please sign in to comment.