Skip to content

Commit

Permalink
fix: cherry-pick #13078
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Oct 30, 2023
1 parent d7637cf commit 2f9eb97
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ aws-smithy-http = { workspace = true }
aws-smithy-types = { workspace = true }
bytes = { version = "1", features = ["serde"] }
crc32fast = "1.3.2"
either = "1"
fail = "0.5"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
hyper = "0.14"
Expand Down
48 changes: 37 additions & 11 deletions src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error;
use aws_sdk_s3::operation::upload_part::UploadPartOutput;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::primitives::{ByteStream, ByteStreamError};
use aws_sdk_s3::types::{
AbortIncompleteMultipartUpload, BucketLifecycleConfiguration, CompletedMultipartUpload,
CompletedPart, Delete, ExpirationStatus, LifecycleRule, LifecycleRuleFilter, ObjectIdentifier,
Expand All @@ -34,6 +34,7 @@ use aws_smithy_client::http_connector::{ConnectorSettings, HttpConnector};
use aws_smithy_http::body::SdkBody;
use aws_smithy_http::result::SdkError;
use aws_smithy_types::retry::RetryConfig;
use either::Either;
use fail::fail_point;
use futures::future::{try_join_all, BoxFuture, FutureExt};
use futures::{stream, Stream};
Expand Down Expand Up @@ -354,11 +355,19 @@ impl ObjectStore for S3ObjectStore {
)));

// retry if occurs AWS EC2 HTTP timeout error.
let resp = tokio_retry::RetryIf::spawn(
let val = tokio_retry::RetryIf::spawn(
self.config.get_retry_strategy(),
|| async {
match self.obj_store_request(path, range.clone()).send().await {
Ok(resp) => Ok(resp),
Ok(resp) => {
let val = resp
.body
.collect()
.await
.map_err(either::Right)?
.into_bytes();
Ok(val)
}
Err(err) => {
if let SdkError::DispatchFailure(e) = &err
&& e.is_timeout()
Expand All @@ -369,16 +378,14 @@ impl ObjectStore for S3ObjectStore {
.inc();
}

Err(err)
Err(either::Left(err))
}
}
},
Self::should_retry,
)
.await?;

let val = resp.body.collect().await?.into_bytes();

if let Some(len) = range.len() && len != val.len() {
return Err(ObjectError::internal(format!(
"mismatched size: expected {}, found {} when reading {} at {:?}",
Expand Down Expand Up @@ -445,7 +452,7 @@ impl ObjectStore for S3ObjectStore {
.inc();
}

Err(err)
Err(either::Left(err))
}
}
},
Expand Down Expand Up @@ -758,13 +765,23 @@ impl S3ObjectStore {
}

#[inline(always)]
fn should_retry(err: &SdkError<GetObjectError>) -> bool {
if let SdkError::DispatchFailure(e) = err {
if e.is_timeout() {
tracing::warn!(target: "http_timeout_retry", "{:?} occurs, trying to retry S3 get_object request.", e);
fn should_retry(err: &Either<SdkError<GetObjectError>, ByteStreamError>) -> bool {
match err {
Either::Left(err) => {
if let SdkError::DispatchFailure(e) = err {
if e.is_timeout() {
tracing::warn!(target: "http_timeout_retry", "{:?} occurs, trying to retry S3 get_object request.", e);
return true;
}
}
}
Either::Right(_) => {
// Unfortunately `ErrorKind` of `ByteStreamError` is not accessible.
// Always returns true and relies on req_retry_max_attempts to avoid infinite loop.
return true;
}
}

false
}
}
Expand Down Expand Up @@ -904,6 +921,15 @@ impl Stream for S3ObjectIter {
}
}

impl From<Either<SdkError<GetObjectError>, ByteStreamError>> for ObjectError {
fn from(e: Either<SdkError<GetObjectError>, ByteStreamError>) -> Self {
match e {
Either::Left(e) => e.into(),
Either::Right(e) => e.into(),
}
}
}

#[cfg(test)]
#[cfg(not(madsim))]
mod tests {
Expand Down

0 comments on commit 2f9eb97

Please sign in to comment.