diff --git a/src/api.rs b/src/api.rs
index c9201d9..f96e659 100644
--- a/src/api.rs
+++ b/src/api.rs
@@ -1,10 +1,10 @@
-use bitcoin::{Address, BlockHash, TxMerkleNode};
+use bitcoin::{Address, BlockHash, BlockHeader, TxMerkleNode};
 
-#[derive(serde::Deserialize, Clone, Debug, Copy, From)]
+#[derive(serde::Deserialize, Clone, Debug, Copy)]
 pub struct BlockExtended {
     pub id: BlockHash,
     pub height: u32,
-    pub version: u32,
+    pub version: i32,
     #[serde(alias = "previousblockhash")]
     pub prev_blockhash: BlockHash,
     pub merkle_root: TxMerkleNode,
@@ -15,6 +15,20 @@ pub struct BlockExtended {
     // add new fields if needed
 }
 
+// FIXME: (@leonardo.lima) Should this use serde_json or some other approach instead ?
+impl From<BlockExtended> for BlockHeader {
+    fn from(extended: BlockExtended) -> BlockHeader {
+        return BlockHeader {
+            version: (extended.version),
+            prev_blockhash: (extended.prev_blockhash),
+            merkle_root: (extended.merkle_root),
+            time: (extended.time),
+            bits: (extended.bits),
+            nonce: (extended.nonce),
+        };
+    }
+}
+
 #[derive(serde::Deserialize, Debug)]
 pub struct MempoolSpaceWebSocketMessage {
     pub block: BlockExtended,
@@ -34,7 +48,7 @@ pub enum MempoolSpaceWebSocketRequestData {
 
 #[derive(Debug, Clone, Copy)]
 pub enum BlockEvent {
-    Connected(BlockExtended),
+    Connected(BlockHeader),
     Disconnected((u32, BlockHash)),
     Error(),
 }
diff --git a/src/bin.rs b/src/bin.rs
index 36cf112..1f38ac9 100644
--- a/src/bin.rs
+++ b/src/bin.rs
@@ -1,8 +1,5 @@
-use std::str::FromStr;
-
 use anyhow::Ok;
-use bitcoin::{blockdata::block, BlockHash, Network};
-use block_events::{api::BlockEvent, http::HttpClient};
+use bitcoin::Network;
 use clap::{ArgGroup, Parser, Subcommand};
 use futures_util::{pin_mut, StreamExt};
 use serde::{Deserialize, Serialize};
@@ -76,15 +73,7 @@ async fn main() -> anyhow::Result<()> {
         .unwrap();
 
     // async fetch the data stream through the lib
-    let block_events = block_events::subscribe_to_blocks(
-        &url,
-        Some((
-            1,
-            BlockHash::from_str("33e3a0e68a2023474bca48b1fa5127a568203957c252c757076fe37460f05261")
-                .unwrap(),
-        )),
-    )
-    .await?;
+    let block_events = block_events::subscribe_to_blocks(&url, None).await?;
 
     // consume and execute the code (current matching and printing) in async manner for each new block-event
     pin_mut!(block_events);
diff --git a/src/lib.rs b/src/lib.rs
index 8f7460f..1d59e74 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -2,8 +2,8 @@ mod api;
 mod http;
 mod websocket;
 
-use std::pin::Pin;
 use std::time::Duration;
+use std::{collections::HashMap, collections::VecDeque, pin::Pin};
 
 use api::{BlockEvent, BlockExtended};
 
@@ -11,31 +11,105 @@ use anyhow::{anyhow, Ok};
 use async_stream::stream;
 use bitcoin::{BlockHash, BlockHeader};
 use futures_util::stream::Stream;
+use http::HttpClient;
 use tokio::time::Instant;
 use tokio_stream::StreamExt;
 use url::Url;
 
 const DEFAULT_CONCURRENT_REQUESTS: u8 = 4;
 
+#[derive(Debug, Clone)]
+struct Cache {
+    tip: BlockHash,
+    active_headers: HashMap<BlockHash, BlockExtended>,
+    stale_headers: HashMap<BlockHash, BlockExtended>,
+}
+
+// TODO: (@leonardo.lima) The `BlockExtended` should be converted/translated to `BlockHeader`
 pub async fn subscribe_to_blocks(
     url: &Url,
     checkpoint: Option<(u32, BlockHash)>,
-) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockExtended>>>> {
+) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockEvent>>>> {
     // TODO: (@leonardo.lima) It's needed to infer the tls security from network, or feature ?
-    let ws_url = &url::Url::parse(format!("ws://{}/ws", url).as_str()).unwrap();
-    let http_url = &url::Url::parse(format!("http://{}", url).as_str()).unwrap();
+    let ws_url = url::Url::parse(format!("ws://{}/ws", url).as_str()).unwrap();
+    let http_url = url::Url::parse(format!("http://{}", url).as_str()).unwrap();
+
+    let client = http::HttpClient::new(&http_url, DEFAULT_CONCURRENT_REQUESTS);
+    let chain_height = client._get_height().await.unwrap();
+    let chain_tip = client._get_block_height(chain_height).await.unwrap();
+    let cache = Cache {
+        tip: chain_tip,
+        active_headers: HashMap::new(),
+        stale_headers: HashMap::new(),
+    };
 
     match checkpoint {
         Some(checkpoint) => {
-            let prev_blocks = fetch_previous_blocks(http_url, checkpoint).await?;
-            let new_blocks = websocket::subscribe_to_blocks(ws_url).await?;
+            let prev_blocks = fetch_previous_blocks(&http_url, checkpoint).await?;
+            let new_blocks = websocket::subscribe_to_blocks(&ws_url).await?;
 
             // FIXME: This should filter for duplicated blocks
-            Ok(Box::pin(prev_blocks.chain(new_blocks)))
+            let events =
+                process_candidates(client, cache, Box::pin(prev_blocks.chain(new_blocks))).await?;
+            Ok(Box::pin(events))
+        }
+        _ => {
+            let candidates = websocket::subscribe_to_blocks(&ws_url).await?;
+            let events = process_candidates(client, cache, Box::pin(candidates)).await?;
+            Ok(Box::pin(events))
         }
-        _ => Ok(Box::pin(websocket::subscribe_to_blocks(ws_url).await?)),
     }
 }
 
+async fn process_candidates(
+    client: HttpClient,
+    mut cache: Cache,
+    mut candidates: Pin<Box<dyn Stream<Item = BlockExtended>>>,
+) -> anyhow::Result<impl Stream<Item = BlockEvent>> {
+    let stream = stream! {
+        while let Some(candidate) = candidates.next().await {
+            // TODO: (@leonardo.lima) It should check and validate for valid BlockHeaders
+
+            // validate if its a new valid tip
+            if cache.tip == candidate.prev_blockhash {
+                cache.tip = candidate.id;
+                cache.active_headers.insert(candidate.id, candidate);
+                yield BlockEvent::Connected(BlockHeader::from(candidate.clone()));
+                continue
+            }
+
+            // find common ancestor for current active chain and the forked chain
+            // fetches forked chain candidates and store in cache
+            let mut common_ancestor = candidate.clone();
+            let mut fork_branch: VecDeque<BlockExtended> = VecDeque::new();
+            while !cache.active_headers.contains_key(&common_ancestor.id) {
+                log::debug!("{:?}", common_ancestor);
+                fork_branch.push_back(common_ancestor);
+                common_ancestor = client._get_block(common_ancestor.prev_blockhash).await.unwrap(); // TODO: (@leonardo.lima) improve error handling here
+            }
+
+            // rollback current active chain, moving blocks to staled field
+            // yields BlockEvent::Disconnected((u32, BlockHash))
+            while common_ancestor.id != cache.tip {
+                let (stale_hash, stale_header) = cache.active_headers.remove_entry(&cache.tip).unwrap();
+                cache.stale_headers.insert(stale_hash, stale_header);
+                cache.tip = common_ancestor.id;
+                yield BlockEvent::Disconnected((stale_header.height, stale_hash));
+            }
+
+            // iterate over forked chain candidates
+            // update [`Cache`] active_headers field with candidates
+            // yields BlockEvent::Connected(candidate)
+            for fork_candidate in fork_branch.iter() {
+                cache.active_headers.insert(fork_candidate.id, fork_candidate.clone());
+                cache.tip = fork_candidate.id;
+                yield BlockEvent::Connected(BlockHeader::from(fork_candidate.clone()));
+            }
+            // yield BlockEvent::Connected(BlockHeader::from(candidate.clone()));
+        }
+    };
+    Ok(stream)
+}
+
 // FIXME: this fails when checkpoint is genesis block as it does not have a previousblockhash field
 pub async fn fetch_previous_blocks(
     url: &Url,
diff --git a/src/websocket.rs b/src/websocket.rs
index 1cad0fb..204ef21 100644
--- a/src/websocket.rs
+++ b/src/websocket.rs
@@ -60,6 +60,7 @@ pub async fn subscribe_to_blocks(url: &Url) -> anyhow::Result<impl Stream<Item = BlockExtended>> {