Skip to content

Commit

Permalink
chore: Cleanup of oio::Read, docs, comments, naming (#4345)
Browse files Browse the repository at this point in the history
* Polish comments

Signed-off-by: Xuanwo <[email protected]>

* Remove not exist op

Signed-off-by: Xuanwo <[email protected]>

* add comments

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Mar 12, 2024
1 parent e704a89 commit e79daa5
Show file tree
Hide file tree
Showing 30 changed files with 153 additions and 142 deletions.
4 changes: 2 additions & 2 deletions core/src/layers/chaos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ impl<R> ChaosReader<R> {
}

impl<R: oio::Read> oio::Read for ChaosReader<R> {
async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
if self.i_feel_lucky() {
self.inner.read(size).await
self.inner.read(limit).await
} else {
Err(Self::unexpected_eof())
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,8 @@ impl<R: oio::Read> oio::Read for ConcurrentLimitWrapper<R> {
self.inner.seek(pos).await
}

async fn read(&mut self, size: usize) -> Result<Bytes> {
self.inner.read(size).await
async fn read(&mut self, limit: usize) -> Result<Bytes> {
self.inner.read(limit).await
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/dtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,10 @@ impl<R> DtraceLayerWrapper<R> {
}

impl<R: oio::Read> oio::Read for DtraceLayerWrapper<R> {
async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, reader_read_start, c_path.as_ptr());
match self.inner.read(size).await {
match self.inner.read(limit).await {
Ok(bs) => {
probe_lazy!(opendal, reader_read_ok, c_path.as_ptr(), bs.len());
Ok(bs)
Expand Down
12 changes: 6 additions & 6 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,17 +349,17 @@ pub struct ErrorContextWrapper<T> {
}

impl<T: oio::Read> oio::Read for ErrorContextWrapper<T> {
async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
self.inner.seek(pos).await.map_err(|err| {
err.with_operation(ReadOperation::Seek)
async fn read(&mut self, limit: usize) -> Result<Bytes> {
self.inner.read(limit).await.map_err(|err| {
err.with_operation(ReadOperation::Read)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
}

async fn read(&mut self, size: usize) -> Result<Bytes> {
self.inner.read(size).await.map_err(|err| {
err.with_operation(ReadOperation::Next)
async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
self.inner.seek(pos).await.map_err(|err| {
err.with_operation(ReadOperation::Seek)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
Expand Down
36 changes: 18 additions & 18 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -991,27 +991,29 @@ impl<R> Drop for LoggingReader<R> {
}

impl<R: oio::Read> oio::Read for LoggingReader<R> {
async fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
match self.inner.seek(pos).await {
Ok(n) => {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
match self.inner.read(limit).await {
Ok(bs) => {
self.read += bs.len() as u64;
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} read={} -> seek to {pos:?}, current offset {n}",
"service={} operation={} path={} read={} -> next returns {}B",
self.ctx.scheme,
ReadOperation::Seek,
ReadOperation::Read,
self.path,
self.read,
bs.len()
);
Ok(n)
Ok(bs)
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
log!(
target: LOGGING_TARGET,
lvl,
"service={} operation={} path={} read={} -> seek to {pos:?} failed: {}",
"service={} operation={} path={} read={} -> next failed: {}",
self.ctx.scheme,
ReadOperation::Seek,
ReadOperation::Read,
self.path,
self.read,
self.ctx.error_print(&err),
Expand All @@ -1022,29 +1024,27 @@ impl<R: oio::Read> oio::Read for LoggingReader<R> {
}
}

async fn read(&mut self, size: usize) -> Result<Bytes> {
match self.inner.read(size).await {
Ok(bs) => {
self.read += bs.len() as u64;
async fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
match self.inner.seek(pos).await {
Ok(n) => {
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} read={} -> next returns {}B",
"service={} operation={} path={} read={} -> seek to {pos:?}, current offset {n}",
self.ctx.scheme,
ReadOperation::Next,
ReadOperation::Seek,
self.path,
self.read,
bs.len()
);
Ok(bs)
Ok(n)
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
log!(
target: LOGGING_TARGET,
lvl,
"service={} operation={} path={} read={} -> next failed: {}",
"service={} operation={} path={} read={} -> seek to {pos:?} failed: {}",
self.ctx.scheme,
ReadOperation::Next,
ReadOperation::Seek,
self.path,
self.read,
self.ctx.error_print(&err),
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,8 +773,8 @@ impl<R> Drop for MetricWrapper<R> {
}

impl<R: oio::Read> oio::Read for MetricWrapper<R> {
async fn read(&mut self, size: usize) -> Result<Bytes> {
match self.inner.read(size).await {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
match self.inner.read(limit).await {
Ok(bytes) => {
self.bytes += bytes.len() as u64;
Ok(bytes)
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/minitrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,8 @@ impl<R> MinitraceWrapper<R> {

impl<R: oio::Read> oio::Read for MinitraceWrapper<R> {
#[trace(enter_on_poll = true)]
async fn read(&mut self, size: usize) -> Result<Bytes> {
self.inner.read(size).await
async fn read(&mut self, limit: usize) -> Result<Bytes> {
self.inner.read(limit).await
}

#[trace(enter_on_poll = true)]
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/oteltrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ impl<R> OtelTraceWrapper<R> {
}

impl<R: oio::Read> oio::Read for OtelTraceWrapper<R> {
async fn read(&mut self, size: usize) -> Result<Bytes> {
self.inner.read(size).await
async fn read(&mut self, limit: usize) -> Result<Bytes> {
self.inner.read(limit).await
}

async fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,13 +686,13 @@ impl<R> PrometheusMetricWrapper<R> {
}

impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
Operation::Read.into_static(),
&self.path,
);
match self.inner.read(size).await {
match self.inner.read(limit).await {
Ok(bytes) => {
self.stats
.bytes_total
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/prometheus_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,8 +539,8 @@ impl<R> PrometheusMetricWrapper<R> {
}

impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
async fn read(&mut self, size: usize) -> Result<Bytes> {
match self.inner.read(size).await {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
match self.inner.read(limit).await {
Ok(bytes) => {
self.bytes_total += bytes.len();
Ok(bytes)
Expand Down
6 changes: 3 additions & 3 deletions core/src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,14 +703,14 @@ impl<R: oio::Read, I: RetryInterceptor> oio::Read for RetryWrapper<R, I> {
res
}

async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
use backon::RetryableWithContext;

let inner = self.inner.take().expect("inner must be valid");

let (inner, res) = {
|mut r: R| async move {
let res = r.read(size).await;
let res = r.read(limit).await;

(r, res)
}
Expand All @@ -723,7 +723,7 @@ impl<R: oio::Read, I: RetryInterceptor> oio::Read for RetryWrapper<R, I> {
err,
dur,
&[
("operation", ReadOperation::Next.into_static()),
("operation", ReadOperation::Read.into_static()),
("path", &self.path),
],
)
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ impl<R> ThrottleWrapper<R> {
}

impl<R: oio::Read> oio::Read for ThrottleWrapper<R> {
async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
// TODO: How can we handle buffer reads with a limiter?
self.inner.read(size).await
self.inner.read(limit).await
}

async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
Expand Down
10 changes: 5 additions & 5 deletions core/src/layers/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,15 +322,15 @@ impl<R> TimeoutWrapper<R> {
}

impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
let fut = self.inner.read(limit);
Self::io_timeout(self.timeout, ReadOperation::Read.into_static(), fut).await
}

async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
let fut = self.inner.seek(pos);
Self::io_timeout(self.timeout, ReadOperation::Seek.into_static(), fut).await
}

async fn read(&mut self, size: usize) -> Result<Bytes> {
let fut = self.inner.read(size);
Self::io_timeout(self.timeout, ReadOperation::Next.into_static(), fut).await
}
}

impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ impl<R: oio::Read> oio::Read for TracingWrapper<R> {
parent = &self.span,
level = "trace",
skip_all)]
async fn read(&mut self, size: usize) -> Result<Bytes> {
self.inner.read(size).await
async fn read(&mut self, limit: usize) -> Result<Bytes> {
self.inner.read(limit).await
}

#[tracing::instrument(
Expand Down
24 changes: 12 additions & 12 deletions core/src/raw/enum_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ impl<ONE: oio::Read, TWO: oio::Read> oio::Read for TwoWays<ONE, TWO> {
}
}

async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
match self {
Self::One(v) => v.read(size).await,
Self::Two(v) => v.read(size).await,
Self::One(v) => v.read(limit).await,
Self::Two(v) => v.read(limit).await,
}
}
}
Expand Down Expand Up @@ -140,11 +140,11 @@ impl<ONE: oio::Read, TWO: oio::Read, THREE: oio::Read> oio::Read for ThreeWays<O
}
}

async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
match self {
Self::One(v) => v.read(size).await,
Self::Two(v) => v.read(size).await,
Self::Three(v) => v.read(size).await,
Self::One(v) => v.read(limit).await,
Self::Two(v) => v.read(limit).await,
Self::Three(v) => v.read(limit).await,
}
}
}
Expand Down Expand Up @@ -235,12 +235,12 @@ where
}
}

async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
match self {
Self::One(v) => v.read(size).await,
Self::Two(v) => v.read(size).await,
Self::Three(v) => v.read(size).await,
Self::Four(v) => v.read(size).await,
Self::One(v) => v.read(limit).await,
Self::Two(v) => v.read(limit).await,
Self::Three(v) => v.read(limit).await,
Self::Four(v) => v.read(limit).await,
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/raw/http_util/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl IncomingAsyncBody {
}

impl oio::Read for IncomingAsyncBody {
async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
if self.size == Some(0) {
return Ok(Bytes::new());
}
Expand All @@ -182,7 +182,7 @@ impl oio::Read for IncomingAsyncBody {
};
}

let size = min(size, self.chunk.len());
let size = min(limit, self.chunk.len());
self.consumed += size as u64;
let bs = self.chunk.split_to(size);
Ok(bs)
Expand Down
4 changes: 2 additions & 2 deletions core/src/raw/oio/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ impl oio::Read for Cursor {
Ok(n)
}

async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
if self.is_empty() {
Ok(Bytes::new())
} else {
// The clone here is required as we don't want to change it.
let mut bs = self.inner.clone().split_off(self.pos as usize);
let bs = bs.split_to(min(bs.len(), size));
let bs = bs.split_to(min(bs.len(), limit));
self.pos += bs.len() as u64;
Ok(bs)
}
Expand Down
Loading

0 comments on commit e79daa5

Please sign in to comment.