Skip to content

Commit

Permalink
feat: add full block stream api, better error handling
Browse files Browse the repository at this point in the history
- use TryStream instead of stream, returning result instead
- use `?` operator instead of unwraping and panicking
- add new mempool.space endpoints for http client
- add features for wss:// and https:// usage
- add features for api versioning based on mempool.space backend
  • Loading branch information
oleonardolima committed Jul 26, 2022
1 parent f8ce1ef commit 2b9e908
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 127 deletions.
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ bitcoind = { version = "^0.26.1", features = ["22_0"] }
electrsd = { version = "^0.19.1", features = ["bitcoind_22_0", "electrs_0_9_1"] }
serial_test = { version = "0.7.0" }

[features]
default = ["mempool-backend"]
tls-secure = []
esplora-backend = []
mempool-backend = []

[lib]
name = "block_events"
path = "src/lib.rs"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async fn main() -> anyhow::Result<()> {
let checkpoint = None;

// async fetch the block-events stream through the lib
let block_events = block_events::subscribe_to_blocks(base_url, checkpoint).await?;
let block_events = block_events::subscribe_to_block_headers(base_url, checkpoint).await?;

// consume and execute your code (current only matching and printing) in async manner for each new block-event
pin_mut!(block_events);
Expand Down
4 changes: 2 additions & 2 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ pub enum MempoolSpaceWebSocketRequestData {

/// Enum that implements the variants for `BlockEvent`
#[derive(Debug, Clone, Copy)]
pub enum BlockEvent {
pub enum BlockEvent<T> {
/// Used when connecting and extending the current active chain being streamed
Connected(BlockHeader),
Connected(T),
/// Used when there is a fork or reorganization event that turns the block stale
/// then it's disconnected from current active chain
Disconnected((u32, BlockHash)),
Expand Down
4 changes: 3 additions & 1 deletion src/bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();

// async fetch the data stream through the lib
let block_events = block_events::subscribe_to_blocks(cli.base_url.as_str(), None).await?;
let checkpoint = None;
let block_events =
block_events::subscribe_to_block_headers(cli.base_url.as_str(), checkpoint).await?;

// consume and execute the code (current matching and printing) in async manner for each new block-event
pin_mut!(block_events);
Expand Down
89 changes: 79 additions & 10 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,26 @@
//! Http client implementation for mempool.space available endpoints
//! It used `reqwest` async client
use bitcoin::BlockHash;
#![allow(unused_imports)]
use std::ops::Deref;

use bitcoin::{consensus::deserialize, hashes::hex::FromHex, Block, BlockHash, Transaction, Txid};
use reqwest::Client;

use crate::api::BlockExtended;

#[cfg(feature = "tls-secure")]
static HTTP_PROTOCOL: &str = "https";

#[cfg(not(feature = "tls-secure"))]
static HTTP_PROTOCOL: &str = "http";

#[cfg(feature = "esplora-backend")]
static API_PREFIX: &str = "api";

#[cfg(feature = "mempool-backend")]
static API_PREFIX: &str = "api/v1";

/// Generic HttpClient using `reqwest`
/// It has been based on the Esplora client from BDK
#[allow(dead_code)]
Expand All @@ -28,7 +43,9 @@ pub struct HttpClient {
impl HttpClient {
/// Creates a new HttpClient, for given base url and concurrency
pub fn new(base_url: &str, concurrency: u8) -> Self {
let url = url::Url::parse(format!("http://{}", base_url).as_str()).unwrap();
let url =
url::Url::parse(format!("{}://{}/{}", HTTP_PROTOCOL, base_url, API_PREFIX).as_str())
.unwrap();
HttpClient {
url: url.to_string(),
client: Client::new(),
Expand All @@ -37,35 +54,87 @@ impl HttpClient {
}

/// Get current blockchain block height (the current tip height)
pub async fn _get_height(&self) -> anyhow::Result<u32> {
let req = self
pub async fn _get_tip_height(&self) -> anyhow::Result<u32> {
let res = self
.client
.get(&format!("{}/blocks/tip/height", self.url))
.send()
.await?;

Ok(req.error_for_status()?.text().await?.parse()?)
Ok(res.error_for_status()?.text().await?.parse()?)
}

/// Get [`BlockHash`] from mempool.space, for current tip
pub async fn _get_tip_hash(&self) -> anyhow::Result<BlockHash> {
let res = self
.client
.get(&format!("{}/blocks/tip/hash", self.url))
.send()
.await?;

Ok(res.error_for_status()?.text().await?.parse()?)
}

/// Get [`BlockHash`] from mempool.space, for given block height
pub async fn _get_block_height(&self, height: u32) -> anyhow::Result<BlockHash> {
let req = self
pub async fn _get_block_hash(&self, height: u32) -> anyhow::Result<BlockHash> {
let res = self
.client
.get(&format!("{}/block-height/{}", self.url, height))
.send()
.await?;

Ok(req.error_for_status()?.text().await?.parse()?)
Ok(res.error_for_status()?.text().await?.parse()?)
}

/// Get [`BlockExtended`] from mempool.space, by [`BlockHash`]
pub async fn _get_block(&self, block_hash: BlockHash) -> anyhow::Result<BlockExtended> {
let req = self
let res = self
.client
.get(&format!("{}/block/{}", self.url, block_hash))
.send()
.await?;

Ok(serde_json::from_str(req.error_for_status()?.text().await?.as_str()).unwrap())
Ok(serde_json::from_str(
res.error_for_status()?.text().await?.as_str(),
)?)
}

/// FIXME: (@leonardo.lima) this only works when using the blockstream.info (esplora) client
#[cfg(feature = "esplora-backend")]
pub async fn _get_block_raw(&self, block_hash: BlockHash) -> anyhow::Result<Block> {
let res = self
.client
.get(&format!("{}/block/{}/raw", self.url, block_hash))
.send()
.await?;

let block: Block = deserialize(res.bytes().await?.deref())?;

Ok(block)
}

pub async fn _get_tx(&self, tx_id: Txid) -> anyhow::Result<Transaction> {
let res = self
.client
.get(&format!("{}/tx/{}/hex", self.url, tx_id))
.send()
.await?;

let tx: Transaction = deserialize(&Vec::<u8>::from_hex(
res.error_for_status()?.text().await?.as_str(),
)?)?;

Ok(tx)
}

pub async fn _get_tx_ids(&self, block_hash: BlockHash) -> anyhow::Result<Vec<Txid>> {
let res = self
.client
.get(format!("{}/block/{}/txids", self.url, block_hash))
.send()
.await?;

let tx_ids: Vec<Txid> = serde_json::from_str(res.text().await?.as_str())?;
Ok(tx_ids)
}
}
Loading

0 comments on commit 2b9e908

Please sign in to comment.