Merge remote-tracking branch 'origin/main' into hm/db-gc
hanabi1224 committed Mar 17, 2023
2 parents c13ebb9 + 7c4edc4 commit 3c72222
Showing 48 changed files with 351 additions and 279 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -26,6 +26,9 @@ Notable updates:

### Changed

- [database] Move blockstore metadata to standalone files (a minimal sketch follows this hunk).
[#2635](https://github.com/ChainSafe/forest/pull/2635)
[#2652](https://github.com/ChainSafe/forest/pull/2652)
- [cli] Remove Forest ctrl-c hard shutdown behavior on subsequent ctrl-c
signals. [#2538](https://github.com/ChainSafe/forest/pull/2538)
- [libp2p] Use in-house bitswap implementation.
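The `[database]` entry above refers to keeping small pieces of chain metadata (the HEAD tipset keys, the set of validated blocks) in standalone files instead of as keys in the chain database. As a rough guide to that pattern, here is a minimal, self-contained sketch; the names `load_from_file_or_create`, `with_inner`, and the optional sync period are taken from the hunks later in this diff, while everything else (the `String` payload, the time-based flush policy, the field names) is illustrative and not Forest's actual `FileBacked` implementation.

```rust
use std::fs;
use std::io;
use std::path::PathBuf;
use std::time::{Duration, Instant};

/// Illustrative file-backed value: the in-memory copy is authoritative and is
/// flushed to its own file, either on every change or at most once per
/// `sync_period`.
struct FileBacked {
    inner: String,
    path: PathBuf,
    last_sync: Instant,
    sync_period: Option<Duration>,
}

impl FileBacked {
    /// Load the value from `path`, or create it from `default` if the file
    /// does not exist yet.
    fn load_from_file_or_create(
        path: PathBuf,
        default: impl FnOnce() -> String,
        sync_period: Option<Duration>,
    ) -> io::Result<Self> {
        let inner = match fs::read_to_string(&path) {
            Ok(text) => text,
            Err(e) if e.kind() == io::ErrorKind::NotFound => default(),
            Err(e) => return Err(e),
        };
        Ok(Self {
            inner,
            path,
            last_sync: Instant::now(),
            sync_period,
        })
    }

    /// Mutate the value, then write it back to disk if a flush is due.
    fn with_inner(&mut self, f: impl FnOnce(&mut String)) -> io::Result<()> {
        f(&mut self.inner);
        let flush_due = match self.sync_period {
            None => true, // no period configured: persist every change
            Some(period) => self.last_sync.elapsed() >= period,
        };
        if flush_due {
            fs::write(&self.path, &self.inner)?;
            self.last_sync = Instant::now();
        }
        Ok(())
    }
}

fn main() -> io::Result<()> {
    // Hypothetical usage mirroring the HEAD file seen in chain_store.rs below.
    let mut head = FileBacked::load_from_file_or_create(
        PathBuf::from("/tmp/HEAD_example"),
        || "genesis-tipset-keys".to_string(),
        None,
    )?;
    head.with_inner(|keys| *keys = "new-heaviest-tipset-keys".to_string())?;
    Ok(())
}
```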
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion blockchain/blocks/src/persistence.rs
@@ -29,7 +29,7 @@ mod tests {
fn tipset_keys_round_trip() -> Result<()> {
let path = Path::new("tests/calibnet/HEAD");
let obj1: FileBacked<TipsetKeys> =
FileBacked::load_from_file_or_create(path.into(), Default::default)?;
FileBacked::load_from_file_or_create(path.into(), Default::default, None)?;
let serialized = obj1.inner().serialize()?;
let deserialized = TipsetKeys::deserialize(&serialized)?;

57 changes: 32 additions & 25 deletions blockchain/chain/src/store/chain_store.rs
@@ -3,15 +3,14 @@

use std::{num::NonZeroUsize, path::Path, sync::Arc, time::SystemTime};

use ahash::{HashMap, HashMapExt};
use ahash::{HashMap, HashMapExt, HashSet};
use anyhow::Result;
use async_stream::stream;
use bls_signatures::Serialize as SerializeBls;
use cid::{multihash::Code::Blake2b256, Cid};
use digest::Digest;
use forest_beacon::{BeaconEntry, IGNORE_DRAND_VAR};
use forest_blocks::{Block, BlockHeader, FullTipset, Tipset, TipsetKeys, TxMeta};
use forest_db::Store;
use forest_encoding::de::DeserializeOwned;
use forest_interpreter::BlockMessages;
use forest_ipld::{should_save_block_to_snapshot, walk_snapshot};
@@ -28,7 +27,10 @@ use forest_shim::{
state_tree::StateTree,
};
use forest_utils::{
db::{file_backed_obj::FileBacked, BlockstoreExt},
db::{
file_backed_obj::{FileBacked, SYNC_PERIOD},
BlockstoreExt,
},
io::Checksum,
};
use fvm_ipld_amt::Amtv0 as Amt;
@@ -56,8 +58,6 @@ use super::{
};
use crate::Scale;

const BLOCK_VAL_PREFIX: &[u8] = b"block_val/";

// A cap on the size of the future_sink
const SINK_CAP: usize = 200;

@@ -97,6 +97,9 @@ pub struct ChainStore<DB> {

/// File backed heaviest tipset keys
file_backed_heaviest_tipset_keys: Mutex<FileBacked<TipsetKeys>>,

/// File backed validated blocks
file_backed_validated_blocks: Mutex<FileBacked<HashSet<Cid>>>,
}

impl<DB> BitswapStoreRead for ChainStore<DB>
@@ -125,7 +128,7 @@ where

impl<DB> ChainStore<DB>
where
DB: Blockstore + Store + Send + Sync,
DB: Blockstore + Send + Sync,
{
pub fn new(
db: DB,
@@ -142,11 +145,17 @@ where
*genesis_block_header.cid(),
chain_data_root.join("GENESIS"),
));

let file_backed_heaviest_tipset_keys = Mutex::new(FileBacked::load_from_file_or_create(
chain_data_root.join("HEAD"),
|| TipsetKeys::new(vec![*genesis_block_header.cid()]),
None,
)?);
let file_backed_validated_blocks = Mutex::new(FileBacked::load_from_file_or_create(
chain_data_root.join("VALIDATED_BLOCKS"),
HashSet::default,
Some(SYNC_PERIOD),
)?);

let cs = Self {
publisher,
chain_index: ChainIndex::new(ts_cache.clone(), db.clone()),
@@ -155,6 +164,7 @@ where
ts_cache,
file_backed_genesis,
file_backed_heaviest_tipset_keys,
file_backed_validated_blocks,
};

cs.set_genesis(genesis_block_header)?;
@@ -268,20 +278,25 @@ where
Ok(())
}

/// Checks store if block has already been validated. Key based on the block
/// validation prefix.
/// Checks metadata file if block has already been validated.
pub fn is_block_validated(&self, cid: &Cid) -> Result<bool, Error> {
let key = block_validation_key(cid);

Ok(self.db.exists(key)?)
let validated = self
.file_backed_validated_blocks
.lock()
.inner()
.contains(cid);
if validated {
log::debug!("Block {cid} was previously validated");
}
Ok(validated)
}

/// Marks block as validated in the store. This is retrieved using the block
/// validation prefix.
/// Marks block as validated in the metadata file.
pub fn mark_block_as_validated(&self, cid: &Cid) -> Result<(), Error> {
let key = block_validation_key(cid);

Ok(self.db.write(key, [])?)
let mut file = self.file_backed_validated_blocks.lock();
Ok(file.with_inner(|inner| {
inner.insert(*cid);
})?)
}

/// Returns the tipset behind `tsk` at a given `height`.
@@ -619,14 +634,6 @@ where
Ok(ts)
}

/// Helper to ensure consistent CID to db key translation.
fn block_validation_key(cid: &Cid) -> Vec<u8> {
let mut key = Vec::new();
key.extend_from_slice(BLOCK_VAL_PREFIX);
key.extend(cid.to_bytes());
key
}

/// Returns a Tuple of BLS messages of type `UnsignedMessage` and SECP messages
/// of type `SignedMessage`
pub fn block_messages<DB>(
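The chain_store.rs diff above is the core of this change: `is_block_validated` and `mark_block_as_validated` now consult a file-backed `HashSet<Cid>` (the new `VALIDATED_BLOCKS` file) instead of probing the database for `block_val/`-prefixed keys. A minimal, self-contained sketch of the caller-side pattern those two methods support follows; `String` stands in for `Cid`, persistence is elided, and only the method names are taken from the hunk above.

```rust
use std::collections::HashSet;

/// Stand-in for the file-backed validated-block set; persistence is omitted.
struct ValidatedBlocks {
    seen: HashSet<String>,
}

impl ValidatedBlocks {
    fn is_block_validated(&self, cid: &str) -> bool {
        self.seen.contains(cid)
    }

    fn mark_block_as_validated(&mut self, cid: &str) {
        self.seen.insert(cid.to_owned());
    }
}

/// Validation stays idempotent: a block already in the set is skipped.
fn validate_once(tracker: &mut ValidatedBlocks, cid: &str) {
    if tracker.is_block_validated(cid) {
        println!("block {cid} was previously validated, skipping");
        return;
    }
    // ...expensive consensus validation would run here...
    tracker.mark_block_as_validated(cid);
}

fn main() {
    let mut tracker = ValidatedBlocks {
        seen: HashSet::new(),
    };
    validate_once(&mut tracker, "bafy-example-block");
    validate_once(&mut tracker, "bafy-example-block"); // second call is a no-op
}
```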
5 changes: 2 additions & 3 deletions blockchain/chain_sync/src/chain_muxer.rs
@@ -13,7 +13,6 @@ use forest_blocks::{
Block, Error as ForestBlockError, FullTipset, GossipBlock, Tipset, TipsetKeys,
};
use forest_chain::{ChainStore, Error as ChainStoreError};
use forest_db::Store;
use forest_libp2p::{
hello::HelloRequest, NetworkEvent, NetworkMessage, PeerId, PeerManager, PubsubMessage,
};
@@ -159,7 +158,7 @@ pub struct ChainMuxer<DB, M, C: Consensus> {

impl<DB, M, C> ChainMuxer<DB, M, C>
where
DB: Blockstore + Store + Clone + Sync + Send + 'static,
DB: Blockstore + Clone + Sync + Send + 'static,
M: Provider + Sync + Send + 'static,
C: Consensus,
{
@@ -838,7 +837,7 @@ enum ChainMuxerState<C: Consensus> {

impl<DB, M, C> Future for ChainMuxer<DB, M, C>
where
DB: Blockstore + Store + Clone + Sync + Send + 'static,
DB: Blockstore + Clone + Sync + Send + 'static,
M: Provider + Sync + Send + 'static,
C: Consensus,
{
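The recurring edit in chain_muxer.rs, consensus.rs, and tipset_syncer.rs is dropping the `forest_db::Store` bound from the `DB` generics, presumably because the validated-block bookkeeping above no longer writes through it. A self-contained illustration of why removing an unused bound matters is below; the trait and type names are made up for the example and are not Forest's.

```rust
/// Made-up read-only trait, standing in for `Blockstore`.
trait Blockstore {
    fn get(&self, key: &str) -> Option<Vec<u8>>;
}

/// Made-up write trait, standing in for the dropped `Store` bound.
#[allow(dead_code)]
trait Store {
    fn write(&mut self, key: &str, value: &[u8]);
}

/// A backend that can read but never implements `Store`.
struct ReadOnlyDb;

impl Blockstore for ReadOnlyDb {
    fn get(&self, _key: &str) -> Option<Vec<u8>> {
        None
    }
}

// Before: `DB: Blockstore + Store` would have rejected `ReadOnlyDb`, even
// though this function only reads. After the relaxation, only the trait that
// is actually used is required of callers.
fn is_known<DB: Blockstore>(db: &DB, key: &str) -> bool {
    db.get(key).is_some()
}

fn main() {
    let db = ReadOnlyDb;
    assert!(!is_known(&db, "bafy-example-block"));
    println!("ReadOnlyDb satisfies the relaxed bound without implementing Store");
}
```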
9 changes: 4 additions & 5 deletions blockchain/chain_sync/src/consensus.rs
@@ -10,7 +10,6 @@ use std::{
use async_trait::async_trait;
use forest_blocks::{Block, GossipBlock, Tipset};
use forest_chain::Scale;
use forest_db::Store;
use forest_libp2p::{NetworkMessage, Topic, PUBSUB_BLOCK_STR};
use forest_message::SignedMessage;
use forest_message_pool::MessagePool;
@@ -46,7 +45,7 @@ pub trait Consensus: Scale + Debug + Send + Sync + Unpin + 'static {
block: Arc<Block>,
) -> Result<(), NonEmpty<Self::Error>>
where
DB: Blockstore + Store + Clone + Sync + Send + 'static;
DB: Blockstore + Clone + Sync + Send + 'static;
}

/// Helper function to collect errors from async validations.
@@ -114,7 +113,7 @@ pub trait Proposer {
services: &mut JoinSet<anyhow::Result<()>>,
) -> anyhow::Result<()>
where
DB: Blockstore + Store + Clone + Sync + Send + 'static,
DB: Blockstore + Clone + Sync + Send + 'static,
MP: MessagePoolApi + Sync + Send + 'static;
}

@@ -142,7 +141,7 @@ pub trait MessagePoolApi {
base: &Tipset,
) -> anyhow::Result<Vec<Cow<SignedMessage>>>
where
DB: Blockstore + Store + Clone + Sync + Send + 'static;
DB: Blockstore + Clone + Sync + Send + 'static;
}

impl<P> MessagePoolApi for MessagePool<P>
@@ -155,7 +154,7 @@ where
base: &Tipset,
) -> anyhow::Result<Vec<Cow<SignedMessage>>>
where
DB: Blockstore + Store + Clone + Sync + Send + 'static,
DB: Blockstore + Clone + Sync + Send + 'static,
{
self.select_messages_for_block(base)
.map_err(|e| e.into())
32 changes: 11 additions & 21 deletions blockchain/chain_sync/src/tipset_syncer.rs
@@ -17,7 +17,6 @@ use forest_blocks::{
Block, BlockHeader, Error as ForestBlockError, FullTipset, Tipset, TipsetKeys,
};
use forest_chain::{persist_objects, ChainStore, Error as ChainStoreError};
use forest_db::Store;
use forest_libp2p::chain_exchange::TipsetBundle;
use forest_message::{message::valid_for_block_inclusion, Message as MessageTrait};
use forest_networks::Height;
@@ -262,7 +261,7 @@ pub(crate) struct TipsetProcessor<DB, C: Consensus> {

impl<DB, C> TipsetProcessor<DB, C>
where
DB: Blockstore + Store + Clone + Sync + Send + 'static,
DB: Blockstore + Clone + Sync + Send + 'static,
C: Consensus,
{
#[allow(clippy::too_many_arguments)]
@@ -350,7 +349,7 @@ enum TipsetProcessorState<DB, C: Consensus> {

impl<DB, C> Future for TipsetProcessor<DB, C>
where
DB: Blockstore + Store + Clone + Sync + Send + 'static,
DB: Blockstore + Clone + Sync + Send + 'static,
C: Consensus,
{
type Output = Result<(), TipsetProcessorError<C>>;
@@ -638,7 +637,7 @@ pub(crate) struct TipsetRangeSyncer<DB, C: Consensus> {

impl<DB, C> TipsetRangeSyncer<DB, C>
where
DB: Blockstore + Store + Clone + Sync + Send + 'static,
DB: Blockstore + Clone + Sync + Send + 'static,
C: Consensus,
{
#[allow(clippy::too_many_arguments)]
@@ -762,7 +761,7 @@ where
/// messages going forward on the chain and validate each extension. Finally set
/// the proposed head as the heaviest tipset.
#[allow(clippy::too_many_arguments)]
fn sync_tipset_range<DB: Blockstore + Store + Clone + Sync + Send + 'static, C: Consensus>(
fn sync_tipset_range<DB: Blockstore + Clone + Sync + Send + 'static, C: Consensus>(
proposed_head: Arc<Tipset>,
current_head: Arc<Tipset>,
tracker: crate::chain_muxer::WorkerState,
@@ -850,10 +849,7 @@ fn sync_tipset_range<DB: Blockstore + Store + Clone + Sync + Send + 'static, C:
/// Download headers between the proposed head and the current one available
/// locally. If they turn out to be on different forks, download more headers up
/// to a certain limit to try to find a common ancestor.
async fn sync_headers_in_reverse<
DB: Blockstore + Store + Clone + Sync + Send + 'static,
C: Consensus,
>(
async fn sync_headers_in_reverse<DB: Blockstore + Clone + Sync + Send + 'static, C: Consensus>(
tracker: crate::chain_muxer::WorkerState,
tipset_range_length: u64,
proposed_head: Arc<Tipset>,
@@ -971,7 +967,7 @@ async fn sync_headers_in_reverse<
}

#[allow(clippy::too_many_arguments)]
fn sync_tipset<DB: Blockstore + Store + Clone + Sync + Send + 'static, C: Consensus>(
fn sync_tipset<DB: Blockstore + Clone + Sync + Send + 'static, C: Consensus>(
proposed_head: Arc<Tipset>,
consensus: Arc<C>,
state_manager: Arc<StateManager<DB>>,
@@ -1020,7 +1016,7 @@ fn sync_tipset<DB: Blockstore + Store + Clone + Sync + Send + 'static, C: Consen
})
}

async fn fetch_batch<DB: Blockstore + Store + Clone + Send + Sync + 'static, C: Consensus>(
async fn fetch_batch<DB: Blockstore + Clone + Send + Sync + 'static, C: Consensus>(
batch: &[Arc<Tipset>],
network: &SyncNetworkContext<DB>,
chainstore: &ChainStore<DB>,
@@ -1068,10 +1064,7 @@ async fn fetch_batch<DB: Blockstore + Store + Clone + Send + Sync + 'static, C:
/// `BlockStore`, or download them from the network, then validate the full
/// tipset on each epoch.
#[allow(clippy::too_many_arguments)]
async fn sync_messages_check_state<
DB: Blockstore + Store + Clone + Send + Sync + 'static,
C: Consensus,
>(
async fn sync_messages_check_state<DB: Blockstore + Clone + Send + Sync + 'static, C: Consensus>(
tracker: crate::chain_muxer::WorkerState,
consensus: Arc<C>,
state_manager: Arc<StateManager<DB>>,
@@ -1145,7 +1138,7 @@ async fn sync_messages_check_state<
/// executed), adding the successful ones to the tipset tracker, and the failed
/// ones to the bad block cache, depending on strategy. Any bad block fails
/// validation.
async fn validate_tipset<DB: Blockstore + Store + Clone + Send + Sync + 'static, C: Consensus>(
async fn validate_tipset<DB: Blockstore + Clone + Send + Sync + 'static, C: Consensus>(
consensus: Arc<C>,
state_manager: Arc<StateManager<DB>>,
chainstore: &ChainStore<DB>,
@@ -1235,7 +1228,7 @@ async fn validate_tipset<DB: Blockstore + Store + Clone + Send + Sync + 'static,
/// * Checking that the messages in the block correspond to the agreed upon
/// total ordering
/// * That the block is a deterministic derivative of the underlying consensus
async fn validate_block<DB: Blockstore + Store + Clone + Sync + Send + 'static, C: Consensus>(
async fn validate_block<DB: Blockstore + Clone + Sync + Send + 'static, C: Consensus>(
consensus: Arc<C>,
state_manager: Arc<StateManager<DB>>,
block: Arc<Block>,
@@ -1430,10 +1423,7 @@ async fn validate_block<DB: Blockstore + Store + Clone + Sync + Send + 'static,
///
/// NB: This loads/computes the state resulting from the execution of the parent
/// tipset.
async fn check_block_messages<
DB: Blockstore + Store + Clone + Send + Sync + 'static,
C: Consensus,
>(
async fn check_block_messages<DB: Blockstore + Clone + Send + Sync + 'static, C: Consensus>(
state_manager: Arc<StateManager<DB>>,
block: Arc<Block>,
base_tipset: Arc<Tipset>,
