From f029a986b1273b19560978a37fed4ebf57ccf176 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 28 Dec 2023 18:15:09 +0100 Subject: [PATCH 01/15] chore: Update wnfs-common to 0.1.26 --- Cargo.lock | 24 +++++++++++++++---- .../benches/artificially_slow_blockstore.rs | 7 +++--- car-mirror/Cargo.toml | 2 +- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f54bcb7..6cecc61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -654,6 +654,19 @@ dependencies = [ "typenum", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.0", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.4.0" @@ -1691,9 +1704,9 @@ dependencies = [ [[package]] name = "serde_ipld_dagcbor" -version = "0.4.0" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ace39c1b7526be78c755a4c698313f699cf44e62408c0029bf9ab9450fe836da" +checksum = "8e880e0b1f9c7a8db874642c1217f7e19b29e325f24ab9f0fcb11818adec7f01" dependencies = [ "cbor4ii", "cid", @@ -2276,21 +2289,24 @@ checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" [[package]] name = "wnfs-common" -version = "0.1.24" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e7dd203b73bbbbbf175a8a733ef6aa843f020095f5f4d1e6cd3b7fdce8ba4d8" +checksum = "1395a47e38402df060d3448fe153c5af1eae6f27aeca9c2e79e5a39bb355efab" dependencies = [ "anyhow", "async-once-cell", "async-trait", "bytes", "chrono", + "dashmap", "futures", "libipld", "multihash", "once_cell", + "parking_lot", "rand_core", "serde", + "serde_ipld_dagcbor", "thiserror", ] diff --git a/car-mirror-benches/benches/artificially_slow_blockstore.rs b/car-mirror-benches/benches/artificially_slow_blockstore.rs index b34bcbf..e6f1859 100644 --- a/car-mirror-benches/benches/artificially_slow_blockstore.rs +++ b/car-mirror-benches/benches/artificially_slow_blockstore.rs @@ -10,7 +10,7 @@ use car_mirror::{ use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use libipld::Cid; use std::time::Duration; -use wnfs_common::{BlockStore, MemoryBlockStore}; +use wnfs_common::{utils::CondSend, BlockStore, MemoryBlockStore}; pub fn push_throttled(c: &mut Criterion) { let mut rvg = car_mirror::test_utils::Rvg::deterministic(); @@ -109,7 +109,8 @@ pub fn pull_throttled(c: &mut Criterion) { #[derive(Debug, Clone)] struct ThrottledBlockStore(MemoryBlockStore); -#[async_trait(?Send)] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl BlockStore for ThrottledBlockStore { async fn get_block(&self, cid: &Cid) -> Result { let bytes = self.0.get_block(cid).await?; @@ -117,7 +118,7 @@ impl BlockStore for ThrottledBlockStore { Ok(bytes) } - async fn put_block(&self, bytes: impl Into, codec: u64) -> Result { + async fn put_block(&self, bytes: impl Into + CondSend, codec: u64) -> Result { self.0.put_block(bytes, codec).await } } diff --git a/car-mirror/Cargo.toml b/car-mirror/Cargo.toml index 817ca1d..2ddc10b 100644 --- a/car-mirror/Cargo.toml +++ b/car-mirror/Cargo.toml @@ -41,7 +41,7 @@ thiserror = "1.0" tokio = { version = "^1", default-features = false } tracing = "0.1" tracing-subscriber = "0.3" -wnfs-common = "0.1.24" +wnfs-common = "0.1.26" [dev-dependencies] async-std = { version = "1.11", features = ["attributes"] } From 2d3517f3eff548a0f1f3de882f4e63214d168237 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 28 Dec 2023 18:27:46 +0100 Subject: [PATCH 02/15] chore: Lints --- car-mirror-wasm/src/lib.rs | 2 +- car-mirror/src/common.rs | 2 +- car-mirror/src/lib.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/car-mirror-wasm/src/lib.rs b/car-mirror-wasm/src/lib.rs index e31fe47..0432b03 100644 --- a/car-mirror-wasm/src/lib.rs +++ b/car-mirror-wasm/src/lib.rs @@ -1,6 +1,6 @@ #![cfg_attr(docsrs, feature(doc_cfg))] #![warn(missing_debug_implementations, missing_docs, rust_2018_idioms)] -#![deny(unreachable_pub, private_in_public)] +#![deny(unreachable_pub)] //! car-mirror diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index 20c2054..66a4280 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -221,7 +221,7 @@ pub fn references>( async fn verify_missing_subgraph_roots( root: Cid, - missing_subgraph_roots: &Vec, + missing_subgraph_roots: &[Cid], store: &impl BlockStore, cache: &impl Cache, ) -> Result, Error> { diff --git a/car-mirror/src/lib.rs b/car-mirror/src/lib.rs index 48ddeb0..6514d7b 100644 --- a/car-mirror/src/lib.rs +++ b/car-mirror/src/lib.rs @@ -1,6 +1,6 @@ #![cfg_attr(docsrs, feature(doc_cfg))] #![warn(missing_debug_implementations, missing_docs, rust_2018_idioms)] -#![deny(unreachable_pub, private_in_public)] +#![deny(unreachable_pub)] //! car-mirror From 0b3685a9ee51d8050e538e0314ac84d7af43ee68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 29 Dec 2023 10:35:32 +0100 Subject: [PATCH 03/15] feat: Make futures `Send` and `Sync` --- car-mirror/src/common.rs | 28 ++++++++++++++++++++++++ car-mirror/src/test_utils/local_utils.rs | 2 ++ car-mirror/src/traits.rs | 13 ++++++----- 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index 66a4280..d44130e 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -450,3 +450,31 @@ impl Default for Config { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::assert_send_sync; + + #[cfg(not(target_arch = "wasm32"))] + fn send_sync_tests() { + assert_send_sync(|| { + block_send( + unimplemented!(), + unimplemented!(), + unimplemented!(), + unimplemented!(), + unimplemented!(), + ) + }); + assert_send_sync(|| { + block_receive( + unimplemented!(), + unimplemented!(), + unimplemented!(), + unimplemented!(), + unimplemented!(), + ) + }) + } +} diff --git a/car-mirror/src/test_utils/local_utils.rs b/car-mirror/src/test_utils/local_utils.rs index a6708df..b24c8b6 100644 --- a/car-mirror/src/test_utils/local_utils.rs +++ b/car-mirror/src/test_utils/local_utils.rs @@ -91,3 +91,5 @@ pub(crate) async fn total_dag_blocks(root: Cid, store: &impl BlockStore) -> Resu .await? .len()) } + +pub(crate) fn assert_send_sync(fut: fn() -> T) {} diff --git a/car-mirror/src/traits.rs b/car-mirror/src/traits.rs index c38079b..f032570 100644 --- a/car-mirror/src/traits.rs +++ b/car-mirror/src/traits.rs @@ -2,7 +2,7 @@ use crate::common::references; use anyhow::Result; use async_trait::async_trait; use libipld::{Cid, IpldCodec}; -use wnfs_common::BlockStore; +use wnfs_common::{utils::CondSync, BlockStore}; /// This trait abstracts caches used by the car mirror implementation. /// An efficient cache implementation can significantly reduce the amount @@ -13,8 +13,9 @@ use wnfs_common::BlockStore; /// /// See `InMemoryCache` for a `quick_cache`-based implementation /// (enable the `quick-cache` feature), and `NoCache` for disabling the cache. -#[async_trait(?Send)] -pub trait Cache { +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait Cache: CondSync { /// This returns further references from the block referenced by given CID, /// if the cache is hit. /// Returns `None` if it's a cache miss. @@ -85,7 +86,8 @@ impl InMemoryCache { } #[cfg(feature = "quick_cache")] -#[async_trait(?Send)] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Cache for InMemoryCache { async fn get_references_cached(&self, cid: Cid) -> Result>> { Ok(self.references.get(&cid)) @@ -101,7 +103,8 @@ impl Cache for InMemoryCache { #[derive(Debug)] pub struct NoCache; -#[async_trait(?Send)] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Cache for NoCache { async fn get_references_cached(&self, _: Cid) -> Result>> { Ok(None) From 190e6a92318199e1a748c83ece98c8781d5f927b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Mon, 1 Jan 2024 11:52:09 +0100 Subject: [PATCH 04/15] feat: Better `Debug` print for `ReceiverState`. --- car-mirror/src/common.rs | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index d44130e..b84a7c6 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -40,7 +40,7 @@ pub struct Config { /// Some information that the block receiving end provides the block sending end /// in order to deduplicate block transfers. -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct ReceiverState { /// At least *some* of the subgraph roots that are missing for sure on the receiving end. pub missing_subgraph_roots: Vec, @@ -48,6 +48,25 @@ pub struct ReceiverState { pub have_cids_bloom: Option, } +impl std::fmt::Debug for ReceiverState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let have_cids_bloom = self + .have_cids_bloom + .as_ref() + .map_or("None".into(), |bloom| { + format!( + "Some(BloomFilter(k_hashes = {}, {} bytes))", + bloom.hash_count(), + bloom.as_bytes().len() + ) + }); + f.debug_struct("ReceiverState") + .field("missing_subgraph_roots", &self.missing_subgraph_roots) + .field("have_cids_bloom", &have_cids_bloom) + .finish() + } +} + /// Newtype around bytes that are supposed to represent a CAR file #[derive(Debug, Clone)] pub struct CarFile { @@ -451,14 +470,14 @@ impl Default for Config { } } +#[cfg(fuckoff)] #[cfg(test)] mod tests { use super::*; - use crate::test_utils::assert_send_sync; + use crate::test_utils::assert_cond_send_sync; - #[cfg(not(target_arch = "wasm32"))] fn send_sync_tests() { - assert_send_sync(|| { + assert_cond_send_sync(|| { block_send( unimplemented!(), unimplemented!(), @@ -467,7 +486,7 @@ mod tests { unimplemented!(), ) }); - assert_send_sync(|| { + assert_cond_send_sync(|| { block_receive( unimplemented!(), unimplemented!(), From 7310ed242e5c9318227930f25ab17261403fec22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Mon, 1 Jan 2024 11:57:44 +0100 Subject: [PATCH 05/15] chore: Add tracing to `update_have_cids` --- car-mirror/src/common.rs | 1 - car-mirror/src/incremental_verification.rs | 9 ++++++++- car-mirror/src/pull.rs | 6 +++--- car-mirror/src/push.rs | 6 +++--- car-mirror/src/test_utils/local_utils.rs | 4 ++-- 5 files changed, 16 insertions(+), 10 deletions(-) diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index b84a7c6..9411fdc 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -470,7 +470,6 @@ impl Default for Config { } } -#[cfg(fuckoff)] #[cfg(test)] mod tests { use super::*; diff --git a/car-mirror/src/incremental_verification.rs b/car-mirror/src/incremental_verification.rs index c8fb908..f5f7cb8 100644 --- a/car-mirror/src/incremental_verification.rs +++ b/car-mirror/src/incremental_verification.rs @@ -54,7 +54,7 @@ impl IncrementalDagVerification { Ok(this) } - #[instrument(level = "trace", skip_all, fields(num_want = self.want_cids.len(), num_have = self.have_cids.len()))] + #[instrument(level = "trace", skip_all)] async fn update_have_cids( &mut self, store: &impl BlockStore, @@ -68,6 +68,7 @@ impl IncrementalDagVerification { if let Some(BlockStoreError::CIDNotFound(not_found)) = e.downcast_ref::() { + tracing::trace!(%not_found, "Missing block, adding to want list"); self.want_cids.insert(*not_found); } else { return Err(Error::BlockStoreError(e)); @@ -84,6 +85,12 @@ impl IncrementalDagVerification { } } + tracing::debug!( + num_want = self.want_cids.len(), + num_have = self.have_cids.len(), + "Finished dag verification" + ); + Ok(()) } diff --git a/car-mirror/src/pull.rs b/car-mirror/src/pull.rs index 9bf7abf..1991a90 100644 --- a/car-mirror/src/pull.rs +++ b/car-mirror/src/pull.rs @@ -54,13 +54,13 @@ mod tests { use futures::TryStreamExt; use libipld::Cid; use std::collections::HashSet; - use wnfs_common::MemoryBlockStore; + use wnfs_common::{BlockStore, MemoryBlockStore}; pub(crate) async fn simulate_protocol( root: Cid, config: &Config, - client_store: &MemoryBlockStore, - server_store: &MemoryBlockStore, + client_store: &impl BlockStore, + server_store: &impl BlockStore, ) -> Result> { let mut metrics = Vec::new(); let mut request = crate::pull::request(root, None, config, client_store, &NoCache).await?; diff --git a/car-mirror/src/push.rs b/car-mirror/src/push.rs index bb8d57f..c2c81ac 100644 --- a/car-mirror/src/push.rs +++ b/car-mirror/src/push.rs @@ -64,13 +64,13 @@ mod tests { use libipld::Cid; use proptest::collection::vec; use std::collections::HashSet; - use wnfs_common::MemoryBlockStore; + use wnfs_common::{BlockStore, MemoryBlockStore}; pub(crate) async fn simulate_protocol( root: Cid, config: &Config, - client_store: &MemoryBlockStore, - server_store: &MemoryBlockStore, + client_store: &impl BlockStore, + server_store: &impl BlockStore, ) -> Result> { let mut metrics = Vec::new(); let mut request = crate::push::request(root, None, config, client_store, &NoCache).await?; diff --git a/car-mirror/src/test_utils/local_utils.rs b/car-mirror/src/test_utils/local_utils.rs index b24c8b6..9f50371 100644 --- a/car-mirror/src/test_utils/local_utils.rs +++ b/car-mirror/src/test_utils/local_utils.rs @@ -5,7 +5,7 @@ use anyhow::Result; use futures::TryStreamExt; use libipld::{Cid, Ipld}; use proptest::strategy::Strategy; -use wnfs_common::{BlockStore, MemoryBlockStore}; +use wnfs_common::{utils::CondSync, BlockStore, MemoryBlockStore}; #[derive(Clone, Debug)] pub(crate) struct Metrics { @@ -92,4 +92,4 @@ pub(crate) async fn total_dag_blocks(root: Cid, store: &impl BlockStore) -> Resu .len()) } -pub(crate) fn assert_send_sync(fut: fn() -> T) {} +pub(crate) fn assert_cond_send_sync(fut: fn() -> T) {} From d0d29e383a05328a0ee8e0b6f28d52121679e5a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Mon, 1 Jan 2024 12:03:05 +0100 Subject: [PATCH 06/15] chore: Don't require futures to be Sync --- car-mirror/src/common.rs | 15 +++++++++------ car-mirror/src/test_utils/local_utils.rs | 4 ++-- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index 9411fdc..73c60c5 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -472,17 +472,20 @@ impl Default for Config { #[cfg(test)] mod tests { + use wnfs_common::MemoryBlockStore; + use super::*; - use crate::test_utils::assert_cond_send_sync; + use crate::{test_utils::assert_cond_send_sync, traits::NoCache}; - fn send_sync_tests() { + #[allow(clippy::unreachable, unused)] + fn test_assert_send() { assert_cond_send_sync(|| { block_send( unimplemented!(), unimplemented!(), unimplemented!(), - unimplemented!(), - unimplemented!(), + unimplemented!() as &MemoryBlockStore, + &NoCache, ) }); assert_cond_send_sync(|| { @@ -490,8 +493,8 @@ mod tests { unimplemented!(), unimplemented!(), unimplemented!(), - unimplemented!(), - unimplemented!(), + unimplemented!() as &MemoryBlockStore, + &NoCache, ) }) } diff --git a/car-mirror/src/test_utils/local_utils.rs b/car-mirror/src/test_utils/local_utils.rs index 9f50371..2281d78 100644 --- a/car-mirror/src/test_utils/local_utils.rs +++ b/car-mirror/src/test_utils/local_utils.rs @@ -5,7 +5,7 @@ use anyhow::Result; use futures::TryStreamExt; use libipld::{Cid, Ipld}; use proptest::strategy::Strategy; -use wnfs_common::{utils::CondSync, BlockStore, MemoryBlockStore}; +use wnfs_common::{utils::CondSend, BlockStore, MemoryBlockStore}; #[derive(Clone, Debug)] pub(crate) struct Metrics { @@ -92,4 +92,4 @@ pub(crate) async fn total_dag_blocks(root: Cid, store: &impl BlockStore) -> Resu .len()) } -pub(crate) fn assert_cond_send_sync(fut: fn() -> T) {} +pub(crate) fn assert_cond_send_sync(_fut: fn() -> T) {} From a92678fee900362d9b3bf50095dd09b41455abc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Mon, 1 Jan 2024 12:22:18 +0100 Subject: [PATCH 07/15] chore: Write failing testcase --- car-mirror/src/common.rs | 3 +- car-mirror/src/test_utils/blockstore_utils.rs | 8 ++-- car-mirror/src/test_utils/dag_strategy.rs | 40 ++++++++++++------- 3 files changed, 32 insertions(+), 19 deletions(-) diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index 73c60c5..7f7ab3b 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -472,10 +472,9 @@ impl Default for Config { #[cfg(test)] mod tests { - use wnfs_common::MemoryBlockStore; - use super::*; use crate::{test_utils::assert_cond_send_sync, traits::NoCache}; + use wnfs_common::MemoryBlockStore; #[allow(clippy::unreachable, unused)] fn test_assert_send() { diff --git a/car-mirror/src/test_utils/blockstore_utils.rs b/car-mirror/src/test_utils/blockstore_utils.rs index ee34a99..3a47b8b 100644 --- a/car-mirror/src/test_utils/blockstore_utils.rs +++ b/car-mirror/src/test_utils/blockstore_utils.rs @@ -20,8 +20,9 @@ pub async fn setup_existing_blockstore( store: &impl BlockStore, ) -> Result<()> { for (cid, ipld) in blocks.into_iter() { - let block: Bytes = encode(&ipld, IpldCodec::DagCbor)?.into(); - let cid_store = store.put_block(block, IpldCodec::DagCbor.into()).await?; + let codec: IpldCodec = cid.codec().try_into()?; + let block: Bytes = encode(&ipld, codec)?.into(); + let cid_store = store.put_block(block, codec.into()).await?; debug_assert_eq!(cid, cid_store); } @@ -36,7 +37,8 @@ pub fn dag_to_dot( writeln!(writer, "digraph {{")?; for (cid, ipld) in blocks { - let bytes = encode(&ipld, IpldCodec::DagCbor)?; + let codec: IpldCodec = cid.codec().try_into()?; + let bytes = encode(&ipld, codec)?; let refs = references(cid, bytes, Vec::new())?; for to_cid in refs { print_truncated_string(writer, cid.to_string())?; diff --git a/car-mirror/src/test_utils/dag_strategy.rs b/car-mirror/src/test_utils/dag_strategy.rs index b98984e..ca88824 100644 --- a/car-mirror/src/test_utils/dag_strategy.rs +++ b/car-mirror/src/test_utils/dag_strategy.rs @@ -1,7 +1,11 @@ use bytes::Bytes; use libipld::{Cid, Ipld, IpldCodec}; use libipld_core::multihash::{Code, MultihashDigest}; -use proptest::{prelude::Rng, strategy::Strategy, test_runner::TestRng}; +use proptest::{ + prelude::{Rng, RngCore}, + strategy::Strategy, + test_runner::TestRng, +}; use roaring_graphs::{arb_dag, DirectedAcyclicGraph, Vertex}; use std::{ collections::{BTreeMap, HashSet}, @@ -46,20 +50,28 @@ pub fn links_to_padded_ipld( padding_bytes: usize, ) -> impl Fn(Vec, &mut TestRng) -> (Cid, Ipld) + Clone { move |cids, rng| { - let mut padding = Vec::with_capacity(padding_bytes); - for _ in 0..padding_bytes { - padding.push(rng.gen::()); - } + let mut padding = vec![0u8; padding_bytes]; + rng.fill_bytes(&mut padding); + + let codec = match rng.gen_bool(0.5) { + true if cids.is_empty() => IpldCodec::Raw, + _ => IpldCodec::DagCbor, + }; + + let ipld = if cids.is_empty() && codec == IpldCodec::Raw { + Ipld::Bytes(padding) + } else { + Ipld::Map(BTreeMap::from([ + ("data".into(), Ipld::Bytes(padding)), + ( + "links".into(), + Ipld::List(cids.into_iter().map(Ipld::Link).collect()), + ), + ])) + }; - let ipld = Ipld::Map(BTreeMap::from([ - ("data".into(), Ipld::Bytes(padding)), - ( - "links".into(), - Ipld::List(cids.into_iter().map(Ipld::Link).collect()), - ), - ])); - let bytes = encode(&ipld, IpldCodec::DagCbor).unwrap(); - let cid = Cid::new_v1(IpldCodec::DagCbor.into(), Code::Blake3_256.digest(&bytes)); + let bytes = encode(&ipld, codec).unwrap(); + let cid = Cid::new_v1(codec.into(), Code::Blake3_256.digest(&bytes)); (cid, ipld) } } From e1cbd408e5af0e90777539ce8ba26a4f7204cfe6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Mon, 1 Jan 2024 14:55:35 +0100 Subject: [PATCH 08/15] fix: Also transfer blocks with raw codec --- car-mirror/src/incremental_verification.rs | 33 +++++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/car-mirror/src/incremental_verification.rs b/car-mirror/src/incremental_verification.rs index f5f7cb8..215887b 100644 --- a/car-mirror/src/incremental_verification.rs +++ b/car-mirror/src/incremental_verification.rs @@ -54,8 +54,11 @@ impl IncrementalDagVerification { Ok(this) } + /// Updates the state of incremental dag verification. + /// This goes through all "want" blocks and what they link to, + /// removing items that we now have and don't want anymore. #[instrument(level = "trace", skip_all)] - async fn update_have_cids( + pub async fn update_have_cids( &mut self, store: &impl BlockStore, cache: &impl Cache, @@ -69,15 +72,24 @@ impl IncrementalDagVerification { e.downcast_ref::() { tracing::trace!(%not_found, "Missing block, adding to want list"); - self.want_cids.insert(*not_found); + self.mark_as_want(*not_found); } else { return Err(Error::BlockStoreError(e)); } } Err(e) => return Err(e), Ok(Some(cid)) => { - self.want_cids.remove(&cid); - self.have_cids.insert(cid); + let not_found = matches!( + store.get_block(&cid).await, + Err(e) if matches!(e.downcast_ref(), Some(BlockStoreError::CIDNotFound(_))) + ); + + if not_found { + tracing::trace!(%cid, "Missing block, adding to want list"); + self.mark_as_want(cid); + } else { + self.mark_as_have(cid); + } } Ok(None) => { break; @@ -94,6 +106,19 @@ impl IncrementalDagVerification { Ok(()) } + fn mark_as_want(&mut self, want: Cid) { + if self.have_cids.contains(&want) { + tracing::warn!(%want, "Marking a CID as wanted, that we have previously marked as having!"); + self.have_cids.remove(&want); + } + self.want_cids.insert(want); + } + + fn mark_as_have(&mut self, have: Cid) { + self.want_cids.remove(&have); + self.have_cids.insert(have); + } + /// Check the state of a CID to find out whether /// - we expect it as one of the next possible blocks to receive (Want) /// - we have already stored it (Have) From 1645c9064f8a2db2be7a7bcba817bf1c67ccad56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Mon, 1 Jan 2024 14:55:51 +0100 Subject: [PATCH 09/15] feat: Implement a positive cache for blocks we already have --- .../benches/artificially_slow_blockstore.rs | 8 +- car-mirror-benches/benches/in_memory.rs | 8 +- .../benches/simulated_latency.rs | 8 +- car-mirror/src/common.rs | 9 +- car-mirror/src/traits.rs | 103 +++++++++++++++--- 5 files changed, 104 insertions(+), 32 deletions(-) diff --git a/car-mirror-benches/benches/artificially_slow_blockstore.rs b/car-mirror-benches/benches/artificially_slow_blockstore.rs index e6f1859..905bdd1 100644 --- a/car-mirror-benches/benches/artificially_slow_blockstore.rs +++ b/car-mirror-benches/benches/artificially_slow_blockstore.rs @@ -28,9 +28,9 @@ pub fn push_throttled(c: &mut Criterion) { }, |(client_store, root)| { let client_store = &ThrottledBlockStore(client_store); - let client_cache = &InMemoryCache::new(10_000); + let client_cache = &InMemoryCache::new(10_000, 150_000); let server_store = &ThrottledBlockStore::new(); - let server_cache = &InMemoryCache::new(10_000); + let server_cache = &InMemoryCache::new(10_000, 150_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory @@ -75,9 +75,9 @@ pub fn pull_throttled(c: &mut Criterion) { }, |(server_store, root)| { let server_store = &ThrottledBlockStore(server_store); - let server_cache = &InMemoryCache::new(10_000); + let server_cache = &InMemoryCache::new(10_000, 150_000); let client_store = &ThrottledBlockStore::new(); - let client_cache = &InMemoryCache::new(10_000); + let client_cache = &InMemoryCache::new(10_000, 150_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory diff --git a/car-mirror-benches/benches/in_memory.rs b/car-mirror-benches/benches/in_memory.rs index f77ac4b..8d03744 100644 --- a/car-mirror-benches/benches/in_memory.rs +++ b/car-mirror-benches/benches/in_memory.rs @@ -22,9 +22,9 @@ pub fn push(c: &mut Criterion) { (store, root) }, |(ref client_store, root)| { - let client_cache = &InMemoryCache::new(10_000); + let client_cache = &InMemoryCache::new(10_000, 150_000); let server_store = &MemoryBlockStore::new(); - let server_cache = &InMemoryCache::new(10_000); + let server_cache = &InMemoryCache::new(10_000, 150_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory @@ -68,9 +68,9 @@ pub fn pull(c: &mut Criterion) { (store, root) }, |(ref server_store, root)| { - let server_cache = &InMemoryCache::new(10_000); + let server_cache = &InMemoryCache::new(10_000, 150_000); let client_store = &MemoryBlockStore::new(); - let client_cache = &InMemoryCache::new(10_000); + let client_cache = &InMemoryCache::new(10_000, 150_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory diff --git a/car-mirror-benches/benches/simulated_latency.rs b/car-mirror-benches/benches/simulated_latency.rs index 0c05689..155ef59 100644 --- a/car-mirror-benches/benches/simulated_latency.rs +++ b/car-mirror-benches/benches/simulated_latency.rs @@ -68,12 +68,12 @@ pub fn pull_with_simulated_latency( links_to_padded_ipld(block_padding), )); let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap(); - let cache = InMemoryCache::new(10_000); + let cache = InMemoryCache::new(10_000, 150_000); (store, cache, root) }, |(ref server_store, ref server_cache, root)| { let client_store = &MemoryBlockStore::new(); - let client_cache = &InMemoryCache::new(10_000); + let client_cache = &InMemoryCache::new(10_000, 150_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory @@ -145,12 +145,12 @@ pub fn push_with_simulated_latency( links_to_padded_ipld(block_padding), )); let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap(); - let cache = InMemoryCache::new(10_000); + let cache = InMemoryCache::new(10_000, 150_000); (store, cache, root) }, |(ref client_store, ref client_cache, root)| { let server_store = &MemoryBlockStore::new(); - let server_cache = &InMemoryCache::new(10_000); + let server_cache = &InMemoryCache::new(10_000, 150_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index 7f7ab3b..dc3dd67 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -61,7 +61,10 @@ impl std::fmt::Debug for ReceiverState { ) }); f.debug_struct("ReceiverState") - .field("missing_subgraph_roots", &self.missing_subgraph_roots) + .field( + "missing_subgraph_roots.len() == ", + &self.missing_subgraph_roots.len(), + ) .field("have_cids_bloom", &have_cids_bloom) .finish() } @@ -86,7 +89,7 @@ pub struct CarFile { /// /// It returns a `CarFile` of (a subset) of all blocks below `root`, that /// are thought to be missing on the receiving end. -#[instrument(skip(config, store, cache))] +#[instrument(skip_all, fields(root, last_state))] pub async fn block_send( root: Cid, last_state: Option, @@ -145,7 +148,7 @@ pub async fn block_send( /// It takes a `CarFile`, verifies that its contents are related to the /// `root` and returns some information to help the block sending side /// figure out what blocks to send next. -#[instrument(skip(last_car, config, store, cache), fields(car_bytes = last_car.as_ref().map(|car| car.bytes.len())))] +#[instrument(skip_all, fields(root, car_bytes = last_car.as_ref().map(|car| car.bytes.len())))] pub async fn block_receive( root: Cid, last_car: Option, diff --git a/car-mirror/src/traits.rs b/car-mirror/src/traits.rs index f032570..6e8eebb 100644 --- a/car-mirror/src/traits.rs +++ b/car-mirror/src/traits.rs @@ -2,14 +2,17 @@ use crate::common::references; use anyhow::Result; use async_trait::async_trait; use libipld::{Cid, IpldCodec}; -use wnfs_common::{utils::CondSync, BlockStore}; +use wnfs_common::{ + utils::{Arc, CondSync}, + BlockStore, BlockStoreError, +}; /// This trait abstracts caches used by the car mirror implementation. /// An efficient cache implementation can significantly reduce the amount /// of lookups into the blockstore. /// -/// At the moment, all caches are conceptually memoization tables, so you don't -/// necessarily need to think about being careful about cache eviction. +/// At the moment, all caches are either memoization tables or informationally +/// monotonous, so you don't need to be careful about cache eviction. /// /// See `InMemoryCache` for a `quick_cache`-based implementation /// (enable the `quick-cache` feature), and `NoCache` for disabling the cache. @@ -21,10 +24,23 @@ pub trait Cache: CondSync { /// Returns `None` if it's a cache miss. /// /// This isn't meant to be called directly, instead use `Cache::references`. - async fn get_references_cached(&self, cid: Cid) -> Result>>; + async fn get_references_cache(&self, cid: Cid) -> Result>>; /// Populates the references cache for given CID with given references. - async fn put_references_cached(&self, cid: Cid, references: Vec) -> Result<()>; + async fn put_references_cache(&self, cid: Cid, references: Vec) -> Result<()>; + + /// This returns whether the cache has the fact stored that a block with given + /// CID is stored. + /// + /// This only returns `true` in case the block has been stored. + /// `false` simply indicates that the cache doesn't know whether the block is + /// stored or not (it's always a cache miss). + /// + /// Don't call this directly, instead, use `Cache::has_block`. + async fn get_has_block_cache(&self, cid: &Cid) -> Result; + + /// This populates the cache with the fact that a block with given CID is stored. + async fn put_has_block_cache(&self, cid: Cid) -> Result<()>; /// Find out any CIDs that are linked to from the block with given CID. /// @@ -38,32 +54,63 @@ pub trait Cache: CondSync { return Ok(Vec::new()); } - if let Some(refs) = self.get_references_cached(cid).await? { + if let Some(refs) = self.get_references_cache(cid).await? { return Ok(refs); } let block = store.get_block(&cid).await?; let refs = references(cid, block, Vec::new())?; - self.put_references_cached(cid, refs.clone()).await?; + self.put_references_cache(cid, refs.clone()).await?; Ok(refs) } + + /// Find out whether a given block is stored in given blockstore or not. + /// + /// This cache is *only* effective on `true` values for `has_block`. + /// Repeatedly calling `has_block` with `Cid`s of blocks that are *not* + /// stored will cause repeated calls to given blockstore. + /// + /// **Make sure to always use the same `BlockStore` when calling this function.** + /// + /// This makes use of the caches `get_has_block_cached`, if possible. + /// On cache misses, this will actually fetch the block from the store + /// and if successful, populate the cache using `put_has_block_cached`. + async fn has_block(&self, cid: Cid, store: &impl BlockStore) -> Result { + if self.get_has_block_cache(&cid).await? { + return Ok(true); + } + + match store.get_block(&cid).await { + Ok(_) => { + self.put_has_block_cache(cid).await?; + Ok(true) + } + Err(e) if matches!(e.downcast_ref(), Some(BlockStoreError::CIDNotFound(_))) => { + Ok(false) + } + Err(e) => Err(e), + } + } } /// A [quick-cache]-based implementation of a car mirror cache. /// /// [quick-cache]: https://github.com/arthurprs/quick-cache/ #[cfg(feature = "quick_cache")] -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct InMemoryCache { - references: quick_cache::sync::Cache>, + references: Arc>>, + has_blocks: Arc>, } #[cfg(feature = "quick_cache")] impl InMemoryCache { /// Create a new in-memory cache that approximately holds - /// cached references for `approx_capacity` CIDs. + /// cached references for `approx_references_capacity` CIDs + /// and `approx_has_blocks_capacity` CIDs known to be stored locally. /// - /// Computing the expected memory requirements isn't easy. + /// Computing the expected memory requirements for the reference + /// cache isn't easy. /// A block in theory have up to thousands of references. /// [UnixFS] chunked files will reference up to 174 chunks /// at a time. @@ -77,10 +124,15 @@ impl InMemoryCache { /// /// In practice, the fanout average will be much lower than 174. /// + /// On the other hand, each cache entry for the `has_blocks` cache + /// will take a little more than 64 bytes, so for a 10MB + /// `has_blocks` cache, you would use `10MB / 64bytes = 156_250`. + /// /// [UnixFS]: https://github.com/ipfs/specs/blob/main/UNIXFS.md#layout - pub fn new(approx_capacity: usize) -> Self { + pub fn new(approx_references_capacity: usize, approx_has_blocks_capacity: usize) -> Self { Self { - references: quick_cache::sync::Cache::new(approx_capacity), + references: Arc::new(quick_cache::sync::Cache::new(approx_references_capacity)), + has_blocks: Arc::new(quick_cache::sync::Cache::new(approx_has_blocks_capacity)), } } } @@ -89,14 +141,23 @@ impl InMemoryCache { #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Cache for InMemoryCache { - async fn get_references_cached(&self, cid: Cid) -> Result>> { + async fn get_references_cache(&self, cid: Cid) -> Result>> { Ok(self.references.get(&cid)) } - async fn put_references_cached(&self, cid: Cid, references: Vec) -> Result<()> { + async fn put_references_cache(&self, cid: Cid, references: Vec) -> Result<()> { self.references.insert(cid, references); Ok(()) } + + async fn get_has_block_cache(&self, cid: &Cid) -> Result { + Ok(self.has_blocks.get(cid).is_some()) + } + + async fn put_has_block_cache(&self, cid: Cid) -> Result<()> { + self.has_blocks.insert(cid, ()); + Ok(()) + } } /// An implementation of `Cache` that doesn't cache at all. @@ -106,11 +167,19 @@ pub struct NoCache; #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Cache for NoCache { - async fn get_references_cached(&self, _: Cid) -> Result>> { + async fn get_references_cache(&self, _: Cid) -> Result>> { Ok(None) } - async fn put_references_cached(&self, _: Cid, _: Vec) -> Result<()> { + async fn put_references_cache(&self, _: Cid, _: Vec) -> Result<()> { + Ok(()) + } + + async fn get_has_block_cache(&self, _: &Cid) -> Result { + Ok(false) + } + + async fn put_has_block_cache(&self, _: Cid) -> Result<()> { Ok(()) } } From 091746782cf9943a0ac65d64b3218c93efd06ae4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 2 Jan 2024 10:57:04 +0100 Subject: [PATCH 10/15] chore: Fix warning --- car-mirror/src/traits.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/car-mirror/src/traits.rs b/car-mirror/src/traits.rs index 6e8eebb..46ea717 100644 --- a/car-mirror/src/traits.rs +++ b/car-mirror/src/traits.rs @@ -2,10 +2,9 @@ use crate::common::references; use anyhow::Result; use async_trait::async_trait; use libipld::{Cid, IpldCodec}; -use wnfs_common::{ - utils::{Arc, CondSync}, - BlockStore, BlockStoreError, -}; +#[cfg(feature = "quick_cache")] +use wnfs_common::utils::Arc; +use wnfs_common::{utils::CondSync, BlockStore, BlockStoreError}; /// This trait abstracts caches used by the car mirror implementation. /// An efficient cache implementation can significantly reduce the amount From 5f47f015c478fd575b566ad670582b8dea2954c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 2 Jan 2024 11:18:05 +0100 Subject: [PATCH 11/15] chore: Add some tests --- Cargo.lock | 7 +++++ car-mirror/Cargo.toml | 1 + car-mirror/src/traits.rs | 65 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 73 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 6cecc61..679fab7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -404,6 +404,7 @@ dependencies = [ "serde", "serde_ipld_dagcbor", "test-strategy", + "testresult", "thiserror", "tokio", "tracing", @@ -1869,6 +1870,12 @@ dependencies = [ "syn 2.0.28", ] +[[package]] +name = "testresult" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52e045f5cf9ad69772c1c9652f5567a75df88bbb5a1310a64e53cab140c5c459" + [[package]] name = "textwrap" version = "0.16.0" diff --git a/car-mirror/Cargo.toml b/car-mirror/Cargo.toml index 2ddc10b..510354e 100644 --- a/car-mirror/Cargo.toml +++ b/car-mirror/Cargo.toml @@ -49,6 +49,7 @@ car-mirror = { path = ".", features = ["test_utils"] } proptest = "1.1" roaring-graphs = "0.12" test-strategy = "0.3" +testresult = "0.3.0" [features] default = [] diff --git a/car-mirror/src/traits.rs b/car-mirror/src/traits.rs index 46ea717..280b1cb 100644 --- a/car-mirror/src/traits.rs +++ b/car-mirror/src/traits.rs @@ -182,3 +182,68 @@ impl Cache for NoCache { Ok(()) } } + +#[cfg(all(test, feature = "quick_cache"))] +mod quick_cache_tests { + use super::{Cache, InMemoryCache}; + use libipld::{Ipld, IpldCodec}; + use testresult::TestResult; + use wnfs_common::{BlockStore, MemoryBlockStore}; + + #[async_std::test] + async fn test_has_block_cache() -> TestResult { + let store = &MemoryBlockStore::new(); + let cache = InMemoryCache::new(10_000, 150_000); + + let cid = store + .put_block(b"Hello, World!".to_vec(), IpldCodec::Raw.into()) + .await?; + + // Initially, the cache is unpopulated + assert!(!cache.get_has_block_cache(&cid).await?); + + // Then, we populate that cache + assert!(cache.has_block(cid, store).await?); + + // Now, the cache should be populated + assert!(cache.get_has_block_cache(&cid).await?); + + Ok(()) + } + + #[async_std::test] + async fn test_references_cache() -> TestResult { + let store = &MemoryBlockStore::new(); + let cache = InMemoryCache::new(10_000, 150_000); + + let hello_one_cid = store + .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into()) + .await?; + let hello_two_cid = store + .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into()) + .await?; + let cid = store + .put_serializable(&Ipld::List(vec![ + Ipld::Link(hello_one_cid), + Ipld::Link(hello_two_cid), + ])) + .await?; + + // Cache unpopulated initially + assert_eq!(cache.get_references_cache(cid).await?, None); + + // This should populate the references cache + assert_eq!( + cache.references(cid, store).await?, + vec![hello_one_cid, hello_two_cid] + ); + + // Cache should now contain the references + assert_eq!( + cache.get_references_cache(cid).await?, + Some(vec![hello_one_cid, hello_two_cid]) + ); + + Ok(()) + } +} From 4e227af79812a8b1a1474f9fc7e74cb486077798 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 2 Jan 2024 11:31:08 +0100 Subject: [PATCH 12/15] chore: More tests --- car-mirror/src/traits.rs | 67 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/car-mirror/src/traits.rs b/car-mirror/src/traits.rs index 280b1cb..e4ffee4 100644 --- a/car-mirror/src/traits.rs +++ b/car-mirror/src/traits.rs @@ -247,3 +247,70 @@ mod quick_cache_tests { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::{Cache, NoCache}; + use libipld::{Ipld, IpldCodec}; + use testresult::TestResult; + use wnfs_common::{BlockStore, MemoryBlockStore}; + + #[async_std::test] + async fn test_no_cache_has_block() -> TestResult { + let store = &MemoryBlockStore::new(); + let cache = NoCache; + + let cid = store + .put_block(b"Hello, World!".to_vec(), IpldCodec::Raw.into()) + .await?; + + let not_stored_cid = store.create_cid(b"Hi!", IpldCodec::Raw.into())?; + + // Cache should start out unpopulated + assert!(!cache.get_has_block_cache(&cid).await?); + + // Then we "try to populate it". + assert!(cache.has_block(cid, store).await?); + + // It should still give correct answers + assert!(!cache.has_block(not_stored_cid, store).await?); + + // Still, it should stay unpopulated + assert!(!cache.get_has_block_cache(&cid).await?); + + Ok(()) + } + + #[async_std::test] + async fn test_no_cache_references() -> TestResult { + let store = &MemoryBlockStore::new(); + let cache = NoCache; + + let hello_one_cid = store + .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into()) + .await?; + let hello_two_cid = store + .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into()) + .await?; + let cid = store + .put_serializable(&Ipld::List(vec![ + Ipld::Link(hello_one_cid), + Ipld::Link(hello_two_cid), + ])) + .await?; + + // Cache should start out unpopulated + assert_eq!(cache.get_references_cache(cid).await?, None); + + // We should get the correct answer for our queries + assert_eq!( + cache.references(cid, store).await?, + vec![hello_one_cid, hello_two_cid] + ); + + // We don't have a populated cache though + assert_eq!(cache.get_references_cache(cid).await?, None); + + Ok(()) + } +} From aea27845df32837567a8eb8bf440c1213966baa2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 2 Jan 2024 11:41:38 +0100 Subject: [PATCH 13/15] chore: Try a different way of annotating for codecov --- car-mirror/src/traits.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/car-mirror/src/traits.rs b/car-mirror/src/traits.rs index e4ffee4..9c7da56 100644 --- a/car-mirror/src/traits.rs +++ b/car-mirror/src/traits.rs @@ -183,7 +183,8 @@ impl Cache for NoCache { } } -#[cfg(all(test, feature = "quick_cache"))] +#[cfg(feature = "quick_cache")] +#[cfg(test)] mod quick_cache_tests { use super::{Cache, InMemoryCache}; use libipld::{Ipld, IpldCodec}; From 54f067a8ad9a9d1bb555506d65e22749e91fc2af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 2 Jan 2024 11:45:35 +0100 Subject: [PATCH 14/15] chore: Write another test to appease the codecov gods --- car-mirror/src/common.rs | 59 +++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index dc3dd67..ffbd2be 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -48,28 +48,6 @@ pub struct ReceiverState { pub have_cids_bloom: Option, } -impl std::fmt::Debug for ReceiverState { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let have_cids_bloom = self - .have_cids_bloom - .as_ref() - .map_or("None".into(), |bloom| { - format!( - "Some(BloomFilter(k_hashes = {}, {} bytes))", - bloom.hash_count(), - bloom.as_bytes().len() - ) - }); - f.debug_struct("ReceiverState") - .field( - "missing_subgraph_roots.len() == ", - &self.missing_subgraph_roots.len(), - ) - .field("have_cids_bloom", &have_cids_bloom) - .finish() - } -} - /// Newtype around bytes that are supposed to represent a CAR file #[derive(Debug, Clone)] pub struct CarFile { @@ -473,10 +451,33 @@ impl Default for Config { } } +impl std::fmt::Debug for ReceiverState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let have_cids_bloom = self + .have_cids_bloom + .as_ref() + .map_or("None".into(), |bloom| { + format!( + "Some(BloomFilter(k_hashes = {}, {} bytes))", + bloom.hash_count(), + bloom.as_bytes().len() + ) + }); + f.debug_struct("ReceiverState") + .field( + "missing_subgraph_roots.len() == ", + &self.missing_subgraph_roots.len(), + ) + .field("have_cids_bloom", &have_cids_bloom) + .finish() + } +} + #[cfg(test)] mod tests { use super::*; use crate::{test_utils::assert_cond_send_sync, traits::NoCache}; + use testresult::TestResult; use wnfs_common::MemoryBlockStore; #[allow(clippy::unreachable, unused)] @@ -500,4 +501,18 @@ mod tests { ) }) } + + #[test] + fn test_receiver_state_is_not_a_huge_debug() -> TestResult { + let state = ReceiverState { + have_cids_bloom: Some(BloomFilter::new_from_size(4096, 1000)), + missing_subgraph_roots: vec![Cid::default(); 1000], + }; + + let debug_print = format!("{state:#?}"); + + assert!(debug_print.len() < 1000); + + Ok(()) + } } From 8d0c5c0230e908dd5ab9b7d6034d3f92f65761db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 2 Jan 2024 11:53:05 +0100 Subject: [PATCH 15/15] chore: Write caching tests outside feature flags --- car-mirror/src/traits.rs | 92 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 91 insertions(+), 1 deletion(-) diff --git a/car-mirror/src/traits.rs b/car-mirror/src/traits.rs index 9c7da56..49f8931 100644 --- a/car-mirror/src/traits.rs +++ b/car-mirror/src/traits.rs @@ -252,10 +252,100 @@ mod quick_cache_tests { #[cfg(test)] mod tests { use super::{Cache, NoCache}; - use libipld::{Ipld, IpldCodec}; + use anyhow::Result; + use async_trait::async_trait; + use libipld::{Cid, Ipld, IpldCodec}; + use std::{ + collections::{HashMap, HashSet}, + sync::RwLock, + }; use testresult::TestResult; use wnfs_common::{BlockStore, MemoryBlockStore}; + #[derive(Debug, Default)] + struct HashMapCache { + references: RwLock>>, + has_blocks: RwLock>, + } + + #[async_trait] + impl Cache for HashMapCache { + async fn get_references_cache(&self, cid: Cid) -> Result>> { + Ok(self.references.read().unwrap().get(&cid).cloned()) + } + + async fn put_references_cache(&self, cid: Cid, references: Vec) -> Result<()> { + self.references.write().unwrap().insert(cid, references); + Ok(()) + } + + async fn get_has_block_cache(&self, cid: &Cid) -> Result { + Ok(self.has_blocks.read().unwrap().contains(cid)) + } + + async fn put_has_block_cache(&self, cid: Cid) -> Result<()> { + self.has_blocks.write().unwrap().insert(cid); + Ok(()) + } + } + + #[async_std::test] + async fn test_has_block_cache() -> TestResult { + let store = &MemoryBlockStore::new(); + let cache = HashMapCache::default(); + + let cid = store + .put_block(b"Hello, World!".to_vec(), IpldCodec::Raw.into()) + .await?; + + // Initially, the cache is unpopulated + assert!(!cache.get_has_block_cache(&cid).await?); + + // Then, we populate that cache + assert!(cache.has_block(cid, store).await?); + + // Now, the cache should be populated + assert!(cache.get_has_block_cache(&cid).await?); + + Ok(()) + } + + #[async_std::test] + async fn test_references_cache() -> TestResult { + let store = &MemoryBlockStore::new(); + let cache = HashMapCache::default(); + + let hello_one_cid = store + .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into()) + .await?; + let hello_two_cid = store + .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into()) + .await?; + let cid = store + .put_serializable(&Ipld::List(vec![ + Ipld::Link(hello_one_cid), + Ipld::Link(hello_two_cid), + ])) + .await?; + + // Cache unpopulated initially + assert_eq!(cache.get_references_cache(cid).await?, None); + + // This should populate the references cache + assert_eq!( + cache.references(cid, store).await?, + vec![hello_one_cid, hello_two_cid] + ); + + // Cache should now contain the references + assert_eq!( + cache.get_references_cache(cid).await?, + Some(vec![hello_one_cid, hello_two_cid]) + ); + + Ok(()) + } + #[async_std::test] async fn test_no_cache_has_block() -> TestResult { let store = &MemoryBlockStore::new();