Skip to content

Commit

Permalink
feat: Allow mode of streaming car file frames
Browse files Browse the repository at this point in the history
Also:
- remove `main.rs` from car-mirror crate
- adjust authors
  • Loading branch information
matheus23 committed Feb 12, 2024
1 parent 122f369 commit cab1377
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 19 deletions.
2 changes: 1 addition & 1 deletion car-mirror-benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "car-mirror-benches"
version = "0.1.0"
publish = false
edition = "2021"
authors = ["Stephen Akinyemi <[email protected]>"]
authors = ["Philipp Krüger <[email protected]>"]

[dependencies]
anyhow = "1.0"
Expand Down
4 changes: 2 additions & 2 deletions car-mirror-wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ description = "Rust implementation of the CAR Mirror protocol"
keywords = []
categories = []
include = ["/src", "README.md", "LICENSE-APACHE", "LICENSE-MIT"]
license = "Apache-2.0 or MIT"
license = "Apache-2.0"
readme = "README.md"
edition = "2021"
rust-version = "1.66"
documentation = "https://docs.rs/car-mirror-wasm"
repository = "https://github.com/fission-codes/rs-car-mirror/tree/main/car-mirror-wasm"
authors = ["Stephen Akinyemi <[email protected]>"]
authors = ["Philipp Krüger <[email protected]>"]

[lib]
crate-type = ["cdylib", "rlib"]
Expand Down
9 changes: 2 additions & 7 deletions car-mirror/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,18 @@ description = "Rust implementation of the CAR Mirror protocol"
keywords = []
categories = []
include = ["/src", "README.md", "LICENSE-APACHE", "LICENSE-MIT"]
license = "Apache-2.0 or MIT"
license = "Apache-2.0"
readme = "README.md"
edition = "2021"
rust-version = "1.66"
documentation = "https://docs.rs/car-mirror"
repository = "https://github.com/fission-codes/rs-car-mirror/tree/main/car-mirror"
authors = ["Stephen Akinyemi <[email protected]>"]
authors = ["Philipp Krüger <[email protected]>"]

[lib]
path = "src/lib.rs"
doctest = true

[[bin]]
name = "car-mirror"
path = "src/main.rs"
doc = true

[dependencies]
anyhow = "1.0"
async-trait = "0.1.73"
Expand Down
58 changes: 56 additions & 2 deletions car-mirror/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

use bytes::Bytes;
use deterministic_bloom::runtime_size::BloomFilter;
use futures::{future, TryStreamExt};
use futures::{future, StreamExt, TryStreamExt};
use iroh_car::{CarHeader, CarReader, CarWriter};
use libipld::{Ipld, IpldCodec};
use libipld_core::{cid::Cid, codec::References};
use std::io::Cursor;
use tracing::{debug, instrument, trace, warn};
use wnfs_common::{
utils::{BoxStream, CondSend},
utils::{boxed_stream, BoxStream, CondSend},
BlockStore,
};

Expand Down Expand Up @@ -233,6 +233,40 @@ pub async fn block_receive_block_stream(
Ok(dag_verification.into_receiver_state(config.bloom_fpr))
}

/// Turns a stream of blocks (tuples of CIDs and Bytes) into a stream
/// of frames for a CAR file.
///
/// Simply concatenated together, all these frames form a CARv1 file.
///
/// The frame boundaries are after the header section and between each block.
///
/// The first frame will always be a CAR file header frame.
pub async fn stream_car_frames(
mut blocks: BlockStream<'_>,
) -> Result<BoxStream<'_, Result<Bytes, Error>>, Error> {
// https://github.com/wnfs-wg/car-mirror-spec/issues/6
// CAR files *must* have at least one CID in them, and all of them
// need to appear as a block in the payload.
// It would probably make most sense to just write all subgraph roots into this,
// but we don't know how many of the subgraph roots fit into this round yet,
// so we're simply writing the first one in here, since we know
// at least one block will be written (and it'll be that one).
let Some((cid, block)) = blocks.try_next().await? else {
debug!("No blocks to write.");
return Ok(boxed_stream(futures::stream::empty()));
};

let mut writer = CarWriter::new(CarHeader::new_v1(vec![cid]), Vec::new());
writer.write_header().await?;
let first_frame = car_frame_from_block((cid, block)).await?;

let header = writer.finish().await?;
Ok(boxed_stream(
futures::stream::iter(vec![Ok(header.into()), Ok(first_frame)])
.chain(blocks.and_then(car_frame_from_block)),
))
}

/// Find all CIDs that a block references.
///
/// This will error out if
Expand All @@ -256,6 +290,26 @@ pub fn references<E: Extend<Cid>>(
// Private
//--------------------------------------------------------------------------------------------------

async fn car_frame_from_block(block: (Cid, Bytes)) -> Result<Bytes, Error> {
// TODO(matheus23): I wish this were exposed in iroh-car somehow
// Instead of having to allocate so many things.

// The writer will always first emit a header.
// If we don't force it here, it'll do so in `writer.write()`.
// We do it here so we find out how many bytes we need to skip.
let bogus_header = CarHeader::new_v1(vec![Cid::default()]);
let mut writer = CarWriter::new(bogus_header, Vec::new());
let start = writer.write_header().await?;

writer.write(block.0, block.1).await?;
let mut bytes = writer.finish().await?;

// This removes the bogus header bytes
bytes.drain(0..start);

Ok(bytes.into())
}

async fn verify_missing_subgraph_roots(
root: Cid,
missing_subgraph_roots: &[Cid],
Expand Down
6 changes: 0 additions & 6 deletions car-mirror/src/main.rs

This file was deleted.

2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "examples"
version = "0.1.0"
publish = false
edition = "2021"
authors = ["Stephen Akinyemi <[email protected]>"]
authors = ["Philipp Krüger <[email protected]>"]

[dev-dependencies]
car-mirror = { path = "../car-mirror", version = "0.1" }
Expand Down

0 comments on commit cab1377

Please sign in to comment.