diff --git a/applications/minotari_node/src/builder.rs b/applications/minotari_node/src/builder.rs index 2006ef5495..cf7c70f39a 100644 --- a/applications/minotari_node/src/builder.rs +++ b/applications/minotari_node/src/builder.rs @@ -187,6 +187,7 @@ pub async fn configure_and_initialize_node( let backend = create_lmdb_database( app_config.base_node.lmdb_path.as_path(), app_config.base_node.lmdb.clone(), + app_config.base_node.storage.pruning_interval, rules, ) .map_err(|e| ExitError::new(ExitCode::DatabaseError, e))?; diff --git a/applications/minotari_node/src/recovery.rs b/applications/minotari_node/src/recovery.rs index f15c00f20c..5020de3526 100644 --- a/applications/minotari_node/src/recovery.rs +++ b/applications/minotari_node/src/recovery.rs @@ -81,14 +81,25 @@ pub async fn run_recovery(node_config: &BaseNodeConfig) -> Result<(), anyhow::Er })?; let (temp_db, main_db, temp_path) = match &node_config.db_type { DatabaseType::Lmdb => { - let backend = create_lmdb_database(&node_config.lmdb_path, node_config.lmdb.clone(), rules.clone()) - .map_err(|e| { - error!(target: LOG_TARGET, "Error opening db: {}", e); - anyhow!("Could not open DB: {}", e) - })?; + let backend = create_lmdb_database( + &node_config.lmdb_path, + node_config.lmdb.clone(), + node_config.storage.pruning_interval, + rules.clone(), + ) + .map_err(|e| { + error!(target: LOG_TARGET, "Error opening db: {}", e); + anyhow!("Could not open DB: {}", e) + })?; let temp_path = temp_dir().join("temp_recovery"); - let temp = create_lmdb_database(&temp_path, node_config.lmdb.clone(), rules.clone()).map_err(|e| { + let temp = create_lmdb_database( + &temp_path, + node_config.lmdb.clone(), + node_config.storage.pruning_interval, + rules.clone(), + ) + .map_err(|e| { error!(target: LOG_TARGET, "Error opening recovery db: {}", e); anyhow!("Could not open recovery DB: {}", e) })?; diff --git a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs index 796337cffa..b6f577e134 100644 --- a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs @@ -22,7 +22,7 @@ use std::{ convert::{TryFrom, TryInto}, - sync::Arc, + sync::{atomic::AtomicBool, Arc}, time::{Duration, Instant}, }; @@ -365,10 +365,11 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> { ); let timer = Instant::now(); + let allow_smt_change = Arc::new(AtomicBool::new(true)); self.db .write_transaction() .delete_orphan(header_hash) - .insert_tip_block_body(block.clone(), self.db.inner().smt()) + .insert_tip_block_body(block.clone(), self.db.inner().smt(), allow_smt_change.clone()) .set_best_block( block.height(), header_hash, diff --git a/base_layer/core/src/chain_storage/async_db.rs b/base_layer/core/src/chain_storage/async_db.rs index f2203c6104..50760aaf23 100644 --- a/base_layer/core/src/chain_storage/async_db.rs +++ b/base_layer/core/src/chain_storage/async_db.rs @@ -22,7 +22,7 @@ use std::{ mem, ops::RangeBounds, - sync::{Arc, RwLock}, + sync::{atomic::AtomicBool, Arc, RwLock}, time::Instant, }; @@ -394,8 +394,13 @@ impl<'a, B: BlockchainBackend + 'static> AsyncDbTransaction<'a, B> { self } - pub fn insert_tip_block_body(&mut self, block: Arc, smt: Arc>) -> &mut Self { - self.transaction.insert_tip_block_body(block, smt); + pub fn insert_tip_block_body( + &mut self, + block: Arc, + smt: Arc>, + allow_smt_change: Arc, + ) -> &mut Self { + self.transaction.insert_tip_block_body(block, smt, allow_smt_change); self } diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs index cb9a9a4faa..327cdb804b 100644 --- a/base_layer/core/src/chain_storage/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/blockchain_database.rs @@ -934,6 +934,7 @@ where B: BlockchainBackend Ok(v) => v, Err(e) => { // some error happend, lets rewind the smt + warn!(target: LOG_TARGET, "Reloading SMT into memory from stored db via new block prepare"); *smt = db.calculate_tip_smt()?; return Err(e); }, @@ -962,6 +963,7 @@ where B: BlockchainBackend Ok(v) => v, Err(e) => { // some error happend, lets reset the smt to its starting state + warn!(target: LOG_TARGET, "Reloading SMT into memory from stored db via calculate root"); *smt = db.calculate_tip_smt()?; return Err(e); }, @@ -1660,8 +1662,9 @@ fn insert_best_block( let timestamp = block.header().timestamp().as_u64(); let accumulated_difficulty = block.accumulated_data().total_accumulated_difficulty; let expected_prev_best_block = block.block().header.prev_hash; + let allow_smt_change = Arc::new(AtomicBool::new(true)); txn.insert_chain_header(block.to_chain_header()) - .insert_tip_block_body(block, smt) + .insert_tip_block_body(block, smt, allow_smt_change) .set_best_block( height, block_hash, @@ -2080,6 +2083,7 @@ fn reorganize_chain( ); ChainStorageError::AccessError("write lock on smt".into()) })?; + warn!(target: LOG_TARGET, "Reloading SMT into memory from stored db via reorg"); *write_smt = backend.calculate_tip_smt()?; return Err(e); } diff --git a/base_layer/core/src/chain_storage/db_transaction.rs b/base_layer/core/src/chain_storage/db_transaction.rs index 2650bba36a..4b0a29f15d 100644 --- a/base_layer/core/src/chain_storage/db_transaction.rs +++ b/base_layer/core/src/chain_storage/db_transaction.rs @@ -23,7 +23,7 @@ use std::{ fmt, fmt::{Display, Error, Formatter}, - sync::{Arc, RwLock}, + sync::{atomic::AtomicBool, Arc, RwLock}, }; use primitive_types::U256; @@ -171,8 +171,17 @@ impl DbTransaction { /// Add the BlockHeader and contents of a `Block` (i.e. inputs, outputs and kernels) to the database. /// If the `BlockHeader` already exists, then just the contents are updated along with the relevant accumulated /// data. - pub fn insert_tip_block_body(&mut self, block: Arc, smt: Arc>) -> &mut Self { - self.operations.push(WriteOperation::InsertTipBlockBody { block, smt }); + pub fn insert_tip_block_body( + &mut self, + block: Arc, + smt: Arc>, + allow_smt_change: Arc, + ) -> &mut Self { + self.operations.push(WriteOperation::InsertTipBlockBody { + block, + smt, + allow_smt_change, + }); self } @@ -292,6 +301,7 @@ pub enum WriteOperation { InsertTipBlockBody { block: Arc, smt: Arc>, + allow_smt_change: Arc, }, InsertKernel { header_hash: HashOutput, @@ -368,7 +378,11 @@ impl fmt::Display for WriteOperation { InsertChainHeader { header } => { write!(f, "InsertChainHeader(#{} {})", header.height(), header.hash()) }, - InsertTipBlockBody { block, smt: _ } => write!( + InsertTipBlockBody { + block, + smt: _, + allow_smt_change: _, + } => write!( f, "InsertTipBlockBody({}, {})", block.accumulated_data().hash, diff --git a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs index a38a1a0179..53550dd3b2 100644 --- a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs +++ b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs @@ -28,12 +28,24 @@ use std::{ fs::File, ops::Deref, path::Path, - sync::{Arc, RwLock}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + RwLock, + }, time::Instant, }; use fs2::FileExt; -use lmdb_zero::{open, ConstTransaction, Database, Environment, ReadTransaction, WriteTransaction}; +use lmdb_zero::{ + open, + traits::AsLmdbBytes, + ConstTransaction, + Database, + Environment, + ReadTransaction, + WriteTransaction, +}; use log::*; use primitive_types::U256; use serde::{Deserialize, Serialize}; @@ -150,6 +162,8 @@ const LMDB_DB_REORGS: &str = "reorgs"; const LMDB_DB_VALIDATOR_NODES: &str = "validator_nodes"; const LMDB_DB_VALIDATOR_NODES_MAPPING: &str = "validator_nodes_mapping"; const LMDB_DB_TEMPLATE_REGISTRATIONS: &str = "template_registrations"; +const LMDB_DB_UTXO_SMT: &str = "utxo_smt"; +const SMT_CACHE_PERIOD: u64 = 1000; /// HeaderHash(32), mmr_pos(8), hash(32) type KernelKey = CompositeKey<72>; @@ -159,6 +173,7 @@ type ValidatorNodeRegistrationKey = CompositeKey<40>; pub fn create_lmdb_database>( path: P, config: LMDBConfig, + prune_interval: u64, consensus_manager: ConsensusManager, ) -> Result { let flags = db::CREATE; @@ -199,10 +214,11 @@ pub fn create_lmdb_database>( .add_database(LMDB_DB_VALIDATOR_NODES, flags) .add_database(LMDB_DB_VALIDATOR_NODES_MAPPING, flags) .add_database(LMDB_DB_TEMPLATE_REGISTRATIONS, flags | db::DUPSORT) + .add_database(LMDB_DB_UTXO_SMT, flags) .build() .map_err(|err| ChainStorageError::CriticalError(format!("Could not create LMDB store:{}", err)))?; debug!(target: LOG_TARGET, "LMDB database creation successful"); - LMDBDatabase::new(&lmdb_store, file_lock, consensus_manager) + LMDBDatabase::new(&lmdb_store, file_lock, consensus_manager, prune_interval) } /// This is a lmdb-based blockchain database for persistent storage of the chain state. @@ -261,8 +277,12 @@ pub struct LMDBDatabase { validator_nodes_mapping: DatabaseRef, /// Maps CodeTemplateRegistration -> TemplateRegistration template_registrations: DatabaseRef, + /// Stores a cache of the sparse merkle tree on the latest mod 1000 height + utxo_smt: DatabaseRef, + _file_lock: Arc, consensus_manager: ConsensusManager, + smt_cache_period: u64, } impl LMDBDatabase { @@ -270,8 +290,14 @@ impl LMDBDatabase { store: &LMDBStore, file_lock: File, consensus_manager: ConsensusManager, + prune_interval: u64, ) -> Result { let env = store.env(); + let smt_cache_period = if prune_interval == 0 { + SMT_CACHE_PERIOD + } else { + prune_interval / 2 + }; let db = Self { metadata_db: get_database(store, LMDB_DB_METADATA)?, @@ -300,10 +326,12 @@ impl LMDBDatabase { validator_nodes: get_database(store, LMDB_DB_VALIDATOR_NODES)?, validator_nodes_mapping: get_database(store, LMDB_DB_VALIDATOR_NODES_MAPPING)?, template_registrations: get_database(store, LMDB_DB_TEMPLATE_REGISTRATIONS)?, + utxo_smt: get_database(store, LMDB_DB_UTXO_SMT)?, env, env_config: store.env_config(), _file_lock: Arc::new(file_lock), consensus_manager, + smt_cache_period, }; run_migrations(&db)?; @@ -337,8 +365,18 @@ impl LMDBDatabase { InsertChainHeader { header } => { self.insert_header(&write_txn, header.header(), header.accumulated_data())?; }, - InsertTipBlockBody { block, smt } => { - self.insert_tip_block_body(&write_txn, block.header(), block.block().body.clone(), smt.clone())?; + InsertTipBlockBody { + block, + smt, + allow_smt_change, + } => { + self.insert_tip_block_body( + &write_txn, + block.header(), + block.block().body.clone(), + smt.clone(), + allow_smt_change.clone(), + )?; }, InsertKernel { header_hash, @@ -495,7 +533,7 @@ impl LMDBDatabase { Ok(()) } - fn all_dbs(&self) -> [(&'static str, &DatabaseRef); 26] { + fn all_dbs(&self) -> [(&'static str, &DatabaseRef); 27] { [ (LMDB_DB_METADATA, &self.metadata_db), (LMDB_DB_HEADERS, &self.headers_db), @@ -529,6 +567,7 @@ impl LMDBDatabase { (LMDB_DB_VALIDATOR_NODES, &self.validator_nodes), (LMDB_DB_VALIDATOR_NODES_MAPPING, &self.validator_nodes_mapping), (LMDB_DB_TEMPLATE_REGISTRATIONS, &self.template_registrations), + (LMDB_DB_UTXO_SMT, &self.utxo_smt), ] } @@ -627,7 +666,7 @@ impl LMDBDatabase { fn input_with_output_data( &self, - txn: &WriteTransaction<'_>, + txn: &ConstTransaction<'_>, input: TransactionInput, ) -> Result { let input_with_output_data = match input.spent_output { @@ -1173,7 +1212,9 @@ impl LMDBDatabase { header: &BlockHeader, body: AggregateBody, smt: Arc>, + allow_smt_change: Arc, ) -> Result<(), ChainStorageError> { + let can_we_change_smt = allow_smt_change.load(Ordering::SeqCst); let mut output_smt = smt.write().map_err(|e| { error!( target: LOG_TARGET, @@ -1253,7 +1294,7 @@ impl LMDBDatabase { output.commitment.to_hex(), output.hash() ); - if !output.is_burned() { + if !output.is_burned() && can_we_change_smt { let smt_key = NodeKey::try_from(output.commitment.as_bytes())?; let smt_node = ValueHash::try_from(output.smt_hash(header.height).as_slice())?; if let Err(e) = output_smt.insert(smt_key, smt_node) { @@ -1297,17 +1338,19 @@ impl LMDBDatabase { for input in inputs { let input_with_output_data = self.input_with_output_data(txn, input)?; let smt_key = NodeKey::try_from(input_with_output_data.commitment()?.as_bytes())?; - match output_smt.delete(&smt_key)? { - DeleteResult::Deleted(_value_hash) => {}, - DeleteResult::KeyNotFound => { - error!( - target: LOG_TARGET, - "Could not find input({}) in SMT", - input_with_output_data.commitment()?.to_hex(), - ); - return Err(ChainStorageError::UnspendableInput); - }, - }; + if can_we_change_smt { + match output_smt.delete(&smt_key)? { + DeleteResult::Deleted(_value_hash) => {}, + DeleteResult::KeyNotFound => { + error!( + target: LOG_TARGET, + "Could not find input({}) in SMT", + input_with_output_data.commitment()?.to_hex(), + ); + return Err(ChainStorageError::UnspendableInput); + }, + }; + } let features = input_with_output_data.features()?; if let Some(vn_reg) = features @@ -1341,6 +1384,10 @@ impl LMDBDatabase { header.height, &BlockAccumulatedData::new(kernel_mmr.get_pruned_hash_set()?, total_kernel_sum), )?; + allow_smt_change.store(false, Ordering::SeqCst); + if header.height % self.smt_cache_period == 0 { + self.insert_smt(txn, &output_smt, header.height)?; + } Ok(()) } @@ -1699,6 +1746,64 @@ impl LMDBDatabase { fn get_consensus_constants(&self, height: u64) -> &ConsensusConstants { self.consensus_manager.consensus_constants(height) } + + fn insert_smt(&self, txn: &WriteTransaction<'_>, smt: &OutputSmt, height: u64) -> Result<(), ChainStorageError> { + let start = Instant::now(); + let k = MetadataKey::Smt; + + debug!(target: LOG_TARGET, + "Saving SMT at height: {}", + height + ); + + // This is best effort, if it fails (typically when the entry does not yet exist) we just log it + if let Err(e) = lmdb_delete(txn, &self.utxo_smt, &k.as_u32(), LMDB_DB_UTXO_SMT) { + debug!( + "Could NOT delete '{}' db with key '{}' ({})", + LMDB_DB_UTXO_SMT, + to_hex(k.as_u32().as_lmdb_bytes()), + e + ); + } + + #[allow(clippy::cast_possible_truncation)] + let estimated_bytes = smt.size().saturating_mul(225) as usize; + + match lmdb_replace(txn, &self.utxo_smt, &k.as_u32(), smt, Some(estimated_bytes)) { + Ok(_) => { + trace!( + target: LOG_TARGET, + "Inserted ~{} MB with key '{}' into '{}' (size {}) in {:.2?}", + estimated_bytes / BYTES_PER_MB, + to_hex(k.as_u32().as_lmdb_bytes()), + LMDB_DB_UTXO_SMT, + smt.size(), + start.elapsed() + ); + lmdb_replace( + txn, + &self.metadata_db, + &MetadataKey::SmtHeight.as_u32(), + &height, + Some(8), + )?; + Ok(()) + }, + Err(e) => { + if let ChainStorageError::DbResizeRequired(Some(val)) = e { + trace!( + target: LOG_TARGET, + "Could NOT insert {} MB with key '{}' into '{}' (size {})", + val / BYTES_PER_MB, + to_hex(k.as_u32().as_lmdb_bytes()), + LMDB_DB_UTXO_SMT, + smt.size() + ); + } + Err(e) + }, + } + } } pub fn create_recovery_lmdb_database>(path: P) -> Result<(), ChainStorageError> { @@ -2504,13 +2609,25 @@ impl BlockchainBackend for LMDBDatabase { fn calculate_tip_smt(&self) -> Result { let start = Instant::now(); let metadata = self.fetch_chain_metadata()?; - let mut smt = OutputSmt::new(); + let txn = self.read_transaction()?; + let k = MetadataKey::SmtHeight; + let mut starting_height = 0; + if let Some(val) = lmdb_get::(&txn, &self.metadata_db, &k.as_u32())? { + starting_height = val + 1u64; + } + + let k = MetadataKey::Smt; + let mut smt = match lmdb_get(&txn, &self.utxo_smt, &k.as_u32())? { + Some(smt) => smt, + _ => OutputSmt::new(), + }; trace!( target: LOG_TARGET, "Calculating new smt at height: #{}", - metadata.pruned_height(), + metadata.best_block_height(), ); - for height in 0..=metadata.best_block_height() { + + for height in starting_height..=metadata.best_block_height() { let header = self.fetch_chain_header_by_height(height)?; let outputs = self.fetch_outputs_in_block_with_spend_state(header.hash(), Some(metadata.best_block_hash()))?; @@ -2528,6 +2645,13 @@ impl BlockchainBackend for LMDBDatabase { } } } + let inputs = self.fetch_inputs_in_block(header.hash())?; + for input in inputs { + let txn = self.read_transaction()?; + let input_with_output_data = self.input_with_output_data(&txn, input)?; + let smt_key = NodeKey::try_from(input_with_output_data.commitment()?.as_bytes())?; + smt.delete(&smt_key)?; + } } trace!( target: LOG_TARGET, @@ -2661,6 +2785,8 @@ enum MetadataKey { HorizonData, BestBlockTimestamp, MigrationVersion, + Smt, + SmtHeight, } impl MetadataKey { @@ -2681,6 +2807,8 @@ impl fmt::Display for MetadataKey { MetadataKey::HorizonData => write!(f, "Database info"), MetadataKey::BestBlockTimestamp => write!(f, "Chain tip block timestamp"), MetadataKey::MigrationVersion => write!(f, "Migration version"), + MetadataKey::Smt => write!(f, "Chain Sparse Merkle Tree"), + MetadataKey::SmtHeight => write!(f, "Chain Sparse Merkle Tree saved height"), } } } diff --git a/base_layer/core/src/test_helpers/blockchain.rs b/base_layer/core/src/test_helpers/blockchain.rs index b145782200..e800c2a1b0 100644 --- a/base_layer/core/src/test_helpers/blockchain.rs +++ b/base_layer/core/src/test_helpers/blockchain.rs @@ -168,7 +168,7 @@ impl TempDatabase { let rules = create_consensus_rules(); Self { - db: Some(create_lmdb_database(&temp_path, LMDBConfig::default(), rules).unwrap()), + db: Some(create_lmdb_database(&temp_path, LMDBConfig::default(), 0, rules).unwrap()), path: temp_path, delete_on_drop: true, } @@ -177,7 +177,7 @@ impl TempDatabase { pub fn from_path>(temp_path: P) -> Self { let rules = create_consensus_rules(); Self { - db: Some(create_lmdb_database(&temp_path, LMDBConfig::default(), rules).unwrap()), + db: Some(create_lmdb_database(&temp_path, LMDBConfig::default(), 0, rules).unwrap()), path: temp_path.as_ref().to_path_buf(), delete_on_drop: true, }