Skip to content

Commit

Permalink
move the check inside monitored streaming uploader
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 committed May 24, 2024
1 parent 66fb60b commit 07836eb
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 35 deletions.
74 changes: 40 additions & 34 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,25 +384,30 @@ impl<U: StreamingUploader> MonitoredStreamingUploader<U> {

/// NOTICE: after #16231, streaming uploader implemented via aws-sdk-s3 will maintain metrics internally in s3.rs
/// so MonitoredStreamingUploader will only be used when the inner object store is opendal.
///
/// TODO: we should avoid this special case after fully migrating to opendal for s3.
impl<U: StreamingUploader> MonitoredStreamingUploader<U> {
async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
let operation_type = OperationType::StreamingUpload;
let operation_type_str = operation_type.as_str();
let data_len = data.len();

let _timer = self
.object_store_metrics
.operation_latency
.with_label_values(&[self.media_type, operation_type_str])
.start_timer();
let res = if self.media_type() == "s3" {
            // TODO: we should avoid this special case after fully migrating to opendal for s3.
self.inner
.write_bytes(data)
.verbose_instrument_await(operation_type_str)
.await
} else {
let _timer = self
.object_store_metrics
.operation_latency
.with_label_values(&[self.media_type, operation_type_str])
.start_timer();

let res = self
.inner
.write_bytes(data)
.verbose_instrument_await(operation_type_str)
.await;
self.inner
.write_bytes(data)
.verbose_instrument_await(operation_type_str)
.await
};

try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);

Expand All @@ -421,17 +426,25 @@ impl<U: StreamingUploader> MonitoredStreamingUploader<U> {
async fn finish(self) -> ObjectResult<()> {
let operation_type = OperationType::StreamingUploadFinish;
let operation_type_str = operation_type.as_str();
let _timer = self
.object_store_metrics
.operation_latency
.with_label_values(&[self.media_type, operation_type_str])
.start_timer();

let res = self
.inner
.finish()
.verbose_instrument_await(operation_type_str)
.await;
let res = if self.media_type() == "s3" {
            // TODO: we should avoid this special case after fully migrating to opendal for s3.
self.inner
.finish()
.verbose_instrument_await(operation_type_str)
.await
} else {
let _timer = self
.object_store_metrics
.operation_latency
.with_label_values(&[self.media_type, operation_type_str])
.start_timer();

self.inner
.finish()
.verbose_instrument_await(operation_type_str)
.await
};

try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
self.object_store_metrics
Expand Down Expand Up @@ -625,18 +638,11 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {

try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);

if self.media_type() == "s3" && !self.config.s3.developer.use_opendal {
// After #16231, streaming uploader implemented via aws-sdk-s3 will maintain metrics internally in s3.rs
// so MonitoredStreamingUploader will only be used when the inner object store is opendal.
            // TODO: we should avoid this special case after fully migrating to opendal for s3.
Ok(res?)
} else {
Ok(MonitoredStreamingUploader::new(
media_type,
res?,
self.object_store_metrics.clone(),
))
}
Ok(MonitoredStreamingUploader::new(
media_type,
res?,
self.object_store_metrics.clone(),
))
}

pub async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
Expand Down
3 changes: 2 additions & 1 deletion src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ impl StreamingUploader for S3StreamingUploader {
retry_request(builder, &self.config, operation_type, self.metrics.clone())
.await;
try_update_failure_metric(&self.metrics, &res, operation_type.as_str());
res
res?;
Ok(())
}
} else if let Err(e) = self
.flush_multipart_and_complete()
Expand Down

0 comments on commit 07836eb

Please sign in to comment.