Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(object_store): refactor timeout and retry of object store interface #16231

Merged
merged 56 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
44ff6ff
refactor(object_store): refactor timeout and retry of interface
Li0k Apr 10, 2024
faafbd1
refactor(object_storage): aws-sdk-s3 retry logic
Li0k Apr 10, 2024
b8e252b
refactor(object_store): refactor timeout and retry_attempts config
Li0k Apr 11, 2024
bbd7955
feat(object_store): remove RetryLayer of OpenDal backend
Li0k Apr 11, 2024
f59f6e3
fix(object_store): fix compile
Li0k Apr 11, 2024
82ab21d
feat(object_store): retry for streaming_read and streaming_upload
Li0k Apr 12, 2024
f7f3034
feat(storage): set s3 error should_retry
Li0k Apr 12, 2024
0a9a286
refactor(storage): deprecate total_timeout of interface
Li0k Apr 12, 2024
0855678
fix(object_store): fix retry metrics
Li0k Apr 12, 2024
0510a5a
fix(object_store): fix check
Li0k Apr 12, 2024
d14d684
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Apr 12, 2024
171456f
fix(object_store): fix compile
Li0k Apr 12, 2024
a05443a
refactor(object_store): remove some code
Li0k Apr 15, 2024
7e8014b
refactor(object_store): remove retry config of aws-sdk
Li0k Apr 15, 2024
36b2df7
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Apr 15, 2024
4d5cc57
fix(storage): fix ut
Li0k Apr 16, 2024
3327727
fix(object_store): address comments
Li0k Apr 16, 2024
3268f89
fix(object_store): fix check
Li0k Apr 16, 2024
ef3a0cc
refactor(storage): refactor
Li0k Apr 17, 2024
db288c2
fix(object_store): fix dashboard
Li0k Apr 17, 2024
2959950
refactor(object_store): refactor config and fix max progress timeout
Li0k Apr 17, 2024
e313825
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Apr 17, 2024
957e29b
fix(object_store): fix compile
Li0k Apr 17, 2024
7b34adb
fix(storage): typo
Li0k Apr 17, 2024
34a1db2
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Apr 18, 2024
d4cac71
feat(object_store): upgrade timeout config base on longevity-test
Li0k Apr 18, 2024
c0a05ce
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Apr 18, 2024
86eabc3
feat(storage): upgrade timeout config
Li0k Apr 19, 2024
e720a8e
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Apr 19, 2024
2283a1f
fix(storage): upgrade backoff interval and fix grafana
Li0k Apr 19, 2024
52da021
fix(storage): fix timeout retry
Li0k Apr 19, 2024
23b916b
fix(storage): fix streaming timeout
Li0k Apr 22, 2024
1411982
fix(storage): typo
Li0k Apr 23, 2024
a944e93
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Apr 23, 2024
935cdb7
fix(object_store): fix check
Li0k Apr 25, 2024
3d0d84a
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Apr 25, 2024
6727882
fix(object_store): update example
Li0k Apr 25, 2024
d8261db
fix(object_store): remove deprecated config
Li0k May 6, 2024
2e187e2
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k May 6, 2024
8522453
feat(grafana): update dashboard
Li0k May 6, 2024
b1024a9
fix(object_store): address comments
Li0k May 6, 2024
5750fbd
fix(object_store): address comments
Li0k May 6, 2024
85b137d
fix(config): update example
Li0k May 6, 2024
9452706
fix(object_store): address comments
Li0k May 7, 2024
2ca7613
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k May 7, 2024
4065164
fix(grafana): fix grafana
Li0k May 7, 2024
cf28941
fix(object_store): update example
Li0k May 7, 2024
0f02386
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k May 7, 2024
70fdccb
fix(grafana): upgrade grafana
Li0k May 7, 2024
5f1b9b7
fix(object_store): address comments
Li0k May 8, 2024
af51ace
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k May 8, 2024
e4d03e0
fix(grafana): fix grafana
Li0k May 8, 2024
a299ab1
fix(grafana): fix grafana
Li0k May 8, 2024
bdb0c93
fix(grafana): remove unused metrics
Li0k May 8, 2024
e7d7aca
fix(object_store): fix log
Li0k May 8, 2024
fe85b40
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k May 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

