diff --git a/rc-zip-tokio/src/read_zip.rs b/rc-zip-tokio/src/read_zip.rs index 5111d0b..95864fc 100644 --- a/rc-zip-tokio/src/read_zip.rs +++ b/rc-zip-tokio/src/read_zip.rs @@ -1,4 +1,10 @@ -use std::{cmp, io, ops::Deref, pin::Pin, sync::Arc, task}; +use std::{ + cmp, io, + ops::Deref, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; use futures::future::BoxFuture; use positioned_io::{RandomAccessFile, ReadAt, Size}; @@ -61,18 +67,23 @@ where Some(cstate) => { if cstate.offset == offset { // all good, re-using + trace!(%offset, "read_zip_with_size: re-using cursor"); cstate } else { + trace!(%offset, %cstate.offset, "read_zip_with_size: making new cursor (had wrong offset)"); CursorState { cursor: self.cursor_at(offset), offset, } } } - None => CursorState { - cursor: self.cursor_at(offset), - offset, - }, + None => { + trace!(%offset, "read_zip_with_size: making new cursor (had none)"); + CursorState { + cursor: self.cursor_at(offset), + offset, + } + } }; match cstate_next.cursor.read(fsm.space()).await { @@ -242,9 +253,12 @@ impl HasCursor for Arc { fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> { AsyncRandomAccessFileCursor { - pos: offset, state: ARAFCState::Idle(ARAFCCore { + file_offset: offset, inner_buf: vec![0u8; 128 * 1024], + // inner_buf: vec![0u8; 128], + inner_buf_len: 0, + inner_buf_offset: 0, file: self.clone(), }), } @@ -252,7 +266,18 @@ impl HasCursor for Arc { } struct ARAFCCore { + // offset we're reading from in the file + file_offset: u64, + + // note: the length of this vec is the inner buffer capacity inner_buf: Vec, + + // the start of data we haven't yet returned to caller buffers + inner_buf_offset: usize, + + // the end of data we haven't yet returned to caller buffers + inner_buf_len: usize, + file: Arc, } @@ -262,7 +287,7 @@ type JoinResult = Result; enum ARAFCState { Idle(ARAFCCore), Reading { - fut: BoxFuture<'static, JoinResult<(Result, ARAFCCore)>>, + fut: BoxFuture<'static, JoinResult>>, }, #[default] @@ 
-271,51 +296,74 @@ enum ARAFCState { /// A cursor for reading from a [RandomAccessFile] asynchronously. pub struct AsyncRandomAccessFileCursor { - pos: u64, state: ARAFCState, } impl AsyncRead for AsyncRandomAccessFileCursor { fn poll_read( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, - ) -> task::Poll> { + ) -> Poll> { match &mut self.state { - ARAFCState::Idle { .. } => { - let mut core = match std::mem::take(&mut self.state) { - ARAFCState::Idle(core) => core, - _ => unreachable!(), + ARAFCState::Idle(core) => { + if core.inner_buf_offset < core.inner_buf_len { + trace!(inner_buf_offset = %core.inner_buf_offset, inner_buf_len = %core.inner_buf_len, avail = %(core.inner_buf_len - core.inner_buf_offset), "poll_read: have data in inner buffer"); + + // we have data in the inner buffer, don't even need + // to spawn a blocking task + let read_len = + cmp::min(buf.remaining(), core.inner_buf_len - core.inner_buf_offset); + trace!(%read_len, "poll_read: putting slice"); + + buf.put_slice(&core.inner_buf[core.inner_buf_offset..][..read_len]); + core.inner_buf_offset += read_len; + trace!(inner_buf_offset = %core.inner_buf_offset, inner_buf_len = %core.inner_buf_len, "poll_read: after put_slice"); + + return Poll::Ready(Ok(())); + } + + trace!("will have to issue a read call"); + + // this is just used to shadow core + #[allow(unused_variables, clippy::let_unit_value)] + let core = (); + + let (file_offset, file, mut inner_buf) = { + let core = match std::mem::take(&mut self.state) { + ARAFCState::Idle(core) => core, + _ => unreachable!(), + }; + (core.file_offset, core.file, core.inner_buf) }; - let read_len = cmp::min(buf.remaining(), core.inner_buf.len()); - let pos = self.pos; + let fut = Box::pin(tokio::task::spawn_blocking(move || { - let read = core.file.read_at(pos, &mut core.inner_buf[..read_len]); - (read, core) + let read_bytes = file.read_at(file_offset, &mut inner_buf)?; + trace!("read {} bytes", 
read_bytes); + Ok(ARAFCCore { + file_offset: file_offset + read_bytes as u64, + file, + inner_buf, + inner_buf_len: read_bytes, + inner_buf_offset: 0, + }) })); self.state = ARAFCState::Reading { fut }; self.poll_read(cx, buf) } ARAFCState::Reading { fut } => { - let (read, core) = match fut.as_mut().poll(cx) { - task::Poll::Ready(Ok(r)) => r, - task::Poll::Ready(Err(e)) => { - return task::Poll::Ready(Err(io::Error::new( - io::ErrorKind::Other, - e.to_string(), - ))) - } - task::Poll::Pending => return task::Poll::Pending, - }; - match read { - Ok(read) => { - self.pos += read as u64; - buf.put_slice(&core.inner_buf[..read]); - self.state = ARAFCState::Idle(core); - task::Poll::Ready(Ok(())) - } - Err(e) => task::Poll::Ready(Err(e)), + let core = futures::ready!(fut + .as_mut() + .poll(cx) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))??); + let is_eof = core.inner_buf_len == 0; + self.state = ARAFCState::Idle(core); + + if is_eof { + // we're at EOF + return Poll::Ready(Ok(())); } + self.poll_read(cx, buf) } ARAFCState::Transitioning => unreachable!(), } diff --git a/rc-zip/src/fsm/entry/mod.rs b/rc-zip/src/fsm/entry/mod.rs index b46f42f..104b8f1 100644 --- a/rc-zip/src/fsm/entry/mod.rs +++ b/rc-zip/src/fsm/entry/mod.rs @@ -202,141 +202,152 @@ impl EntryFsm { ); use State as S; - match &mut self.state { - S::ReadLocalHeader => { - self.internal_process_local_header()?; - self.process(out) - } - S::ReadData { - compressed_bytes, - uncompressed_bytes, - hasher, - decompressor, - .. 
- } => { - let in_buf = self.buffer.data(); - - // don't feed the decompressor bytes beyond the entry's compressed size - - let entry = self.entry.as_ref().unwrap(); - let in_buf_max_len = cmp::min( - in_buf.len(), - entry.compressed_size as usize - *compressed_bytes as usize, - ); - let in_buf = &in_buf[..in_buf_max_len]; - - let fed_bytes_after_this = *compressed_bytes + in_buf.len() as u64; - let has_more_input = if fed_bytes_after_this == entry.compressed_size as _ { - HasMoreInput::No - } else { - HasMoreInput::Yes - }; - - trace!( - compressed_bytes = *compressed_bytes, - uncompressed_bytes = *uncompressed_bytes, - fed_bytes_after_this, - in_buf_len = in_buf.len(), - ?has_more_input, - "decompressing" - ); - - let outcome = decompressor.decompress(in_buf, out, has_more_input)?; - trace!( - ?outcome, - compressed_bytes = *compressed_bytes, - uncompressed_bytes = *uncompressed_bytes, - "decompressed" - ); - self.buffer.consume(outcome.bytes_read); - *compressed_bytes += outcome.bytes_read as u64; - - if outcome.bytes_written == 0 && *compressed_bytes == entry.compressed_size { - trace!("eof and no bytes written, we're done"); - - // we're done, let's read the data descriptor (if there's one) - transition!(self.state => (S::ReadData { has_data_descriptor, is_zip64, uncompressed_bytes, hasher, .. }) { - let metrics = EntryReadMetrics { - uncompressed_size: uncompressed_bytes, - crc32: hasher.finalize(), + 'process_state: loop { + return match &mut self.state { + S::ReadLocalHeader => { + if self.internal_process_local_header()? 
{ + // the local header was completed, let's keep going + continue 'process_state; + } else { + // no buffer were touched, the local header wasn't complete + let outcome = DecompressOutcome { + bytes_read: 0, + bytes_written: 0, }; - - if has_data_descriptor { - trace!("transitioning to ReadDataDescriptor"); - S::ReadDataDescriptor { metrics, is_zip64 } - } else { - trace!("transitioning to Validate"); - S::Validate { metrics, descriptor: None } - } - }); - return self.process(out); + Ok(FsmResult::Continue((self, outcome))) + } } + S::ReadData { + compressed_bytes, + uncompressed_bytes, + hasher, + decompressor, + .. + } => { + let in_buf = self.buffer.data(); + + // don't feed the decompressor bytes beyond the entry's compressed size + + let entry = self.entry.as_ref().unwrap(); + let in_buf_max_len = cmp::min( + in_buf.len(), + entry.compressed_size as usize - *compressed_bytes as usize, + ); + let in_buf = &in_buf[..in_buf_max_len]; + + let fed_bytes_after_this = *compressed_bytes + in_buf.len() as u64; + let has_more_input = if fed_bytes_after_this == entry.compressed_size as _ { + HasMoreInput::No + } else { + HasMoreInput::Yes + }; + + trace!( + compressed_bytes = *compressed_bytes, + uncompressed_bytes = *uncompressed_bytes, + fed_bytes_after_this, + in_buf_len = in_buf.len(), + ?has_more_input, + "decompressing" + ); + + let outcome = decompressor.decompress(in_buf, out, has_more_input)?; + trace!( + ?outcome, + compressed_bytes = *compressed_bytes, + uncompressed_bytes = *uncompressed_bytes, + "decompressed" + ); + self.buffer.consume(outcome.bytes_read); + *compressed_bytes += outcome.bytes_read as u64; + + if outcome.bytes_written == 0 && *compressed_bytes == entry.compressed_size { + trace!("eof and no bytes written, we're done"); + + // we're done, let's read the data descriptor (if there's one) + transition!(self.state => (S::ReadData { has_data_descriptor, is_zip64, uncompressed_bytes, hasher, .. 
}) { + let metrics = EntryReadMetrics { + uncompressed_size: uncompressed_bytes, + crc32: hasher.finalize(), + }; + + if has_data_descriptor { + trace!("transitioning to ReadDataDescriptor"); + S::ReadDataDescriptor { metrics, is_zip64 } + } else { + trace!("transitioning to Validate"); + S::Validate { metrics, descriptor: None } + } + }); + return self.process(out); + } - // write the decompressed data to the hasher - hasher.update(&out[..outcome.bytes_written]); - // update the number of bytes we've decompressed - *uncompressed_bytes += outcome.bytes_written as u64; + // write the decompressed data to the hasher + hasher.update(&out[..outcome.bytes_written]); + // update the number of bytes we've decompressed + *uncompressed_bytes += outcome.bytes_written as u64; - trace!( - compressed_bytes = *compressed_bytes, - uncompressed_bytes = *uncompressed_bytes, - "updated hasher" - ); + trace!( + compressed_bytes = *compressed_bytes, + uncompressed_bytes = *uncompressed_bytes, + "updated hasher" + ); - Ok(FsmResult::Continue((self, outcome))) - } - S::ReadDataDescriptor { is_zip64, .. } => { - let mut input = Partial::new(self.buffer.data()); - - match DataDescriptorRecord::mk_parser(*is_zip64).parse_next(&mut input) { - Ok(descriptor) => { - self.buffer - .consume(input.as_bytes().offset_from(&self.buffer.data())); - trace!("data descriptor = {:#?}", descriptor); - transition!(self.state => (S::ReadDataDescriptor { metrics, .. }) { - S::Validate { metrics, descriptor: Some(descriptor) } - }); - self.process(out) - } - Err(ErrMode::Incomplete(_)) => { - Ok(FsmResult::Continue((self, Default::default()))) + Ok(FsmResult::Continue((self, outcome))) + } + S::ReadDataDescriptor { is_zip64, .. 
} => { + let mut input = Partial::new(self.buffer.data()); + + match DataDescriptorRecord::mk_parser(*is_zip64).parse_next(&mut input) { + Ok(descriptor) => { + self.buffer + .consume(input.as_bytes().offset_from(&self.buffer.data())); + trace!("data descriptor = {:#?}", descriptor); + transition!(self.state => (S::ReadDataDescriptor { metrics, .. }) { + S::Validate { metrics, descriptor: Some(descriptor) } + }); + self.process(out) + } + Err(ErrMode::Incomplete(_)) => { + Ok(FsmResult::Continue((self, Default::default()))) + } + Err(_e) => Err(Error::Format(FormatError::InvalidDataDescriptor)), } - Err(_e) => Err(Error::Format(FormatError::InvalidDataDescriptor)), } - } - S::Validate { - metrics, - descriptor, - } => { - let entry = self.entry.as_ref().unwrap(); - - let expected_crc32 = if entry.crc32 != 0 { - entry.crc32 - } else if let Some(descriptor) = descriptor.as_ref() { - descriptor.crc32 - } else { - 0 - }; + S::Validate { + metrics, + descriptor, + } => { + let entry = self.entry.as_ref().unwrap(); + + let expected_crc32 = if entry.crc32 != 0 { + entry.crc32 + } else if let Some(descriptor) = descriptor.as_ref() { + descriptor.crc32 + } else { + 0 + }; + + if entry.uncompressed_size != metrics.uncompressed_size { + return Err(Error::Format(FormatError::WrongSize { + expected: entry.uncompressed_size, + actual: metrics.uncompressed_size, + })); + } - if entry.uncompressed_size != metrics.uncompressed_size { - return Err(Error::Format(FormatError::WrongSize { - expected: entry.uncompressed_size, - actual: metrics.uncompressed_size, - })); - } + if expected_crc32 != 0 && expected_crc32 != metrics.crc32 { + return Err(Error::Format(FormatError::WrongChecksum { + expected: expected_crc32, + actual: metrics.crc32, + })); + } - if expected_crc32 != 0 && expected_crc32 != metrics.crc32 { - return Err(Error::Format(FormatError::WrongChecksum { - expected: expected_crc32, - actual: metrics.crc32, - })); + Ok(FsmResult::Done(self.buffer)) } - - 
Ok(FsmResult::Done(self.buffer)) - } - S::Transition => { - unreachable!("the state machine should never be in the transition state") - } + S::Transition => { + unreachable!("the state machine should never be in the transition state") + } + }; } }