Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(object_store): cherry-pick refactor timeout and retry of object store (2929fb8) #16652

Merged
merged 2 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

7 changes: 1 addition & 6 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,12 +622,7 @@ def section_object_storage(outer_panels):
"",
[
panels.target(
f"sum(irate({metric('aws_sdk_retry_counts')}[$__rate_interval])) by ({NODE_LABEL}, {COMPONENT_LABEL}, type)",
"{{type}} - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
panels.target(
f"sum(irate({metric('s3_read_request_retry_count')}[$__rate_interval])) by ({NODE_LABEL}, {COMPONENT_LABEL}, type)",
f"sum(rate({metric('object_store_request_retry_count')}[$__rate_interval])) by ({NODE_LABEL}, {COMPONENT_LABEL}, type)",
"{{type}} - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

186 changes: 146 additions & 40 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -976,17 +976,12 @@ for_all_params!(define_system_config);
/// The subsections `[storage.object_store]`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct ObjectStoreConfig {
#[serde(default = "default::object_store_config::object_store_streaming_read_timeout_ms")]
pub object_store_streaming_read_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_streaming_upload_timeout_ms")]
pub object_store_streaming_upload_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_upload_timeout_ms")]
pub object_store_upload_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_read_timeout_ms")]
pub object_store_read_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_set_atomic_write_dir")]
pub object_store_set_atomic_write_dir: bool,

#[serde(default)]
pub retry: ObjectStoreRetryConfig,

#[serde(default)]
pub s3: S3ObjectStoreConfig,
}
Expand All @@ -1008,12 +1003,6 @@ pub struct S3ObjectStoreConfig {
pub object_store_send_buffer_size: Option<usize>,
#[serde(default = "default::object_store_config::s3::object_store_nodelay")]
pub object_store_nodelay: Option<bool>,
#[serde(default = "default::object_store_config::s3::object_store_req_retry_interval_ms")]
pub object_store_req_retry_interval_ms: u64,
#[serde(default = "default::object_store_config::s3::object_store_req_retry_max_delay_ms")]
pub object_store_req_retry_max_delay_ms: u64,
#[serde(default = "default::object_store_config::s3::object_store_req_retry_max_attempts")]
pub object_store_req_retry_max_attempts: usize,
/// For backwards compatibility, users should use `S3ObjectStoreDeveloperConfig` instead.
#[serde(
default = "default::object_store_config::s3::developer::object_store_retry_unknown_service_error"
Expand Down Expand Up @@ -1044,6 +1033,72 @@ pub struct S3ObjectStoreDeveloperConfig {
pub use_opendal: bool,
}

#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct ObjectStoreRetryConfig {
#[serde(default = "default::object_store_config::object_store_req_backoff_interval_ms")]
pub req_backoff_interval_ms: u64,
#[serde(default = "default::object_store_config::object_store_req_backoff_max_delay_ms")]
pub req_backoff_max_delay_ms: u64,
#[serde(default = "default::object_store_config::object_store_req_backoff_factor")]
pub req_backoff_factor: u64,

// upload
#[serde(default = "default::object_store_config::object_store_upload_attempt_timeout_ms")]
pub upload_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_upload_retry_attempts")]
pub upload_retry_attempts: usize,

// streaming_upload_init + streaming_upload
#[serde(
default = "default::object_store_config::object_store_streaming_upload_attempt_timeout_ms"
)]
pub streaming_upload_attempt_timeout_ms: u64,
#[serde(
default = "default::object_store_config::object_store_streaming_upload_retry_attempts"
)]
pub streaming_upload_retry_attempts: usize,

// read
#[serde(default = "default::object_store_config::object_store_read_attempt_timeout_ms")]
pub read_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_read_retry_attempts")]
pub read_retry_attempts: usize,

// streaming_read_init + streaming_read
#[serde(
default = "default::object_store_config::object_store_streaming_read_attempt_timeout_ms"
)]
pub streaming_read_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_streaming_read_retry_attempts")]
pub streaming_read_retry_attempts: usize,

// metadata
#[serde(default = "default::object_store_config::object_store_metadata_attempt_timeout_ms")]
pub metadata_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_metadata_retry_attempts")]
pub metadata_retry_attempts: usize,

// delete
#[serde(default = "default::object_store_config::object_store_delete_attempt_timeout_ms")]
pub delete_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_delete_retry_attempts")]
pub delete_retry_attempts: usize,

// delete_object
#[serde(
default = "default::object_store_config::object_store_delete_objects_attempt_timeout_ms"
)]
pub delete_objects_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_delete_objects_retry_attempts")]
pub delete_objects_retry_attempts: usize,

