From 5c7086f86bf15d59424bdf6b7247b94e004b1de6 Mon Sep 17 00:00:00 2001 From: Ash Manning Date: Mon, 20 May 2024 19:12:59 +0800 Subject: [PATCH] WIP --- Cargo.lock | 106 ++++- Cargo.toml | 4 +- app/app.rs | 6 +- app/gui/parent_chain/info.rs | 2 +- lib/archive.rs | 631 +++++++++++++++++++------ lib/miner.rs | 23 +- lib/net/mod.rs | 36 +- lib/net/peer.rs | 869 +++++++++++++++++++++++++++-------- lib/node/mainchain_task.rs | 200 ++++---- lib/node/mod.rs | 39 +- lib/node/net_task.rs | 338 +++++++------- lib/state.rs | 99 ++-- lib/types/mod.rs | 36 +- 13 files changed, 1691 insertions(+), 698 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7580d47..02494c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -159,6 +159,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc7eb209b1518d6bb87b283c20095f5228ecda460da70b44f0802523dea6da04" +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + [[package]] name = "android_system_properties" version = "0.1.5" @@ -570,7 +576,7 @@ dependencies = [ [[package]] name = "bip300301" version = "0.1.1" -source = "git+https://github.com/Ash-L2L/bip300301.git?rev=64568dee7b89fe8c021226f10b17a18fe3386871#64568dee7b89fe8c021226f10b17a18fe3386871" +source = "git+https://github.com/Ash-L2L/bip300301.git?rev=5e12a72b7f72df33307e21d054c41652b556a9b3#5e12a72b7f72df33307e21d054c41652b556a9b3" dependencies = [ "base64 0.21.7", "bitcoin", @@ -900,6 +906,19 @@ dependencies = [ "libc", ] +[[package]] +name = "chrono" +version = "0.4.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "num-traits", + "serde", + "windows-targets 0.52.5", +] + [[package]] name = "clap" version = "4.5.4" @@ -1261,6 +1280,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", + "serde", ] [[package]] @@ -2086,7 +2106,7 @@ checksum = "cc11df1ace8e7e564511f53af41f3e42ddc95b56fd07b3f4445d2a6048bc682c" dependencies = [ "bitflags 2.5.0", "gpu-descriptor-types", - "hashbrown", + "hashbrown 0.14.5", ] [[package]] @@ -2110,13 +2130,19 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap", + "indexmap 2.2.6", "slab", "tokio", "tokio-util", "tracing", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.14.5" @@ -2323,6 +2349,29 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "iana-time-zone" +version = "0.1.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "icrate" version = "0.0.4" @@ -2372,6 +2421,17 @@ dependencies = [ "litrs 0.2.3", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", + "serde", +] + [[package]] name = "indexmap" version = "2.2.6" @@ -2379,7 +2439,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.14.5", + "serde", ] [[package]] @@ -2857,7 +2918,7 @@ dependencies = [ "bitflags 2.5.0", "codespan-reporting", "hexf-parse", - "indexmap", + "indexmap 2.2.6", "log", "num-traits", "rustc-hash", @@ -4006,9 +4067,16 @@ version = "3.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ad483d2ab0149d5a5ebcd9972a3852711e0153d863bf5a5d0391d28883c4a20" dependencies = [ + "base64 0.22.1", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.2.6", "serde", "serde_derive", + "serde_json", "serde_with_macros", + "time", ] [[package]] @@ -4422,7 +4490,7 @@ dependencies = [ [[package]] name = "thunder" -version = "0.8.9" +version = "0.9.0" dependencies = [ "anyhow", "bincode", @@ -4459,7 +4527,7 @@ dependencies = [ [[package]] name = "thunder_app" -version = "0.8.9" +version = "0.9.0" dependencies = [ "anyhow", "base64 0.21.7", @@ -4494,7 +4562,7 @@ dependencies = [ [[package]] name = "thunder_app_cli" -version = "0.8.9" +version = "0.9.0" dependencies = [ "anyhow", "bip300301", @@ -4508,7 +4576,7 @@ dependencies = [ [[package]] name = "thunder_app_rpc_api" -version = "0.8.9" +version = "0.9.0" dependencies = [ "bip300301", "jsonrpsee", @@ -4522,10 +4590,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" dependencies = [ "deranged", + "itoa", "num-conv", "powerfmt", "serde", "time-core", + "time-macros", ] [[package]] @@ -4534,6 +4604,16 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" +[[package]] +name = "time-macros" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tiny-bip39" version = "1.0.0" @@ -4655,7 +4735,7 @@ dependencies = [ "futures-io", "futures-sink", "futures-util", - "hashbrown", + "hashbrown 0.14.5", "pin-project-lite", "tokio", ] @@ -4672,7 +4752,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap", + "indexmap 2.2.6", "toml_datetime", "winnow", ] @@ -4683,7 +4763,7 @@ version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" dependencies = [ - "indexmap", + "indexmap 2.2.6", "toml_datetime", "winnow", ] @@ -5179,7 +5259,7 @@ dependencies = [ "bitflags 2.5.0", "cfg_aliases", "codespan-reporting", - "indexmap", + "indexmap 2.2.6", "log", "naga", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index 45b2092..985a08f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,11 +13,11 @@ authors = [ "Nikita Chashchinskii " ] edition = "2021" -version = "0.8.9" +version = "0.9.0" [workspace.dependencies.bip300301] git = "https://github.com/Ash-L2L/bip300301.git" -rev = "64568dee7b89fe8c021226f10b17a18fe3386871" +rev = "5e12a72b7f72df33307e21d054c41652b556a9b3" [workspace.dependencies.rustreexo] git = "https://github.com/Ash-L2L/rustreexo.git" diff --git a/app/app.rs b/app/app.rs index d4f339f..af07910 100644 --- a/app/app.rs +++ b/app/app.rs @@ -181,9 +181,11 @@ impl App { .await?; // miner_write.generate().await?; tracing::trace!("confirming bmm..."); - if let Some((header, body)) = miner_write.confirm_bmm().await? { + if let Some((main_hash, header, body)) = + miner_write.confirm_bmm().await? + { tracing::trace!("confirmed bmm, submitting block"); - self.node.submit_block(&header, &body).await?; + self.node.submit_block(main_hash, &header, &body).await?; } drop(miner_write); self.update_wallet()?; diff --git a/app/gui/parent_chain/info.rs b/app/gui/parent_chain/info.rs index 1485648..2804ab3 100644 --- a/app/gui/parent_chain/info.rs +++ b/app/gui/parent_chain/info.rs @@ -18,7 +18,7 @@ impl Info { let mainchain_tip = app.runtime.block_on(async { let mainchain_tip_blockhash = dc.get_mainchain_tip().await?; dc.client - .getblock(&mainchain_tip_blockhash, None) + .getblock(mainchain_tip_blockhash, None) .map_err(bip300301::Error::Jsonrpsee) .await })?; diff --git a/lib/archive.rs b/lib/archive.rs index cb0b35d..0fde7bc 100644 --- a/lib/archive.rs +++ b/lib/archive.rs @@ -1,13 +1,16 @@ -use std::{cmp::Ordering, collections::HashSet}; +use std::{ + cmp::Ordering, + collections::{HashMap, HashSet}, +}; use bip300301::{ bitcoin::{self, hashes::Hash}, DepositInfo, Header as BitcoinHeader, }; -use fallible_iterator::FallibleIterator; +use fallible_iterator::{FallibleIterator, IteratorExt}; use heed::{types::SerdeBincode, Database, RoTxn, RwTxn}; -use crate::types::{Accumulator, BlockHash, Body, Header}; +use crate::types::{Accumulator, BlockHash, BmmResult, Body, Header, Tip}; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -30,16 +33,20 @@ pub enum Error { }, #[error("unknown block hash: {0}")] NoBlockHash(BlockHash), + #[error("no BMM result with for block {0}")] + NoBmmResult(BlockHash), #[error("no block body with hash {0}")] NoBody(BlockHash), - #[error("no BMM verification result with for block {0}")] - NoBmmVerification(BlockHash), #[error("no deposits info for block {0}")] NoDepositsInfo(bitcoin::BlockHash), #[error("no header with hash {0}")] NoHeader(BlockHash), #[error("no height info for block hash {0}")] NoHeight(BlockHash), + #[error("unknown mainchain block hash: {0}")] + NoMainBlockHash(bitcoin::BlockHash), + #[error("no BMM commitments data for mainchain block {0}")] + NoMainBmmCommitments(bitcoin::BlockHash), #[error("no mainchain header with hash {0}")] NoMainHeader(bitcoin::BlockHash), #[error("no height info for mainchain block hash {0}")] @@ -50,10 +57,15 @@ pub enum Error { pub struct Archive { accumulators: Database, SerdeBincode>, block_hash_to_height: Database, SerdeBincode>, - /// BMM verification status for each header. - /// A status of false indicates that verification failed. + /// BMM results for each header. /// All ancestors of any block should always be present. - bmm_verifications: Database, SerdeBincode>, + /// All relevant mainchain headers should exist in `main_headers`. + /// Note that it is possible for a block to have BMM commitments in several + /// different mainchain blocks, if there are any mainchain forks. + bmm_results: Database< + SerdeBincode, + SerdeBincode>, + >, bodies: Database, SerdeBincode>, /// Deposits by mainchain block, sorted first-to-last in each block deposits: Database< @@ -82,21 +94,38 @@ pub struct Archive { headers: Database, SerdeBincode
>, main_block_hash_to_height: Database, SerdeBincode>, + /// BMM commitments in each mainchain block. + /// All ancestors must be present. + /// Mainchain blocks MUST be present in `main_headers`, but not all + /// mainchain headers will be present, if the blocks are not available. + /// BMM commitments do not imply existence of a sidechain block header. + /// BMM commitments do not imply BMM validity of a sidechain block, + /// as BMM commitments for ancestors may not exist. + main_bmm_commitments: Database< + SerdeBincode, + SerdeBincode>, + >, /// Mainchain headers. All ancestors of any header should always be present main_headers: Database, SerdeBincode>, + /// Mainchain successor blocks. ALL known block hashes, INCLUDING the zero hash, + /// MUST be present. + main_successors: Database< + SerdeBincode, + SerdeBincode>, + >, /// Successor blocks. ALL known block hashes, INCLUDING the zero hash, /// MUST be present. successors: Database, SerdeBincode>>, - /// Total work for mainchain headers. + /// Total work for mainchain headers with BMM verifications. /// All ancestors of any block should always be present total_work: Database, SerdeBincode>, } impl Archive { - pub const NUM_DBS: u32 = 12; + pub const NUM_DBS: u32 = 14; pub fn new(env: &heed::Env) -> Result { let mut rwtxn = env.write_txn()?; @@ -104,8 +133,8 @@ impl Archive { env.create_database(&mut rwtxn, Some("accumulators"))?; let block_hash_to_height = env.create_database(&mut rwtxn, Some("hash_to_height"))?; - let bmm_verifications = - env.create_database(&mut rwtxn, Some("bmm_verifications"))?; + let bmm_results = + env.create_database(&mut rwtxn, Some("bmm_results"))?; let bodies = env.create_database(&mut rwtxn, Some("bodies"))?; let deposits = env.create_database(&mut rwtxn, Some("deposits"))?; let exponential_ancestors = @@ -115,8 +144,22 @@ impl Archive { let headers = env.create_database(&mut rwtxn, Some("headers"))?; let main_block_hash_to_height = env.create_database(&mut rwtxn, Some("main_hash_to_height"))?; + let main_bmm_commitments = + env.create_database(&mut rwtxn, Some("main_bmm_commitments"))?; let main_headers = env.create_database(&mut rwtxn, Some("main_headers"))?; + let main_successors = + env.create_database(&mut rwtxn, Some("main_successors"))?; + if main_successors + .get(&rwtxn, &bitcoin::BlockHash::all_zeros())? + .is_none() + { + main_successors.put( + &mut rwtxn, + &bitcoin::BlockHash::all_zeros(), + &HashSet::new(), + )?; + } let successors = env.create_database(&mut rwtxn, Some("successors"))?; if successors.get(&rwtxn, &BlockHash::default())?.is_none() { successors.put( @@ -130,14 +173,16 @@ impl Archive { Ok(Self { accumulators, block_hash_to_height, - bmm_verifications, + bmm_results, bodies, deposits, exponential_ancestors, exponential_main_ancestors, headers, + main_bmm_commitments, main_block_hash_to_height, main_headers, + main_successors, successors, total_work, }) @@ -188,27 +233,37 @@ impl Archive { .ok_or(Error::NoHeight(block_hash)) } - pub fn try_get_bmm_verification( + pub fn get_bmm_results( &self, rotxn: &RoTxn, block_hash: BlockHash, - ) -> Result, Error> { - if block_hash == BlockHash::default() { - Ok(Some(true)) - } else { - self.bmm_verifications - .get(rotxn, &block_hash) - .map_err(Error::from) - } + ) -> Result, Error> { + let results = self + .bmm_results + .get(rotxn, &block_hash) + .map_err(Error::from)? + .unwrap_or_default(); + Ok(results) } - pub fn get_bmm_verification( + pub fn try_get_bmm_result( &self, rotxn: &RoTxn, block_hash: BlockHash, - ) -> Result { - self.try_get_bmm_verification(rotxn, block_hash)? - .ok_or(Error::NoBmmVerification(block_hash)) + main_hash: bitcoin::BlockHash, + ) -> Result, Error> { + let results = self.get_bmm_results(rotxn, block_hash)?; + Ok(results.get(&main_hash).copied()) + } + + pub fn get_bmm_result( + &self, + rotxn: &RoTxn, + block_hash: BlockHash, + main_hash: bitcoin::BlockHash, + ) -> Result { + self.try_get_bmm_result(rotxn, block_hash, main_hash)? + .ok_or(Error::NoBmmResult(block_hash)) } pub fn try_get_body( @@ -265,6 +320,24 @@ impl Archive { .ok_or(Error::NoHeader(block_hash)) } + pub fn try_get_main_bmm_commitment( + &self, + rotxn: &RoTxn, + main_hash: bitcoin::BlockHash, + ) -> Result>, Error> { + let commitments = self.main_bmm_commitments.get(rotxn, &main_hash)?; + Ok(commitments) + } + + pub fn get_main_bmm_commitment( + &self, + rotxn: &RoTxn, + main_hash: bitcoin::BlockHash, + ) -> Result, Error> { + self.try_get_main_bmm_commitment(rotxn, main_hash)? + .ok_or(Error::NoMainBmmCommitments(main_hash)) + } + pub fn try_get_main_height( &self, rotxn: &RoTxn, @@ -306,6 +379,24 @@ impl Archive { .ok_or(Error::NoMainHeader(block_hash)) } + pub fn try_get_main_successors( + &self, + rotxn: &RoTxn, + block_hash: bitcoin::BlockHash, + ) -> Result>, Error> { + let successors = self.main_successors.get(rotxn, &block_hash)?; + Ok(successors) + } + + pub fn get_main_successors( + &self, + rotxn: &RoTxn, + block_hash: bitcoin::BlockHash, + ) -> Result, Error> { + self.try_get_main_successors(rotxn, block_hash)? + .ok_or(Error::NoMainBlockHash(block_hash)) + } + pub fn try_get_successors( &self, rotxn: &RoTxn, @@ -342,6 +433,36 @@ impl Archive { .ok_or(Error::NoMainHeader(block_hash)) } + /// Try to get the best valid mainchain verification for the specified block. + pub fn try_get_best_main_verification( + &self, + rotxn: &RoTxn, + block_hash: BlockHash, + ) -> Result, Error> { + let verifications = self.get_bmm_results(rotxn, block_hash)?; + verifications + .into_iter() + .filter_map(|(main_hash, bmm_result)| { + if bmm_result == BmmResult::Verified { + Some(Ok(main_hash)) + } else { + None + } + }) + .transpose_into_fallible() + .max_by_key(|main_hash| self.get_total_work(rotxn, *main_hash)) + } + + /// Try to get the best valid mainchain verification for the specified block. + pub fn get_best_main_verification( + &self, + rotxn: &RoTxn, + block_hash: BlockHash, + ) -> Result { + self.try_get_best_main_verification(rotxn, block_hash)? + .ok_or(Error::NoBmmResult(block_hash)) + } + pub fn get_nth_ancestor( &self, rotxn: &RoTxn, @@ -405,6 +526,24 @@ impl Archive { Ok(block_hash) } + /// Get block locator for the specified block hash + pub fn get_block_locator( + &self, + rotxn: &RoTxn, + block_hash: BlockHash, + ) -> Result, Error> { + if block_hash == BlockHash::default() { + return Ok(Vec::new()); + } + let header = self.get_header(rotxn, block_hash)?; + let mut res = + self.exponential_ancestors.get(rotxn, &block_hash)?.unwrap(); + res.reverse(); + res.push(header.prev_side_hash); + res.reverse(); + Ok(res) + } + /// Returns true if the second specified block is a descendant of the first /// specified block /// Returns an error if either of the specified block headers do not exist @@ -470,18 +609,6 @@ impl Archive { Ok(()) } - /// Store a BMM verification result - pub fn put_bmm_verification( - &self, - rwtxn: &mut RwTxn, - block_hash: BlockHash, - verification_result: bool, - ) -> Result<(), Error> { - self.bmm_verifications - .put(rwtxn, &block_hash, &verification_result)?; - Ok(()) - } - /// Store a block body. The header must already exist. pub fn put_body( &self, @@ -540,7 +667,13 @@ impl Archive { &pred_successors, )?; } - self.successors.put(rwtxn, &block_hash, &HashSet::new())?; + // Store successors + { + let successors = self + .try_get_successors(rwtxn, block_hash)? + .unwrap_or_default(); + self.successors.put(rwtxn, &block_hash, &successors)?; + } // populate exponential ancestors let mut exponential_ancestors = Vec::::new(); if height >= 2 { @@ -563,6 +696,113 @@ impl Archive { &block_hash, &exponential_ancestors, )?; + // Populate BMM verifications + { + let mut bmm_results = self.get_bmm_results(rwtxn, block_hash)?; + let parent_bmm_results = + self.get_bmm_results(rwtxn, header.prev_side_hash)?; + let main_blocks = + self.get_main_successors(rwtxn, header.prev_main_hash)?; + for main_block in main_blocks { + let Some(commitment) = + self.get_main_bmm_commitment(rwtxn, main_block)? + else { + bmm_results.insert(main_block, BmmResult::Failed); + continue; + }; + if commitment != block_hash { + bmm_results.insert(main_block, BmmResult::Failed); + continue; + } + let main_header = self.get_main_header(rwtxn, main_block)?; + if header.prev_main_hash != main_header.prev_blockhash { + bmm_results.insert(main_block, BmmResult::Failed); + continue; + } + if header.prev_side_hash == BlockHash::default() { + bmm_results.insert(main_block, BmmResult::Verified); + continue; + } + // Check if there is a valid BMM commitment to the parent in the + // main ancestry + let main_ancestry_contains_valid_bmm_commitment_to_parent = + parent_bmm_results + .iter() + .map(Ok) + .transpose_into_fallible() + .any(|(bmm_block, bmm_result)| { + let parent_verified = *bmm_result + == BmmResult::Verified + && self.is_main_descendant( + rwtxn, *bmm_block, main_block, + )?; + Result::::Ok(parent_verified) + })?; + if main_ancestry_contains_valid_bmm_commitment_to_parent { + bmm_results.insert(main_block, BmmResult::Verified); + continue; + } else { + bmm_results.insert(main_block, BmmResult::Failed); + continue; + } + } + self.bmm_results.put(rwtxn, &block_hash, &bmm_results)?; + } + Ok(()) + } + + /// All ancestors MUST be present. + /// Mainchain blocks MUST be present in `main_headers`. + pub fn put_main_bmm_commitment( + &self, + rwtxn: &mut RwTxn, + main_hash: bitcoin::BlockHash, + commitment: Option, + ) -> Result<(), Error> { + let main_header = self.get_main_header(rwtxn, main_hash)?; + if main_header.prev_blockhash != bitcoin::BlockHash::all_zeros() { + let _ = self + .get_main_bmm_commitment(rwtxn, main_header.prev_blockhash)?; + } + self.main_bmm_commitments + .put(rwtxn, &main_hash, &commitment)?; + let Some(commitment) = commitment else { + return Ok(()); + }; + let Some(header) = self.try_get_header(rwtxn, commitment)? else { + return Ok(()); + }; + let bmm_result = if header.prev_main_hash != main_header.prev_blockhash + { + BmmResult::Failed + } else if header.prev_side_hash == BlockHash::default() { + BmmResult::Verified + } else { + // Check if there is a valid BMM commitment to the parent in the + // main ancestry + let parent_bmm_results = + self.get_bmm_results(rwtxn, header.prev_side_hash)?; + let main_ancestry_contains_valid_bmm_commitment_to_parent = + parent_bmm_results + .into_iter() + .map(Ok) + .transpose_into_fallible() + .any(|(bmm_block, bmm_result)| { + let parent_verified = bmm_result == BmmResult::Verified + && self.is_main_descendant( + rwtxn, bmm_block, main_hash, + )?; + Result::::Ok(parent_verified) + })?; + if main_ancestry_contains_valid_bmm_commitment_to_parent { + BmmResult::Verified + } else { + BmmResult::Failed + } + }; + let mut bmm_results = self.get_bmm_results(rwtxn, commitment)?; + bmm_results.insert(main_hash, bmm_result); + self.bmm_results.put(rwtxn, &commitment, &bmm_results)?; Ok(()) } @@ -593,6 +833,24 @@ impl Archive { .put(rwtxn, &block_hash, &height)?; self.main_headers.put(rwtxn, &block_hash, header)?; self.total_work.put(rwtxn, &block_hash, &total_work)?; + // Add to successors for predecessor + { + let mut pred_successors = + self.get_main_successors(rwtxn, header.prev_blockhash)?; + pred_successors.insert(block_hash); + self.main_successors.put( + rwtxn, + &header.prev_blockhash, + &pred_successors, + )?; + } + // Store successors + { + let successors = self + .try_get_main_successors(rwtxn, block_hash)? + .unwrap_or_default(); + self.main_successors.put(rwtxn, &block_hash, &successors)?; + } // populate exponential ancestors let mut exponential_ancestors = Vec::::new(); if height >= 2 { @@ -809,6 +1067,7 @@ impl Archive { } /// Compares two potential tips and returns the better tip, if there is one. + /// Headers for each tip MUST exist. /// It is possible that neither tip is better, eg. if the mainchain lineage /// is not shared and the tip with greater total work had lower height before /// the common mainchain ancestor. @@ -826,119 +1085,223 @@ impl Archive { pub fn better_tip( &self, rotxn: &RoTxn, - block_hash0: BlockHash, - block_hash1: BlockHash, - ) -> Result, Error> { + tip0: Tip, + tip1: Tip, + ) -> Result, Error> { + let block_hash0 = tip0.block_hash; + let block_hash1 = tip1.block_hash; let height0 = self.get_height(rotxn, block_hash0)?; let height1 = self.get_height(rotxn, block_hash1)?; match (height0, height1) { (0, 0) => return Ok(None), - (0, _) => return Ok(Some(block_hash1)), - (_, 0) => return Ok(Some(block_hash0)), + (0, _) => return Ok(Some(tip1)), + (_, 0) => return Ok(Some(tip0)), (_, _) => (), } - let header0 = self.get_header(rotxn, block_hash0)?; - let header1 = self.get_header(rotxn, block_hash1)?; - if self.shared_mainchain_lineage( - rotxn, - header0.prev_main_hash, - header1.prev_main_hash, - )? { - match height0.cmp(&height1) { - Ordering::Less => Ok(Some(block_hash1)), - Ordering::Greater => Ok(Some(block_hash0)), - Ordering::Equal => { - let work0 = - self.get_total_work(rotxn, header0.prev_main_hash)?; - let work1 = - self.get_total_work(rotxn, header1.prev_main_hash)?; - match work0.cmp(&work1) { - Ordering::Less => Ok(Some(block_hash1)), - Ordering::Greater => Ok(Some(block_hash0)), - Ordering::Equal => Ok(None), - } + let work0 = self.get_total_work(rotxn, tip0.main_block_hash)?; + let work1 = self.get_total_work(rotxn, tip1.main_block_hash)?; + match (work0.cmp(&work1), height0.cmp(&height1)) { + (Ordering::Less | Ordering::Equal, Ordering::Less) => { + // No ancestor of tip0 can have greater height, + // so tip1 is better. + Ok(Some(tip1)) + } + (Ordering::Equal | Ordering::Greater, Ordering::Greater) => { + // No ancestor of tip1 can have greater height, + // so tip0 is better. + Ok(Some(tip0)) + } + (Ordering::Less, Ordering::Equal) => { + // Within the same mainchain lineage, prefer lower work + // Otherwise, prefer tip with greater work + if self.shared_mainchain_lineage( + rotxn, + tip0.main_block_hash, + tip1.main_block_hash, + )? { + Ok(Some(tip0)) + } else { + Ok(Some(tip1)) } } - } else { - let work0 = self.get_total_work(rotxn, header0.prev_main_hash)?; - let work1 = self.get_total_work(rotxn, header1.prev_main_hash)?; - match (height0.cmp(&height1), work0.cmp(&work1)) { - (Ordering::Less, Ordering::Equal) => Ok(Some(block_hash1)), - (Ordering::Greater, Ordering::Equal) => Ok(Some(block_hash0)), - (Ordering::Less | Ordering::Equal, Ordering::Less) => { - Ok(Some(block_hash1)) + (Ordering::Greater, Ordering::Equal) => { + // Within the same mainchain lineage, prefer lower work + // Otherwise, prefer tip with greater work + if !self.shared_mainchain_lineage( + rotxn, + tip0.main_block_hash, + tip1.main_block_hash, + )? { + Ok(Some(tip0)) + } else { + Ok(Some(tip1)) + } + } + (Ordering::Less, Ordering::Greater) => { + // Need to check if tip0 ancestor before common + // mainchain ancestor had greater or equal height + let main_ancestor = self.last_common_main_ancestor( + rotxn, + tip0.main_block_hash, + tip1.main_block_hash, + )?; + let tip0_ancestor_height = self + .ancestors(rotxn, block_hash0) + .find_map(|tip0_ancestor| { + if tip0_ancestor == BlockHash::default() { + return Ok(Some(0)); + } + let header = self.get_header(rotxn, tip0_ancestor)?; + if !self.is_main_descendant( + rotxn, + header.prev_main_hash, + main_ancestor, + )? { + return Ok(None); + } + if header.prev_main_hash == main_ancestor { + return Ok(None); + } + self.get_height(rotxn, tip0_ancestor).map(Some) + })? + .unwrap(); + if tip0_ancestor_height >= height1 { + Ok(Some(tip0)) + } else { + Ok(Some(tip1)) + } + } + (Ordering::Greater, Ordering::Less) => { + // Need to check if tip1 ancestor before common + // mainchain ancestor had greater or equal height + let main_ancestor = self.last_common_main_ancestor( + rotxn, + tip0.main_block_hash, + tip1.main_block_hash, + )?; + let tip1_ancestor_height = self + .ancestors(rotxn, block_hash1) + .find_map(|tip1_ancestor| { + if tip1_ancestor == BlockHash::default() { + return Ok(Some(0)); + } + let header = self.get_header(rotxn, tip1_ancestor)?; + if !self.is_main_descendant( + rotxn, + header.prev_main_hash, + main_ancestor, + )? { + return Ok(None); + } + if header.prev_main_hash == main_ancestor { + return Ok(None); + } + self.get_height(rotxn, tip1_ancestor).map(Some) + })? + .unwrap(); + if tip1_ancestor_height < height0 { + Ok(Some(tip0)) + } else { + Ok(Some(tip1)) } - (Ordering::Greater | Ordering::Equal, Ordering::Greater) => { - Ok(Some(block_hash0)) + } + (Ordering::Equal, Ordering::Equal) => { + // If tip0 is the same as tip1, return tip0 + if block_hash0 == block_hash1 { + return Ok(Some(tip0)); } - (Ordering::Less, Ordering::Greater) - | (Ordering::Greater, Ordering::Less) - | (Ordering::Equal, Ordering::Equal) => { - let common_mainchain_ancestor = self - .last_common_main_ancestor( + // Need to compare tip0 ancestor and tip1 ancestor + // before common mainchain ancestor + let main_ancestor = self.last_common_main_ancestor( + rotxn, + tip0.main_block_hash, + tip1.main_block_hash, + )?; + let main_ancestor_height = + self.get_main_height(rotxn, main_ancestor)?; + let (tip0_ancestor_height, tip0_ancestor_work) = self + .ancestors(rotxn, block_hash0) + .find_map(|tip0_ancestor| { + if tip0_ancestor == BlockHash::default() { + return Ok(Some((0, None))); + } + let header = self.get_header(rotxn, tip0_ancestor)?; + if !self.is_main_descendant( rotxn, - header0.prev_main_hash, - header1.prev_main_hash, - )?; - let common_mainchain_ancestor_height = - self.get_main_height(rotxn, common_mainchain_ancestor)?; - let height_before_common_mainchain_ancestor0 = self - .ancestors(rotxn, block_hash0) - .find_map(|block_hash| { - if block_hash == BlockHash::default() { - return Ok(Some(0)); - }; - let header = self.get_header(rotxn, block_hash)?; - let main_height = self.get_main_height( + header.prev_main_hash, + main_ancestor, + )? { + return Ok(None); + } + if header.prev_main_hash == main_ancestor { + return Ok(None); + } + let height = self.get_height(rotxn, tip0_ancestor)?; + // Find mainchain block hash to get total work + let main_block = { + let prev_height = self.get_main_height( rotxn, header.prev_main_hash, )?; - if main_height > common_mainchain_ancestor_height { - return Ok(None); - }; - let height = self.get_height(rotxn, block_hash)?; - Ok(Some(height)) - })? - .unwrap(); - let height_before_common_mainchain_ancestor1 = self - .ancestors(rotxn, block_hash1) - .find_map(|block_hash| { - if block_hash == BlockHash::default() { - return Ok(Some(0)); - }; - let header = self.get_header(rotxn, block_hash)?; - let main_height = self.get_main_height( + let height = prev_height + 1; + self.get_nth_main_ancestor( rotxn, - header.prev_main_hash, - )?; - if main_height > common_mainchain_ancestor_height { - return Ok(None); - }; - let height = self.get_height(rotxn, block_hash)?; - Ok(Some(height)) - })? - .unwrap(); - match ( - work0.cmp(&work1), - height_before_common_mainchain_ancestor0 - .cmp(&height_before_common_mainchain_ancestor1), - ) { - (Ordering::Less, Ordering::Less | Ordering::Equal) => { - Ok(Some(block_hash1)) + main_ancestor, + main_ancestor_height - height, + )? + }; + let work = self.get_total_work(rotxn, main_block)?; + Ok(Some((height, Some(work)))) + })? + .unwrap(); + let (tip1_ancestor_height, tip1_ancestor_work) = self + .ancestors(rotxn, block_hash1) + .find_map(|tip1_ancestor| { + if tip1_ancestor == BlockHash::default() { + return Ok(Some((0, None))); } - (Ordering::Less, Ordering::Greater) => Ok(None), - ( - Ordering::Greater, - Ordering::Greater | Ordering::Equal, - ) => Ok(Some(block_hash0)), - (Ordering::Greater, Ordering::Less) => Ok(None), - (Ordering::Equal, Ordering::Less) => { - Ok(Some(block_hash1)) + let header = self.get_header(rotxn, tip1_ancestor)?; + if !self.is_main_descendant( + rotxn, + header.prev_main_hash, + main_ancestor, + )? { + return Ok(None); } - (Ordering::Equal, Ordering::Greater) => { - Ok(Some(block_hash0)) + if header.prev_main_hash == main_ancestor { + return Ok(None); } - (Ordering::Equal, Ordering::Equal) => Ok(None), + let height = self.get_height(rotxn, tip1_ancestor)?; + // Find mainchain block hash to get total work + let main_block = { + let prev_height = self.get_main_height( + rotxn, + header.prev_main_hash, + )?; + let height = prev_height + 1; + self.get_nth_main_ancestor( + rotxn, + main_ancestor, + main_ancestor_height - height, + )? + }; + let work = self.get_total_work(rotxn, main_block)?; + Ok(Some((height, Some(work)))) + })? + .unwrap(); + match ( + tip0_ancestor_work.cmp(&tip1_ancestor_work), + tip0_ancestor_height.cmp(&tip1_ancestor_height), + ) { + (Ordering::Less | Ordering::Equal, Ordering::Equal) + | (_, Ordering::Greater) => { + // tip1 is not better + Ok(Some(tip0)) + } + (Ordering::Greater, Ordering::Equal) + | (_, Ordering::Less) => { + // tip1 is better + Ok(Some(tip1)) } } } diff --git a/lib/miner.rs b/lib/miner.rs index 0c8b521..4ebacaa 100644 --- a/lib/miner.rs +++ b/lib/miner.rs @@ -87,21 +87,28 @@ impl Miner { pub async fn confirm_bmm( &mut self, - ) -> Result, Error> { + ) -> Result, Error> { const VERIFY_BMM_POLL_INTERVAL: Duration = Duration::from_secs(15); if let Some((header, body)) = self.block.clone() { let block_hash = header.hash().into(); tracing::trace!(%block_hash, "verifying bmm..."); - self.drivechain - .verify_bmm( - &header.prev_main_hash, - &block_hash, + let (bmm_verified, main_hash) = self + .drivechain + .verify_bmm_next_block( + header.prev_main_hash, + block_hash, VERIFY_BMM_POLL_INTERVAL, ) .await?; - tracing::trace!(%block_hash, "verified bmm"); - self.block = None; - return Ok(Some((header, body))); + if bmm_verified { + tracing::trace!(%block_hash, "verified bmm"); + self.block = None; + return Ok(Some((main_hash, header, body))); + } else { + tracing::trace!(%block_hash, "bmm verification failed"); + self.block = None; + return Ok(None); + } } Ok(None) } diff --git a/lib/net/mod.rs b/lib/net/mod.rs index f99a7cc..d63ac63 100644 --- a/lib/net/mod.rs +++ b/lib/net/mod.rs @@ -24,6 +24,7 @@ use peer::{ }; pub use peer::{ ConnectionError as PeerConnectionError, Info as PeerConnectionInfo, + InternalMessage as PeerConnectionMessage, PeerStateId, Request as PeerRequest, Response as PeerResponse, }; @@ -45,6 +46,8 @@ pub enum Error { Heed(#[from] heed::Error), #[error("quinn error")] Io(#[from] std::io::Error), + #[error("peer connection not found for {0}")] + MissingPeerConnection(SocketAddr), #[error("peer connection")] PeerConnection(#[from] PeerConnectionError), #[error("quinn rustls error")] @@ -252,6 +255,26 @@ impl Net { Ok(()) } + // Push an internal message to the specified peer + pub fn push_internal_message( + &self, + message: PeerConnectionMessage, + addr: SocketAddr, + ) -> Result<(), Error> { + let active_peers_read = self.active_peers.read(); + let Some(peer_connection_handle) = active_peers_read.get(&addr) else { + return Err(Error::MissingPeerConnection(addr)); + }; + if let Err(send_err) = peer_connection_handle + .internal_message_tx + .unbounded_send(message) + { + let message = send_err.into_inner(); + tracing::error!("Failed to push internal message to peer connection {addr}: {message:?}") + } + Ok(()) + } + // Push a request to the specified peers pub fn push_request( &self, @@ -265,8 +288,8 @@ impl Net { continue; }; if let Err(_send_err) = peer_connection_handle - .forward_request_tx - .unbounded_send(request.clone()) + .internal_message_tx + .unbounded_send(request.clone().into()) { tracing::warn!( "Failed to push request to peer at {addr}: {request:?}" @@ -286,11 +309,12 @@ impl Net { .iter() .filter(|(addr, _)| !exclude.contains(addr)) .for_each(|(addr, peer_connection_handle)| { + let request = PeerRequest::PushTransaction { + transaction: tx.clone(), + }; if let Err(_send_err) = peer_connection_handle - .forward_request_tx - .unbounded_send(PeerRequest::PushTransaction { - transaction: tx.clone(), - }) + .internal_message_tx + .unbounded_send(request.into()) { let txid = tx.transaction.txid(); tracing::warn!("Failed to push tx {txid} to peer at {addr}") diff --git a/lib/net/peer.rs b/lib/net/peer.rs index 4e086ff..880203f 100644 --- a/lib/net/peer.rs +++ b/lib/net/peer.rs @@ -1,6 +1,11 @@ -use std::{collections::HashSet, net::SocketAddr}; +use std::{ + cmp::Ordering, + collections::{HashMap, HashSet}, + fmt::Debug, + net::SocketAddr, +}; -use bip300301::bitcoin::{self, hashes::Hash as _}; +use bip300301::bitcoin::{self, hashes::Hash as _, Work}; use borsh::BorshSerialize; use fallible_iterator::FallibleIterator; use futures::{channel::mpsc, stream, StreamExt, TryFutureExt, TryStreamExt}; @@ -17,18 +22,26 @@ use tokio_stream::wrappers::IntervalStream; use crate::{ archive::{self, Archive}, state::{self, State}, - types::{hash, AuthorizedTransaction, BlockHash, Body, Hash, Header, Txid}, + types::{ + hash, AuthorizedTransaction, BlockHash, BmmResult, Body, Hash, Header, + Tip, Txid, + }, }; #[derive(Debug, Error)] pub enum BanReason { - #[error("BMM verification failed for block {0}")] - BmmVerificationFailed(BlockHash), - #[error("Incorrect total work for block {block_hash}: {total_work:?}")] - IncorrectTotalWork { - block_hash: BlockHash, - total_work: Option, - }, + #[error( + "BMM verification failed for block hash {} at {}", + .0.block_hash, + .0.main_block_hash + )] + BmmVerificationFailed(Tip), + #[error( + "Incorrect total work for block {} at {}: {total_work:?}", + tip.block_hash, + tip.main_block_hash + )] + IncorrectTotalWork { tip: Tip, total_work: Option }, } #[must_use] @@ -46,14 +59,16 @@ pub enum ConnectionError { HeartbeatTimeout, #[error("heed error")] Heed(#[from] heed::Error), + #[error("missing peer state for id {0}")] + MissingPeerState(PeerStateId), #[error("peer should be banned; {0}")] PeerBan(#[from] BanReason), #[error("read to end error")] ReadToEnd(#[from] quinn::ReadToEndError), #[error("send datagram error")] SendDatagram(#[from] quinn::SendDatagramError), - #[error("send forward request error")] - SendForwardRequest, + #[error("send internal message error")] + SendInternalMessage, #[error("send info error")] SendInfo, #[error("state error")] @@ -68,16 +83,13 @@ impl From> for ConnectionError { } } -impl From> for ConnectionError { - fn from(_: mpsc::TrySendError) -> Self { - Self::SendForwardRequest +impl From> for ConnectionError { + fn from(_: mpsc::TrySendError) -> Self { + Self::SendInternalMessage } } -fn borsh_serialize_work( - work: &bitcoin::Work, - writer: &mut W, -) -> borsh::io::Result<()> +fn borsh_serialize_work(work: &Work, writer: &mut W) -> borsh::io::Result<()> where W: borsh::io::Write, { @@ -85,25 +97,42 @@ where } fn borsh_serialize_option_work( - work: &Option, + work: &Option, writer: &mut W, ) -> borsh::io::Result<()> where W: borsh::io::Write, { #[derive(BorshSerialize)] - struct BorshWrapper( - #[borsh(serialize_with = "borsh_serialize_work")] bitcoin::Work, - ); + struct BorshWrapper(#[borsh(serialize_with = "borsh_serialize_work")] Work); borsh::BorshSerialize::serialize(&work.map(BorshWrapper), writer) } -#[derive(BorshSerialize, Clone, Debug, Default, Deserialize, Serialize)] +#[derive( + BorshSerialize, Clone, Copy, Debug, Default, Deserialize, Serialize, +)] pub struct PeerState { block_height: u32, - tip: BlockHash, + tip: Tip, #[borsh(serialize_with = "borsh_serialize_option_work")] - total_work: Option, + total_work: Option, +} + +/// Unique identifier for a peer state +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +#[repr(transparent)] +pub struct PeerStateId(Hash); + +impl From<&PeerState> for PeerStateId { + fn from(peer_state: &PeerState) -> Self { + Self(hash(peer_state)) + } +} + +impl std::fmt::Display for PeerStateId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } } #[derive(Debug, Serialize, Deserialize)] @@ -129,6 +158,19 @@ pub enum Request { Heartbeat(PeerState), GetBlock { block_hash: BlockHash, + /// Mainchain descendant tip that we are requesting the block to reach. + /// Only relevant for the requester, so serialization is skipped + #[borsh(skip)] + #[serde(skip)] + descendant_tip: Option, + /// Ancestor block. If no bodies are missing between `descendant_tip` + /// and `ancestor`, then `descendant_tip` is ready to apply. + /// Only relevant for the requester, so serialization is skipped + ancestor: Option, + /// Only relevant for the requester, so serialization is skipped + #[borsh(skip)] + #[serde(skip)] + peer_state_id: Option, }, /// Request headers up to [`end`] GetHeaders { @@ -138,26 +180,38 @@ pub enum Request { end: BlockHash, /// Height is only relevant for the requester, /// so serialization is skipped + #[borsh(skip)] #[serde(skip)] height: Option, + /// Only relevant for the requester, so serialization is skipped + #[borsh(skip)] + #[serde(skip)] + peer_state_id: Option, }, PushTransaction { transaction: AuthorizedTransaction, }, } +/// Info to send to the net task / node #[must_use] #[derive(Debug)] pub enum Info { Error(ConnectionError), - /// Need BMM verification for the specified block - NeedBmmVerification(BlockHash), - /// Need Mainchain ancestors for the specified block hash - NeedMainchainAncestors(BlockHash), + /// Need BMM verification for the specified tip + NeedBmmVerification { + main_hash: bitcoin::BlockHash, + peer_state_id: PeerStateId, + }, + /// Need Mainchain ancestors for the specified tip + NeedMainchainAncestors { + main_hash: bitcoin::BlockHash, + peer_state_id: PeerStateId, + }, /// New tip ready (body and header exist in archive, BMM verified) - NewTipReady(BlockHash), + NewTipReady(Tip), NewTransaction(AuthorizedTransaction), - Response(Response, Request), + Response(Box<(Response, Request)>), } impl From for Info { @@ -178,6 +232,31 @@ where } } +/// Message received from the connection task / net task / node +#[derive(Clone, Debug)] +pub enum InternalMessage { + /// Indicates if a BMM verification request completed. + /// Does not indicate that BMM was verified successfully. + BmmVerification { + res: Result<(), bip300301::BlockNotFoundError>, + peer_state_id: PeerStateId, + }, + /// Forward a request + ForwardRequest(Request), + /// Indicates that mainchain ancestors are now available + MainchainAncestors(PeerStateId), + /// Indicates that the requested headers are now available + Headers(PeerStateId), + /// Indicates that all requested missing block bodies are now available + BodiesAvailable(PeerStateId), +} + +impl From for InternalMessage { + fn from(request: Request) -> Self { + Self::ForwardRequest(request) + } +} + #[derive(Clone)] pub struct Connection(pub(super) quinn::Connection); @@ -265,11 +344,10 @@ struct ConnectionTask { connection: Connection, ctxt: ConnectionContext, info_tx: mpsc::UnboundedSender, - peer_state: Option, - /// Push a request to forward to the peer - forward_request_tx: mpsc::UnboundedSender, - /// Receive requests to forward to the peer - forward_request_rx: mpsc::UnboundedReceiver, + /// Push an internal message from connection task / net task / node + internal_message_tx: mpsc::UnboundedSender, + /// Receive an internal message from connection task / net task / node + internal_message_rx: mpsc::UnboundedReceiver, } impl ConnectionTask { @@ -303,125 +381,491 @@ impl ConnectionTask { .map_err(ConnectionError::from) } - /// If a new tip is announced with greater height than the current tip: - /// * If the header does not exist, request it - /// * Verify height of the new tip. - /// * If the previous mainchain header does not exist, request it - /// * Verify PoW - /// * Verify BMM - /// * If ancestor bodies do not exist, request them - /// * Attempt to apply the new tip - async fn handle_heartbeat( + /// * Request any missing mainchain headers + /// * Check claimed work + /// * Request BMM commitments if necessary + /// * Check that BMM commitment matches peer tip + /// * Check if peer tip is better, requesting headers if necessary + /// * If peer tip is better: + /// * request headers if missing + /// * verify BMM + /// * request missing bodies + /// * notify net task / node that new tip is ready + async fn handle_peer_state( ctxt: &ConnectionContext, info_tx: &mpsc::UnboundedSender, - forward_request_tx: &mpsc::UnboundedSender, + internal_message_tx: &mpsc::UnboundedSender, peer_state: &PeerState, ) -> Result<(), ConnectionError> { + if peer_state.tip.main_block_hash == bitcoin::BlockHash::all_zeros() { + // Nothing to do in this case + return Ok(()); + } + let Some(peer_total_work) = peer_state.total_work else { + // Peer should send total work if they send a main tip + let ban_reason = BanReason::IncorrectTotalWork { + tip: peer_state.tip, + total_work: None, + }; + return Err(ConnectionError::PeerBan(ban_reason)); + }; let (tip, tip_height, total_work) = { let rotxn = ctxt.env.read_txn()?; let tip = ctxt.state.get_tip(&rotxn)?; let tip_height = ctxt.state.get_height(&rotxn)?; - let total_work = match ctxt.archive.try_get_header(&rotxn, tip)? { - None => None, - Some(header) - if header.prev_main_hash - == bitcoin::BlockHash::all_zeros() => - { - None - } - Some(header) => Some( - ctxt.archive - .get_total_work(&rotxn, header.prev_main_hash)?, - ), + let (bmm_verification, total_work) = if tip == BlockHash::default() + { + (bitcoin::BlockHash::all_zeros(), None) + } else { + let bmm_verification = + ctxt.archive.get_best_main_verification(&rotxn, tip)?; + let work = + ctxt.archive.get_total_work(&rotxn, bmm_verification)?; + (bmm_verification, Some(work)) + }; + let tip = Tip { + block_hash: tip, + main_block_hash: bmm_verification, }; (tip, tip_height, total_work) }; - let peer_height = peer_state.block_height; - if peer_height > tip_height - || (peer_height == tip_height && peer_state.total_work > total_work) + // Check claimed work and request mainchain headers and BMM commitments + // if necessary { - let header = { + let rotxn = ctxt.env.read_txn()?; + match ctxt + .archive + .try_get_main_header(&rotxn, peer_state.tip.main_block_hash)? + { + None => { + let info = Info::NeedMainchainAncestors { + main_hash: peer_state.tip.main_block_hash, + peer_state_id: peer_state.into(), + }; + info_tx.unbounded_send(info)?; + return Ok(()); + } + Some(_main_header) => { + let computed_total_work = ctxt.archive.get_total_work( + &rotxn, + peer_state.tip.main_block_hash, + )?; + if peer_total_work != computed_total_work { + let ban_reason = BanReason::IncorrectTotalWork { + tip: peer_state.tip, + total_work: Some(peer_total_work), + }; + return Err(ConnectionError::PeerBan(ban_reason)); + } + if peer_state.tip.block_hash == BlockHash::default() { + // Nothing else to do in this case + return Ok(()); + } + let Some(bmm_commitment) = + ctxt.archive.try_get_main_bmm_commitment( + &rotxn, + peer_state.tip.main_block_hash, + )? + else { + let info = Info::NeedBmmVerification { + main_hash: peer_state.tip.main_block_hash, + peer_state_id: peer_state.into(), + }; + info_tx.unbounded_send(info)?; + return Ok(()); + }; + if bmm_commitment != Some(peer_state.tip.block_hash) { + let ban_reason = + BanReason::BmmVerificationFailed(peer_state.tip); + return Err(ConnectionError::PeerBan(ban_reason)); + } + } + } + } + // Check if the peer tip is better, requesting headers if necessary + match ( + total_work.cmp(&Some(peer_total_work)), + tip_height.cmp(&peer_state.block_height), + ) { + (Ordering::Less | Ordering::Equal, Ordering::Less) => { + // No tip ancestor can have greater height, + // so peer tip is better. + // Request headers if necessary let rotxn = ctxt.env.read_txn()?; - ctxt.archive.try_get_header(&rotxn, peer_state.tip)? - }; - let Some(header) = header else { - // Request headers - let request = Request::GetHeaders { - // TODO: provide alternative start points - start: HashSet::new(), - end: peer_state.tip, - height: Some(peer_state.block_height), - }; - forward_request_tx.unbounded_send(request)?; + if ctxt + .archive + .try_get_header(&rotxn, peer_state.tip.block_hash)? + .is_none() + { + let start = HashSet::from_iter( + ctxt.archive + .get_block_locator(&rotxn, tip.block_hash)?, + ); + let request = Request::GetHeaders { + start, + end: peer_state.tip.block_hash, + height: Some(peer_state.block_height), + peer_state_id: Some(peer_state.into()), + }; + internal_message_tx.unbounded_send(request.into())?; + return Ok(()); + } + } + (Ordering::Equal | Ordering::Greater, Ordering::Greater) => { + // No peer tip ancestor can have greater height, + // so tip is better. + // Nothing to do in this case return Ok(()); - }; - // Check mainchain headers - let prev_main_header = { + } + (Ordering::Less, Ordering::Equal) => { + // Within the same mainchain lineage, prefer lower work + // Otherwise, prefer tip with greater work let rotxn = ctxt.env.read_txn()?; - ctxt.archive - .try_get_main_header(&rotxn, header.prev_main_hash)? - }; - let Some(_prev_main_header) = prev_main_header else { - let info = Info::NeedMainchainAncestors(header.hash()); - info_tx.unbounded_send(info)?; - return Ok(()); - }; - // Check PoW - let prev_main_total_work = { + if ctxt.archive.shared_mainchain_lineage( + &rotxn, + tip.main_block_hash, + peer_state.tip.main_block_hash, + )? { + // Nothing to do in this case + return Ok(()); + } + // Request headers if necessary + if ctxt + .archive + .try_get_header(&rotxn, peer_state.tip.block_hash)? + .is_none() + { + let start = HashSet::from_iter( + ctxt.archive + .get_block_locator(&rotxn, tip.block_hash)?, + ); + let request = Request::GetHeaders { + start, + end: peer_state.tip.block_hash, + height: Some(peer_state.block_height), + peer_state_id: Some(peer_state.into()), + }; + internal_message_tx.unbounded_send(request.into())?; + return Ok(()); + } + } + (Ordering::Greater, Ordering::Equal) => { + // Within the same mainchain lineage, prefer lower work + // Otherwise, prefer tip with greater work let rotxn = ctxt.env.read_txn()?; - ctxt.archive.get_total_work(&rotxn, header.prev_main_hash)? - }; - if Some(prev_main_total_work) != peer_state.total_work { - let ban_reason = BanReason::IncorrectTotalWork { - block_hash: peer_state.tip, - total_work: peer_state.total_work, - }; - return Err(ConnectionError::PeerBan(ban_reason)); + if !ctxt.archive.shared_mainchain_lineage( + &rotxn, + tip.main_block_hash, + peer_state.tip.main_block_hash, + )? { + // Nothing to do in this case + return Ok(()); + } + // Request headers if necessary + if ctxt + .archive + .try_get_header(&rotxn, peer_state.tip.block_hash)? + .is_none() + { + let start = HashSet::from_iter( + ctxt.archive + .get_block_locator(&rotxn, tip.block_hash)?, + ); + let request = Request::GetHeaders { + start, + end: peer_state.tip.block_hash, + height: Some(peer_state.block_height), + peer_state_id: Some(peer_state.into()), + }; + internal_message_tx.unbounded_send(request.into())?; + return Ok(()); + } } - // Verify BMM - { + (Ordering::Less, Ordering::Greater) => { + // Need to check if tip ancestor before common + // mainchain ancestor had greater or equal height let rotxn = ctxt.env.read_txn()?; - match ctxt + let main_ancestor = ctxt.archive.last_common_main_ancestor( + &rotxn, + tip.main_block_hash, + peer_state.tip.main_block_hash, + )?; + let tip_ancestor_height = ctxt .archive - .try_get_bmm_verification(&rotxn, peer_state.tip)? + .ancestors(&rotxn, tip.block_hash) + .find_map(|tip_ancestor| { + if tip_ancestor == BlockHash::default() { + return Ok(Some(0)); + } + let header = + ctxt.archive.get_header(&rotxn, tip_ancestor)?; + if !ctxt.archive.is_main_descendant( + &rotxn, + header.prev_main_hash, + main_ancestor, + )? { + return Ok(None); + } + if header.prev_main_hash == main_ancestor { + return Ok(None); + } + ctxt.archive.get_height(&rotxn, tip_ancestor).map(Some) + })? + .unwrap(); + if tip_ancestor_height >= peer_state.block_height { + // Nothing to do in this case + return Ok(()); + } + // Request headers if necessary + if ctxt + .archive + .try_get_header(&rotxn, peer_state.tip.block_hash)? + .is_none() { - Some(true) => (), - Some(false) => { - let ban_reason = - BanReason::BmmVerificationFailed(peer_state.tip); - return Err(ConnectionError::PeerBan(ban_reason)); - } - None => { - let info = Info::NeedBmmVerification(peer_state.tip); - info_tx.unbounded_send(info)?; - return Ok(()); - } + let start = HashSet::from_iter( + ctxt.archive + .get_block_locator(&rotxn, tip.block_hash)?, + ); + let request = Request::GetHeaders { + start, + end: peer_state.tip.block_hash, + height: Some(peer_state.block_height), + peer_state_id: Some(peer_state.into()), + }; + internal_message_tx.unbounded_send(request.into())?; + return Ok(()); } - }; - let missing_bodies: Vec = { + } + (Ordering::Greater, Ordering::Less) => { + // Need to check if peer's tip ancestor before common + // mainchain ancestor had greater or equal height let rotxn = ctxt.env.read_txn()?; - let common_ancestor = ctxt.archive.last_common_ancestor( + if ctxt + .archive + .try_get_header(&rotxn, peer_state.tip.block_hash)? + .is_none() + { + let start = HashSet::from_iter( + ctxt.archive + .get_block_locator(&rotxn, tip.block_hash)?, + ); + let request = Request::GetHeaders { + start, + end: peer_state.tip.block_hash, + height: Some(peer_state.block_height), + peer_state_id: Some(peer_state.into()), + }; + internal_message_tx.unbounded_send(request.into())?; + return Ok(()); + } + let main_ancestor = ctxt.archive.last_common_main_ancestor( &rotxn, - tip, - peer_state.tip, + tip.main_block_hash, + peer_state.tip.main_block_hash, )?; - ctxt.archive.get_missing_bodies( + let peer_tip_ancestor_height = ctxt + .archive + .ancestors(&rotxn, peer_state.tip.block_hash) + .find_map(|peer_tip_ancestor| { + if peer_tip_ancestor == BlockHash::default() { + return Ok(Some(0)); + } + let header = ctxt + .archive + .get_header(&rotxn, peer_tip_ancestor)?; + if !ctxt.archive.is_main_descendant( + &rotxn, + header.prev_main_hash, + main_ancestor, + )? { + return Ok(None); + } + if header.prev_main_hash == main_ancestor { + return Ok(None); + } + ctxt.archive + .get_height(&rotxn, peer_tip_ancestor) + .map(Some) + })? + .unwrap(); + if peer_tip_ancestor_height < tip_height { + // Nothing to do in this case + return Ok(()); + } + } + (Ordering::Equal, Ordering::Equal) => { + // If the peer tip is the same as the tip, nothing to do + if peer_state.tip.block_hash == tip.block_hash { + return Ok(()); + } + // Need to compare tip ancestor and peer's tip ancestor + // before common mainchain ancestor + let rotxn = ctxt.env.read_txn()?; + if ctxt + .archive + .try_get_header(&rotxn, peer_state.tip.block_hash)? + .is_none() + { + let start = HashSet::from_iter( + ctxt.archive + .get_block_locator(&rotxn, tip.block_hash)?, + ); + let request = Request::GetHeaders { + start, + end: peer_state.tip.block_hash, + height: Some(peer_state.block_height), + peer_state_id: Some(peer_state.into()), + }; + internal_message_tx.unbounded_send(request.into())?; + return Ok(()); + } + let main_ancestor = ctxt.archive.last_common_main_ancestor( &rotxn, - peer_state.tip, - common_ancestor, - )? - }; - if missing_bodies.is_empty() { - let info = Info::NewTipReady(peer_state.tip); - info_tx.unbounded_send(info)?; - } else { - // Request missing bodies - missing_bodies.into_iter().try_for_each(|block_hash| { - let request = Request::GetBlock { block_hash }; - forward_request_tx.unbounded_send(request) - })?; + tip.main_block_hash, + peer_state.tip.main_block_hash, + )?; + let main_ancestor_height = + ctxt.archive.get_main_height(&rotxn, main_ancestor)?; + let (tip_ancestor_height, tip_ancestor_work) = ctxt + .archive + .ancestors(&rotxn, tip.block_hash) + .find_map(|tip_ancestor| { + if tip_ancestor == BlockHash::default() { + return Ok(Some((0, None))); + } + let header = + ctxt.archive.get_header(&rotxn, tip_ancestor)?; + if !ctxt.archive.is_main_descendant( + &rotxn, + header.prev_main_hash, + main_ancestor, + )? { + return Ok(None); + } + if header.prev_main_hash == main_ancestor { + return Ok(None); + } + let height = + ctxt.archive.get_height(&rotxn, tip_ancestor)?; + // Find mainchain block hash to get total work + let main_block = { + let prev_height = ctxt.archive.get_main_height( + &rotxn, + header.prev_main_hash, + )?; + let height = prev_height + 1; + ctxt.archive.get_nth_main_ancestor( + &rotxn, + main_ancestor, + main_ancestor_height - height, + )? + }; + let work = + ctxt.archive.get_total_work(&rotxn, main_block)?; + Ok(Some((height, Some(work)))) + })? + .unwrap(); + let (peer_tip_ancestor_height, peer_tip_ancestor_work) = ctxt + .archive + .ancestors(&rotxn, peer_state.tip.block_hash) + .find_map(|peer_tip_ancestor| { + if peer_tip_ancestor == BlockHash::default() { + return Ok(Some((0, None))); + } + let header = ctxt + .archive + .get_header(&rotxn, peer_tip_ancestor)?; + if !ctxt.archive.is_main_descendant( + &rotxn, + header.prev_main_hash, + main_ancestor, + )? { + return Ok(None); + } + if header.prev_main_hash == main_ancestor { + return Ok(None); + } + let height = ctxt + .archive + .get_height(&rotxn, peer_tip_ancestor)?; + // Find mainchain block hash to get total work + let main_block = { + let prev_height = ctxt.archive.get_main_height( + &rotxn, + header.prev_main_hash, + )?; + let height = prev_height + 1; + ctxt.archive.get_nth_main_ancestor( + &rotxn, + main_ancestor, + main_ancestor_height - height, + )? + }; + let work = + ctxt.archive.get_total_work(&rotxn, main_block)?; + Ok(Some((height, Some(work)))) + })? + .unwrap(); + match ( + tip_ancestor_work.cmp(&peer_tip_ancestor_work), + tip_ancestor_height.cmp(&peer_tip_ancestor_height), + ) { + (Ordering::Less | Ordering::Equal, Ordering::Equal) + | (_, Ordering::Greater) => { + // Peer tip is not better, nothing to do + return Ok(()); + } + (Ordering::Greater, Ordering::Equal) + | (_, Ordering::Less) => { + // Peer tip is better + } + } } } + // Check BMM now that headers are available + { + let rotxn = ctxt.env.read_txn()?; + let Some(BmmResult::Verified) = ctxt.archive.try_get_bmm_result( + &rotxn, + peer_state.tip.block_hash, + peer_state.tip.main_block_hash, + )? + else { + let ban_reason = + BanReason::BmmVerificationFailed(peer_state.tip); + return Err(ConnectionError::PeerBan(ban_reason)); + }; + } + // Request missing bodies, or notify that a new tip is ready + let (common_ancestor, missing_bodies): (BlockHash, Vec) = { + let rotxn = ctxt.env.read_txn()?; + let common_ancestor = ctxt.archive.last_common_ancestor( + &rotxn, + tip.block_hash, + peer_state.tip.block_hash, + )?; + let missing_bodies = ctxt.archive.get_missing_bodies( + &rotxn, + peer_state.tip.block_hash, + common_ancestor, + )?; + (common_ancestor, missing_bodies) + }; + if missing_bodies.is_empty() { + let info = Info::NewTipReady(peer_state.tip); + info_tx.unbounded_send(info)?; + } else { + // Request missing bodies + missing_bodies.into_iter().try_for_each(|block_hash| { + let request = Request::GetBlock { + block_hash, + descendant_tip: Some(peer_state.tip), + peer_state_id: Some(peer_state.into()), + ancestor: Some(common_ancestor), + }; + internal_message_tx.unbounded_send(request.into()) + })?; + } Ok(()) } @@ -505,30 +949,40 @@ impl ConnectionTask { async fn handle_request( ctxt: &ConnectionContext, info_tx: &mpsc::UnboundedSender, - forward_request_tx: &mpsc::UnboundedSender, - peer_state: &mut Option, + internal_message_tx: &mpsc::UnboundedSender, + peer_state: &mut Option, + // Map associating peer state hashes to peer state + peer_states: &mut HashMap, response_tx: SendStream, request: Request, ) -> Result<(), ConnectionError> { match request { Request::Heartbeat(new_peer_state) => { - let () = Self::handle_heartbeat( - ctxt, - info_tx, - forward_request_tx, - &new_peer_state, - ) - .await?; - *peer_state = Some(new_peer_state); + let new_peer_state_id = (&new_peer_state).into(); + peer_states.insert(new_peer_state_id, new_peer_state); + if *peer_state != Some(new_peer_state_id) { + let () = Self::handle_peer_state( + ctxt, + info_tx, + internal_message_tx, + &new_peer_state, + ) + .await?; + *peer_state = Some(new_peer_state_id); + } Ok(()) } - Request::GetBlock { block_hash } => { - Self::handle_get_block(ctxt, response_tx, block_hash).await - } + Request::GetBlock { + block_hash, + descendant_tip: _, + ancestor: _, + peer_state_id: _, + } => Self::handle_get_block(ctxt, response_tx, block_hash).await, Request::GetHeaders { start, end, height: _, + peer_state_id: _, } => Self::handle_get_headers(ctxt, response_tx, start, end).await, Request::PushTransaction { transaction } => { Self::handle_push_tx(ctxt, info_tx, response_tx, transaction) @@ -537,17 +991,18 @@ impl ConnectionTask { } } - async fn run(mut self) -> Result<(), ConnectionError> { + async fn run(self) -> Result<(), ConnectionError> { enum MailboxItem { - ForwardRequest(Request), + /// Internal messages from the connection task / net task / node + InternalMessage(InternalMessage), /// Signals that a heartbeat message should be sent to the peer Heartbeat, Request((Request, SendStream)), Response(Result, Request), } - let forward_request_stream = self - .forward_request_rx - .map(|request| Ok(MailboxItem::ForwardRequest(request))); + let internal_message_stream = self + .internal_message_rx + .map(|msg| Ok(MailboxItem::InternalMessage(msg))); let heartbeat_stream = IntervalStream::new(interval(Connection::HEARTBEAT_SEND_INTERVAL)) .map(|_| Ok(MailboxItem::Heartbeat)); @@ -572,18 +1027,24 @@ impl ConnectionTask { let response_stream = response_rx.map(|(resp, req)| Ok(MailboxItem::Response(resp, req))); let mut mailbox_stream = stream::select_all([ - forward_request_stream.boxed(), + internal_message_stream.boxed(), heartbeat_stream.boxed(), request_stream.boxed(), response_stream.boxed(), ]); // spawn child tasks on a JoinSet so that they are dropped alongside this task let mut task_set: JoinSet<()> = JoinSet::new(); + // current peer state + let mut peer_state = Option::::None; + // known peer states + let mut peer_states = HashMap::::new(); // Do not repeat requests let mut pending_request_hashes = HashSet::::new(); while let Some(mailbox_item) = mailbox_stream.try_next().await? { match mailbox_item { - MailboxItem::ForwardRequest(request) => { + MailboxItem::InternalMessage( + InternalMessage::ForwardRequest(request), + ) => { let request_hash = hash(&request); if !pending_request_hashes.insert(request_hash) { continue; @@ -601,29 +1062,68 @@ impl ConnectionTask { } }); } + MailboxItem::InternalMessage( + InternalMessage::BmmVerification { res, peer_state_id }, + ) => { + if let Err(block_not_found) = res { + tracing::warn!("{block_not_found}"); + continue; + } + let Some(peer_state) = peer_states.get(&peer_state_id) + else { + return Err(ConnectionError::MissingPeerState( + peer_state_id, + )); + }; + let () = Self::handle_peer_state( + &self.ctxt, + &self.info_tx, + &self.internal_message_tx, + peer_state, + ) + .await?; + } + MailboxItem::InternalMessage( + InternalMessage::MainchainAncestors(peer_state_id) + | InternalMessage::Headers(peer_state_id) + | InternalMessage::BodiesAvailable(peer_state_id), + ) => { + let Some(peer_state) = peer_states.get(&peer_state_id) + else { + return Err(ConnectionError::MissingPeerState( + peer_state_id, + )); + }; + let () = Self::handle_peer_state( + &self.ctxt, + &self.info_tx, + &self.internal_message_tx, + peer_state, + ) + .await?; + } MailboxItem::Heartbeat => { let (tip, tip_height, total_work) = { let rotxn = self.ctxt.env.read_txn()?; let tip = self.ctxt.state.get_tip(&rotxn)?; let tip_height = self.ctxt.state.get_height(&rotxn)?; - let total_work = match self - .ctxt - .archive - .try_get_header(&rotxn, tip)? - { - None => None, - Some(header) - if header.prev_main_hash - == bitcoin::BlockHash::all_zeros() => - { - None - } - Some(header) => { - Some(self.ctxt.archive.get_total_work( - &rotxn, - header.prev_main_hash, - )?) - } + let (bmm_verification, total_work) = + if tip == BlockHash::default() { + (bitcoin::BlockHash::all_zeros(), None) + } else { + let bmm_verification = self + .ctxt + .archive + .get_best_main_verification(&rotxn, tip)?; + let work = self + .ctxt + .archive + .get_total_work(&rotxn, bmm_verification)?; + (bmm_verification, Some(work)) + }; + let tip = Tip { + block_hash: tip, + main_block_hash: bmm_verification, }; (tip, tip_height, total_work) }; @@ -649,8 +1149,9 @@ impl ConnectionTask { let () = Self::handle_request( &self.ctxt, &self.info_tx, - &self.forward_request_tx, - &mut self.peer_state, + &self.internal_message_tx, + &mut peer_state, + &mut peer_states, response_tx, request, ) @@ -659,8 +1160,9 @@ impl ConnectionTask { MailboxItem::Response(resp, req) => { let request_hash = hash(&req); pending_request_hashes.remove(&request_hash); - let info = - resp.map(|resp| Info::Response(resp, req)).into(); + let info = resp + .map(|resp| Info::Response(Box::new((resp, req)))) + .into(); if self.info_tx.unbounded_send(info).is_err() { tracing::error!("Failed to send response info") }; @@ -674,7 +1176,8 @@ impl ConnectionTask { /// Connection killed on drop pub struct ConnectionHandle { task: JoinHandle<()>, - pub forward_request_tx: mpsc::UnboundedSender, + /// Push messages from connection task / net task / node + pub internal_message_tx: mpsc::UnboundedSender, } impl Drop for ConnectionHandle { @@ -688,19 +1191,18 @@ pub fn handle( ctxt: ConnectionContext, connection: Connection, ) -> (ConnectionHandle, mpsc::UnboundedReceiver) { - let (forward_request_tx, forward_request_rx) = mpsc::unbounded(); + let (internal_message_tx, internal_message_rx) = mpsc::unbounded(); let (info_tx, info_rx) = mpsc::unbounded(); let connection_task = { let info_tx = info_tx.clone(); - let forward_request_tx = forward_request_tx.clone(); + let internal_message_tx = internal_message_tx.clone(); move || async move { let connection_task = ConnectionTask { connection, ctxt, info_tx, - peer_state: None, - forward_request_tx, - forward_request_rx, + internal_message_tx, + internal_message_rx, }; connection_task.run().await } @@ -716,7 +1218,7 @@ pub fn handle( }); let connection_handle = ConnectionHandle { task, - forward_request_tx, + internal_message_tx, }; (connection_handle, info_rx) } @@ -726,20 +1228,19 @@ pub fn connect( addr: SocketAddr, ctxt: ConnectionContext, ) -> (ConnectionHandle, mpsc::UnboundedReceiver) { - let (forward_request_tx, forward_request_rx) = mpsc::unbounded(); + let (internal_message_tx, internal_message_rx) = mpsc::unbounded(); let (info_tx, info_rx) = mpsc::unbounded(); let connection_task = { let info_tx = info_tx.clone(); - let forward_request_tx = forward_request_tx.clone(); + let internal_message_tx = internal_message_tx.clone(); move || async move { let connection = Connection::new(&endpoint, addr).await?; let connection_task = ConnectionTask { connection, ctxt, info_tx, - peer_state: None, - forward_request_tx, - forward_request_rx, + internal_message_tx, + internal_message_rx, }; connection_task.run().await } @@ -755,7 +1256,7 @@ pub fn connect( }); let connection_handle = ConnectionHandle { task, - forward_request_tx, + internal_message_tx, }; (connection_handle, info_rx) } diff --git a/lib/node/mainchain_task.rs b/lib/node/mainchain_task.rs index 71b2c98..a95553c 100644 --- a/lib/node/mainchain_task.rs +++ b/lib/node/mainchain_task.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use bip300301::{ bitcoin::{self, hashes::Hash as _}, + client::{BlockCommitment, SidechainId}, Drivechain, Header as BitcoinHeader, }; use fallible_iterator::FallibleIterator; @@ -18,11 +19,11 @@ use thiserror::Error; use tokio::{ spawn, task::{self, JoinHandle}, - time::Duration, }; use crate::{ archive::{self, Archive}, + node::THIS_SIDECHAIN, types::BlockHash, }; @@ -30,14 +31,33 @@ use crate::{ #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] pub(super) enum Request { /// Request missing mainchain ancestor headers - AncestorHeaders(BlockHash), + AncestorHeaders(bitcoin::BlockHash), /// Request recursive BMM verification - VerifyBmm(BlockHash), + VerifyBmm(bitcoin::BlockHash), } -/// Response indicating that a request has been fulfilled successfully +/// Response indicating that a request has been fulfilled #[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub(super) struct Response(pub Request); +pub(super) enum Response { + AncestorHeaders(bitcoin::BlockHash), + VerifyBmm( + bitcoin::BlockHash, + Result<(), bip300301::BlockNotFoundError>, + ), +} + +impl From for Request { + fn from(resp: Response) -> Self { + match resp { + Response::AncestorHeaders(block_hash) => { + Request::AncestorHeaders(block_hash) + } + Response::VerifyBmm(block_hash, _) => { + Request::VerifyBmm(block_hash) + } + } + } +} #[derive(Debug, Error)] enum Error { @@ -72,9 +92,20 @@ impl MainchainTask { drivechain: &bip300301::Drivechain, mut block_hash: bitcoin::BlockHash, ) -> Result<(), Error> { + if block_hash == bitcoin::BlockHash::all_zeros() { + return Ok(()); + } else { + let rotxn = env.read_txn()?; + if archive.try_get_main_header(&rotxn, block_hash)?.is_some() { + return Ok(()); + } + } tracing::debug!("requesting ancestor headers for {block_hash}"); let mut headers: Vec = Vec::new(); loop { + let header = drivechain.get_header(block_hash).await?; + block_hash = header.prev_blockhash; + headers.push(header); if block_hash == bitcoin::BlockHash::all_zeros() { break; } else { @@ -83,119 +114,100 @@ impl MainchainTask { break; } } - let header = drivechain.get_header(block_hash).await?; - block_hash = header.prev_blockhash; - headers.push(header); } headers.reverse(); - if headers.is_empty() { + // Writing all headers during IBD can starve archive readers. + task::block_in_place(|| { + let mut rwtxn = env.write_txn()?; + headers.into_iter().try_for_each(|header| { + archive.put_main_header(&mut rwtxn, &header) + })?; + rwtxn.commit()?; Ok(()) - } else { - // Writing all headers during IBD can starve archive readers. - task::block_in_place(|| { - let mut rwtxn = env.write_txn()?; - headers.into_iter().try_for_each(|header| { - archive.put_main_header(&mut rwtxn, &header) - })?; - rwtxn.commit()?; - Ok(()) - }) - } + }) } - /// Attempt to verify bmm for the specified block, - /// and store the verification result - async fn verify_bmm( + /// Request ancestor BMM commitments from the mainchain node, + /// up to and including the specified block. + /// Mainchain headers for the specified block and all ancestors MUST exist + /// in the archive. + async fn request_bmm_commitments( env: &heed::Env, archive: &Archive, drivechain: &bip300301::Drivechain, - block_hash: BlockHash, - ) -> Result { - use jsonrpsee::types::error::ErrorCode as JsonrpseeErrorCode; - const VERIFY_BMM_POLL_INTERVAL: Duration = Duration::from_secs(15); - let header = { + main_hash: bitcoin::BlockHash, + ) -> Result, Error> { + if main_hash == bitcoin::BlockHash::all_zeros() { + return Ok(Ok(())); + } else { let rotxn = env.read_txn()?; - archive.get_header(&rotxn, block_hash)? - }; - let res = match drivechain - .verify_bmm( - &header.prev_main_hash, - &block_hash.into(), - VERIFY_BMM_POLL_INTERVAL, - ) - .await - { - Ok(()) => true, - Err(bip300301::Error::Jsonrpsee(jsonrpsee::core::Error::Call( - err, - ))) if JsonrpseeErrorCode::from(err.code()) - == JsonrpseeErrorCode::ServerError(-1) => + if archive + .try_get_main_bmm_commitment(&rotxn, main_hash)? + .is_some() { - false + return Ok(Ok(())); } - Err(err) => return Err(Error::from(err)), - }; - let mut rwtxn = env.write_txn()?; - let () = archive.put_bmm_verification(&mut rwtxn, block_hash, res)?; - rwtxn.commit()?; - Ok(res) - } - - /// Attempt to verify bmm recursively up to the specified block, - /// and store the verification results - async fn recursive_verify_bmm( - env: &heed::Env, - archive: &Archive, - drivechain: &bip300301::Drivechain, - block_hash: BlockHash, - ) -> Result<(), Error> { - tracing::debug!( - "requesting recursive BMM verification for {block_hash}" - ); - let blocks_to_verify: Vec = { + } + tracing::debug!("requesting ancestor bmm commitments for {main_hash}"); + let mut missing_commitments: Vec<_> = { let rotxn = env.read_txn()?; archive - .ancestors(&rotxn, block_hash) + .main_ancestors(&rotxn, main_hash) .take_while(|block_hash| { - archive - .try_get_bmm_verification(&rotxn, *block_hash) - .map(|bmm_verification| bmm_verification.is_none()) + Ok(*block_hash != bitcoin::BlockHash::all_zeros() + && archive + .try_get_main_bmm_commitment(&rotxn, *block_hash)? + .is_none()) }) .collect()? }; - let mut blocks_to_verify_iter = blocks_to_verify.into_iter().rev(); - while let Some(block_hash) = blocks_to_verify_iter.next() { - if !Self::verify_bmm(env, archive, drivechain, block_hash).await? { - // mark descendent blocks as BMM failed, - // no need to request from mainchain node + missing_commitments.reverse(); + for missing_commitment in missing_commitments { + let commitments: Vec = match drivechain + .get_block_commitments(missing_commitment) + .await? + { + Ok(commitments) => commitments + .into_iter() + .filter_map(|(_, commitment)| match commitment { + BlockCommitment::BmmHStar { + commitment, + sidechain_id: SidechainId(THIS_SIDECHAIN), + prev_bytes: _, + } => Some(commitment.into()), + BlockCommitment::BmmHStar { .. } + | BlockCommitment::ScdbUpdateBytes { .. } + | BlockCommitment::WitnessCommitment { .. } => None, + }) + .collect(), + Err(block_not_found) => return Ok(Err(block_not_found)), + }; + // Should never be more than one commitment + assert!(commitments.len() <= 1); + let commitment = commitments.first().copied(); + { let mut rwtxn = env.write_txn()?; - for block_hash in blocks_to_verify_iter { - let () = archive - .put_bmm_verification(&mut rwtxn, block_hash, false)?; - } + archive.put_main_bmm_commitment( + &mut rwtxn, main_hash, commitment, + )?; rwtxn.commit()?; - break; } } - Ok(()) + Ok(Ok(())) } async fn run(mut self) -> Result<(), Error> { while let Some((request, response_tx)) = self.request_rx.next().await { match request { - Request::AncestorHeaders(block_hash) => { - let header = { - let rotxn = self.env.read_txn()?; - self.archive.get_header(&rotxn, block_hash)? - }; + Request::AncestorHeaders(main_block_hash) => { let () = Self::request_ancestor_headers( &self.env, &self.archive, &self.drivechain, - header.prev_main_hash, + main_block_hash, ) .await?; - let response = Response(request); + let response = Response::AncestorHeaders(main_block_hash); if let Some(response_tx) = response_tx { response_tx .send(response) @@ -207,14 +219,16 @@ impl MainchainTask { } } Request::VerifyBmm(block_hash) => { - let () = Self::recursive_verify_bmm( - &self.env, - &self.archive, - &self.drivechain, + let response = Response::VerifyBmm( block_hash, - ) - .await?; - let response = Response(request); + Self::request_bmm_commitments( + &self.env, + &self.archive, + &self.drivechain, + block_hash, + ) + .await?, + ); if let Some(response_tx) = response_tx { response_tx .send(response) diff --git a/lib/node/mod.rs b/lib/node/mod.rs index 5149832..8322648 100644 --- a/lib/node/mod.rs +++ b/lib/node/mod.rs @@ -15,9 +15,9 @@ use crate::{ net::{self, Net}, state::{self, State}, types::{ - Accumulator, Address, AuthorizedTransaction, BlockHash, Body, GetValue, - Header, OutPoint, Output, SpentOutput, Transaction, Txid, - WithdrawalBundle, + Accumulator, Address, AuthorizedTransaction, BlockHash, BmmResult, + Body, GetValue, Header, OutPoint, Output, SpentOutput, Tip, + Transaction, Txid, WithdrawalBundle, }, }; @@ -429,6 +429,7 @@ impl Node { /// or was rejected as the new tip. pub async fn submit_block( &self, + main_block_hash: bitcoin::BlockHash, header: &Header, body: &Body, ) -> Result { @@ -450,21 +451,37 @@ impl Node { let _: mainchain_task::Response = self .mainchain_task .request_oneshot(mainchain_task::Request::AncestorHeaders( - header.hash(), + main_block_hash, )) .map_err(|_| Error::SendMainchainTaskRequest)? .await .map_err(|_| Error::ReceiveMainchainTaskResponse)?; // Verify BMM - let _: mainchain_task::Response = self + let mainchain_task::Response::VerifyBmm(_, res) = self .mainchain_task - .request_oneshot(mainchain_task::Request::VerifyBmm(block_hash)) + .request_oneshot(mainchain_task::Request::VerifyBmm( + main_block_hash, + )) .map_err(|_| Error::SendMainchainTaskRequest)? .await - .map_err(|_| Error::ReceiveMainchainTaskResponse)?; + .map_err(|_| Error::ReceiveMainchainTaskResponse)? + else { + panic!("should be impossible") + }; + if let Err(bip300301::BlockNotFoundError(missing_block)) = res { + tracing::error!( + "Rejecting block {block_hash} due to missing mainchain block {missing_block}", + ); + return Ok(false); + } { let rotxn = self.env.read_txn()?; - if !self.archive.get_bmm_verification(&rotxn, block_hash)? { + if self.archive.get_bmm_result( + &rotxn, + block_hash, + main_block_hash, + )? == BmmResult::Failed + { tracing::error!( "Rejecting block {block_hash} due to failing BMM verification", ); @@ -499,7 +516,11 @@ impl Node { } } // Submit new tip - if !self.net_task.new_tip_ready_confirm(header.hash()).await? { + let new_tip = Tip { + block_hash, + main_block_hash, + }; + if !self.net_task.new_tip_ready_confirm(new_tip).await? { return Ok(false); }; let rotxn = self.env.read_txn()?; diff --git a/lib/node/net_task.rs b/lib/node/net_task.rs index f0424c6..5ab2068 100644 --- a/lib/node/net_task.rs +++ b/lib/node/net_task.rs @@ -26,10 +26,11 @@ use crate::{ archive::{self, Archive}, mempool::{self, MemPool}, net::{ - self, Net, PeerConnectionInfo, PeerInfoRx, PeerRequest, PeerResponse, + self, Net, PeerConnectionInfo, PeerConnectionMessage, PeerInfoRx, + PeerRequest, PeerResponse, PeerStateId, }, state::{self, State}, - types::{BlockHash, Body, Header}, + types::{BlockHash, BmmResult, Body, Header, Tip}, }; #[derive(Debug, Error)] @@ -132,11 +133,7 @@ async fn disconnect_tip_( { let new_tip = state.get_tip(rwtxn)?; let accumulator = archive.get_accumulator(rwtxn, new_tip)?; - let () = state.utreexo_accumulator.put( - rwtxn, - &state::UnitKey, - &accumulator, - )?; + let () = state.utreexo_accumulator.put(rwtxn, &(), &accumulator)?; } for transaction in tip_body.authorized_transactions().iter().rev() { mempool.put(rwtxn, transaction)?; @@ -156,19 +153,29 @@ async fn reorg_to_tip( drivechain: &bip300301::Drivechain, mempool: &MemPool, state: &State, - new_tip: BlockHash, + new_tip: Tip, ) -> Result { let mut rwtxn = env.write_txn()?; - let tip = state.get_tip(&rwtxn)?; + let tip_hash = state.get_tip(&rwtxn)?; let tip_height = state.get_height(&rwtxn)?; + let bmm_verification = + archive.get_best_main_verification(&rwtxn, tip_hash)?; + let tip = Tip { + block_hash: tip_hash, + main_block_hash: bmm_verification, + }; // check that new tip is better than current tip if archive.better_tip(&rwtxn, tip, new_tip)? != Some(new_tip) { return Ok(false); } - let common_ancestor = archive.last_common_ancestor(&rwtxn, tip, new_tip)?; + let common_ancestor = archive.last_common_ancestor( + &rwtxn, + tip.block_hash, + new_tip.block_hash, + )?; // Check that all necessary bodies exist before disconnecting tip let blocks_to_apply: Vec<(Header, Body)> = archive - .ancestors(&rwtxn, new_tip) + .ancestors(&rwtxn, new_tip.block_hash) .take_while(|block_hash| Ok(*block_hash != common_ancestor)) .map(|block_hash| { let header = archive.get_header(&rwtxn, block_hash)?; @@ -193,9 +200,9 @@ async fn reorg_to_tip( .await?; } let tip = state.get_tip(&rwtxn)?; - assert_eq!(tip, new_tip); + assert_eq!(tip, new_tip.block_hash); rwtxn.commit()?; - tracing::info!("reorged to tip: {new_tip}"); + tracing::info!("reorged to tip: {}", new_tip.block_hash); Ok(true) } @@ -217,18 +224,20 @@ struct NetTaskContext { /// An optional oneshot sender can be used receive the result of attempting /// to reorg to the new tip, on the corresponding oneshot receiver. type NewTipReadyMessage = - (BlockHash, Option, Option>); + (Tip, Option, Option>); struct NetTask { ctxt: NetTaskContext, /// Receive a request to forward to the mainchain task, with the address of - /// the peer connection that caused the request + /// the peer connection that caused the request, and the peer state ID of + /// the request forward_mainchain_task_request_rx: - UnboundedReceiver<(mainchain_task::Request, SocketAddr)>, + UnboundedReceiver<(mainchain_task::Request, SocketAddr, PeerStateId)>, /// Push a request to forward to the mainchain task, with the address of - /// the peer connection that caused the request + /// the peer connection that caused the request, and the peer state ID of + /// the request forward_mainchain_task_request_tx: - UnboundedSender<(mainchain_task::Request, SocketAddr)>, + UnboundedSender<(mainchain_task::Request, SocketAddr, PeerStateId)>, mainchain_task_response_rx: UnboundedReceiver, /// Receive a tip that is ready to reorg to, with the address of the peer /// connection that caused the request, if it originated from a peer. @@ -253,14 +262,7 @@ impl NetTask { // Attempt to switch to a descendant tip once a body has been // stored, if all other ancestor bodies are available. // Each descendant tip maps to the peers that sent that tip. - descendant_tips: &mut HashMap< - BlockHash, - HashMap>, - >, - forward_mainchain_task_request_tx: &UnboundedSender<( - mainchain_task::Request, - SocketAddr, - )>, + descendant_tips: &mut HashMap>>, new_tip_ready_tx: &UnboundedSender, addr: SocketAddr, resp: PeerResponse, @@ -268,7 +270,12 @@ impl NetTask { ) -> Result<(), Error> { match (req, resp) { ( - req @ PeerRequest::GetBlock { block_hash }, + req @ PeerRequest::GetBlock { + block_hash, + descendant_tip: Some(descendant_tip), + ancestor: Some(ancestor), + peer_state_id: Some(peer_state_id), + }, ref resp @ PeerResponse::Block { ref header, ref body, @@ -286,18 +293,62 @@ impl NetTask { ctxt.archive.put_body(&mut rwtxn, block_hash, body)?; rwtxn.commit()?; } + // Notify the peer connection if all requested block bodies are + // now available + { + let rotxn = ctxt.env.read_txn()?; + let missing_bodies = ctxt + .archive + .get_missing_bodies(&rotxn, block_hash, ancestor)?; + if missing_bodies.is_empty() { + let message = PeerConnectionMessage::BodiesAvailable( + peer_state_id, + ); + let () = + ctxt.net.push_internal_message(message, addr)?; + } + } // Check if any new tips can be applied, // and send new tip ready if so { let rotxn = ctxt.env.read_txn()?; - let tip = ctxt.state.get_tip(&rotxn)?; - if header.prev_side_hash == tip { + let tip_hash = ctxt.state.get_tip(&rotxn)?; + // Find the BMM verification that is an ancestor of + // `main_descendant_tip` + let main_block_hash = ctxt + .archive + .get_bmm_results(&rotxn, block_hash)? + .into_iter() + .map(Result::<_, Error>::Ok) + .transpose_into_fallible() + .find_map(|(main_block_hash, bmm_result)| { + match bmm_result { + BmmResult::Failed => Ok(None), + BmmResult::Verified => { + if ctxt.archive.is_main_descendant( + &rotxn, + main_block_hash, + descendant_tip.main_block_hash, + )? { + Ok(Some(main_block_hash)) + } else { + Ok(None) + } + } + } + })? + .unwrap(); + let block_tip = Tip { + block_hash, + main_block_hash, + }; + if header.prev_side_hash == tip_hash { let () = new_tip_ready_tx - .unbounded_send((block_hash, Some(addr), None)) + .unbounded_send((block_tip, Some(addr), None)) .map_err(|_| Error::SendNewTipReady)?; } let Some(descendant_tips) = - descendant_tips.remove(&block_hash) + descendant_tips.remove(&block_tip) else { return Ok(()); }; @@ -305,12 +356,12 @@ impl NetTask { let common_ancestor = ctxt.archive.last_common_ancestor( &rotxn, - descendant_tip, - tip, + descendant_tip.block_hash, + tip_hash, )?; let missing_bodies = ctxt.archive.get_missing_bodies( &rotxn, - descendant_tip, + descendant_tip.block_hash, common_ancestor, )?; if missing_bodies.is_empty() { @@ -326,12 +377,14 @@ impl NetTask { } } } - Ok(()) } ( PeerRequest::GetBlock { block_hash: req_block_hash, + descendant_tip: Some(_), + ancestor: Some(_), + peer_state_id: Some(_), }, PeerResponse::NoBlock { block_hash: resp_block_hash, @@ -342,6 +395,7 @@ impl NetTask { ref start, end, height: Some(height), + peer_state_id: Some(peer_state_id), }, PeerResponse::Headers(headers), ) => { @@ -410,12 +464,9 @@ impl NetTask { } } rwtxn.commit()?; - // Request mainchain headers - let request = - mainchain_task::Request::AncestorHeaders(end_header_hash); - let () = forward_mainchain_task_request_tx - .unbounded_send((request, addr)) - .map_err(|_| Error::ForwardMainchainTaskRequest)?; + // Notify peer connection that headers are available + let message = PeerConnectionMessage::Headers(peer_state_id); + let () = ctxt.net.push_internal_message(message, addr)?; Ok(()) } ( @@ -423,6 +474,7 @@ impl NetTask { start: _, end, height: _, + peer_state_id: _, }, PeerResponse::NoHeader { block_hash }, ) if end == block_hash => Ok(()), @@ -453,18 +505,18 @@ impl NetTask { enum MailboxItem { AcceptConnection(Result<(), Error>), // Forward a mainchain task request, along with the peer that - // caused the request - ForwardMainchainTaskRequest(mainchain_task::Request, SocketAddr), + // caused the request, and the peer state ID of the request + ForwardMainchainTaskRequest( + mainchain_task::Request, + SocketAddr, + PeerStateId, + ), MainchainTaskResponse(mainchain_task::Response), // Apply new tip from peer or self. // An optional oneshot sender can be used receive the result of // attempting to reorg to the new tip, on the corresponding oneshot // receiver. - NewTipReady( - BlockHash, - Option, - Option>, - ), + NewTipReady(Tip, Option, Option>), PeerInfo(Option<(SocketAddr, Option)>), } let accept_connections = stream::try_unfold((), |()| { @@ -479,8 +531,12 @@ impl NetTask { .map(MailboxItem::AcceptConnection); let forward_request_stream = self .forward_mainchain_task_request_rx - .map(|(request, addr)| { - MailboxItem::ForwardMainchainTaskRequest(request, addr) + .map(|(request, addr, peer_state_id)| { + MailboxItem::ForwardMainchainTaskRequest( + request, + addr, + peer_state_id, + ) }); let mainchain_task_response_stream = self .mainchain_task_response_rx @@ -502,20 +558,25 @@ impl NetTask { // stored, if all other ancestor bodies are available. // Each descendant tip maps to the peers that sent that tip. let mut descendant_tips = - HashMap::>>::new( - ); + HashMap::>>::new(); // Map associating mainchain task requests with the peer(s) that - // caused the request - let mut mainchain_task_request_sources = - HashMap::>::new(); + // caused the request, and the request peer state ID + let mut mainchain_task_request_sources = HashMap::< + mainchain_task::Request, + HashSet<(SocketAddr, PeerStateId)>, + >::new(); while let Some(mailbox_item) = mailbox_stream.next().await { match mailbox_item { MailboxItem::AcceptConnection(res) => res?, - MailboxItem::ForwardMainchainTaskRequest(request, peer) => { + MailboxItem::ForwardMainchainTaskRequest( + request, + peer, + peer_state_id, + ) => { mainchain_task_request_sources .entry(request) .or_default() - .insert(peer); + .insert((peer, peer_state_id)); let () = self .ctxt .mainchain_task @@ -523,133 +584,46 @@ impl NetTask { .map_err(|_| Error::SendMainchainTaskRequest)?; } MailboxItem::MainchainTaskResponse(response) => { - let request = response.0; - match request { - mainchain_task::Request::AncestorHeaders( - block_hash, + let request = response.into(); + match response { + mainchain_task::Response::AncestorHeaders( + _block_hash, ) => { let Some(sources) = mainchain_task_request_sources.remove(&request) else { continue; }; - // request verify BMM - for addr in sources { - let request = - mainchain_task::Request::VerifyBmm( - block_hash, + for (addr, peer_state_id) in sources { + let message = + PeerConnectionMessage::MainchainAncestors( + peer_state_id, ); let () = self - .forward_mainchain_task_request_tx - .unbounded_send((request, addr)) - .map_err(|_| { - Error::ForwardMainchainTaskRequest - })?; + .ctxt + .net + .push_internal_message(message, addr)?; } } - mainchain_task::Request::VerifyBmm(block_hash) => { + mainchain_task::Response::VerifyBmm( + _block_hash, + res, + ) => { let Some(sources) = mainchain_task_request_sources.remove(&request) else { continue; }; - let verify_bmm_result = { - let rotxn = self.ctxt.env.read_txn()?; - self.ctxt - .archive - .get_bmm_verification(&rotxn, block_hash)? - }; - if !verify_bmm_result { - for addr in &sources { - tracing::warn!( - %addr, - %block_hash, - "Invalid response from peer; BMM verification failed" - ); - let () = - self.ctxt.net.remove_active_peer(*addr); - } - } - let missing_bodies: Vec = { - let rotxn = self.ctxt.env.read_txn()?; - let tip = self.ctxt.state.get_tip(&rotxn)?; - let last_common_ancestor = - self.ctxt.archive.last_common_ancestor( - &rotxn, tip, block_hash, - )?; - self.ctxt.archive.get_missing_bodies( - &rotxn, - block_hash, - last_common_ancestor, - )? - }; - if missing_bodies.is_empty() { - for addr in sources { - let () = self - .new_tip_ready_tx - .unbounded_send(( - block_hash, - Some(addr), - None, - )) - .map_err(|_| Error::SendNewTipReady)?; - } - } else { - let rotxn = self.ctxt.env.read_txn()?; - // Request missing bodies, update descendent tips - for missing_body in missing_bodies { - descendant_tips - .entry(missing_body) - .or_default() - .entry(block_hash) - .or_default() - .extend(&sources); - // tips descended from the missing body, - // that are alo ancestors of `block_hash` - let lineage_tips: Vec = - descendant_tips[&missing_body] - .keys() - .map(Ok) - .transpose_into_fallible() - .filter(|tip| { - self.ctxt.archive.is_descendant( - &rotxn, **tip, block_hash, - ) - }) - .cloned() - .collect()?; - for lineage_tip in lineage_tips.into_iter() - { - let updated_sources: HashSet< - SocketAddr, - > = descendant_tips[&missing_body] - [&lineage_tip] - .difference(&sources) - .cloned() - .collect(); - if updated_sources.is_empty() { - descendant_tips - .get_mut(&missing_body) - .unwrap() - .remove(&lineage_tip); - } else { - descendant_tips - .get_mut(&missing_body) - .unwrap() - .insert( - lineage_tip, - updated_sources, - ); - } - } - let request = PeerRequest::GetBlock { - block_hash: missing_body, + for (addr, peer_state_id) in sources { + let message = + PeerConnectionMessage::BmmVerification { + res, + peer_state_id, }; - let () = self - .ctxt - .net - .push_request(request, &sources); - } + let () = self + .ctxt + .net + .push_internal_message(message, addr)?; } } } @@ -686,26 +660,30 @@ impl NetTask { tracing::error!(%addr, err = format!("{err:#}"), "Peer connection error"); let () = self.ctxt.net.remove_active_peer(addr); } - PeerConnectionInfo::NeedBmmVerification(block_hash) => { + PeerConnectionInfo::NeedBmmVerification { + main_hash, + peer_state_id, + } => { let request = - mainchain_task::Request::VerifyBmm(block_hash); + mainchain_task::Request::VerifyBmm(main_hash); let () = self .forward_mainchain_task_request_tx - .unbounded_send((request, addr)) + .unbounded_send((request, addr, peer_state_id)) .map_err(|_| { Error::ForwardMainchainTaskRequest })?; } - PeerConnectionInfo::NeedMainchainAncestors( - block_hash, - ) => { + PeerConnectionInfo::NeedMainchainAncestors { + main_hash, + peer_state_id, + } => { let request = mainchain_task::Request::AncestorHeaders( - block_hash, + main_hash, ); let () = self .forward_mainchain_task_request_tx - .unbounded_send((request, addr)) + .unbounded_send((request, addr, peer_state_id)) .map_err(|_| { Error::ForwardMainchainTaskRequest })?; @@ -729,11 +707,11 @@ impl NetTask { .net .push_tx(HashSet::from_iter([addr]), new_tx); } - PeerConnectionInfo::Response(resp, req) => { + PeerConnectionInfo::Response(boxed) => { + let (resp, req) = *boxed; let () = Self::handle_response( &self.ctxt, &mut descendant_tips, - &self.forward_mainchain_task_request_tx, &self.new_tip_ready_tx, addr, resp, @@ -814,7 +792,7 @@ impl NetTaskHandle { /// Push a tip that is ready to reorg to. #[allow(dead_code)] - pub fn new_tip_ready(&self, new_tip: BlockHash) -> Result<(), Error> { + pub fn new_tip_ready(&self, new_tip: Tip) -> Result<(), Error> { self.new_tip_ready_tx .unbounded_send((new_tip, None, None)) .map_err(|_| Error::SendNewTipReady) @@ -826,7 +804,7 @@ impl NetTaskHandle { /// A result of Ok(false) indicates that the tip was not reorged to. pub async fn new_tip_ready_confirm( &self, - new_tip: BlockHash, + new_tip: Tip, ) -> Result { let (oneshot_tx, oneshot_rx) = oneshot::channel(); let () = self diff --git a/lib/state.rs b/lib/state.rs index 2c826df..086f175 100644 --- a/lib/state.rs +++ b/lib/state.rs @@ -1,6 +1,9 @@ use std::collections::{BTreeMap, HashMap, HashSet}; -use heed::{types::SerdeBincode, Database, RoTxn, RwTxn}; +use heed::{ + types::{SerdeBincode, Unit}, + Database, RoTxn, RwTxn, +}; use bip300301::{ bitcoin::{ @@ -9,7 +12,6 @@ use bip300301::{ TwoWayPegData, WithdrawalBundleStatus, }; use rustreexo::accumulator::{node_hash::NodeHash, proof::Proof}; -use serde::{Deserialize, Serialize}; use crate::{ authorization::Authorization, @@ -81,42 +83,17 @@ pub enum Error { WrongPubKeyForAddress, } -/// Unit key. LMDB can't use zero-sized keys, so this encodes to a single byte -#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)] -pub struct UnitKey; - -impl<'de> Deserialize<'de> for UnitKey { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - // Deserialize any byte (ignoring it) and return UnitKey - let _ = u8::deserialize(deserializer)?; - Ok(UnitKey) - } -} - -impl Serialize for UnitKey { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - // Always serialize to the same arbitrary byte - serializer.serialize_u8(0x69) - } -} - #[derive(Clone)] pub struct State { /// Current tip - tip: Database, SerdeBincode>, + tip: Database>, /// Current height - height: Database, SerdeBincode>, + height: Database>, pub utxos: Database, SerdeBincode>, pub stxos: Database, SerdeBincode>, /// Pending withdrawal bundle and block height pub pending_withdrawal_bundle: - Database, SerdeBincode<(WithdrawalBundle, u32)>>, + Database>, /// Mapping from block height to withdrawal bundle and status pub withdrawal_bundles: Database< SerdeBincode, @@ -125,8 +102,7 @@ pub struct State { /// deposit blocks and the height at which they were applied, keyed sequentially pub deposit_blocks: Database, SerdeBincode<(bitcoin::BlockHash, u32)>>, - pub utreexo_accumulator: - Database, SerdeBincode>, + pub utreexo_accumulator: Database>, } impl State { @@ -161,12 +137,12 @@ impl State { } pub fn get_tip(&self, rotxn: &RoTxn) -> Result { - let tip = self.tip.get(rotxn, &UnitKey)?.unwrap_or_default(); + let tip = self.tip.get(rotxn, &())?.unwrap_or_default(); Ok(tip) } pub fn get_height(&self, rotxn: &RoTxn) -> Result { - let height = self.height.get(rotxn, &UnitKey)?.unwrap_or_default(); + let height = self.height.get(rotxn, &())?.unwrap_or_default(); Ok(height) } @@ -215,7 +191,7 @@ impl State { pub fn get_accumulator(&self, rotxn: &RoTxn) -> Result { let accumulator = self .utreexo_accumulator - .get(rotxn, &UnitKey)? + .get(rotxn, &())? .unwrap_or_default(); Ok(accumulator) } @@ -419,7 +395,7 @@ impl State { &self, txn: &RoTxn, ) -> Result, Error> { - Ok(self.pending_withdrawal_bundle.get(txn, &UnitKey)?) + Ok(self.pending_withdrawal_bundle.get(txn, &())?) } pub fn validate_filled_transaction( @@ -519,7 +495,7 @@ impl State { } let mut accumulator = self .utreexo_accumulator - .get(rotxn, &UnitKey)? + .get(rotxn, &())? .unwrap_or_default(); // New leaves for the accumulator let mut accumulator_add = Vec::::new(); @@ -644,7 +620,7 @@ impl State { let block_height = self.get_height(rwtxn)?; let mut accumulator = self .utreexo_accumulator - .get(rwtxn, &UnitKey)? + .get(rwtxn, &())? .unwrap_or_default(); // New leaves for the accumulator let mut accumulator_add = Vec::::new(); @@ -682,10 +658,7 @@ impl State { .unwrap_or_default(); if block_height - last_withdrawal_bundle_failure_height > Self::WITHDRAWAL_BUNDLE_FAILURE_GAP - && self - .pending_withdrawal_bundle - .get(rwtxn, &UnitKey)? - .is_none() + && self.pending_withdrawal_bundle.get(rwtxn, &())?.is_none() { if let Some(bundle) = self.collect_withdrawal_bundle(rwtxn, block_height)? @@ -706,14 +679,14 @@ impl State { } self.pending_withdrawal_bundle.put( rwtxn, - &UnitKey, + &(), &(bundle, block_height), )?; } } for (txid, status) in &two_way_peg_data.bundle_statuses { if let Some((bundle, bundle_block_height)) = - self.pending_withdrawal_bundle.get(rwtxn, &UnitKey)? + self.pending_withdrawal_bundle.get(rwtxn, &())? { if bundle.transaction.txid() != *txid { continue; @@ -724,7 +697,7 @@ impl State { &block_height, &(bundle.clone(), *status), )?; - self.pending_withdrawal_bundle.delete(rwtxn, &UnitKey)?; + self.pending_withdrawal_bundle.delete(rwtxn, &())?; if let WithdrawalBundleStatus::Failed = status { for (outpoint, output) in &bundle.spend_utxos { self.stxos.delete(rwtxn, outpoint)?; @@ -743,8 +716,7 @@ impl State { .0 .modify(&accumulator_add, &accumulator_del) .map_err(Error::Utreexo)?; - self.utreexo_accumulator - .put(rwtxn, &UnitKey, &accumulator)?; + self.utreexo_accumulator.put(rwtxn, &(), &accumulator)?; Ok(()) } @@ -756,7 +728,7 @@ impl State { let block_height = self.get_height(rwtxn)?; let mut accumulator = self .utreexo_accumulator - .get(rwtxn, &UnitKey)? + .get(rwtxn, &())? .unwrap_or_default(); // New leaves for the accumulator let mut accumulator_add = Vec::::new(); @@ -779,7 +751,7 @@ impl State { .delete(rwtxn, &latest_bundle_height)?; self.pending_withdrawal_bundle.put( rwtxn, - &UnitKey, + &(), &(latest_bundle.clone(), latest_bundle_height), )?; if *status == WithdrawalBundleStatus::Failed { @@ -809,10 +781,10 @@ impl State { if block_height - last_withdrawal_bundle_failure_height > Self::WITHDRAWAL_BUNDLE_FAILURE_GAP && let Some((bundle, bundle_height)) = - self.pending_withdrawal_bundle.get(rwtxn, &UnitKey)? + self.pending_withdrawal_bundle.get(rwtxn, &())? && bundle_height == block_height { - self.pending_withdrawal_bundle.delete(rwtxn, &UnitKey)?; + self.pending_withdrawal_bundle.delete(rwtxn, &())?; for (outpoint, output) in bundle.spend_utxos.into_iter().rev() { let utxo_hash = hash(&PointedOutput { outpoint, @@ -862,8 +834,7 @@ impl State { .0 .modify(&accumulator_add, &accumulator_del) .map_err(Error::Utreexo)?; - self.utreexo_accumulator - .put(rwtxn, &UnitKey, &accumulator)?; + self.utreexo_accumulator.put(rwtxn, &(), &accumulator)?; Ok(()) } @@ -873,7 +844,7 @@ impl State { header: &Header, body: &Body, ) -> Result<(), Error> { - let tip_hash = self.tip.get(rwtxn, &UnitKey)?.unwrap_or_default(); + let tip_hash = self.get_tip(rwtxn)?; if tip_hash != header.prev_side_hash { let err = InvalidHeaderError::PrevSideHash { expected: tip_hash, @@ -891,7 +862,7 @@ impl State { } let mut accumulator = self .utreexo_accumulator - .get(rwtxn, &UnitKey)? + .get(rwtxn, &())? .unwrap_or_default(); // New leaves for the accumulator let mut accumulator_add = Vec::::new(); @@ -944,14 +915,13 @@ impl State { } let block_hash = header.hash(); let height = self.get_height(rwtxn)?; - self.tip.put(rwtxn, &UnitKey, &block_hash)?; - self.height.put(rwtxn, &UnitKey, &(height + 1))?; + self.tip.put(rwtxn, &(), &block_hash)?; + self.height.put(rwtxn, &(), &(height + 1))?; accumulator .0 .modify(&accumulator_add, &accumulator_del) .map_err(Error::Utreexo)?; - self.utreexo_accumulator - .put(rwtxn, &UnitKey, &accumulator)?; + self.utreexo_accumulator.put(rwtxn, &(), &accumulator)?; Ok(()) } @@ -961,7 +931,7 @@ impl State { header: &Header, body: &Body, ) -> Result<(), Error> { - let tip_hash = self.tip.get(rwtxn, &UnitKey)?.unwrap_or_default(); + let tip_hash = self.tip.get(rwtxn, &())?.unwrap_or_default(); if tip_hash != header.hash() { let err = InvalidHeaderError::BlockHash { expected: tip_hash, @@ -979,7 +949,7 @@ impl State { } let mut accumulator = self .utreexo_accumulator - .get(rwtxn, &UnitKey)? + .get(rwtxn, &())? .unwrap_or_default(); tracing::debug!("Got acc"); // New leaves for the accumulator @@ -1051,14 +1021,13 @@ impl State { }, )?; let height = self.get_height(rwtxn)?; - self.tip.put(rwtxn, &UnitKey, &header.prev_side_hash)?; - self.height.put(rwtxn, &UnitKey, &(height - 1))?; + self.tip.put(rwtxn, &(), &header.prev_side_hash)?; + self.height.put(rwtxn, &(), &(height - 1))?; accumulator .0 .modify(&accumulator_add, &accumulator_del) .map_err(Error::Utreexo)?; - self.utreexo_accumulator - .put(rwtxn, &UnitKey, &accumulator)?; + self.utreexo_accumulator.put(rwtxn, &(), &accumulator)?; Ok(()) } diff --git a/lib/types/mod.rs b/lib/types/mod.rs index 05fdc25..fc691ad 100644 --- a/lib/types/mod.rs +++ b/lib/types/mod.rs @@ -1,4 +1,4 @@ -use bip300301::bitcoin; +use bip300301::bitcoin::{self, hashes::Hash as _}; use borsh::BorshSerialize; use rustreexo::accumulator::{node_hash::NodeHash, pollard::Pollard}; use serde::{Deserialize, Serialize}; @@ -202,3 +202,37 @@ impl Serialize for Accumulator { as Serialize>::serialize(&bytes, serializer) } } + +#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub enum BmmResult { + Verified, + Failed, +} + +/// A tip refers to both a sidechain block AND the mainchain block that commits +/// to it. +#[derive( + BorshSerialize, + Clone, + Copy, + Debug, + Deserialize, + Eq, + Hash, + PartialEq, + Serialize, +)] +pub struct Tip { + pub block_hash: BlockHash, + #[borsh(serialize_with = "borsh_serialize_bitcoin_block_hash")] + pub main_block_hash: bitcoin::BlockHash, +} + +impl Default for Tip { + fn default() -> Self { + Self { + block_hash: BlockHash::default(), + main_block_hash: bitcoin::BlockHash::all_zeros(), + } + } +}