diff --git a/Cargo.lock b/Cargo.lock index 9ca661e..ca1b539 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -148,7 +148,7 @@ dependencies = [ "log", "tar", "tempfile", - "ureq 1.5.5", + "ureq", "which", ] @@ -176,10 +176,8 @@ dependencies = [ "bitcoin", "bitcoind", "clap", - "electrsd", "env_logger", - "futures-core", - "futures-util", + "futures", "log", "reqwest", "serde", @@ -187,7 +185,6 @@ dependencies = [ "serial_test", "testcontainers", "tokio", - "tokio-stream", "tokio-tungstenite", "url", ] @@ -221,27 +218,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" -[[package]] -name = "bzip2" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6afcd980b5f3a45017c57e57a2fcccbb351cc43a356ce117ef760ef8052b89b0" -dependencies = [ - "bzip2-sys", - "libc", -] - -[[package]] -name = "bzip2-sys" -version = "0.1.11+1.0.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" -dependencies = [ - "cc", - "libc", - "pkg-config", -] - [[package]] name = "cc" version = "1.0.73" @@ -439,33 +415,6 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" -[[package]] -name = "electrsd" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ad65605e022b44ab8c1e489547311bb48b5c605a0aea9ba908e12cae2880111" -dependencies = [ - "bitcoin_hashes", - "bitcoind", - "electrum-client", - "log", - "nix", - "ureq 2.4.0", - "zip", -] - -[[package]] -name = "electrum-client" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ef9b40020912229e947b45d91f9ff96b10d543e0eddd75ff41b9eda24d9c051" -dependencies = [ - "bitcoin", - "log", - "serde", - "serde_json", -] - [[package]] name = "encoding_rs" version = "0.8.31" @@ -915,15 +864,6 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" -[[package]] -name = "memoffset" -version = "0.6.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" -dependencies = [ - "autocfg", -] - [[package]] name = "mime" version = "0.3.16" @@ -981,19 +921,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "nix" -version = "0.22.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4916f159ed8e5de0082076562152a76b7a1f64a01fd9d1e0fea002c37624faf" -dependencies = [ - "bitflags", - "cc", - "cfg-if", - "libc", - "memoffset", -] - [[package]] name = "ntapi" version = "0.3.7" @@ -1335,20 +1262,8 @@ dependencies = [ "base64", "log", "ring", - "sct 0.6.1", - "webpki 0.21.4", -] - -[[package]] -name = "rustls" -version = "0.20.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aab8ee6c7097ed6057f43c187a62418d0c05a4bd5f18b3571db50ee0f9ce033" -dependencies = [ - "log", - "ring", - "sct 0.7.0", - "webpki 0.22.0", + "sct", + "webpki", ] [[package]] @@ -1389,16 +1304,6 @@ dependencies = [ "untrusted", ] -[[package]] -name = "sct" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "secp256k1" version = "0.22.1" @@ -1871,17 +1776,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-stream" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-tungstenite" version = "0.17.1" @@ -2008,27 +1902,10 @@ dependencies = [ "log", "once_cell", "qstring", - "rustls 0.19.1", - "url", - "webpki 0.21.4", - "webpki-roots 0.21.1", -] - -[[package]] -name = "ureq" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9399fa2f927a3d327187cbd201480cee55bee6ac5d3c77dd27f0c6814cff16d5" -dependencies = [ - "base64", - "chunked_transfer", - "flate2", - "log", - "once_cell", - "rustls 0.20.6", + "rustls", "url", - "webpki 0.22.0", - "webpki-roots 0.22.3", + "webpki", + "webpki-roots", ] [[package]] @@ -2169,32 +2046,13 @@ dependencies = [ "untrusted", ] -[[package]] -name = "webpki" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "webpki-roots" version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940" dependencies = [ - "webpki 0.21.4", -] - -[[package]] -name = "webpki-roots" -version = "0.22.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44d8de8415c823c8abd270ad483c6feeac771fad964890779f9a8cb24fbbc1bf" -dependencies = [ - "webpki 0.22.0", + "webpki", ] [[package]] @@ -2299,16 +2157,3 @@ checksum = "6d1526bbe5aaeb5eb06885f4d987bcdfa5e23187055de9b83fe00156a821fabc" dependencies = [ "libc", ] - -[[package]] -name = "zip" -version = "0.5.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93ab48844d61251bb3835145c521d88aa4031d7139e8485990f60ca911fa0815" -dependencies = [ - "byteorder", - "bzip2", - "crc32fast", - "flate2", - "thiserror", -] diff --git a/Cargo.toml b/Cargo.toml index 94bb238..81686db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" authors = ["Leonardo Souza ", "LLFourn "] repository = "https://github.com/oleonardolima/block-events" description = "A real-time stream block events library, covering connected and disconnected blocks.\nThis a work in progress project for Summer of Bitcoin 2022." -keywords = ["bitcoin", "blockchain", "blocks", "mempool-space", "stream", "events", "summer-of-bitcoin"] +keywords = ["bitcoin", "blockchain", "blocks", "events", "mempool-space", "stream", "summer-of-bitcoin"] readme = "README.md" license = "MIT OR Apache-2.0" @@ -15,29 +15,20 @@ async-stream = { version = "0.3.3"} bitcoin = { version = "0.28", features = ["use-serde", "base64"] } clap = { version = "3.0", features = ["derive"]} env_logger = { version = "0.9.0" } -futures-core = { version = "0.3" } -futures-util = { version = "0.3" } +futures = { version = "0.3" } log = { version = "0.4" } +reqwest = { version = "0.11.11" } serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0" } -tokio = { version = "1.19.2", features = ["io-util", "io-std", "macros", "net", "rt-multi-thread", "time"] } -tokio-stream = { version = "0.1.9" } +tokio = { version = "1.19.2", features = ["macros", "rt-multi-thread"] } tokio-tungstenite = { version = "0.17.1", features = ["connect", "native-tls"]} url = { version = "2.0.0" } -reqwest = { version = "0.11.11" } [dev-dependencies] testcontainers = { version = "^0.14.0" } bitcoind = { version = "^0.26.1", features = ["22_0"] } -electrsd = { version = "^0.19.1", features = ["bitcoind_22_0", "electrs_0_9_1"] } serial_test = { version = "0.7.0" } -[features] -default = ["mempool-backend"] -tls-secure = [] -esplora-backend = [] -mempool-backend = [] - [lib] name = "block_events" path = "src/lib.rs" diff --git a/src/api.rs b/src/api.rs index baf0471..5993683 100644 --- a/src/api.rs +++ b/src/api.rs @@ -10,7 +10,7 @@ //! All structs from mempool.space API //! Also contains the main [`BlockEvent`] -use bitcoin::{Address, BlockHash, BlockHeader, TxMerkleNode}; +use bitcoin::{Address, Block, BlockHash, BlockHeader, TxMerkleNode}; /// A structure that implements the equivalent `BlockExtended` type from mempool.space, /// which is expected and parsed as response @@ -19,27 +19,44 @@ pub struct BlockExtended { pub id: BlockHash, pub height: u32, pub version: i32, - // none for genesis block #[serde(alias = "previousblockhash")] - pub prev_blockhash: BlockHash, + pub prev_blockhash: Option, // None for genesis block pub merkle_root: TxMerkleNode, #[serde(alias = "timestamp")] pub time: u32, pub bits: u32, pub nonce: u32, - // add new fields if needed } -// FIXME: (@leonardo.lima) Should this use serde_json or some other approach instead ? impl From for BlockHeader { - fn from(extended: BlockExtended) -> BlockHeader { + fn from(extended: BlockExtended) -> Self { BlockHeader { - version: (extended.version), - prev_blockhash: (extended.prev_blockhash), - merkle_root: (extended.merkle_root), - time: (extended.time), - bits: (extended.bits), - nonce: (extended.nonce), + version: extended.version, + prev_blockhash: extended + .prev_blockhash + .expect("Given `api::BlockExtended` does not have prev_blockhash field"), + merkle_root: extended.merkle_root, + time: extended.time, + bits: extended.bits, + nonce: extended.nonce, + } + } +} + +impl From for BlockExtended { + fn from(block: Block) -> Self { + BlockExtended { + id: block.block_hash(), + height: block + .bip34_block_height() + .expect("Given `bitcoin::Block` does not have height encoded as bip34") + as u32, + version: block.header.version, + prev_blockhash: Some(block.header.prev_blockhash), + merkle_root: block.header.merkle_root, + time: block.header.time, + bits: block.header.bits, + nonce: block.header.nonce, } } } @@ -48,6 +65,9 @@ impl From for BlockHeader { #[derive(serde::Deserialize, Debug)] pub struct MempoolSpaceWebSocketMessage { pub block: BlockExtended, + // pub mempool_info: MempoolInfo, + // pub da: DifficultyAdjustment, + // pub fees: RecommendedFee, } /// Structure that implements the standard fields for mempool.space WebSocket client message diff --git a/src/bin.rs b/src/bin.rs index 6f9d261..d0fd93f 100644 --- a/src/bin.rs +++ b/src/bin.rs @@ -1,6 +1,6 @@ use anyhow::Ok; use clap::{ArgGroup, Parser, Subcommand}; -use futures_util::{pin_mut, StreamExt}; +use futures::{pin_mut, StreamExt}; use serde::{Deserialize, Serialize}; #[derive(Parser)] @@ -15,8 +15,11 @@ struct Cli { #[clap(subcommand)] command: Commands, - #[clap(short, long, default_value = "mempool.space/testnet/api/v1")] - base_url: String, + #[clap(short, long, default_value = "https://mempool.space/testnet/api")] + http_base_url: String, + + #[clap(short, long, default_value = "wss://mempool.space/testnet/")] + ws_base_url: String, } #[derive(Debug, Subcommand)] @@ -64,8 +67,12 @@ async fn main() -> anyhow::Result<()> { // async fetch the data stream through the lib let checkpoint = None; - let block_events = - block_events::subscribe_to_block_headers(cli.base_url.as_str(), checkpoint).await?; + let block_events = block_events::subscribe_to_block_headers( + cli.http_base_url.as_str(), + cli.ws_base_url.as_str(), + checkpoint, + ) + .await?; // consume and execute the code (current matching and printing) in async manner for each new block-event pin_mut!(block_events); diff --git a/src/http.rs b/src/http.rs index f84f07f..42a61d3 100644 --- a/src/http.rs +++ b/src/http.rs @@ -13,84 +13,92 @@ #![allow(unused_imports)] use std::ops::Deref; -use bitcoin::{consensus::deserialize, hashes::hex::FromHex, Block, BlockHash, Transaction, Txid}; +use bitcoin::{ + consensus::deserialize, hashes::hex::FromHex, Block, BlockHash, BlockHeader, Transaction, Txid, +}; use reqwest::Client; use crate::api::BlockExtended; -#[cfg(feature = "tls-secure")] -static HTTP_PROTOCOL: &str = "https"; - -#[cfg(not(feature = "tls-secure"))] -static HTTP_PROTOCOL: &str = "http"; - -#[cfg(feature = "esplora-backend")] -static API_PREFIX: &str = "api"; - -#[cfg(feature = "mempool-backend")] -static API_PREFIX: &str = "api/v1"; - /// Generic HttpClient using `reqwest` -/// It has been based on the Esplora client from BDK +/// +/// This implementation and approach is based on the BDK's esplora client +/// +/// `` #[allow(dead_code)] #[derive(Debug, Clone)] pub struct HttpClient { - url: String, + /// The base url for building our http rest calls + /// It's expected to have the protocol, domain and initial api path (e.g: ``) + base_url: String, + /// A `reqwest` client with default or selected config client: Client, + /// The number of concurrency requests the client is allowed to make concurrency: u8, } impl HttpClient { /// Creates a new HttpClient, for given base url and concurrency - pub fn new(base_url: &str, concurrency: u8) -> Self { - let url = - url::Url::parse(format!("{}://{}/{}", HTTP_PROTOCOL, base_url, API_PREFIX).as_str()) - .unwrap(); + pub fn new(base_url: &str) -> Self { HttpClient { - url: url.to_string(), client: Client::new(), - concurrency, + base_url: base_url.to_string(), + concurrency: crate::DEFAULT_CONCURRENT_REQUESTS, } } - /// Get current blockchain block height (the current tip height) - pub async fn _get_tip_height(&self) -> anyhow::Result { + /// Get current blockchain height [`u32`], the current tip height + pub async fn get_tip_height(&self) -> anyhow::Result { let res = self .client - .get(&format!("{}/blocks/tip/height", self.url)) + .get(&format!("{}/blocks/tip/height", self.base_url)) .send() .await?; Ok(res.error_for_status()?.text().await?.parse()?) } - /// Get [`BlockHash`] from mempool.space, for current tip - pub async fn _get_tip_hash(&self) -> anyhow::Result { + /// Get current blockchain hash [`BlockHash`], the current tip hash + pub async fn get_tip_hash(&self) -> anyhow::Result { let res = self .client - .get(&format!("{}/blocks/tip/hash", self.url)) + .get(&format!("{}/blocks/tip/hash", self.base_url)) .send() .await?; Ok(res.error_for_status()?.text().await?.parse()?) } - /// Get [`BlockHash`] from mempool.space, for given block height - pub async fn _get_block_hash(&self, height: u32) -> anyhow::Result { + /// Get the [`BlockHash`] for given block height + pub async fn get_block_hash(&self, height: u32) -> anyhow::Result { let res = self .client - .get(&format!("{}/block-height/{}", self.url, height)) + .get(&format!("{}/block-height/{}", self.base_url, height)) .send() .await?; Ok(res.error_for_status()?.text().await?.parse()?) } - /// Get [`BlockExtended`] from mempool.space, by [`BlockHash`] - pub async fn _get_block(&self, block_hash: BlockHash) -> anyhow::Result { + /// Get the [`BlockHeader`] for given [`BlockHash`] + pub async fn get_block_header(&self, hash: BlockHash) -> anyhow::Result { let res = self .client - .get(&format!("{}/block/{}", self.url, block_hash)) + .get(&format!("{}/block/{}/header", self.base_url, hash)) + .send() + .await?; + + let raw_header = Vec::::from_hex(res.error_for_status()?.text().await?.as_str())?; + let header: BlockHeader = deserialize(&raw_header)?; + + Ok(header) + } + + /// Get full block in [`BlockExtended`] format, for given [`BlockHash`] + pub async fn get_block(&self, block_hash: BlockHash) -> anyhow::Result { + let res = self + .client + .get(&format!("{}/block/{}", self.base_url, block_hash)) .send() .await?; @@ -99,12 +107,14 @@ impl HttpClient { )?) } - /// FIXME: (@leonardo.lima) this only works when using the blockstream.info (esplora) client - #[cfg(feature = "esplora-backend")] - pub async fn _get_block_raw(&self, block_hash: BlockHash) -> anyhow::Result { + /// This only works when using the blockstream.info (esplora) client, it does not work with mempool.space client + /// + /// NOTE: It will be used instead of multiple calls for building blocks as this commit is added in a new mempool.space + /// release: `` + pub async fn get_block_raw(&self, block_hash: BlockHash) -> anyhow::Result { let res = self .client - .get(&format!("{}/block/{}/raw", self.url, block_hash)) + .get(&format!("{}/block/{}/raw", self.base_url, block_hash)) .send() .await?; @@ -113,28 +123,29 @@ impl HttpClient { Ok(block) } - pub async fn _get_tx(&self, tx_id: Txid) -> anyhow::Result { + /// Get all transactions ids [`Vec`] for given [`BlockHash`] + pub async fn get_tx_ids(&self, block_hash: BlockHash) -> anyhow::Result> { let res = self .client - .get(&format!("{}/tx/{}/hex", self.url, tx_id)) + .get(format!("{}/block/{}/txids", self.base_url, block_hash)) .send() .await?; - let tx: Transaction = deserialize(&Vec::::from_hex( - res.error_for_status()?.text().await?.as_str(), - )?)?; - - Ok(tx) + let tx_ids: Vec = serde_json::from_str(res.text().await?.as_str())?; + Ok(tx_ids) } - pub async fn _get_tx_ids(&self, block_hash: BlockHash) -> anyhow::Result> { + /// Get the [`Transaction`] for given transaction hash/id [`Txid`] + pub async fn get_tx(&self, tx_id: Txid) -> anyhow::Result { let res = self .client - .get(format!("{}/block/{}/txids", self.url, block_hash)) + .get(&format!("{}/tx/{}/hex", self.base_url, tx_id)) .send() .await?; - let tx_ids: Vec = serde_json::from_str(res.text().await?.as_str())?; - Ok(tx_ids) + let raw_tx = Vec::::from_hex(res.error_for_status()?.text().await?.as_str())?; + let tx: Transaction = deserialize(&raw_tx)?; + + Ok(tx) } } diff --git a/src/lib.rs b/src/lib.rs index a1576cf..c9b7c21 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,9 +66,6 @@ //! } //! ``` -#[cfg(all(feature = "esplora-backend", feature = "mempool-backend"))] -compile_error!("features esplora-backend and mempool-backend are mutually exclusive and cannot be enabled together"); - pub mod api; pub mod http; pub mod websocket; @@ -76,21 +73,19 @@ pub mod websocket; pub extern crate async_stream; pub extern crate bitcoin; pub extern crate tokio; -pub extern crate tokio_stream; pub extern crate tokio_tungstenite; use std::{collections::HashMap, collections::VecDeque, pin::Pin}; use api::{BlockEvent, BlockExtended}; -use futures_core::TryStream; +use futures::{Stream, StreamExt, TryStream}; use http::HttpClient; use anyhow::{anyhow, Ok}; -use async_stream::{stream, try_stream}; -use bitcoin::{Block, BlockHash, BlockHeader, Transaction}; -use core::result::Result::Ok as CoreOk; -use futures_util::stream::Stream; -use tokio_stream::StreamExt; +use async_stream::try_stream; +use bitcoin::{ + blockdata::constants::genesis_block, Block, BlockHash, BlockHeader, Network, Transaction, +}; const DEFAULT_CONCURRENT_REQUESTS: u8 = 4; @@ -100,18 +95,67 @@ const DEFAULT_CONCURRENT_REQUESTS: u8 = 4; #[derive(Debug, Clone)] pub struct BlockHeadersCache { pub tip: BlockHash, - pub active_headers: HashMap, - pub stale_headers: HashMap, + pub active_headers: HashMap, + pub stale_headers: HashMap, } impl BlockHeadersCache { + /// Create a new instance of [`BlockHeadersCache`] for given checkpoint (height: [`u32`], hash: [`BlockHash`]) + /// + /// It creates with the checkpoint block as tip and active_headers + pub async fn new(base_url: &str) -> anyhow::Result { + let http_client = HttpClient::new(base_url); + + let hash = http_client.get_tip_hash().await?; + let header = http_client.get_block_header(hash).await?; + + Ok(BlockHeadersCache { + tip: hash, + active_headers: HashMap::from([(hash, header)]), + stale_headers: HashMap::new(), + }) + } + + /// Create a new instance of [`BlockHeadersCache`] for given [`Network`] + /// + /// It creates with the genesis block for given network as tip and active_headers + pub fn new_with_genesis(network: Network) -> BlockHeadersCache { + let genesis_block = genesis_block(network); + + BlockHeadersCache { + tip: genesis_block.block_hash(), + active_headers: HashMap::from([(genesis_block.block_hash(), genesis_block.header)]), + stale_headers: HashMap::new(), + } + } + + /// Create a new instance of [`BlockHeadersCache`] for given checkpoint (height: [`u32`], hash: [`BlockHash`]) + /// + /// It creates with the checkpoint block as tip and active_headers + pub async fn new_with_checkpoint( + base_url: &str, + checkpoint: (u32, BlockHash), + ) -> anyhow::Result { + let (_, hash) = checkpoint; + + let header = HttpClient::new(base_url).get_block_header(hash).await?; + + Ok(BlockHeadersCache { + tip: hash, + active_headers: HashMap::from([(hash, header)]), + stale_headers: HashMap::new(), + }) + } + /// Validate if the new [`BlockHeader`] or [`BlockExtended`] candidate is a valid tip /// - /// Updates the [`BlockHeadersCache`] state and returns a boolean - pub fn validate_new_header(&mut self, candidate: BlockExtended) -> bool { + /// Updates the [`BlockHeadersCache`] state, updating the tip, extending the active_headers and returns a boolean + pub fn validate_new_header(&mut self, candidate: BlockHeader) -> bool { + // TODO: (@leonardo.lima) It should check and validate the PoW for the header candidates if self.tip == candidate.prev_blockhash { - self.tip = candidate.id; - self.active_headers.insert(candidate.id, candidate); + self.tip = candidate.block_hash(); + self.active_headers + .insert(candidate.block_hash(), candidate); return true; } false @@ -121,32 +165,36 @@ impl BlockHeadersCache { /// /// Updates the [`BlockHeadersCache`] state with fork chain candidates /// - /// Returns a common ancestor [`BlockExtended`] stored in [`BlockHeadersCache`] and fork branch chain as a `VecDeque` + /// Returns a common ancestor [`BlockHeader`] stored in [`BlockHeadersCache`] and the + /// fork branch chain as a [`VecDeque`] pub async fn find_or_fetch_common_ancestor( &self, http_client: HttpClient, - branch_candidate: BlockExtended, - ) -> anyhow::Result<(BlockExtended, VecDeque)> { - let mut common_ancestor = branch_candidate; - let mut fork_branch: VecDeque = VecDeque::new(); - while !self.active_headers.contains_key(&common_ancestor.id) { + fork_candidate: BlockHeader, + ) -> anyhow::Result<(BlockHeader, VecDeque)> { + let mut common_ancestor = fork_candidate; + let mut fork_branch: VecDeque = VecDeque::new(); + while !self + .active_headers + .contains_key(&common_ancestor.block_hash()) + { fork_branch.push_back(common_ancestor); common_ancestor = http_client - ._get_block(common_ancestor.prev_blockhash) + .get_block_header(common_ancestor.prev_blockhash) .await?; } Ok((common_ancestor, fork_branch)) } - /// Rollback active chain in [`BlockHeadersCache`] back to passed block + /// Rollback active chain in [`BlockHeadersCache`] back to passed [`BlockHeader`] /// - /// Returns all stale, and to be disconnected blocks as a `VecDeque` + /// Returns all stale, and to be disconnected blocks as a [`VecDeque`] pub async fn rollback_active_chain( &mut self, - block: BlockExtended, - ) -> anyhow::Result> { + header: BlockHeader, + ) -> anyhow::Result> { let mut disconnected = VecDeque::new(); - while block.id != self.tip { + while header.block_hash() != self.tip { let (stale_hash, stale_header) = self.active_headers.remove_entry(&self.tip).unwrap(); disconnected.push_back(stale_header); @@ -156,57 +204,99 @@ impl BlockHeadersCache { Ok(disconnected) } - /// Apply fork branch to active chain, and update tip to new `BlockExtended` + /// Apply fork branch to active chain, and update tip to new [`BlockHeader`] /// - /// Returns the new tip `BlockHash`, and the connected blocks as a `VecDeque` + /// Returns the new tip [`BlockHash`], and the connected block headers as a [`VecDeque`] pub fn apply_fork_chain( &mut self, - mut fork_branch: VecDeque, - ) -> anyhow::Result<(BlockHash, VecDeque)> { + mut fork_branch: VecDeque, + ) -> anyhow::Result<(BlockHash, VecDeque)> { let mut connected = VecDeque::new(); while !fork_branch.is_empty() { - let block = fork_branch.pop_front().unwrap(); - connected.push_back(block); + let header = fork_branch.pop_front().unwrap(); + connected.push_back(header); - self.active_headers.insert(block.id, block); - self.tip = block.id; + self.active_headers.insert(header.block_hash(), header); + self.tip = header.block_hash(); } Ok((self.tip, connected)) } } -/// Subscribe to a real-time stream of [`BlockEvent`], for all new blocks or starting from an optional checkpoint -pub async fn subscribe_to_block_headers( +/// Process all candidates listened from source, it tries to apply the candidate to current active chain cached +/// It handles reorganization and fork if needed +/// Steps: +/// - validates if current candidate is valid as a new tip, if valid extends chain producing [`BlockEvent::Connected`] +/// - otherwise, find common ancestor between branches +/// - rollback current cached active chain +/// - apply forked branch, and produces [`BlockEvent::Disconnected`] for staled blocks and [`BlockEvent::Connected`] +/// for new branch +async fn process_candidates( base_url: &str, - checkpoint: Option<(u32, BlockHash)>, -) -> anyhow::Result>>>> { - let http_client = http::HttpClient::new(base_url, DEFAULT_CONCURRENT_REQUESTS); + mut cache: BlockHeadersCache, + mut candidates: Pin>>>, +) -> anyhow::Result>>> { + let http_client = HttpClient::new(base_url); - let tip_height = match checkpoint { - Some((height, _)) => height - 1, - _ => http_client._get_tip_height().await?, - }; + let stream = try_stream! { + // TODO: (@leonardo.lima) Do not just propagate the errors, add a retry mechanism instead + while let candidate = candidates.next().await.ok_or(anyhow!("the `bitcoin::BlockHeader` candidate is None"))?? { + // validate if the [`BlockHeader`] candidate is a valid new tip + // yields a [`BlockEvent::Connected()`] variant and continue the iteration + if cache.validate_new_header(candidate) { + yield BlockEvent::Connected(BlockHeader::from(candidate.clone())); + continue + } + + // find common ancestor for current active chain and the forked chain + // fetches forked chain candidates and store in cache + let (common_ancestor, fork_chain) = cache.find_or_fetch_common_ancestor(http_client.clone(), candidate).await?; + + // rollback current active chain, moving blocks to staled field + // yields BlockEvent::Disconnected((u32, BlockHash)) + let mut disconnected: VecDeque = cache.rollback_active_chain(common_ancestor).await?; + while !disconnected.is_empty() { + if let Some(block_header) = disconnected.pop_back() { + let block_ext: BlockExtended = http_client.get_block(block_header.block_hash()).await?; + yield BlockEvent::Disconnected((block_ext.height, block_header.block_hash())); + } + } - let cache = BlockHeadersCache { - tip: http_client._get_block_hash(tip_height).await?, - active_headers: HashMap::new(), - stale_headers: HashMap::new(), + // iterate over forked chain candidates + // update [`Cache`] active_headers field with candidates + let (_, mut connected) = cache.apply_fork_chain(fork_chain)?; + while !connected.is_empty() { + let block = connected.pop_back().unwrap(); + yield BlockEvent::Connected(BlockHeader::from(block.clone())); + } + } }; + Ok(stream) +} +/// Subscribe to a real-time stream of [`BlockEvent`], for all new blocks or starting from an optional checkpoint +pub async fn subscribe_to_block_headers( + http_base_url: &str, + ws_base_url: &str, + checkpoint: Option<(u32, BlockHash)>, +) -> anyhow::Result>>>>> { match checkpoint { Some(checkpoint) => { - let prev_header_candidates = fetch_blocks(base_url, checkpoint).await?; - let new_header_candidates = websocket::subscribe_to_block_headers(base_url).await?; - let candidates = Box::pin(prev_header_candidates.chain(new_header_candidates)); + let headers_cache = + BlockHeadersCache::new_with_checkpoint(http_base_url, checkpoint).await?; + let prev_headers = fetch_block_headers(http_base_url, checkpoint).await?; + let new_headers = websocket::listen_new_block_headers(ws_base_url).await?; + let candidates = Box::pin(prev_headers.chain(new_headers)); Ok(Box::pin( - process_candidates(base_url, cache, candidates).await?, + process_candidates(http_base_url, headers_cache, candidates).await?, )) } None => { + let headers_cache = BlockHeadersCache::new(http_base_url).await?; let new_header_candidates = - Box::pin(websocket::subscribe_to_block_headers(base_url).await?); + Box::pin(websocket::listen_new_block_headers(ws_base_url).await?); Ok(Box::pin( - process_candidates(base_url, cache, new_header_candidates).await?, + process_candidates(http_base_url, headers_cache, new_header_candidates).await?, )) } } @@ -215,27 +305,29 @@ pub async fn subscribe_to_block_headers( /// Subscribe to a real-time stream of [`BlockEvent`] of full rust-bitcoin blocks, for new mined blocks or /// starting from an optional checkpoint (height: u32, hash: BlockHash) pub async fn subscribe_to_blocks( - base_url: &str, + http_base_url: &str, + ws_base_url: &str, checkpoint: Option<(u32, BlockHash)>, ) -> anyhow::Result>>> { // build and create a http client - let http_client = http::HttpClient::new(base_url, DEFAULT_CONCURRENT_REQUESTS); + let http_client = HttpClient::new(http_base_url); // subscribe to block_headers events - let mut header_events = subscribe_to_block_headers(base_url, checkpoint).await?; + let mut header_events = + subscribe_to_block_headers(http_base_url, ws_base_url, checkpoint).await?; // iterate through each event for block_headers let stream = try_stream! { while let Some(event) = header_events.next().await { - match event { + match event? { BlockEvent::Connected(header) => { // fetch all transaction ids (Txids) for the block - let tx_ids = http_client._get_tx_ids(header.block_hash()).await?; + let tx_ids = http_client.get_tx_ids(header.block_hash()).await?; // fetch full transaction and build transaction list Vec let mut txs: Vec = Vec::new(); for id in tx_ids { - let tx = http_client._get_tx(id).await?; + let tx = http_client.get_tx(id).await?; txs.push(Transaction::from(tx)); } @@ -253,92 +345,32 @@ pub async fn subscribe_to_blocks( Ok(stream) } -/// Process all candidates listened from source, it tries to apply the candidate to current active chain cached -/// It handles reorganization and fork if needed -/// Steps: -/// - validates if current candidate is valid as a new tip, if valid extends chain producing [`BlockEvent::Connected`] -/// - otherwise, find common ancestor between branches -/// - rollback current cached active chain -/// - apply forked branch, and produces [`BlockEvent::Disconnected`] for staled blocks and [`BlockEvent::Connected`] -/// for new branch -async fn process_candidates( - base_url: &str, - mut cache: BlockHeadersCache, - mut candidates: Pin>>>, -) -> anyhow::Result>> { - let http_client = HttpClient::new(base_url, DEFAULT_CONCURRENT_REQUESTS); - - let stream = stream! { - while let Some(candidate) = candidates.next().await { - match candidate { - // TODO: (@leonardo.lima) We should handle and return an specific error for the client - Err(_) => {/* ignore */}, - CoreOk(candidate) => { - // TODO: (@leonardo.lima) It should check and validate for valid BlockHeaders - - // validate if the [`BlockHeader`] candidate is a valid new tip - // yields a [`BlockEvent::Connected()`] variant and continue the iteration - if cache.validate_new_header(candidate) { - yield BlockEvent::Connected(BlockHeader::from(candidate.clone())); - continue - } - - // find common ancestor for current active chain and the forked chain - // fetches forked chain candidates and store in cache - let (common_ancestor, fork_chain) = cache.find_or_fetch_common_ancestor(http_client.clone(), candidate).await.unwrap(); - - // rollback current active chain, moving blocks to staled field - // yields BlockEvent::Disconnected((u32, BlockHash)) - let mut disconnected: VecDeque = cache.rollback_active_chain(common_ancestor).await.unwrap(); - while !disconnected.is_empty() { - let block: BlockExtended = disconnected.pop_back().unwrap(); - yield BlockEvent::Disconnected((block.height, block.id)); - } - - // iterate over forked chain candidates - // update [`Cache`] active_headers field with candidates - let (_, mut connected) = cache.apply_fork_chain(fork_chain).unwrap(); - while !connected.is_empty() { - let block = connected.pop_back().unwrap(); - yield BlockEvent::Connected(BlockHeader::from(block.clone())); - } - - }, - } - } - }; - Ok(stream) -} - -/// Fetch all new starting from the checkpoint up to current active tip -// FIXME: this fails when checkpoint is genesis block as it does not have a previousblockhash field -pub async fn fetch_blocks( +/// Fetch all [`BlockHeader`] starting from the checkpoint ([`u32`], [`BlockHeader`]) up to tip +pub async fn fetch_block_headers( base_url: &str, checkpoint: (u32, BlockHash), -) -> anyhow::Result>> { - let http_client = HttpClient::new(base_url, DEFAULT_CONCURRENT_REQUESTS); +) -> anyhow::Result>> { + let http_client = HttpClient::new(base_url); // checks if the checkpoint height and hash matches for the current chain let (ckpt_height, ckpt_hash) = checkpoint; - if ckpt_hash != http_client._get_block_hash(ckpt_height).await? { + if ckpt_hash != http_client.get_block_hash(ckpt_height).await? { return Err(anyhow!( "The checkpoint passed is invalid, it should exist in the blockchain." )); } - let tip_height = http_client._get_tip_height().await?; + let tip_height = http_client.get_tip_height().await?; let stream = try_stream! { for height in ckpt_height..tip_height { - let hash = http_client._get_block_hash(height).await?; - let block = http_client._get_block(hash).await?; - - yield block; + let block_hash = http_client.get_block_hash(height).await?; + let block_header = http_client.get_block_header(block_hash).await?; + yield block_header; }; - let height = http_client._get_tip_height().await?; - let hash = http_client._get_block_hash(height).await?; - let block = http_client._get_block(hash).await?; - yield block; + let tip_hash = http_client.get_tip_hash().await?; + let tip_header = http_client.get_block_header(tip_hash).await?; + yield tip_header; }; Ok(stream) } diff --git a/src/websocket.rs b/src/websocket.rs index c97aabd..970cd54 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -8,39 +8,31 @@ // licenses. //! WebSocket module for mempool.space -//! It has functions to connect and create a new WebSocket client, and also subscribe for only new block events - -use crate::api::BlockExtended; +//! It has functions to connect and create a new WebSocket client, and also subscribe for new blocks (BlockHeaders) use super::api::{ MempoolSpaceWebSocketMessage, MempoolSpaceWebSocketRequestData, MempoolSpaceWebSocketRequestMessage, }; -use anyhow::{anyhow, Ok}; +use anyhow::{anyhow, Ok as AnyhowOk}; use async_stream::try_stream; -use core::result::Result::Ok as CoreOk; -use futures_core::TryStream; -use futures_util::{SinkExt, StreamExt}; +use bitcoin::BlockHeader; +use core::result::Result::Ok; +use futures::{SinkExt, StreamExt, TryStream}; use std::time::Duration; use tokio::net::TcpStream; use tokio_tungstenite::tungstenite::protocol::Message; use tokio_tungstenite::{connect_async_tls_with_config, MaybeTlsStream, WebSocketStream}; -#[cfg(feature = "tls-secure")] -static WEBSOCKET_PROTOCOL: &str = "wss"; - -#[cfg(not(feature = "tls-secure"))] -static WEBSOCKET_PROTOCOL: &str = "ws"; - /// Create a new WebSocket client for given base url and initial message /// -/// It uses `tokio_tungestenite` crate and produces `WebSocketStream` to be handled and treated by caller +/// It uses `tokio_tungestenite` crate and produces [`WebSocketStream`] to be handled and treated by caller async fn websocket_client( base_url: &str, message: String, ) -> anyhow::Result>> { - let url = url::Url::parse(format!("{}://{}/api/v1/ws", WEBSOCKET_PROTOCOL, base_url).as_str())?; + let url = url::Url::parse(format!("{}/api/v1/ws", base_url).as_str())?; log::info!("starting websocket handshake with url={}", url); let (mut websocket_stream, websocket_response) = @@ -54,13 +46,13 @@ async fn websocket_client( return Err(anyhow!("failed to publish first message to websocket")); }; log::info!("published message: {:#?}, successfully!", &message); - Ok(websocket_stream) + AnyhowOk(websocket_stream) } -/// Connects to mempool.space WebSocket client and listen to new messages producing a stream of [`BlockExtended`] candidates -pub async fn subscribe_to_block_headers( +/// Connects to mempool.space WebSocket client and listen to new messages producing a stream of [`BlockHeader`] candidates +pub async fn listen_new_block_headers( base_url: &str, -) -> anyhow::Result>> { +) -> anyhow::Result>> { let init_message = serde_json::to_string(&build_websocket_request_message( &MempoolSpaceWebSocketRequestData::Blocks, ))?; @@ -75,34 +67,41 @@ pub async fn subscribe_to_block_headers( tokio::select! { message = ws_stream.next() => { if let Some(message) = message { - match message.unwrap() { - Message::Text(text) => { - let parsed_msg: MempoolSpaceWebSocketMessage = match serde_json::from_str(&text) { - Err(_) => continue, - CoreOk(parsed_msg) => parsed_msg, - }; - yield parsed_msg.block; - }, - Message::Close(_) => { - eprintln!("websocket closing gracefully"); - break; + match message { + Ok(message) => match message { + Message::Text(text) => { + let parsed: MempoolSpaceWebSocketMessage = match serde_json::from_str(&text) { + Err(_) => continue, + Ok(parsed) => parsed, + }; + yield BlockHeader::from(parsed.block); + }, + Message::Close(_) => { + eprintln!("websocket closing gracefully"); + break; + }, + Message::Binary(_) => { + eprintln!("unexpected binary message"); + break; + }, + _ => {/* ignore */} }, - Message::Binary(_) => { - eprintln!("unexpected binary message"); - break; - }, - _ => { /*ignore*/ } + Err(_error) => { /* ignore */}, } } } _ = pinger.tick() => { log::info!("pinging to websocket to keep connection alive"); - ws_stream.send(Message::Ping(vec![])).await.unwrap(); + if (ws_stream.send(Message::Ping(vec![])).await).is_err() { + log::error!("failed to send ping message to websocket"); + continue + } } } } }; - Ok(stream) + + AnyhowOk(stream) } fn build_websocket_request_message( diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 39eaa79..9f83d25 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1,13 +1,11 @@ use bitcoin::BlockHash; use bitcoind::{bitcoincore_rpc::RpcApi, BitcoinD}; use block_events::{api::BlockEvent, http::HttpClient, websocket}; -use futures_util::{pin_mut, StreamExt}; +use futures::{pin_mut, StreamExt}; use serial_test::serial; use std::{collections::VecDeque, ops::Deref, time::Duration}; use testcontainers::{clients, images, images::generic::GenericImage, RunnableImage}; -const DEFAULT_CONCURRENT_REQUESTS: u8 = 4; - const HOST_IP: &str = "127.0.0.1"; const MARIADB_NAME: &str = "mariadb"; @@ -58,7 +56,20 @@ impl MempoolTestClient { let bitcoind = BitcoinD::with_conf(&bitcoind_exe, &conf).unwrap(); - log::debug!("successfully launched [bitcoind_exe {:?}]", bitcoind_exe); + // get a new address from node wallet + let node_address = bitcoind.client.get_new_address(None, None).unwrap(); + + // generate first 101 blocks + bitcoind + .client + .generate_to_address(101, &node_address) + .unwrap(); + + log::debug!( + "successfully launched bitcoind and generated initial coins [bitcoind_exe {:?}]", + bitcoind_exe + ); + bitcoind } @@ -162,14 +173,12 @@ async fn test_fetch_tip_height() { let mempool = docker.run(client.mempool_backend); let rpc_client = &client.bitcoind.client; - let http_client = HttpClient::new( - format!("{}:{}", HOST_IP, mempool.get_host_port_ipv4(8999)).as_str(), - DEFAULT_CONCURRENT_REQUESTS, - ); + let http_client = + HttpClient::new(format!("{}:{}", HOST_IP, mempool.get_host_port_ipv4(8999)).as_str()); // should return the current tip height for i in 0..10 { - let tip = http_client._get_tip_height().await.unwrap(); + let tip = http_client.get_tip_height().await.unwrap(); assert_eq!(i, tip); let _ = rpc_client @@ -194,12 +203,16 @@ async fn test_fetch_block_hash_by_height() { let rpc_client = &client.bitcoind.client; let http_client = HttpClient::new( - format!("{}:{}", HOST_IP, mempool.get_host_port_ipv4(8999)).as_str(), - DEFAULT_CONCURRENT_REQUESTS, + format!( + "http://{}:{}/api/v1", + HOST_IP, + mempool.get_host_port_ipv4(8999) + ) + .as_str(), ); // should return an error if there is no block created yet for given height - assert!(http_client._get_block_hash(100).await.is_err()); + assert!(http_client.get_block_hash(100).await.is_err()); // should return block hash for existing block by height for i in 1..10 { @@ -207,7 +220,7 @@ async fn test_fetch_block_hash_by_height() { .generate_to_address(1, &rpc_client.get_new_address(None, None).unwrap()) .unwrap(); - let res_hash = http_client._get_block_hash(i).await.unwrap(); + let res_hash = http_client.get_block_hash(i).await.unwrap(); assert_eq!(gen_hash.first().unwrap(), &res_hash); } } @@ -227,7 +240,7 @@ async fn test_fetch_blocks_for_invalid_checkpoint() { let mempool = docker.run(client.mempool_backend); let checkpoint = (0, BlockHash::default()); - let blocks = block_events::fetch_blocks( + let blocks = block_events::fetch_block_headers( format!("{}:{}", HOST_IP, mempool.get_host_port_ipv4(8999)).as_str(), checkpoint, ) @@ -265,7 +278,7 @@ async fn test_fetch_blocks_for_checkpoint() { log::debug!("[{:#?}]", gen_blocks); let checkpoint = (10, *gen_blocks.get(9).unwrap()); - let blocks = block_events::fetch_blocks( + let blocks = block_events::fetch_block_headers( format!("{}:{}", HOST_IP, mempool.get_host_port_ipv4(8999)).as_str(), checkpoint, ) @@ -276,7 +289,7 @@ async fn test_fetch_blocks_for_checkpoint() { // should return all 10 blocks from 10 to 20, as 10 being the checkpoint for gen_block in &mut gen_blocks[9..] { let block = blocks.next().await.unwrap().unwrap(); - assert_eq!(gen_block.deref(), &block.id); + assert_eq!(gen_block.deref(), &block.block_hash()); } } @@ -284,7 +297,7 @@ async fn test_fetch_blocks_for_checkpoint() { #[serial] async fn test_failure_for_invalid_websocket_url() { let block_events = - websocket::subscribe_to_block_headers(format!("{}:{}", HOST_IP, 8999).as_str()).await; + websocket::listen_new_block_headers(format!("{}:{}", HOST_IP, 8999).as_str()).await; // should return an Err. assert!(block_events.is_err()); @@ -310,9 +323,24 @@ async fn test_block_events_stream() { std::thread::sleep(delay); // there is some delay between running the docker and the port being really available let mempool = docker.run(client.mempool_backend); + log::debug!( + "{:?}", + &client.bitcoind.client.get_blockchain_info().unwrap(), + ); + + log::debug!("{:?}", &client.bitcoind.client.get_chain_tips(),); + + let http_base_url = format!( + "http://{}:{}/api/v1", + HOST_IP, + mempool.get_host_port_ipv4(8999) + ); + let ws_base_url = format!("ws://{}:{}", HOST_IP, mempool.get_host_port_ipv4(8999)); + // get block-events stream let block_events = block_events::subscribe_to_block_headers( - format!("{}:{}", HOST_IP, mempool.get_host_port_ipv4(8999)).as_str(), + http_base_url.as_str(), + ws_base_url.as_str(), None, ) .await @@ -332,7 +360,7 @@ async fn test_block_events_stream() { pin_mut!(block_events); while !generated_blocks.is_empty() { let block_hash = generated_blocks.pop_front().unwrap(); - let block_event = block_events.next().await.unwrap(); + let block_event = block_events.next().await.unwrap().unwrap(); // should produce a BlockEvent::Connected result for each block event assert!(matches!(block_event, BlockEvent::Connected { .. })); @@ -373,9 +401,17 @@ async fn test_block_events_stream_with_checkpoint() { std::thread::sleep(delay); // there is some delay between running the docker and the port being really available let mempool = docker.run(client.mempool_backend); + let http_base_url = format!( + "http://{}:{}/api/v1", + HOST_IP, + mempool.get_host_port_ipv4(8999) + ); + let ws_base_url = format!("ws://{}:{}", HOST_IP, mempool.get_host_port_ipv4(8999)); + // get block-events stream, starting from the tip let block_events = block_events::subscribe_to_block_headers( - format!("{}:{}", HOST_IP, mempool.get_host_port_ipv4(8999)).as_str(), + http_base_url.as_str(), + ws_base_url.as_str(), Some((3, checkpoint.block_hash())), ) .await @@ -385,7 +421,7 @@ async fn test_block_events_stream_with_checkpoint() { pin_mut!(block_events); while !first_blocks.is_empty() { let block_hash = first_blocks.pop_front().unwrap(); - let block_event = block_events.next().await.unwrap(); + let block_event = block_events.next().await.unwrap().unwrap(); // should produce a BlockEvent::Connected result for each block event assert!(matches!(block_event, BlockEvent::Connected { .. })); @@ -414,13 +450,18 @@ async fn test_block_events_stream_with_reorg() { std::thread::sleep(delay); // there is some delay between running the docker and the port being really available let mempool = docker.run(client.mempool_backend); + let http_base_url = format!( + "http://{}:{}/api/v1", + HOST_IP, + mempool.get_host_port_ipv4(8999) + ); + let ws_base_url = format!("ws://{}:{}", HOST_IP, mempool.get_host_port_ipv4(8999)); + // get block-events stream - let block_events = block_events::subscribe_to_block_headers( - format!("{}:{}", HOST_IP, mempool.get_host_port_ipv4(8999)).as_str(), - None, - ) - .await - .unwrap(); + let block_events = + block_events::subscribe_to_block_headers(http_base_url.deref(), ws_base_url.deref(), None) + .await + .unwrap(); // initiate bitcoind client let rpc_client = &client.bitcoind.client; @@ -437,7 +478,7 @@ async fn test_block_events_stream_with_reorg() { pin_mut!(block_events); while !new_blocks.is_empty() { let block_hash = new_blocks.pop_front().unwrap(); - let block_event = block_events.next().await.unwrap(); + let block_event = block_events.next().await.unwrap().unwrap(); // should produce a BlockEvent::Connected result for each block event assert!(matches!(block_event, BlockEvent::Connected { .. })); @@ -467,7 +508,7 @@ async fn test_block_events_stream_with_reorg() { // should disconnect invalidated blocks while !invalidated_blocks.is_empty() { let invalidated = invalidated_blocks.pop_back().unwrap(); - let block_event = block_events.next().await.unwrap(); + let block_event = block_events.next().await.unwrap().unwrap(); // should produce a BlockEvent::Connected result for each block event assert!(matches!(block_event, BlockEvent::Disconnected(..))); @@ -483,7 +524,7 @@ async fn test_block_events_stream_with_reorg() { // should connect the new created blocks while !new_blocks.is_empty() { let new_block = new_blocks.pop_front().unwrap(); - let block_event = block_events.next().await.unwrap(); + let block_event = block_events.next().await.unwrap().unwrap(); // should produce a BlockEvent::Connected result for each block event assert!(matches!(block_event, BlockEvent::Connected { .. }));