diff --git a/CHANGELOG.md b/CHANGELOG.md index 83bd15c33659..bb91d9f22357 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,9 @@ Notable updates: ### Changed +- [database] Move blockstore meta-data to standalone files. + [2635](https://github.com/ChainSafe/forest/pull/2635) + [2652](https://github.com/ChainSafe/forest/pull/2652) - [cli] Remove Forest ctrl-c hard shutdown behavior on subsequent ctrl-c signals. [#2538](https://github.com/ChainSafe/forest/pull/2538) - [libp2p] Use in house bitswap implementation. diff --git a/Cargo.lock b/Cargo.lock index 803760fcc569..235e42cdaede 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3655,7 +3655,6 @@ version = "0.6.0" dependencies = [ "ahash 0.8.3", "anyhow", - "async-trait", "chrono", "cid", "flume", @@ -3692,7 +3691,6 @@ dependencies = [ "forest_blocks", "forest_chain", "forest_chain_sync", - "forest_db", "forest_interpreter", "forest_key_management", "forest_networks", @@ -3738,7 +3736,6 @@ dependencies = [ "forest_blocks", "forest_chain", "forest_chain_sync", - "forest_db", "forest_fil_types", "forest_interpreter", "forest_key_management", @@ -3777,7 +3774,6 @@ dependencies = [ "cid", "flume", "forest_blocks", - "forest_db", "forest_state_manager", "forest_utils", "futures", @@ -4320,16 +4316,20 @@ dependencies = [ name = "forest_utils" version = "0.6.0" dependencies = [ + "ahash 0.8.3", "anyhow", "async-trait", "atty", + "chrono", "cid", "const_format", "digest 0.10.6", + "flume", "forest_encoding", "futures", "fvm_ipld_blockstore", "fvm_ipld_encoding 0.2.3", + "human-repr", "hyper", "hyper-rustls", "libc", diff --git a/blockchain/blocks/src/persistence.rs b/blockchain/blocks/src/persistence.rs index c4b7a4973c93..d629494deff9 100644 --- a/blockchain/blocks/src/persistence.rs +++ b/blockchain/blocks/src/persistence.rs @@ -29,7 +29,7 @@ mod tests { fn tipset_keys_round_trip() -> Result<()> { let path = Path::new("tests/calibnet/HEAD"); let obj1: FileBacked = - FileBacked::load_from_file_or_create(path.into(), Default::default)?; + FileBacked::load_from_file_or_create(path.into(), Default::default, None)?; let serialized = obj1.inner().serialize()?; let deserialized = TipsetKeys::deserialize(&serialized)?; diff --git a/blockchain/chain/src/store/chain_store.rs b/blockchain/chain/src/store/chain_store.rs index b13d139b7a99..51906abbbf2d 100644 --- a/blockchain/chain/src/store/chain_store.rs +++ b/blockchain/chain/src/store/chain_store.rs @@ -3,7 +3,7 @@ use std::{num::NonZeroUsize, path::Path, sync::Arc, time::SystemTime}; -use ahash::{HashMap, HashMapExt}; +use ahash::{HashMap, HashMapExt, HashSet}; use anyhow::Result; use async_stream::stream; use bls_signatures::Serialize as SerializeBls; @@ -11,7 +11,6 @@ use cid::{multihash::Code::Blake2b256, Cid}; use digest::Digest; use forest_beacon::{BeaconEntry, IGNORE_DRAND_VAR}; use forest_blocks::{Block, BlockHeader, FullTipset, Tipset, TipsetKeys, TxMeta}; -use forest_db::Store; use forest_encoding::de::DeserializeOwned; use forest_interpreter::BlockMessages; use forest_ipld::{should_save_block_to_snapshot, walk_snapshot}; @@ -28,7 +27,10 @@ use forest_shim::{ state_tree::StateTree, }; use forest_utils::{ - db::{file_backed_obj::FileBacked, BlockstoreExt}, + db::{ + file_backed_obj::{FileBacked, SYNC_PERIOD}, + BlockstoreExt, + }, io::Checksum, }; use fvm_ipld_amt::Amtv0 as Amt; @@ -56,8 +58,6 @@ use super::{ }; use crate::Scale; -const BLOCK_VAL_PREFIX: &[u8] = b"block_val/"; - // A cap on the size of the future_sink const SINK_CAP: usize = 200; @@ -97,6 +97,9 @@ pub struct ChainStore { /// File backed heaviest tipset keys file_backed_heaviest_tipset_keys: Mutex>, + + /// File backed validated blocks + file_backed_validated_blocks: Mutex>>, } impl BitswapStoreRead for ChainStore @@ -125,7 +128,7 @@ where impl ChainStore where - DB: Blockstore + Store + Send + Sync, + DB: Blockstore + Send + Sync, { pub fn new( db: DB, @@ -142,11 +145,17 @@ where *genesis_block_header.cid(), chain_data_root.join("GENESIS"), )); - let file_backed_heaviest_tipset_keys = Mutex::new(FileBacked::load_from_file_or_create( chain_data_root.join("HEAD"), || TipsetKeys::new(vec![*genesis_block_header.cid()]), + None, + )?); + let file_backed_validated_blocks = Mutex::new(FileBacked::load_from_file_or_create( + chain_data_root.join("VALIDATED_BLOCKS"), + HashSet::default, + Some(SYNC_PERIOD), )?); + let cs = Self { publisher, chain_index: ChainIndex::new(ts_cache.clone(), db.clone()), @@ -155,6 +164,7 @@ where ts_cache, file_backed_genesis, file_backed_heaviest_tipset_keys, + file_backed_validated_blocks, }; cs.set_genesis(genesis_block_header)?; @@ -268,20 +278,25 @@ where Ok(()) } - /// Checks store if block has already been validated. Key based on the block - /// validation prefix. + /// Checks metadata file if block has already been validated. pub fn is_block_validated(&self, cid: &Cid) -> Result { - let key = block_validation_key(cid); - - Ok(self.db.exists(key)?) + let validated = self + .file_backed_validated_blocks + .lock() + .inner() + .contains(cid); + if validated { + log::debug!("Block {cid} was previously validated"); + } + Ok(validated) } - /// Marks block as validated in the store. This is retrieved using the block - /// validation prefix. + /// Marks block as validated in the metadata file. pub fn mark_block_as_validated(&self, cid: &Cid) -> Result<(), Error> { - let key = block_validation_key(cid); - - Ok(self.db.write(key, [])?) + let mut file = self.file_backed_validated_blocks.lock(); + Ok(file.with_inner(|inner| { + inner.insert(*cid); + })?) } /// Returns the tipset behind `tsk` at a given `height`. @@ -619,14 +634,6 @@ where Ok(ts) } -/// Helper to ensure consistent CID to db key translation. -fn block_validation_key(cid: &Cid) -> Vec { - let mut key = Vec::new(); - key.extend_from_slice(BLOCK_VAL_PREFIX); - key.extend(cid.to_bytes()); - key -} - /// Returns a Tuple of BLS messages of type `UnsignedMessage` and SECP messages /// of type `SignedMessage` pub fn block_messages( diff --git a/blockchain/chain_sync/src/chain_muxer.rs b/blockchain/chain_sync/src/chain_muxer.rs index c9c67086bd7f..5e183d26a450 100644 --- a/blockchain/chain_sync/src/chain_muxer.rs +++ b/blockchain/chain_sync/src/chain_muxer.rs @@ -13,7 +13,6 @@ use forest_blocks::{ Block, Error as ForestBlockError, FullTipset, GossipBlock, Tipset, TipsetKeys, }; use forest_chain::{ChainStore, Error as ChainStoreError}; -use forest_db::Store; use forest_libp2p::{ hello::HelloRequest, NetworkEvent, NetworkMessage, PeerId, PeerManager, PubsubMessage, }; @@ -159,7 +158,7 @@ pub struct ChainMuxer { impl ChainMuxer where - DB: Blockstore + Store + Clone + Sync + Send + 'static, + DB: Blockstore + Clone + Sync + Send + 'static, M: Provider + Sync + Send + 'static, C: Consensus, { @@ -838,7 +837,7 @@ enum ChainMuxerState { impl Future for ChainMuxer where - DB: Blockstore + Store + Clone + Sync + Send + 'static, + DB: Blockstore + Clone + Sync + Send + 'static, M: Provider + Sync + Send + 'static, C: Consensus, { diff --git a/blockchain/chain_sync/src/consensus.rs b/blockchain/chain_sync/src/consensus.rs index ef6deeeb0a8e..3c77cf0e6082 100644 --- a/blockchain/chain_sync/src/consensus.rs +++ b/blockchain/chain_sync/src/consensus.rs @@ -10,7 +10,6 @@ use std::{ use async_trait::async_trait; use forest_blocks::{Block, GossipBlock, Tipset}; use forest_chain::Scale; -use forest_db::Store; use forest_libp2p::{NetworkMessage, Topic, PUBSUB_BLOCK_STR}; use forest_message::SignedMessage; use forest_message_pool::MessagePool; @@ -46,7 +45,7 @@ pub trait Consensus: Scale + Debug + Send + Sync + Unpin + 'static { block: Arc, ) -> Result<(), NonEmpty> where - DB: Blockstore + Store + Clone + Sync + Send + 'static; + DB: Blockstore + Clone + Sync + Send + 'static; } /// Helper function to collect errors from async validations. @@ -114,7 +113,7 @@ pub trait Proposer { services: &mut JoinSet>, ) -> anyhow::Result<()> where - DB: Blockstore + Store + Clone + Sync + Send + 'static, + DB: Blockstore + Clone + Sync + Send + 'static, MP: MessagePoolApi + Sync + Send + 'static; } @@ -142,7 +141,7 @@ pub trait MessagePoolApi { base: &Tipset, ) -> anyhow::Result>> where - DB: Blockstore + Store + Clone + Sync + Send + 'static; + DB: Blockstore + Clone + Sync + Send + 'static; } impl

