Skip to content

Commit

Permalink
wip(feat): fix fn and use a tuple as return type for prev and new blo…
Browse files Browse the repository at this point in the history
…ck streams
  • Loading branch information
oleonardolima committed Jun 27, 2022
1 parent 30a6f9e commit f7536a2
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 38 deletions.
4 changes: 2 additions & 2 deletions src/api.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use bitcoin::{Address, BlockHash};

#[derive(serde::Deserialize, Clone, Debug)]
#[derive(serde::Deserialize, Clone, Debug, Copy)]
pub struct BlockExtended {
pub height: u32,
pub timestamp: u32,
Expand All @@ -25,7 +25,7 @@ pub enum MempoolSpaceWebSocketRequestData {
TrackAddress(Address),
}

#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub enum BlockEvent {
Connected(BlockExtended),
Disconnected((u32, BlockHash)),
Expand Down
52 changes: 48 additions & 4 deletions src/bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,36 @@ async fn main() -> anyhow::Result<()> {

// async fetch the data stream through the lib
// let block_events = block_events::websocket::subscribe_to_blocks(&url).await?;
let block_events = block_events::subscribe_to_blocks(&url, Some(2)).await?;

// consume and execute the code (current matching and printing) in async manner for each new block-event
pin_mut!(block_events);
while let Some(block_event) = block_events.next().await {
match block_event {
// pin_mut!(block_events);
// while let Some(block_event) = block_events.next().await {
// match block_event {
// BlockEvent::Connected(block) => {
// println!("Connected BlockEvent: {:#?}", block);
// }
// BlockEvent::Disconnected((height, block_hash)) => {
// println!(
// "Disconnected BlockEvent: [height {:#?}] [block_hash: {:#?}]",
// height, block_hash
// );
// }
// BlockEvent::Error() => {
// eprint!("ERROR BlockEvent: received an error from the block-events stream");
// }
// }
// }

// async fetch the data stream through the lib
let events = block_events::subscribe_to_blocks(&url, Some(1)).await;

let prev = events.0.unwrap().unwrap();
let new = events.1.unwrap();

// consume and execute the code (current matching and printing) in async manner for each new block-event
pin_mut!(prev);
while let Some(prev) = prev.next().await {
match prev {
BlockEvent::Connected(block) => {
println!("Connected BlockEvent: {:#?}", block);
}
Expand All @@ -95,6 +119,26 @@ async fn main() -> anyhow::Result<()> {
}
}
}

// consume and execute the code (current matching and printing) in async manner for each new block-event
pin_mut!(new);
while let Some(new) = new.next().await {
match new {
BlockEvent::Connected(block) => {
println!("Connected BlockEvent: {:#?}", block);
}
BlockEvent::Disconnected((height, block_hash)) => {
println!(
"Disconnected BlockEvent: [height {:#?}] [block_hash: {:#?}]",
height, block_hash
);
}
BlockEvent::Error() => {
eprint!("ERROR BlockEvent: received an error from the block-events stream");
}
}
}

Ok(())

// let url = url::Url::parse(&match cli.network {
Expand Down
81 changes: 49 additions & 32 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,80 @@ pub mod api;
pub mod http;
pub mod websocket;

use std::pin::Pin;

use api::BlockEvent;

use anyhow::{anyhow, Ok};
use async_stream::stream;
use futures_util::{stream::Stream, StreamExt};
use async_stream::{stream, try_stream};
use bitcoin::Block;
use futures_util::{pin_mut, stream::Stream};
use tokio_stream::StreamExt;
use url::Url;

pub async fn subscribe_to_blocks(
url: &Url,
height: Option<u32>,
) -> anyhow::Result<impl Stream<Item = BlockEvent>> {
log::debug!("{}", height.is_none());
if !height.is_none() {
let http_url = &url::Url::parse(format!("http://{}", url).as_str()).unwrap();
// TODO: (@leonardo.lima) Move the concurrency for an environment variable
let http_client = http::HttpClient::new(http_url, 4);
let mut curr_tip = http_client._get_height().await.unwrap();
let mut height = height.unwrap();
) -> (
Option<anyhow::Result<impl Stream<Item = BlockEvent>>>,
anyhow::Result<impl Stream<Item = BlockEvent>>,
) {
log::debug!("[height.is_none] {:?}", height.is_none());

log::debug!("{}", curr_tip);
log::debug!("{}", height);
// TODO: (@leonardo.lima) It's needed to infer the tls security from network, or feature ?
let ws_url = &url::Url::parse(format!("ws://{}/ws", url).as_str()).unwrap();
let http_url = &url::Url::parse(format!("http://{}", url).as_str()).unwrap();

stream! {
while height <= curr_tip {
let block_hash = http_client._get_block_height(height).await.unwrap();
let block = http_client._get_block(block_hash).await.unwrap();
match height {
Some(height) => {
// let prev_blocks = fetch_previous_blocks(http_url, height).await?;
// let new_blocks = websocket::subscribe_to_blocks(ws_url).await?;

curr_tip = http_client._get_height().await.unwrap();
height = block.height;
log::debug!("{:?}", block);
yield BlockEvent::Connected(block);
}
};
}
// pin_mut!(prev_blocks);
// pin_mut!(new_blocks);

// TODO: (@leonardo.lima) It's needed to infer the tls security from network, or feature ?
let ws_url = &url::Url::parse(format!("ws://{}/ws", url).as_str()).unwrap();
websocket::subscribe_to_blocks(ws_url).await
// let stream = stream! {
// while let Some(prev_block) = prev_blocks.next().await {
// yield prev_block.clone();
// }

// while let Some(new_block) = new_blocks.next().await {
// yield new_block.clone();
// }
// };
// Ok(stream)

let prev_blocks = fetch_previous_blocks(http_url, height).await;
let new_blocks = websocket::subscribe_to_blocks(ws_url).await;
return (Some(prev_blocks), new_blocks);
}
_ => return (None, websocket::subscribe_to_blocks(ws_url).await),
}
}

pub async fn fetch_blocks(
pub async fn fetch_previous_blocks(
url: &Url,
height: Option<u32>,
mut height: u32,
) -> anyhow::Result<impl Stream<Item = BlockEvent>> {
let http_url = &url::Url::parse(format!("http://{}", url).as_str()).unwrap();
// TODO: (@leonardo.lima) Move the concurrency for an environment variable
let http_client = http::HttpClient::new(http_url, 4);
let http_client = http::HttpClient::new(url, 4);
let mut curr_tip = http_client._get_height().await.unwrap();
let mut height = height.unwrap();

log::debug!("[curr_tip {}]", &curr_tip);
log::debug!("[height {}]", &height);

let stream = stream! {
while height <= curr_tip {
let block_hash = http_client._get_block_height(height).await.unwrap();
let block = http_client._get_block(block_hash).await.unwrap();

log::debug!("[curr_tip {}]", &curr_tip);
log::debug!("[height {}]", &height);
log::debug!("[block {:#?}]", &block);

// TODO: (@leonardo.lima) The update in current tip should have some time in between, and not at every iteration
curr_tip = http_client._get_height().await.unwrap();
height = block.height;
height += 1;
yield BlockEvent::Connected(block);
}
};
Expand Down

0 comments on commit f7536a2

Please sign in to comment.