diff --git a/Cargo.lock b/Cargo.lock
index bb460691a319c..d345d96724938 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7699,6 +7699,7 @@ dependencies = [
  "aws-smithy-types",
  "bytes",
  "crc32fast",
+ "either",
  "fail",
  "futures",
  "hyper",
diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs
index 71cac6e917dc3..fe6e9601f902d 100644
--- a/src/batch/src/executor/source.rs
+++ b/src/batch/src/executor/source.rs
@@ -162,7 +162,10 @@ impl SourceExecutor {
         for chunk in stream {
             match chunk {
                 Ok(chunk) => {
-                    yield covert_stream_chunk_to_batch_chunk(chunk.chunk)?;
+                    let data_chunk = covert_stream_chunk_to_batch_chunk(chunk.chunk)?;
+                    if data_chunk.capacity() > 0 {
+                        yield data_chunk;
+                    }
                 }
                 Err(e) => {
                     return Err(e);
diff --git a/src/common/src/util/chunk_coalesce.rs b/src/common/src/util/chunk_coalesce.rs
index 9a41fc83e8f0e..3bd56b19e434d 100644
--- a/src/common/src/util/chunk_coalesce.rs
+++ b/src/common/src/util/chunk_coalesce.rs
@@ -285,7 +285,12 @@ impl SlicedDataChunk {
     }
 
     pub fn with_offset_checked(data_chunk: DataChunk, offset: usize) -> Self {
-        assert!(offset < data_chunk.capacity());
+        assert!(
+            offset < data_chunk.capacity(),
+            "offset {}, data_chunk capacity {}",
+            offset,
+            data_chunk.capacity()
+        );
         Self { data_chunk, offset }
     }
 
diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml
index 88f73ca1d3c7d..f117c272a9afc 100644
--- a/src/object_store/Cargo.toml
+++ b/src/object_store/Cargo.toml
@@ -18,6 +18,7 @@ aws-smithy-http = { workspace = true }
 aws-smithy-types = { workspace = true }
 bytes = { version = "1", features = ["serde"] }
 crc32fast = "1.3.2"
+either = "1"
 fail = "0.5"
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
 hyper = "0.14"
diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs
index a53f1a6825281..22aaa4301af26 100644
--- a/src/object_store/src/object/s3.rs
+++ b/src/object_store/src/object/s3.rs
@@ -24,7 +24,7 @@ use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
 use aws_sdk_s3::operation::get_object::GetObjectError;
 use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error;
 use aws_sdk_s3::operation::upload_part::UploadPartOutput;
-use aws_sdk_s3::primitives::ByteStream;
+use aws_sdk_s3::primitives::{ByteStream, ByteStreamError};
 use aws_sdk_s3::types::{
     AbortIncompleteMultipartUpload, BucketLifecycleConfiguration, CompletedMultipartUpload,
     CompletedPart, Delete, ExpirationStatus, LifecycleRule, LifecycleRuleFilter, ObjectIdentifier,
@@ -34,6 +34,7 @@ use aws_smithy_client::http_connector::{ConnectorSettings, HttpConnector};
 use aws_smithy_http::body::SdkBody;
 use aws_smithy_http::result::SdkError;
 use aws_smithy_types::retry::RetryConfig;
+use either::Either;
 use fail::fail_point;
 use futures::future::{try_join_all, BoxFuture, FutureExt};
 use futures::{stream, Stream};
@@ -354,11 +355,19 @@ impl ObjectStore for S3ObjectStore {
         )));
 
         // retry if occurs AWS EC2 HTTP timeout error.
-        let resp = tokio_retry::RetryIf::spawn(
+        let val = tokio_retry::RetryIf::spawn(
             self.config.get_retry_strategy(),
             || async {
                 match self.obj_store_request(path, range.clone()).send().await {
-                    Ok(resp) => Ok(resp),
+                    Ok(resp) => {
+                        let val = resp
+                            .body
+                            .collect()
+                            .await
+                            .map_err(either::Right)?
+                            .into_bytes();
+                        Ok(val)
+                    }
                     Err(err) => {
                         if let SdkError::DispatchFailure(e) = &err
                             && e.is_timeout()
@@ -369,7 +378,7 @@ impl ObjectStore for S3ObjectStore {
                                 .inc();
                         }
 
-                        Err(err)
+                        Err(either::Left(err))
                     }
                 }
             },
@@ -377,8 +386,6 @@
         )
         .await?;
 
-        let val = resp.body.collect().await?.into_bytes();
-
         if let Some(len) = range.len() && len != val.len() {
             return Err(ObjectError::internal(format!(
                 "mismatched size: expected {}, found {} when reading {} at {:?}",
@@ -445,7 +452,7 @@
                                 .inc();
                         }
 
-                        Err(err)
+                        Err(either::Left(err))
                     }
                 }
             },
@@ -758,13 +765,23 @@ impl S3ObjectStore {
     }
 
     #[inline(always)]
-    fn should_retry(err: &SdkError<GetObjectError>) -> bool {
-        if let SdkError::DispatchFailure(e) = err {
-            if e.is_timeout() {
-                tracing::warn!(target: "http_timeout_retry", "{:?} occurs, trying to retry S3 get_object request.", e);
+    fn should_retry(err: &Either<SdkError<GetObjectError>, ByteStreamError>) -> bool {
+        match err {
+            Either::Left(err) => {
+                if let SdkError::DispatchFailure(e) = err {
+                    if e.is_timeout() {
+                        tracing::warn!(target: "http_timeout_retry", "{:?} occurs, trying to retry S3 get_object request.", e);
+                        return true;
+                    }
+                }
+            }
+            Either::Right(_) => {
+                // Unfortunately `ErrorKind` of `ByteStreamError` is not accessible.
+                // Always returns true and relies on req_retry_max_attempts to avoid infinite loop.
                 return true;
             }
         }
+
+        false
     }
 }
@@ -904,6 +921,15 @@ impl Stream for S3ObjectIter {
     }
 }
 
+impl From<Either<SdkError<GetObjectError>, ByteStreamError>> for ObjectError {
+    fn from(e: Either<SdkError<GetObjectError>, ByteStreamError>) -> Self {
+        match e {
+            Either::Left(e) => e.into(),
+            Either::Right(e) => e.into(),
+        }
+    }
+}
+
 #[cfg(test)]
 #[cfg(not(madsim))]
 mod tests {
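
The core of the s3.rs change is moving `resp.body.collect()` inside the retried closure and unifying the two failure stages (the `GetObject` request and the body collection) under `either::Either`, so a single predicate can drive `tokio_retry::RetryIf` and a `From` impl lets `?` convert the combined error back to the caller-facing error type. Below is a minimal, self-contained sketch of that pattern; `RequestError`, `BodyError`, `StoreError`, `send_request`, and `collect_body` are hypothetical stand-ins, not the RisingWave types.

```rust
// Sketch only. Assumed deps: tokio = { version = "1", features = ["full"] },
// tokio-retry = "0.3", either = "1".
use either::Either;
use tokio_retry::strategy::{jitter, ExponentialBackoff};

// Hypothetical failure stages, mirroring SdkError<GetObjectError> (request)
// and ByteStreamError (body collection) in the diff.
#[derive(Debug)]
struct RequestError(&'static str);
#[derive(Debug)]
struct BodyError(&'static str);

// Hypothetical caller-facing error, mirroring ObjectError.
#[derive(Debug)]
struct StoreError(String);

// The From impl is what lets `?` collapse Either back into one error type,
// as the diff does with `impl From<Either<...>> for ObjectError`.
impl From<Either<RequestError, BodyError>> for StoreError {
    fn from(e: Either<RequestError, BodyError>) -> Self {
        match e {
            Either::Left(e) => StoreError(format!("request: {:?}", e)),
            Either::Right(e) => StoreError(format!("body: {:?}", e)),
        }
    }
}

// One predicate covers both stages. Body errors always retry, as in the
// diff, where the retry strategy itself bounds the number of attempts.
fn should_retry(err: &Either<RequestError, BodyError>) -> bool {
    match err {
        Either::Left(_) => false, // real code retries only specific request errors
        Either::Right(_) => true, // error kind is opaque; rely on the bounded strategy
    }
}

async fn fetch() -> Result<Vec<u8>, StoreError> {
    let strategy = ExponentialBackoff::from_millis(10).map(jitter).take(3);
    // Both the send and the body collection run *inside* the retried closure,
    // so a failure while streaming the body also triggers a fresh request.
    let bytes = tokio_retry::RetryIf::spawn(
        strategy,
        || async {
            let resp = send_request().await.map_err(Either::Left)?;
            collect_body(resp).await.map_err(Either::Right)
        },
        should_retry,
    )
    .await?; // `?` converts via the From impl above
    Ok(bytes)
}

// Hypothetical stand-ins for the SDK calls.
async fn send_request() -> Result<u32, RequestError> {
    Ok(42)
}
async fn collect_body(_resp: u32) -> Result<Vec<u8>, BodyError> {
    Ok(vec![1, 2, 3])
}

#[tokio::main]
async fn main() {
    println!("{:?}", fetch().await);
}
```

The design consequence, visible in the removed `let val = resp.body.collect().await?.into_bytes();` line after the retry loop: previously a network error while streaming the response body surfaced immediately, whereas now it counts as a retryable failure of the whole request.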