From 9bde47d5530183df1ff9b81f8e179cf10d58b31a Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 25 Mar 2024 19:22:58 +0800 Subject: [PATCH] Fix tests Signed-off-by: Xuanwo --- core/src/raw/http_util/bytes_range.rs | 5 --- core/src/raw/http_util/client.rs | 1 + core/src/raw/oio/buf/buffer.rs | 58 ++++++++++++++------------- core/src/types/reader.rs | 11 ++--- 4 files changed, 35 insertions(+), 40 deletions(-) diff --git a/core/src/raw/http_util/bytes_range.rs b/core/src/raw/http_util/bytes_range.rs index a5c7284c2a9e..b2bde391311a 100644 --- a/core/src/raw/http_util/bytes_range.rs +++ b/core/src/raw/http_util/bytes_range.rs @@ -81,11 +81,6 @@ impl BytesRange { } /// Convert bytes range into Range header. - /// - /// # NOTE - /// - /// - `bytes=-1023` means get the suffix of the file. - /// - `bytes=0-1023` means get the first 1024 bytes, we must set the end to 1023. pub fn to_header(&self) -> String { format!("bytes={self}") } diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index a38b3f16c053..6c78130c6cbc 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -178,6 +178,7 @@ impl HttpClient { .with_context("url", uri.to_string()) .set_source(err) })?; + let buffer = oio::Buffer::from(bs); if let Some(expect) = content_length { diff --git a/core/src/raw/oio/buf/buffer.rs b/core/src/raw/oio/buf/buffer.rs index 16668da3c831..ea68425a8648 100644 --- a/core/src/raw/oio/buf/buffer.rs +++ b/core/src/raw/oio/buf/buffer.rs @@ -96,12 +96,9 @@ impl Buf for Buffer { match &self.0 { Inner::Contiguous(b) => b.remaining(), Inner::NonContiguous { parts, idx, offset } => { - assert!( - *idx <= parts.len(), - "idx larger than parts length: {:?} <= {:?}", - *idx, - parts.len(), - ); + if *idx >= parts.len() { + return 0; + } parts[*idx..].iter().map(|p| p.len()).sum::() - offset } @@ -113,12 +110,9 @@ impl Buf for Buffer { match &self.0 { Inner::Contiguous(b) => b.chunk(), Inner::NonContiguous { parts, idx, offset } => { - assert!( - *idx <= parts.len(), - "idx larger than parts length: {:?} <= {:?}", - *idx, - parts.len(), - ); + if *idx >= parts.len() { + return &[]; + } &parts[*idx][*offset..] } @@ -126,28 +120,36 @@ impl Buf for Buffer { } #[inline] - fn advance(&mut self, mut cnt: usize) { - assert!( - cnt <= self.remaining(), - "cannot advance past `remaining`: {:?} <= {:?}", - cnt, - self.remaining(), - ); - + fn advance(&mut self, cnt: usize) { match &mut self.0 { Inner::Contiguous(b) => b.advance(cnt), Inner::NonContiguous { parts, idx, offset } => { - while cnt > 0 { - let remaining = parts[*idx].len() - *offset; - if cnt <= remaining { - *offset += cnt; - return; + let mut new_cnt = cnt; + let mut new_idx = *idx; + let mut new_offset = *offset; + + while new_cnt > 0 { + let remaining = parts[new_idx].len() - new_offset; + if new_cnt < remaining { + new_offset += new_cnt; + new_cnt = 0; + break; } else { - cnt -= remaining; - *idx += 1; - *offset = 0; + new_cnt -= remaining; + new_idx += 1; + new_offset = 0; + if new_idx > parts.len() { + break; + } } } + + if new_cnt == 0 { + *idx = new_idx; + *offset = new_offset; + } else { + panic!("cannot advance past {cnt} bytes") + } } } } diff --git a/core/src/types/reader.rs b/core/src/types/reader.rs index d2cad6732f14..e402f915ea33 100644 --- a/core/src/types/reader.rs +++ b/core/src/types/reader.rs @@ -97,20 +97,17 @@ impl Reader { let mut read = 0; loop { - let bs = self - .inner - // TODO: use service preferred io size instead. - .read_at_dyn(offset, size.unwrap_or(4 * 1024 * 1024) as usize) - .await?; + // TODO: use service preferred io size instead. + let limit = size.unwrap_or(4 * 1024 * 1024) as usize; + let bs = self.inner.read_at_dyn(offset, limit).await?; let n = bs.remaining(); read += n; buf.put(bs); - if n == 0 { + if n < limit { return Ok(read); } offset += n as u64; - size = size.map(|v| v - n as u64); if size == Some(0) { return Ok(read);