From cab13778a66573285f5228bc724c97cf4c174871 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Mon, 12 Feb 2024 16:42:40 +0100 Subject: [PATCH] feat: Allow mode of streaming car file frames Also: - remove `main.rs` from car-mirror crate - adjust authors --- car-mirror-benches/Cargo.toml | 2 +- car-mirror-wasm/Cargo.toml | 4 +-- car-mirror/Cargo.toml | 9 ++---- car-mirror/src/common.rs | 58 +++++++++++++++++++++++++++++++++-- car-mirror/src/main.rs | 6 ---- examples/Cargo.toml | 2 +- 6 files changed, 62 insertions(+), 19 deletions(-) delete mode 100644 car-mirror/src/main.rs diff --git a/car-mirror-benches/Cargo.toml b/car-mirror-benches/Cargo.toml index c954284..a9590b0 100644 --- a/car-mirror-benches/Cargo.toml +++ b/car-mirror-benches/Cargo.toml @@ -3,7 +3,7 @@ name = "car-mirror-benches" version = "0.1.0" publish = false edition = "2021" -authors = ["Stephen Akinyemi "] +authors = ["Philipp Krüger "] [dependencies] anyhow = "1.0" diff --git a/car-mirror-wasm/Cargo.toml b/car-mirror-wasm/Cargo.toml index 0dcdd3a..b18f174 100644 --- a/car-mirror-wasm/Cargo.toml +++ b/car-mirror-wasm/Cargo.toml @@ -5,13 +5,13 @@ description = "Rust implementation of the CAR Mirror protocol" keywords = [] categories = [] include = ["/src", "README.md", "LICENSE-APACHE", "LICENSE-MIT"] -license = "Apache-2.0 or MIT" +license = "Apache-2.0" readme = "README.md" edition = "2021" rust-version = "1.66" documentation = "https://docs.rs/car-mirror-wasm" repository = "https://github.com/fission-codes/rs-car-mirror/tree/main/car-mirror-wasm" -authors = ["Stephen Akinyemi "] +authors = ["Philipp Krüger "] [lib] crate-type = ["cdylib", "rlib"] diff --git a/car-mirror/Cargo.toml b/car-mirror/Cargo.toml index 1d20cee..706e56c 100644 --- a/car-mirror/Cargo.toml +++ b/car-mirror/Cargo.toml @@ -5,23 +5,18 @@ description = "Rust implementation of the CAR Mirror protocol" keywords = [] categories = [] include = ["/src", "README.md", "LICENSE-APACHE", "LICENSE-MIT"] -license = "Apache-2.0 or MIT" +license = "Apache-2.0" readme = "README.md" edition = "2021" rust-version = "1.66" documentation = "https://docs.rs/car-mirror" repository = "https://github.com/fission-codes/rs-car-mirror/tree/main/car-mirror" -authors = ["Stephen Akinyemi "] +authors = ["Philipp Krüger "] [lib] path = "src/lib.rs" doctest = true -[[bin]] -name = "car-mirror" -path = "src/main.rs" -doc = true - [dependencies] anyhow = "1.0" async-trait = "0.1.73" diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index 2bcc539..8037da5 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -2,14 +2,14 @@ use bytes::Bytes; use deterministic_bloom::runtime_size::BloomFilter; -use futures::{future, TryStreamExt}; +use futures::{future, StreamExt, TryStreamExt}; use iroh_car::{CarHeader, CarReader, CarWriter}; use libipld::{Ipld, IpldCodec}; use libipld_core::{cid::Cid, codec::References}; use std::io::Cursor; use tracing::{debug, instrument, trace, warn}; use wnfs_common::{ - utils::{BoxStream, CondSend}, + utils::{boxed_stream, BoxStream, CondSend}, BlockStore, }; @@ -233,6 +233,40 @@ pub async fn block_receive_block_stream( Ok(dag_verification.into_receiver_state(config.bloom_fpr)) } +/// Turns a stream of blocks (tuples of CIDs and Bytes) into a stream +/// of frames for a CAR file. +/// +/// Simply concatenated together, all these frames form a CARv1 file. +/// +/// The frame boundaries are after the header section and between each block. +/// +/// The first frame will always be a CAR file header frame. +pub async fn stream_car_frames( + mut blocks: BlockStream<'_>, +) -> Result>, Error> { + // https://github.com/wnfs-wg/car-mirror-spec/issues/6 + // CAR files *must* have at least one CID in them, and all of them + // need to appear as a block in the payload. + // It would probably make most sense to just write all subgraph roots into this, + // but we don't know how many of the subgraph roots fit into this round yet, + // so we're simply writing the first one in here, since we know + // at least one block will be written (and it'll be that one). + let Some((cid, block)) = blocks.try_next().await? else { + debug!("No blocks to write."); + return Ok(boxed_stream(futures::stream::empty())); + }; + + let mut writer = CarWriter::new(CarHeader::new_v1(vec![cid]), Vec::new()); + writer.write_header().await?; + let first_frame = car_frame_from_block((cid, block)).await?; + + let header = writer.finish().await?; + Ok(boxed_stream( + futures::stream::iter(vec![Ok(header.into()), Ok(first_frame)]) + .chain(blocks.and_then(car_frame_from_block)), + )) +} + /// Find all CIDs that a block references. /// /// This will error out if @@ -256,6 +290,26 @@ pub fn references>( // Private //-------------------------------------------------------------------------------------------------- +async fn car_frame_from_block(block: (Cid, Bytes)) -> Result { + // TODO(matheus23): I wish this were exposed in iroh-car somehow + // Instead of having to allocate so many things. + + // The writer will always first emit a header. + // If we don't force it here, it'll do so in `writer.write()`. + // We do it here so we find out how many bytes we need to skip. + let bogus_header = CarHeader::new_v1(vec![Cid::default()]); + let mut writer = CarWriter::new(bogus_header, Vec::new()); + let start = writer.write_header().await?; + + writer.write(block.0, block.1).await?; + let mut bytes = writer.finish().await?; + + // This removes the bogus header bytes + bytes.drain(0..start); + + Ok(bytes.into()) +} + async fn verify_missing_subgraph_roots( root: Cid, missing_subgraph_roots: &[Cid], diff --git a/car-mirror/src/main.rs b/car-mirror/src/main.rs deleted file mode 100644 index c07b656..0000000 --- a/car-mirror/src/main.rs +++ /dev/null @@ -1,6 +0,0 @@ -//! car-mirror - -/// Main entry point. -fn main() { - println!("Welcome!") -} diff --git a/examples/Cargo.toml b/examples/Cargo.toml index ac27c96..fa1fa7e 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -3,7 +3,7 @@ name = "examples" version = "0.1.0" publish = false edition = "2021" -authors = ["Stephen Akinyemi "] +authors = ["Philipp Krüger "] [dev-dependencies] car-mirror = { path = "../car-mirror", version = "0.1" }