Skip to content

Commit

Permalink
wip(refactor): initial updates for fns refactor and architecture change
Browse files Browse the repository at this point in the history
  • Loading branch information
oleonardolima committed Jun 29, 2022
1 parent f7536a2 commit 1c260af
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 156 deletions.
17 changes: 12 additions & 5 deletions src/api.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use bitcoin::{Address, BlockHash};
use bitcoin::{Address, BlockHash, TxMerkleNode};

#[derive(serde::Deserialize, Clone, Debug, Copy)]
#[derive(serde::Deserialize, Clone, Debug, Copy, From)]
pub struct BlockExtended {
pub height: u32,
pub timestamp: u32,
pub id: BlockHash,
pub previousblockhash: BlockHash,
pub height: u32,
pub version: u32,
#[serde(alias = "previousblockhash")]
pub prev_blockhash: BlockHash,
pub merkle_root: TxMerkleNode,
#[serde(alias = "timestamp")]
pub time: u32,
pub bits: u32,
pub nonce: u32,
// add new fields if needed
}

#[derive(serde::Deserialize, Debug)]
Expand Down
110 changes: 29 additions & 81 deletions src/bin.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::str::FromStr;

use anyhow::Ok;
use bitcoin::{blockdata::block, Network};
use bitcoin::{blockdata::block, BlockHash, Network};
use block_events::{api::BlockEvent, http::HttpClient};
use clap::{ArgGroup, Parser, Subcommand};
use futures_util::{pin_mut, StreamExt};
Expand Down Expand Up @@ -74,88 +76,34 @@ async fn main() -> anyhow::Result<()> {
.unwrap();

// async fetch the data stream through the lib
// let block_events = block_events::websocket::subscribe_to_blocks(&url).await?;

// consume and execute the code (current matching and printing) in async manner for each new block-event
// pin_mut!(block_events);
// while let Some(block_event) = block_events.next().await {
// match block_event {
// BlockEvent::Connected(block) => {
// println!("Connected BlockEvent: {:#?}", block);
// }
// BlockEvent::Disconnected((height, block_hash)) => {
// println!(
// "Disconnected BlockEvent: [height {:#?}] [block_hash: {:#?}]",
// height, block_hash
// );
// }
// BlockEvent::Error() => {
// eprint!("ERROR BlockEvent: received an error from the block-events stream");
// }
// }
// }

// async fetch the data stream through the lib
let events = block_events::subscribe_to_blocks(&url, Some(1)).await;

let prev = events.0.unwrap().unwrap();
let new = events.1.unwrap();

// consume and execute the code (current matching and printing) in async manner for each new block-event
pin_mut!(prev);
while let Some(prev) = prev.next().await {
match prev {
BlockEvent::Connected(block) => {
println!("Connected BlockEvent: {:#?}", block);
}
BlockEvent::Disconnected((height, block_hash)) => {
println!(
"Disconnected BlockEvent: [height {:#?}] [block_hash: {:#?}]",
height, block_hash
);
}
BlockEvent::Error() => {
eprint!("ERROR BlockEvent: received an error from the block-events stream");
}
}
}
let block_events = block_events::subscribe_to_blocks(
&url,
Some((
1,
BlockHash::from_str("33e3a0e68a2023474bca48b1fa5127a568203957c252c757076fe37460f05261")
.unwrap(),
)),
)
.await?;

// consume and execute the code (current matching and printing) in async manner for each new block-event
pin_mut!(new);
while let Some(new) = new.next().await {
match new {
BlockEvent::Connected(block) => {
println!("Connected BlockEvent: {:#?}", block);
}
BlockEvent::Disconnected((height, block_hash)) => {
println!(
"Disconnected BlockEvent: [height {:#?}] [block_hash: {:#?}]",
height, block_hash
);
}
BlockEvent::Error() => {
eprint!("ERROR BlockEvent: received an error from the block-events stream");
}
}
pin_mut!(block_events);
while let Some(block_event) = block_events.next().await {
println!("BlockExtended: {:#?}", block_event)
// match block_event {
// BlockEvent::Connected(block) => {
// println!("Connected BlockEvent: {:#?}", block);
// }
// BlockEvent::Disconnected((height, block_hash)) => {
// println!(
// "Disconnected BlockEvent: [height {:#?}] [block_hash: {:#?}]",
// height, block_hash
// );
// }
// BlockEvent::Error() => {
// eprint!("ERROR BlockEvent: received an error from the block-events stream");
// }
// }
}

Ok(())

// let url = url::Url::parse(&match cli.network {
// Network::Bitcoin => "http://mempool.space/api".to_string(),
// Network::Regtest => "http://localhost:8999/api/v1".to_string(),
// network => format!("http://mempool.space/{}/api", network),
// })
// .unwrap();

// let http = HttpClient::new(&url, 4);
// for i in 1..10 {
// let tip = http._get_height().await.unwrap();
// let block_hash = http._get_block_height(i).await.unwrap();

// println!("[/blocks/tip/height] {}", tip);
// println!("[/block-height/{}] {}", i, &block_hash);
// }

// Ok(())
}
2 changes: 1 addition & 1 deletion src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use url::Url;

use crate::api::BlockExtended;

#[derive(Debug)]
// #[derive(Debug)]
pub struct HttpClient {
url: String,
client: Client,
Expand Down
97 changes: 43 additions & 54 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,82 +1,71 @@
pub mod api;
pub mod http;
pub mod websocket;
mod api;
mod http;
mod websocket;

use std::pin::Pin;
use std::time::Duration;

use api::BlockEvent;
use api::{BlockEvent, BlockExtended};

use anyhow::{anyhow, Ok};
use async_stream::{stream, try_stream};
use bitcoin::Block;
use futures_util::{pin_mut, stream::Stream};
use async_stream::stream;
use bitcoin::{BlockHash, BlockHeader};
use futures_util::stream::Stream;
use tokio::time::Instant;
use tokio_stream::StreamExt;
use url::Url;

const DEFAULT_CONCURRENT_REQUESTS: u8 = 4;

pub async fn subscribe_to_blocks(
url: &Url,
height: Option<u32>,
) -> (
Option<anyhow::Result<impl Stream<Item = BlockEvent>>>,
anyhow::Result<impl Stream<Item = BlockEvent>>,
) {
log::debug!("[height.is_none] {:?}", height.is_none());

checkpoint: Option<(u32, BlockHash)>,
) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockExtended>>>> {
// TODO: (@leonardo.lima) It's needed to infer the tls security from network, or feature ?
let ws_url = &url::Url::parse(format!("ws://{}/ws", url).as_str()).unwrap();
let http_url = &url::Url::parse(format!("http://{}", url).as_str()).unwrap();

match height {
Some(height) => {
// let prev_blocks = fetch_previous_blocks(http_url, height).await?;
// let new_blocks = websocket::subscribe_to_blocks(ws_url).await?;

// pin_mut!(prev_blocks);
// pin_mut!(new_blocks);

// let stream = stream! {
// while let Some(prev_block) = prev_blocks.next().await {
// yield prev_block.clone();
// }

// while let Some(new_block) = new_blocks.next().await {
// yield new_block.clone();
// }
// };
// Ok(stream)

let prev_blocks = fetch_previous_blocks(http_url, height).await;
let new_blocks = websocket::subscribe_to_blocks(ws_url).await;
return (Some(prev_blocks), new_blocks);
match checkpoint {
Some(checkpoint) => {
let prev_blocks = fetch_previous_blocks(http_url, checkpoint).await?;
let new_blocks = websocket::subscribe_to_blocks(ws_url).await?;
// FIXME: This should filter for duplicated blocks
Ok(Box::pin(prev_blocks.chain(new_blocks)))
}
_ => return (None, websocket::subscribe_to_blocks(ws_url).await),
_ => Ok(Box::pin(websocket::subscribe_to_blocks(ws_url).await?)),
}
}

// FIXME: this fails when checkpoint is genesis block as it does not have a previousblockhash field
pub async fn fetch_previous_blocks(
url: &Url,
mut height: u32,
) -> anyhow::Result<impl Stream<Item = BlockEvent>> {
// TODO: (@leonardo.lima) Move the concurrency for an environment variable
let http_client = http::HttpClient::new(url, 4);
let mut curr_tip = http_client._get_height().await.unwrap();
checkpoint: (u32, BlockHash),
) -> anyhow::Result<impl Stream<Item = BlockExtended>> {
let client = http::HttpClient::new(url, DEFAULT_CONCURRENT_REQUESTS);
let (ckpt_height, ckpt_hash) = checkpoint;

log::debug!("[curr_tip {}]", &curr_tip);
log::debug!("[height {}]", &height);
if ckpt_hash != client._get_block_height(ckpt_height).await? {
return Err(anyhow!(
"The checkpoint passed is invalid, it should exist in the blockchain."
));
}

let stream = stream! {
while height <= curr_tip {
let block_hash = http_client._get_block_height(height).await.unwrap();
let block = http_client._get_block(block_hash).await.unwrap();
let mut tip = client._get_height().await?;
let mut height = ckpt_height;

log::debug!("[curr_tip {}]", &curr_tip);
log::debug!("[height {}]", &height);
log::debug!("[block {:#?}]", &block);
let mut interval = Instant::now(); // should try to update the tip every 5 minutes.
let stream = stream! {
while height <= tip {
let hash = client._get_block_height(height).await.unwrap();
let block = client._get_block(hash).await.unwrap();

// TODO: (@leonardo.lima) The update in current tip should have some time in between, and not at every iteration
curr_tip = http_client._get_height().await.unwrap();
height += 1;
yield BlockEvent::Connected(block);

if interval.elapsed() >= Duration::from_secs(300) {
interval = Instant::now();
tip = client._get_height().await.unwrap();
}
yield block;
}
};
Ok(stream)
Expand Down
41 changes: 26 additions & 15 deletions src/websocket.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::api::BlockExtended;

use super::api::{
BlockEvent, MempoolSpaceWebSocketMessage, MempoolSpaceWebSocketRequestData,
MempoolSpaceWebSocketMessage, MempoolSpaceWebSocketRequestData,
MempoolSpaceWebSocketRequestMessage,
};

Expand All @@ -8,36 +10,45 @@ use async_stream::stream;
use futures_util::stream::Stream;
use futures_util::{SinkExt, StreamExt};
use std::time::Duration;
use tokio_tungstenite::connect_async_tls_with_config;
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::protocol::Message;
use tokio_tungstenite::{connect_async_tls_with_config, MaybeTlsStream, WebSocketStream};
use url::Url;

pub async fn subscribe_to_blocks(url: &Url) -> anyhow::Result<impl Stream<Item = BlockEvent>> {
log::info!("starting websocket handshake [url {}]", url);
async fn websocket_client(
url: &Url,
message: String,
) -> anyhow::Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
log::info!("starting websocket handshake with url={}", url);
let (mut websocket_stream, websocket_response) =
connect_async_tls_with_config(url, None, None).await?;

log::info!("websocket handshake successfully completed!");
log::info!(
"handshake completed with response: {:?}",
websocket_response
);
log::info!("handshake completed with response={:?}", websocket_response);

let message = build_websocket_request_message(&MempoolSpaceWebSocketRequestData::Blocks);
let item = serde_json::to_string(&message).unwrap();
if (websocket_stream.send(Message::text(&item)).await).is_err() {
if (websocket_stream.send(Message::text(&message)).await).is_err() {
log::error!("failed to publish first message to websocket");
return Err(anyhow!("failed to publish first message to websocket"));
};
log::info!("published message: {:#?}, successfully!", &item);
log::info!("published message: {:#?}, successfully!", &message);
Ok(websocket_stream)
}

pub async fn subscribe_to_blocks(url: &Url) -> anyhow::Result<impl Stream<Item = BlockExtended>> {
let init_message = serde_json::to_string(&build_websocket_request_message(
&MempoolSpaceWebSocketRequestData::Blocks,
))
.unwrap();

let mut ws_stream = websocket_client(url, init_message).await.unwrap();

// need to ping every so often to keep the websocket connection alive
let mut pinger = tokio::time::interval(Duration::from_secs(60));

let stream = stream! {
loop {
tokio::select! {
message = websocket_stream.next() => {
message = ws_stream.next() => {
if let Some(message) = message {
match message.unwrap() {
Message::Text(text) => {
Expand All @@ -49,7 +60,7 @@ pub async fn subscribe_to_blocks(url: &Url) -> anyhow::Result<impl Stream<Item =
continue
}
let res_msg: MempoolSpaceWebSocketMessage = serde_json::from_str(&text).unwrap();
yield BlockEvent::Connected(res_msg.block);
yield res_msg.block;
},
Message::Close(_) => {
eprintln!("websocket closing gracefully");
Expand All @@ -65,7 +76,7 @@ pub async fn subscribe_to_blocks(url: &Url) -> anyhow::Result<impl Stream<Item =
}
_ = pinger.tick() => {
log::info!("pinging to websocket to keep connection alive");
websocket_stream.send(Message::Ping(vec![])).await.unwrap()
ws_stream.send(Message::Ping(vec![])).await.unwrap() // TODO: (@leonardo.lima) Should this use a mempool expected ping message instead ?
}
}
}
Expand Down

0 comments on commit 1c260af

Please sign in to comment.