7 changes: 1 addition & 6 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,12 +622,7 @@ def section_object_storage(outer_panels):
"",
[
panels.target(
f"sum(irate({metric('aws_sdk_retry_counts')}[$__rate_interval])) by ({NODE_LABEL}, {COMPONENT_LABEL}, type)",
"{{type}} - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
panels.target(
f"sum(irate({metric('s3_read_request_retry_count')}[$__rate_interval])) by ({NODE_LABEL}, {COMPONENT_LABEL}, type)",
f"sum(rate({metric('object_store_request_retry_count')}[$__rate_interval])) by ({NODE_LABEL}, {COMPONENT_LABEL}, type)",
"{{type}} - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

186 changes: 146 additions & 40 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -976,17 +976,12 @@ for_all_params!(define_system_config);
/// The subsections `[storage.object_store]`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct ObjectStoreConfig {
#[serde(default = "default::object_store_config::object_store_streaming_read_timeout_ms")]
pub object_store_streaming_read_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_streaming_upload_timeout_ms")]
pub object_store_streaming_upload_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_upload_timeout_ms")]
pub object_store_upload_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_read_timeout_ms")]
pub object_store_read_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_set_atomic_write_dir")]
pub object_store_set_atomic_write_dir: bool,

#[serde(default)]
pub retry: ObjectStoreRetryConfig,

#[serde(default)]
pub s3: S3ObjectStoreConfig,
}
Expand All @@ -1008,12 +1003,6 @@ pub struct S3ObjectStoreConfig {
pub object_store_send_buffer_size: Option<usize>,
#[serde(default = "default::object_store_config::s3::object_store_nodelay")]
pub object_store_nodelay: Option<bool>,
#[serde(default = "default::object_store_config::s3::object_store_req_retry_interval_ms")]
pub object_store_req_retry_interval_ms: u64,
#[serde(default = "default::object_store_config::s3::object_store_req_retry_max_delay_ms")]
pub object_store_req_retry_max_delay_ms: u64,
#[serde(default = "default::object_store_config::s3::object_store_req_retry_max_attempts")]
pub object_store_req_retry_max_attempts: usize,
/// For backwards compatibility, users should use `S3ObjectStoreDeveloperConfig` instead.
#[serde(
default = "default::object_store_config::s3::developer::object_store_retry_unknown_service_error"
Expand Down Expand Up @@ -1044,6 +1033,72 @@ pub struct S3ObjectStoreDeveloperConfig {
pub use_opendal: bool,
}

/// Retry and per-attempt timeout settings for object store requests.
///
/// Stored in the `retry` field of `ObjectStoreConfig` and written in the
/// example config as the `[storage.object_store.retry]` table. Every field
/// falls back to the matching function in `default::object_store_config`
/// when it is absent from the config file.
///
/// Settings come in pairs per operation kind: `*_attempt_timeout_ms` (a
/// timeout in milliseconds) and `*_retry_attempts` (a count).
// NOTE(review): how the backoff fields are consumed is not visible here —
// confirm against the retry implementation before relying on the wording below.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct ObjectStoreRetryConfig {
/// Backoff interval between retries, in milliseconds (presumably the initial delay).
#[serde(default = "default::object_store_config::object_store_req_backoff_interval_ms")]
pub req_backoff_interval_ms: u64,
/// Upper bound on a single backoff delay, in milliseconds.
#[serde(default = "default::object_store_config::object_store_req_backoff_max_delay_ms")]
pub req_backoff_max_delay_ms: u64,
/// Backoff factor (presumably the multiplier applied to the delay on each retry).
#[serde(default = "default::object_store_config::object_store_req_backoff_factor")]
pub req_backoff_factor: u64,

// upload
/// Timeout of one `upload` attempt, in milliseconds.
#[serde(default = "default::object_store_config::object_store_upload_attempt_timeout_ms")]
pub upload_attempt_timeout_ms: u64,
/// Number of retry attempts for `upload`.
#[serde(default = "default::object_store_config::object_store_upload_retry_attempts")]
pub upload_retry_attempts: usize,

// streaming_upload_init + streaming_upload
/// Timeout of one streaming-upload attempt, in milliseconds.
#[serde(
default = "default::object_store_config::object_store_streaming_upload_attempt_timeout_ms"
)]
pub streaming_upload_attempt_timeout_ms: u64,
/// Number of retry attempts for streaming upload.
#[serde(
default = "default::object_store_config::object_store_streaming_upload_retry_attempts"
)]
pub streaming_upload_retry_attempts: usize,

