Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: More efficient AsyncRandomAccessFileCursor implementation #73

Merged
merged 3 commits into from
Mar 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 84 additions & 36 deletions rc-zip-tokio/src/read_zip.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use std::{cmp, io, ops::Deref, pin::Pin, sync::Arc, task};
use std::{
cmp, io,
ops::Deref,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};

use futures::future::BoxFuture;
use positioned_io::{RandomAccessFile, ReadAt, Size};
Expand Down Expand Up @@ -61,18 +67,23 @@
Some(cstate) => {
if cstate.offset == offset {
// all good, re-using
trace!(%offset, "read_zip_with_size: re-using cursor");

Check warning on line 70 in rc-zip-tokio/src/read_zip.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-tokio/src/read_zip.rs#L70

Added line #L70 was not covered by tests
cstate
} else {
trace!(%offset, %cstate.offset, "read_zip_with_size: making new cursor (had wrong offset)");

Check warning on line 73 in rc-zip-tokio/src/read_zip.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-tokio/src/read_zip.rs#L73

Added line #L73 was not covered by tests
CursorState {
cursor: self.cursor_at(offset),
offset,
}
}
}
None => CursorState {
cursor: self.cursor_at(offset),
offset,
},
None => {
trace!(%offset, "read_zip_with_size: making new cursor (had none)");

Check warning on line 81 in rc-zip-tokio/src/read_zip.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-tokio/src/read_zip.rs#L81

Added line #L81 was not covered by tests
CursorState {
cursor: self.cursor_at(offset),
offset,
}
}
};

match cstate_next.cursor.read(fsm.space()).await {
Expand Down Expand Up @@ -242,17 +253,31 @@

fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
AsyncRandomAccessFileCursor {
pos: offset,
state: ARAFCState::Idle(ARAFCCore {
file_offset: offset,
inner_buf: vec![0u8; 128 * 1024],
// inner_buf: vec![0u8; 128],
inner_buf_len: 0,
inner_buf_offset: 0,
file: self.clone(),
}),
}
}
}

// Core state of an `AsyncRandomAccessFileCursor`: the file handle, the
// file position of the next read, and an inner buffer holding bytes that
// were read from the file but not yet handed to the caller. It is carried
// by `ARAFCState::Idle` and moved into the blocking read task and back.
struct ARAFCCore {
// offset in the file at which the next read will start; advanced by the
// number of bytes read after each read
file_offset: u64,

// note: the length of this vec is the inner buffer capacity
inner_buf: Vec<u8>,

// the start of the data we haven't copied into caller buffers yet
// (reset to 0 after each read, advanced as data is handed out)
inner_buf_offset: usize,

// the end of the data we haven't copied into caller buffers yet
// (set to the number of bytes the last read put into `inner_buf`;
// 0 after a read means EOF)
inner_buf_len: usize,

// the file being read from
file: Arc<RandomAccessFile>,
}

Expand All @@ -262,7 +287,7 @@
enum ARAFCState {
Idle(ARAFCCore),
Reading {
fut: BoxFuture<'static, JoinResult<(Result<usize, io::Error>, ARAFCCore)>>,
fut: BoxFuture<'static, JoinResult<Result<ARAFCCore, io::Error>>>,
},

#[default]
Expand All @@ -271,51 +296,74 @@

/// A cursor for reading from a [RandomAccessFile] asynchronously.
pub struct AsyncRandomAccessFileCursor {
pos: u64,
state: ARAFCState,
}

impl AsyncRead for AsyncRandomAccessFileCursor {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> task::Poll<io::Result<()>> {
) -> Poll<io::Result<()>> {
match &mut self.state {
ARAFCState::Idle { .. } => {
let mut core = match std::mem::take(&mut self.state) {
ARAFCState::Idle(core) => core,
_ => unreachable!(),
ARAFCState::Idle(core) => {
if core.inner_buf_offset < core.inner_buf_len {
trace!(inner_buf_offset = %core.inner_buf_offset, inner_buf_len = %core.inner_buf_len, avail = %(core.inner_buf_len - core.inner_buf_offset), "poll_read: have data in inner buffer");

// we have data in the inner buffer, don't even need
// to spawn a blocking task
let read_len =
cmp::min(buf.remaining(), core.inner_buf_len - core.inner_buf_offset);
trace!(%read_len, "poll_read: putting slice");

buf.put_slice(&core.inner_buf[core.inner_buf_offset..][..read_len]);
core.inner_buf_offset += read_len;
trace!(inner_buf_offset = %core.inner_buf_offset, inner_buf_len = %core.inner_buf_len, "poll_read: after put_slice");

return Poll::Ready(Ok(()));
}

trace!("will have to issue a read call");

// this is just used to shadow core
#[allow(unused_variables, clippy::let_unit_value)]
let core = ();

let (file_offset, file, mut inner_buf) = {
let core = match std::mem::take(&mut self.state) {
ARAFCState::Idle(core) => core,
_ => unreachable!(),

Check warning on line 335 in rc-zip-tokio/src/read_zip.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-tokio/src/read_zip.rs#L335

Added line #L335 was not covered by tests
};
(core.file_offset, core.file, core.inner_buf)
};
let read_len = cmp::min(buf.remaining(), core.inner_buf.len());
let pos = self.pos;

let fut = Box::pin(tokio::task::spawn_blocking(move || {
let read = core.file.read_at(pos, &mut core.inner_buf[..read_len]);
(read, core)
let read_bytes = file.read_at(file_offset, &mut inner_buf)?;
trace!("read {} bytes", read_bytes);
Ok(ARAFCCore {
file_offset: file_offset + read_bytes as u64,
file,
inner_buf,
inner_buf_len: read_bytes,
inner_buf_offset: 0,
})
}));
self.state = ARAFCState::Reading { fut };
self.poll_read(cx, buf)
}
ARAFCState::Reading { fut } => {
let (read, core) = match fut.as_mut().poll(cx) {
task::Poll::Ready(Ok(r)) => r,
task::Poll::Ready(Err(e)) => {
return task::Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
e.to_string(),
)))
}
task::Poll::Pending => return task::Poll::Pending,
};
match read {
Ok(read) => {
self.pos += read as u64;
buf.put_slice(&core.inner_buf[..read]);
self.state = ARAFCState::Idle(core);
task::Poll::Ready(Ok(()))
}
Err(e) => task::Poll::Ready(Err(e)),
let core = futures::ready!(fut
.as_mut()
.poll(cx)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))??);
let is_eof = core.inner_buf_len == 0;
self.state = ARAFCState::Idle(core);

if is_eof {
// we're at EOF
return Poll::Ready(Ok(()));
}
self.poll_read(cx, buf)
}
ARAFCState::Transitioning => unreachable!(),
}
Expand Down
Loading