Skip to content

Commit

Permalink
fix: Don't give up on reading local header when given short reads
Browse files Browse the repository at this point in the history
Closes #74
  • Loading branch information
fasterthanlime committed Mar 19, 2024
1 parent d00ef26 commit e1beda2
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 46 deletions.
75 changes: 40 additions & 35 deletions rc-zip-tokio/src/entry_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,48 +42,53 @@ where
cx: &mut task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> task::Poll<std::io::Result<()>> {
let this = self.as_mut().project();
let mut this = self.as_mut().project();

let mut fsm = match this.fsm.take() {
Some(fsm) => fsm,
None => return Ok(()).into(),
};
loop {
let mut fsm = match this.fsm.take() {
Some(fsm) => fsm,
None => return Ok(()).into(),

Check warning on line 50 in rc-zip-tokio/src/entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-tokio/src/entry_reader.rs#L50

Added line #L50 was not covered by tests
};

if fsm.wants_read() {
tracing::trace!(space_avail = fsm.space().len(), "fsm wants read");
let mut buf = ReadBuf::new(fsm.space());
match this.rd.poll_read(cx, &mut buf) {
task::Poll::Ready(res) => res?,
task::Poll::Pending => {
*this.fsm = Some(fsm);
return task::Poll::Pending;
let filled_bytes;
if fsm.wants_read() {
tracing::trace!(space_avail = fsm.space().len(), "fsm wants read");
let mut buf = ReadBuf::new(fsm.space());
match this.rd.as_mut().poll_read(cx, &mut buf) {
task::Poll::Ready(res) => res?,
task::Poll::Pending => {
*this.fsm = Some(fsm);
return task::Poll::Pending;
}
}
}
let n = buf.filled().len();
let n = buf.filled().len();

tracing::trace!("read {} bytes", n);
fsm.fill(n);
} else {
tracing::trace!("fsm does not want read");
}
tracing::trace!("read {} bytes", n);
fsm.fill(n);
filled_bytes = n;
} else {
tracing::trace!("fsm does not want read");
filled_bytes = 0;

Check warning on line 71 in rc-zip-tokio/src/entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-tokio/src/entry_reader.rs#L70-L71

Added lines #L70 - L71 were not covered by tests
}

match fsm.process(buf.initialize_unfilled())? {
FsmResult::Continue((fsm, outcome)) => {
*this.fsm = Some(fsm);
if outcome.bytes_written > 0 {
tracing::trace!("wrote {} bytes", outcome.bytes_written);
buf.advance(outcome.bytes_written);
} else if outcome.bytes_read == 0 {
// that's EOF, baby!
} else {
// loop, it happens
return self.poll_read(cx, buf);
match fsm.process(buf.initialize_unfilled())? {
FsmResult::Continue((fsm, outcome)) => {
*this.fsm = Some(fsm);
if outcome.bytes_written > 0 {
tracing::trace!("wrote {} bytes", outcome.bytes_written);
buf.advance(outcome.bytes_written);
} else if filled_bytes > 0 {
// keep reading
continue;

Check warning on line 82 in rc-zip-tokio/src/entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-tokio/src/entry_reader.rs#L82

Added line #L82 was not covered by tests
} else {
// that's EOF, baby!
}
}
FsmResult::Done(_) => {
// neat!
}
}
FsmResult::Done(_) => {
// neat!
}
return Ok(()).into();
}
Ok(()).into()
}
}
14 changes: 3 additions & 11 deletions rc-zip-tokio/src/read_zip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,10 @@ where
let mut fsm = ArchiveFsm::new(size);
loop {
if let Some(offset) = fsm.wants_read() {
trace!(%offset, "read_zip_with_size: wants_read, space len = {}", fsm.space().len());

let mut cstate_next = match cstate.take() {
Some(cstate) => {
if cstate.offset == offset {
// all good, re-using
trace!(%offset, "read_zip_with_size: re-using cursor");
cstate
} else {
trace!(%offset, %cstate.offset, "read_zip_with_size: making new cursor (had wrong offset)");
Expand All @@ -91,7 +88,7 @@ where
cstate_next.offset += read_bytes as u64;
cstate = Some(cstate_next);

trace!(%read_bytes, "read_zip_with_size: read");
trace!(%read_bytes, "filling fsm");

Check warning on line 91 in rc-zip-tokio/src/read_zip.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-tokio/src/read_zip.rs#L91

Added line #L91 was not covered by tests
if read_bytes == 0 {
return Err(Error::IO(io::ErrorKind::UnexpectedEof.into()));
}
Expand Down Expand Up @@ -308,23 +305,18 @@ impl AsyncRead for AsyncRandomAccessFileCursor {
match &mut self.state {
ARAFCState::Idle(core) => {
if core.inner_buf_offset < core.inner_buf_len {
trace!(inner_buf_offset = %core.inner_buf_offset, inner_buf_len = %core.inner_buf_len, avail = %(core.inner_buf_len - core.inner_buf_offset), "poll_read: have data in inner buffer");

// we have data in the inner buffer, don't even need
// to spawn a blocking task
let read_len =
cmp::min(buf.remaining(), core.inner_buf_len - core.inner_buf_offset);
trace!(%read_len, "poll_read: putting slice");

buf.put_slice(&core.inner_buf[core.inner_buf_offset..][..read_len]);
core.inner_buf_offset += read_len;
trace!(inner_buf_offset = %core.inner_buf_offset, inner_buf_len = %core.inner_buf_len, "poll_read: after put_slice");
trace!(inner_buf_offset = %core.inner_buf_offset, inner_buf_len = %core.inner_buf_len, "read from inner buffer");

return Poll::Ready(Ok(()));
}

trace!("will have to issue a read call");

// this is just used to shadow core
#[allow(unused_variables, clippy::let_unit_value)]
let core = ();
Expand All @@ -339,7 +331,7 @@ impl AsyncRead for AsyncRandomAccessFileCursor {

let fut = Box::pin(tokio::task::spawn_blocking(move || {
let read_bytes = file.read_at(file_offset, &mut inner_buf)?;
trace!("read {} bytes", read_bytes);
trace!(%read_bytes, "read from file");
Ok(ARAFCCore {
file_offset: file_offset + read_bytes as u64,
file,
Expand Down

0 comments on commit e1beda2

Please sign in to comment.