diff --git a/Cargo.lock b/Cargo.lock index c68eb0085c6..6b177446b43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3097,6 +3097,7 @@ dependencies = [ "libp2p-swarm-test", "libsecp256k1", "lru", + "md-5", "memmap2 0.9.5", "memory-stats", "mimalloc", @@ -5939,6 +5940,16 @@ dependencies = [ "rawpointer", ] +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest 0.10.7", +] + [[package]] name = "memchr" version = "2.7.4" diff --git a/Cargo.toml b/Cargo.toml index f428f4d9de8..0fed7d2bca8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -238,6 +238,7 @@ glob = "0.3" http-range-header = "0.4" insta = { version = "1", features = ["yaml"] } libp2p-swarm-test = { workspace = true } +md5 = { package = "md-5", version = "0.10" } num-bigint = { version = "0.4", features = ['quickcheck'] } petgraph = "0.7" predicates = "3" diff --git a/src/chain/store/chain_store.rs b/src/chain/store/chain_store.rs index 6f3ec5b74c6..35fff27e2c9 100644 --- a/src/chain/store/chain_store.rs +++ b/src/chain/store/chain_store.rs @@ -234,7 +234,7 @@ where &self .settings .require_obj::(HEAD_KEY) - .expect("failed to load heaviest tipset"), + .expect("failed to load heaviest tipset key"), ) .expect("failed to load heaviest tipset") } diff --git a/src/chain/store/index.rs b/src/chain/store/index.rs index 504a1a3e7e6..5b99f69641e 100644 --- a/src/chain/store/index.rs +++ b/src/chain/store/index.rs @@ -7,6 +7,7 @@ use crate::beacon::{BeaconEntry, IGNORE_DRAND_VAR}; use crate::blocks::{Tipset, TipsetKey}; use crate::metrics; use crate::shim::clock::ChainEpoch; +use crate::utils::misc::env::is_env_truthy; use fvm_ipld_blockstore::Blockstore; use itertools::Itertools; use lru::LruCache; @@ -47,11 +48,13 @@ impl ChainIndex { /// Loads a tipset from memory given the tipset keys and cache. Semantically /// identical to [`Tipset::load`] but the result is cached. pub fn load_tipset(&self, tsk: &TipsetKey) -> Result>, Error> { - if let Some(ts) = self.ts_cache.lock().get(tsk) { - metrics::LRU_CACHE_HIT - .get_or_create(&metrics::values::TIPSET) - .inc(); - return Ok(Some(ts.clone())); + if !is_env_truthy("FOREST_TIPSET_CACHE_DISABLED") { + if let Some(ts) = self.ts_cache.lock().get(tsk) { + metrics::LRU_CACHE_HIT + .get_or_create(&metrics::values::TIPSET) + .inc(); + return Ok(Some(ts.clone())); + } } let ts_opt = Tipset::load(&self.db, tsk)?.map(Arc::new); diff --git a/src/cli_shared/snapshot.rs b/src/cli_shared/snapshot.rs index dc698418b12..51e75eec1f0 100644 --- a/src/cli_shared/snapshot.rs +++ b/src/cli_shared/snapshot.rs @@ -134,6 +134,9 @@ fn parse_content_disposition(value: &reqwest::header::HeaderValue) -> Option anyhow::Result { + if !directory.is_dir() { + std::fs::create_dir_all(directory)?; + } let dst_path = directory.join(filename); let destination = dst_path.display(); event!(target: "forest::snapshot", tracing::Level::INFO, %url, %destination, "downloading snapshot"); diff --git a/src/db/memory.rs b/src/db/memory.rs index 646c86cb250..6bb4bdb3d35 100644 --- a/src/db/memory.rs +++ b/src/db/memory.rs @@ -1,33 +1,63 @@ // Copyright 2019-2025 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use super::{EthMappingsStore, SettingsStore, SettingsStoreExt}; +use crate::blocks::TipsetKey; use crate::cid_collections::CidHashSet; use crate::db::{GarbageCollectable, PersistentStore}; use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite}; use crate::rpc::eth::types::EthHash; +use crate::utils::db::car_stream::CarBlock; use crate::utils::multihash::prelude::*; use ahash::HashMap; +use anyhow::Context as _; use cid::Cid; use fvm_ipld_blockstore::Blockstore; use itertools::Itertools; use parking_lot::RwLock; -use super::{EthMappingsStore, SettingsStore}; - #[derive(Debug, Default)] pub struct MemoryDB { - blockchain_db: RwLock, Vec>>, - blockchain_persistent_db: RwLock, Vec>>, + blockchain_db: RwLock>>, + blockchain_persistent_db: RwLock>>, settings_db: RwLock>>, eth_mappings_db: RwLock>>, } +impl MemoryDB { + pub async fn export_forest_car( + &self, + writer: &mut W, + ) -> anyhow::Result<()> { + let roots = + SettingsStoreExt::read_obj::(self, crate::db::setting_keys::HEAD_KEY)? + .context("chain head is not tracked and cannot be exported")? + .into_cids(); + let blocks = { + let blockchain_db = self.blockchain_db.read(); + let blockchain_persistent_db = self.blockchain_persistent_db.read(); + blockchain_db + .iter() + .chain(blockchain_persistent_db.iter()) + .map(|(&cid, data)| { + anyhow::Ok(CarBlock { + cid, + data: data.clone(), + }) + }) + .collect_vec() + }; + let frames = + crate::db::car::forest::Encoder::compress_stream_default(futures::stream::iter(blocks)); + crate::db::car::forest::Encoder::write(writer, roots, frames).await + } +} + impl GarbageCollectable for MemoryDB { fn get_keys(&self) -> anyhow::Result { let mut set = CidHashSet::new(); - for key in self.blockchain_db.read().keys() { - let cid = Cid::try_from(key.as_slice())?; - set.insert(cid); + for &key in self.blockchain_db.read().keys() { + set.insert(key); } Ok(set) } @@ -36,17 +66,11 @@ impl GarbageCollectable for MemoryDB { let mut db = self.blockchain_db.write(); let mut deleted = 0; db.retain(|key, _| { - let cid = Cid::try_from(key.as_slice()); - match cid { - Ok(cid) => { - let retain = !keys.contains(&cid); - if !retain { - deleted += 1; - } - retain - } - _ => true, + let retain = !keys.contains(key); + if !retain { + deleted += 1; } + retain }); Ok(deleted) } @@ -111,22 +135,15 @@ impl EthMappingsStore for MemoryDB { impl Blockstore for MemoryDB { fn get(&self, k: &Cid) -> anyhow::Result>> { - Ok(self - .blockchain_db + Ok(self.blockchain_db.read().get(k).cloned().or(self + .blockchain_persistent_db .read() - .get(&k.to_bytes()) - .cloned() - .or(self - .blockchain_persistent_db - .read() - .get(&k.to_bytes()) - .cloned())) + .get(k) + .cloned())) } fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> { - self.blockchain_db - .write() - .insert(k.to_bytes(), block.to_vec()); + self.blockchain_db.write().insert(*k, block.to_vec()); Ok(()) } } @@ -135,14 +152,14 @@ impl PersistentStore for MemoryDB { fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> { self.blockchain_persistent_db .write() - .insert(k.to_bytes(), block.to_vec()); + .insert(*k, block.to_vec()); Ok(()) } } impl BitswapStoreRead for MemoryDB { fn contains(&self, cid: &Cid) -> anyhow::Result { - Ok(self.blockchain_db.read().contains_key(&cid.to_bytes())) + Ok(self.blockchain_db.read().contains_key(cid)) } fn get(&self, cid: &Cid) -> anyhow::Result>> { @@ -157,3 +174,43 @@ impl BitswapStoreReadWrite for MemoryDB { self.put_keyed(block.cid(), block.data()) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::db::{car::ForestCar, setting_keys::HEAD_KEY}; + use fvm_ipld_encoding::DAG_CBOR; + use multihash_codetable::Code::Blake2b256; + use nunny::vec as nonempty; + + #[tokio::test] + async fn test_export_forest_car() { + let db = MemoryDB::default(); + let record1 = b"non-persistent"; + let key1 = Cid::new_v1(DAG_CBOR, Blake2b256.digest(record1.as_slice())); + db.put_keyed(&key1, record1.as_slice()).unwrap(); + + let record2 = b"persistent"; + let key2 = Cid::new_v1(DAG_CBOR, Blake2b256.digest(record2.as_slice())); + db.put_keyed_persistent(&key2, record2.as_slice()).unwrap(); + + let mut car_db_bytes = vec![]; + assert!(db + .export_forest_car(&mut car_db_bytes) + .await + .unwrap_err() + .to_string() + .contains("chain head is not tracked and cannot be exported")); + + db.write_obj(HEAD_KEY, &TipsetKey::from(nonempty![key1])) + .unwrap(); + + car_db_bytes.clear(); + db.export_forest_car(&mut car_db_bytes).await.unwrap(); + + let car = ForestCar::new(car_db_bytes).unwrap(); + assert_eq!(car.roots(), &nonempty![key1]); + assert!(car.has(&key1).unwrap()); + assert!(car.has(&key2).unwrap()); + } +} diff --git a/src/rpc/auth_layer.rs b/src/rpc/auth_layer.rs index 5ee7c4c6964..36dead81223 100644 --- a/src/rpc/auth_layer.rs +++ b/src/rpc/auth_layer.rs @@ -32,7 +32,7 @@ static METHOD_NAME2REQUIRED_PERMISSION: Lazy> = Lazy:: } }; } - super::for_each_method!(insert); + super::for_each_rpc_method!(insert); access.insert(chain::CHAIN_NOTIFY, Permission::Read); access.insert(CANCEL_METHOD_NAME, Permission::Read); diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 79d922f451f..151be99e161 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -31,242 +31,243 @@ pub use jsonrpsee::core::ClientError; /// trait. /// /// All methods should be entered here. -macro_rules! for_each_method { +#[macro_export] +macro_rules! for_each_rpc_method { ($callback:path) => { // auth vertical - $callback!(crate::rpc::auth::AuthNew); - $callback!(crate::rpc::auth::AuthVerify); + $callback!($crate::rpc::auth::AuthNew); + $callback!($crate::rpc::auth::AuthVerify); // beacon vertical - $callback!(crate::rpc::beacon::BeaconGetEntry); + $callback!($crate::rpc::beacon::BeaconGetEntry); // chain vertical - $callback!(crate::rpc::chain::ChainExport); - $callback!(crate::rpc::chain::ChainGetBlock); - $callback!(crate::rpc::chain::ChainGetBlockMessages); - $callback!(crate::rpc::chain::ChainGetEvents); - $callback!(crate::rpc::chain::ChainGetGenesis); - $callback!(crate::rpc::chain::ChainGetMessage); - $callback!(crate::rpc::chain::ChainGetMessagesInTipset); - $callback!(crate::rpc::chain::ChainGetMinBaseFee); - $callback!(crate::rpc::chain::ChainGetParentMessages); - $callback!(crate::rpc::chain::ChainGetParentReceipts); - $callback!(crate::rpc::chain::ChainGetPath); - $callback!(crate::rpc::chain::ChainGetTipSet); - $callback!(crate::rpc::chain::ChainGetTipSetAfterHeight); - $callback!(crate::rpc::chain::ChainGetTipSetByHeight); - $callback!(crate::rpc::chain::ChainHasObj); - $callback!(crate::rpc::chain::ChainHead); - $callback!(crate::rpc::chain::ChainReadObj); - $callback!(crate::rpc::chain::ChainSetHead); - $callback!(crate::rpc::chain::ChainStatObj); - $callback!(crate::rpc::chain::ChainTipSetWeight); + $callback!($crate::rpc::chain::ChainExport); + $callback!($crate::rpc::chain::ChainGetBlock); + $callback!($crate::rpc::chain::ChainGetBlockMessages); + $callback!($crate::rpc::chain::ChainGetEvents); + $callback!($crate::rpc::chain::ChainGetGenesis); + $callback!($crate::rpc::chain::ChainGetMessage); + $callback!($crate::rpc::chain::ChainGetMessagesInTipset); + $callback!($crate::rpc::chain::ChainGetMinBaseFee); + $callback!($crate::rpc::chain::ChainGetParentMessages); + $callback!($crate::rpc::chain::ChainGetParentReceipts); + $callback!($crate::rpc::chain::ChainGetPath); + $callback!($crate::rpc::chain::ChainGetTipSet); + $callback!($crate::rpc::chain::ChainGetTipSetAfterHeight); + $callback!($crate::rpc::chain::ChainGetTipSetByHeight); + $callback!($crate::rpc::chain::ChainHasObj); + $callback!($crate::rpc::chain::ChainHead); + $callback!($crate::rpc::chain::ChainReadObj); + $callback!($crate::rpc::chain::ChainSetHead); + $callback!($crate::rpc::chain::ChainStatObj); + $callback!($crate::rpc::chain::ChainTipSetWeight); // common vertical - $callback!(crate::rpc::common::Session); - $callback!(crate::rpc::common::Shutdown); - $callback!(crate::rpc::common::StartTime); - $callback!(crate::rpc::common::Version); + $callback!($crate::rpc::common::Session); + $callback!($crate::rpc::common::Shutdown); + $callback!($crate::rpc::common::StartTime); + $callback!($crate::rpc::common::Version); // eth vertical - $callback!(crate::rpc::eth::EthAccounts); - $callback!(crate::rpc::eth::EthAddressToFilecoinAddress); - $callback!(crate::rpc::eth::EthBlockNumber); - $callback!(crate::rpc::eth::EthCall); - $callback!(crate::rpc::eth::EthChainId); - $callback!(crate::rpc::eth::EthEstimateGas); - $callback!(crate::rpc::eth::EthFeeHistory); - $callback!(crate::rpc::eth::EthGasPrice); - $callback!(crate::rpc::eth::EthGetBalance); - $callback!(crate::rpc::eth::EthGetBlockByHash); - $callback!(crate::rpc::eth::EthGetBlockByNumber); - $callback!(crate::rpc::eth::EthGetBlockReceipts); - $callback!(crate::rpc::eth::EthGetBlockReceiptsLimited); - $callback!(crate::rpc::eth::EthGetBlockTransactionCountByHash); - $callback!(crate::rpc::eth::EthGetBlockTransactionCountByNumber); - $callback!(crate::rpc::eth::EthGetCode); - $callback!(crate::rpc::eth::EthGetLogs); - $callback!(crate::rpc::eth::EthGetMessageCidByTransactionHash); - $callback!(crate::rpc::eth::EthGetStorageAt); - $callback!(crate::rpc::eth::EthGetTransactionByHash); - $callback!(crate::rpc::eth::EthGetTransactionByHashLimited); - $callback!(crate::rpc::eth::EthGetTransactionCount); - $callback!(crate::rpc::eth::EthGetTransactionHashByCid); - $callback!(crate::rpc::eth::EthGetTransactionByBlockNumberAndIndex); - $callback!(crate::rpc::eth::EthGetTransactionByBlockHashAndIndex); - $callback!(crate::rpc::eth::EthMaxPriorityFeePerGas); - $callback!(crate::rpc::eth::EthProtocolVersion); - $callback!(crate::rpc::eth::EthGetTransactionReceipt); - $callback!(crate::rpc::eth::EthGetTransactionReceiptLimited); - $callback!(crate::rpc::eth::EthNewFilter); - $callback!(crate::rpc::eth::EthNewPendingTransactionFilter); - $callback!(crate::rpc::eth::EthNewBlockFilter); - $callback!(crate::rpc::eth::EthUninstallFilter); - $callback!(crate::rpc::eth::EthSyncing); - $callback!(crate::rpc::eth::EthTraceBlock); - $callback!(crate::rpc::eth::Web3ClientVersion); - $callback!(crate::rpc::eth::EthSendRawTransaction); + $callback!($crate::rpc::eth::EthAccounts); + $callback!($crate::rpc::eth::EthAddressToFilecoinAddress); + $callback!($crate::rpc::eth::EthBlockNumber); + $callback!($crate::rpc::eth::EthCall); + $callback!($crate::rpc::eth::EthChainId); + $callback!($crate::rpc::eth::EthEstimateGas); + $callback!($crate::rpc::eth::EthFeeHistory); + $callback!($crate::rpc::eth::EthGasPrice); + $callback!($crate::rpc::eth::EthGetBalance); + $callback!($crate::rpc::eth::EthGetBlockByHash); + $callback!($crate::rpc::eth::EthGetBlockByNumber); + $callback!($crate::rpc::eth::EthGetBlockReceipts); + $callback!($crate::rpc::eth::EthGetBlockReceiptsLimited); + $callback!($crate::rpc::eth::EthGetBlockTransactionCountByHash); + $callback!($crate::rpc::eth::EthGetBlockTransactionCountByNumber); + $callback!($crate::rpc::eth::EthGetCode); + $callback!($crate::rpc::eth::EthGetLogs); + $callback!($crate::rpc::eth::EthGetMessageCidByTransactionHash); + $callback!($crate::rpc::eth::EthGetStorageAt); + $callback!($crate::rpc::eth::EthGetTransactionByHash); + $callback!($crate::rpc::eth::EthGetTransactionByHashLimited); + $callback!($crate::rpc::eth::EthGetTransactionCount); + $callback!($crate::rpc::eth::EthGetTransactionHashByCid); + $callback!($crate::rpc::eth::EthGetTransactionByBlockNumberAndIndex); + $callback!($crate::rpc::eth::EthGetTransactionByBlockHashAndIndex); + $callback!($crate::rpc::eth::EthMaxPriorityFeePerGas); + $callback!($crate::rpc::eth::EthProtocolVersion); + $callback!($crate::rpc::eth::EthGetTransactionReceipt); + $callback!($crate::rpc::eth::EthGetTransactionReceiptLimited); + $callback!($crate::rpc::eth::EthNewFilter); + $callback!($crate::rpc::eth::EthNewPendingTransactionFilter); + $callback!($crate::rpc::eth::EthNewBlockFilter); + $callback!($crate::rpc::eth::EthUninstallFilter); + $callback!($crate::rpc::eth::EthSyncing); + $callback!($crate::rpc::eth::EthTraceBlock); + $callback!($crate::rpc::eth::Web3ClientVersion); + $callback!($crate::rpc::eth::EthSendRawTransaction); // gas vertical - $callback!(crate::rpc::gas::GasEstimateFeeCap); - $callback!(crate::rpc::gas::GasEstimateGasLimit); - $callback!(crate::rpc::gas::GasEstimateGasPremium); - $callback!(crate::rpc::gas::GasEstimateMessageGas); + $callback!($crate::rpc::gas::GasEstimateFeeCap); + $callback!($crate::rpc::gas::GasEstimateGasLimit); + $callback!($crate::rpc::gas::GasEstimateGasPremium); + $callback!($crate::rpc::gas::GasEstimateMessageGas); // market vertical - $callback!(crate::rpc::market::MarketAddBalance); + $callback!($crate::rpc::market::MarketAddBalance); // miner vertical - $callback!(crate::rpc::miner::MinerCreateBlock); - $callback!(crate::rpc::miner::MinerGetBaseInfo); + $callback!($crate::rpc::miner::MinerCreateBlock); + $callback!($crate::rpc::miner::MinerGetBaseInfo); // mpool vertical - $callback!(crate::rpc::mpool::MpoolBatchPush); - $callback!(crate::rpc::mpool::MpoolBatchPushUntrusted); - $callback!(crate::rpc::mpool::MpoolGetNonce); - $callback!(crate::rpc::mpool::MpoolPending); - $callback!(crate::rpc::mpool::MpoolPush); - $callback!(crate::rpc::mpool::MpoolPushMessage); - $callback!(crate::rpc::mpool::MpoolPushUntrusted); - $callback!(crate::rpc::mpool::MpoolSelect); + $callback!($crate::rpc::mpool::MpoolBatchPush); + $callback!($crate::rpc::mpool::MpoolBatchPushUntrusted); + $callback!($crate::rpc::mpool::MpoolGetNonce); + $callback!($crate::rpc::mpool::MpoolPending); + $callback!($crate::rpc::mpool::MpoolPush); + $callback!($crate::rpc::mpool::MpoolPushMessage); + $callback!($crate::rpc::mpool::MpoolPushUntrusted); + $callback!($crate::rpc::mpool::MpoolSelect); // msig vertical - $callback!(crate::rpc::msig::MsigGetAvailableBalance); - $callback!(crate::rpc::msig::MsigGetPending); - $callback!(crate::rpc::msig::MsigGetVested); - $callback!(crate::rpc::msig::MsigGetVestingSchedule); + $callback!($crate::rpc::msig::MsigGetAvailableBalance); + $callback!($crate::rpc::msig::MsigGetPending); + $callback!($crate::rpc::msig::MsigGetVested); + $callback!($crate::rpc::msig::MsigGetVestingSchedule); // net vertical - $callback!(crate::rpc::net::NetAddrsListen); - $callback!(crate::rpc::net::NetAgentVersion); - $callback!(crate::rpc::net::NetAutoNatStatus); - $callback!(crate::rpc::net::NetConnect); - $callback!(crate::rpc::net::NetDisconnect); - $callback!(crate::rpc::net::NetFindPeer); - $callback!(crate::rpc::net::NetInfo); - $callback!(crate::rpc::net::NetListening); - $callback!(crate::rpc::net::NetPeers); - $callback!(crate::rpc::net::NetProtectAdd); - $callback!(crate::rpc::net::NetProtectList); - $callback!(crate::rpc::net::NetProtectRemove); - $callback!(crate::rpc::net::NetVersion); + $callback!($crate::rpc::net::NetAddrsListen); + $callback!($crate::rpc::net::NetAgentVersion); + $callback!($crate::rpc::net::NetAutoNatStatus); + $callback!($crate::rpc::net::NetConnect); + $callback!($crate::rpc::net::NetDisconnect); + $callback!($crate::rpc::net::NetFindPeer); + $callback!($crate::rpc::net::NetInfo); + $callback!($crate::rpc::net::NetListening); + $callback!($crate::rpc::net::NetPeers); + $callback!($crate::rpc::net::NetProtectAdd); + $callback!($crate::rpc::net::NetProtectList); + $callback!($crate::rpc::net::NetProtectRemove); + $callback!($crate::rpc::net::NetVersion); // node vertical - $callback!(crate::rpc::node::NodeStatus); + $callback!($crate::rpc::node::NodeStatus); // state vertical - $callback!(crate::rpc::state::StateAccountKey); - $callback!(crate::rpc::state::StateCall); - $callback!(crate::rpc::state::StateCirculatingSupply); - $callback!(crate::rpc::state::StateCompute); - $callback!(crate::rpc::state::StateDealProviderCollateralBounds); - $callback!(crate::rpc::state::StateFetchRoot); - $callback!(crate::rpc::state::StateGetActor); - $callback!(crate::rpc::state::StateGetAllAllocations); - $callback!(crate::rpc::state::StateGetAllClaims); - $callback!(crate::rpc::state::StateGetAllocation); - $callback!(crate::rpc::state::StateGetAllocationForPendingDeal); - $callback!(crate::rpc::state::StateGetAllocationIdForPendingDeal); - $callback!(crate::rpc::state::StateGetAllocations); - $callback!(crate::rpc::state::StateGetBeaconEntry); - $callback!(crate::rpc::state::StateGetClaim); - $callback!(crate::rpc::state::StateGetClaims); - $callback!(crate::rpc::state::StateGetNetworkParams); - $callback!(crate::rpc::state::StateGetRandomnessDigestFromBeacon); - $callback!(crate::rpc::state::StateGetRandomnessDigestFromTickets); - $callback!(crate::rpc::state::StateGetRandomnessFromBeacon); - $callback!(crate::rpc::state::StateGetRandomnessFromTickets); - $callback!(crate::rpc::state::StateGetReceipt); - $callback!(crate::rpc::state::StateListActors); - $callback!(crate::rpc::state::StateListMessages); - $callback!(crate::rpc::state::StateListMiners); - $callback!(crate::rpc::state::StateLookupID); - $callback!(crate::rpc::state::StateLookupRobustAddress); - $callback!(crate::rpc::state::StateMarketBalance); - $callback!(crate::rpc::state::StateMarketDeals); - $callback!(crate::rpc::state::StateMarketParticipants); - $callback!(crate::rpc::state::StateMarketStorageDeal); - $callback!(crate::rpc::state::StateMinerActiveSectors); - $callback!(crate::rpc::state::StateMinerAllocated); - $callback!(crate::rpc::state::StateMinerAvailableBalance); - $callback!(crate::rpc::state::StateMinerDeadlines); - $callback!(crate::rpc::state::StateMinerFaults); - $callback!(crate::rpc::state::StateMinerInfo); - $callback!(crate::rpc::state::StateMinerInitialPledgeCollateral); - $callback!(crate::rpc::state::StateMinerPartitions); - $callback!(crate::rpc::state::StateMinerPower); - $callback!(crate::rpc::state::StateMinerPreCommitDepositForPower); - $callback!(crate::rpc::state::StateMinerProvingDeadline); - $callback!(crate::rpc::state::StateMinerRecoveries); - $callback!(crate::rpc::state::StateMinerSectorAllocated); - $callback!(crate::rpc::state::StateMinerSectorCount); - $callback!(crate::rpc::state::StateMinerSectors); - $callback!(crate::rpc::state::StateNetworkName); - $callback!(crate::rpc::state::StateNetworkVersion); - $callback!(crate::rpc::state::StateReadState); - $callback!(crate::rpc::state::StateReplay); - $callback!(crate::rpc::state::StateSearchMsg); - $callback!(crate::rpc::state::StateSearchMsgLimited); - $callback!(crate::rpc::state::StateSectorExpiration); - $callback!(crate::rpc::state::StateSectorGetInfo); - $callback!(crate::rpc::state::StateSectorPartition); - $callback!(crate::rpc::state::StateSectorPreCommitInfo); - $callback!(crate::rpc::state::StateSectorPreCommitInfoV0); - $callback!(crate::rpc::state::StateVerifiedClientStatus); - $callback!(crate::rpc::state::StateVerifiedRegistryRootKey); - $callback!(crate::rpc::state::StateVerifierStatus); - $callback!(crate::rpc::state::StateVMCirculatingSupplyInternal); - $callback!(crate::rpc::state::StateWaitMsg); - $callback!(crate::rpc::state::StateWaitMsgV0); - $callback!(crate::rpc::state::StateMinerInitialPledgeForSector); + $callback!($crate::rpc::state::StateAccountKey); + $callback!($crate::rpc::state::StateCall); + $callback!($crate::rpc::state::StateCirculatingSupply); + $callback!($crate::rpc::state::StateCompute); + $callback!($crate::rpc::state::StateDealProviderCollateralBounds); + $callback!($crate::rpc::state::StateFetchRoot); + $callback!($crate::rpc::state::StateGetActor); + $callback!($crate::rpc::state::StateGetAllAllocations); + $callback!($crate::rpc::state::StateGetAllClaims); + $callback!($crate::rpc::state::StateGetAllocation); + $callback!($crate::rpc::state::StateGetAllocationForPendingDeal); + $callback!($crate::rpc::state::StateGetAllocationIdForPendingDeal); + $callback!($crate::rpc::state::StateGetAllocations); + $callback!($crate::rpc::state::StateGetBeaconEntry); + $callback!($crate::rpc::state::StateGetClaim); + $callback!($crate::rpc::state::StateGetClaims); + $callback!($crate::rpc::state::StateGetNetworkParams); + $callback!($crate::rpc::state::StateGetRandomnessDigestFromBeacon); + $callback!($crate::rpc::state::StateGetRandomnessDigestFromTickets); + $callback!($crate::rpc::state::StateGetRandomnessFromBeacon); + $callback!($crate::rpc::state::StateGetRandomnessFromTickets); + $callback!($crate::rpc::state::StateGetReceipt); + $callback!($crate::rpc::state::StateListActors); + $callback!($crate::rpc::state::StateListMessages); + $callback!($crate::rpc::state::StateListMiners); + $callback!($crate::rpc::state::StateLookupID); + $callback!($crate::rpc::state::StateLookupRobustAddress); + $callback!($crate::rpc::state::StateMarketBalance); + $callback!($crate::rpc::state::StateMarketDeals); + $callback!($crate::rpc::state::StateMarketParticipants); + $callback!($crate::rpc::state::StateMarketStorageDeal); + $callback!($crate::rpc::state::StateMinerActiveSectors); + $callback!($crate::rpc::state::StateMinerAllocated); + $callback!($crate::rpc::state::StateMinerAvailableBalance); + $callback!($crate::rpc::state::StateMinerDeadlines); + $callback!($crate::rpc::state::StateMinerFaults); + $callback!($crate::rpc::state::StateMinerInfo); + $callback!($crate::rpc::state::StateMinerInitialPledgeCollateral); + $callback!($crate::rpc::state::StateMinerPartitions); + $callback!($crate::rpc::state::StateMinerPower); + $callback!($crate::rpc::state::StateMinerPreCommitDepositForPower); + $callback!($crate::rpc::state::StateMinerProvingDeadline); + $callback!($crate::rpc::state::StateMinerRecoveries); + $callback!($crate::rpc::state::StateMinerSectorAllocated); + $callback!($crate::rpc::state::StateMinerSectorCount); + $callback!($crate::rpc::state::StateMinerSectors); + $callback!($crate::rpc::state::StateNetworkName); + $callback!($crate::rpc::state::StateNetworkVersion); + $callback!($crate::rpc::state::StateReadState); + $callback!($crate::rpc::state::StateReplay); + $callback!($crate::rpc::state::StateSearchMsg); + $callback!($crate::rpc::state::StateSearchMsgLimited); + $callback!($crate::rpc::state::StateSectorExpiration); + $callback!($crate::rpc::state::StateSectorGetInfo); + $callback!($crate::rpc::state::StateSectorPartition); + $callback!($crate::rpc::state::StateSectorPreCommitInfo); + $callback!($crate::rpc::state::StateSectorPreCommitInfoV0); + $callback!($crate::rpc::state::StateVerifiedClientStatus); + $callback!($crate::rpc::state::StateVerifiedRegistryRootKey); + $callback!($crate::rpc::state::StateVerifierStatus); + $callback!($crate::rpc::state::StateVMCirculatingSupplyInternal); + $callback!($crate::rpc::state::StateWaitMsg); + $callback!($crate::rpc::state::StateWaitMsgV0); + $callback!($crate::rpc::state::StateMinerInitialPledgeForSector); // sync vertical - $callback!(crate::rpc::sync::SyncCheckBad); - $callback!(crate::rpc::sync::SyncMarkBad); - $callback!(crate::rpc::sync::SyncState); - $callback!(crate::rpc::sync::SyncSubmitBlock); + $callback!($crate::rpc::sync::SyncCheckBad); + $callback!($crate::rpc::sync::SyncMarkBad); + $callback!($crate::rpc::sync::SyncState); + $callback!($crate::rpc::sync::SyncSubmitBlock); // wallet vertical - $callback!(crate::rpc::wallet::WalletBalance); - $callback!(crate::rpc::wallet::WalletDefaultAddress); - $callback!(crate::rpc::wallet::WalletDelete); - $callback!(crate::rpc::wallet::WalletExport); - $callback!(crate::rpc::wallet::WalletHas); - $callback!(crate::rpc::wallet::WalletImport); - $callback!(crate::rpc::wallet::WalletList); - $callback!(crate::rpc::wallet::WalletNew); - $callback!(crate::rpc::wallet::WalletSetDefault); - $callback!(crate::rpc::wallet::WalletSign); - $callback!(crate::rpc::wallet::WalletSignMessage); - $callback!(crate::rpc::wallet::WalletValidateAddress); - $callback!(crate::rpc::wallet::WalletVerify); + $callback!($crate::rpc::wallet::WalletBalance); + $callback!($crate::rpc::wallet::WalletDefaultAddress); + $callback!($crate::rpc::wallet::WalletDelete); + $callback!($crate::rpc::wallet::WalletExport); + $callback!($crate::rpc::wallet::WalletHas); + $callback!($crate::rpc::wallet::WalletImport); + $callback!($crate::rpc::wallet::WalletList); + $callback!($crate::rpc::wallet::WalletNew); + $callback!($crate::rpc::wallet::WalletSetDefault); + $callback!($crate::rpc::wallet::WalletSign); + $callback!($crate::rpc::wallet::WalletSignMessage); + $callback!($crate::rpc::wallet::WalletValidateAddress); + $callback!($crate::rpc::wallet::WalletVerify); // f3 - $callback!(crate::rpc::f3::F3GetCertificate); - $callback!(crate::rpc::f3::F3GetECPowerTable); - $callback!(crate::rpc::f3::F3GetF3PowerTable); - $callback!(crate::rpc::f3::F3IsRunning); - $callback!(crate::rpc::f3::F3GetProgress); - $callback!(crate::rpc::f3::F3GetManifest); - $callback!(crate::rpc::f3::F3ListParticipants); - $callback!(crate::rpc::f3::F3GetLatestCertificate); - $callback!(crate::rpc::f3::F3GetOrRenewParticipationTicket); - $callback!(crate::rpc::f3::F3Participate); - $callback!(crate::rpc::f3::GetHead); - $callback!(crate::rpc::f3::GetParent); - $callback!(crate::rpc::f3::GetParticipatingMinerIDs); - $callback!(crate::rpc::f3::GetPowerTable); - $callback!(crate::rpc::f3::GetTipset); - $callback!(crate::rpc::f3::GetTipsetByEpoch); - $callback!(crate::rpc::f3::Finalize); - $callback!(crate::rpc::f3::ProtectPeer); - $callback!(crate::rpc::f3::SignMessage); + $callback!($crate::rpc::f3::F3GetCertificate); + $callback!($crate::rpc::f3::F3GetECPowerTable); + $callback!($crate::rpc::f3::F3GetF3PowerTable); + $callback!($crate::rpc::f3::F3IsRunning); + $callback!($crate::rpc::f3::F3GetProgress); + $callback!($crate::rpc::f3::F3GetManifest); + $callback!($crate::rpc::f3::F3ListParticipants); + $callback!($crate::rpc::f3::F3GetLatestCertificate); + $callback!($crate::rpc::f3::F3GetOrRenewParticipationTicket); + $callback!($crate::rpc::f3::F3Participate); + $callback!($crate::rpc::f3::GetHead); + $callback!($crate::rpc::f3::GetParent); + $callback!($crate::rpc::f3::GetParticipatingMinerIDs); + $callback!($crate::rpc::f3::GetPowerTable); + $callback!($crate::rpc::f3::GetTipset); + $callback!($crate::rpc::f3::GetTipsetByEpoch); + $callback!($crate::rpc::f3::Finalize); + $callback!($crate::rpc::f3::ProtectPeer); + $callback!($crate::rpc::f3::SignMessage); // misc - $callback!(crate::rpc::misc::GetActorEventsRaw); + $callback!($crate::rpc::misc::GetActorEventsRaw); }; } -pub(crate) use for_each_method; +pub(crate) use for_each_rpc_method; use tower_http::compression::CompressionLayer; use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer; @@ -293,7 +294,7 @@ pub mod prelude { }; } - for_each_method!(export); + for_each_rpc_method!(export); } /// All the methods live in their own folder @@ -579,7 +580,7 @@ where <$ty>::register_alias(&mut module).unwrap(); }; } - for_each_method!(register); + for_each_rpc_method!(register); module } @@ -631,7 +632,7 @@ pub fn openrpc(path: ApiPath, include: Option<&[&str]>) -> openrpc_types::OpenRP } }; } - for_each_method!(callback); + for_each_rpc_method!(callback); openrpc_types::OpenRPC { methods, components: Some(openrpc_types::Components { diff --git a/src/rpc/reflect/mod.rs b/src/rpc/reflect/mod.rs index a6187de94c1..222fd0f16c6 100644 --- a/src/rpc/reflect/mod.rs +++ b/src/rpc/reflect/mod.rs @@ -150,6 +150,21 @@ pub trait RpcMethodExt: RpcMethod { )), } } + + fn parse_params( + params_raw: Option>, + calling_convention: ParamStructure, + ) -> anyhow::Result { + Ok(Self::Params::parse( + params_raw + .map(|s| serde_json::from_str(s.as_ref())) + .transpose()?, + Self::PARAM_NAMES, + calling_convention, + Self::N_REQUIRED_PARAMS, + )?) + } + /// Generate a full `OpenRPC` method definition for this endpoint. fn openrpc<'de>( gen: &mut SchemaGenerator, @@ -214,17 +229,8 @@ pub trait RpcMethodExt: RpcMethod { ); module.register_async_method(Self::NAME, move |params, ctx, _extensions| async move { - let raw = params - .as_str() - .map(serde_json::from_str) - .transpose() + let params = Self::parse_params(params.as_str(), calling_convention) .map_err(|e| Error::invalid_params(e, None))?; - let params = Self::Params::parse( - raw, - Self::PARAM_NAMES, - calling_convention, - Self::N_REQUIRED_PARAMS, - )?; let ok = Self::handle(ctx, params).await?; Result::<_, jsonrpsee::types::ErrorObjectOwned>::Ok(ok.into_lotus_json()) }) diff --git a/src/shim/address.rs b/src/shim/address.rs index aa099b3738a..b116bb73669 100644 --- a/src/shim/address.rs +++ b/src/shim/address.rs @@ -49,7 +49,7 @@ thread_local! { /// /// The thread-local network variable is initialized to the value of the global network. This global /// network variable is set once when Forest has figured out which network it is using. -pub struct CurrentNetwork(); +pub struct CurrentNetwork; impl CurrentNetwork { pub fn get() -> Network { FromPrimitive::from_u8(LOCAL_NETWORK.with(|ident| ident.load(Ordering::Acquire))) diff --git a/src/tool/subcommands/api_cmd.rs b/src/tool/subcommands/api_cmd.rs index 651d2d1d443..f62c479dac0 100644 --- a/src/tool/subcommands/api_cmd.rs +++ b/src/tool/subcommands/api_cmd.rs @@ -1,6 +1,9 @@ // Copyright 2019-2025 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +mod generate_test_snapshot; +mod test_snapshot; + use crate::blocks::{ElectionProof, Ticket, Tipset}; use crate::db::car::ManyCar; use crate::eth::{EthChainId as EthChainIdType, SAFE_EPOCH_DELAY}; @@ -31,7 +34,7 @@ use crate::shim::{ use crate::tool::offline_server::start_offline_server; use crate::utils::UrlFromMultiAddr; use ahash::HashMap; -use anyhow::{bail, ensure}; +use anyhow::{bail, ensure, Context as _}; use bls_signatures::Serialize as _; use cid::Cid; use clap::{Subcommand, ValueEnum}; @@ -56,6 +59,7 @@ use std::{ time::Duration, }; use tabled::{builder::Builder, settings::Style}; +use test_snapshot::RpcTestSnapshot; use tokio::sync::Semaphore; use tracing::debug; @@ -144,6 +148,20 @@ pub enum ApiCommands { #[arg(long)] dump_dir: Option, }, + GenerateTestSnapshot { + /// Path to test dumps that are generated by `forest-tool api dump-tests` command + #[arg(num_args = 1.., required = true)] + test_dump_files: Vec, + /// Path to the database folder that powers a Forest node + #[arg(long, required = true)] + db: PathBuf, + /// Filecoin network chain + #[arg(long, required = true)] + chain: NetworkChain, + #[arg(long, required = true)] + /// Folder into which test snapshots are dumped + out_dir: PathBuf, + }, DumpTests { #[command(flatten)] create_tests_args: CreateTestsArgs, @@ -153,6 +171,11 @@ pub enum ApiCommands { #[arg(long)] include_ignored: bool, }, + Test { + /// Path to test snapshots that are generated by `forest-tool api generate-test-snapshot` command + #[arg(num_args = 1.., required = true)] + files: Vec, + }, } impl ApiCommands { @@ -218,6 +241,66 @@ impl ApiCommands { .await?; } } + Self::GenerateTestSnapshot { + test_dump_files, + db, + chain, + out_dir, + } => { + std::env::set_var("FOREST_TIPSET_CACHE_DISABLED", "1"); + if !out_dir.is_dir() { + std::fs::create_dir_all(&out_dir)?; + } + let tracking_db = generate_test_snapshot::load_db(&db)?; + for test_dump_file in test_dump_files { + let out_path = out_dir + .join(test_dump_file.file_name().context("Infallible")?) + .with_extension("rpcsnap.json"); + let test_dump = serde_json::from_reader(std::fs::File::open(&test_dump_file)?)?; + print!("Generating RPC snapshot at {} ...", out_path.display()); + match generate_test_snapshot::run_test_with_dump( + &test_dump, + tracking_db.clone(), + &chain, + ) + .await + { + Ok(_) => { + let snapshot = { + tracking_db.ensure_chain_head_is_tracked()?; + let mut db = vec![]; + tracking_db.export_forest_car(&mut db).await?; + RpcTestSnapshot { + chain: chain.clone(), + name: test_dump.request.method_name.to_string(), + params: test_dump.request.params, + response: test_dump.forest_response, + db, + } + }; + + std::fs::write(&out_path, serde_json::to_string_pretty(&snapshot)?)?; + println!(" Succeeded"); + } + Err(e) => { + println!(" Failed: {e}"); + } + }; + } + } + Self::Test { files } => { + for path in files { + print!("Running RPC test with snapshot {} ...", path.display()); + match test_snapshot::run_test_from_snapshot(&path).await { + Ok(_) => { + println!(" Succeeded"); + } + Err(e) => { + println!(" Failed: {e}"); + } + }; + } + } Self::DumpTests { create_tests_args, path, diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs new file mode 100644 index 00000000000..b74ed40d3f9 --- /dev/null +++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs @@ -0,0 +1,261 @@ +// Copyright 2019-2025 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use super::*; +use crate::{ + blocks::{CachingBlockHeader, TipsetKey}, + chain::ChainStore, + chain_sync::{network_context::SyncNetworkContext, SyncConfig, SyncStage}, + daemon::db_util::load_all_forest_cars, + db::{ + db_engine::open_db, parity_db::ParityDb, EthMappingsStore, MemoryDB, SettingsStore, + SettingsStoreExt, CAR_DB_DIR_NAME, + }, + genesis::{get_network_name_from_genesis, read_genesis_header}, + libp2p::{NetworkMessage, PeerManager}, + libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite, Block64}, + message_pool::{MessagePool, MpoolRpcProvider}, + networks::ChainConfig, + shim::address::CurrentNetwork, + state_manager::StateManager, + KeyStore, KeyStoreConfig, +}; +use fvm_shared4::address::Network; +use openrpc_types::ParamStructure; +use parking_lot::RwLock; +use rpc::{eth::filter::EthEventHandler, RPCState, RpcMethod as _}; +use tokio::{sync::mpsc, task::JoinSet}; + +pub async fn run_test_with_dump( + test_dump: &TestDump, + db: Arc>>, + chain: &NetworkChain, +) -> anyhow::Result<()> { + if chain.is_testnet() { + CurrentNetwork::set_global(Network::Testnet); + } + let mut run = false; + let chain_config = Arc::new(ChainConfig::from_chain(chain)); + let (ctx, _, _) = ctx(db, chain_config).await?; + let params_raw = Some(serde_json::to_string(&test_dump.request.params)?); + macro_rules! run_test { + ($ty:ty) => { + if test_dump.request.method_name.as_ref() == <$ty>::NAME { + let params = <$ty>::parse_params(params_raw.clone(), ParamStructure::Either)?; + let result = <$ty>::handle(ctx.clone(), params).await?; + assert_eq!( + test_dump.forest_response, + Ok(result.into_lotus_json_value()?) + ); + run = true; + } + }; + } + crate::for_each_rpc_method!(run_test); + anyhow::ensure!(run, "RPC method not found"); + Ok(()) +} + +pub(super) fn load_db( + db_root: &Path, +) -> anyhow::Result>>> { + let db_writer = open_db(db_root.into(), Default::default())?; + let db = ManyCar::new(db_writer); + let forest_car_db_dir = db_root.join(CAR_DB_DIR_NAME); + load_all_forest_cars(&db, &forest_car_db_dir)?; + Ok(Arc::new(ReadOpsTrackingStore::new(db))) +} + +async fn ctx( + db: Arc>>, + chain_config: Arc, +) -> anyhow::Result<( + Arc>>>, + flume::Receiver, + tokio::sync::mpsc::Receiver<()>, +)> { + let (network_send, network_rx) = flume::bounded(5); + let (tipset_send, _) = flume::bounded(5); + let sync_config = Arc::new(SyncConfig::default()); + let genesis_header = + read_genesis_header(None, chain_config.genesis_bytes(&db).await?.as_deref(), &db).await?; + + let chain_store = Arc::new( + ChainStore::new( + db.clone(), + db.clone(), + db, + chain_config.clone(), + genesis_header.clone(), + ) + .unwrap(), + ); + + let state_manager = + Arc::new(StateManager::new(chain_store.clone(), chain_config, sync_config).unwrap()); + let network_name = get_network_name_from_genesis(&genesis_header, &state_manager)?; + let message_pool = MessagePool::new( + MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()), + network_name.clone(), + network_send.clone(), + Default::default(), + state_manager.chain_config().clone(), + &mut JoinSet::new(), + )?; + + let peer_manager = Arc::new(PeerManager::default()); + let sync_network_context = + SyncNetworkContext::new(network_send, peer_manager, state_manager.blockstore_owned()); + let (shutdown, shutdown_recv) = mpsc::channel(1); + let rpc_state = Arc::new(RPCState { + state_manager, + keystore: Arc::new(tokio::sync::RwLock::new(KeyStore::new( + KeyStoreConfig::Memory, + )?)), + mpool: Arc::new(message_pool), + bad_blocks: Default::default(), + sync_state: Arc::new(RwLock::new(Default::default())), + eth_event_handler: Arc::new(EthEventHandler::new()), + sync_network_context, + network_name, + start_time: chrono::Utc::now(), + shutdown, + tipset_send, + }); + rpc_state.sync_state.write().set_stage(SyncStage::Idle); + Ok((rpc_state, network_rx, shutdown_recv)) +} + +/// A [`Blockstore`] wrapper that tracks read operations to the inner [`Blockstore`] with an [`MemoryDB`] +pub struct ReadOpsTrackingStore { + inner: T, + tracker: Arc, +} + +impl ReadOpsTrackingStore +where + T: Blockstore + SettingsStore, +{ + pub fn new(inner: T) -> Self { + Self { + inner, + tracker: Arc::new(Default::default()), + } + } + + pub fn ensure_chain_head_is_tracked(&self) -> anyhow::Result<()> { + if !self.is_chain_head_tracked()? { + let _ = + SettingsStoreExt::read_obj::(self, crate::db::setting_keys::HEAD_KEY)? + .context("HEAD_KEY not found")? + .into_cids() + .into_iter() + .map(|key| CachingBlockHeader::load(self, key)) + .collect::>>>()? + .map(Tipset::new) + .transpose()? + .context("failed to load tipset")?; + } + + Ok(()) + } + + fn is_chain_head_tracked(&self) -> anyhow::Result { + SettingsStore::exists(&self.tracker, crate::db::setting_keys::HEAD_KEY) + } + + pub async fn export_forest_car( + &self, + writer: &mut W, + ) -> anyhow::Result<()> { + self.tracker.export_forest_car(writer).await + } +} + +impl Blockstore for ReadOpsTrackingStore { + fn get(&self, k: &Cid) -> anyhow::Result>> { + let result = self.inner.get(k)?; + if let Some(v) = &result { + self.tracker.put_keyed(k, v.as_slice())?; + } + Ok(result) + } + + fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> { + self.inner.put_keyed(k, block) + } +} + +impl SettingsStore for ReadOpsTrackingStore { + fn read_bin(&self, key: &str) -> anyhow::Result>> { + let result = self.inner.read_bin(key)?; + if let Some(v) = &result { + SettingsStore::write_bin(&self.tracker, key, v.as_slice())?; + } + Ok(result) + } + + fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> { + self.inner.write_bin(key, value) + } + + fn exists(&self, key: &str) -> anyhow::Result { + let result = self.inner.read_bin(key)?; + if let Some(v) = &result { + SettingsStore::write_bin(&self.tracker, key, v.as_slice())?; + } + Ok(result.is_some()) + } + + fn setting_keys(&self) -> anyhow::Result> { + self.inner.setting_keys() + } +} + +impl BitswapStoreRead for ReadOpsTrackingStore { + fn contains(&self, cid: &Cid) -> anyhow::Result { + let result = self.inner.get(cid)?; + if let Some(v) = &result { + Blockstore::put_keyed(&self.tracker, cid, v.as_slice())?; + } + Ok(result.is_some()) + } + + fn get(&self, cid: &Cid) -> anyhow::Result>> { + let result = self.inner.get(cid)?; + if let Some(v) = &result { + Blockstore::put_keyed(&self.tracker, cid, v.as_slice())?; + } + Ok(result) + } +} + +impl BitswapStoreReadWrite for ReadOpsTrackingStore { + type Hashes = ::Hashes; + + fn insert(&self, block: &Block64) -> anyhow::Result<()> { + self.inner.insert(block) + } +} + +impl EthMappingsStore for ReadOpsTrackingStore { + fn read_bin(&self, key: &EthHash) -> anyhow::Result>> { + self.inner.read_bin(key) + } + + fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> { + self.inner.write_bin(key, value) + } + + fn exists(&self, key: &EthHash) -> anyhow::Result { + self.inner.exists(key) + } + + fn get_message_cids(&self) -> anyhow::Result> { + self.inner.get_message_cids() + } + + fn delete(&self, keys: Vec) -> anyhow::Result<()> { + self.inner.delete(keys) + } +} diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs new file mode 100644 index 00000000000..2a0f955063a --- /dev/null +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -0,0 +1,211 @@ +// Copyright 2019-2025 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use crate::{ + chain::ChainStore, + chain_sync::{network_context::SyncNetworkContext, SyncConfig, SyncStage}, + db::{ + car::{AnyCar, ManyCar}, + MemoryDB, + }, + genesis::{get_network_name_from_genesis, read_genesis_header}, + libp2p::{NetworkMessage, PeerManager}, + lotus_json::HasLotusJson, + message_pool::{MessagePool, MpoolRpcProvider}, + networks::{ChainConfig, NetworkChain}, + rpc::{eth::filter::EthEventHandler, RPCState, RpcMethod as _, RpcMethodExt as _}, + shim::address::{CurrentNetwork, Network}, + state_manager::StateManager, + KeyStore, KeyStoreConfig, +}; +use openrpc_types::ParamStructure; +use parking_lot::RwLock; +use serde::{Deserialize, Serialize}; +use std::{path::Path, sync::Arc}; +use tokio::{sync::mpsc, task::JoinSet}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RpcTestSnapshot { + pub chain: NetworkChain, + pub name: String, + pub params: serde_json::Value, + pub response: Result, + #[serde(with = "crate::lotus_json::base64_standard")] + pub db: Vec, +} + +pub async fn run_test_from_snapshot(path: &Path) -> anyhow::Result<()> { + let mut run = false; + let snapshot_bytes = std::fs::read(path)?; + let snapshot_bytes = if let Ok(bytes) = zstd::decode_all(snapshot_bytes.as_slice()) { + bytes + } else { + snapshot_bytes + }; + let RpcTestSnapshot { + chain, + name: method_name, + params, + db: db_bytes, + response: expected_response, + } = serde_json::from_slice(snapshot_bytes.as_slice())?; + if chain.is_testnet() { + CurrentNetwork::set_global(Network::Testnet); + } + let db = Arc::new(ManyCar::new(MemoryDB::default()).with_read_only(AnyCar::new(db_bytes)?)?); + let chain_config = Arc::new(ChainConfig::from_chain(&chain)); + let (ctx, _, _) = ctx(db, chain_config).await?; + let params_raw = match serde_json::to_string(¶ms)? { + s if s.is_empty() => None, + s => Some(s), + }; + + macro_rules! run_test { + ($ty:ty) => { + if method_name.as_str() == <$ty>::NAME { + let params = <$ty>::parse_params(params_raw.clone(), ParamStructure::Either)?; + let result = <$ty>::handle(ctx.clone(), params) + .await + .map_err(|e| e.to_string()) + .and_then(|r| r.into_lotus_json_value().map_err(|e| e.to_string())); + assert_eq!(expected_response, result); + run = true; + } + }; + } + + crate::for_each_rpc_method!(run_test); + + assert!(run, "RPC method not found"); + + Ok(()) +} + +async fn ctx( + db: Arc>, + chain_config: Arc, +) -> anyhow::Result<( + Arc>>, + flume::Receiver, + tokio::sync::mpsc::Receiver<()>, +)> { + let (network_send, network_rx) = flume::bounded(5); + let (tipset_send, _) = flume::bounded(5); + let sync_config = Arc::new(SyncConfig::default()); + let genesis_header = + read_genesis_header(None, chain_config.genesis_bytes(&db).await?.as_deref(), &db).await?; + let chain_store = Arc::new( + ChainStore::new( + db.clone(), + db.clone(), + db.clone(), + chain_config.clone(), + genesis_header.clone(), + ) + .unwrap(), + ); + chain_store.set_heaviest_tipset(db.heaviest_tipset()?.into())?; + let state_manager = + Arc::new(StateManager::new(chain_store.clone(), chain_config, sync_config).unwrap()); + let network_name = get_network_name_from_genesis(&genesis_header, &state_manager)?; + let message_pool = MessagePool::new( + MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()), + network_name.clone(), + network_send.clone(), + Default::default(), + state_manager.chain_config().clone(), + &mut JoinSet::new(), + )?; + + let peer_manager = Arc::new(PeerManager::default()); + let sync_network_context = + SyncNetworkContext::new(network_send, peer_manager, state_manager.blockstore_owned()); + let (shutdown, shutdown_recv) = mpsc::channel(1); + let rpc_state = Arc::new(RPCState { + state_manager, + keystore: Arc::new(tokio::sync::RwLock::new(KeyStore::new( + KeyStoreConfig::Memory, + )?)), + mpool: Arc::new(message_pool), + bad_blocks: Default::default(), + sync_state: Arc::new(RwLock::new(Default::default())), + eth_event_handler: Arc::new(EthEventHandler::new()), + sync_network_context, + network_name, + start_time: chrono::Utc::now(), + shutdown, + tipset_send, + }); + rpc_state.sync_state.write().set_stage(SyncStage::Idle); + Ok((rpc_state, network_rx, shutdown_recv)) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{daemon::db_util::download_to, utils::net::global_http_client}; + use directories::ProjectDirs; + use itertools::Itertools as _; + use md5::{Digest as _, Md5}; + use url::Url; + + #[tokio::test] + async fn rpc_regression_tests() { + let urls = include_str!("test_snapshots.txt") + .trim() + .split("\n") + .filter_map(|n| { + Url::parse( + format!("https://forest-snapshots.fra1.digitaloceanspaces.com/rpc_test/{n}") + .as_str(), + ) + .ok() + .map(|url| (n, url)) + }) + .collect_vec(); + let project_dir = ProjectDirs::from("com", "ChainSafe", "Forest").unwrap(); + let cache_dir = project_dir.cache_dir().join("test").join("rpc-snapshots"); + for (filename, url) in urls { + let cache_file_path = cache_dir.join(filename); + let is_file_cached = match get_file_md5_etag(&cache_file_path) { + Some(file_etag) => { + let url_etag = get_digital_ocean_space_url_etag(url.clone()).await.unwrap(); + if Some(&file_etag) == url_etag.as_ref() { + true + } else { + println!( + "etag mismatch, file: {filename}, local: {file_etag}, remote: {}", + url_etag.unwrap_or_default() + ); + false + } + } + None => false, + }; + if !is_file_cached { + println!("Downloading from {url} to {}", cache_file_path.display()); + download_to(&url, &cache_file_path).await.unwrap(); + } + print!("Testing {filename} ..."); + run_test_from_snapshot(&cache_file_path).await.unwrap(); + println!(" succeeded."); + } + } + + async fn get_digital_ocean_space_url_etag(url: Url) -> anyhow::Result> { + let response = global_http_client().head(url).send().await?; + Ok(response + .headers() + .get("etag") + .and_then(|v| v.to_str().ok().map(|v| v.replace('"', "").to_string()))) + } + + fn get_file_md5_etag(path: &Path) -> Option { + std::fs::read(path).ok().map(|bytes| { + let mut hasher = Md5::new(); + hasher.update(bytes.as_slice()); + let hash = hasher.finalize(); + format!("{hash:x}") + }) + } +} diff --git a/src/tool/subcommands/api_cmd/test_snapshots.txt b/src/tool/subcommands/api_cmd/test_snapshots.txt new file mode 100644 index 00000000000..38605336482 --- /dev/null +++ b/src/tool/subcommands/api_cmd/test_snapshots.txt @@ -0,0 +1,6 @@ +filecoin_stategetallallocations_1733735079961566.rpcsnap.json.zst +filecoin_stategetallclaims_1733735080472880.rpcsnap.json.zst +filecoin_stategetallocation_1733735082114977.rpcsnap.json.zst +filecoin_stategetallocationforpendingdeal_1733735082149029.rpcsnap.json.zst +filecoin_stategetallocationidforpendingdeal_1733735082161045.rpcsnap.json.zst +filecoin_stategetallocations_1733735082163772.rpcsnap.json.zst