Skip to content

Commit

Permalink
wip(feat+refactor): add fn to process candidate BlockHeaders, handle …
Browse files Browse the repository at this point in the history
…reorg and yield BlockEvent
  • Loading branch information
oleonardolima committed Jul 1, 2022
1 parent 1c260af commit c6492e0
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 25 deletions.
22 changes: 18 additions & 4 deletions src/api.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use bitcoin::{Address, BlockHash, TxMerkleNode};
use bitcoin::{Address, BlockHash, BlockHeader, TxMerkleNode};

#[derive(serde::Deserialize, Clone, Debug, Copy, From)]
#[derive(serde::Deserialize, Clone, Debug, Copy)]
pub struct BlockExtended {
pub id: BlockHash,
pub height: u32,
pub version: u32,
pub version: i32,
#[serde(alias = "previousblockhash")]
pub prev_blockhash: BlockHash,
pub merkle_root: TxMerkleNode,
Expand All @@ -15,6 +15,20 @@ pub struct BlockExtended {
// add new fields if needed
}

// FIXME: (@leonardo.lima) Should this use serde_json or some other approach instead ?
impl From<BlockExtended> for BlockHeader {
fn from(extended: BlockExtended) -> BlockHeader {
return BlockHeader {
version: (extended.version),
prev_blockhash: (extended.prev_blockhash),
merkle_root: (extended.merkle_root),
time: (extended.time),
bits: (extended.bits),
nonce: (extended.nonce),
};
}
}

#[derive(serde::Deserialize, Debug)]
pub struct MempoolSpaceWebSocketMessage {
pub block: BlockExtended,
Expand All @@ -34,7 +48,7 @@ pub enum MempoolSpaceWebSocketRequestData {

#[derive(Debug, Clone, Copy)]
pub enum BlockEvent {
Connected(BlockExtended),
Connected(BlockHeader),
Disconnected((u32, BlockHash)),
Error(),
}
15 changes: 2 additions & 13 deletions src/bin.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use std::str::FromStr;

use anyhow::Ok;
use bitcoin::{blockdata::block, BlockHash, Network};
use block_events::{api::BlockEvent, http::HttpClient};
use bitcoin::Network;
use clap::{ArgGroup, Parser, Subcommand};
use futures_util::{pin_mut, StreamExt};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -76,15 +73,7 @@ async fn main() -> anyhow::Result<()> {
.unwrap();

// async fetch the data stream through the lib
let block_events = block_events::subscribe_to_blocks(
&url,
Some((
1,
BlockHash::from_str("33e3a0e68a2023474bca48b1fa5127a568203957c252c757076fe37460f05261")
.unwrap(),
)),
)
.await?;
let block_events = block_events::subscribe_to_blocks(&url, None).await?;

// consume and execute the code (current matching and printing) in async manner for each new block-event
pin_mut!(block_events);
Expand Down
90 changes: 82 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,114 @@ mod api;
mod http;
mod websocket;

use std::pin::Pin;
use std::time::Duration;
use std::{collections::HashMap, collections::VecDeque, pin::Pin};

use api::{BlockEvent, BlockExtended};

use anyhow::{anyhow, Ok};
use async_stream::stream;
use bitcoin::{BlockHash, BlockHeader};
use futures_util::stream::Stream;
use http::HttpClient;
use tokio::time::Instant;
use tokio_stream::StreamExt;
use url::Url;

const DEFAULT_CONCURRENT_REQUESTS: u8 = 4;

#[derive(Debug, Clone)]
struct Cache {
tip: BlockHash,
active_headers: HashMap<BlockHash, BlockExtended>,
stale_headers: HashMap<BlockHash, BlockExtended>,
}

// TODO: (@leonardo.lima) The `BlockExtended` should be converted/translated to `BlockHeader`
pub async fn subscribe_to_blocks(
url: &Url,
checkpoint: Option<(u32, BlockHash)>,
) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockExtended>>>> {
) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockEvent>>>> {
// TODO: (@leonardo.lima) It's needed to infer the tls security from network, or feature ?
let ws_url = &url::Url::parse(format!("ws://{}/ws", url).as_str()).unwrap();
let http_url = &url::Url::parse(format!("http://{}", url).as_str()).unwrap();
let ws_url = url::Url::parse(format!("ws://{}/ws", url).as_str()).unwrap();
let http_url = url::Url::parse(format!("http://{}", url).as_str()).unwrap();

let client = http::HttpClient::new(&http_url, DEFAULT_CONCURRENT_REQUESTS);
let chain_height = client._get_height().await.unwrap();
let chain_tip = client._get_block_height(chain_height).await.unwrap();
let cache = Cache {
tip: chain_tip,
active_headers: HashMap::new(),
stale_headers: HashMap::new(),
};

match checkpoint {
Some(checkpoint) => {
let prev_blocks = fetch_previous_blocks(http_url, checkpoint).await?;
let new_blocks = websocket::subscribe_to_blocks(ws_url).await?;
let prev_blocks = fetch_previous_blocks(&http_url, checkpoint).await?;
let new_blocks = websocket::subscribe_to_blocks(&ws_url).await?;
// FIXME: This should filter for duplicated blocks
Ok(Box::pin(prev_blocks.chain(new_blocks)))
let events =
process_candidates(client, cache, Box::pin(prev_blocks.chain(new_blocks))).await?;
Ok(Box::pin(events))
}
_ => {
let candidates = websocket::subscribe_to_blocks(&ws_url).await?;
let events = process_candidates(client, cache, Box::pin(candidates)).await?;
Ok(Box::pin(events))
}
_ => Ok(Box::pin(websocket::subscribe_to_blocks(ws_url).await?)),
}
}

async fn process_candidates(
client: HttpClient,
mut cache: Cache,
mut candidates: Pin<Box<dyn Stream<Item = BlockExtended>>>,
) -> anyhow::Result<impl Stream<Item = BlockEvent>> {
let stream = stream! {
while let Some(candidate) = candidates.next().await {
// TODO: (@leonardo.lima) It should check and validate for valid BlockHeaders

// validate if its a new valid tip
if cache.tip == candidate.prev_blockhash {
cache.tip = candidate.id;
cache.active_headers.insert(candidate.id, candidate);
yield BlockEvent::Connected(BlockHeader::from(candidate.clone()));
continue
}

// find common ancestor for current active chain and the forked chain
// fetches forked chain candidates and store in cache
let mut common_ancestor = candidate.clone();
let mut fork_branch: VecDeque<BlockExtended> = VecDeque::new();
while !cache.active_headers.contains_key(&common_ancestor.id) {
log::debug!("{:?}", common_ancestor);
fork_branch.push_back(common_ancestor);
common_ancestor = client._get_block(common_ancestor.prev_blockhash).await.unwrap(); // TODO: (@leonardo.lima) improve error handling here
}

// rollback current active chain, moving blocks to staled field
// yields BlockEvent::Disconnected((u32, BlockHash))
while common_ancestor.id != cache.tip {
let (stale_hash, stale_header) = cache.active_headers.remove_entry(&cache.tip).unwrap();
cache.stale_headers.insert(stale_hash, stale_header);
cache.tip = common_ancestor.id;
yield BlockEvent::Disconnected((stale_header.height, stale_hash));
}

// iterate over forked chain candidates
// update [`Cache`] active_headers field with candidates
// yields BlockEvent::Connected(candidate)
for fork_candidate in fork_branch.iter() {
cache.active_headers.insert(fork_candidate.id, fork_candidate.clone());
cache.tip = fork_candidate.id;
yield BlockEvent::Connected(BlockHeader::from(fork_candidate.clone()));
}
// yield BlockEvent::Connected(BlockHeader::from(candidate.clone()));
}
};
Ok(stream)
}

// FIXME: this fails when checkpoint is genesis block as it does not have a previousblockhash field
pub async fn fetch_previous_blocks(
url: &Url,
Expand Down
1 change: 1 addition & 0 deletions src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub async fn subscribe_to_blocks(url: &Url) -> anyhow::Result<impl Stream<Item =
continue
}
let res_msg: MempoolSpaceWebSocketMessage = serde_json::from_str(&text).unwrap();
log::debug!("{:?}", res_msg.block);
yield res_msg.block;
},
Message::Close(_) => {
Expand Down

0 comments on commit c6492e0

Please sign in to comment.