Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: cherry-pick #13078 and #13046 #13123

Merged
merged 2 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion src/batch/src/executor/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ impl SourceExecutor {
for chunk in stream {
match chunk {
Ok(chunk) => {
yield covert_stream_chunk_to_batch_chunk(chunk.chunk)?;
let data_chunk = covert_stream_chunk_to_batch_chunk(chunk.chunk)?;
if data_chunk.capacity() > 0 {
yield data_chunk;
}
}
Err(e) => {
return Err(e);
Expand Down
7 changes: 6 additions & 1 deletion src/common/src/util/chunk_coalesce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,12 @@ impl SlicedDataChunk {
}

/// Builds a [`SlicedDataChunk`] starting at `offset`, panicking (with a
/// diagnostic message) when the offset lies at or beyond the chunk's capacity.
pub fn with_offset_checked(data_chunk: DataChunk, offset: usize) -> Self {
    // Capture capacity once so the panic message and the check agree.
    let capacity = data_chunk.capacity();
    assert!(
        offset < capacity,
        "offset {}, data_chunk capacity {}",
        offset,
        capacity
    );
    Self { data_chunk, offset }
}

Expand Down
1 change: 1 addition & 0 deletions src/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ aws-smithy-http = { workspace = true }
aws-smithy-types = { workspace = true }
bytes = { version = "1", features = ["serde"] }
crc32fast = "1.3.2"
either = "1"
fail = "0.5"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
hyper = "0.14"
Expand Down
48 changes: 37 additions & 11 deletions src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error;
use aws_sdk_s3::operation::upload_part::UploadPartOutput;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::primitives::{ByteStream, ByteStreamError};
use aws_sdk_s3::types::{
AbortIncompleteMultipartUpload, BucketLifecycleConfiguration, CompletedMultipartUpload,
CompletedPart, Delete, ExpirationStatus, LifecycleRule, LifecycleRuleFilter, ObjectIdentifier,
Expand All @@ -34,6 +34,7 @@ use aws_smithy_client::http_connector::{ConnectorSettings, HttpConnector};
use aws_smithy_http::body::SdkBody;
use aws_smithy_http::result::SdkError;
use aws_smithy_types::retry::RetryConfig;
use either::Either;
use fail::fail_point;
use futures::future::{try_join_all, BoxFuture, FutureExt};
use futures::{stream, Stream};
Expand Down Expand Up @@ -354,11 +355,19 @@ impl ObjectStore for S3ObjectStore {
)));

// retry if occurs AWS EC2 HTTP timeout error.
let resp = tokio_retry::RetryIf::spawn(
let val = tokio_retry::RetryIf::spawn(
self.config.get_retry_strategy(),
|| async {
match self.obj_store_request(path, range.clone()).send().await {
Ok(resp) => Ok(resp),
Ok(resp) => {
let val = resp
.body
.collect()
.await
.map_err(either::Right)?
.into_bytes();
Ok(val)
}
Err(err) => {
if let SdkError::DispatchFailure(e) = &err
&& e.is_timeout()
Expand All @@ -369,16 +378,14 @@ impl ObjectStore for S3ObjectStore {
.inc();
}

Err(err)
Err(either::Left(err))
}
}
},
Self::should_retry,
)
.await?;

let val = resp.body.collect().await?.into_bytes();

if let Some(len) = range.len() && len != val.len() {
return Err(ObjectError::internal(format!(
"mismatched size: expected {}, found {} when reading {} at {:?}",
Expand Down Expand Up @@ -445,7 +452,7 @@ impl ObjectStore for S3ObjectStore {
.inc();
}

Err(err)
Err(either::Left(err))
}
}
},
Expand Down Expand Up @@ -758,13 +765,23 @@ impl S3ObjectStore {
}

#[inline(always)]
fn should_retry(err: &SdkError<GetObjectError>) -> bool {
if let SdkError::DispatchFailure(e) = err {
if e.is_timeout() {
tracing::warn!(target: "http_timeout_retry", "{:?} occurs, trying to retry S3 get_object request.", e);
fn should_retry(err: &Either<SdkError<GetObjectError>, ByteStreamError>) -> bool {
match err {
Either::Left(err) => {
if let SdkError::DispatchFailure(e) = err {
if e.is_timeout() {
tracing::warn!(target: "http_timeout_retry", "{:?} occurs, trying to retry S3 get_object request.", e);
return true;
}
}
}
Either::Right(_) => {
// Unfortunately `ErrorKind` of `ByteStreamError` is not accessible.
// Always returns true and relies on req_retry_max_attempts to avoid infinite loop.
return true;
}
}

false
}
}
Expand Down Expand Up @@ -904,6 +921,15 @@ impl Stream for S3ObjectIter {
}
}

impl From<Either<SdkError<GetObjectError>, ByteStreamError>> for ObjectError {
fn from(e: Either<SdkError<GetObjectError>, ByteStreamError>) -> Self {
match e {
Either::Left(e) => e.into(),
Either::Right(e) => e.into(),
}
}
}

#[cfg(test)]
#[cfg(not(madsim))]
mod tests {
Expand Down
Loading