From b22fb94752f065482bc6f738d0c1a8639cd46778 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 2 Jan 2024 11:59:01 +0100 Subject: [PATCH] fix: Correctly handle raw-codec CIDs/blocks (#37) Major things in this PR: - Correctly handle raw-codec CIDs/blocks, they were previously not transferred due to a bug in `IncrementalDagVerification`. - Update wnfs-common to 0.1.26 so this crate is compatible with the latest rs-wnfs. - Introduce another cache, a positive cache for checking if we already have a block. (Work on #28) - Make sure the main request/response futures are `Send` --- Cargo.lock | 31 +- .../benches/artificially_slow_blockstore.rs | 15 +- car-mirror-benches/benches/in_memory.rs | 8 +- .../benches/simulated_latency.rs | 8 +- car-mirror-wasm/src/lib.rs | 2 +- car-mirror/Cargo.toml | 3 +- car-mirror/src/common.rs | 74 +++- car-mirror/src/incremental_verification.rs | 42 ++- car-mirror/src/lib.rs | 2 +- car-mirror/src/pull.rs | 6 +- car-mirror/src/push.rs | 6 +- car-mirror/src/test_utils/blockstore_utils.rs | 8 +- car-mirror/src/test_utils/dag_strategy.rs | 40 ++- car-mirror/src/test_utils/local_utils.rs | 4 +- car-mirror/src/traits.rs | 336 ++++++++++++++++-- 15 files changed, 509 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f54bcb7..679fab7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -404,6 +404,7 @@ dependencies = [ "serde", "serde_ipld_dagcbor", "test-strategy", + "testresult", "thiserror", "tokio", "tracing", @@ -654,6 +655,19 @@ dependencies = [ "typenum", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.0", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.4.0" @@ -1691,9 +1705,9 @@ dependencies = [ [[package]] name = "serde_ipld_dagcbor" -version = "0.4.0" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ace39c1b7526be78c755a4c698313f699cf44e62408c0029bf9ab9450fe836da" +checksum = "8e880e0b1f9c7a8db874642c1217f7e19b29e325f24ab9f0fcb11818adec7f01" dependencies = [ "cbor4ii", "cid", @@ -1856,6 +1870,12 @@ dependencies = [ "syn 2.0.28", ] +[[package]] +name = "testresult" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52e045f5cf9ad69772c1c9652f5567a75df88bbb5a1310a64e53cab140c5c459" + [[package]] name = "textwrap" version = "0.16.0" @@ -2276,21 +2296,24 @@ checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" [[package]] name = "wnfs-common" -version = "0.1.24" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e7dd203b73bbbbbf175a8a733ef6aa843f020095f5f4d1e6cd3b7fdce8ba4d8" +checksum = "1395a47e38402df060d3448fe153c5af1eae6f27aeca9c2e79e5a39bb355efab" dependencies = [ "anyhow", "async-once-cell", "async-trait", "bytes", "chrono", + "dashmap", "futures", "libipld", "multihash", "once_cell", + "parking_lot", "rand_core", "serde", + "serde_ipld_dagcbor", "thiserror", ] diff --git a/car-mirror-benches/benches/artificially_slow_blockstore.rs b/car-mirror-benches/benches/artificially_slow_blockstore.rs index b34bcbf..905bdd1 100644 --- a/car-mirror-benches/benches/artificially_slow_blockstore.rs +++ b/car-mirror-benches/benches/artificially_slow_blockstore.rs @@ -10,7 +10,7 @@ use car_mirror::{ use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use libipld::Cid; use std::time::Duration; -use wnfs_common::{BlockStore, MemoryBlockStore}; +use wnfs_common::{utils::CondSend, BlockStore, MemoryBlockStore}; pub fn push_throttled(c: &mut Criterion) { let mut rvg = car_mirror::test_utils::Rvg::deterministic(); @@ -28,9 +28,9 @@ pub fn push_throttled(c: &mut Criterion) { }, |(client_store, root)| { let client_store = &ThrottledBlockStore(client_store); - let client_cache = &InMemoryCache::new(10_000); + let client_cache = &InMemoryCache::new(10_000, 150_000); let server_store = &ThrottledBlockStore::new(); - let server_cache = &InMemoryCache::new(10_000); + let server_cache = &InMemoryCache::new(10_000, 150_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory @@ -75,9 +75,9 @@ pub fn pull_throttled(c: &mut Criterion) { }, |(server_store, root)| { let server_store = &ThrottledBlockStore(server_store); - let server_cache = &InMemoryCache::new(10_000); + let server_cache = &InMemoryCache::new(10_000, 150_000); let client_store = &ThrottledBlockStore::new(); - let client_cache = &InMemoryCache::new(10_000); + let client_cache = &InMemoryCache::new(10_000, 150_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory @@ -109,7 +109,8 @@ pub fn pull_throttled(c: &mut Criterion) { #[derive(Debug, Clone)] struct ThrottledBlockStore(MemoryBlockStore); -#[async_trait(?Send)] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl BlockStore for ThrottledBlockStore { async fn get_block(&self, cid: &Cid) -> Result { let bytes = self.0.get_block(cid).await?; @@ -117,7 +118,7 @@ impl BlockStore for ThrottledBlockStore { Ok(bytes) } - async fn put_block(&self, bytes: impl Into, codec: u64) -> Result { + async fn put_block(&self, bytes: impl Into + CondSend, codec: u64) -> Result { self.0.put_block(bytes, codec).await } } diff --git a/car-mirror-benches/benches/in_memory.rs b/car-mirror-benches/benches/in_memory.rs index f77ac4b..8d03744 100644 --- a/car-mirror-benches/benches/in_memory.rs +++ b/car-mirror-benches/benches/in_memory.rs @@ -22,9 +22,9 @@ pub fn push(c: &mut Criterion) { (store, root) }, |(ref client_store, root)| { - let client_cache = &InMemoryCache::new(10_000); + let client_cache = &InMemoryCache::new(10_000, 150_000); let server_store = &MemoryBlockStore::new(); - let server_cache = &InMemoryCache::new(10_000); + let server_cache = &InMemoryCache::new(10_000, 150_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory @@ -68,9 +68,9 @@ pub fn pull(c: &mut Criterion) { (store, root) }, |(ref server_store, root)| { - let server_cache = &InMemoryCache::new(10_000); + let server_cache = &InMemoryCache::new(10_000, 150_000); let client_store = &MemoryBlockStore::new(); - let client_cache = &InMemoryCache::new(10_000); + let client_cache = &InMemoryCache::new(10_000, 150_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory diff --git a/car-mirror-benches/benches/simulated_latency.rs b/car-mirror-benches/benches/simulated_latency.rs index 0c05689..155ef59 100644 --- a/car-mirror-benches/benches/simulated_latency.rs +++ b/car-mirror-benches/benches/simulated_latency.rs @@ -68,12 +68,12 @@ pub fn pull_with_simulated_latency( links_to_padded_ipld(block_padding), )); let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap(); - let cache = InMemoryCache::new(10_000); + let cache = InMemoryCache::new(10_000, 150_000); (store, cache, root) }, |(ref server_store, ref server_cache, root)| { let client_store = &MemoryBlockStore::new(); - let client_cache = &InMemoryCache::new(10_000); + let client_cache = &InMemoryCache::new(10_000, 150_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory @@ -145,12 +145,12 @@ pub fn push_with_simulated_latency( links_to_padded_ipld(block_padding), )); let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap(); - let cache = InMemoryCache::new(10_000); + let cache = InMemoryCache::new(10_000, 150_000); (store, cache, root) }, |(ref client_store, ref client_cache, root)| { let server_store = &MemoryBlockStore::new(); - let server_cache = &InMemoryCache::new(10_000); + let server_cache = &InMemoryCache::new(10_000, 150_000); let config = &Config::default(); // Simulate a multi-round protocol run in-memory diff --git a/car-mirror-wasm/src/lib.rs b/car-mirror-wasm/src/lib.rs index e31fe47..0432b03 100644 --- a/car-mirror-wasm/src/lib.rs +++ b/car-mirror-wasm/src/lib.rs @@ -1,6 +1,6 @@ #![cfg_attr(docsrs, feature(doc_cfg))] #![warn(missing_debug_implementations, missing_docs, rust_2018_idioms)] -#![deny(unreachable_pub, private_in_public)] +#![deny(unreachable_pub)] //! car-mirror diff --git a/car-mirror/Cargo.toml b/car-mirror/Cargo.toml index 817ca1d..510354e 100644 --- a/car-mirror/Cargo.toml +++ b/car-mirror/Cargo.toml @@ -41,7 +41,7 @@ thiserror = "1.0" tokio = { version = "^1", default-features = false } tracing = "0.1" tracing-subscriber = "0.3" -wnfs-common = "0.1.24" +wnfs-common = "0.1.26" [dev-dependencies] async-std = { version = "1.11", features = ["attributes"] } @@ -49,6 +49,7 @@ car-mirror = { path = ".", features = ["test_utils"] } proptest = "1.1" roaring-graphs = "0.12" test-strategy = "0.3" +testresult = "0.3.0" [features] default = [] diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index 20c2054..ffbd2be 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -40,7 +40,7 @@ pub struct Config { /// Some information that the block receiving end provides the block sending end /// in order to deduplicate block transfers. -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct ReceiverState { /// At least *some* of the subgraph roots that are missing for sure on the receiving end. pub missing_subgraph_roots: Vec, @@ -67,7 +67,7 @@ pub struct CarFile { /// /// It returns a `CarFile` of (a subset) of all blocks below `root`, that /// are thought to be missing on the receiving end. -#[instrument(skip(config, store, cache))] +#[instrument(skip_all, fields(root, last_state))] pub async fn block_send( root: Cid, last_state: Option, @@ -126,7 +126,7 @@ pub async fn block_send( /// It takes a `CarFile`, verifies that its contents are related to the /// `root` and returns some information to help the block sending side /// figure out what blocks to send next. -#[instrument(skip(last_car, config, store, cache), fields(car_bytes = last_car.as_ref().map(|car| car.bytes.len())))] +#[instrument(skip_all, fields(root, car_bytes = last_car.as_ref().map(|car| car.bytes.len())))] pub async fn block_receive( root: Cid, last_car: Option, @@ -221,7 +221,7 @@ pub fn references>( async fn verify_missing_subgraph_roots( root: Cid, - missing_subgraph_roots: &Vec, + missing_subgraph_roots: &[Cid], store: &impl BlockStore, cache: &impl Cache, ) -> Result, Error> { @@ -450,3 +450,69 @@ impl Default for Config { } } } + +impl std::fmt::Debug for ReceiverState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let have_cids_bloom = self + .have_cids_bloom + .as_ref() + .map_or("None".into(), |bloom| { + format!( + "Some(BloomFilter(k_hashes = {}, {} bytes))", + bloom.hash_count(), + bloom.as_bytes().len() + ) + }); + f.debug_struct("ReceiverState") + .field( + "missing_subgraph_roots.len() == ", + &self.missing_subgraph_roots.len(), + ) + .field("have_cids_bloom", &have_cids_bloom) + .finish() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{test_utils::assert_cond_send_sync, traits::NoCache}; + use testresult::TestResult; + use wnfs_common::MemoryBlockStore; + + #[allow(clippy::unreachable, unused)] + fn test_assert_send() { + assert_cond_send_sync(|| { + block_send( + unimplemented!(), + unimplemented!(), + unimplemented!(), + unimplemented!() as &MemoryBlockStore, + &NoCache, + ) + }); + assert_cond_send_sync(|| { + block_receive( + unimplemented!(), + unimplemented!(), + unimplemented!(), + unimplemented!() as &MemoryBlockStore, + &NoCache, + ) + }) + } + + #[test] + fn test_receiver_state_is_not_a_huge_debug() -> TestResult { + let state = ReceiverState { + have_cids_bloom: Some(BloomFilter::new_from_size(4096, 1000)), + missing_subgraph_roots: vec![Cid::default(); 1000], + }; + + let debug_print = format!("{state:#?}"); + + assert!(debug_print.len() < 1000); + + Ok(()) + } +} diff --git a/car-mirror/src/incremental_verification.rs b/car-mirror/src/incremental_verification.rs index c8fb908..215887b 100644 --- a/car-mirror/src/incremental_verification.rs +++ b/car-mirror/src/incremental_verification.rs @@ -54,8 +54,11 @@ impl IncrementalDagVerification { Ok(this) } - #[instrument(level = "trace", skip_all, fields(num_want = self.want_cids.len(), num_have = self.have_cids.len()))] - async fn update_have_cids( + /// Updates the state of incremental dag verification. + /// This goes through all "want" blocks and what they link to, + /// removing items that we now have and don't want anymore. + #[instrument(level = "trace", skip_all)] + pub async fn update_have_cids( &mut self, store: &impl BlockStore, cache: &impl Cache, @@ -68,15 +71,25 @@ impl IncrementalDagVerification { if let Some(BlockStoreError::CIDNotFound(not_found)) = e.downcast_ref::() { - self.want_cids.insert(*not_found); + tracing::trace!(%not_found, "Missing block, adding to want list"); + self.mark_as_want(*not_found); } else { return Err(Error::BlockStoreError(e)); } } Err(e) => return Err(e), Ok(Some(cid)) => { - self.want_cids.remove(&cid); - self.have_cids.insert(cid); + let not_found = matches!( + store.get_block(&cid).await, + Err(e) if matches!(e.downcast_ref(), Some(BlockStoreError::CIDNotFound(_))) + ); + + if not_found { + tracing::trace!(%cid, "Missing block, adding to want list"); + self.mark_as_want(cid); + } else { + self.mark_as_have(cid); + } } Ok(None) => { break; @@ -84,9 +97,28 @@ impl IncrementalDagVerification { } } + tracing::debug!( + num_want = self.want_cids.len(), + num_have = self.have_cids.len(), + "Finished dag verification" + ); + Ok(()) } + fn mark_as_want(&mut self, want: Cid) { + if self.have_cids.contains(&want) { + tracing::warn!(%want, "Marking a CID as wanted, that we have previously marked as having!"); + self.have_cids.remove(&want); + } + self.want_cids.insert(want); + } + + fn mark_as_have(&mut self, have: Cid) { + self.want_cids.remove(&have); + self.have_cids.insert(have); + } + /// Check the state of a CID to find out whether /// - we expect it as one of the next possible blocks to receive (Want) /// - we have already stored it (Have) diff --git a/car-mirror/src/lib.rs b/car-mirror/src/lib.rs index 48ddeb0..6514d7b 100644 --- a/car-mirror/src/lib.rs +++ b/car-mirror/src/lib.rs @@ -1,6 +1,6 @@ #![cfg_attr(docsrs, feature(doc_cfg))] #![warn(missing_debug_implementations, missing_docs, rust_2018_idioms)] -#![deny(unreachable_pub, private_in_public)] +#![deny(unreachable_pub)] //! car-mirror diff --git a/car-mirror/src/pull.rs b/car-mirror/src/pull.rs index 9bf7abf..1991a90 100644 --- a/car-mirror/src/pull.rs +++ b/car-mirror/src/pull.rs @@ -54,13 +54,13 @@ mod tests { use futures::TryStreamExt; use libipld::Cid; use std::collections::HashSet; - use wnfs_common::MemoryBlockStore; + use wnfs_common::{BlockStore, MemoryBlockStore}; pub(crate) async fn simulate_protocol( root: Cid, config: &Config, - client_store: &MemoryBlockStore, - server_store: &MemoryBlockStore, + client_store: &impl BlockStore, + server_store: &impl BlockStore, ) -> Result> { let mut metrics = Vec::new(); let mut request = crate::pull::request(root, None, config, client_store, &NoCache).await?; diff --git a/car-mirror/src/push.rs b/car-mirror/src/push.rs index bb8d57f..c2c81ac 100644 --- a/car-mirror/src/push.rs +++ b/car-mirror/src/push.rs @@ -64,13 +64,13 @@ mod tests { use libipld::Cid; use proptest::collection::vec; use std::collections::HashSet; - use wnfs_common::MemoryBlockStore; + use wnfs_common::{BlockStore, MemoryBlockStore}; pub(crate) async fn simulate_protocol( root: Cid, config: &Config, - client_store: &MemoryBlockStore, - server_store: &MemoryBlockStore, + client_store: &impl BlockStore, + server_store: &impl BlockStore, ) -> Result> { let mut metrics = Vec::new(); let mut request = crate::push::request(root, None, config, client_store, &NoCache).await?; diff --git a/car-mirror/src/test_utils/blockstore_utils.rs b/car-mirror/src/test_utils/blockstore_utils.rs index ee34a99..3a47b8b 100644 --- a/car-mirror/src/test_utils/blockstore_utils.rs +++ b/car-mirror/src/test_utils/blockstore_utils.rs @@ -20,8 +20,9 @@ pub async fn setup_existing_blockstore( store: &impl BlockStore, ) -> Result<()> { for (cid, ipld) in blocks.into_iter() { - let block: Bytes = encode(&ipld, IpldCodec::DagCbor)?.into(); - let cid_store = store.put_block(block, IpldCodec::DagCbor.into()).await?; + let codec: IpldCodec = cid.codec().try_into()?; + let block: Bytes = encode(&ipld, codec)?.into(); + let cid_store = store.put_block(block, codec.into()).await?; debug_assert_eq!(cid, cid_store); } @@ -36,7 +37,8 @@ pub fn dag_to_dot( writeln!(writer, "digraph {{")?; for (cid, ipld) in blocks { - let bytes = encode(&ipld, IpldCodec::DagCbor)?; + let codec: IpldCodec = cid.codec().try_into()?; + let bytes = encode(&ipld, codec)?; let refs = references(cid, bytes, Vec::new())?; for to_cid in refs { print_truncated_string(writer, cid.to_string())?; diff --git a/car-mirror/src/test_utils/dag_strategy.rs b/car-mirror/src/test_utils/dag_strategy.rs index b98984e..ca88824 100644 --- a/car-mirror/src/test_utils/dag_strategy.rs +++ b/car-mirror/src/test_utils/dag_strategy.rs @@ -1,7 +1,11 @@ use bytes::Bytes; use libipld::{Cid, Ipld, IpldCodec}; use libipld_core::multihash::{Code, MultihashDigest}; -use proptest::{prelude::Rng, strategy::Strategy, test_runner::TestRng}; +use proptest::{ + prelude::{Rng, RngCore}, + strategy::Strategy, + test_runner::TestRng, +}; use roaring_graphs::{arb_dag, DirectedAcyclicGraph, Vertex}; use std::{ collections::{BTreeMap, HashSet}, @@ -46,20 +50,28 @@ pub fn links_to_padded_ipld( padding_bytes: usize, ) -> impl Fn(Vec, &mut TestRng) -> (Cid, Ipld) + Clone { move |cids, rng| { - let mut padding = Vec::with_capacity(padding_bytes); - for _ in 0..padding_bytes { - padding.push(rng.gen::()); - } + let mut padding = vec![0u8; padding_bytes]; + rng.fill_bytes(&mut padding); + + let codec = match rng.gen_bool(0.5) { + true if cids.is_empty() => IpldCodec::Raw, + _ => IpldCodec::DagCbor, + }; + + let ipld = if cids.is_empty() && codec == IpldCodec::Raw { + Ipld::Bytes(padding) + } else { + Ipld::Map(BTreeMap::from([ + ("data".into(), Ipld::Bytes(padding)), + ( + "links".into(), + Ipld::List(cids.into_iter().map(Ipld::Link).collect()), + ), + ])) + }; - let ipld = Ipld::Map(BTreeMap::from([ - ("data".into(), Ipld::Bytes(padding)), - ( - "links".into(), - Ipld::List(cids.into_iter().map(Ipld::Link).collect()), - ), - ])); - let bytes = encode(&ipld, IpldCodec::DagCbor).unwrap(); - let cid = Cid::new_v1(IpldCodec::DagCbor.into(), Code::Blake3_256.digest(&bytes)); + let bytes = encode(&ipld, codec).unwrap(); + let cid = Cid::new_v1(codec.into(), Code::Blake3_256.digest(&bytes)); (cid, ipld) } } diff --git a/car-mirror/src/test_utils/local_utils.rs b/car-mirror/src/test_utils/local_utils.rs index a6708df..2281d78 100644 --- a/car-mirror/src/test_utils/local_utils.rs +++ b/car-mirror/src/test_utils/local_utils.rs @@ -5,7 +5,7 @@ use anyhow::Result; use futures::TryStreamExt; use libipld::{Cid, Ipld}; use proptest::strategy::Strategy; -use wnfs_common::{BlockStore, MemoryBlockStore}; +use wnfs_common::{utils::CondSend, BlockStore, MemoryBlockStore}; #[derive(Clone, Debug)] pub(crate) struct Metrics { @@ -91,3 +91,5 @@ pub(crate) async fn total_dag_blocks(root: Cid, store: &impl BlockStore) -> Resu .await? .len()) } + +pub(crate) fn assert_cond_send_sync(_fut: fn() -> T) {} diff --git a/car-mirror/src/traits.rs b/car-mirror/src/traits.rs index c38079b..49f8931 100644 --- a/car-mirror/src/traits.rs +++ b/car-mirror/src/traits.rs @@ -2,28 +2,44 @@ use crate::common::references; use anyhow::Result; use async_trait::async_trait; use libipld::{Cid, IpldCodec}; -use wnfs_common::BlockStore; +#[cfg(feature = "quick_cache")] +use wnfs_common::utils::Arc; +use wnfs_common::{utils::CondSync, BlockStore, BlockStoreError}; /// This trait abstracts caches used by the car mirror implementation. /// An efficient cache implementation can significantly reduce the amount /// of lookups into the blockstore. /// -/// At the moment, all caches are conceptually memoization tables, so you don't -/// necessarily need to think about being careful about cache eviction. +/// At the moment, all caches are either memoization tables or informationally +/// monotonous, so you don't need to be careful about cache eviction. /// /// See `InMemoryCache` for a `quick_cache`-based implementation /// (enable the `quick-cache` feature), and `NoCache` for disabling the cache. -#[async_trait(?Send)] -pub trait Cache { +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait Cache: CondSync { /// This returns further references from the block referenced by given CID, /// if the cache is hit. /// Returns `None` if it's a cache miss. /// /// This isn't meant to be called directly, instead use `Cache::references`. - async fn get_references_cached(&self, cid: Cid) -> Result>>; + async fn get_references_cache(&self, cid: Cid) -> Result>>; /// Populates the references cache for given CID with given references. - async fn put_references_cached(&self, cid: Cid, references: Vec) -> Result<()>; + async fn put_references_cache(&self, cid: Cid, references: Vec) -> Result<()>; + + /// This returns whether the cache has the fact stored that a block with given + /// CID is stored. + /// + /// This only returns `true` in case the block has been stored. + /// `false` simply indicates that the cache doesn't know whether the block is + /// stored or not (it's always a cache miss). + /// + /// Don't call this directly, instead, use `Cache::has_block`. + async fn get_has_block_cache(&self, cid: &Cid) -> Result; + + /// This populates the cache with the fact that a block with given CID is stored. + async fn put_has_block_cache(&self, cid: Cid) -> Result<()>; /// Find out any CIDs that are linked to from the block with given CID. /// @@ -37,32 +53,63 @@ pub trait Cache { return Ok(Vec::new()); } - if let Some(refs) = self.get_references_cached(cid).await? { + if let Some(refs) = self.get_references_cache(cid).await? { return Ok(refs); } let block = store.get_block(&cid).await?; let refs = references(cid, block, Vec::new())?; - self.put_references_cached(cid, refs.clone()).await?; + self.put_references_cache(cid, refs.clone()).await?; Ok(refs) } + + /// Find out whether a given block is stored in given blockstore or not. + /// + /// This cache is *only* effective on `true` values for `has_block`. + /// Repeatedly calling `has_block` with `Cid`s of blocks that are *not* + /// stored will cause repeated calls to given blockstore. + /// + /// **Make sure to always use the same `BlockStore` when calling this function.** + /// + /// This makes use of the caches `get_has_block_cached`, if possible. + /// On cache misses, this will actually fetch the block from the store + /// and if successful, populate the cache using `put_has_block_cached`. + async fn has_block(&self, cid: Cid, store: &impl BlockStore) -> Result { + if self.get_has_block_cache(&cid).await? { + return Ok(true); + } + + match store.get_block(&cid).await { + Ok(_) => { + self.put_has_block_cache(cid).await?; + Ok(true) + } + Err(e) if matches!(e.downcast_ref(), Some(BlockStoreError::CIDNotFound(_))) => { + Ok(false) + } + Err(e) => Err(e), + } + } } /// A [quick-cache]-based implementation of a car mirror cache. /// /// [quick-cache]: https://github.com/arthurprs/quick-cache/ #[cfg(feature = "quick_cache")] -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct InMemoryCache { - references: quick_cache::sync::Cache>, + references: Arc>>, + has_blocks: Arc>, } #[cfg(feature = "quick_cache")] impl InMemoryCache { /// Create a new in-memory cache that approximately holds - /// cached references for `approx_capacity` CIDs. + /// cached references for `approx_references_capacity` CIDs + /// and `approx_has_blocks_capacity` CIDs known to be stored locally. /// - /// Computing the expected memory requirements isn't easy. + /// Computing the expected memory requirements for the reference + /// cache isn't easy. /// A block in theory have up to thousands of references. /// [UnixFS] chunked files will reference up to 174 chunks /// at a time. @@ -76,38 +123,285 @@ impl InMemoryCache { /// /// In practice, the fanout average will be much lower than 174. /// + /// On the other hand, each cache entry for the `has_blocks` cache + /// will take a little more than 64 bytes, so for a 10MB + /// `has_blocks` cache, you would use `10MB / 64bytes = 156_250`. + /// /// [UnixFS]: https://github.com/ipfs/specs/blob/main/UNIXFS.md#layout - pub fn new(approx_capacity: usize) -> Self { + pub fn new(approx_references_capacity: usize, approx_has_blocks_capacity: usize) -> Self { Self { - references: quick_cache::sync::Cache::new(approx_capacity), + references: Arc::new(quick_cache::sync::Cache::new(approx_references_capacity)), + has_blocks: Arc::new(quick_cache::sync::Cache::new(approx_has_blocks_capacity)), } } } #[cfg(feature = "quick_cache")] -#[async_trait(?Send)] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Cache for InMemoryCache { - async fn get_references_cached(&self, cid: Cid) -> Result>> { + async fn get_references_cache(&self, cid: Cid) -> Result>> { Ok(self.references.get(&cid)) } - async fn put_references_cached(&self, cid: Cid, references: Vec) -> Result<()> { + async fn put_references_cache(&self, cid: Cid, references: Vec) -> Result<()> { self.references.insert(cid, references); Ok(()) } + + async fn get_has_block_cache(&self, cid: &Cid) -> Result { + Ok(self.has_blocks.get(cid).is_some()) + } + + async fn put_has_block_cache(&self, cid: Cid) -> Result<()> { + self.has_blocks.insert(cid, ()); + Ok(()) + } } /// An implementation of `Cache` that doesn't cache at all. #[derive(Debug)] pub struct NoCache; -#[async_trait(?Send)] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl Cache for NoCache { - async fn get_references_cached(&self, _: Cid) -> Result>> { + async fn get_references_cache(&self, _: Cid) -> Result>> { Ok(None) } - async fn put_references_cached(&self, _: Cid, _: Vec) -> Result<()> { + async fn put_references_cache(&self, _: Cid, _: Vec) -> Result<()> { + Ok(()) + } + + async fn get_has_block_cache(&self, _: &Cid) -> Result { + Ok(false) + } + + async fn put_has_block_cache(&self, _: Cid) -> Result<()> { + Ok(()) + } +} + +#[cfg(feature = "quick_cache")] +#[cfg(test)] +mod quick_cache_tests { + use super::{Cache, InMemoryCache}; + use libipld::{Ipld, IpldCodec}; + use testresult::TestResult; + use wnfs_common::{BlockStore, MemoryBlockStore}; + + #[async_std::test] + async fn test_has_block_cache() -> TestResult { + let store = &MemoryBlockStore::new(); + let cache = InMemoryCache::new(10_000, 150_000); + + let cid = store + .put_block(b"Hello, World!".to_vec(), IpldCodec::Raw.into()) + .await?; + + // Initially, the cache is unpopulated + assert!(!cache.get_has_block_cache(&cid).await?); + + // Then, we populate that cache + assert!(cache.has_block(cid, store).await?); + + // Now, the cache should be populated + assert!(cache.get_has_block_cache(&cid).await?); + + Ok(()) + } + + #[async_std::test] + async fn test_references_cache() -> TestResult { + let store = &MemoryBlockStore::new(); + let cache = InMemoryCache::new(10_000, 150_000); + + let hello_one_cid = store + .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into()) + .await?; + let hello_two_cid = store + .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into()) + .await?; + let cid = store + .put_serializable(&Ipld::List(vec![ + Ipld::Link(hello_one_cid), + Ipld::Link(hello_two_cid), + ])) + .await?; + + // Cache unpopulated initially + assert_eq!(cache.get_references_cache(cid).await?, None); + + // This should populate the references cache + assert_eq!( + cache.references(cid, store).await?, + vec![hello_one_cid, hello_two_cid] + ); + + // Cache should now contain the references + assert_eq!( + cache.get_references_cache(cid).await?, + Some(vec![hello_one_cid, hello_two_cid]) + ); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::{Cache, NoCache}; + use anyhow::Result; + use async_trait::async_trait; + use libipld::{Cid, Ipld, IpldCodec}; + use std::{ + collections::{HashMap, HashSet}, + sync::RwLock, + }; + use testresult::TestResult; + use wnfs_common::{BlockStore, MemoryBlockStore}; + + #[derive(Debug, Default)] + struct HashMapCache { + references: RwLock>>, + has_blocks: RwLock>, + } + + #[async_trait] + impl Cache for HashMapCache { + async fn get_references_cache(&self, cid: Cid) -> Result>> { + Ok(self.references.read().unwrap().get(&cid).cloned()) + } + + async fn put_references_cache(&self, cid: Cid, references: Vec) -> Result<()> { + self.references.write().unwrap().insert(cid, references); + Ok(()) + } + + async fn get_has_block_cache(&self, cid: &Cid) -> Result { + Ok(self.has_blocks.read().unwrap().contains(cid)) + } + + async fn put_has_block_cache(&self, cid: Cid) -> Result<()> { + self.has_blocks.write().unwrap().insert(cid); + Ok(()) + } + } + + #[async_std::test] + async fn test_has_block_cache() -> TestResult { + let store = &MemoryBlockStore::new(); + let cache = HashMapCache::default(); + + let cid = store + .put_block(b"Hello, World!".to_vec(), IpldCodec::Raw.into()) + .await?; + + // Initially, the cache is unpopulated + assert!(!cache.get_has_block_cache(&cid).await?); + + // Then, we populate that cache + assert!(cache.has_block(cid, store).await?); + + // Now, the cache should be populated + assert!(cache.get_has_block_cache(&cid).await?); + + Ok(()) + } + + #[async_std::test] + async fn test_references_cache() -> TestResult { + let store = &MemoryBlockStore::new(); + let cache = HashMapCache::default(); + + let hello_one_cid = store + .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into()) + .await?; + let hello_two_cid = store + .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into()) + .await?; + let cid = store + .put_serializable(&Ipld::List(vec![ + Ipld::Link(hello_one_cid), + Ipld::Link(hello_two_cid), + ])) + .await?; + + // Cache unpopulated initially + assert_eq!(cache.get_references_cache(cid).await?, None); + + // This should populate the references cache + assert_eq!( + cache.references(cid, store).await?, + vec![hello_one_cid, hello_two_cid] + ); + + // Cache should now contain the references + assert_eq!( + cache.get_references_cache(cid).await?, + Some(vec![hello_one_cid, hello_two_cid]) + ); + + Ok(()) + } + + #[async_std::test] + async fn test_no_cache_has_block() -> TestResult { + let store = &MemoryBlockStore::new(); + let cache = NoCache; + + let cid = store + .put_block(b"Hello, World!".to_vec(), IpldCodec::Raw.into()) + .await?; + + let not_stored_cid = store.create_cid(b"Hi!", IpldCodec::Raw.into())?; + + // Cache should start out unpopulated + assert!(!cache.get_has_block_cache(&cid).await?); + + // Then we "try to populate it". + assert!(cache.has_block(cid, store).await?); + + // It should still give correct answers + assert!(!cache.has_block(not_stored_cid, store).await?); + + // Still, it should stay unpopulated + assert!(!cache.get_has_block_cache(&cid).await?); + + Ok(()) + } + + #[async_std::test] + async fn test_no_cache_references() -> TestResult { + let store = &MemoryBlockStore::new(); + let cache = NoCache; + + let hello_one_cid = store + .put_block(b"Hello, One?".to_vec(), IpldCodec::Raw.into()) + .await?; + let hello_two_cid = store + .put_block(b"Hello, Two?".to_vec(), IpldCodec::Raw.into()) + .await?; + let cid = store + .put_serializable(&Ipld::List(vec![ + Ipld::Link(hello_one_cid), + Ipld::Link(hello_two_cid), + ])) + .await?; + + // Cache should start out unpopulated + assert_eq!(cache.get_references_cache(cid).await?, None); + + // We should get the correct answer for our queries + assert_eq!( + cache.references(cid, store).await?, + vec![hello_one_cid, hello_two_cid] + ); + + // We don't have a populated cache though + assert_eq!(cache.get_references_cache(cid).await?, None); + Ok(()) } }