Skip to content

Commit

Permalink
fix(object_store): fix a corner case with retry (#13078)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Oct 26, 2023
1 parent bb100a7 commit 8eedf8d
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 13 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion e2e_test/sink/kafka/avro.slt
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ select
timestamp_millis_field,
date_field,
time_micros_field,
time_millis_field from from_kafka;
time_millis_field from from_kafka order by string_field;
----
t Rising \x6130 3.5 4.25 22 23 NULL {{NULL,3},NULL,{7,NULL,2}} 2006-01-02 22:04:05+00:00 NULL NULL 12:34:56.123456 NULL
f Wave \x5a4446 1.5 NULL 11 12 (NULL,foo) NULL NULL 2006-01-02 22:04:05+00:00 2021-04-01 NULL 23:45:16.654
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/sink/kafka/protobuf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ select
nested_message_field,
repeated_int_field,
timestamp_field,
oneof_int32 from from_kafka;
oneof_int32 from from_kafka order by string_field;
----
t Rising \x6130 3.5 4.25 22 23 24 0 26 27 (1,) {4,0,4} (1136239445,0) 42
f Wave \x5a4446 1.5 0 11 12 13 14 15 16 (4,foo) {} (0,0) 0
Expand Down
1 change: 1 addition & 0 deletions src/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ aws-smithy-http = { workspace = true }
aws-smithy-types = { workspace = true }
bytes = { version = "1", features = ["serde"] }
crc32fast = "1.3.2"
either = "1"
fail = "0.5"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
hyper = "0.14"
Expand Down
48 changes: 37 additions & 11 deletions src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error;
use aws_sdk_s3::operation::upload_part::UploadPartOutput;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::primitives::{ByteStream, ByteStreamError};
use aws_sdk_s3::types::{
AbortIncompleteMultipartUpload, BucketLifecycleConfiguration, CompletedMultipartUpload,
CompletedPart, Delete, ExpirationStatus, LifecycleRule, LifecycleRuleFilter, ObjectIdentifier,
Expand All @@ -34,6 +34,7 @@ use aws_smithy_client::http_connector::{ConnectorSettings, HttpConnector};
use aws_smithy_http::body::SdkBody;
use aws_smithy_http::result::SdkError;
use aws_smithy_types::retry::RetryConfig;
use either::Either;
use fail::fail_point;
use futures::future::{try_join_all, BoxFuture, FutureExt};
use futures::{stream, Stream};
Expand Down Expand Up @@ -354,11 +355,19 @@ impl ObjectStore for S3ObjectStore {
)));

// Retry if an AWS EC2 HTTP timeout error occurs.
let resp = tokio_retry::RetryIf::spawn(
let val = tokio_retry::RetryIf::spawn(
self.config.get_retry_strategy(),
|| async {
match self.obj_store_request(path, range.clone()).send().await {
Ok(resp) => Ok(resp),
Ok(resp) => {
let val = resp
.body
.collect()
.await
.map_err(either::Right)?
.into_bytes();
Ok(val)
}
Err(err) => {
if let SdkError::DispatchFailure(e) = &err
&& e.is_timeout()
Expand All @@ -369,16 +378,14 @@ impl ObjectStore for S3ObjectStore {
.inc();
}

Err(err)
Err(either::Left(err))
}
}
},
Self::should_retry,
)
.await?;

let val = resp.body.collect().await?.into_bytes();

if let Some(len) = range.len() && len != val.len() {
return Err(ObjectError::internal(format!(
"mismatched size: expected {}, found {} when reading {} at {:?}",
Expand Down Expand Up @@ -445,7 +452,7 @@ impl ObjectStore for S3ObjectStore {
.inc();
}

Err(err)
Err(either::Left(err))
}
}
},
Expand Down Expand Up @@ -768,13 +775,23 @@ impl S3ObjectStore {
}

#[inline(always)]
fn should_retry(err: &SdkError<GetObjectError>) -> bool {
if let SdkError::DispatchFailure(e) = err {
if e.is_timeout() {
tracing::warn!(target: "http_timeout_retry", "{:?} occurs, trying to retry S3 get_object request.", e);
/// Retry predicate for a failed S3 `get_object` attempt.
///
/// The error is either an SDK error from sending the request (`Left`) or a
/// `ByteStreamError` from collecting the response body (`Right`).
///
/// Returns `true` when the attempt should be retried:
/// * `Left`: only for a dispatch failure that reports a timeout; a warning is
///   logged in that case. Every other SDK error is not retried.
/// * `Right`: always.
fn should_retry(err: &Either<SdkError<GetObjectError>, ByteStreamError>) -> bool {
    match err {
        // The request never completed because the HTTP dispatch timed out.
        Either::Left(SdkError::DispatchFailure(failure)) if failure.is_timeout() => {
            tracing::warn!(target: "http_timeout_retry", "{:?} occurs, trying to retry S3 get_object request.", failure);
            true
        }
        // Any other SDK error is surfaced to the caller without a retry.
        Either::Left(_) => false,
        // Unfortunately `ErrorKind` of `ByteStreamError` is not accessible.
        // Always returns true and relies on req_retry_max_attempts to avoid infinite loop.
        Either::Right(_) => true,
    }
}
}
Expand Down Expand Up @@ -914,6 +931,15 @@ impl Stream for S3ObjectIter {
}
}

impl From<Either<SdkError<GetObjectError>, ByteStreamError>> for ObjectError {
fn from(e: Either<SdkError<GetObjectError>, ByteStreamError>) -> Self {
match e {
Either::Left(e) => e.into(),
Either::Right(e) => e.into(),
}
}
}

#[cfg(test)]
#[cfg(not(madsim))]
mod tests {
Expand Down

0 comments on commit 8eedf8d

Please sign in to comment.