Skip to content

Commit

Permalink
Make oio::Buffer cheap to clone
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Mar 22, 2024
1 parent 8ba1c45 commit fee4f8f
Showing 1 changed file with 39 additions and 64 deletions.
103 changes: 39 additions & 64 deletions core/src/raw/oio/buf/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
// under the License.

use std::collections::VecDeque;
use std::sync::Arc;

use bytes::Buf;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;

/// Buffer is a wrapper of `Bytes` and `VecDeque<Bytes>`.
/// Buffer is a wrapper of contiguous `Bytes` and non contiguous `[Bytes]`.
///
/// We designed buffer to allow underlying storage to return non-contiguous bytes.
///
Expand All @@ -33,7 +32,11 @@ pub struct Buffer(Inner);
#[derive(Clone)]
enum Inner {
Contiguous(Bytes),
NonContiguous(VecDeque<Bytes>),
NonContiguous {
parts: Arc<[Bytes]>,
idx: usize,
offset: usize,
},
}

impl Buffer {
Expand All @@ -42,7 +45,7 @@ impl Buffer {
/// This operation is const and no allocation will be performed.
#[inline]
pub const fn new() -> Self {
Self(Inner::NonContiguous(VecDeque::new()))
Self(Inner::Contiguous(Bytes::new()))
}

/// Clone internal bytes to a new `Bytes`.
Expand All @@ -51,29 +54,6 @@ impl Buffer {
let mut bs = self.clone();
bs.copy_to_bytes(bs.remaining())
}

/// Merge two buffer together without copying internal bytes.
pub fn merge(self, buf: Buffer) -> Self {
let mut vec = match self.0 {
Inner::Contiguous(b) => {
// NOTE: we will have at least two bytes in the vec.
let mut vec = VecDeque::with_capacity(2);
vec.push_back(b);
vec
}
Inner::NonContiguous(v) => v,
};

match buf.0 {
Inner::Contiguous(b) => vec.push_back(b),
Inner::NonContiguous(bs) => {
vec.reserve(bs.len());
vec.extend(bs)
}
}

Self(Inner::NonContiguous(vec))
}
}

impl From<Vec<u8>> for Buffer {
Expand All @@ -88,15 +68,25 @@ impl From<Bytes> for Buffer {
}
}

/// Transform `VecDeque<Bytes>` to `Arc<[Bytes]>`.
impl From<VecDeque<Bytes>> for Buffer {
fn from(bs: VecDeque<Bytes>) -> Self {
Self(Inner::NonContiguous(bs))
Self(Inner::NonContiguous {
parts: Vec::from(bs).into(),
idx: 0,
offset: 0,
})
}
}

/// Transform `Vec<Bytes>` to `Arc<[Bytes]>`.
impl From<Vec<Bytes>> for Buffer {
fn from(bs: Vec<Bytes>) -> Self {
Self(Inner::NonContiguous(bs.into()))
Self(Inner::NonContiguous {
parts: bs.into(),
idx: 0,
offset: 0,
})
}
}

Expand All @@ -105,61 +95,46 @@ impl Buf for Buffer {
fn remaining(&self) -> usize {
match &self.0 {
Inner::Contiguous(b) => b.remaining(),
Inner::NonContiguous(v) => v.iter().map(|b| b.remaining()).sum(),
Inner::NonContiguous { parts, idx, offset } => {
parts[*idx..].iter().map(|p| p.len()).sum::<usize>() - offset
}
}
}

#[inline]
fn chunk(&self) -> &[u8] {
match &self.0 {
Inner::Contiguous(b) => b.chunk(),
Inner::NonContiguous(v) => {
if let Some(b) = v.front() {
b.chunk()
} else {
Inner::NonContiguous { parts, idx, offset } => {
if parts.is_empty() {
&[]
} else {
&parts[*idx][*offset..]
}
}
}
}

#[inline]
fn advance(&mut self, cnt: usize) {
fn advance(&mut self, mut cnt: usize) {
match &mut self.0 {
Inner::Contiguous(b) => b.advance(cnt),
Inner::NonContiguous(v) => {
let mut cnt = cnt;
Inner::NonContiguous { parts, idx, offset } => {
while cnt > 0 {
let b = &mut v[0];
if b.remaining() > cnt {
b.advance(cnt);
break;
let remaining = parts[*idx].len() - *offset;
if cnt < remaining {
*offset += cnt;
return;
} else {
cnt -= b.remaining();
v.remove(0);
cnt -= remaining;
*idx += 1;
*offset = 0;
if *idx >= parts.len() {
break;
}
}
}
}
}
}

#[inline]
fn copy_to_bytes(&mut self, len: usize) -> Bytes {
match &mut self.0 {
Inner::Contiguous(b) => b.copy_to_bytes(len),
Inner::NonContiguous(v) => {
if len > 0 && len <= v[0].remaining() {
let bs = v[0].copy_to_bytes(len);
if v[0].is_empty() {
v.remove(0);
}
return bs;
}

let mut bs = BytesMut::with_capacity(len);
bs.put(self.take(len));
bs.freeze()
}
}
}
}

0 comments on commit fee4f8f

Please sign in to comment.