From 343491db0f8181cf05e1da59e47087c9a8aaea4b Mon Sep 17 00:00:00 2001 From: Weijie Guo Date: Tue, 18 Jun 2024 14:48:08 +0800 Subject: [PATCH] refactor(core): Remove unused `size` for `RangeWrite`. --- core/src/raw/oio/write/range_write.rs | 34 +++++++++++---------------- core/src/services/gcs/writer.rs | 21 +++++------------ 2 files changed, 20 insertions(+), 35 deletions(-) diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs index 95b35aae52eb..67ae619dd924 100644 --- a/core/src/raw/oio/write/range_write.rs +++ b/core/src/raw/oio/write/range_write.rs @@ -58,7 +58,7 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static { /// RangeWriter will call this API when: /// /// - All the data has been written to the buffer and we can perform the upload at once. - fn write_once(&self, size: u64, body: Buffer) -> impl Future> + MaybeSend; + fn write_once(&self, body: Buffer) -> impl Future> + MaybeSend; /// Initiate range the range write, the returning value is the location. fn initiate_range(&self) -> impl Future> + MaybeSend; @@ -68,7 +68,6 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static { &self, location: &str, offset: u64, - size: u64, body: Buffer, ) -> impl Future> + MaybeSend; @@ -77,7 +76,6 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static { &self, location: &str, offset: u64, - size: u64, body: Buffer, ) -> impl Future> + MaybeSend; @@ -119,12 +117,10 @@ impl RangeWriter { tasks: ConcurrentTasks::new(executor, concurrent, |input| { Box::pin(async move { - let fut = input.w.write_range( - &input.location, - input.offset, - input.bytes.len() as u64, - input.bytes.clone(), - ); + let fut = + input + .w + .write_range(&input.location, input.offset, input.bytes.clone()); match input.executor.timeout() { None => { let result = fut.await; @@ -197,12 +193,9 @@ impl oio::Write for RangeWriter { async fn close(&mut self) -> Result<()> { let Some(location) = self.location.clone() else { - let (size, body) = match self.cache.clone() { - Some(cache) => (cache.len(), cache), - None => (0, Buffer::new()), - }; + let body = self.cache.clone().unwrap_or_default(); // Call write_once if there is no data in buffer and no location. - self.w.write_once(size as u64, body).await?; + self.w.write_once(body).await?; self.cache = None; return Ok(()); }; @@ -212,9 +205,7 @@ impl oio::Write for RangeWriter { if let Some(buffer) = self.cache.clone() { let offset = self.next_offset; - self.w - .complete_range(&location, offset, buffer.len() as u64, buffer) - .await?; + self.w.complete_range(&location, offset, buffer).await?; self.cache = None; } @@ -265,8 +256,9 @@ mod tests { } impl RangeWrite for Arc> { - async fn write_once(&self, size: u64, _: Buffer) -> Result<()> { + async fn write_once(&self, body: Buffer) -> Result<()> { let mut test = self.lock().unwrap(); + let size = body.len() as u64; test.length += size; test.bytes.extend(0..size); @@ -277,7 +269,7 @@ mod tests { Ok("test".to_string()) } - async fn write_range(&self, _: &str, offset: u64, size: u64, _: Buffer) -> Result<()> { + async fn write_range(&self, _: &str, offset: u64, body: Buffer) -> Result<()> { // Add an async sleep here to enforce some pending. sleep(Duration::from_millis(50)).await; @@ -289,6 +281,7 @@ mod tests { } let mut test = self.lock().unwrap(); + let size = body.len() as u64; test.length += size; let input = (offset..offset + size).collect::>(); @@ -302,7 +295,7 @@ mod tests { Ok(()) } - async fn complete_range(&self, _: &str, offset: u64, size: u64, _: Buffer) -> Result<()> { + async fn complete_range(&self, _: &str, offset: u64, body: Buffer) -> Result<()> { // Add an async sleep here to enforce some pending. sleep(Duration::from_millis(50)).await; @@ -314,6 +307,7 @@ mod tests { } let mut test = self.lock().unwrap(); + let size = body.len() as u64; test.length += size; let input = (offset..offset + size).collect::>(); diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index 9307e331dae3..bd2827b76475 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -43,7 +43,8 @@ impl GcsWriter { } impl oio::RangeWrite for GcsWriter { - async fn write_once(&self, size: u64, body: Buffer) -> Result<()> { + async fn write_once(&self, body: Buffer) -> Result<()> { + let size = body.len() as u64; let mut req = self.core.gcs_insert_object_request( &percent_encode_path(&self.path), Some(size), @@ -83,13 +84,8 @@ impl oio::RangeWrite for GcsWriter { } } - async fn write_range( - &self, - location: &str, - written: u64, - size: u64, - body: Buffer, - ) -> Result<()> { + async fn write_range(&self, location: &str, written: u64, body: Buffer) -> Result<()> { + let size = body.len() as u64; let mut req = self .core .gcs_upload_in_resumable_upload(location, size, written, body)?; @@ -105,13 +101,8 @@ impl oio::RangeWrite for GcsWriter { } } - async fn complete_range( - &self, - location: &str, - written: u64, - size: u64, - body: Buffer, - ) -> Result<()> { + async fn complete_range(&self, location: &str, written: u64, body: Buffer) -> Result<()> { + let size = body.len() as u64; let resp = self .core .gcs_complete_resumable_upload(location, written, size, body)