From f7536a23b65efba4133fccb8608a564c5ffc9aa4 Mon Sep 17 00:00:00 2001 From: Leonardo Lima Date: Mon, 27 Jun 2022 17:38:07 -0300 Subject: [PATCH] wip(feat): fix fn and use a tuple as return type for prev and new block streams --- src/api.rs | 4 +-- src/bin.rs | 52 ++++++++++++++++++++++++++++++++--- src/lib.rs | 81 +++++++++++++++++++++++++++++++++--------------------- 3 files changed, 99 insertions(+), 38 deletions(-) diff --git a/src/api.rs b/src/api.rs index d6a57a1..0564347 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,6 +1,6 @@ use bitcoin::{Address, BlockHash}; -#[derive(serde::Deserialize, Clone, Debug)] +#[derive(serde::Deserialize, Clone, Debug, Copy)] pub struct BlockExtended { pub height: u32, pub timestamp: u32, @@ -25,7 +25,7 @@ pub enum MempoolSpaceWebSocketRequestData { TrackAddress(Address), } -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub enum BlockEvent { Connected(BlockExtended), Disconnected((u32, BlockHash)), diff --git a/src/bin.rs b/src/bin.rs index 0acf147..58ccc1d 100644 --- a/src/bin.rs +++ b/src/bin.rs @@ -75,12 +75,36 @@ async fn main() -> anyhow::Result<()> { // async fetch the data stream through the lib // let block_events = block_events::websocket::subscribe_to_blocks(&url).await?; - let block_events = block_events::subscribe_to_blocks(&url, Some(2)).await?; // consume and execute the code (current matching and printing) in async manner for each new block-event - pin_mut!(block_events); - while let Some(block_event) = block_events.next().await { - match block_event { + // pin_mut!(block_events); + // while let Some(block_event) = block_events.next().await { + // match block_event { + // BlockEvent::Connected(block) => { + // println!("Connected BlockEvent: {:#?}", block); + // } + // BlockEvent::Disconnected((height, block_hash)) => { + // println!( + // "Disconnected BlockEvent: [height {:#?}] [block_hash: {:#?}]", + // height, block_hash + // ); + // } + // BlockEvent::Error() => { + // eprint!("ERROR BlockEvent: received an error from the block-events stream"); + // } + // } + // } + + // async fetch the data stream through the lib + let events = block_events::subscribe_to_blocks(&url, Some(1)).await; + + let prev = events.0.unwrap().unwrap(); + let new = events.1.unwrap(); + + // consume and execute the code (current matching and printing) in async manner for each new block-event + pin_mut!(prev); + while let Some(prev) = prev.next().await { + match prev { BlockEvent::Connected(block) => { println!("Connected BlockEvent: {:#?}", block); } @@ -95,6 +119,26 @@ async fn main() -> anyhow::Result<()> { } } } + + // consume and execute the code (current matching and printing) in async manner for each new block-event + pin_mut!(new); + while let Some(new) = new.next().await { + match new { + BlockEvent::Connected(block) => { + println!("Connected BlockEvent: {:#?}", block); + } + BlockEvent::Disconnected((height, block_hash)) => { + println!( + "Disconnected BlockEvent: [height {:#?}] [block_hash: {:#?}]", + height, block_hash + ); + } + BlockEvent::Error() => { + eprint!("ERROR BlockEvent: received an error from the block-events stream"); + } + } + } + Ok(()) // let url = url::Url::parse(&match cli.network { diff --git a/src/lib.rs b/src/lib.rs index 7487fcc..3707254 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,63 +2,80 @@ pub mod api; pub mod http; pub mod websocket; +use std::pin::Pin; + use api::BlockEvent; use anyhow::{anyhow, Ok}; -use async_stream::stream; -use futures_util::{stream::Stream, StreamExt}; +use async_stream::{stream, try_stream}; +use bitcoin::Block; +use futures_util::{pin_mut, stream::Stream}; +use tokio_stream::StreamExt; use url::Url; pub async fn subscribe_to_blocks( url: &Url, height: Option, -) -> anyhow::Result> { - log::debug!("{}", height.is_none()); - if !height.is_none() { - let http_url = &url::Url::parse(format!("http://{}", url).as_str()).unwrap(); - // TODO: (@leonardo.lima) Move the concurrency for an environment variable - let http_client = http::HttpClient::new(http_url, 4); - let mut curr_tip = http_client._get_height().await.unwrap(); - let mut height = height.unwrap(); +) -> ( + Option>>, + anyhow::Result>, +) { + log::debug!("[height.is_none] {:?}", height.is_none()); - log::debug!("{}", curr_tip); - log::debug!("{}", height); + // TODO: (@leonardo.lima) It's needed to infer the tls security from network, or feature ? + let ws_url = &url::Url::parse(format!("ws://{}/ws", url).as_str()).unwrap(); + let http_url = &url::Url::parse(format!("http://{}", url).as_str()).unwrap(); - stream! { - while height <= curr_tip { - let block_hash = http_client._get_block_height(height).await.unwrap(); - let block = http_client._get_block(block_hash).await.unwrap(); + match height { + Some(height) => { + // let prev_blocks = fetch_previous_blocks(http_url, height).await?; + // let new_blocks = websocket::subscribe_to_blocks(ws_url).await?; - curr_tip = http_client._get_height().await.unwrap(); - height = block.height; - log::debug!("{:?}", block); - yield BlockEvent::Connected(block); - } - }; - } + // pin_mut!(prev_blocks); + // pin_mut!(new_blocks); - // TODO: (@leonardo.lima) It's needed to infer the tls security from network, or feature ? - let ws_url = &url::Url::parse(format!("ws://{}/ws", url).as_str()).unwrap(); - websocket::subscribe_to_blocks(ws_url).await + // let stream = stream! { + // while let Some(prev_block) = prev_blocks.next().await { + // yield prev_block.clone(); + // } + + // while let Some(new_block) = new_blocks.next().await { + // yield new_block.clone(); + // } + // }; + // Ok(stream) + + let prev_blocks = fetch_previous_blocks(http_url, height).await; + let new_blocks = websocket::subscribe_to_blocks(ws_url).await; + return (Some(prev_blocks), new_blocks); + } + _ => return (None, websocket::subscribe_to_blocks(ws_url).await), + } } -pub async fn fetch_blocks( +pub async fn fetch_previous_blocks( url: &Url, - height: Option, + mut height: u32, ) -> anyhow::Result> { - let http_url = &url::Url::parse(format!("http://{}", url).as_str()).unwrap(); // TODO: (@leonardo.lima) Move the concurrency for an environment variable - let http_client = http::HttpClient::new(http_url, 4); + let http_client = http::HttpClient::new(url, 4); let mut curr_tip = http_client._get_height().await.unwrap(); - let mut height = height.unwrap(); + + log::debug!("[curr_tip {}]", &curr_tip); + log::debug!("[height {}]", &height); let stream = stream! { while height <= curr_tip { let block_hash = http_client._get_block_height(height).await.unwrap(); let block = http_client._get_block(block_hash).await.unwrap(); + log::debug!("[curr_tip {}]", &curr_tip); + log::debug!("[height {}]", &height); + log::debug!("[block {:#?}]", &block); + + // TODO: (@leonardo.lima) The update in current tip should have some time in between, and not at every iteration curr_tip = http_client._get_height().await.unwrap(); - height = block.height; + height += 1; yield BlockEvent::Connected(block); } };