Skip to content

Commit

Permalink
Merge pull request #31 from fission-codes/matheus23/update-libraries
Browse files Browse the repository at this point in the history
chore: Update libraries
  • Loading branch information
matheus23 authored Dec 13, 2023
2 parents 01b127f + ab1576e commit 6ef78ae
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 62 deletions.
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions car-mirror/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,22 @@ doc = true
anyhow = "1.0"
async-stream = "0.3.5"
async-trait = "0.1.73"
bytes = "1.4.0"
bytes = "1.4"
deterministic-bloom = "0.1"
futures = "0.3.28"
iroh-car = "0.3.0"
libipld = "0.16.0"
libipld-core = "0.16.0"
futures = "0.3"
iroh-car = "0.4"
libipld = "0.16"
libipld-core = "0.16"
proptest = { version = "1.1", optional = true }
quick_cache = { version = "0.4.0", optional = true }
quick_cache = { version = "0.4", optional = true }
roaring-graphs = { version = "0.12", optional = true }
serde = "1.0.183"
serde_ipld_dagcbor = "0.4.0"
thiserror = "1.0.47"
serde = "^1"
serde_ipld_dagcbor = "0.4"
thiserror = "1.0"
tokio = { version = "^1", default-features = false }
tracing = "0.1"
tracing-subscriber = "0.3"
wnfs-common = "0.1.23"
wnfs-common = "0.1.24"

[dev-dependencies]
async-std = { version = "1.11", features = ["attributes"] }
Expand Down
38 changes: 8 additions & 30 deletions car-mirror/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![allow(unknown_lints)] // Because the `instrument` macro contains some `#[allow]`s that rust 1.66 doesn't know yet.

use anyhow::anyhow;
use bytes::Bytes;
use deterministic_bloom::runtime_size::BloomFilter;
use futures::TryStreamExt;
Expand Down Expand Up @@ -104,11 +103,6 @@ pub async fn block_send(
Vec::new(),
);

writer
.write_header()
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?;