// read
/// Timeout of one `read` attempt, in milliseconds.
#[serde(default = "default::object_store_config::object_store_read_attempt_timeout_ms")]
pub read_attempt_timeout_ms: u64,
/// Number of retry attempts for `read`.
#[serde(default = "default::object_store_config::object_store_read_retry_attempts")]
pub read_retry_attempts: usize,

// streaming_read_init + streaming_read
/// Timeout of one streaming-read attempt, in milliseconds.
#[serde(
default = "default::object_store_config::object_store_streaming_read_attempt_timeout_ms"
)]
pub streaming_read_attempt_timeout_ms: u64,
/// Number of retry attempts for streaming read.
#[serde(default = "default::object_store_config::object_store_streaming_read_retry_attempts")]
pub streaming_read_retry_attempts: usize,

// metadata
/// Timeout of one `metadata` attempt, in milliseconds.
#[serde(default = "default::object_store_config::object_store_metadata_attempt_timeout_ms")]
pub metadata_attempt_timeout_ms: u64,
/// Number of retry attempts for `metadata`.
#[serde(default = "default::object_store_config::object_store_metadata_retry_attempts")]
pub metadata_retry_attempts: usize,

// delete
/// Timeout of one `delete` attempt, in milliseconds.
#[serde(default = "default::object_store_config::object_store_delete_attempt_timeout_ms")]
pub delete_attempt_timeout_ms: u64,
/// Number of retry attempts for `delete`.
#[serde(default = "default::object_store_config::object_store_delete_retry_attempts")]
pub delete_retry_attempts: usize,

// delete_objects
/// Timeout of one `delete_objects` attempt, in milliseconds.
#[serde(
default = "default::object_store_config::object_store_delete_objects_attempt_timeout_ms"
)]
pub delete_objects_attempt_timeout_ms: u64,
/// Number of retry attempts for `delete_objects`.
#[serde(default = "default::object_store_config::object_store_delete_objects_retry_attempts")]
pub delete_objects_retry_attempts: usize,

// list
/// Timeout of one `list` attempt, in milliseconds.
#[serde(default = "default::object_store_config::object_store_list_attempt_timeout_ms")]
pub list_attempt_timeout_ms: u64,
/// Number of retry attempts for `list`.
#[serde(default = "default::object_store_config::object_store_list_retry_attempts")]
pub list_retry_attempts: usize,
}

