Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(object_store): refactor timeout and retry of object store interface #16231

Merged
merged 56 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
44ff6ff
refactor(object_store): refactor timeout and retry of iterface
Li0k Apr 10, 2024
faafbd1
refactor(object_storage): aws-sdk-s3 retry logic
Li0k Apr 10, 2024
b8e252b
refactor(object_store): refactor timeout and retry_attempts config
Li0k Apr 11, 2024
bbd7955
feat(object_store): remove RetryLayer of OpenDal backend
Li0k Apr 11, 2024
f59f6e3
fix(object_store): fix compile
Li0k Apr 11, 2024
82ab21d
feat(object_store): retry for streming_read and streaming_upload
Li0k Apr 12, 2024
f7f3034
feat(storage): set s3 error should_retry
Li0k Apr 12, 2024
0a9a286
refactor(storage): deprecate total_timeout of interface
Li0k Apr 12, 2024
0855678
fix(object_store): fix retry metrics
Li0k Apr 12, 2024
0510a5a
fix(object_store): fix check
Li0k Apr 12, 2024
d14d684
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Apr 12, 2024
171456f
fix(object_store): fix compile
Li0k Apr 12, 2024
a05443a
refactor(object_store): remove some code
Li0k Apr 15, 2024
7e8014b
refactor(object_store): remove retry config of aws-sdk
Li0k Apr 15, 2024
36b2df7
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Apr 15, 2024
4d5cc57
fix(storage): fix ut
Li0k Apr 16, 2024
3327727
fix(object_store): address comments
Li0k Apr 16, 2024
3268f89
fix(object_store): fix check
Li0k Apr 16, 2024
ef3a0cc
refactor(storage): refactor
Li0k Apr 17, 2024
db288c2
fix(object_store): fix dashboard
Li0k Apr 17, 2024
2959950
refactor(object_store): refactor config and fix max progress timeout
Li0k Apr 17, 2024
e313825
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Apr 17, 2024
957e29b
fix(object_store): fix compile
Li0k Apr 17, 2024
7b34adb
fix(storage): typo
Li0k Apr 17, 2024
34a1db2
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Apr 18, 2024
d4cac71
feat(object_store): upgrade timeout config base on longevity-test
Li0k Apr 18, 2024
c0a05ce
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Apr 18, 2024
86eabc3
feat(storage): upgrade timeout config
Li0k Apr 19, 2024
e720a8e
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Apr 19, 2024
2283a1f
fix(storage): upgrade backoff interval and fix grafana
Li0k Apr 19, 2024
52da021
fix(storage): fix timeout retry
Li0k Apr 19, 2024
23b916b
fix(storage): fix streaming timeout
Li0k Apr 22, 2024
1411982
fix(storage): typo
Li0k Apr 23, 2024
a944e93
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Apr 23, 2024
935cdb7
fix(object_store): fix check
Li0k Apr 25, 2024
3d0d84a
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Apr 25, 2024
6727882
fix(object_store): update exmaple
Li0k Apr 25, 2024
d8261db
fix(object_store): remove deprecated config
Li0k May 6, 2024
2e187e2
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k May 6, 2024
8522453
feat(grafana): update dashboard
Li0k May 6, 2024
b1024a9
fix(object_store): address comments
Li0k May 6, 2024
5750fbd
fix(object_store): address comments
Li0k May 6, 2024
85b137d
fix(config): update example
Li0k May 6, 2024
9452706
fix(object_store): address comments
Li0k May 7, 2024
2ca7613
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k May 7, 2024
4065164
fix(grafana): fix grafana
Li0k May 7, 2024
cf28941
fix(object_store): update example
Li0k May 7, 2024
0f02386
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k May 7, 2024
70fdccb
fix(grafana): upgrade grafana
Li0k May 7, 2024
5f1b9b7
fix(object_store): address comments
Li0k May 8, 2024
af51ace
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k May 8, 2024
e4d03e0
fix(grafana): fix grafana
Li0k May 8, 2024
a299ab1
fix(grafana): fix grafana
Li0k May 8, 2024
bdb0c93
fix(grafana): remove unused metrics
Li0k May 8, 2024
e7d7aca
fix(object_store): fix log
Li0k May 8, 2024
fe85b40
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k May 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 153 additions & 26 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,69 @@ pub struct ObjectStoreConfig {
#[serde(default = "default::object_store_config::object_store_set_atomic_write_dir")]
pub object_store_set_atomic_write_dir: bool,

#[serde(default = "default::object_store_config::object_store_req_retry_interval_ms")]
pub object_store_req_retry_interval_ms: u64,
Li0k marked this conversation as resolved.
Show resolved Hide resolved
#[serde(default = "default::object_store_config::object_store_req_retry_max_delay_ms")]
pub object_store_req_retry_max_delay_ms: u64,
#[serde(default = "default::object_store_config::object_store_req_retry_max_retry_attempts")]
pub object_store_req_retry_max_retry_attempts: usize,

// upload
#[serde(default = "default::object_store_config::object_store_upload_attempt_timeout_ms")]
pub object_store_upload_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_upload_retry_attempts")]
pub object_store_upload_retry_attempts: usize,

// streaming_upload_init + streaming_upload
#[serde(
default = "default::object_store_config::object_store_streaming_upload_attempt_timeout_ms"
)]
pub object_store_streaming_upload_attempt_timeout_ms: u64,
#[serde(
default = "default::object_store_config::object_store_streaming_upload_retry_attempts"
)]
pub object_store_streaming_upload_retry_attempts: usize,