write_blocks_into_car(
&mut writer,
subgraph_roots,
Expand All @@ -120,11 +114,7 @@ pub async fn block_send(
.await?;

Ok(CarFile {
bytes: writer
.finish()
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?
.into(),
bytes: writer.finish().await?.into(),
})
}

Expand All @@ -147,9 +137,7 @@ pub async fn block_receive(
let mut dag_verification = IncrementalDagVerification::new([root], store, cache).await?;

if let Some(car) = last_car {
let mut reader = CarReader::new(Cursor::new(car.bytes))
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?;
let mut reader = CarReader::new(Cursor::new(car.bytes)).await?;

read_and_verify_blocks(
&mut dag_verification,
Expand Down Expand Up @@ -310,14 +298,8 @@ async fn write_blocks_into_car<W: tokio::io::AsyncWrite + Unpin + Send>(
"writing block to CAR",
);

writer
.write(cid, &block)
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?;
block_bytes += writer.write(cid, &block).await?;

// TODO(matheus23): Count the actual bytes sent?
// At the moment, this is a rough estimate. iroh-car could be improved to return the written bytes.
block_bytes += block.len();
if block_bytes > send_minimum {
break;
}
Expand All @@ -333,12 +315,8 @@ async fn read_and_verify_blocks<R: tokio::io::AsyncRead + Unpin>(
store: &impl BlockStore,
cache: &impl Cache,
) -> Result<(), Error> {
let mut block_bytes = 0;
while let Some((cid, vec)) = reader
.next_block()
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?
{
let mut bytes_read = 0;
while let Some((cid, vec)) = reader.next_block().await? {
let block = Bytes::from(vec);

debug!(
Expand All @@ -347,10 +325,10 @@ async fn read_and_verify_blocks<R: tokio::io::AsyncRead + Unpin>(
"reading block from CAR",
);

block_bytes += block.len();
if block_bytes > receive_maximum {
bytes_read += block.len();
if bytes_read > receive_maximum {
return Err(Error::TooManyBytes {
block_bytes,
bytes_read,
receive_maximum,
});
}
Expand Down
6 changes: 3 additions & 3 deletions car-mirror/src/dag_walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,14 @@ mod proptests {
use proptest::strategy::Strategy;
use std::collections::BTreeSet;
use test_strategy::proptest;
use wnfs_common::{dagcbor::encode, BlockStore, MemoryBlockStore};
use wnfs_common::{encode, BlockStore, MemoryBlockStore};

fn ipld_dags() -> impl Strategy<Value = (Vec<(Cid, Ipld)>, Cid)> {
arb_ipld_dag(1..256, 0.5, |cids, _| {
let ipld = Ipld::List(cids.into_iter().map(Ipld::Link).collect());
let cid = Cid::new_v1(
IpldCodec::DagCbor.into(),
Code::Blake3_256.digest(&encode(&ipld).unwrap()),
Code::Blake3_256.digest(&encode(&ipld, IpldCodec::DagCbor).unwrap()),
);
(cid, ipld)
})
Expand All @@ -204,7 +204,7 @@ mod proptests {
let store = &MemoryBlockStore::new();

for (cid, ipld) in dag.iter() {
let block: Bytes = encode(ipld).unwrap().into();
let block: Bytes = encode(ipld, IpldCodec::DagCbor).unwrap().into();
let cid_store = store
.put_block(block, IpldCodec::DagCbor.into())
.await
Expand Down
12 changes: 6 additions & 6 deletions car-mirror/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use crate::incremental_verification::BlockState;
pub enum Error {
/// An error raised during receival of blocks, when more than the configured maximum
/// bytes are received in a single batch. See the `Config` type.
#[error("Received more than {receive_maximum} bytes ({block_bytes}), aborting request.")]
#[error("Expected to receive no more than {receive_maximum} bytes, but got at least {bytes_read}, aborting request.")]
TooManyBytes {
/// The configured amount of maximum bytes to receive
receive_maximum: usize,
/// The actual amount of bytes received so far
block_bytes: usize,
bytes_read: usize,
},

/// This library only supports a subset of default codecs, including DAG-CBOR, DAG-JSON, DAG-PB and more.g
Expand Down Expand Up @@ -50,10 +50,6 @@ pub enum Error {
#[error("Error during block parsing: {0}")]
ParsingError(anyhow::Error),

/// An error rasied when trying to read or write a CAR file.
#[error("CAR (de)serialization error: {0}")]
CarFileError(anyhow::Error),

/// An error rasied from the blockstore.
#[error("BlockStore error: {0}")]
BlockStoreError(anyhow::Error),
Expand All @@ -64,6 +60,10 @@ pub enum Error {
/// Errors related to incremental verification
#[error(transparent)]
IncrementalVerificationError(#[from] IncrementalVerificationError),

/// An error rasied when trying to read or write a CAR file.
#[error("CAR (de)serialization error: {0}")]
CarFileError(#[from] iroh_car::Error),
}

/// Errors related to incremental verification
Expand Down
6 changes: 3 additions & 3 deletions car-mirror/src/test_utils/blockstore_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use anyhow::Result;
use bytes::Bytes;
use libipld::{Cid, Ipld, IpldCodec};
use std::io::Write;
use wnfs_common::{dagcbor::encode, BlockStore, MemoryBlockStore};
use wnfs_common::{encode, BlockStore, MemoryBlockStore};

/// Take a list of dag-cbor IPLD blocks and store all of them as dag-cbor in a
/// MemoryBlockStore & return it.
Expand All @@ -20,7 +20,7 @@ pub async fn setup_existing_blockstore(
store: &impl BlockStore,
) -> Result<()> {
for (cid, ipld) in blocks.into_iter() {
let block: Bytes = encode(&ipld)?.into();
let block: Bytes = encode(&ipld, IpldCodec::DagCbor)?.into();
let cid_store = store.put_block(block, IpldCodec::DagCbor.into()).await?;
debug_assert_eq!(cid, cid_store);
}
Expand All @@ -36,7 +36,7 @@ pub fn dag_to_dot(
writeln!(writer, "digraph {{")?;

for (cid, ipld) in blocks {
let bytes = encode(&ipld)?;
let bytes = encode(&ipld, IpldCodec::DagCbor)?;
let refs = references(cid, bytes, Vec::new())?;
for to_cid in refs {
print_truncated_string(writer, cid.to_string())?;
Expand Down
8 changes: 4 additions & 4 deletions car-mirror/src/test_utils/dag_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
fmt::Debug,
ops::Range,
};
use wnfs_common::dagcbor::encode;
use wnfs_common::encode;

/// A strategy for use with proptest to generate random DAGs (directed acyclic graphs).
/// The strategy generates a list of blocks of type T and their CIDs, as well as
Expand All @@ -25,15 +25,15 @@ pub fn arb_ipld_dag<T: Debug + Clone>(
/// A block-generating function for use with `arb_ipld_dag`.
pub fn links_to_ipld(cids: Vec<Cid>, _: &mut TestRng) -> (Cid, Ipld) {
let ipld = Ipld::List(cids.into_iter().map(Ipld::Link).collect());
let bytes = encode(&ipld).unwrap();
let bytes = encode(&ipld, IpldCodec::DagCbor).unwrap();
let cid = Cid::new_v1(IpldCodec::DagCbor.into(), Code::Blake3_256.digest(&bytes));
(cid, ipld)
}

/// A block-generating function for use with `arb_ipld_dag`.
pub fn links_to_dag_cbor(cids: Vec<Cid>, _: &mut TestRng) -> (Cid, Bytes) {
let ipld = Ipld::List(cids.into_iter().map(Ipld::Link).collect());
let bytes: Bytes = encode(&ipld).unwrap().into();
let bytes: Bytes = encode(&ipld, IpldCodec::DagCbor).unwrap().into();
let cid = Cid::new_v1(IpldCodec::DagCbor.into(), Code::Blake3_256.digest(&bytes));
(cid, bytes)
}
Expand All @@ -58,7 +58,7 @@ pub fn links_to_padded_ipld(
Ipld::List(cids.into_iter().map(Ipld::Link).collect()),
),
]));
let bytes = encode(&ipld).unwrap();
let bytes = encode(&ipld, IpldCodec::DagCbor).unwrap();
let cid = Cid::new_v1(IpldCodec::DagCbor.into(), Code::Blake3_256.digest(&bytes));
(cid, ipld)
}
Expand Down

0 comments on commit 6ef78ae

Please sign in to comment.