impl SystemConfig {
#![allow(deprecated)]
pub fn into_init_system_params(self) -> SystemParams {
Expand Down Expand Up @@ -1683,31 +1738,94 @@ pub mod default {
}

pub mod object_store_config {
pub fn object_store_streaming_read_timeout_ms() -> u64 {
8 * 60 * 1000
const DEFAULT_REQ_BACKOFF_INTERVAL_MS: u64 = 1000; // 1s
const DEFAULT_REQ_BACKOFF_MAX_DELAY_MS: u64 = 10 * 1000; // 10s
const DEFAULT_REQ_MAX_RETRY_ATTEMPTS: usize = 3;

pub fn object_store_set_atomic_write_dir() -> bool {
false
}

pub fn object_store_streaming_upload_timeout_ms() -> u64 {
8 * 60 * 1000
pub fn object_store_req_backoff_interval_ms() -> u64 {
DEFAULT_REQ_BACKOFF_INTERVAL_MS
}

pub fn object_store_upload_timeout_ms() -> u64 {
8 * 60 * 1000
pub fn object_store_req_backoff_max_delay_ms() -> u64 {
DEFAULT_REQ_BACKOFF_MAX_DELAY_MS // 10s
}

pub fn object_store_read_timeout_ms() -> u64 {
8 * 60 * 1000
pub fn object_store_req_backoff_factor() -> u64 {
2
}

pub fn object_store_set_atomic_write_dir() -> bool {
false
pub fn object_store_upload_attempt_timeout_ms() -> u64 {
8 * 1000 // 8s
}

/// Default for `ObjectStoreRetryConfig::upload_retry_attempts`.
pub fn object_store_upload_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

// init + upload_part + finish
/// Default for `ObjectStoreRetryConfig::streaming_upload_attempt_timeout_ms`, in milliseconds.
pub fn object_store_streaming_upload_attempt_timeout_ms() -> u64 {
5 * 1000 // 5s
}

/// Default for `ObjectStoreRetryConfig::streaming_upload_retry_attempts`.
pub fn object_store_streaming_upload_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

// NOTE: a suitable value depends on block_size
/// Default for `ObjectStoreRetryConfig::read_attempt_timeout_ms`, in milliseconds.
pub fn object_store_read_attempt_timeout_ms() -> u64 {
8 * 1000 // 8s
}

/// Default for `ObjectStoreRetryConfig::read_retry_attempts`.
pub fn object_store_read_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

/// Default for `ObjectStoreRetryConfig::streaming_read_attempt_timeout_ms`, in milliseconds.
pub fn object_store_streaming_read_attempt_timeout_ms() -> u64 {
3 * 1000 // 3s
}

/// Default for `ObjectStoreRetryConfig::streaming_read_retry_attempts`.
pub fn object_store_streaming_read_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

/// Default for `ObjectStoreRetryConfig::metadata_attempt_timeout_ms`, in milliseconds.
pub fn object_store_metadata_attempt_timeout_ms() -> u64 {
60 * 1000 // 1min
}

/// Default for `ObjectStoreRetryConfig::metadata_retry_attempts`.
pub fn object_store_metadata_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

/// Default for `ObjectStoreRetryConfig::delete_attempt_timeout_ms`, in milliseconds.
pub fn object_store_delete_attempt_timeout_ms() -> u64 {
5 * 1000 // 5s
}

/// Default for `ObjectStoreRetryConfig::delete_retry_attempts`.
pub fn object_store_delete_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

// NOTE: a suitable value depends on the delete batch size
/// Default for `ObjectStoreRetryConfig::delete_objects_attempt_timeout_ms`, in milliseconds.
pub fn object_store_delete_objects_attempt_timeout_ms() -> u64 {
5 * 1000 // 5s
}

/// Default for `ObjectStoreRetryConfig::delete_objects_retry_attempts`.
pub fn object_store_delete_objects_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

/// Default for `ObjectStoreRetryConfig::list_attempt_timeout_ms`, in milliseconds.
pub fn object_store_list_attempt_timeout_ms() -> u64 {
10 * 60 * 1000 // 10min
}

/// Default for `ObjectStoreRetryConfig::list_retry_attempts`.
pub fn object_store_list_retry_attempts() -> usize {
DEFAULT_REQ_MAX_RETRY_ATTEMPTS
}

pub mod s3 {
/// Retry config for compute node http timeout error.
const DEFAULT_RETRY_INTERVAL_MS: u64 = 20;
const DEFAULT_RETRY_MAX_DELAY_MS: u64 = 10 * 1000;
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 8;
const DEFAULT_IDENTITY_RESOLUTION_TIMEOUT_S: u64 = 5;

const DEFAULT_KEEPALIVE_MS: u64 = 600 * 1000; // 10min
Expand All @@ -1728,18 +1846,6 @@ pub mod default {
Some(true)
}

pub fn object_store_req_retry_interval_ms() -> u64 {
DEFAULT_RETRY_INTERVAL_MS
}

pub fn object_store_req_retry_max_delay_ms() -> u64 {
DEFAULT_RETRY_MAX_DELAY_MS // 10s
}

pub fn object_store_req_retry_max_attempts() -> usize {
DEFAULT_RETRY_MAX_ATTEMPTS
}

pub fn identity_resolution_timeout_s() -> u64 {
DEFAULT_IDENTITY_RESOLUTION_TIMEOUT_S
}
Expand Down
28 changes: 21 additions & 7 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -185,19 +185,33 @@ recent_filter_layers = 6
recent_filter_rotate_interval_ms = 10000

[storage.object_store]
object_store_streaming_read_timeout_ms = 480000
object_store_streaming_upload_timeout_ms = 480000
object_store_upload_timeout_ms = 480000
object_store_read_timeout_ms = 480000
object_store_set_atomic_write_dir = false

[storage.object_store.retry]
req_backoff_interval_ms = 1000
req_backoff_max_delay_ms = 10000
req_backoff_factor = 2
upload_attempt_timeout_ms = 8000
upload_retry_attempts = 3
streaming_upload_attempt_timeout_ms = 5000
streaming_upload_retry_attempts = 3
read_attempt_timeout_ms = 8000
read_retry_attempts = 3
streaming_read_attempt_timeout_ms = 3000
streaming_read_retry_attempts = 3
metadata_attempt_timeout_ms = 60000
metadata_retry_attempts = 3
delete_attempt_timeout_ms = 5000
delete_retry_attempts = 3
delete_objects_attempt_timeout_ms = 5000
delete_objects_retry_attempts = 3
list_attempt_timeout_ms = 600000
list_retry_attempts = 3

[storage.object_store.s3]
object_store_keepalive_ms = 600000
object_store_recv_buffer_size = 2097152
object_store_nodelay = true
object_store_req_retry_interval_ms = 20
object_store_req_retry_max_delay_ms = 10000
object_store_req_retry_max_attempts = 8
retry_unknown_service_error = false
identity_resolution_timeout_s = 5

Expand Down
4 changes: 3 additions & 1 deletion src/connector/src/sink/snowflake_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use anyhow::{anyhow, Context};
Expand Down Expand Up @@ -192,13 +193,14 @@ impl SnowflakeS3Client {
aws_secret_access_key: String,
aws_region: String,
) -> Result<Self> {
// FIXME: we should use the `ObjectStoreConfig` instead of default
// just use default configuration here for opendal s3 engine
let config = ObjectStoreConfig::default();

// create the s3 engine for streaming upload to the intermediate s3 bucket
let opendal_s3_engine = OpendalObjectStore::new_s3_engine_with_credentials(
&s3_bucket,
config,
Arc::new(config),
&aws_access_key_id,
&aws_secret_access_key,
&aws_region,
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/common/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl HummockServiceOpts {
self.hummock_url.strip_prefix("hummock+").unwrap(),
Arc::new(ObjectStoreMetrics::unused()),
"Hummock",
ObjectStoreConfig::default(),
Arc::new(ObjectStoreConfig::default()),
)
.await;

Expand Down
2 changes: 1 addition & 1 deletion src/jni_core/src/hummock_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl HummockJavaBindingIterator {
&read_plan.object_store_url,
Arc::new(ObjectStoreMetrics::unused()),
"Hummock",
ObjectStoreConfig::default(),
Arc::new(ObjectStoreConfig::default()),
)
.await,
);
Expand Down
39 changes: 20 additions & 19 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,25 +256,26 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {

const MIN_TIMEOUT_INTERVAL_SEC: u64 = 20;
let compaction_task_max_progress_interval_secs = {
(config
.storage
.object_store
.object_store_read_timeout_ms
.max(config.storage.object_store.object_store_upload_timeout_ms)
.max(
config
.storage
.object_store
.object_store_streaming_read_timeout_ms,
)
.max(
config
.storage
.object_store
.object_store_streaming_upload_timeout_ms,
)
.max(config.meta.compaction_task_max_progress_interval_secs * 1000))
/ 1000
let retry_config = &config.storage.object_store.retry;
let max_streming_read_timeout_ms = (retry_config.streaming_read_attempt_timeout_ms
+ retry_config.req_backoff_max_delay_ms)
* retry_config.streaming_read_retry_attempts as u64;
let max_streaming_upload_timeout_ms = (retry_config
.streaming_upload_attempt_timeout_ms
+ retry_config.req_backoff_max_delay_ms)
* retry_config.streaming_upload_retry_attempts as u64;
let max_upload_timeout_ms = (retry_config.upload_attempt_timeout_ms
+ retry_config.req_backoff_max_delay_ms)
* retry_config.upload_retry_attempts as u64;
let max_read_timeout_ms = (retry_config.read_attempt_timeout_ms
+ retry_config.req_backoff_max_delay_ms)
* retry_config.read_retry_attempts as u64;
let max_timeout_ms = max_streming_read_timeout_ms
.max(max_upload_timeout_ms)
.max(max_streaming_upload_timeout_ms)
.max(max_read_timeout_ms)
.max(config.meta.compaction_task_max_progress_interval_secs * 1000);
max_timeout_ms / 1000
} + MIN_TIMEOUT_INTERVAL_SEC;

let (mut join_handle, leader_lost_handle, shutdown_send) = rpc_serve(
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/backup_restore/backup_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ async fn create_snapshot_store(
&config.0,
metric,
"Meta Backup",
object_store_config.clone(),
Arc::new(object_store_config.clone()),
)
.await,
);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/backup_restore/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async fn restore_hummock_version(
hummock_storage_url,
Arc::new(ObjectStoreMetrics::unused()),
"Version Checkpoint",
ObjectStoreConfig::default(),
Arc::new(ObjectStoreConfig::default()),
)
.await,
);
Expand Down
Loading
Loading