From 07836ebbc09ef0545f230927947d9d0835258532 Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Fri, 24 May 2024 19:14:16 +0800 Subject: [PATCH] move the check inside monitored streaming uploader --- src/object_store/src/object/mod.rs | 74 ++++++++++++++++-------------- src/object_store/src/object/s3.rs | 3 +- 2 files changed, 42 insertions(+), 35 deletions(-) diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index e702de0eab58b..c4332db213a52 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -384,25 +384,30 @@ impl MonitoredStreamingUploader { /// NOTICE: after #16231, streaming uploader implemented via aws-sdk-s3 will maintain metrics internally in s3.rs /// so MonitoredStreamingUploader will only be used when the inner object store is opendal. -/// -/// TODO: we should avoid this special case after fully migrating to opeandal for s3. impl MonitoredStreamingUploader { async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> { let operation_type = OperationType::StreamingUpload; let operation_type_str = operation_type.as_str(); let data_len = data.len(); - let _timer = self - .object_store_metrics - .operation_latency - .with_label_values(&[self.media_type, operation_type_str]) - .start_timer(); + let res = if self.media_type() == "s3" { + // TODO: we should avoid this special case after fully migrating to opeandal for s3. + self.inner + .write_bytes(data) + .verbose_instrument_await(operation_type_str) + .await + } else { + let _timer = self + .object_store_metrics + .operation_latency + .with_label_values(&[self.media_type, operation_type_str]) + .start_timer(); - let res = self - .inner - .write_bytes(data) - .verbose_instrument_await(operation_type_str) - .await; + self.inner + .write_bytes(data) + .verbose_instrument_await(operation_type_str) + .await + }; try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str); @@ -421,17 +426,25 @@ impl MonitoredStreamingUploader { async fn finish(self) -> ObjectResult<()> { let operation_type = OperationType::StreamingUploadFinish; let operation_type_str = operation_type.as_str(); - let _timer = self - .object_store_metrics - .operation_latency - .with_label_values(&[self.media_type, operation_type_str]) - .start_timer(); - let res = self - .inner - .finish() - .verbose_instrument_await(operation_type_str) - .await; + let res = if self.media_type() == "s3" { + // TODO: we should avoid this special case after fully migrating to opeandal for s3. + self.inner + .finish() + .verbose_instrument_await(operation_type_str) + .await + } else { + let _timer = self + .object_store_metrics + .operation_latency + .with_label_values(&[self.media_type, operation_type_str]) + .start_timer(); + + self.inner + .finish() + .verbose_instrument_await(operation_type_str) + .await + }; try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str); self.object_store_metrics @@ -625,18 +638,11 @@ impl MonitoredObjectStore { try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str); - if self.media_type() == "s3" && !self.config.s3.developer.use_opendal { - // After #16231, streaming uploader implemented via aws-sdk-s3 will maintain metrics internally in s3.rs - // so MonitoredStreamingUploader will only be used when the inner object store is opendal. - // TODO: we should avoid this special case after fully migrating to opeandal for s3. - Ok(res?) - } else { - Ok(MonitoredStreamingUploader::new( - media_type, - res?, - self.object_store_metrics.clone(), - )) - } + Ok(MonitoredStreamingUploader::new( + media_type, + res?, + self.object_store_metrics.clone(), + )) } pub async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult { diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 9927ea80f2cd6..6c72ced36563d 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -357,7 +357,8 @@ impl StreamingUploader for S3StreamingUploader { retry_request(builder, &self.config, operation_type, self.metrics.clone()) .await; try_update_failure_metric(&self.metrics, &res, operation_type.as_str()); - res + res?; + Ok(()) } } else if let Err(e) = self .flush_multipart_and_complete()