// read
#[serde(default = "default::object_store_config::object_store_read_attempt_timeout_ms")]
pub object_store_read_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_read_retry_attempts")]
pub object_store_read_retry_attempts: usize,

// streaming_read_init + streaming_read
#[serde(
default = "default::object_store_config::object_store_streaming_read_attempt_timeout_ms"
)]
pub object_store_streaming_read_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_streaming_read_retry_attempts")]
pub object_store_streaming_read_retry_attempts: usize,

// metadata
#[serde(default = "default::object_store_config::object_store_metadata_attempt_timeout_ms")]
pub object_store_metadata_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_metadata_retry_attempts")]
pub object_store_metadata_retry_attempts: usize,

// delete
#[serde(default = "default::object_store_config::object_store_delete_attempt_timeout_ms")]
pub object_store_delete_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_delete_retry_attempts")]
pub object_store_delete_retry_attempts: usize,

// delete_object
#[serde(
default = "default::object_store_config::object_store_delete_objects_attempt_timeout_ms"
)]
pub object_store_delete_objects_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_delete_objects_retry_attempts")]
pub object_store_delete_objects_retry_attempts: usize,

// list
#[serde(default = "default::object_store_config::object_store_list_attempt_timeout_ms")]
pub object_store_list_attempt_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_list_retry_attempts")]
pub object_store_list_retry_attempts: usize,

