diff --git a/Cargo.lock b/Cargo.lock index 4e969a94fc134..a2a8941e5905a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7832,6 +7832,7 @@ dependencies = [ "aws-smithy-types", "bytes", "crc32fast", + "either", "fail", "futures", "hyper", diff --git a/e2e_test/sink/kafka/avro.slt b/e2e_test/sink/kafka/avro.slt index e1b09e3608e37..a30d8b70fd4ba 100644 --- a/e2e_test/sink/kafka/avro.slt +++ b/e2e_test/sink/kafka/avro.slt @@ -57,7 +57,7 @@ select timestamp_millis_field, date_field, time_micros_field, - time_millis_field from from_kafka; + time_millis_field from from_kafka order by string_field; ---- t Rising \x6130 3.5 4.25 22 23 NULL {{NULL,3},NULL,{7,NULL,2}} 2006-01-02 22:04:05+00:00 NULL NULL 12:34:56.123456 NULL f Wave \x5a4446 1.5 NULL 11 12 (NULL,foo) NULL NULL 2006-01-02 22:04:05+00:00 2021-04-01 NULL 23:45:16.654 diff --git a/e2e_test/sink/kafka/protobuf.slt b/e2e_test/sink/kafka/protobuf.slt index 2f827aeda9fc0..c6ccb2ac24416 100644 --- a/e2e_test/sink/kafka/protobuf.slt +++ b/e2e_test/sink/kafka/protobuf.slt @@ -61,7 +61,7 @@ select nested_message_field, repeated_int_field, timestamp_field, - oneof_int32 from from_kafka; + oneof_int32 from from_kafka order by string_field; ---- t Rising \x6130 3.5 4.25 22 23 24 0 26 27 (1,) {4,0,4} (1136239445,0) 42 f Wave \x5a4446 1.5 0 11 12 13 14 15 16 (4,foo) {} (0,0) 0 diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index 88f73ca1d3c7d..f117c272a9afc 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -18,6 +18,7 @@ aws-smithy-http = { workspace = true } aws-smithy-types = { workspace = true } bytes = { version = "1", features = ["serde"] } crc32fast = "1.3.2" +either = "1" fail = "0.5" futures = { version = "0.3", default-features = false, features = ["alloc"] } hyper = "0.14" diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 69e7f3687fdeb..89f9aa5a053d5 100644 --- a/src/object_store/src/object/s3.rs +++ 
b/src/object_store/src/object/s3.rs @@ -24,7 +24,7 @@ use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder; use aws_sdk_s3::operation::get_object::GetObjectError; use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error; use aws_sdk_s3::operation::upload_part::UploadPartOutput; -use aws_sdk_s3::primitives::ByteStream; +use aws_sdk_s3::primitives::{ByteStream, ByteStreamError}; use aws_sdk_s3::types::{ AbortIncompleteMultipartUpload, BucketLifecycleConfiguration, CompletedMultipartUpload, CompletedPart, Delete, ExpirationStatus, LifecycleRule, LifecycleRuleFilter, ObjectIdentifier, @@ -34,6 +34,7 @@ use aws_smithy_client::http_connector::{ConnectorSettings, HttpConnector}; use aws_smithy_http::body::SdkBody; use aws_smithy_http::result::SdkError; use aws_smithy_types::retry::RetryConfig; +use either::Either; use fail::fail_point; use futures::future::{try_join_all, BoxFuture, FutureExt}; use futures::{stream, Stream}; @@ -354,11 +355,19 @@ impl ObjectStore for S3ObjectStore { ))); // retry if occurs AWS EC2 HTTP timeout error. - let resp = tokio_retry::RetryIf::spawn( + let val = tokio_retry::RetryIf::spawn( self.config.get_retry_strategy(), || async { match self.obj_store_request(path, range.clone()).send().await { - Ok(resp) => Ok(resp), + Ok(resp) => { + let val = resp + .body + .collect() + .await + .map_err(either::Right)? 
+ .into_bytes(); + Ok(val) + } Err(err) => { if let SdkError::DispatchFailure(e) = &err && e.is_timeout() @@ -369,7 +378,7 @@ impl ObjectStore for S3ObjectStore { .inc(); } - Err(err) + Err(either::Left(err)) } } }, @@ -377,8 +386,6 @@ ) .await?; - let val = resp.body.collect().await?.into_bytes(); - if let Some(len) = range.len() && len != val.len() { return Err(ObjectError::internal(format!( "mismatched size: expected {}, found {} when reading {} at {:?}", @@ -445,7 +452,7 @@ .inc(); } - Err(err) + Err(either::Left(err)) } } }, @@ -768,13 +775,23 @@ } #[inline(always)] - fn should_retry(err: &SdkError<GetObjectError>) -> bool { - if let SdkError::DispatchFailure(e) = err { - if e.is_timeout() { - tracing::warn!(target: "http_timeout_retry", "{:?} occurs, trying to retry S3 get_object request.", e); + fn should_retry(err: &Either<SdkError<GetObjectError>, ByteStreamError>) -> bool { + match err { + Either::Left(err) => { + if let SdkError::DispatchFailure(e) = err { + if e.is_timeout() { + tracing::warn!(target: "http_timeout_retry", "{:?} occurs, trying to retry S3 get_object request.", e); + return true; + } + } + } + Either::Right(_) => { + // Unfortunately `ErrorKind` of `ByteStreamError` is not accessible. + // Always returns true and relies on req_retry_max_attempts to avoid infinite loop. return true; } } + false } } @@ -914,6 +931,15 @@ impl Stream for S3ObjectIter { } } +impl From<Either<SdkError<GetObjectError>, ByteStreamError>> for ObjectError { + fn from(e: Either<SdkError<GetObjectError>, ByteStreamError>) -> Self { + match e { + Either::Left(e) => e.into(), + Either::Right(e) => e.into(), + } + } +} + #[cfg(test)] #[cfg(not(madsim))] mod tests {