Skip to content

Commit

Permalink
refactor(core/raw/oio): Use oio buffer in write (#4436)
Browse files Browse the repository at this point in the history
* save work

Signed-off-by: Xuanwo <[email protected]>

* Fix fs

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Apr 9, 2024
1 parent 89a0afb commit 138612a
Show file tree
Hide file tree
Showing 38 changed files with 497 additions and 175 deletions.
2 changes: 1 addition & 1 deletion core/benches/oio/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use rand::RngCore;
pub struct BlackHoleWriter;

impl oio::Write for BlackHoleWriter {
async unsafe fn write(&mut self, bs: oio::ReadableBuf) -> opendal::Result<usize> {
async unsafe fn write(&mut self, bs: oio::Buffer) -> opendal::Result<usize> {
Ok(bs.len())
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> {
}

impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
self.handle.block_on(self.inner.write(bs))
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ impl<W> oio::Write for CompleteWriter<W>
where
W: oio::Write,
{
async unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
async unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;
Expand Down Expand Up @@ -673,7 +673,7 @@ impl<W> oio::BlockingWrite for CompleteWriter<W>
where
W: oio::BlockingWrite,
{
unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> {
}

impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
async unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
async unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
self.inner.write(bs).await
}

Expand All @@ -280,7 +280,7 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
self.inner.write(bs)
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/dtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for DtraceLayerWrapper<R> {
}

impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
async unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
async unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, writer_write_start, c_path.as_ptr());
self.inner
Expand Down Expand Up @@ -429,7 +429,7 @@ impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for DtraceLayerWrapper<R> {
unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, blocking_writer_write_start, c_path.as_ptr());
self.inner
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {
}

impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
async unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
async unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
self.inner.write(bs.clone()).await.map_err(|err| {
err.with_operation(WriteOperation::Write)
.with_context("service", self.scheme)
Expand All @@ -393,7 +393,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
}

impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
self.inner.write(bs.clone()).map_err(|err| {
err.with_operation(WriteOperation::BlockingWrite)
.with_context("service", self.scheme)
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,7 @@ impl<W> LoggingWriter<W> {
}

impl<W: oio::Write> oio::Write for LoggingWriter<W> {
async unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
async unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
match self.inner.write(bs.clone()).await {
Ok(n) => {
self.written += n as u64;
Expand Down Expand Up @@ -1173,7 +1173,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
}

impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
match self.inner.write(bs.clone()) {
Ok(n) => {
self.written += n as u64;
Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/madsim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ pub struct MadsimWriter {
}

impl oio::Write for MadsimWriter {
async unsafe fn write(&mut self, bs: oio::ReadableBuf) -> crate::Result<usize> {
async unsafe fn write(&mut self, bs: oio::Buffer) -> crate::Result<usize> {
#[cfg(madsim)]
{
let req = Request::Write(self.path.to_string(), bs);
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> {
}

impl<R: oio::Write> oio::Write for MetricWrapper<R> {
async unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
async unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
let start = Instant::now();

self.inner
Expand Down Expand Up @@ -822,7 +822,7 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for MetricWrapper<R> {
unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
self.inner
.write(bs)
.map(|n| {
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/minitrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> {
}

impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
unsafe fn write(&mut self, bs: oio::ReadableBuf) -> impl Future<Output = Result<usize>> + Send {
unsafe fn write(&mut self, bs: oio::Buffer) -> impl Future<Output = Result<usize>> + Send {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::Write.into_static());
self.inner.write(bs)
Expand All @@ -329,7 +329,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static());
self.inner.write(bs)
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/oteltrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> {
}

impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
unsafe fn write(&mut self, bs: oio::ReadableBuf) -> impl Future<Output = Result<usize>> + Send {
unsafe fn write(&mut self, bs: oio::Buffer) -> impl Future<Output = Result<usize>> + Send {
self.inner.write(bs)
}

Expand All @@ -301,7 +301,7 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> {
unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
self.inner.write(bs)
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
}

impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
async unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
async unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
Operation::Write.into_static(),
Expand Down Expand Up @@ -767,7 +767,7 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
Operation::BlockingWrite.into_static(),
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/prometheus_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
}

impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
async unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
async unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
let start = Instant::now();

self.inner
Expand Down Expand Up @@ -614,7 +614,7 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
self.inner
.write(bs)
.map(|n| {
Expand Down
6 changes: 3 additions & 3 deletions core/src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,13 +707,13 @@ impl<R: oio::BlockingRead, I: RetryInterceptor> oio::BlockingRead for RetryWrapp
}

impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
async unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
async unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
use backon::RetryableWithContext;

let inner = self.inner.take().expect("inner must be valid");

let ((inner, _), res) = {
|(mut r, bs): (R, oio::ReadableBuf)| async move {
|(mut r, bs): (R, oio::Buffer)| async move {
let res = r.write(bs.clone()).await;

((r, bs), res)
Expand Down Expand Up @@ -805,7 +805,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
}

impl<R: oio::BlockingWrite, I: RetryInterceptor> oio::BlockingWrite for RetryWrapper<R, I> {
unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
{ || self.inner.as_mut().unwrap().write(bs.clone()) }
.retry(&self.builder)
.when(|e| e.is_temporary())
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ThrottleWrapper<R> {
}

impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
async unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
async unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();

loop {
Expand Down Expand Up @@ -231,7 +231,7 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for ThrottleWrapper<R> {
unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();

loop {
Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
}

impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
async unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
async unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
let fut = self.inner.write(bs);
Self::io_timeout(self.timeout, WriteOperation::Write.into_static(), fut).await
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
parent = &self.span,
level = "trace",
skip_all)]
unsafe fn write(&mut self, bs: oio::ReadableBuf) -> impl Future<Output = Result<usize>> + Send {
unsafe fn write(&mut self, bs: oio::Buffer) -> impl Future<Output = Result<usize>> + Send {
self.inner.write(bs)
}

Expand All @@ -315,7 +315,7 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for TracingWrapper<R> {
parent = &self.span,
level = "trace",
skip_all)]
unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
self.inner.write(bs)
}

Expand Down
15 changes: 8 additions & 7 deletions core/src/raw/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use std::sync::Arc;
use std::vec::IntoIter;

use async_trait::async_trait;
use bytes::Bytes;
use bytes::BytesMut;
use bytes::{Buf, Bytes};

use super::Adapter;
use crate::raw::oio::HierarchyLister;
Expand Down Expand Up @@ -249,6 +249,7 @@ impl<S> KvWriter<S> {
}
}

/// TODO: replace with FlexBuf.
enum Buffer {
Active(BytesMut),
Frozen(Bytes),
Expand All @@ -260,11 +261,11 @@ enum Buffer {
unsafe impl<S: Adapter> Sync for KvWriter<S> {}

impl<S: Adapter> oio::Write for KvWriter<S> {
async unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
async unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
match &mut self.buffer {
Buffer::Active(buf) => {
buf.extend_from_slice(&bs);
Ok(bs.len())
buf.extend_from_slice(bs.chunk());
Ok(bs.chunk().len())
}
Buffer::Frozen(_) => unreachable!("KvWriter should not be frozen during poll_write"),
}
Expand All @@ -290,11 +291,11 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
}

impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
match &mut self.buffer {
Buffer::Active(buf) => {
buf.extend_from_slice(&bs);
Ok(bs.len())
buf.extend_from_slice(bs.chunk());
Ok(bs.chunk().len())
}
Buffer::Frozen(_) => unreachable!("KvWriter should not be frozen during poll_write"),
}
Expand Down
12 changes: 6 additions & 6 deletions core/src/raw/adapters/typed_kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::sync::Arc;
use std::vec::IntoIter;

use async_trait::async_trait;
use bytes::Bytes;
use bytes::{Buf, Bytes};

use super::Adapter;
use super::Value;
Expand Down Expand Up @@ -279,11 +279,11 @@ impl<S> KvWriter<S> {
}

impl<S: Adapter> oio::Write for KvWriter<S> {
async unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
let size = bs.len();
async unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
let size = bs.chunk().len();

let mut buf = self.buf.take().unwrap_or_else(|| Vec::with_capacity(size));
buf.extend_from_slice(&bs);
buf.extend_from_slice(bs.chunk());

self.buf = Some(buf);
Ok(size)
Expand All @@ -309,11 +309,11 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
}

impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
let size = bs.len();

let mut buf = self.buf.take().unwrap_or_else(|| Vec::with_capacity(size));
buf.extend_from_slice(&bs);
buf.extend_from_slice(bs.chunk());

self.buf = Some(buf);

Expand Down
4 changes: 2 additions & 2 deletions core/src/raw/enum_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead> oio::BlockingRead for TwoWa
}

impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWays<ONE, TWO> {
async unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
async unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
match self {
Self::One(v) => v.write(bs).await,
Self::Two(v) => v.write(bs).await,
Expand Down Expand Up @@ -130,7 +130,7 @@ impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead, THREE: oio::BlockingRead> o
impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
for ThreeWays<ONE, TWO, THREE>
{
async unsafe fn write(&mut self, bs: oio::ReadableBuf) -> Result<usize> {
async unsafe fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
match self {
Self::One(v) => v.write(bs).await,
Self::Two(v) => v.write(bs).await,
Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/oio/buf/adaptive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const DEFAULT_MAX_BUFFER_SIZE: usize = 4 * 1024 * 1024;
pub struct AdaptiveBuf {
/// The underlying buffer.
buffer: BytesMut,

/// The next buffer size.
next: usize,
decrease_now: bool,
}
Expand Down
Loading

0 comments on commit 138612a

Please sign in to comment.