MessagePoolApi for MessagePool

@@ -155,7 +154,7 @@ where base: &Tipset, ) -> anyhow::Result>> where - DB: Blockstore + Store + Clone + Sync + Send + 'static, + DB: Blockstore + Clone + Sync + Send + 'static, { self.select_messages_for_block(base) .map_err(|e| e.into()) diff --git a/blockchain/chain_sync/src/tipset_syncer.rs b/blockchain/chain_sync/src/tipset_syncer.rs index be518d48e752..7976f2c1ea5f 100644 --- a/blockchain/chain_sync/src/tipset_syncer.rs +++ b/blockchain/chain_sync/src/tipset_syncer.rs @@ -17,7 +17,6 @@ use forest_blocks::{ Block, BlockHeader, Error as ForestBlockError, FullTipset, Tipset, TipsetKeys, }; use forest_chain::{persist_objects, ChainStore, Error as ChainStoreError}; -use forest_db::Store; use forest_libp2p::chain_exchange::TipsetBundle; use forest_message::{message::valid_for_block_inclusion, Message as MessageTrait}; use forest_networks::Height; @@ -262,7 +261,7 @@ pub(crate) struct TipsetProcessor { impl TipsetProcessor where - DB: Blockstore + Store + Clone + Sync + Send + 'static, + DB: Blockstore + Clone + Sync + Send + 'static, C: Consensus, { #[allow(clippy::too_many_arguments)] @@ -350,7 +349,7 @@ enum TipsetProcessorState { impl Future for TipsetProcessor where - DB: Blockstore + Store + Clone + Sync + Send + 'static, + DB: Blockstore + Clone + Sync + Send + 'static, C: Consensus, { type Output = Result<(), TipsetProcessorError>; @@ -638,7 +637,7 @@ pub(crate) struct TipsetRangeSyncer { impl TipsetRangeSyncer where - DB: Blockstore + Store + Clone + Sync + Send + 'static, + DB: Blockstore + Clone + Sync + Send + 'static, C: Consensus, { #[allow(clippy::too_many_arguments)] @@ -762,7 +761,7 @@ where /// messages going forward on the chain and validate each extension. Finally set /// the proposed head as the heaviest tipset. #[allow(clippy::too_many_arguments)] -fn sync_tipset_range( +fn sync_tipset_range( proposed_head: Arc, current_head: Arc, tracker: crate::chain_muxer::WorkerState, @@ -850,10 +849,7 @@ fn sync_tipset_range( +async fn sync_headers_in_reverse( tracker: crate::chain_muxer::WorkerState, tipset_range_length: u64, proposed_head: Arc, @@ -971,7 +967,7 @@ async fn sync_headers_in_reverse< } #[allow(clippy::too_many_arguments)] -fn sync_tipset( +fn sync_tipset( proposed_head: Arc, consensus: Arc, state_manager: Arc>, @@ -1020,7 +1016,7 @@ fn sync_tipset( +async fn fetch_batch( batch: &[Arc], network: &SyncNetworkContext, chainstore: &ChainStore, @@ -1068,10 +1064,7 @@ async fn fetch_batch( +async fn sync_messages_check_state( tracker: crate::chain_muxer::WorkerState, consensus: Arc, state_manager: Arc>, @@ -1145,7 +1138,7 @@ async fn sync_messages_check_state< /// executed), adding the successful ones to the tipset tracker, and the failed /// ones to the bad block cache, depending on strategy. Any bad block fails /// validation. -async fn validate_tipset( +async fn validate_tipset( consensus: Arc, state_manager: Arc>, chainstore: &ChainStore, @@ -1235,7 +1228,7 @@ async fn validate_tipset( +async fn validate_block( consensus: Arc, state_manager: Arc>, block: Arc, @@ -1430,10 +1423,7 @@ async fn validate_block( +async fn check_block_messages( state_manager: Arc>, block: Arc, base_tipset: Arc, diff --git a/blockchain/consensus/deleg_cns/Cargo.toml b/blockchain/consensus/deleg_cns/Cargo.toml index 33910ae65ed7..556f05213d40 100644 --- a/blockchain/consensus/deleg_cns/Cargo.toml +++ b/blockchain/consensus/deleg_cns/Cargo.toml @@ -12,7 +12,6 @@ async-trait.workspace = true forest_blocks.workspace = true forest_chain.workspace = true forest_chain_sync.workspace = true -forest_db.workspace = true forest_interpreter.workspace = true forest_key_management.workspace = true forest_networks.workspace = true diff --git a/blockchain/consensus/deleg_cns/src/composition.rs b/blockchain/consensus/deleg_cns/src/composition.rs index 931a9020a849..c84df9523f04 100644 --- a/blockchain/consensus/deleg_cns/src/composition.rs +++ b/blockchain/consensus/deleg_cns/src/composition.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use forest_chain_sync::consensus::{MessagePoolApi, Proposer, SyncGossipSubmitter}; -use forest_db::Store; use forest_key_management::KeyStore; use forest_shim::econ::TokenAmount; use forest_state_manager::StateManager; @@ -32,7 +31,7 @@ pub async fn consensus( services: &mut JoinSet>, ) -> anyhow::Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, MP: MessagePoolApi + Send + Sync + 'static, { let consensus = DelegatedConsensus::default(); diff --git a/blockchain/consensus/deleg_cns/src/consensus.rs b/blockchain/consensus/deleg_cns/src/consensus.rs index bacdbaec5502..61bf4ef5f6d6 100644 --- a/blockchain/consensus/deleg_cns/src/consensus.rs +++ b/blockchain/consensus/deleg_cns/src/consensus.rs @@ -7,7 +7,6 @@ use async_trait::async_trait; use forest_blocks::{Block, Tipset}; use forest_chain::{Error as ChainStoreError, Scale, Weight}; use forest_chain_sync::consensus::Consensus; -use forest_db::Store; use forest_key_management::KeyStore; use forest_shim::address::Address; use forest_state_manager::{Error as StateManagerError, StateManager}; @@ -98,7 +97,7 @@ impl DelegatedConsensus { state_manager: &Arc>, ) -> anyhow::Result> where - DB: Blockstore + Store + Clone + Sync + Send + 'static, + DB: Blockstore + Clone + Sync + Send + 'static, { let genesis = state_manager.chain_store().genesis()?; let state_cid = genesis.state_root(); @@ -143,7 +142,7 @@ impl Consensus for DelegatedConsensus { block: Arc, ) -> Result<(), NonEmpty> where - DB: Blockstore + Store + Clone + Sync + Send + 'static, + DB: Blockstore + Clone + Sync + Send + 'static, { crate::validation::validate_block(&self.chosen_one, state_manager, block) .await diff --git a/blockchain/consensus/deleg_cns/src/proposer.rs b/blockchain/consensus/deleg_cns/src/proposer.rs index e4a12bbe730f..3d9c38e3aea8 100644 --- a/blockchain/consensus/deleg_cns/src/proposer.rs +++ b/blockchain/consensus/deleg_cns/src/proposer.rs @@ -9,7 +9,6 @@ use async_trait::async_trait; use forest_blocks::{BlockHeader, GossipBlock, Tipset}; use forest_chain::Scale; use forest_chain_sync::consensus::{MessagePoolApi, Proposer, SyncGossipSubmitter}; -use forest_db::Store; use forest_key_management::Key; use forest_networks::Height; use forest_shim::address::Address; @@ -52,7 +51,7 @@ impl DelegatedProposer { base: &Arc, ) -> anyhow::Result where - DB: Blockstore + Store + Clone + Sync + Send + 'static, + DB: Blockstore + Clone + Sync + Send + 'static, { let block_delay = state_manager.chain_config().block_delay_secs; let smoke_height = state_manager.chain_config().epoch(Height::Smoke); @@ -105,7 +104,7 @@ impl Proposer for DelegatedProposer { services: &mut JoinSet>, ) -> anyhow::Result<()> where - DB: Blockstore + Store + Clone + Sync + Send + 'static, + DB: Blockstore + Clone + Sync + Send + 'static, MP: MessagePoolApi + Send + Sync + 'static, { services.spawn(async move { @@ -125,7 +124,7 @@ impl DelegatedProposer { submitter: &SyncGossipSubmitter, ) -> anyhow::Result<()> where - DB: Blockstore + Store + Clone + Sync + Send + 'static, + DB: Blockstore + Clone + Sync + Send + 'static, MP: MessagePoolApi + Send + Sync + 'static, { // TODO: Ideally these should not be coming through the `StateManager`. diff --git a/blockchain/consensus/deleg_cns/src/validation.rs b/blockchain/consensus/deleg_cns/src/validation.rs index bc73865f519f..25a37110d7b9 100644 --- a/blockchain/consensus/deleg_cns/src/validation.rs +++ b/blockchain/consensus/deleg_cns/src/validation.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use forest_blocks::{Block, BlockHeader, Tipset}; -use forest_db::Store; use forest_networks::ChainConfig; use forest_shim::address::Address; use forest_state_manager::StateManager; @@ -21,7 +20,7 @@ use crate::DelegatedConsensusError; /// * Timestamps /// * The block was proposed by the only only miner eligible #[allow(clippy::unused_async)] -pub(crate) async fn validate_block( +pub(crate) async fn validate_block( chosen_one: &Address, state_manager: Arc>, block: Arc, @@ -94,7 +93,7 @@ fn validate_miner( chosen_one: &Address, ) -> Result<(), Box> where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, { use DelegatedConsensusError::*; let miner_addr = header.miner_address(); diff --git a/blockchain/consensus/fil_cns/Cargo.toml b/blockchain/consensus/fil_cns/Cargo.toml index 70d51878a8ea..7eb790f24c56 100644 --- a/blockchain/consensus/fil_cns/Cargo.toml +++ b/blockchain/consensus/fil_cns/Cargo.toml @@ -16,7 +16,6 @@ forest_beacon.workspace = true forest_blocks.workspace = true forest_chain.workspace = true forest_chain_sync.workspace = true -forest_db.workspace = true forest_fil_types.workspace = true forest_interpreter.workspace = true forest_key_management.workspace = true diff --git a/blockchain/consensus/fil_cns/src/composition.rs b/blockchain/consensus/fil_cns/src/composition.rs index 1b383f8e3b06..d83f0befcc40 100644 --- a/blockchain/consensus/fil_cns/src/composition.rs +++ b/blockchain/consensus/fil_cns/src/composition.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use forest_beacon::DrandBeacon; use forest_chain_sync::consensus::{MessagePoolApi, SyncGossipSubmitter}; -use forest_db::Store; use forest_key_management::KeyStore; use forest_state_manager::StateManager; use fvm_ipld_blockstore::Blockstore; @@ -29,7 +28,7 @@ pub async fn consensus( _services: &mut JoinSet>, ) -> anyhow::Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, MP: MessagePoolApi + Send + Sync + 'static, { let consensus = FilecoinConsensus::new(state_manager.beacon_schedule()); diff --git a/blockchain/consensus/fil_cns/src/lib.rs b/blockchain/consensus/fil_cns/src/lib.rs index 83e308095871..db15ad24f270 100644 --- a/blockchain/consensus/fil_cns/src/lib.rs +++ b/blockchain/consensus/fil_cns/src/lib.rs @@ -8,7 +8,6 @@ use forest_beacon::{Beacon, BeaconSchedule}; use forest_blocks::{Block, Tipset}; use forest_chain::{Error as ChainStoreError, Scale, Weight}; use forest_chain_sync::Consensus; -use forest_db::Store; use forest_state_manager::{Error as StateManagerError, StateManager}; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::Error as ForestEncodingError; @@ -109,7 +108,7 @@ where block: Arc, ) -> Result<(), NonEmpty> where - DB: Blockstore + Store + Clone + Sync + Send + 'static, + DB: Blockstore + Clone + Sync + Send + 'static, { validation::validate_block::<_, _>(state_manager, self.beacon.clone(), block).await } diff --git a/blockchain/consensus/fil_cns/src/validation.rs b/blockchain/consensus/fil_cns/src/validation.rs index c96666de23b6..c132077ccf0d 100644 --- a/blockchain/consensus/fil_cns/src/validation.rs +++ b/blockchain/consensus/fil_cns/src/validation.rs @@ -9,7 +9,6 @@ use fil_actors_runtime_v9::runtime::DomainSeparationTag; use forest_beacon::{Beacon, BeaconEntry, BeaconSchedule, IGNORE_DRAND_VAR}; use forest_blocks::{Block, BlockHeader, Tipset}; use forest_chain_sync::collect_errs; -use forest_db::Store; use forest_fil_types::verifier::verify_winning_post; use forest_networks::{ChainConfig, Height}; use forest_shim::{address::Address, randomness::Randomness, version::NetworkVersion}; @@ -34,10 +33,7 @@ fn to_errs>(e: E) -> NonEmpty( +pub(crate) async fn validate_block( state_manager: Arc>, beacon_schedule: Arc>, block: Arc, @@ -200,7 +196,7 @@ fn block_timestamp_checks( // Check that the miner power can be loaded. // Doesn't check that the miner actually has any power. -fn validate_miner( +fn validate_miner( state_manager: &StateManager, miner_addr: &Address, tipset_state: &Cid, @@ -224,7 +220,7 @@ fn validate_miner( Ok(()) } -fn validate_winner_election( +fn validate_winner_election( header: &BlockHeader, base_tipset: &Tipset, lookback_tipset: &Tipset, @@ -334,7 +330,7 @@ fn verify_election_post_vrf( verify_bls_sig(evrf, rand, &worker.into()).map_err(FilecoinConsensusError::VrfValidation) } -fn verify_winning_post_proof( +fn verify_winning_post_proof( state_manager: &StateManager, network_version: NetworkVersion, header: &BlockHeader, diff --git a/blockchain/message_pool/src/msgpool/provider.rs b/blockchain/message_pool/src/msgpool/provider.rs index 4c04f009d2ec..15e26f515937 100644 --- a/blockchain/message_pool/src/msgpool/provider.rs +++ b/blockchain/message_pool/src/msgpool/provider.rs @@ -7,7 +7,6 @@ use async_trait::async_trait; use cid::{multihash::Code::Blake2b256, Cid}; use forest_blocks::{BlockHeader, Tipset, TipsetKeys}; use forest_chain::HeadChange; -use forest_db::Store; use forest_message::{ChainMessage, SignedMessage}; use forest_networks::Height; use forest_shim::{ @@ -61,11 +60,11 @@ pub struct MpoolRpcProvider { impl MpoolRpcProvider where - DB: Blockstore + Store + Clone + Sync + Send, + DB: Blockstore + Clone + Sync + Send, { pub fn new(subscriber: Publisher, sm: Arc>) -> Self where - DB: Blockstore + Store + Clone, + DB: Blockstore + Clone, { MpoolRpcProvider { subscriber, sm } } @@ -74,7 +73,7 @@ where #[async_trait] impl Provider for MpoolRpcProvider where - DB: Blockstore + Store + Clone + Sync + Send + 'static, + DB: Blockstore + Clone + Sync + Send + 'static, { fn subscribe_head_changes(&self) -> Subscriber { self.subscriber.subscribe() diff --git a/blockchain/state_manager/src/chain_rand.rs b/blockchain/state_manager/src/chain_rand.rs index c58e1c6a0d97..94a8d3c1c1d3 100644 --- a/blockchain/state_manager/src/chain_rand.rs +++ b/blockchain/state_manager/src/chain_rand.rs @@ -9,7 +9,6 @@ use byteorder::{BigEndian, WriteBytesExt}; use forest_beacon::{Beacon, BeaconEntry, BeaconSchedule, DrandBeacon}; use forest_blocks::{Tipset, TipsetKeys}; use forest_chain::ChainStore; -use forest_db::Store; use forest_encoding::blake2b_256; use forest_networks::ChainConfig; use fvm::externs::Rand as Rand_v2; @@ -38,7 +37,7 @@ impl Clone for ChainRand { impl ChainRand where - DB: Blockstore + Store + Send + Sync, + DB: Blockstore + Send + Sync, { pub fn new( chain_config: Arc, @@ -190,7 +189,7 @@ where impl Rand_v2 for ChainRand where - DB: Blockstore + Store + Send + Sync, + DB: Blockstore + Send + Sync, { fn get_chain_randomness( &self, @@ -213,7 +212,7 @@ where impl Rand_v3 for ChainRand where - DB: Blockstore + Store + Send + Sync, + DB: Blockstore + Send + Sync, { fn get_chain_randomness( &self, diff --git a/blockchain/state_manager/src/lib.rs b/blockchain/state_manager/src/lib.rs index a2d0d0deeddd..b0be70c42afc 100644 --- a/blockchain/state_manager/src/lib.rs +++ b/blockchain/state_manager/src/lib.rs @@ -19,7 +19,6 @@ use fil_actors_runtime_v9::runtime::Policy; use forest_beacon::{BeaconSchedule, DrandBeacon}; use forest_blocks::{BlockHeader, Tipset, TipsetKeys}; use forest_chain::{ChainStore, HeadChange}; -use forest_db::Store; use forest_interpreter::{resolve_to_key_addr, BlockMessages, RewardCalc, VM}; use forest_json::message_receipt; use forest_message::{ChainMessage, Message as MessageTrait}; @@ -214,7 +213,7 @@ pub struct StateManager { impl StateManager where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, { pub fn new( cs: Arc>, @@ -939,7 +938,7 @@ where confidence: i64, ) -> Result<(Option>, Option), Error> where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, { let mut subscriber = self.cs.publisher().subscribe(); let (sender, mut receiver) = oneshot::channel::<()>(); @@ -1263,7 +1262,7 @@ fn chain_epoch_root( tipset: Arc, ) -> Box anyhow::Result> where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, { Box::new(move |round| { let (_, st) = sm.get_lookback_tipset_for_round(tipset.clone(), round)?; diff --git a/blockchain/state_manager/src/utils.rs b/blockchain/state_manager/src/utils.rs index 1b8cb88566e9..7603ecaf8184 100644 --- a/blockchain/state_manager/src/utils.rs +++ b/blockchain/state_manager/src/utils.rs @@ -3,7 +3,6 @@ use cid::Cid; use fil_actor_interface::{is_account_actor, is_eth_account_actor, is_placeholder_actor, miner}; -use forest_db::Store; use forest_fil_types::verifier::generate_winning_post_sector_challenge; use forest_shim::{ address::{Address, Payload}, @@ -19,7 +18,7 @@ use crate::{errors::*, StateManager}; impl StateManager where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, { /// Retrieves and generates a vector of sector info for the winning `PoSt` /// verification. diff --git a/blockchain/state_manager/src/vm_circ_supply.rs b/blockchain/state_manager/src/vm_circ_supply.rs index bb68ac2dc98e..1d4539f62ac4 100644 --- a/blockchain/state_manager/src/vm_circ_supply.rs +++ b/blockchain/state_manager/src/vm_circ_supply.rs @@ -5,7 +5,6 @@ use anyhow::Context; use cid::Cid; use fil_actor_interface::{market, power, reward}; use forest_chain::*; -use forest_db::Store; use forest_networks::{ChainConfig, Height}; use forest_shim::{ address::Address, @@ -65,7 +64,7 @@ impl GenesisInfo { } // Allows generation of the current circulating supply - pub fn get_circulating_supply( + pub fn get_circulating_supply( &self, height: ChainEpoch, db: &DB, @@ -149,7 +148,7 @@ fn get_fil_vested(genesis_info: &GenesisInfo, height: ChainEpoch) -> TokenAmount return_value } -fn get_fil_mined( +fn get_fil_mined( state_tree: &StateTree, ) -> Result { let actor = state_tree @@ -160,7 +159,7 @@ fn get_fil_mined( Ok(state.into_total_storage_power_reward().into()) } -fn get_fil_market_locked( +fn get_fil_market_locked( state_tree: &StateTree, ) -> Result { let actor = state_tree @@ -171,7 +170,7 @@ fn get_fil_market_locked( Ok(state.total_locked().into()) } -fn get_fil_power_locked( +fn get_fil_power_locked( state_tree: &StateTree, ) -> Result { let actor = state_tree @@ -182,7 +181,7 @@ fn get_fil_power_locked( Ok(state.into_total_locked().into()) } -fn get_fil_reserve_disbursed( +fn get_fil_reserve_disbursed( state_tree: &StateTree, ) -> Result { let fil_reserved: TokenAmount = TokenAmount::from_whole(300_000_000); @@ -192,7 +191,7 @@ fn get_fil_reserve_disbursed( Ok(TokenAmount::from(&*fil_reserved - &reserve_actor.balance)) } -fn get_fil_locked( +fn get_fil_locked( state_tree: &StateTree, ) -> Result { let market_locked = get_fil_market_locked(state_tree)?; @@ -200,7 +199,7 @@ fn get_fil_locked( Ok(power_locked + market_locked) } -fn get_fil_burnt( +fn get_fil_burnt( state_tree: &StateTree, ) -> Result { let burnt_actor = get_actor_state(state_tree, &Address::BURNT_FUNDS_ACTOR)?; diff --git a/documentation/src/cli.md b/documentation/src/cli.md index 22004b343f1c..1b556d0874a3 100644 --- a/documentation/src/cli.md +++ b/documentation/src/cli.md @@ -14,7 +14,7 @@ prepend it to the command, like so: On Linux, you can set the environment variable with the following syntax -`export FULLNOPDE_API_INFO="..."` +`export FULLNODE_API_INFO="..."` Setting your API info this way will limit the value to your current session. Look online for ways to persist this variable if desired. diff --git a/forest/cli/src/cli/snapshot_cmd.rs b/forest/cli/src/cli/snapshot_cmd.rs index d59240d7bc9b..ea6aa814ae17 100644 --- a/forest/cli/src/cli/snapshot_cmd.rs +++ b/forest/cli/src/cli/snapshot_cmd.rs @@ -11,10 +11,7 @@ use forest_chain::ChainStore; use forest_cli_shared::cli::{ default_snapshot_dir, is_car_or_tmp, snapshot_fetch, SnapshotServer, SnapshotStore, }; -use forest_db::{ - db_engine::{db_root, open_proxy_db}, - Store, -}; +use forest_db::db_engine::{db_root, open_proxy_db}; use forest_genesis::{forest_load_car, read_genesis_header}; use forest_ipld::{recurse_links_hash, CidHashSet, DEFAULT_RECENT_STATE_ROOTS}; use forest_rpc_api::chain_api::ChainExportParams; @@ -441,7 +438,7 @@ async fn validate_links_and_genesis_traversal( network: &str, ) -> anyhow::Result<()> where - DB: fvm_ipld_blockstore::Blockstore + Store + Send + Sync, + DB: fvm_ipld_blockstore::Blockstore + Send + Sync, { let mut seen = CidHashSet::default(); let upto = ts.epoch() - recent_stateroots; diff --git a/forest/daemon/src/daemon.rs b/forest/daemon/src/daemon.rs index 696a79ce4b7b..cecbd410690b 100644 --- a/forest/daemon/src/daemon.rs +++ b/forest/daemon/src/daemon.rs @@ -369,6 +369,12 @@ pub(super) async fn start(opts: CliOpts, config: Config) -> anyhow::Result, Vec)>, - buffer_capacity_bytes: usize, - ) -> anyhow::Result<()> { - let start = Utc::now(); - let mut total_bytes = 0; - let mut total_entries = 0; - let mut estimated_buffer_bytes = 0; - let mut buffer = vec![]; - while let Ok((key, value)) = rx.recv_async().await { - estimated_buffer_bytes += key.len() + value.len(); - total_bytes += key.len() + value.len(); - total_entries += 1; - buffer.push((key, value)); - if estimated_buffer_bytes >= buffer_capacity_bytes { - self.bulk_write(std::mem::take(&mut buffer))?; - estimated_buffer_bytes = 0; - } - } - self.bulk_write(buffer)?; - info!( - "Buffered write completed: total entries: {total_entries}, total size: {}, took: {}s", - total_bytes.human_count_bytes(), - (Utc::now() - start).num_seconds() - ); - - Ok(()) - } -} - -impl StoreExt for T {} diff --git a/node/db/src/lib.rs b/node/db/src/lib.rs index 6a91b1ce1d6b..6dc875979dcc 100644 --- a/node/db/src/lib.rs +++ b/node/db/src/lib.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0, MIT mod errors; -mod ext; mod memory; mod metrics; @@ -16,7 +15,6 @@ pub mod parity_db_config; pub mod rocks_config; pub use errors::Error; -pub use ext::StoreExt; pub use memory::MemoryDB; #[cfg(any(feature = "paritydb", feature = "rocksdb"))] diff --git a/node/db/src/parity_db.rs b/node/db/src/parity_db.rs index 2700aa2bd421..d8a005c5e86d 100644 --- a/node/db/src/parity_db.rs +++ b/node/db/src/parity_db.rs @@ -134,8 +134,7 @@ impl Blockstore for ParityDb { { let values = blocks .into_iter() - .map(|(k, v)| (k.to_bytes(), v.as_ref().to_vec())) - .collect::>(); + .map(|(k, v)| (k.to_bytes(), v.as_ref().to_vec())); self.bulk_write(values).map_err(|e| e.into()) } } diff --git a/node/db/src/rolling/gc.rs b/node/db/src/rolling/gc.rs index 86d68e1d83ef..ad16e3f6f465 100644 --- a/node/db/src/rolling/gc.rs +++ b/node/db/src/rolling/gc.rs @@ -68,12 +68,12 @@ use std::{ use chrono::Utc; use forest_blocks::Tipset; use forest_ipld::util::*; +use forest_utils::db::BlockstoreBufferedWriteExt; use fvm_ipld_blockstore::Blockstore; use human_repr::HumanCount; use tokio::sync::Mutex; use super::*; -use crate::StoreExt; pub struct DbGarbageCollector where @@ -200,8 +200,9 @@ where .get(&cid)? .ok_or_else(|| anyhow::anyhow!("Cid {cid} not found in blockstore"))?; - let pair = (cid.to_bytes(), block.clone()); - reachable_bytes.fetch_add(pair.0.len() + pair.1.len(), atomic::Ordering::Relaxed); + let pair = (cid, block.clone()); + // Key size is 32 bytes in paritydb + reachable_bytes.fetch_add(32 + pair.1.len(), atomic::Ordering::Relaxed); if !db.current().has(&cid)? { tx.send_async(pair).await?; } diff --git a/node/db/src/rolling/impls.rs b/node/db/src/rolling/impls.rs index d98ee945c392..cc000c9ea7e2 100644 --- a/node/db/src/rolling/impls.rs +++ b/node/db/src/rolling/impls.rs @@ -200,7 +200,7 @@ impl RollingDB { let old_db_path = self.db_root.join(&db_index_inner_mut.old); db_index_inner_mut.old = db_index_inner_mut.current.clone(); db_index_inner_mut.current = new_db_name; - db_index.flush_to_file()?; + db_index.sync()?; delete_db(&old_db_path); Ok(()) @@ -228,8 +228,11 @@ impl RollingDB { } fn load_dbs(db_root: &Path, db_config: &DbConfig) -> anyhow::Result<(FileBacked, Db, Db)> { - let mut db_index = - FileBacked::load_from_file_or_create(db_root.join("db_index.yaml"), Default::default)?; + let mut db_index = FileBacked::load_from_file_or_create( + db_root.join("db_index.yaml"), + Default::default, + None, + )?; let db_index_mut: &mut DbIndex = db_index.inner_mut(); if db_index_mut.current.is_empty() { db_index_mut.current = Uuid::new_v4().simple().to_string(); @@ -239,7 +242,7 @@ fn load_dbs(db_root: &Path, db_config: &DbConfig) -> anyhow::Result<(FileBacked< } let current = open_db(&db_root.join(&db_index_mut.current), db_config)?; let old = open_db(&db_root.join(&db_index_mut.old), db_config)?; - db_index.flush_to_file()?; + db_index.sync()?; Ok((db_index, current, old)) } diff --git a/node/forest_libp2p/src/chain_exchange/provider.rs b/node/forest_libp2p/src/chain_exchange/provider.rs index 49e555e36a44..618d212a846b 100644 --- a/node/forest_libp2p/src/chain_exchange/provider.rs +++ b/node/forest_libp2p/src/chain_exchange/provider.rs @@ -5,7 +5,6 @@ use ahash::{HashMap, HashMapExt}; use cid::Cid; use forest_blocks::{Tipset, TipsetKeys}; use forest_chain::{ChainStore, Error as ChainError}; -use forest_db::Store; use fvm_ipld_blockstore::Blockstore; use log::debug; @@ -20,7 +19,7 @@ pub fn make_chain_exchange_response( request: &ChainExchangeRequest, ) -> ChainExchangeResponse where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, { let mut response_chain: Vec = Vec::with_capacity(request.request_len as usize); @@ -89,7 +88,7 @@ where // Builds CompactedMessages for given Tipset. fn compact_messages(db: &DB, tipset: &Tipset) -> Result where - DB: Blockstore + Store + Clone, + DB: Blockstore + Clone, { let mut bls_messages_order = HashMap::new(); let mut secp_messages_order = HashMap::new(); diff --git a/node/forest_libp2p/src/service.rs b/node/forest_libp2p/src/service.rs index 31d608bfbb57..f2c64048cab1 100644 --- a/node/forest_libp2p/src/service.rs +++ b/node/forest_libp2p/src/service.rs @@ -13,7 +13,6 @@ use cid::Cid; use flume::Sender; use forest_blocks::GossipBlock; use forest_chain::ChainStore; -use forest_db::Store; use forest_libp2p_bitswap::{ request_manager::BitswapRequestManager, BitswapStoreRead, BitswapStoreReadWrite, }; @@ -193,7 +192,7 @@ pub struct Libp2pService { impl Libp2pService where - DB: Blockstore + Store + BitswapStoreReadWrite + Clone + Sync + Send + 'static, + DB: Blockstore + BitswapStoreReadWrite + Clone + Sync + Send + 'static, { pub fn new( config: Libp2pConfig, @@ -674,7 +673,7 @@ async fn handle_chain_exchange_event( ChainExchangeResponse, )>, ) where - DB: Blockstore + Store + Clone + Sync + Send + 'static, + DB: Blockstore + Clone + Sync + Send + 'static, { match ce_event { request_response::Event::Message { peer, message } => { @@ -760,7 +759,7 @@ async fn handle_forest_behaviour_event( pubsub_block_str: &str, pubsub_msg_str: &str, ) where - DB: Blockstore + Store + BitswapStoreRead + Clone + Sync + Send + 'static, + DB: Blockstore + BitswapStoreRead + Clone + Sync + Send + 'static, { match event { ForestBehaviourEvent::Discovery(discovery_out) => { diff --git a/node/rpc/src/beacon_api.rs b/node/rpc/src/beacon_api.rs index 74bfe61079a3..c4752cba4c0f 100644 --- a/node/rpc/src/beacon_api.rs +++ b/node/rpc/src/beacon_api.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0, MIT use forest_beacon::{json::BeaconEntryJson, Beacon}; -use forest_db::Store; use forest_rpc_api::{beacon_api::*, data_types::RPCState}; use fvm_ipld_blockstore::Blockstore; use jsonrpc_v2::{Data, Error as JsonRpcError, Params}; @@ -15,7 +14,7 @@ pub(crate) async fn beacon_get_entry( Params(params): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let (first,) = params; diff --git a/node/rpc/src/chain_api.rs b/node/rpc/src/chain_api.rs index 7db9710a0366..32167a7782ca 100644 --- a/node/rpc/src/chain_api.rs +++ b/node/rpc/src/chain_api.rs @@ -13,7 +13,6 @@ use forest_blocks::{ header::json::BlockHeaderJson, tipset_json::TipsetJson, tipset_keys_json::TipsetKeysJson, BlockHeader, Tipset, }; -use forest_db::Store; use forest_json::{cid::CidJson, message::json::MessageJson}; use forest_rpc_api::{ chain_api::*, @@ -40,7 +39,7 @@ pub(crate) async fn chain_get_message( Params(params): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let (CidJson(msg_cid),) = params; @@ -64,7 +63,7 @@ pub(crate) async fn chain_export( }): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { lazy_static::lazy_static! { @@ -147,7 +146,7 @@ pub(crate) async fn chain_read_obj( Params(params): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let (CidJson(obj_cid),) = params; @@ -164,7 +163,7 @@ pub(crate) async fn chain_has_obj( Params(params): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let (CidJson(obj_cid),) = params; @@ -176,7 +175,7 @@ pub(crate) async fn chain_get_block_messages( Params(params): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let (CidJson(blk_cid),) = params; @@ -211,7 +210,7 @@ pub(crate) async fn chain_get_tipset_by_height( Params(params): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let (height, tsk) = params; @@ -227,7 +226,7 @@ pub(crate) async fn chain_get_genesis( data: Data>, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let genesis = data.state_manager.chain_store().genesis()?; @@ -239,7 +238,7 @@ pub(crate) async fn chain_head( data: Data>, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let heaviest = data.state_manager.chain_store().heaviest_tipset(); @@ -251,7 +250,7 @@ pub(crate) async fn chain_get_block( Params(params): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let (CidJson(blk_cid),) = params; @@ -268,7 +267,7 @@ pub(crate) async fn chain_get_tipset( Params(params): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let (TipsetKeysJson(tsk),) = params; @@ -281,7 +280,7 @@ pub(crate) async fn chain_get_tipset_hash( Params(params): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let (TipsetKeysJson(tsk),) = params; @@ -294,7 +293,7 @@ pub(crate) async fn chain_validate_tipset_checkpoints( Params(params): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let () = params; @@ -314,7 +313,7 @@ pub(crate) async fn chain_get_name( data: Data>, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let name: String = data.state_manager.chain_config().name.clone(); diff --git a/node/rpc/src/db_api.rs b/node/rpc/src/db_api.rs index a31bf1f8d85f..e2c8975ec70c 100644 --- a/node/rpc/src/db_api.rs +++ b/node/rpc/src/db_api.rs @@ -2,12 +2,11 @@ // SPDX-License-Identifier: Apache-2.0, MIT use forest_beacon::Beacon; -use forest_db::Store; use forest_rpc_api::{data_types::RPCState, db_api::*}; use fvm_ipld_blockstore::Blockstore; use jsonrpc_v2::{Data, Error as JsonRpcError, Params}; -pub(crate) async fn db_gc( +pub(crate) async fn db_gc( data: Data>, Params(_): Params, ) -> Result { diff --git a/node/rpc/src/gas_api.rs b/node/rpc/src/gas_api.rs index faeae91f3d2e..5bd8a5f8747f 100644 --- a/node/rpc/src/gas_api.rs +++ b/node/rpc/src/gas_api.rs @@ -5,7 +5,6 @@ use forest_beacon::Beacon; use forest_blocks::{tipset_keys_json::TipsetKeysJson, TipsetKeys}; use forest_chain::{BASE_FEE_MAX_CHANGE_DENOM, BLOCK_GAS_TARGET, MINIMUM_BASE_FEE}; -use forest_db::Store; use forest_json::{address::json::AddressJson, message::json::MessageJson}; use forest_message::{ChainMessage, Message as MessageTrait}; use forest_rpc_api::{ @@ -28,7 +27,7 @@ pub(crate) async fn gas_estimate_fee_cap( Params(params): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let (MessageJson(msg), max_queue_blks, TipsetKeysJson(tsk)) = params; @@ -43,7 +42,7 @@ fn estimate_fee_cap( _tsk: TipsetKeys, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let ts = data.state_manager.chain_store().heaviest_tipset(); @@ -66,7 +65,7 @@ pub(crate) async fn gas_estimate_gas_premium( Params(params): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let (nblocksincl, AddressJson(_sender), _gas_limit, TipsetKeysJson(_tsk)) = params; @@ -80,7 +79,7 @@ async fn estimate_gas_premium( mut nblocksincl: u64, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { if nblocksincl == 0 { @@ -167,7 +166,7 @@ pub(crate) async fn gas_estimate_gas_limit( Params(params): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let (MessageJson(msg), TipsetKeysJson(tsk)) = params; @@ -180,7 +179,7 @@ async fn estimate_gas_limit( _: TipsetKeys, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let mut msg = msg; @@ -223,7 +222,7 @@ pub(crate) async fn gas_estimate_message_gas( Params(params): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let (MessageJson(msg), spec, TipsetKeysJson(tsk)) = params; @@ -239,7 +238,7 @@ pub(crate) async fn estimate_message_gas( tsk: TipsetKeys, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let mut msg = msg; diff --git a/node/rpc/src/lib.rs b/node/rpc/src/lib.rs index 5fa0e8af66a4..b37aa56775dd 100644 --- a/node/rpc/src/lib.rs +++ b/node/rpc/src/lib.rs @@ -21,7 +21,6 @@ use std::{net::TcpListener, sync::Arc}; use axum::routing::{get, post}; use forest_beacon::Beacon; use forest_chain::Scale; -use forest_db::Store; use forest_rpc_api::{ auth_api::*, beacon_api::*, chain_api::*, common_api::*, data_types::RPCState, db_api::*, gas_api::*, mpool_api::*, net_api::*, state_api::*, sync_api::*, wallet_api::*, @@ -46,7 +45,7 @@ pub async fn start_rpc( shutdown_send: Sender<()>, ) -> Result<(), JSONRPCError> where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, S: Scale + 'static, { diff --git a/node/rpc/src/mpool_api.rs b/node/rpc/src/mpool_api.rs index 9c252bcca2ee..4e71efe9dddb 100644 --- a/node/rpc/src/mpool_api.rs +++ b/node/rpc/src/mpool_api.rs @@ -7,7 +7,6 @@ use std::convert::TryFrom; use ahash::{HashSet, HashSetExt}; use forest_beacon::Beacon; use forest_blocks::TipsetKeys; -use forest_db::Store; use forest_json::{ cid::{vec::CidJsonVec, CidJson}, message::json::MessageJson, @@ -28,7 +27,7 @@ pub(crate) async fn mpool_pending( Params(params): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let (CidJsonVec(cid_vec),) = params; @@ -88,7 +87,7 @@ pub(crate) async fn mpool_push( Params(params): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let (SignedMessageJson(smsg),) = params; @@ -104,7 +103,7 @@ pub(crate) async fn mpool_push_message( Params(params): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let (MessageJson(umsg), spec) = params; diff --git a/node/rpc/src/net_api.rs b/node/rpc/src/net_api.rs index 3f247d3efb03..980186bc3f50 100644 --- a/node/rpc/src/net_api.rs +++ b/node/rpc/src/net_api.rs @@ -4,7 +4,6 @@ use std::str::FromStr; use forest_beacon::Beacon; -use forest_db::Store; use forest_libp2p::{NetRPCMethods, NetworkMessage, PeerId}; use forest_rpc_api::{ data_types::{AddrInfo, RPCState}, @@ -15,10 +14,7 @@ use fvm_ipld_blockstore::Blockstore; use jsonrpc_v2::{Data, Error as JsonRpcError, Params}; use log::error; -pub(crate) async fn net_addrs_listen< - DB: Blockstore + Store + Clone + Send + Sync + 'static, - B: Beacon, ->( +pub(crate) async fn net_addrs_listen( data: Data>, ) -> Result { let (tx, rx) = oneshot::channel(); @@ -35,7 +31,7 @@ pub(crate) async fn net_addrs_listen< }) } -pub(crate) async fn net_peers( +pub(crate) async fn net_peers( data: Data>, ) -> Result { let (tx, rx) = oneshot::channel(); @@ -57,10 +53,7 @@ pub(crate) async fn net_peers( +pub(crate) async fn net_connect( data: Data>, Params(params): Params, ) -> Result { @@ -84,10 +77,7 @@ pub(crate) async fn net_connect< } } -pub(crate) async fn net_disconnect< - DB: Blockstore + Store + Clone + Send + Sync + 'static, - B: Beacon, ->( +pub(crate) async fn net_disconnect( data: Data>, Params(params): Params, ) -> Result { diff --git a/node/rpc/src/state_api.rs b/node/rpc/src/state_api.rs index 8f37e8e79fd8..49ece2499126 100644 --- a/node/rpc/src/state_api.rs +++ b/node/rpc/src/state_api.rs @@ -7,7 +7,6 @@ use cid::Cid; use fil_actor_interface::market; use forest_beacon::Beacon; use forest_blocks::tipset_keys_json::TipsetKeysJson; -use forest_db::Store; use forest_ipld::json::IpldJson; use forest_json::cid::CidJson; use forest_rpc_api::{ @@ -24,10 +23,7 @@ use libipld_core::ipld::Ipld; // defaulting to Full). /// runs the given message and returns its result without any persisted changes. -pub(crate) async fn state_call< - DB: Blockstore + Store + Clone + Send + Sync + 'static, - B: Beacon, ->( +pub(crate) async fn state_call( data: Data>, Params(params): Params, ) -> Result { @@ -43,10 +39,7 @@ pub(crate) async fn state_call< /// returns the result of executing the indicated message, assuming it was /// executed in the indicated tipset. -pub(crate) async fn state_replay< - DB: Blockstore + Store + Clone + Send + Sync + 'static, - B: Beacon, ->( +pub(crate) async fn state_replay( data: Data>, Params(params): Params, ) -> Result { @@ -68,7 +61,7 @@ pub(crate) async fn state_replay< /// gets network name from state manager pub(crate) async fn state_network_name< - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, >( data: Data>, @@ -82,7 +75,7 @@ pub(crate) async fn state_network_name< } pub(crate) async fn state_get_network_version< - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, >( data: Data>, @@ -96,7 +89,7 @@ pub(crate) async fn state_get_network_version< /// looks up the Escrow and Locked balances of the given address in the Storage /// Market pub(crate) async fn state_market_balance< - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, >( data: Data>, @@ -114,7 +107,7 @@ pub(crate) async fn state_market_balance< } pub(crate) async fn state_market_deals< - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, >( data: Data>, @@ -151,10 +144,7 @@ pub(crate) async fn state_market_deals< } /// returns the message receipt for the given message -pub(crate) async fn state_get_receipt< - DB: Blockstore + Store + Clone + Send + Sync + 'static, - B: Beacon, ->( +pub(crate) async fn state_get_receipt( data: Data>, Params(params): Params, ) -> Result { @@ -172,10 +162,7 @@ pub(crate) async fn state_get_receipt< } /// looks back in the chain for a message. If not found, it blocks until the /// message arrives on chain, and gets to the indicated confidence depth. -pub(crate) async fn state_wait_msg< - DB: Blockstore + Store + Clone + Send + Sync + 'static, - B: Beacon, ->( +pub(crate) async fn state_wait_msg( data: Data>, Params(params): Params, ) -> Result { diff --git a/node/rpc/src/wallet_api.rs b/node/rpc/src/wallet_api.rs index 87510cb03038..6485256a7712 100644 --- a/node/rpc/src/wallet_api.rs +++ b/node/rpc/src/wallet_api.rs @@ -5,7 +5,6 @@ use std::{convert::TryFrom, str::FromStr}; use base64::{prelude::BASE64_STANDARD, Engine}; use forest_beacon::Beacon; -use forest_db::Store; use forest_json::{address::json::AddressJson, signature::json::SignatureJson}; use forest_key_management::{json::KeyInfoJson, Error, Key}; use forest_rpc_api::{data_types::RPCState, wallet_api::*}; @@ -20,7 +19,7 @@ pub(crate) async fn wallet_balance( Params(params): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let (addr_str,) = params; @@ -189,7 +188,7 @@ pub(crate) async fn wallet_sign( Params(params): Params, ) -> Result where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, B: Beacon, { let state_manager = &data.state_manager; diff --git a/utils/forest_utils/Cargo.toml b/utils/forest_utils/Cargo.toml index 94328a2e6327..f19e121f9912 100644 --- a/utils/forest_utils/Cargo.toml +++ b/utils/forest_utils/Cargo.toml @@ -7,16 +7,20 @@ authors.workspace = true edition.workspace = true [dependencies] +ahash.workspace = true anyhow.workspace = true async-trait.workspace = true atty.workspace = true +chrono.workspace = true cid.workspace = true const_format = "0.2.26" digest.workspace = true +flume.workspace = true forest_encoding.workspace = true futures.workspace = true fvm_ipld_blockstore.workspace = true fvm_ipld_encoding.workspace = true +human-repr.workspace = true hyper-rustls.workspace = true hyper.workspace = true libc = "0.2" diff --git a/utils/forest_utils/src/db/file_backed_obj.rs b/utils/forest_utils/src/db/file_backed_obj.rs index db2bbf595023..663faed1ee36 100644 --- a/utils/forest_utils/src/db/file_backed_obj.rs +++ b/utils/forest_utils/src/db/file_backed_obj.rs @@ -1,16 +1,25 @@ // Copyright 2019-2023 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use std::{path::PathBuf, str::FromStr}; +use std::{ + path::PathBuf, + str::FromStr, + time::{Duration, SystemTime}, +}; +use ahash::HashSet; use cid::Cid; use log::warn; pub struct FileBacked { inner: T, path: PathBuf, + last_sync: Option, + sync_period: Option, } +pub const SYNC_PERIOD: Duration = Duration::from_secs(600); + impl FileBacked { /// Gets a borrow of the inner object pub fn inner(&self) -> &T { @@ -22,55 +31,92 @@ impl FileBacked { &mut self.inner } - /// Sets the inner object and flushes to file + /// Sets the inner object and try sync to file pub fn set_inner(&mut self, inner: T) -> anyhow::Result<()> { self.inner = inner; - self.flush_to_file() + self.try_sync() + } + + /// Calls func with inner mutable reference and try sync to file + pub fn with_inner(&mut self, func: F) -> anyhow::Result<()> + where + F: FnOnce(&mut T), + { + func(&mut self.inner); + self.try_sync() } /// Creates a new file backed object pub fn new(inner: T, path: PathBuf) -> Self { - Self { inner, path } + Self { + inner, + path, + last_sync: None, + sync_period: None, + } } /// Loads an object from a file and creates a new instance pub fn load_from_file_or_create T>( path: PathBuf, create: F, + sync_period: Option, ) -> anyhow::Result { - let mut need_flush = false; + let mut need_sync = false; let obj = if path.is_file() { let bytes = std::fs::read(path.as_path())?; Self { inner: T::deserialize(&bytes) .map_err(|e| { warn!("Error loading object from {}", path.display()); - need_flush = true; + need_sync = true; e }) .unwrap_or_else(|_| create()), path, + last_sync: None, + sync_period, } } else { - need_flush = true; + need_sync = true; Self { inner: create(), path, + last_sync: None, + sync_period, } }; - if need_flush { - obj.flush_to_file()?; + if need_sync { + obj.sync()?; } Ok(obj) } - /// Flushes the object to the file - pub fn flush_to_file(&self) -> anyhow::Result<()> { + /// Syncs the object to the file + pub fn sync(&self) -> anyhow::Result<()> { let bytes = self.inner().serialize()?; Ok(std::fs::write(&self.path, bytes)?) } + + /// Try to sync to file if there is some sync period, otherwise syncs + pub fn try_sync(&mut self) -> anyhow::Result<()> { + if let Some(sync_period) = self.sync_period { + let now = SystemTime::now(); + if let Some(last_sync) = self.last_sync { + if now.duration_since(last_sync)? > sync_period { + self.last_sync = Some(now); + self.sync()?; + } + return Ok(()); + } + self.last_sync = Some(now); + } else { + self.sync()?; + } + Ok(()) + } } /// An object that is backed by a single file on disk @@ -92,6 +138,18 @@ impl FileBackedObject for Cid { } } +impl FileBackedObject for HashSet { + fn serialize(&self) -> anyhow::Result> { + let serialized = serde_json::to_string(&self)?; + Ok(serialized.into_bytes()) + } + + fn deserialize(bytes: &[u8]) -> anyhow::Result { + let result = serde_json::from_str(String::from_utf8_lossy(bytes).trim()); + Ok(result?) + } +} + #[cfg(test)] mod tests { use anyhow::*; @@ -113,11 +171,79 @@ mod tests { let dir = TempDir::new()?; let file_path = dir.path().join("CID"); let obj1: FileBacked = - FileBacked::load_from_file_or_create(file_path.clone(), || cid)?; + FileBacked::load_from_file_or_create(file_path.clone(), || cid, None)?; let obj2: FileBacked = - FileBacked::load_from_file_or_create(file_path, Default::default)?; + FileBacked::load_from_file_or_create(file_path, Default::default, None)?; ensure!(obj1.inner() == obj2.inner()); Ok(()) } + + #[test] + fn with_inner() -> Result<()> { + let mut bytes = [0; 1024]; + rand::rngs::OsRng.fill(&mut bytes); + let cid0 = Cid::new_v0(multihash::Code::Sha2_256.digest(bytes.as_slice()))?; + let serialized0 = cid0.serialize()?; + + rand::rngs::OsRng.fill(&mut bytes); + let cid1 = Cid::new_v0(multihash::Code::Sha2_256.digest(bytes.as_slice()))?; + let serialized1 = cid1.serialize()?; + + let dir = TempDir::new()?; + let file_path = dir.path().join("CID"); + let mut obj1: FileBacked = + FileBacked::load_from_file_or_create(file_path.clone(), || cid0, None)?; + // Check if content of file match the cid value + let result = std::fs::read(file_path.as_path())?; + ensure!(serialized0 == result); + + obj1.with_inner(|inner| *inner = cid1)?; + // Check if content of file match the new cid1 value + let result = std::fs::read(file_path.as_path())?; + ensure!(serialized1 == result); + + Ok(()) + } + + #[test] + fn with_inner_with_period() -> Result<()> { + const TEST_SYNC_PERIOD: Duration = Duration::from_millis(1); + + let mut bytes = [0; 1024]; + rand::rngs::OsRng.fill(&mut bytes); + let cid0 = Cid::new_v0(multihash::Code::Sha2_256.digest(bytes.as_slice()))?; + let serialized0 = cid0.serialize()?; + + rand::rngs::OsRng.fill(&mut bytes); + let cid1 = Cid::new_v0(multihash::Code::Sha2_256.digest(bytes.as_slice()))?; + let serialized1 = cid1.serialize()?; + + let dir = TempDir::new()?; + let file_path = dir.path().join("CID"); + let mut obj1: FileBacked = FileBacked::load_from_file_or_create( + file_path.clone(), + || cid0, + Some(TEST_SYNC_PERIOD), + )?; + // Check if content of file match the cid value + let result = std::fs::read(file_path.as_path())?; + ensure!(serialized0 == result); + + obj1.with_inner(|inner| *inner = cid1)?; + // Check if content of file still match the old cid0 value + let result = std::fs::read(file_path.as_path())?; + ensure!(obj1.inner() == &cid1); + ensure!(serialized0 == result); + + // Wait for the period + std::thread::sleep(TEST_SYNC_PERIOD); + + obj1.with_inner(|inner| *inner = cid1)?; + // Check now if content of file match the new cid1 value + let result = std::fs::read(file_path.as_path())?; + ensure!(serialized1 == result); + + Ok(()) + } } diff --git a/utils/forest_utils/src/db/mod.rs b/utils/forest_utils/src/db/mod.rs index 9580e19e63d1..6c1f8a169136 100644 --- a/utils/forest_utils/src/db/mod.rs +++ b/utils/forest_utils/src/db/mod.rs @@ -3,6 +3,8 @@ pub mod file_backed_obj; +use async_trait::async_trait; +use chrono::Utc; use cid::{ multihash::{Code, MultihashDigest}, Cid, @@ -10,6 +12,8 @@ use cid::{ use forest_encoding::{de::DeserializeOwned, ser::Serialize}; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::{from_slice, to_vec, DAG_CBOR}; +use human_repr::HumanCount; +use log::info; /// Extension methods for inserting and retrieving IPLD data with CIDs pub trait BlockstoreExt: Blockstore { @@ -68,3 +72,40 @@ pub trait BlockstoreExt: Blockstore { } impl BlockstoreExt for T {} + +/// Extension methods for buffered write with manageable limit of RAM usage +#[async_trait] +pub trait BlockstoreBufferedWriteExt: Blockstore + Sized { + async fn buffered_write( + &self, + rx: flume::Receiver<(Cid, Vec)>, + buffer_capacity_bytes: usize, + ) -> anyhow::Result<()> { + let start = Utc::now(); + let mut total_bytes = 0; + let mut total_entries = 0; + let mut estimated_buffer_bytes = 0; + let mut buffer = vec![]; + while let Ok((key, value)) = rx.recv_async().await { + // Key is stored in 32 bytes in paritydb + estimated_buffer_bytes += 32 + value.len(); + total_bytes += 32 + value.len(); + total_entries += 1; + buffer.push((key, value)); + if estimated_buffer_bytes >= buffer_capacity_bytes { + self.put_many_keyed(std::mem::take(&mut buffer))?; + estimated_buffer_bytes = 0; + } + } + self.put_many_keyed(buffer)?; + info!( + "Buffered write completed: total entries: {total_entries}, total size: {}, took: {}s", + total_bytes.human_count_bytes(), + (Utc::now() - start).num_seconds() + ); + + Ok(()) + } +} + +impl BlockstoreBufferedWriteExt for T {} diff --git a/utils/genesis/Cargo.toml b/utils/genesis/Cargo.toml index 174699edcaae..b0c1f22cc2d7 100644 --- a/utils/genesis/Cargo.toml +++ b/utils/genesis/Cargo.toml @@ -14,7 +14,6 @@ anyhow.workspace = true cid.workspace = true flume.workspace = true forest_blocks.workspace = true -forest_db.workspace = true forest_state_manager.workspace = true forest_utils.workspace = true futures.workspace = true diff --git a/utils/genesis/src/lib.rs b/utils/genesis/src/lib.rs index 6ffaf493c451..bb2f6cd5a994 100644 --- a/utils/genesis/src/lib.rs +++ b/utils/genesis/src/lib.rs @@ -6,9 +6,11 @@ use std::{sync::Arc, time}; use anyhow::bail; use cid::Cid; use forest_blocks::{BlockHeader, Tipset, TipsetKeys}; -use forest_db::{Store, StoreExt}; use forest_state_manager::StateManager; -use forest_utils::{db::BlockstoreExt, net::FetchProgress}; +use forest_utils::{ + db::{BlockstoreBufferedWriteExt, BlockstoreExt}, + net::FetchProgress, +}; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_car::{load_car, CarReader}; use log::{debug, info}; @@ -30,7 +32,7 @@ pub async fn read_genesis_header( db: &DB, ) -> Result where - DB: Blockstore + Store + Send + Sync, + DB: Blockstore + Send + Sync, { let genesis = match genesis_fp { Some(path) => { @@ -56,7 +58,7 @@ pub fn get_network_name_from_genesis( state_manager: &StateManager, ) -> Result where - BS: Blockstore + Store + Clone + Send + Sync + 'static, + BS: Blockstore + Clone + Send + Sync + 'static, { // Get network name from genesis state. let network_name = state_manager @@ -70,7 +72,7 @@ pub async fn initialize_genesis( state_manager: &StateManager, ) -> Result<(Tipset, String), anyhow::Error> where - BS: Blockstore + Store + Clone + Send + Sync + 'static, + BS: Blockstore + Clone + Send + Sync + 'static, { let genesis_bytes = state_manager.chain_config().genesis_bytes(); let genesis = @@ -83,7 +85,7 @@ where async fn process_car(reader: R, db: &BS) -> Result where R: AsyncRead + Send + Unpin, - BS: Blockstore + Store + Send + Sync, + BS: Blockstore + Send + Sync, { // Load genesis state into the database and get the Cid let genesis_cids: Vec = load_car(db, reader.compat()).await?; @@ -107,7 +109,7 @@ pub async fn import_chain( skip_load: bool, ) -> Result<(), anyhow::Error> where - DB: Blockstore + Store + Clone + Send + Sync + 'static, + DB: Blockstore + Clone + Send + Sync + 'static, { let is_remote_file: bool = path.starts_with("http://") || path.starts_with("https://"); @@ -145,8 +147,6 @@ where // Update head with snapshot header tipset sm.chain_store().set_heaviest_tipset(ts.clone())?; - sm.blockstore().flush()?; - if let Some(height) = validate_height { let height = if height > 0 { height @@ -170,7 +170,7 @@ async fn load_and_retrieve_header( skip_load: bool, ) -> anyhow::Result> where - DB: Store + Sync + Send + 'static, + DB: Blockstore + Send + Sync + 'static, R: AsyncRead + Send + Unpin, { let mut compat = reader.compat(); @@ -184,14 +184,10 @@ where Ok(result) } -/// Optimizations: -/// 1. ParityDB could benefit from a larger buffer. It's hard coded as 1000 -/// blocks in [fvm_ipld_car::load_car] 2. Use [Store::bulk_write] instead of -/// [Blockstore] to avoid tons of unneccesary allocations pub async fn forest_load_car(store: DB, reader: R) -> anyhow::Result> where R: futures::AsyncRead + Send + Unpin, - DB: Store + Sync + Send + 'static, + DB: Blockstore + Send + Sync + 'static, { // 1GB const BUFFER_CAPCITY_BYTES: usize = 1024 * 1024 * 1024; @@ -202,7 +198,7 @@ where tokio::spawn(async move { store.buffered_write(rx, BUFFER_CAPCITY_BYTES).await }); let mut car_reader = CarReader::new(reader).await?; while let Some(block) = car_reader.next_block().await? { - tx.send((block.cid.to_bytes(), block.data))?; + tx.send((block.cid, block.data))?; } drop(tx); write_task.await??; diff --git a/vm/state_migration/src/nv12/miner.rs b/vm/state_migration/src/nv12/miner.rs index ccb6a3646477..aedd15a17ce2 100644 --- a/vm/state_migration/src/nv12/miner.rs +++ b/vm/state_migration/src/nv12/miner.rs @@ -13,7 +13,7 @@ use cid::multihash::Code::Blake2b256; use cid::Cid; use fil_actor_interface::actorv3::miner::State as V3State; use fil_actor_interface::actorv4::miner::State as V4State; -use forest_db::Store; + use fvm_ipld_blockstore::Blockstore; use std::sync::Arc; diff --git a/vm/state_migration/src/nv12/mod.rs b/vm/state_migration/src/nv12/mod.rs index 344cc6b52e46..1d062ec79f9e 100644 --- a/vm/state_migration/src/nv12/mod.rs +++ b/vm/state_migration/src/nv12/mod.rs @@ -8,7 +8,7 @@ pub use miner::miner_migrator_v4; use crate::nil_migrator; use crate::StateMigration; use fil_actor_interface::{actorv3, actorv4}; -use forest_db::Store; + use fvm_ipld_blockstore::Blockstore; impl StateMigration {