// list
#[serde(default = "default::object_store_config::object_store_list_attempt_timeout_ms")]
pub list_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_list_retry_attempts")]
pub list_retry_attempts: usize,
}

impl SystemConfig {
#![allow(deprecated)]
pub fn into_init_system_params(self) -> SystemParams {
Expand Down Expand Up @@ -1683,31 +1738,94 @@ pub mod default {
}

pub mod object_store_config {
pub fn object_store_streaming_read_timeout_ms() -> u64 {
8 * 60 * 1000
const DEFAULT_REQ_BACKOFF_INTERVAL_MS: u64 = 1000; // 1s
const DEFAULT_REQ_BACKOFF_MAX_DELAY_MS: u64 = 10 * 1000; // 10s
const DEFAULT_REQ_MAX_RETRY_ATTEMPTS: usize = 3;

pub fn object_store_set_atomic_write_dir() -> bool {
false
}

pub fn object_store_streaming_upload_timeout_ms() -> u64 {
8 * 60 * 1000
pub fn object_store_req_backoff_interval_ms() -> u64 {
DEFAULT_REQ_BACKOFF_INTERVAL_MS
}

pub fn object_store_upload_timeout_ms() -> u64 {
8 * 60 * 1000
pub fn object_store_req_backoff_max_delay_ms() -> u64 {
DEFAULT_REQ_BACKOFF_MAX_DELAY_MS // 10s
}

pub fn object_store_read_timeout_ms() -> u64 {
8 * 60 * 1000
pub fn object_store_req_backoff_factor() -> u64 {
2
}

pub fn object_store_set_atomic_write_dir() -> bool {
false
pub fn object_store_upload_attempt_timeout_ms() -> u64 {
8 * 1000 // 8s
}

pub fn object_store_upload_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

// init + upload_part + finish
pub fn object_store_streaming_upload_attempt_timeout_ms() -> u64 {
5 * 1000 // 5s
}

pub fn object_store_streaming_upload_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

// tips: depend on block_size
pub fn object_store_read_attempt_timeout_ms() -> u64 {
8 * 1000 // 8s
}

pub fn object_store_read_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

pub fn object_store_streaming_read_attempt_timeout_ms() -> u64 {
3 * 1000 // 3s
}

pub fn object_store_streaming_read_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

pub fn object_store_metadata_attempt_timeout_ms() -> u64 {
60 * 1000 // 1min
}

pub fn object_store_metadata_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

pub fn object_store_delete_attempt_timeout_ms() -> u64 {
5 * 1000
}

pub fn object_store_delete_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

// tips: depend on batch size
pub fn object_store_delete_objects_attempt_timeout_ms() -> u64 {
5 * 1000
}

pub fn object_store_delete_objects_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

pub fn object_store_list_attempt_timeout_ms() -> u64 {
10 * 60 * 1000
}

pub fn object_store_list_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

pub mod s3 {
/// Retry config for compute node http timeout error.
const DEFAULT_RETRY_INTERVAL_MS: u64 = 20;
const DEFAULT_RETRY_MAX_DELAY_MS: u64 = 10 * 1000;
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 8;
const DEFAULT_IDENTITY_RESOLUTION_TIMEOUT_S: u64 = 5;

const DEFAULT_KEEPALIVE_MS: u64 = 600 * 1000; // 10min
Expand All @@ -1728,18 +1846,6 @@ pub mod default {
Some(true)
}

pub fn object_store_req_retry_interval_ms() -> u64 {
DEFAULT_RETRY_INTERVAL_MS
}

pub fn object_store_req_retry_max_delay_ms() -> u64 {
DEFAULT_RETRY_MAX_DELAY_MS // 10s
}

pub fn object_store_req_retry_max_attempts() -> usize {
DEFAULT_RETRY_MAX_ATTEMPTS
}

pub fn identity_resolution_timeout_s() -> u64 {
DEFAULT_IDENTITY_RESOLUTION_TIMEOUT_S
}
Expand Down
28 changes: 21 additions & 7 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -185,19 +185,33 @@ recent_filter_layers = 6
recent_filter_rotate_interval_ms = 10000

[storage.object_store]
object_store_streaming_read_timeout_ms = 480000
object_store_streaming_upload_timeout_ms = 480000
object_store_upload_timeout_ms = 480000
object_store_read_timeout_ms = 480000
object_store_set_atomic_write_dir = false

[storage.object_store.retry]
req_backoff_interval_ms = 1000
req_backoff_max_delay_ms = 10000
req_backoff_factor = 2
upload_attempt_timeout_ms = 8000
upload_retry_attempts = 3
streaming_upload_attempt_timeout_ms = 5000
streaming_upload_retry_attempts = 3
read_attempt_timeout_ms = 8000
read_retry_attempts = 3
streaming_read_attempt_timeout_ms = 3000
streaming_read_retry_attempts = 3
metadata_attempt_timeout_ms = 60000
metadata_retry_attempts = 3
delete_attempt_timeout_ms = 5000
delete_retry_attempts = 3
delete_objects_attempt_timeout_ms = 5000
delete_objects_retry_attempts = 3
list_attempt_timeout_ms = 600000
list_retry_attempts = 3

[storage.object_store.s3]
object_store_keepalive_ms = 600000
object_store_recv_buffer_size = 2097152
object_store_nodelay = true
object_store_req_retry_interval_ms = 20
object_store_req_retry_max_delay_ms = 10000
object_store_req_retry_max_attempts = 8
retry_unknown_service_error = false
identity_resolution_timeout_s = 5

Expand Down
4 changes: 3 additions & 1 deletion src/connector/src/sink/snowflake_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use anyhow::{anyhow, Context};
Expand Down Expand Up @@ -192,13 +193,14 @@ impl SnowflakeS3Client {
aws_secret_access_key: String,
aws_region: String,
) -> Result<Self> {
// FIXME: we should use the `ObjectStoreConfig` instead of default
// just use default configuration here for opendal s3 engine
let config = ObjectStoreConfig::default();

// create the s3 engine for streaming upload to the intermediate s3 bucket
let opendal_s3_engine = OpendalObjectStore::new_s3_engine_with_credentials(
&s3_bucket,
config,
Arc::new(config),
&aws_access_key_id,
&aws_secret_access_key,
&aws_region,
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/common/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl HummockServiceOpts {
self.hummock_url.strip_prefix("hummock+").unwrap(),
Arc::new(ObjectStoreMetrics::unused()),
"Hummock",
ObjectStoreConfig::default(),
Arc::new(ObjectStoreConfig::default()),
)
.await;

Expand Down
2 changes: 1 addition & 1 deletion src/jni_core/src/hummock_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl HummockJavaBindingIterator {
&read_plan.object_store_url,
Arc::new(ObjectStoreMetrics::unused()),
"Hummock",
ObjectStoreConfig::default(),
Arc::new(ObjectStoreConfig::default()),
)
.await,
);
Expand Down
39 changes: 20 additions & 19 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,25 +256,26 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {

const MIN_TIMEOUT_INTERVAL_SEC: u64 = 20;
let compaction_task_max_progress_interval_secs = {
(config
.storage
.object_store
.object_store_read_timeout_ms
.max(config.storage.object_store.object_store_upload_timeout_ms)
.max(
config
.storage
.object_store
.object_store_streaming_read_timeout_ms,
)
.max(
config
.storage
.object_store
.object_store_streaming_upload_timeout_ms,
)
.max(config.meta.compaction_task_max_progress_interval_secs * 1000))
/ 1000
let retry_config = &config.storage.object_store.retry;
let max_streming_read_timeout_ms = (retry_config.streaming_read_attempt_timeout_ms
+ retry_config.req_backoff_max_delay_ms)
* retry_config.streaming_read_retry_attempts as u64;
let max_streaming_upload_timeout_ms = (retry_config
.streaming_upload_attempt_timeout_ms
+ retry_config.req_backoff_max_delay_ms)
* retry_config.streaming_upload_retry_attempts as u64;
let max_upload_timeout_ms = (retry_config.upload_attempt_timeout_ms
+ retry_config.req_backoff_max_delay_ms)
* retry_config.upload_retry_attempts as u64;
let max_read_timeout_ms = (retry_config.read_attempt_timeout_ms
+ retry_config.req_backoff_max_delay_ms)
* retry_config.read_retry_attempts as u64;
let max_timeout_ms = max_streming_read_timeout_ms
.max(max_upload_timeout_ms)
.max(max_streaming_upload_timeout_ms)
.max(max_read_timeout_ms)
.max(config.meta.compaction_task_max_progress_interval_secs * 1000);
max_timeout_ms / 1000
} + MIN_TIMEOUT_INTERVAL_SEC;

let (mut join_handle, leader_lost_handle, shutdown_send) = rpc_serve(
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/backup_restore/backup_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ async fn create_snapshot_store(
&config.0,
metric,
"Meta Backup",
object_store_config.clone(),
Arc::new(object_store_config.clone()),
)
.await,
);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/backup_restore/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async fn restore_hummock_version(
hummock_storage_url,
Arc::new(ObjectStoreMetrics::unused()),
"Version Checkpoint",
ObjectStoreConfig::default(),
Arc::new(ObjectStoreConfig::default()),
)
.await,
);
Expand Down
Loading
Loading