Skip to content

Commit

Permalink
switch to foreset CAR db
Browse files Browse the repository at this point in the history
  • Loading branch information
hanabi1224 committed Dec 16, 2024
1 parent e54bc5e commit 827e63a
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 37 deletions.
54 changes: 29 additions & 25 deletions src/db/memory.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
// Copyright 2019-2024 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use super::{EthMappingsStore, SettingsStore};
use super::{EthMappingsStore, SettingsStore, SettingsStoreExt};
use crate::blocks::TipsetKey;
use crate::cid_collections::CidHashSet;
use crate::db::{GarbageCollectable, PersistentStore};
use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
use crate::rpc::eth::types::EthHash;
use crate::utils::db::car_stream::CarBlock;
use crate::utils::multihash::prelude::*;
use ahash::HashMap;
use anyhow::Context as _;
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use itertools::Itertools;
use parking_lot::RwLock;
use std::ops::Deref;

#[derive(Debug, Default)]
pub struct MemoryDB {
Expand All @@ -23,29 +25,31 @@ pub struct MemoryDB {
}

impl MemoryDB {
pub fn serialize(&self) -> anyhow::Result<Vec<u8>> {
let blockchain_db = self.blockchain_db.read();
let blockchain_persistent_db = self.blockchain_persistent_db.read();
let settings_db = self.settings_db.read();
let eth_mappings_db = self.eth_mappings_db.read();
let tuple = (
blockchain_db.deref(),
blockchain_persistent_db.deref(),
settings_db.deref(),
eth_mappings_db.deref(),
);
Ok(fvm_ipld_encoding::to_vec(&tuple)?)
}

pub fn deserialize_from(bytes: &[u8]) -> anyhow::Result<Self> {
let (blockchain_db, blockchain_persistent_db, settings_db, eth_mappings_db) =
fvm_ipld_encoding::from_slice(bytes)?;
Ok(Self {
blockchain_db: RwLock::new(blockchain_db),
blockchain_persistent_db: RwLock::new(blockchain_persistent_db),
settings_db: RwLock::new(settings_db),
eth_mappings_db: RwLock::new(eth_mappings_db),
})
pub async fn export_forest_car<W: tokio::io::AsyncWrite + Unpin>(
&self,
writer: &mut W,
) -> anyhow::Result<()> {
let roots =
SettingsStoreExt::read_obj::<TipsetKey>(self, crate::db::setting_keys::HEAD_KEY)?
.context("chain head is not tracked and cannot be exported")?
.into_cids();
let blocks = {
let blockchain_db = self.blockchain_db.read();
let blockchain_persistent_db = self.blockchain_persistent_db.read();
blockchain_db
.iter()
.chain(blockchain_persistent_db.iter())
.map(|(&cid, data)| {
anyhow::Ok(CarBlock {
cid,
data: data.clone(),
})
})
.collect_vec()
};
let frames =
crate::db::car::forest::Encoder::compress_stream_default(futures::stream::iter(blocks));
crate::db::car::forest::Encoder::write(writer, roots, frames).await
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/tool/subcommands/api_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,9 @@ impl ApiCommands {
{
Ok(_) => {
let snapshot = {
let db = tracking_db.tracker.serialize()?;
tracking_db.ensure_chain_head_is_tracked()?;
let mut db = vec![];
tracking_db.export_forest_car(&mut db).await?;
RpcTestSnapshot {
name: test_dump.request.method_name.to_string(),
params: test_dump.request.params,
Expand Down
38 changes: 35 additions & 3 deletions src/tool/subcommands/api_cmd/generate_test_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@

use super::*;
use crate::{
blocks::{CachingBlockHeader, TipsetKey},
chain::ChainStore,
chain_sync::{network_context::SyncNetworkContext, SyncConfig, SyncStage},
daemon::db_util::load_all_forest_cars,
db::{
db_engine::open_db, parity_db::ParityDb, EthMappingsStore, MemoryDB, SettingsStore,
CAR_DB_DIR_NAME,
SettingsStoreExt, CAR_DB_DIR_NAME,
},
genesis::{get_network_name_from_genesis, read_genesis_header},
libp2p::{NetworkMessage, PeerManager},
Expand Down Expand Up @@ -128,16 +129,47 @@ async fn ctx(
/// A [`Blockstore`] wrapper that tracks read operations to the inner [`Blockstore`] with an [`MemoryDB`]
pub struct ReadOpsTrackingStore<T> {
inner: T,
pub tracker: Arc<MemoryDB>,
tracker: Arc<MemoryDB>,
}

impl<T> ReadOpsTrackingStore<T> {
impl<T> ReadOpsTrackingStore<T>
where
T: Blockstore + SettingsStore,
{
pub fn new(inner: T) -> Self {
Self {
inner,
tracker: Arc::new(Default::default()),
}
}

pub fn ensure_chain_head_is_tracked(&self) -> anyhow::Result<()> {
if !self.is_chain_head_tracked()? {
let _ =
SettingsStoreExt::read_obj::<TipsetKey>(self, crate::db::setting_keys::HEAD_KEY)?
.context("HEAD_KEY not found")?
.into_cids()
.into_iter()
.map(|key| CachingBlockHeader::load(self, key))
.collect::<anyhow::Result<Option<Vec<_>>>>()?
.map(Tipset::new)
.transpose()?
.context("failed to load tipset")?;
}

Ok(())
}

fn is_chain_head_tracked(&self) -> anyhow::Result<bool> {
SettingsStore::exists(&self.tracker, crate::db::setting_keys::HEAD_KEY)
}

pub async fn export_forest_car<W: tokio::io::AsyncWrite + Unpin>(
&self,
writer: &mut W,
) -> anyhow::Result<()> {
self.tracker.export_forest_car(writer).await
}
}

impl<T: Blockstore> Blockstore for ReadOpsTrackingStore<T> {
Expand Down
24 changes: 16 additions & 8 deletions src/tool/subcommands/api_cmd/test_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
use crate::{
chain::ChainStore,
chain_sync::{network_context::SyncNetworkContext, SyncConfig, SyncStage},
db::MemoryDB,
db::{
car::{AnyCar, ManyCar},
MemoryDB,
},
genesis::{get_network_name_from_genesis, read_genesis_header},
libp2p::{NetworkMessage, PeerManager},
lotus_json::HasLotusJson,
Expand Down Expand Up @@ -39,24 +42,29 @@ pub async fn run_test_from_snapshot(path: &Path) -> anyhow::Result<()> {
} else {
snapshot_bytes
};
let snapshot: RpcTestSnapshot = serde_json::from_slice(snapshot_bytes.as_slice())?;
let db = Arc::new(MemoryDB::deserialize_from(snapshot.db.as_slice())?);
let RpcTestSnapshot {
name: method_name,
params,
db: db_bytes,
response: expected_response,
} = serde_json::from_slice(snapshot_bytes.as_slice())?;
let db = Arc::new(ManyCar::new(MemoryDB::default()).with_read_only(AnyCar::new(db_bytes)?)?);
let chain_config = Arc::new(ChainConfig::calibnet());
let (ctx, _, _) = ctx(db, chain_config).await?;
let params_raw = match serde_json::to_string(&snapshot.params)? {
let params_raw = match serde_json::to_string(&params)? {
s if s.is_empty() => None,
s => Some(s),
};

macro_rules! run_test {
($ty:ty) => {
if snapshot.name.as_str() == <$ty>::NAME {
if method_name.as_str() == <$ty>::NAME {
let params = <$ty>::parse_params(params_raw.clone(), ParamStructure::Either)?;
let result = <$ty>::handle(ctx.clone(), params)
.await
.map_err(|e| e.to_string())
.and_then(|r| r.into_lotus_json_value().map_err(|e| e.to_string()));
assert_eq!(snapshot.response, result);
assert_eq!(expected_response, result);
run = true;
}
};
Expand All @@ -70,10 +78,10 @@ pub async fn run_test_from_snapshot(path: &Path) -> anyhow::Result<()> {
}

async fn ctx(
db: Arc<MemoryDB>,
db: Arc<ManyCar<MemoryDB>>,
chain_config: Arc<ChainConfig>,
) -> anyhow::Result<(
Arc<RPCState<MemoryDB>>,
Arc<RPCState<ManyCar<MemoryDB>>>,
flume::Receiver<NetworkMessage>,
tokio::sync::mpsc::Receiver<()>,
)> {
Expand Down

0 comments on commit 827e63a

Please sign in to comment.