Skip to content

Commit

Permalink
feat(s3): retry unhandled 503 error (#14562)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Jan 17, 2024
1 parent 217ff0b commit ec95fe3
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 43 deletions.
7 changes: 7 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,9 @@ pub struct S3ObjectStoreConfig {
pub object_store_req_retry_max_delay_ms: u64,
#[serde(default = "default::object_store_config::s3::object_store_req_retry_max_attempts")]
pub object_store_req_retry_max_attempts: usize,
/// Whether to retry S3 SDK service errors that carry no error metadata (i.e. no error code).
#[serde(default = "default::object_store_config::s3::retry_unknown_service_error")]
pub retry_unknown_service_error: bool,
}

impl SystemConfig {
Expand Down Expand Up @@ -1548,6 +1551,10 @@ pub mod default {
/// Serde default for `S3ObjectStoreConfig::object_store_req_retry_max_attempts`.
///
/// Returns `DEFAULT_RETRY_MAX_ATTEMPTS`.
pub fn object_store_req_retry_max_attempts() -> usize {
DEFAULT_RETRY_MAX_ATTEMPTS
}

/// Serde default for `S3ObjectStoreConfig::retry_unknown_service_error`.
///
/// Returns `false`, so the option is off unless the configuration sets it.
pub fn retry_unknown_service_error() -> bool {
false
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ object_store_nodelay = true
object_store_req_retry_interval_ms = 20
object_store_req_retry_max_delay_ms = 10000
object_store_req_retry_max_attempts = 8
retry_unknown_service_error = false

[system]
barrier_interval_ms = 1000
Expand Down
96 changes: 53 additions & 43 deletions src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder;
use aws_smithy_runtime_api::client::http::HttpClient;
use aws_smithy_runtime_api::client::result::SdkError;
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::error::metadata::ProvideErrorMetadata;
use aws_smithy_types::retry::RetryConfig;
use either::Either;
use fail::fail_point;
use futures::future::{try_join_all, BoxFuture, FutureExt};
use futures::{stream, Stream, StreamExt, TryStreamExt};
use hyper::Body;
use itertools::Itertools;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::config::{ObjectStoreConfig, S3ObjectStoreConfig};
use risingwave_common::monitor::connection::monitor_connector;
use risingwave_common::range::RangeBoundsExt;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -384,7 +385,7 @@ impl ObjectStore for S3ObjectStore {
}
}
},
Self::should_retry,
RetryCondition::new(&self.config.s3),
)
.await?;

Expand Down Expand Up @@ -456,7 +457,7 @@ impl ObjectStore for S3ObjectStore {
}
}
},
Self::should_retry,
RetryCondition::new(&self.config.s3),
)
.await?;
let reader = FuturesStreamCompatByteStream::new(resp.body);
Expand Down Expand Up @@ -813,32 +814,6 @@ impl S3ObjectStore {
is_expiration_configured
}

/// Tells the retry loop whether a failed S3 `get_object` attempt is worth repeating.
///
/// * `Either::Right` (a `ByteStreamError`) is always retried.
/// * `Either::Left` (an `SdkError`) is retried only for a dispatch failure that
///   reports a timeout; every other SDK error is not retried.
#[inline(always)]
fn should_retry(
    err: &Either<
        SdkError<GetObjectError, aws_smithy_runtime_api::http::Response<SdkBody>>,
        ByteStreamError,
    >,
) -> bool {
    match err {
        // `ErrorKind` of `ByteStreamError` is not accessible, so the error cannot be
        // classified. Retry unconditionally; req_retry_max_attempts bounds the loop.
        Either::Right(_) => true,
        Either::Left(SdkError::DispatchFailure(failure)) if failure.is_timeout() => {
            tracing::warn!(target: "http_timeout_retry", "{:?} occurs, trying to retry S3 get_object request.", failure);
            true
        }
        Either::Left(_) => false,
    }
}

#[inline(always)]
fn get_retry_strategy(&self) -> impl Iterator<Item = Duration> {
ExponentialBackoff::from_millis(self.config.s3.object_store_req_retry_interval_ms)
Expand All @@ -849,6 +824,7 @@ impl S3ObjectStore {
.map(jitter)
}
}

struct S3ObjectIter {
buffer: VecDeque<ObjectMetadata>,
client: Client,
Expand Down Expand Up @@ -942,27 +918,61 @@ impl Stream for S3ObjectIter {
}
}

impl
From<
Either<
SdkError<GetObjectError, aws_smithy_runtime_api::http::Response<SdkBody>>,
ByteStreamError,
>,
> for ObjectError
{
fn from(
e: Either<
SdkError<GetObjectError, aws_smithy_runtime_api::http::Response<SdkBody>>,
ByteStreamError,
>,
) -> Self {
/// Error type inspected by the retry condition: either the SDK error of a
/// `GetObject` call (`Left`) or a `ByteStreamError` (`Right`).
type RetryError = Either<
SdkError<GetObjectError, aws_smithy_runtime_api::http::Response<SdkBody>>,
ByteStreamError,
>;

impl From<RetryError> for ObjectError {
fn from(e: RetryError) -> Self {
match e {
Either::Left(e) => e.into(),
Either::Right(e) => e.into(),
}
}
}

/// Retry predicate for S3 requests; carries the configuration that
/// influences which errors are considered retryable.
struct RetryCondition {
/// When `true`, SDK service errors that carry no error code are retried.
retry_unknown_service_error: bool,
}

impl RetryCondition {
    /// Builds a retry condition from the S3 object store configuration,
    /// copying its `retry_unknown_service_error` flag.
    fn new(config: &S3ObjectStoreConfig) -> Self {
        let retry_unknown_service_error = config.retry_unknown_service_error;
        Self {
            retry_unknown_service_error,
        }
    }
}

impl tokio_retry::Condition<RetryError> for RetryCondition {
    /// Decides whether a failed S3 `get_object` attempt should be retried.
    ///
    /// Returns `true` when:
    /// * the error is a `ByteStreamError` (`Either::Right`);
    /// * the SDK reported a dispatch failure that is a timeout;
    /// * the SDK reported a service error without an error code and
    ///   `retry_unknown_service_error` is enabled.
    ///
    /// Every other error yields `false`.
    fn should_retry(&mut self, err: &RetryError) -> bool {
        let sdk_err = match err {
            // `ErrorKind` of `ByteStreamError` is not accessible, so the error cannot
            // be classified. Retry unconditionally; req_retry_max_attempts bounds the loop.
            Either::Right(_) => return true,
            Either::Left(sdk_err) => sdk_err,
        };

        match sdk_err {
            SdkError::DispatchFailure(e) if e.is_timeout() => {
                tracing::warn!(target: "http_timeout_retry", "{e:?} occurs, retry S3 get_object request.");
                true
            }
            // The flag is checked first so the error metadata is only inspected when enabled.
            SdkError::ServiceError(e)
                if self.retry_unknown_service_error && e.err().code().is_none() =>
            {
                tracing::warn!(target: "unknown_service_error", "{e:?} occurs, retry S3 get_object request.");
                true
            }
            _ => false,
        }
    }
}

#[cfg(test)]
#[cfg(not(madsim))]
mod tests {
Expand Down

0 comments on commit ec95fe3

Please sign in to comment.