Skip to content

Commit

Permalink
feat: Use new error types for BlockStoreError
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus23 committed Feb 15, 2024
1 parent 1b878d5 commit 5b883db
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 52 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 13 additions & 5 deletions car-mirror-benches/benches/artificially_slow_blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use car_mirror::{
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use libipld::Cid;
use std::time::Duration;
use wnfs_common::{utils::CondSend, BlockStore, MemoryBlockStore};
use wnfs_common::{utils::CondSend, BlockStore, BlockStoreError, MemoryBlockStore};

pub fn push_throttled(c: &mut Criterion) {
let mut rvg = car_mirror::test_utils::Rvg::deterministic();
Expand Down Expand Up @@ -116,20 +116,28 @@ pub fn pull_throttled(c: &mut Criterion) {
struct ThrottledBlockStore(MemoryBlockStore);

impl BlockStore for ThrottledBlockStore {
async fn get_block(&self, cid: &Cid) -> Result<Bytes> {
async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
async_std::task::sleep(Duration::from_micros(50)).await; // Block fetching is artifically slowed by 50 microseconds
self.0.get_block(cid).await
}

async fn put_block(&self, bytes: impl Into<Bytes> + CondSend, codec: u64) -> Result<Cid> {
async fn put_block(
&self,
bytes: impl Into<Bytes> + CondSend,
codec: u64,
) -> Result<Cid, BlockStoreError> {
self.0.put_block(bytes, codec).await
}

async fn put_block_keyed(&self, cid: Cid, bytes: impl Into<Bytes> + CondSend) -> Result<()> {
async fn put_block_keyed(
&self,
cid: Cid,
bytes: impl Into<Bytes> + CondSend,
) -> Result<(), BlockStoreError> {
self.0.put_block_keyed(cid, bytes).await
}

async fn has_block(&self, cid: &Cid) -> Result<bool> {
async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
async_std::task::sleep(Duration::from_micros(50)).await; // Block fetching is artifically slowed by 50 microseconds
self.0.has_block(cid).await
}
Expand Down
87 changes: 59 additions & 28 deletions car-mirror/src/cache.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::common::references;
use anyhow::Result;
use futures::Future;
use libipld::{Cid, IpldCodec};
use wnfs_common::{
utils::{CondSend, CondSync},
BlockStore,
BlockStore, BlockStoreError,
};

/// This trait abstracts caches used by the car mirror implementation.
Expand All @@ -25,14 +24,14 @@ pub trait Cache: CondSync {
fn get_references_cache(
&self,
cid: Cid,
) -> impl Future<Output = Result<Option<Vec<Cid>>>> + CondSend;
) -> impl Future<Output = Result<Option<Vec<Cid>>, BlockStoreError>> + CondSend;

/// Populates the references cache for given CID with given references.
fn put_references_cache(
&self,
cid: Cid,
references: Vec<Cid>,
) -> impl Future<Output = Result<()>> + CondSend;
) -> impl Future<Output = Result<(), BlockStoreError>> + CondSend;

/// Find out any CIDs that are linked to from the block with given CID.
///
Expand All @@ -43,7 +42,7 @@ pub trait Cache: CondSync {
&self,
cid: Cid,
store: &impl BlockStore,
) -> impl Future<Output = Result<Vec<Cid>>> + CondSend {
) -> impl Future<Output = Result<Vec<Cid>, BlockStoreError>> + CondSend {
async move {
// raw blocks don't have further links
let raw_codec: u64 = IpldCodec::Raw.into();
Expand All @@ -64,21 +63,29 @@ pub trait Cache: CondSync {
}

impl<C: Cache> Cache for &C {
async fn get_references_cache(&self, cid: Cid) -> Result<Option<Vec<Cid>>> {
async fn get_references_cache(&self, cid: Cid) -> Result<Option<Vec<Cid>>, BlockStoreError> {
(**self).get_references_cache(cid).await
}

async fn put_references_cache(&self, cid: Cid, references: Vec<Cid>) -> Result<()> {
async fn put_references_cache(
&self,
cid: Cid,
references: Vec<Cid>,
) -> Result<(), BlockStoreError> {
(**self).put_references_cache(cid, references).await
}
}

impl<C: Cache> Cache for Box<C> {
async fn get_references_cache(&self, cid: Cid) -> Result<Option<Vec<Cid>>> {
async fn get_references_cache(&self, cid: Cid) -> Result<Option<Vec<Cid>>, BlockStoreError> {
(**self).get_references_cache(cid).await
}

async fn put_references_cache(&self, cid: Cid, references: Vec<Cid>) -> Result<()> {
async fn put_references_cache(
&self,
cid: Cid,
references: Vec<Cid>,
) -> Result<(), BlockStoreError> {
(**self).put_references_cache(cid, references).await
}
}
Expand All @@ -88,11 +95,11 @@ impl<C: Cache> Cache for Box<C> {
pub struct NoCache;

impl Cache for NoCache {
async fn get_references_cache(&self, _: Cid) -> Result<Option<Vec<Cid>>> {
async fn get_references_cache(&self, _: Cid) -> Result<Option<Vec<Cid>>, BlockStoreError> {
Ok(None)
}

async fn put_references_cache(&self, _: Cid, _: Vec<Cid>) -> Result<()> {
async fn put_references_cache(&self, _: Cid, _: Vec<Cid>) -> Result<(), BlockStoreError> {
Ok(())
}
}
Expand All @@ -103,7 +110,6 @@ pub use quick_cache::*;
#[cfg(feature = "quick_cache")]
mod quick_cache {
use super::Cache;
use anyhow::{anyhow, Result};
use bytes::Bytes;
use libipld::Cid;
use quick_cache::{sync, OptionsBuilder, Weighter};
Expand Down Expand Up @@ -157,11 +163,18 @@ mod quick_cache {
}

impl Cache for InMemoryCache {
async fn get_references_cache(&self, cid: Cid) -> Result<Option<Vec<Cid>>> {
async fn get_references_cache(
&self,
cid: Cid,
) -> Result<Option<Vec<Cid>>, BlockStoreError> {
Ok(self.references.get(&cid))
}

async fn put_references_cache(&self, cid: Cid, references: Vec<Cid>) -> Result<()> {
async fn put_references_cache(
&self,
cid: Cid,
references: Vec<Cid>,
) -> Result<(), BlockStoreError> {
self.references.insert(cid, references);
Ok(())
}
Expand Down Expand Up @@ -191,23 +204,20 @@ mod quick_cache {
}

impl<B: BlockStore> BlockStore for CacheMissing<B> {
async fn get_block(&self, cid: &Cid) -> Result<Bytes> {
async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
match self.has_blocks.get_value_or_guard_async(cid).await {
Ok(false) => Err(anyhow!(BlockStoreError::CIDNotFound(*cid))),
Ok(false) => Err(BlockStoreError::CIDNotFound(*cid)),
Ok(true) => self.inner.get_block(cid).await,
Err(guard) => match self.inner.get_block(cid).await {
Ok(block) => {
let _ignore_meantime_eviction = guard.insert(true);
Ok(block)
}
Err(e) => {
if let Some(BlockStoreError::CIDNotFound(_)) = e.downcast_ref() {
let _ignore_meantime_eviction = guard.insert(false);
Err(e)
} else {
Err(e)
}
e @ Err(BlockStoreError::CIDNotFound(_)) => {
let _ignore_meantime_eviction = guard.insert(false);
e
}
Err(e) => Err(e),
},
}
}
Expand All @@ -216,17 +226,31 @@ mod quick_cache {
&self,
cid: Cid,
bytes: impl Into<Bytes> + CondSend,
) -> Result<()> {
) -> Result<(), BlockStoreError> {
self.inner.put_block_keyed(cid, bytes).await?;
self.has_blocks.insert(cid, true);
Ok(())
}

async fn has_block(&self, cid: &Cid) -> Result<bool> {
async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
self.has_blocks
.get_or_insert_async(cid, self.inner.has_block(cid))
.await
}

async fn put_block(
&self,
bytes: impl Into<Bytes> + CondSend,
codec: u64,
) -> Result<Cid, BlockStoreError> {
let cid = self.inner.put_block(bytes, codec).await?;
self.has_blocks.insert(cid, true);
Ok(cid)
}

fn create_cid(&self, bytes: &[u8], codec: u64) -> Result<Cid, BlockStoreError> {
self.inner.create_cid(bytes, codec)
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -293,19 +317,26 @@ mod tests {
use libipld::{cbor::DagCborCodec, Cid, Ipld, IpldCodec};
use std::{collections::HashMap, sync::RwLock};
use testresult::TestResult;
use wnfs_common::{encode, BlockStore, MemoryBlockStore};
use wnfs_common::{encode, BlockStore, BlockStoreError, MemoryBlockStore};

#[derive(Debug, Default)]
struct HashMapCache {
references: RwLock<HashMap<Cid, Vec<Cid>>>,
}

impl Cache for HashMapCache {
async fn get_references_cache(&self, cid: Cid) -> Result<Option<Vec<Cid>>> {
async fn get_references_cache(
&self,
cid: Cid,
) -> Result<Option<Vec<Cid>>, BlockStoreError> {
Ok(self.references.read().unwrap().get(&cid).cloned())
}

async fn put_references_cache(&self, cid: Cid, references: Vec<Cid>) -> Result<()> {
async fn put_references_cache(
&self,
cid: Cid,
references: Vec<Cid>,
) -> Result<(), BlockStoreError> {
self.references.write().unwrap().insert(cid, references);
Ok(())
}
Expand Down
4 changes: 1 addition & 3 deletions car-mirror/src/dag_walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ impl TraversedItem {
pub fn to_cid(self) -> Result<Cid, Error> {
match self {
Self::Have(cid) => Ok(cid),
Self::Missing(cid) => Err(Error::BlockStoreError(
BlockStoreError::CIDNotFound(cid).into(),
)),
Self::Missing(cid) => Err(Error::BlockStoreError(BlockStoreError::CIDNotFound(cid))),
}
}
}
Expand Down
19 changes: 4 additions & 15 deletions car-mirror/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use libipld::Cid;
use wnfs_common::BlockStoreError;

use crate::incremental_verification::BlockState;

Expand Down Expand Up @@ -31,17 +32,9 @@ pub enum Error {
cid: Cid,
},

/// This error is raised when the hash function that the `BlockStore` uses a different hashing function
/// than the blocks which are received over the wire.
/// This error will be removed in the future, when the block store trait gets modified to support specifying
/// the hash function.
#[error("BlockStore uses an incompatible hashing function: CID mismatched, expected {cid}, got {actual_cid}")]
BlockStoreIncompatible {
/// The expected CID
cid: Box<Cid>,
/// The CID returned from the BlockStore implementation
actual_cid: Box<Cid>,
},
/// An error rasied from the blockstore.
#[error("BlockStore error: {0}")]
BlockStoreError(#[from] BlockStoreError),

// -------------
// Anyhow Errors
Expand All @@ -50,10 +43,6 @@ pub enum Error {
#[error("Error during block parsing: {0}")]
ParsingError(anyhow::Error),

/// An error rasied from the blockstore.
#[error("BlockStore error: {0}")]
BlockStoreError(anyhow::Error),

// ----------
// Sub-errors
// ----------
Expand Down

0 comments on commit 5b883db

Please sign in to comment.