diff --git a/Cargo.lock b/Cargo.lock index bb460691a319..d345d9672493 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7699,6 +7699,7 @@ dependencies = [ "aws-smithy-types", "bytes", "crc32fast", + "either", "fail", "futures", "hyper", diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index 88f73ca1d3c7..f117c272a9af 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -18,6 +18,7 @@ aws-smithy-http = { workspace = true } aws-smithy-types = { workspace = true } bytes = { version = "1", features = ["serde"] } crc32fast = "1.3.2" +either = "1" fail = "0.5" futures = { version = "0.3", default-features = false, features = ["alloc"] } hyper = "0.14" diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index a53f1a682528..22aaa4301af2 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -24,7 +24,7 @@ use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder; use aws_sdk_s3::operation::get_object::GetObjectError; use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error; use aws_sdk_s3::operation::upload_part::UploadPartOutput; -use aws_sdk_s3::primitives::ByteStream; +use aws_sdk_s3::primitives::{ByteStream, ByteStreamError}; use aws_sdk_s3::types::{ AbortIncompleteMultipartUpload, BucketLifecycleConfiguration, CompletedMultipartUpload, CompletedPart, Delete, ExpirationStatus, LifecycleRule, LifecycleRuleFilter, ObjectIdentifier, @@ -34,6 +34,7 @@ use aws_smithy_client::http_connector::{ConnectorSettings, HttpConnector}; use aws_smithy_http::body::SdkBody; use aws_smithy_http::result::SdkError; use aws_smithy_types::retry::RetryConfig; +use either::Either; use fail::fail_point; use futures::future::{try_join_all, BoxFuture, FutureExt}; use futures::{stream, Stream}; @@ -354,11 +355,19 @@ impl ObjectStore for S3ObjectStore { ))); // retry if occurs AWS EC2 HTTP timeout error. - let resp = tokio_retry::RetryIf::spawn( + let val = tokio_retry::RetryIf::spawn( self.config.get_retry_strategy(), || async { match self.obj_store_request(path, range.clone()).send().await { - Ok(resp) => Ok(resp), + Ok(resp) => { + let val = resp + .body + .collect() + .await + .map_err(either::Right)? + .into_bytes(); + Ok(val) + } Err(err) => { if let SdkError::DispatchFailure(e) = &err && e.is_timeout() @@ -369,7 +378,7 @@ impl ObjectStore for S3ObjectStore { .inc(); } - Err(err) + Err(either::Left(err)) } } }, @@ -377,8 +386,6 @@ impl ObjectStore for S3ObjectStore { ) .await?; - let val = resp.body.collect().await?.into_bytes(); - if let Some(len) = range.len() && len != val.len() { return Err(ObjectError::internal(format!( "mismatched size: expected {}, found {} when reading {} at {:?}", @@ -445,7 +452,7 @@ impl ObjectStore for S3ObjectStore { .inc(); } - Err(err) + Err(either::Left(err)) } } }, @@ -758,13 +765,23 @@ impl S3ObjectStore { } #[inline(always)] - fn should_retry(err: &SdkError) -> bool { - if let SdkError::DispatchFailure(e) = err { - if e.is_timeout() { - tracing::warn!(target: "http_timeout_retry", "{:?} occurs, trying to retry S3 get_object request.", e); + fn should_retry(err: &Either, ByteStreamError>) -> bool { + match err { + Either::Left(err) => { + if let SdkError::DispatchFailure(e) = err { + if e.is_timeout() { + tracing::warn!(target: "http_timeout_retry", "{:?} occurs, trying to retry S3 get_object request.", e); + return true; + } + } + } + Either::Right(_) => { + // Unfortunately `ErrorKind` of `ByteStreamError` is not accessible. + // Always returns true and relies on req_retry_max_attempts to avoid infinite loop. return true; } } + false } } @@ -904,6 +921,15 @@ impl Stream for S3ObjectIter { } } +impl From, ByteStreamError>> for ObjectError { + fn from(e: Either, ByteStreamError>) -> Self { + match e { + Either::Left(e) => e.into(), + Either::Right(e) => e.into(), + } + } +} + #[cfg(test)] #[cfg(not(madsim))] mod tests {