From 2b9e9082276b1522cd03aae571d258b213e7e41c Mon Sep 17 00:00:00 2001 From: Leonardo Lima Date: Mon, 25 Jul 2022 23:48:42 -0300 Subject: [PATCH] feat: add full block stream api, better error handling - use TryStream instead of stream, returning result instead - use `?` operator instead of unwraping and panicking - add new mempool.space endpoints for http client - add features for wss:// and https:// usage - add features for api versioning based on mempool.space backend --- Cargo.toml | 6 ++ README.md | 2 +- src/api.rs | 4 +- src/bin.rs | 4 +- src/http.rs | 89 ++++++++++++++++-- src/lib.rs | 182 ++++++++++++++++++++++++------------- src/websocket.rs | 37 ++++---- tests/integration_tests.rs | 58 ++++++------ 8 files changed, 255 insertions(+), 127 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d6d0470..94bb238 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,12 @@ bitcoind = { version = "^0.26.1", features = ["22_0"] } electrsd = { version = "^0.19.1", features = ["bitcoind_22_0", "electrs_0_9_1"] } serial_test = { version = "0.7.0" } +[features] +default = ["mempool-backend"] +tls-secure = [] +esplora-backend = [] +mempool-backend = [] + [lib] name = "block_events" path = "src/lib.rs" diff --git a/README.md b/README.md index b8a67c7..8faf34e 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ async fn main() -> anyhow::Result<()> { let checkpoint = None; // async fetch the block-events stream through the lib - let block_events = block_events::subscribe_to_blocks(base_url, checkpoint).await?; + let block_events = block_events::subscribe_to_block_headers(base_url, checkpoint).await?; // consume and execute your code (current only matching and printing) in async manner for each new block-event pin_mut!(block_events); diff --git a/src/api.rs b/src/api.rs index 921f4d7..baf0471 100644 --- a/src/api.rs +++ b/src/api.rs @@ -70,9 +70,9 @@ pub enum MempoolSpaceWebSocketRequestData { /// Enum that implements the variants for `BlockEvent` #[derive(Debug, Clone, Copy)] -pub enum BlockEvent { +pub enum BlockEvent { /// Used when connecting and extending the current active chain being streamed - Connected(BlockHeader), + Connected(T), /// Used when there is a fork or reorganization event that turns the block stale /// then it's disconnected from current active chain Disconnected((u32, BlockHash)), diff --git a/src/bin.rs b/src/bin.rs index 9825061..6f9d261 100644 --- a/src/bin.rs +++ b/src/bin.rs @@ -63,7 +63,9 @@ async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); // async fetch the data stream through the lib - let block_events = block_events::subscribe_to_blocks(cli.base_url.as_str(), None).await?; + let checkpoint = None; + let block_events = + block_events::subscribe_to_block_headers(cli.base_url.as_str(), checkpoint).await?; // consume and execute the code (current matching and printing) in async manner for each new block-event pin_mut!(block_events); diff --git a/src/http.rs b/src/http.rs index 3bd9346..f84f07f 100644 --- a/src/http.rs +++ b/src/http.rs @@ -10,11 +10,26 @@ //! Http client implementation for mempool.space available endpoints //! 
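To show how the renamed header API reads from the caller's side after this change, here is a minimal consumer sketch mirroring the README hunk above. The crate paths (`block_events::api::BlockEvent`), the `BlockHeader` item type, and the fact that header events arrive unwrapped (the header pipeline uses `stream!`, unlike the `try_stream!`-based full-block one) are assumptions inferred from this patch, not guaranteed API.

```rust
use anyhow::Result;
use block_events::api::BlockEvent; // path assumed from src/api.rs in this patch
use futures_util::{pin_mut, StreamExt};

#[tokio::main]
async fn main() -> Result<()> {
    // host:port only; protocol and api prefix are selected by the new
    // `tls-secure` / `esplora-backend` / `mempool-backend` cargo features
    let base_url = "localhost:8999";
    let checkpoint = None;

    // subscribe to the real-time stream of block-header events
    let events = block_events::subscribe_to_block_headers(base_url, checkpoint).await?;
    pin_mut!(events);

    while let Some(event) = events.next().await {
        match event {
            BlockEvent::Connected(header) => {
                println!("new tip: {}", header.block_hash());
            }
            BlockEvent::Disconnected((height, hash)) => {
                println!("reorged out: {} at height {}", hash, height);
            }
        }
    }
    Ok(())
}
```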
It used `reqwest` async client -use bitcoin::BlockHash; +#![allow(unused_imports)] +use std::ops::Deref; + +use bitcoin::{consensus::deserialize, hashes::hex::FromHex, Block, BlockHash, Transaction, Txid}; use reqwest::Client; use crate::api::BlockExtended; +#[cfg(feature = "tls-secure")] +static HTTP_PROTOCOL: &str = "https"; + +#[cfg(not(feature = "tls-secure"))] +static HTTP_PROTOCOL: &str = "http"; + +#[cfg(feature = "esplora-backend")] +static API_PREFIX: &str = "api"; + +#[cfg(feature = "mempool-backend")] +static API_PREFIX: &str = "api/v1"; + /// Generic HttpClient using `reqwest` /// It has been based on the Esplora client from BDK #[allow(dead_code)] @@ -28,7 +43,9 @@ pub struct HttpClient { impl HttpClient { /// Creates a new HttpClient, for given base url and concurrency pub fn new(base_url: &str, concurrency: u8) -> Self { - let url = url::Url::parse(format!("http://{}", base_url).as_str()).unwrap(); + let url = + url::Url::parse(format!("{}://{}/{}", HTTP_PROTOCOL, base_url, API_PREFIX).as_str()) + .unwrap(); HttpClient { url: url.to_string(), client: Client::new(), @@ -37,35 +54,87 @@ impl HttpClient { } /// Get current blockchain block height (the current tip height) - pub async fn _get_height(&self) -> anyhow::Result { - let req = self + pub async fn _get_tip_height(&self) -> anyhow::Result { + let res = self .client .get(&format!("{}/blocks/tip/height", self.url)) .send() .await?; - Ok(req.error_for_status()?.text().await?.parse()?) + Ok(res.error_for_status()?.text().await?.parse()?) + } + + /// Get [`BlockHash`] from mempool.space, for current tip + pub async fn _get_tip_hash(&self) -> anyhow::Result { + let res = self + .client + .get(&format!("{}/blocks/tip/hash", self.url)) + .send() + .await?; + + Ok(res.error_for_status()?.text().await?.parse()?) } /// Get [`BlockHash`] from mempool.space, for given block height - pub async fn _get_block_height(&self, height: u32) -> anyhow::Result { - let req = self + pub async fn _get_block_hash(&self, height: u32) -> anyhow::Result { + let res = self .client .get(&format!("{}/block-height/{}", self.url, height)) .send() .await?; - Ok(req.error_for_status()?.text().await?.parse()?) + Ok(res.error_for_status()?.text().await?.parse()?) } /// Get [`BlockExtended`] from mempool.space, by [`BlockHash`] pub async fn _get_block(&self, block_hash: BlockHash) -> anyhow::Result { - let req = self + let res = self .client .get(&format!("{}/block/{}", self.url, block_hash)) .send() .await?; - Ok(serde_json::from_str(req.error_for_status()?.text().await?.as_str()).unwrap()) + Ok(serde_json::from_str( + res.error_for_status()?.text().await?.as_str(), + )?) 
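The hunk above derives the backend base URL at compile time from the new cargo features. Detached from the crate, the same pattern looks like the sketch below; the feature names mirror the patch, while the function and host are illustrative.

```rust
use anyhow::Result;
use reqwest::Client;

// protocol chosen by the `tls-secure` feature, as in the patch
#[cfg(feature = "tls-secure")]
const HTTP_PROTOCOL: &str = "https";
#[cfg(not(feature = "tls-secure"))]
const HTTP_PROTOCOL: &str = "http";

// api prefix chosen by the backend feature, as in the patch
#[cfg(feature = "esplora-backend")]
const API_PREFIX: &str = "api";
#[cfg(not(feature = "esplora-backend"))]
const API_PREFIX: &str = "api/v1";

/// Fetch the current tip height from a mempool.space/esplora-style backend.
async fn tip_height(client: &Client, host: &str) -> Result<u32> {
    let url = format!("{}://{}/{}/blocks/tip/height", HTTP_PROTOCOL, host, API_PREFIX);
    // the endpoint answers with the height as plain text, e.g. "2407123"
    let body = client.get(&url).send().await?.error_for_status()?.text().await?;
    Ok(body.trim().parse()?)
}
```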
+ } + + /// FIXME: (@leonardo.lima) this only works when using the blockstream.info (esplora) client + #[cfg(feature = "esplora-backend")] + pub async fn _get_block_raw(&self, block_hash: BlockHash) -> anyhow::Result { + let res = self + .client + .get(&format!("{}/block/{}/raw", self.url, block_hash)) + .send() + .await?; + + let block: Block = deserialize(res.bytes().await?.deref())?; + + Ok(block) + } + + pub async fn _get_tx(&self, tx_id: Txid) -> anyhow::Result { + let res = self + .client + .get(&format!("{}/tx/{}/hex", self.url, tx_id)) + .send() + .await?; + + let tx: Transaction = deserialize(&Vec::::from_hex( + res.error_for_status()?.text().await?.as_str(), + )?)?; + + Ok(tx) + } + + pub async fn _get_tx_ids(&self, block_hash: BlockHash) -> anyhow::Result> { + let res = self + .client + .get(format!("{}/block/{}/txids", self.url, block_hash)) + .send() + .await?; + + let tx_ids: Vec = serde_json::from_str(res.text().await?.as_str())?; + Ok(tx_ids) } } diff --git a/src/lib.rs b/src/lib.rs index 4583f8a..a1576cf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,7 +41,7 @@ //! let checkpoint = None; //! //! // async fetch the block-events stream through the lib -//! let block_events = block_events::subscribe_to_blocks(base_url, checkpoint).await?; +//! let block_events = block_events::subscribe_to_block_headers(base_url, checkpoint).await?; //! //! // consume and execute your code (current only matching and printing) in async manner for each new block-event //! pin_mut!(block_events); @@ -66,6 +66,9 @@ //! } //! ``` +#[cfg(all(feature = "esplora-backend", feature = "mempool-backend"))] +compile_error!("features esplora-backend and mempool-backend are mutually exclusive and cannot be enabled together"); + pub mod api; pub mod http; pub mod websocket; @@ -76,17 +79,17 @@ pub extern crate tokio; pub extern crate tokio_stream; pub extern crate tokio_tungstenite; -use std::time::Duration; use std::{collections::HashMap, collections::VecDeque, pin::Pin}; use api::{BlockEvent, BlockExtended}; +use futures_core::TryStream; use http::HttpClient; use anyhow::{anyhow, Ok}; -use async_stream::stream; -use bitcoin::{BlockHash, BlockHeader}; +use async_stream::{stream, try_stream}; +use bitcoin::{Block, BlockHash, BlockHeader, Transaction}; +use core::result::Result::Ok as CoreOk; use futures_util::stream::Stream; -use tokio::time::Instant; use tokio_stream::StreamExt; const DEFAULT_CONCURRENT_REQUESTS: u8 = 4; @@ -173,39 +176,83 @@ impl BlockHeadersCache { } /// Subscribe to a real-time stream of [`BlockEvent`], for all new blocks or starting from an optional checkpoint -pub async fn subscribe_to_blocks( +pub async fn subscribe_to_block_headers( base_url: &str, checkpoint: Option<(u32, BlockHash)>, -) -> anyhow::Result>>> { +) -> anyhow::Result>>>> { let http_client = http::HttpClient::new(base_url, DEFAULT_CONCURRENT_REQUESTS); - let current_tip = match checkpoint { + let tip_height = match checkpoint { Some((height, _)) => height - 1, - _ => http_client._get_height().await?, + _ => http_client._get_tip_height().await?, }; let cache = BlockHeadersCache { - tip: http_client._get_block_height(current_tip).await?, + tip: http_client._get_block_hash(tip_height).await?, active_headers: HashMap::new(), stale_headers: HashMap::new(), }; match checkpoint { Some(checkpoint) => { - let old_candidates = fetch_blocks(http_client.clone(), checkpoint).await?; - let new_candidates = websocket::subscribe_to_blocks(base_url).await?; - let candidates = Box::pin(old_candidates.chain(new_candidates)); - let events 
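Both `_get_block_raw` and `_get_tx` above rely on the same two-step decode: hex (or raw bytes) first, then rust-bitcoin consensus deserialization. A standalone sketch of the transaction path, using only the imports this patch already adds:

```rust
use anyhow::Result;
use bitcoin::consensus::deserialize;
use bitcoin::hashes::hex::FromHex;
use bitcoin::Transaction;

/// Decode the body of a `/tx/{txid}/hex` response into a rust-bitcoin Transaction.
fn tx_from_hex(hex_body: &str) -> Result<Transaction> {
    // hex string -> raw consensus-encoded bytes
    let raw = Vec::<u8>::from_hex(hex_body.trim())?;
    // raw bytes -> Transaction
    let tx: Transaction = deserialize(&raw)?;
    Ok(tx)
}
```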
= process_candidates(cache, candidates, http_client.clone()).await?; - Ok(Box::pin(events)) + let prev_header_candidates = fetch_blocks(base_url, checkpoint).await?; + let new_header_candidates = websocket::subscribe_to_block_headers(base_url).await?; + let candidates = Box::pin(prev_header_candidates.chain(new_header_candidates)); + Ok(Box::pin( + process_candidates(base_url, cache, candidates).await?, + )) } - _ => { - let candidates = Box::pin(websocket::subscribe_to_blocks(base_url).await?); - let events = process_candidates(cache, candidates, http_client.clone()).await?; - Ok(Box::pin(events)) + None => { + let new_header_candidates = + Box::pin(websocket::subscribe_to_block_headers(base_url).await?); + Ok(Box::pin( + process_candidates(base_url, cache, new_header_candidates).await?, + )) } } } +/// Subscribe to a real-time stream of [`BlockEvent`] of full rust-bitcoin blocks, for new mined blocks or +/// starting from an optional checkpoint (height: u32, hash: BlockHash) +pub async fn subscribe_to_blocks( + base_url: &str, + checkpoint: Option<(u32, BlockHash)>, +) -> anyhow::Result>>> { + // build and create a http client + let http_client = http::HttpClient::new(base_url, DEFAULT_CONCURRENT_REQUESTS); + + // subscribe to block_headers events + let mut header_events = subscribe_to_block_headers(base_url, checkpoint).await?; + + // iterate through each event for block_headers + let stream = try_stream! { + while let Some(event) = header_events.next().await { + match event { + BlockEvent::Connected(header) => { + // fetch all transaction ids (Txids) for the block + let tx_ids = http_client._get_tx_ids(header.block_hash()).await?; + + // fetch full transaction and build transaction list Vec + let mut txs: Vec = Vec::new(); + for id in tx_ids { + let tx = http_client._get_tx(id).await?; + txs.push(Transaction::from(tx)); + } + + // yield connected event for full block + yield BlockEvent::Connected(Block { + header: header, + txdata: txs, + }); + }, + // otherwise yield error or the disconnected event + BlockEvent::Disconnected((height, hash)) => yield BlockEvent::Disconnected((height, hash)), + } + } + }; + Ok(stream) +} + /// Process all candidates listened from source, it tries to apply the candidate to current active chain cached /// It handles reorganization and fork if needed /// Steps: @@ -215,39 +262,48 @@ pub async fn subscribe_to_blocks( /// - apply forked branch, and produces [`BlockEvent::Disconnected`] for staled blocks and [`BlockEvent::Connected`] /// for new branch async fn process_candidates( + base_url: &str, mut cache: BlockHeadersCache, - mut candidates: Pin>>, - http_client: HttpClient, -) -> anyhow::Result> { + mut candidates: Pin>>>, +) -> anyhow::Result>> { + let http_client = HttpClient::new(base_url, DEFAULT_CONCURRENT_REQUESTS); + let stream = stream! 
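The full-block subscription above leans on `async_stream::try_stream!`, which lets `?` be used inside the generator so failures reach the consumer as `Err` items instead of panics — the "better error handling" half of this patch. A reduced, self-contained sketch of that pattern (the names and the fake fetch are illustrative):

```rust
use anyhow::Result;
use async_stream::try_stream;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};

// stand-in for a fallible network call such as `_get_tx`
async fn fake_fetch(id: u32) -> Result<String> {
    Ok(format!("item-{}", id))
}

/// Lazily fetch every id, yielding `Result<String>` items; any error from
/// `fake_fetch` is propagated by `?` and shows up as an `Err` item.
fn fetch_all(ids: Vec<u32>) -> impl Stream<Item = Result<String>> {
    try_stream! {
        for id in ids {
            let item = fake_fetch(id).await?;
            yield item;
        }
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let stream = fetch_all(vec![1, 2, 3]);
    pin_mut!(stream);
    while let Some(item) = stream.next().await {
        // each item is a Result, so the caller decides how to handle failures
        println!("{}", item?);
    }
    Ok(())
}
```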
{ while let Some(candidate) = candidates.next().await { - // TODO: (@leonardo.lima) It should check and validate for valid BlockHeaders + match candidate { + // TODO: (@leonardo.lima) We should handle and return an specific error for the client + Err(_) => {/* ignore */}, + CoreOk(candidate) => { + // TODO: (@leonardo.lima) It should check and validate for valid BlockHeaders - // validate if the [`BlockHeader`] candidate is a valid new tip - // yields a [`BlockEvent::Connected()`] variant and continue the iteration - if cache.validate_new_header(candidate) { - yield BlockEvent::Connected(BlockHeader::from(candidate.clone())); - continue - } + // validate if the [`BlockHeader`] candidate is a valid new tip + // yields a [`BlockEvent::Connected()`] variant and continue the iteration + if cache.validate_new_header(candidate) { + yield BlockEvent::Connected(BlockHeader::from(candidate.clone())); + continue + } - // find common ancestor for current active chain and the forked chain - // fetches forked chain candidates and store in cache - let (common_ancestor, fork_chain) = cache.find_or_fetch_common_ancestor(http_client.clone(), candidate).await.unwrap(); + // find common ancestor for current active chain and the forked chain + // fetches forked chain candidates and store in cache + let (common_ancestor, fork_chain) = cache.find_or_fetch_common_ancestor(http_client.clone(), candidate).await.unwrap(); - // rollback current active chain, moving blocks to staled field - // yields BlockEvent::Disconnected((u32, BlockHash)) - let mut disconnected: VecDeque = cache.rollback_active_chain(common_ancestor).await.unwrap(); - while !disconnected.is_empty() { - let block: BlockExtended = disconnected.pop_back().unwrap(); - yield BlockEvent::Disconnected((block.height, block.id)); - } + // rollback current active chain, moving blocks to staled field + // yields BlockEvent::Disconnected((u32, BlockHash)) + let mut disconnected: VecDeque = cache.rollback_active_chain(common_ancestor).await.unwrap(); + while !disconnected.is_empty() { + let block: BlockExtended = disconnected.pop_back().unwrap(); + yield BlockEvent::Disconnected((block.height, block.id)); + } + + // iterate over forked chain candidates + // update [`Cache`] active_headers field with candidates + let (_, mut connected) = cache.apply_fork_chain(fork_chain).unwrap(); + while !connected.is_empty() { + let block = connected.pop_back().unwrap(); + yield BlockEvent::Connected(BlockHeader::from(block.clone())); + } - // iterate over forked chain candidates - // update [`Cache`] active_headers field with candidates - let (_, mut connected) = cache.apply_fork_chain(fork_chain).unwrap(); - while !connected.is_empty() { - let block = connected.pop_back().unwrap(); - yield BlockEvent::Connected(BlockHeader::from(block.clone())); + }, } } }; @@ -257,34 +313,32 @@ async fn process_candidates( /// Fetch all new starting from the checkpoint up to current active tip // FIXME: this fails when checkpoint is genesis block as it does not have a previousblockhash field pub async fn fetch_blocks( - http_client: HttpClient, + base_url: &str, checkpoint: (u32, BlockHash), -) -> anyhow::Result> { - let (ckpt_height, ckpt_hash) = checkpoint; +) -> anyhow::Result>> { + let http_client = HttpClient::new(base_url, DEFAULT_CONCURRENT_REQUESTS); - if ckpt_hash != http_client._get_block_height(ckpt_height).await? 
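From a consumer's point of view the reorg handling above defines a simple contract: extend local state on every `Connected` header and roll it back on every `Disconnected` (height, hash) pair. A hypothetical, wallet-style tracker illustrating that contract; the `BlockEvent<BlockHeader>` item type and all names here are assumptions:

```rust
use std::collections::HashMap;

use bitcoin::{BlockHash, BlockHeader};
use block_events::api::BlockEvent;

/// Minimal height-indexed view of the active chain, purely illustrative.
#[derive(Default)]
struct ChainTracker {
    headers_by_height: HashMap<u32, BlockHash>,
    tip_height: u32,
}

impl ChainTracker {
    fn apply(&mut self, event: BlockEvent<BlockHeader>) {
        match event {
            BlockEvent::Connected(header) => {
                // extend the local view of the active chain by one block
                self.tip_height += 1;
                self.headers_by_height
                    .insert(self.tip_height, header.block_hash());
            }
            BlockEvent::Disconnected((height, hash)) => {
                // a fork won: drop the stale block and move the tip back
                self.headers_by_height.remove(&height);
                self.tip_height = height.saturating_sub(1);
                eprintln!("rolled back stale block {} at height {}", hash, height);
            }
        }
    }
}
```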
{ + // checks if the checkpoint height and hash matches for the current chain + let (ckpt_height, ckpt_hash) = checkpoint; + if ckpt_hash != http_client._get_block_hash(ckpt_height).await? { return Err(anyhow!( "The checkpoint passed is invalid, it should exist in the blockchain." )); } - let mut tip = http_client._get_height().await?; - let mut height = ckpt_height; - - let mut interval = Instant::now(); // it should try to update the tip every 5 minutes. - let stream = stream! { - while height <= tip { - let hash = http_client._get_block_height(height).await.unwrap(); - let block = http_client._get_block(hash).await.unwrap(); - - height += 1; + let tip_height = http_client._get_tip_height().await?; + let stream = try_stream! { + for height in ckpt_height..tip_height { + let hash = http_client._get_block_hash(height).await?; + let block = http_client._get_block(hash).await?; - if interval.elapsed() >= Duration::from_secs(300) { - interval = Instant::now(); - tip = http_client._get_height().await.unwrap(); - } yield block; - } + }; + + let height = http_client._get_tip_height().await?; + let hash = http_client._get_block_hash(height).await?; + let block = http_client._get_block(hash).await?; + yield block; }; Ok(stream) } diff --git a/src/websocket.rs b/src/websocket.rs index 998a2dd..c97aabd 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -18,14 +18,21 @@ use super::api::{ }; use anyhow::{anyhow, Ok}; -use async_stream::stream; -use futures_util::stream::Stream; +use async_stream::try_stream; +use core::result::Result::Ok as CoreOk; +use futures_core::TryStream; use futures_util::{SinkExt, StreamExt}; use std::time::Duration; use tokio::net::TcpStream; use tokio_tungstenite::tungstenite::protocol::Message; use tokio_tungstenite::{connect_async_tls_with_config, MaybeTlsStream, WebSocketStream}; +#[cfg(feature = "tls-secure")] +static WEBSOCKET_PROTOCOL: &str = "wss"; + +#[cfg(not(feature = "tls-secure"))] +static WEBSOCKET_PROTOCOL: &str = "ws"; + /// Create a new WebSocket client for given base url and initial message /// /// It uses `tokio_tungestenite` crate and produces `WebSocketStream` to be handled and treated by caller @@ -33,10 +40,9 @@ async fn websocket_client( base_url: &str, message: String, ) -> anyhow::Result>> { - let url = url::Url::parse(format!("ws://{}/ws", base_url).as_str()).unwrap(); + let url = url::Url::parse(format!("{}://{}/api/v1/ws", WEBSOCKET_PROTOCOL, base_url).as_str())?; log::info!("starting websocket handshake with url={}", url); - // TODO: (@leonardo.lima) It's needed to infer the tls security from network, or feature ? let (mut websocket_stream, websocket_response) = connect_async_tls_with_config(url, None, None).await?; @@ -52,35 +58,30 @@ async fn websocket_client( } /// Connects to mempool.space WebSocket client and listen to new messages producing a stream of [`BlockExtended`] candidates -pub async fn subscribe_to_blocks( +pub async fn subscribe_to_block_headers( base_url: &str, -) -> anyhow::Result> { +) -> anyhow::Result>> { let init_message = serde_json::to_string(&build_websocket_request_message( &MempoolSpaceWebSocketRequestData::Blocks, - )) - .unwrap(); + ))?; let mut ws_stream = websocket_client(base_url, init_message).await?; // need to ping every so often to keep the websocket connection alive let mut pinger = tokio::time::interval(Duration::from_secs(60)); - let stream = stream! { + let stream = try_stream! { loop { tokio::select! 
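The keep-alive logic above boils down to a single `tokio::select!` that races incoming frames against a periodic ping. Isolated from the crate, with only the 60-second interval and the empty ping payload taken from the hunk (everything else is illustrative):

```rust
use std::time::Duration;

use anyhow::Result;
use futures_util::{SinkExt, StreamExt};
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::protocol::Message;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};

/// Drive an already-connected websocket: print text frames and ping every
/// 60 seconds so an idle connection is not dropped by the server.
async fn drive(mut ws: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Result<()> {
    let mut pinger = tokio::time::interval(Duration::from_secs(60));
    loop {
        tokio::select! {
            frame = ws.next() => {
                match frame {
                    Some(Ok(Message::Text(text))) => println!("frame: {}", text),
                    Some(Ok(Message::Close(_))) | None => break, // server went away
                    Some(Ok(_)) => {}                            // ignore ping/pong/binary
                    Some(Err(e)) => return Err(e.into()),        // surface transport errors
                }
            }
            _ = pinger.tick() => {
                ws.send(Message::Ping(vec![])).await?;
            }
        }
    }
    Ok(())
}
```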
{ message = ws_stream.next() => { if let Some(message) = message { match message.unwrap() { Message::Text(text) => { - let parse_ws_msg = || -> anyhow::Result<()> { - let _: MempoolSpaceWebSocketMessage = serde_json::from_str(&text)?; - Ok(()) + let parsed_msg: MempoolSpaceWebSocketMessage = match serde_json::from_str(&text) { + Err(_) => continue, + CoreOk(parsed_msg) => parsed_msg, }; - if let Err(_) = parse_ws_msg() { - continue - } - let res_msg: MempoolSpaceWebSocketMessage = serde_json::from_str(&text).unwrap(); - yield res_msg.block; + yield parsed_msg.block; }, Message::Close(_) => { eprintln!("websocket closing gracefully"); @@ -96,7 +97,7 @@ pub async fn subscribe_to_blocks( } _ = pinger.tick() => { log::info!("pinging to websocket to keep connection alive"); - ws_stream.send(Message::Ping(vec![])).await.unwrap() // TODO: (@leonardo.lima) Should this use a mempool expected ping message instead ? + ws_stream.send(Message::Ping(vec![])).await.unwrap(); } } } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 6669009..39eaa79 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -119,7 +119,6 @@ impl MempoolTestClient { let image = RunnableImage::from(image) .with_env_var(("MEMPOOL_BACKEND", "none")) - // .with_env_var(("MEMPOOL_NETWORK", "regtest")) .with_env_var(("DATABASE_HOST", docker_host_address().to_string())) .with_env_var(("CORE_RPC_HOST", docker_host_address().to_string())) .with_env_var(("CORE_RPC_PORT", bitcoind_port)) @@ -148,10 +147,6 @@ impl Default for MempoolTestClient { } } -fn build_base_url(mapped_port: u16) -> String { - format!("{}:{}/api/v1", HOST_IP, mapped_port) -} - #[tokio::test] #[serial] async fn test_fetch_tip_height() { @@ -168,13 +163,13 @@ async fn test_fetch_tip_height() { let rpc_client = &client.bitcoind.client; let http_client = HttpClient::new( - build_base_url(mempool.get_host_port_ipv4(8999)).as_str(), + format!("{}:{}", HOST_IP, mempool.get_host_port_ipv4(8999)).as_str(), DEFAULT_CONCURRENT_REQUESTS, ); // should return the current tip height for i in 0..10 { - let tip = http_client._get_height().await.unwrap(); + let tip = http_client._get_tip_height().await.unwrap(); assert_eq!(i, tip); let _ = rpc_client @@ -199,12 +194,12 @@ async fn test_fetch_block_hash_by_height() { let rpc_client = &client.bitcoind.client; let http_client = HttpClient::new( - build_base_url(mempool.get_host_port_ipv4(8999)).as_str(), + format!("{}:{}", HOST_IP, mempool.get_host_port_ipv4(8999)).as_str(), DEFAULT_CONCURRENT_REQUESTS, ); // should return an error if there is no block created yet for given height - assert!(http_client._get_block_height(100).await.is_err()); + assert!(http_client._get_block_hash(100).await.is_err()); // should return block hash for existing block by height for i in 1..10 { @@ -212,7 +207,7 @@ async fn test_fetch_block_hash_by_height() { .generate_to_address(1, &rpc_client.get_new_address(None, None).unwrap()) .unwrap(); - let res_hash = http_client._get_block_height(i).await.unwrap(); + let res_hash = http_client._get_block_hash(i).await.unwrap(); assert_eq!(gen_hash.first().unwrap(), &res_hash); } } @@ -231,13 +226,12 @@ async fn test_fetch_blocks_for_invalid_checkpoint() { let mempool = docker.run(client.mempool_backend); - let http_client = HttpClient::new( - build_base_url(mempool.get_host_port_ipv4(8999)).as_str(), - DEFAULT_CONCURRENT_REQUESTS, - ); - let checkpoint = (0, BlockHash::default()); - let blocks = block_events::fetch_blocks(http_client, checkpoint).await; + let blocks = 
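The text-frame handling above parses each frame once and skips anything that does not deserialize, instead of unwrapping. The same parse-or-skip idea as a detached sketch, with a stand-in message type (the real `MempoolSpaceWebSocketMessage` carries a typed `BlockExtended` block and further fields):

```rust
use serde::Deserialize;

/// Stand-in for the crate's websocket message: only the `block` field used by
/// the header stream is modeled here, and only loosely as raw JSON.
#[derive(Deserialize)]
struct WsMessage {
    block: serde_json::Value,
}

/// Return the block payload if the frame is a block message, otherwise `None`
/// so the read loop can simply `continue`.
fn parse_block_frame(text: &str) -> Option<serde_json::Value> {
    match serde_json::from_str::<WsMessage>(text) {
        Ok(msg) => Some(msg.block),
        Err(_) => None, // e.g. stats or mempool frames: skip them
    }
}
```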
block_events::fetch_blocks( + format!("{}:{}", HOST_IP, mempool.get_host_port_ipv4(8999)).as_str(), + checkpoint, + ) + .await; // should produce an error for invalid checkpoint assert!(blocks.is_err()); @@ -263,32 +257,34 @@ async fn test_fetch_blocks_for_checkpoint() { let mempool = docker.run(client.mempool_backend); let rpc_client = &client.bitcoind.client; - let http_client = HttpClient::new( - build_base_url(mempool.get_host_port_ipv4(8999)).as_str(), - DEFAULT_CONCURRENT_REQUESTS, - ); // generate new 20 blocks let mut gen_blocks = rpc_client .generate_to_address(20, &rpc_client.get_new_address(None, None).unwrap()) .unwrap(); + log::debug!("[{:#?}]", gen_blocks); let checkpoint = (10, *gen_blocks.get(9).unwrap()); - let blocks = block_events::fetch_blocks(http_client, checkpoint) - .await - .unwrap(); + let blocks = block_events::fetch_blocks( + format!("{}:{}", HOST_IP, mempool.get_host_port_ipv4(8999)).as_str(), + checkpoint, + ) + .await + .unwrap(); pin_mut!(blocks); // should return all 10 blocks from 10 to 20, as 10 being the checkpoint for gen_block in &mut gen_blocks[9..] { - let block = blocks.next().await.unwrap(); + let block = blocks.next().await.unwrap().unwrap(); assert_eq!(gen_block.deref(), &block.id); } } #[tokio::test] +#[serial] async fn test_failure_for_invalid_websocket_url() { - let block_events = websocket::subscribe_to_blocks(build_base_url(8999).as_str()).await; + let block_events = + websocket::subscribe_to_block_headers(format!("{}:{}", HOST_IP, 8999).as_str()).await; // should return an Err. assert!(block_events.is_err()); @@ -315,8 +311,8 @@ async fn test_block_events_stream() { let mempool = docker.run(client.mempool_backend); // get block-events stream - let block_events = block_events::subscribe_to_blocks( - build_base_url(mempool.get_host_port_ipv4(mempool.get_host_port_ipv4(8999))).as_str(), + let block_events = block_events::subscribe_to_block_headers( + format!("{}:{}", HOST_IP, mempool.get_host_port_ipv4(8999)).as_str(), None, ) .await @@ -378,8 +374,8 @@ async fn test_block_events_stream_with_checkpoint() { let mempool = docker.run(client.mempool_backend); // get block-events stream, starting from the tip - let block_events = block_events::subscribe_to_blocks( - build_base_url(mempool.get_host_port_ipv4(mempool.get_host_port_ipv4(8999))).as_str(), + let block_events = block_events::subscribe_to_block_headers( + format!("{}:{}", HOST_IP, mempool.get_host_port_ipv4(8999)).as_str(), Some((3, checkpoint.block_hash())), ) .await @@ -419,8 +415,8 @@ async fn test_block_events_stream_with_reorg() { let mempool = docker.run(client.mempool_backend); // get block-events stream - let block_events = block_events::subscribe_to_blocks( - build_base_url(mempool.get_host_port_ipv4(mempool.get_host_port_ipv4(8999))).as_str(), + let block_events = block_events::subscribe_to_block_headers( + format!("{}:{}", HOST_IP, mempool.get_host_port_ipv4(8999)).as_str(), None, ) .await
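Finally, the new full-block API introduced by this patch is consumed much like the header stream, except that items are `Result`s (it is built with `try_stream!`) and `Connected` carries a whole `bitcoin::Block`. A hedged end-to-end sketch resuming from an optional checkpoint; the exact item type is an assumption, since the generics are not spelled out in the hunks above:

```rust
use anyhow::Result;
use bitcoin::BlockHash;
use block_events::api::BlockEvent;
use futures_util::{pin_mut, StreamExt};

/// Follow full blocks from an optional (height, hash) checkpoint onward.
async fn follow_blocks(base_url: &str, checkpoint: Option<(u32, BlockHash)>) -> Result<()> {
    let stream = block_events::subscribe_to_blocks(base_url, checkpoint).await?;
    pin_mut!(stream);

    while let Some(event) = stream.next().await {
        // items are Results here, unlike the header stream
        match event? {
            BlockEvent::Connected(block) => println!(
                "block {} with {} transactions",
                block.block_hash(),
                block.txdata.len()
            ),
            BlockEvent::Disconnected((height, hash)) => {
                println!("reorged out: {} at height {}", hash, height)
            }
        }
    }
    Ok(())
}
```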