diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 49b0453334c17..d98276341d075 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -946,6 +946,9 @@ pub struct S3ObjectStoreConfig { pub object_store_req_retry_max_delay_ms: u64, #[serde(default = "default::object_store_config::s3::object_store_req_retry_max_attempts")] pub object_store_req_retry_max_attempts: usize, + /// Whether to retry s3 sdk error from which no error metadata is provided. + #[serde(default = "default::object_store_config::s3::retry_unknown_service_error")] + pub retry_unknown_service_error: bool, } impl SystemConfig { @@ -1548,6 +1551,10 @@ pub mod default { pub fn object_store_req_retry_max_attempts() -> usize { DEFAULT_RETRY_MAX_ATTEMPTS } + + pub fn retry_unknown_service_error() -> bool { + false + } } } } diff --git a/src/config/example.toml b/src/config/example.toml index 21d13f81fbdcd..a24393ecae13d 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -177,6 +177,7 @@ object_store_nodelay = true object_store_req_retry_interval_ms = 20 object_store_req_retry_max_delay_ms = 10000 object_store_req_retry_max_attempts = 8 +retry_unknown_service_error = false [system] barrier_interval_ms = 1000 diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 7ec004c786481..5c6f056d9e535 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -36,6 +36,7 @@ use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder; use aws_smithy_runtime_api::client::http::HttpClient; use aws_smithy_runtime_api::client::result::SdkError; use aws_smithy_types::body::SdkBody; +use aws_smithy_types::error::metadata::ProvideErrorMetadata; use aws_smithy_types::retry::RetryConfig; use either::Either; use fail::fail_point; @@ -43,7 +44,7 @@ use futures::future::{try_join_all, BoxFuture, FutureExt}; use futures::{stream, Stream, StreamExt, TryStreamExt}; use hyper::Body; use itertools::Itertools; -use risingwave_common::config::ObjectStoreConfig; +use risingwave_common::config::{ObjectStoreConfig, S3ObjectStoreConfig}; use risingwave_common::monitor::connection::monitor_connector; use risingwave_common::range::RangeBoundsExt; use tokio::task::JoinHandle; @@ -384,7 +385,7 @@ impl ObjectStore for S3ObjectStore { } } }, - Self::should_retry, + RetryCondition::new(&self.config.s3), ) .await?; @@ -456,7 +457,7 @@ impl ObjectStore for S3ObjectStore { } } }, - Self::should_retry, + RetryCondition::new(&self.config.s3), ) .await?; let reader = FuturesStreamCompatByteStream::new(resp.body); @@ -813,32 +814,6 @@ impl S3ObjectStore { is_expiration_configured } - #[inline(always)] - fn should_retry( - err: &Either< - SdkError>, - ByteStreamError, - >, - ) -> bool { - match err { - Either::Left(err) => { - if let SdkError::DispatchFailure(e) = err { - if e.is_timeout() { - tracing::warn!(target: "http_timeout_retry", "{:?} occurs, trying to retry S3 get_object request.", e); - return true; - } - } - } - Either::Right(_) => { - // Unfortunately `ErrorKind` of `ByteStreamError` is not accessible. - // Always returns true and relies on req_retry_max_attempts to avoid infinite loop. - return true; - } - } - - false - } - #[inline(always)] fn get_retry_strategy(&self) -> impl Iterator { ExponentialBackoff::from_millis(self.config.s3.object_store_req_retry_interval_ms) @@ -849,6 +824,7 @@ impl S3ObjectStore { .map(jitter) } } + struct S3ObjectIter { buffer: VecDeque, client: Client, @@ -942,20 +918,13 @@ impl Stream for S3ObjectIter { } } -impl - From< - Either< - SdkError>, - ByteStreamError, - >, - > for ObjectError -{ - fn from( - e: Either< - SdkError>, - ByteStreamError, - >, - ) -> Self { +type RetryError = Either< + SdkError>, + ByteStreamError, +>; + +impl From for ObjectError { + fn from(e: RetryError) -> Self { match e { Either::Left(e) => e.into(), Either::Right(e) => e.into(), @@ -963,6 +932,47 @@ impl } } +struct RetryCondition { + retry_unknown_service_error: bool, +} + +impl RetryCondition { + fn new(config: &S3ObjectStoreConfig) -> Self { + Self { + retry_unknown_service_error: config.retry_unknown_service_error, + } + } +} + +impl tokio_retry::Condition for RetryCondition { + fn should_retry(&mut self, err: &RetryError) -> bool { + match err { + Either::Left(err) => match err { + SdkError::DispatchFailure(e) => { + if e.is_timeout() { + tracing::warn!(target: "http_timeout_retry", "{e:?} occurs, retry S3 get_object request."); + return true; + } + } + SdkError::ServiceError(e) => { + if self.retry_unknown_service_error && e.err().code().is_none() { + tracing::warn!(target: "unknown_service_error", "{e:?} occurs, retry S3 get_object request."); + return true; + } + } + _ => {} + }, + Either::Right(_) => { + // Unfortunately `ErrorKind` of `ByteStreamError` is not accessible. + // Always returns true and relies on req_retry_max_attempts to avoid infinite loop. + return true; + } + } + + false + } +} + #[cfg(test)] #[cfg(not(madsim))] mod tests {