From e1beda227d75eb5df86aafd140c3c8396be9f51b Mon Sep 17 00:00:00 2001 From: Amos Wenger Date: Tue, 19 Mar 2024 14:02:56 +0100 Subject: [PATCH] fix: Don't give up on reading local header when given short reads Closes #74 --- rc-zip-tokio/src/entry_reader.rs | 75 +++++++++++++++++--------------- rc-zip-tokio/src/read_zip.rs | 14 ++---- 2 files changed, 43 insertions(+), 46 deletions(-) diff --git a/rc-zip-tokio/src/entry_reader.rs b/rc-zip-tokio/src/entry_reader.rs index 094da2a..6c3024f 100644 --- a/rc-zip-tokio/src/entry_reader.rs +++ b/rc-zip-tokio/src/entry_reader.rs @@ -42,48 +42,53 @@ where cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>, ) -> task::Poll> { - let this = self.as_mut().project(); + let mut this = self.as_mut().project(); - let mut fsm = match this.fsm.take() { - Some(fsm) => fsm, - None => return Ok(()).into(), - }; + loop { + let mut fsm = match this.fsm.take() { + Some(fsm) => fsm, + None => return Ok(()).into(), + }; - if fsm.wants_read() { - tracing::trace!(space_avail = fsm.space().len(), "fsm wants read"); - let mut buf = ReadBuf::new(fsm.space()); - match this.rd.poll_read(cx, &mut buf) { - task::Poll::Ready(res) => res?, - task::Poll::Pending => { - *this.fsm = Some(fsm); - return task::Poll::Pending; + let filled_bytes; + if fsm.wants_read() { + tracing::trace!(space_avail = fsm.space().len(), "fsm wants read"); + let mut buf = ReadBuf::new(fsm.space()); + match this.rd.as_mut().poll_read(cx, &mut buf) { + task::Poll::Ready(res) => res?, + task::Poll::Pending => { + *this.fsm = Some(fsm); + return task::Poll::Pending; + } } - } - let n = buf.filled().len(); + let n = buf.filled().len(); - tracing::trace!("read {} bytes", n); - fsm.fill(n); - } else { - tracing::trace!("fsm does not want read"); - } + tracing::trace!("read {} bytes", n); + fsm.fill(n); + filled_bytes = n; + } else { + tracing::trace!("fsm does not want read"); + filled_bytes = 0; + } - match fsm.process(buf.initialize_unfilled())? { - FsmResult::Continue((fsm, outcome)) => { - *this.fsm = Some(fsm); - if outcome.bytes_written > 0 { - tracing::trace!("wrote {} bytes", outcome.bytes_written); - buf.advance(outcome.bytes_written); - } else if outcome.bytes_read == 0 { - // that's EOF, baby! - } else { - // loop, it happens - return self.poll_read(cx, buf); + match fsm.process(buf.initialize_unfilled())? { + FsmResult::Continue((fsm, outcome)) => { + *this.fsm = Some(fsm); + if outcome.bytes_written > 0 { + tracing::trace!("wrote {} bytes", outcome.bytes_written); + buf.advance(outcome.bytes_written); + } else if filled_bytes > 0 { + // keep reading + continue; + } else { + // that's EOF, baby! + } + } + FsmResult::Done(_) => { + // neat! } } - FsmResult::Done(_) => { - // neat! - } + return Ok(()).into(); } - Ok(()).into() } } diff --git a/rc-zip-tokio/src/read_zip.rs b/rc-zip-tokio/src/read_zip.rs index 95864fc..7794e9f 100644 --- a/rc-zip-tokio/src/read_zip.rs +++ b/rc-zip-tokio/src/read_zip.rs @@ -61,13 +61,10 @@ where let mut fsm = ArchiveFsm::new(size); loop { if let Some(offset) = fsm.wants_read() { - trace!(%offset, "read_zip_with_size: wants_read, space len = {}", fsm.space().len()); - let mut cstate_next = match cstate.take() { Some(cstate) => { if cstate.offset == offset { // all good, re-using - trace!(%offset, "read_zip_with_size: re-using cursor"); cstate } else { trace!(%offset, %cstate.offset, "read_zip_with_size: making new cursor (had wrong offset)"); @@ -91,7 +88,7 @@ where cstate_next.offset += read_bytes as u64; cstate = Some(cstate_next); - trace!(%read_bytes, "read_zip_with_size: read"); + trace!(%read_bytes, "filling fsm"); if read_bytes == 0 { return Err(Error::IO(io::ErrorKind::UnexpectedEof.into())); } @@ -308,23 +305,18 @@ impl AsyncRead for AsyncRandomAccessFileCursor { match &mut self.state { ARAFCState::Idle(core) => { if core.inner_buf_offset < core.inner_buf_len { - trace!(inner_buf_offset = %core.inner_buf_offset, inner_buf_len = %core.inner_buf_len, avail = %(core.inner_buf_len - core.inner_buf_offset), "poll_read: have data in inner buffer"); - // we have data in the inner buffer, don't even need // to spawn a blocking task let read_len = cmp::min(buf.remaining(), core.inner_buf_len - core.inner_buf_offset); - trace!(%read_len, "poll_read: putting slice"); buf.put_slice(&core.inner_buf[core.inner_buf_offset..][..read_len]); core.inner_buf_offset += read_len; - trace!(inner_buf_offset = %core.inner_buf_offset, inner_buf_len = %core.inner_buf_len, "poll_read: after put_slice"); + trace!(inner_buf_offset = %core.inner_buf_offset, inner_buf_len = %core.inner_buf_len, "read from inner buffer"); return Poll::Ready(Ok(())); } - trace!("will have to issue a read call"); - // this is just used to shadow core #[allow(unused_variables, clippy::let_unit_value)] let core = (); @@ -339,7 +331,7 @@ impl AsyncRead for AsyncRandomAccessFileCursor { let fut = Box::pin(tokio::task::spawn_blocking(move || { let read_bytes = file.read_at(file_offset, &mut inner_buf)?; - trace!("read {} bytes", read_bytes); + trace!(%read_bytes, "read from file"); Ok(ARAFCCore { file_offset: file_offset + read_bytes as u64, file,