From a1cea81d7a10e95f1d742716a4525d8725f01676 Mon Sep 17 00:00:00 2001 From: Amos Wenger Date: Thu, 1 Feb 2024 18:10:53 +0100 Subject: [PATCH] Add tokio AsyncRead support --- .vscode/settings.json | 12 ++- Cargo.lock | 174 ++++++++++++++++++++++++++++++++++++ Cargo.toml | 5 ++ src/reader/mod.rs | 3 + src/reader/tokio/decoder.rs | 113 +++++++++++++++++++++++ src/reader/tokio/mod.rs | 2 + 6 files changed, 308 insertions(+), 1 deletion(-) create mode 100644 src/reader/tokio/decoder.rs create mode 100644 src/reader/tokio/mod.rs diff --git a/.vscode/settings.json b/.vscode/settings.json index 3248b76..2e949f9 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,13 @@ { - "rust-analyzer.cargo.features": ["default", "lzma", "deflate64", "bzip2", "zstd"] + "rust-analyzer.cargo.features": [ + "default", + "tokio", + "lzma", + "deflate64", + "bzip2", + "zstd" + ], + "rust-analyzer.linkedProjects": [ + "./Cargo.toml" + ] } \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 286830c..79ac253 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. 
version = 3 +[[package]] +name = "addr2line" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +dependencies = [ + "gimli", +] + [[package]] name = "adler" version = "1.0.2" @@ -86,6 +95,21 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "backtrace" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "bumpalo" version = "3.14.0" @@ -286,6 +310,101 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] 
+name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" + +[[package]] +name = "futures-task" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" + +[[package]] +name = "futures-util" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "gimli" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" + [[package]] name = "hashbrown" version = "0.14.3" @@ -490,6 +609,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" +[[package]] +name = "object" +version = "0.32.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" +dependencies = [ + "memchr", +] + [[package]] name = "oem_cp" version = "2.0.0" @@ -564,6 +692,12 @@ version = "0.2.13" source 
= "registry+https://github.com/rust-lang/crates.io-index" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "pkg-config" version = "0.3.29" @@ -649,16 +783,19 @@ dependencies = [ "deflate64", "encoding_rs", "flate2", + "futures", "humansize", "indicatif", "lzma-rs", "num_enum", "oem_cp", "oval", + "pin-project-lite", "positioned-io", "pretty-hex", "test-log", "thiserror", + "tokio", "tracing", "tracing-subscriber", "winnow", @@ -709,6 +846,12 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + [[package]] name = "ryu" version = "1.0.16" @@ -761,6 +904,15 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + [[package]] name = "smallvec" version = "1.13.1" @@ -835,6 +987,28 @@ dependencies = [ "once_cell", ] +[[package]] +name = "tokio" +version = "1.35.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" +dependencies = [ + "backtrace", + "pin-project-lite", + "tokio-macros", +] + +[[package]] +name = "tokio-macros" +version = "2.2.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "toml_datetime" version = "0.6.5" diff --git a/Cargo.toml b/Cargo.toml index ad753c6..97e3536 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,9 @@ lzma-rs = { version = "0.3.0", features = ["stream"], optional = true } deflate64 = { version = "0.1.7", optional = true } bzip2 = { version = "0.4.4", optional = true } zstd = { version = "0.13.0", optional = true } +tokio = { version = "1.35.1", optional = true } +futures = { version = "0.3.30", optional = true } +pin-project-lite = { version = "0.2.13", optional = true } [features] default = ["sync", "file", "deflate"] @@ -49,6 +52,7 @@ deflate64 = ["dep:deflate64"] lzma = ["dep:lzma-rs"] bzip2 = ["dep:bzip2"] zstd = ["dep:zstd"] +tokio = ["dep:tokio", "dep:futures", "dep:pin-project-lite"] [dev-dependencies] clap = { version = "4.4.18", features = ["derive"] } @@ -56,6 +60,7 @@ humansize = "2.1.3" indicatif = "0.17.7" test-log = { version = "0.2.14", default-features = false, features = ["tracing-subscriber", "trace"] } tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } +tokio = { version = "1.35.1", features = ["macros"] } [profile.release] debug = 1 diff --git a/src/reader/mod.rs b/src/reader/mod.rs index c2def34..bb12f98 100644 --- a/src/reader/mod.rs +++ b/src/reader/mod.rs @@ -6,3 +6,6 @@ pub use self::archive_reader::{ArchiveReader, ArchiveReaderResult}; #[cfg(feature = "sync")] pub mod sync; + +#[cfg(feature = "tokio")] +pub mod tokio; diff --git a/src/reader/tokio/decoder.rs b/src/reader/tokio/decoder.rs new file mode 100644 index 0000000..fdbbce1 --- /dev/null +++ b/src/reader/tokio/decoder.rs @@ -0,0 +1,113 @@ +use std::{cmp, io, pin::Pin, task}; + +use oval::Buffer; +use tokio::io::{AsyncBufRead, AsyncRead}; + +pub trait AsyncDecoder: AsyncRead +where + R: AsyncRead, +{ 
+    /// Moves the inner reader out of this decoder.
+    /// self is boxed because decoders are typically used as trait objects.
+    fn into_inner(self: Box<Self>) -> R;
+
+    /// Returns a mutable reference to the inner reader.
+    fn get_mut(&mut self) -> &mut R;
+}
+
+/// Pass-through decoder: every read is forwarded to the inner reader and the
+/// bytes are handed back unchanged.
+pub struct StoreAsyncDecoder<R>
+where
+    R: AsyncRead,
+{
+    /// The wrapped reader all reads are delegated to.
+    inner: R,
+}
+
+impl<R> StoreAsyncDecoder<R>
+where
+    R: AsyncRead,
+{
+    /// Wraps `inner` without adding any buffering or transformation.
+    pub fn new(inner: R) -> Self {
+        Self { inner }
+    }
+}
+
+impl<R> AsyncRead for StoreAsyncDecoder<R>
+where
+    R: AsyncRead,
+{
+    /// Delegates directly to the inner reader's `poll_read`.
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut task::Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> task::Poll<io::Result<()>> {
+        // SAFETY: `inner` is treated as structurally pinned. Nothing in this
+        // file moves it while `Self` is pinned: `into_inner` takes an unpinned
+        // `Box<Self>`, `get_mut` takes a plain `&mut self`, and this type
+        // declares no `Drop` impl and is not `#[repr(packed)]`.
+        let inner = unsafe { self.map_unchecked_mut(|s| &mut s.inner) };
+        inner.poll_read(cx, buf)
+    }
+}
+
+impl<R> AsyncDecoder<R> for StoreAsyncDecoder<R>
+where
+    R: AsyncRead,
+{
+    fn into_inner(self: Box<Self>) -> R {
+        self.inner
+    }
+
+    fn get_mut(&mut self) -> &mut R {
+        &mut self.inner
+    }
+}
+
+// TODO: dedup between tokio & sync
+
+/// Only allows reading a fixed number of bytes from a [oval::Buffer],
+/// allowing to move the inner reader out afterwards.
+pub struct RawEntryAsyncReader {
+    /// Number of bytes this reader is still allowed to hand out.
+    remaining: u64,
+    /// In-memory buffer the bytes are served from.
+    inner: Buffer,
+}
+
+impl RawEntryAsyncReader {
+    /// Creates a reader that yields at most `remaining` bytes from `inner`.
+    pub fn new(inner: Buffer, remaining: u64) -> Self {
+        Self { inner, remaining }
+    }
+
+    /// Consumes the reader and returns the underlying buffer.
+    pub fn into_inner(self) -> Buffer {
+        self.inner
+    }
+
+    /// Returns a mutable reference to the underlying buffer.
+    pub fn get_mut(&mut self) -> &mut Buffer {
+        &mut self.inner
+    }
+}
+
+impl AsyncBufRead for RawEntryAsyncReader {
+    /// Marks `amt` bytes as read, clamped to what `poll_fill_buf` could have
+    /// exposed, so an oversized `amt` cannot underflow `remaining`.
+    fn consume(self: Pin<&mut Self>, amt: usize) {
+        let this = Pin::get_mut(self);
+        let buffered = this.inner.available_data() as u64;
+        // The result is <= `amt`, so narrowing back to `usize` cannot truncate.
+        let amt = cmp::min(amt as u64, cmp::min(this.remaining, buffered)) as usize;
+        this.remaining -= amt as u64;
+        Buffer::consume(&mut this.inner, amt);
+    }
+
+    /// Exposes the buffered bytes, truncated to the `remaining` limit.
+    /// Always ready: no I/O is performed here.
+    fn poll_fill_buf(
+        self: Pin<&mut Self>,
+        _cx: &mut task::Context<'_>,
+    ) -> task::Poll<io::Result<&[u8]>> {
+        let this = Pin::get_mut(self);
+        // Bounded by `available_data()`, a `usize`, so the cast is lossless.
+        let max_avail = cmp::min(this.remaining, this.inner.available_data() as u64) as usize;
+        task::Poll::Ready(Ok(&this.inner.data()[..max_avail]))
+    }
+}
+
+impl AsyncRead for RawEntryAsyncReader {
+    /// Copies up to `remaining` buffered bytes into `buf`.
+    ///
+    /// The copy is bounded by the space left in `buf`, by the data currently
+    /// buffered, and by `remaining`; `remaining` shrinks by the number of
+    /// bytes copied. When nothing can be copied, `buf` is left untouched.
+    /// NOTE(review): presumably the caller refills the buffer between reads,
+    /// since an empty read normally signals end of stream - confirm.
+    fn poll_read(
+        self: Pin<&mut Self>,
+        _cx: &mut task::Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> task::Poll<io::Result<()>> {
+        let this = Pin::get_mut(self);
+        let available_data = this.inner.available_data();
+        // Bound by the buffered data as well, otherwise the slice below panics
+        // whenever the buffer holds fewer bytes than were requested.
+        let len = cmp::min(
+            cmp::min(buf.remaining(), available_data) as u64,
+            this.remaining,
+        ) as usize;
+        tracing::trace!(%len, buf_remaining = buf.remaining(), remaining = this.remaining, available_data = available_data, available_space = this.inner.available_space(), "computing len");
+
+        buf.put_slice(&this.inner.data()[..len]);
+        Buffer::consume(&mut this.inner, len);
+        // Account for the bytes handed out so the limit holds across calls.
+        this.remaining -= len as u64;
+        task::Poll::Ready(Ok(()))
+    }
+}
diff --git a/src/reader/tokio/mod.rs b/src/reader/tokio/mod.rs
new file mode 100644
index 0000000..953526d
--- /dev/null
+++ b/src/reader/tokio/mod.rs
@@ -0,0 +1,2 @@
+mod decoder;
+pub use decoder::*;