#[serde(default)]
pub s3: S3ObjectStoreConfig,
}
Expand All @@ -1003,12 +1066,6 @@ pub struct S3ObjectStoreConfig {
pub object_store_send_buffer_size: Option<usize>,
#[serde(default = "default::object_store_config::s3::object_store_nodelay")]
pub object_store_nodelay: Option<bool>,
#[serde(default = "default::object_store_config::s3::object_store_req_retry_interval_ms")]
pub object_store_req_retry_interval_ms: u64,
#[serde(default = "default::object_store_config::s3::object_store_req_retry_max_delay_ms")]
pub object_store_req_retry_max_delay_ms: u64,
#[serde(default = "default::object_store_config::s3::object_store_req_retry_max_attempts")]
pub object_store_req_retry_max_attempts: usize,
/// For backwards compatibility, users should use `S3ObjectStoreDeveloperConfig` instead.
#[serde(
default = "default::object_store_config::s3::developer::object_store_retry_unknown_service_error"
Expand Down Expand Up @@ -1670,31 +1727,113 @@ pub mod default {
}

pub mod object_store_config {
const DEFAULT_RETRY_INTERVAL_MS: u64 = 20;
Li0k marked this conversation as resolved.
Show resolved Hide resolved
const DEFAULT_RETRY_MAX_DELAY_MS: u64 = 10 * 1000;
const DEFAULT_RETRY_MAX_RETRY_ATTEMPTS: usize = 3;
pub const UNLIMITED_MAX_TIMEOUT: u64 = u64::MAX;

/// DEPRECATED: This config will be deprecated in the future version, use `object_store_streaming_read_attempt_timeout_ms` instead.
pub fn object_store_streaming_read_timeout_ms() -> u64 {
Li0k marked this conversation as resolved.
Show resolved Hide resolved
8 * 60 * 1000
UNLIMITED_MAX_TIMEOUT
}

/// DEPRECATED: This config will be deprecated in the future version, use `object_store_streaming_upload_attempt_timeout_ms` instead.
pub fn object_store_streaming_upload_timeout_ms() -> u64 {
8 * 60 * 1000
UNLIMITED_MAX_TIMEOUT
}

/// DEPRECATED: This config will be deprecated in the future version, use `object_store_upload_attempt_timeout_ms` instead.
pub fn object_store_upload_timeout_ms() -> u64 {
8 * 60 * 1000
UNLIMITED_MAX_TIMEOUT
}

/// DEPRECATED: This config will be deprecated in the future version, use `object_store_read_attempt_timeout_ms` instead.
pub fn object_store_read_timeout_ms() -> u64 {
8 * 60 * 1000
UNLIMITED_MAX_TIMEOUT
}

pub fn object_store_set_atomic_write_dir() -> bool {
false
}

pub fn object_store_req_retry_interval_ms() -> u64 {
DEFAULT_RETRY_INTERVAL_MS
}

pub fn object_store_req_retry_max_delay_ms() -> u64 {
DEFAULT_RETRY_MAX_DELAY_MS // 10s
}

/// DEPRECATED: This config will be deprecated in the future version
pub fn object_store_req_retry_max_retry_attempts() -> usize {
DEFAULT_RETRY_MAX_RETRY_ATTEMPTS
}

pub fn object_store_upload_attempt_timeout_ms() -> u64 {
5 * 60 * 1000
}

pub fn object_store_upload_retry_attempts() -> usize {
DEFAULT_RETRY_MAX_RETRY_ATTEMPTS
}

pub fn object_store_streaming_upload_attempt_timeout_ms() -> u64 {
60 * 1000
}

pub fn object_store_streaming_upload_retry_attempts() -> usize {
DEFAULT_RETRY_MAX_RETRY_ATTEMPTS
}

pub fn object_store_read_attempt_timeout_ms() -> u64 {
60 * 1000
}

pub fn object_store_read_retry_attempts() -> usize {
DEFAULT_RETRY_MAX_RETRY_ATTEMPTS
}

pub fn object_store_streaming_read_attempt_timeout_ms() -> u64 {
2 * 60 * 1000
}

pub fn object_store_streaming_read_retry_attempts() -> usize {
DEFAULT_RETRY_MAX_RETRY_ATTEMPTS
}

pub fn object_store_metadata_attempt_timeout_ms() -> u64 {
5 * 60 * 1000
}

pub fn object_store_metadata_retry_attempts() -> usize {
DEFAULT_RETRY_MAX_RETRY_ATTEMPTS
}

pub fn object_store_delete_attempt_timeout_ms() -> u64 {
2 * 60 * 1000
}

pub fn object_store_delete_retry_attempts() -> usize {
DEFAULT_RETRY_MAX_RETRY_ATTEMPTS
}

pub fn object_store_delete_objects_attempt_timeout_ms() -> u64 {
10 * 60 * 1000
}

pub fn object_store_delete_objects_retry_attempts() -> usize {
DEFAULT_RETRY_MAX_RETRY_ATTEMPTS
}

pub fn object_store_list_attempt_timeout_ms() -> u64 {
10 * 60 * 1000
}

pub fn object_store_list_retry_attempts() -> usize {
DEFAULT_RETRY_MAX_RETRY_ATTEMPTS
}

pub mod s3 {
/// Retry config for compute node http timeout error.
const DEFAULT_RETRY_INTERVAL_MS: u64 = 20;
const DEFAULT_RETRY_MAX_DELAY_MS: u64 = 10 * 1000;
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 8;
const DEFAULT_IDENTITY_RESOLUTION_TIMEOUT_S: u64 = 5;

const DEFAULT_KEEPALIVE_MS: u64 = 600 * 1000; // 10min
Expand All @@ -1715,18 +1854,6 @@ pub mod default {
Some(true)
}

pub fn object_store_req_retry_interval_ms() -> u64 {
DEFAULT_RETRY_INTERVAL_MS
}

pub fn object_store_req_retry_max_delay_ms() -> u64 {
DEFAULT_RETRY_MAX_DELAY_MS // 10s
}

pub fn object_store_req_retry_max_attempts() -> usize {
DEFAULT_RETRY_MAX_ATTEMPTS
}

pub fn identity_resolution_timeout_s() -> u64 {
DEFAULT_IDENTITY_RESOLUTION_TIMEOUT_S
}
Expand Down
30 changes: 23 additions & 7 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -183,19 +183,35 @@ recent_filter_layers = 6
recent_filter_rotate_interval_ms = 10000

[storage.object_store]
object_store_streaming_read_timeout_ms = 480000
object_store_streaming_upload_timeout_ms = 480000
object_store_upload_timeout_ms = 480000
object_store_read_timeout_ms = 480000
object_store_streaming_read_timeout_ms = 0
object_store_streaming_upload_timeout_ms = 0
object_store_upload_timeout_ms = 0
object_store_read_timeout_ms = 0
object_store_set_atomic_write_dir = false
object_store_req_retry_interval_ms = 20
object_store_req_retry_max_delay_ms = 10000
object_store_req_retry_max_retry_attempts = 3
object_store_upload_attempt_timeout_ms = 300000
object_store_upload_retry_attempts = 3
object_store_streaming_upload_attempt_timeout_ms = 60000
object_store_streaming_upload_retry_attempts = 3
object_store_read_attempt_timeout_ms = 60000
object_store_read_retry_attempts = 3
object_store_streaming_read_attempt_timeout_ms = 120000
object_store_streaming_read_retry_attempts = 3
object_store_metadata_attempt_timeout_ms = 300000
object_store_metadata_retry_attempts = 3
object_store_delete_attempt_timeout_ms = 120000
object_store_delete_retry_attempts = 3
object_store_delete_objects_attempt_timeout_ms = 600000
object_store_delete_objects_retry_attempts = 3
object_store_list_attempt_timeout_ms = 600000
object_store_list_retry_attempts = 3

[storage.object_store.s3]
object_store_keepalive_ms = 600000
object_store_recv_buffer_size = 2097152
object_store_nodelay = true
object_store_req_retry_interval_ms = 20
object_store_req_retry_max_delay_ms = 10000
object_store_req_retry_max_attempts = 8
retry_unknown_service_error = false
identity_resolution_timeout_s = 5

Expand Down
48 changes: 39 additions & 9 deletions src/object_store/src/object/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@ use tokio::sync::oneshot::error::RecvError;
#[derive(Error, Debug, thiserror_ext::Box, thiserror_ext::Construct)]
#[thiserror_ext(newtype(name = ObjectError, backtrace, report_debug))]
pub enum ObjectErrorInner {
#[error("s3 error: {0}")]
S3(#[source] BoxedError),
#[error("s3 error: {inner}")]
S3 {
// TODO: remove this after switch s3 backend to opendal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0.0 Why? Any infomation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can move to opendal::Error after switching to opendal

should_retry: bool,
#[source]
inner: BoxedError,
},
#[error("disk error: {msg}")]
Disk {
msg: String,
Expand All @@ -55,17 +60,20 @@ impl ObjectError {
/// Tells whether the error indicates the target object is not found.
pub fn is_object_not_found_error(&self) -> bool {
match self.inner() {
ObjectErrorInner::S3(e) => {
if let Some(aws_smithy_runtime_api::client::result::SdkError::ServiceError(err)) = e
.downcast_ref::<aws_smithy_runtime_api::client::result::SdkError<
ObjectErrorInner::S3 {
inner,
should_retry: _,
} => {
if let Some(aws_smithy_runtime_api::client::result::SdkError::ServiceError(err)) =
inner.downcast_ref::<aws_smithy_runtime_api::client::result::SdkError<
GetObjectError,
aws_smithy_runtime_api::http::Response<SdkBody>,
>>()
{
return matches!(err.err(), GetObjectError::NoSuchKey(_));
}
if let Some(aws_smithy_runtime_api::client::result::SdkError::ServiceError(err)) = e
.downcast_ref::<aws_smithy_runtime_api::client::result::SdkError<
if let Some(aws_smithy_runtime_api::client::result::SdkError::ServiceError(err)) =
inner.downcast_ref::<aws_smithy_runtime_api::client::result::SdkError<
HeadObjectError,
aws_smithy_runtime_api::http::Response<SdkBody>,
>>()
Expand All @@ -90,6 +98,20 @@ impl ObjectError {
};
false
}

pub fn should_retry(&self) -> bool {
// return true;
match self.inner() {
ObjectErrorInner::S3 {
inner: _,
should_retry,
} => *should_retry,

ObjectErrorInner::Opendal(e) => e.is_temporary(),

_ => false,
}
}
}

impl<E, R> From<aws_smithy_runtime_api::client::result::SdkError<E, R>> for ObjectError
Expand All @@ -98,7 +120,11 @@ where
R: Send + Sync + 'static + std::fmt::Debug,
{
fn from(e: aws_smithy_runtime_api::client::result::SdkError<E, R>) -> Self {
ObjectErrorInner::S3(e.into()).into()
ObjectErrorInner::S3 {
inner: e.into(),
should_retry: false,
}
.into()
}
}

Expand All @@ -110,7 +136,11 @@ impl From<RecvError> for ObjectError {

impl From<ByteStreamError> for ObjectError {
fn from(e: ByteStreamError) -> Self {
ObjectErrorInner::Internal(e.to_report_string()).into()
ObjectErrorInner::S3 {
inner: e.into(),
should_retry: true,
}
.into()
}
}

Expand Down
Loading
Loading