From 8e6217d26eda3d94685ec117165a9d76bc0f7c7c Mon Sep 17 00:00:00 2001 From: driftluo Date: Wed, 18 Dec 2024 17:05:16 +0800 Subject: [PATCH] feat: add new macro to support async client --- Cargo.toml | 9 +- src/rpc/ckb.rs | 96 +++++++++++++++++++ src/rpc/ckb_indexer.rs | 7 ++ src/rpc/ckb_light_client.rs | 32 +++++++ src/rpc/mod.rs | 177 +++++++++++++++++++++++++++++------- 5 files changed, 284 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 77114fdc..f6acd1f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,10 @@ [package] name = "ckb-sdk" version = "3.5.0" -authors = [ "Linfeng Qian ", "Nervos Core Dev " ] +authors = [ + "Linfeng Qian ", + "Nervos Core Dev ", +] edition = "2018" license = "MIT" description = "Rust SDK for CKB" @@ -17,7 +20,7 @@ anyhow = "1.0.63" bech32 = "0.8.1" derive-getters = "0.2.1" log = "0.4.6" -reqwest = { version = "0.11", default-features = false, features = [ "json", "blocking" ] } +reqwest = { version = "0.12", default-features = false, features = ["json"] } secp256k1 = { version = "0.29.0", features = ["recovery"] } tokio-util = { version = "0.7.7", features = ["codec"] } tokio = { version = "1" } @@ -57,7 +60,7 @@ rustls-tls = ["reqwest/rustls-tls"] test = [] [dev-dependencies] -clap = { version = "=4.4.18", features = [ "derive" ] } # TODO clap v4.5 requires rustc v1.74.0+ +clap = { version = "4.4.18", features = ["derive"] } httpmock = "0.6" async-global-executor = "2.3.1" hex = "0.4" diff --git a/src/rpc/ckb.rs b/src/rpc/ckb.rs index a435430a..958e0adc 100644 --- a/src/rpc/ckb.rs +++ b/src/rpc/ckb.rs @@ -110,6 +110,102 @@ crate::jsonrpc!(pub struct CkbRpcClient { pub fn calculate_dao_maximum_withdraw(&self, out_point: OutPoint, kind: DaoWithdrawingCalculationKind) -> Capacity; }); +crate::jsonrpc_async!(pub struct CkbRpcAyncClient { + // Chain + pub fn get_block(&self, hash: H256) -> Option; + pub fn get_block_by_number(&self, number: BlockNumber) -> Option; + pub fn get_block_hash(&self, number: BlockNumber) -> Option; + pub fn get_block_filter(&self, block_hash: H256) -> Option; + pub fn get_current_epoch(&self) -> EpochView; + pub fn get_epoch_by_number(&self, number: EpochNumber) -> Option; + pub fn get_header(&self, hash: H256) -> Option; + pub fn get_header_by_number(&self, number: BlockNumber) -> Option; + pub fn get_live_cell(&self, out_point: OutPoint, with_data: bool) -> CellWithStatus; + pub fn get_tip_block_number(&self) -> BlockNumber; + pub fn get_tip_header(&self) -> HeaderView; + pub fn get_transaction(&self, hash: H256) -> Option; + pub fn get_transaction_proof( + &self, + tx_hashes: Vec, + block_hash: Option + ) -> TransactionProof; + pub fn verify_transaction_proof(&self, tx_proof: TransactionProof) -> Vec; + pub fn get_transaction_and_witness_proof(&self, tx_hashes: Vec, + block_hash: Option) -> TransactionAndWitnessProof; + pub fn verify_transaction_and_witness_proof(&self, tx_proof: TransactionAndWitnessProof) -> Vec; + pub fn get_fork_block(&self, block_hash: H256) -> Option; + pub fn get_consensus(&self) -> Consensus; + pub fn get_deployments_info(&self) -> DeploymentsInfo; + pub fn get_block_median_time(&self, block_hash: H256) -> Option; + pub fn get_block_economic_state(&self, block_hash: H256) -> Option; + pub fn estimate_cycles(&self, tx: Transaction)-> EstimateCycles; + pub fn get_fee_rate_statics(&self, target:Option) -> Option; + pub fn get_fee_rate_statistics(&self, target:Option) -> Option; + + // Indexer + pub fn get_indexer_tip(&self) -> Option; + pub fn get_cells(&self, search_key: SearchKey, order: Order, limit: Uint32, after: Option) -> Pagination; + pub fn get_transactions(&self, search_key: SearchKey, order: Order, limit: Uint32, after: Option) -> Pagination; + pub fn get_cells_capacity(&self, search_key: SearchKey) -> Option; + + // Net + pub fn get_banned_addresses(&self) -> Vec; + pub fn get_peers(&self) -> Vec; + pub fn local_node_info(&self) -> LocalNode; + pub fn set_ban( + &self, + address: String, + command: String, + ban_time: Option, + absolute: Option, + reason: Option + ) -> (); + pub fn sync_state(&self) -> SyncState; + pub fn set_network_active(&self, state: bool) -> (); + pub fn add_node(&self, peer_id: String, address: String) -> (); + pub fn remove_node(&self, peer_id: String) -> (); + pub fn clear_banned_addresses(&self) -> (); + pub fn ping_peers(&self) -> (); + + // Pool + pub fn send_transaction(&self, tx: Transaction, outputs_validator: Option) -> H256; + pub fn remove_transaction(&self, tx_hash: H256) -> bool; + pub fn tx_pool_info(&self) -> TxPoolInfo; + pub fn get_pool_tx_detail_info(&self, tx_hash: H256) -> PoolTxDetailInfo; + pub fn clear_tx_pool(&self) -> (); + pub fn get_raw_tx_pool(&self, verbose: Option) -> RawTxPool; + pub fn tx_pool_ready(&self) -> bool; + pub fn test_tx_pool_accept(&self, tx: Transaction, outputs_validator: Option) -> EntryCompleted; + pub fn clear_tx_verify_queue(&self) -> (); + + // Stats + pub fn get_blockchain_info(&self) -> ChainInfo; + + // Miner + pub fn get_block_template(&self, bytes_limit: Option, proposals_limit: Option, max_version: Option) -> BlockTemplate; + pub fn submit_block(&self, _work_id: String, _data: Block) -> H256; + + // Alert + pub fn send_alert(&self, alert: Alert) -> (); + + // IntegrationTest + pub fn process_block_without_verify(&self, data: Block, broadcast: bool) -> Option; + pub fn truncate(&self, target_tip_hash: H256) -> (); + pub fn generate_block(&self) -> H256; + pub fn generate_epochs(&self, num_epochs: EpochNumberWithFraction) -> EpochNumberWithFraction; + pub fn notify_transaction(&self, tx: Transaction) -> H256; + pub fn calculate_dao_field(&self, block_template: BlockTemplate) -> JsonBytes; + pub fn generate_block_with_template(&self, block_template: BlockTemplate) -> H256; + + // Debug + pub fn jemalloc_profiling_dump(&self) -> String; + pub fn update_main_logger(&self, config: MainLoggerConfig) -> (); + pub fn set_extra_logger(&self, name: String, config_opt: Option) -> (); + + // Experimental + pub fn calculate_dao_maximum_withdraw(&self, out_point: OutPoint, kind: DaoWithdrawingCalculationKind) -> Capacity; +}); + fn transform_cycles(cycles: Option>) -> Vec { cycles .map(|c| c.into_iter().map(Into::into).collect()) diff --git a/src/rpc/ckb_indexer.rs b/src/rpc/ckb_indexer.rs index 5599acaa..f80eec32 100644 --- a/src/rpc/ckb_indexer.rs +++ b/src/rpc/ckb_indexer.rs @@ -195,3 +195,10 @@ crate::jsonrpc!(pub struct IndexerRpcClient { pub fn get_transactions(&self, search_key: SearchKey, order: Order, limit: Uint32, after: Option) -> Pagination; pub fn get_cells_capacity(&self, search_key: SearchKey) -> Option; }); + +crate::jsonrpc_async!(pub struct IndexerRpcAsyncClient { + pub fn get_indexer_tip(&self) -> Option; + pub fn get_cells(&self, search_key: SearchKey, order: Order, limit: Uint32, after: Option) -> Pagination; + pub fn get_transactions(&self, search_key: SearchKey, order: Order, limit: Uint32, after: Option) -> Pagination; + pub fn get_cells_capacity(&self, search_key: SearchKey) -> Option; +}); diff --git a/src/rpc/ckb_light_client.rs b/src/rpc/ckb_light_client.rs index 3d7c6670..3d3369d4 100644 --- a/src/rpc/ckb_light_client.rs +++ b/src/rpc/ckb_light_client.rs @@ -167,3 +167,35 @@ crate::jsonrpc!(pub struct LightClientRpcClient { pub fn get_peers(&self) -> Vec; pub fn local_node_info(&self) -> LocalNode; }); + +crate::jsonrpc_async!(pub struct LightClientRpcAsyncClient { + // BlockFilter + pub fn set_scripts(&self, scripts: Vec, command: Option) -> (); + pub fn get_scripts(&self) -> Vec; + pub fn get_cells(&self, search_key: SearchKey, order: Order, limit: Uint32, after: Option) -> Pagination; + pub fn get_transactions(&self, search_key: SearchKey, order: Order, limit: Uint32, after: Option) -> Pagination; + pub fn get_cells_capacity(&self, search_key: SearchKey) -> CellsCapacity; + + // Transaction + pub fn send_transaction(&self, tx: Transaction) -> H256; + + // Chain + pub fn get_tip_header(&self) -> HeaderView; + pub fn get_genesis_block(&self) -> BlockView; + pub fn get_header(&self, block_hash: H256) -> Option; + pub fn get_transaction(&self, tx_hash: H256) -> Option; + pub fn estimate_cycles(&self, tx: Transaction)-> EstimateCycles; + /// Fetch a header from remote node. If return status is `not_found` will re-sent fetching request immediately. + /// + /// Returns: FetchStatus + pub fn fetch_header(&self, block_hash: H256) -> FetchStatus; + + /// Fetch a transaction from remote node. If return status is `not_found` will re-sent fetching request immediately. + /// + /// Returns: FetchStatus + pub fn fetch_transaction(&self, tx_hash: H256) -> FetchStatus; + + // Net + pub fn get_peers(&self) -> Vec; + pub fn local_node_info(&self) -> LocalNode; +}); diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 1ba4b879..222e0ced 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -8,8 +8,12 @@ pub use ckb_indexer::IndexerRpcClient; use ckb_jsonrpc_types::{JsonBytes, ResponseFormat}; pub use ckb_light_client::LightClientRpcClient; +use std::sync::LazyLock; use thiserror::Error; +static RUNTIME: LazyLock = + LazyLock::new(|| tokio::runtime::Runtime::new().unwrap()); + #[derive(Error, Debug)] pub enum RpcError { #[error("parse json error: `{0}`")] @@ -34,21 +38,22 @@ macro_rules! jsonrpc { ) => ( $(#[$struct_attr])* pub struct $struct_name { - pub client: reqwest::blocking::Client, - pub url: reqwest::Url, - pub id: std::sync::atomic::AtomicU64, + client: crate::rpc::RpcClient, + id: std::sync::atomic::AtomicU64, } impl Clone for $struct_name { fn clone(&self) -> Self { - Self::new(&self.url.to_string()) + Self { + client: self.client.clone(), + id: 0.into() + } } } impl $struct_name { pub fn new(uri: &str) -> Self { - let url = reqwest::Url::parse(uri).expect("ckb uri, e.g. \"http://127.0.0.1:8114\""); - $struct_name { url, id: 0.into(), client: reqwest::blocking::Client::new(), } + $struct_name { id: 0.into(), client: crate::rpc::RpcClient::new(uri), } } pub fn post(&self, method:&str, params: PARAM)->Result @@ -56,25 +61,19 @@ macro_rules! jsonrpc { PARAM:serde::ser::Serialize, RET: serde::de::DeserializeOwned, { - let params = serde_json::to_value(params)?; let id = self.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let params_fn = || -> Result<_,_> { + let params = serde_json::to_value(params)?; + let mut req_json = serde_json::Map::new(); + req_json.insert("id".to_owned(), serde_json::json!(id)); + req_json.insert("jsonrpc".to_owned(), serde_json::json!("2.0")); + req_json.insert("method".to_owned(), serde_json::json!(method)); + req_json.insert("params".to_owned(), params); + Ok(req_json) + }; - let mut req_json = serde_json::Map::new(); - req_json.insert("id".to_owned(), serde_json::json!(id)); - req_json.insert("jsonrpc".to_owned(), serde_json::json!("2.0")); - req_json.insert("method".to_owned(), serde_json::json!(method)); - req_json.insert("params".to_owned(), params); - - let resp = self.client.post(self.url.clone()).json(&req_json).send()?; - let output = resp.json::()?; - match output { - jsonrpc_core::response::Output::Success(success) => { - serde_json::from_value(success.result).map_err(Into::into) - }, - jsonrpc_core::response::Output::Failure(failure) => { - Err(failure.error.into()) - } - } + let task = self.client.post(params_fn); + crate::rpc::RUNTIME.block_on(task) } @@ -85,28 +84,138 @@ macro_rules! jsonrpc { let params = $crate::serialize_parameters!($($arg_name,)*); let id = $selff.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let params_fn = || -> Result<_,_> { + let mut req_json = serde_json::Map::new(); + req_json.insert("id".to_owned(), serde_json::json!(id)); + req_json.insert("jsonrpc".to_owned(), serde_json::json!("2.0")); + req_json.insert("method".to_owned(), serde_json::json!(method)); + req_json.insert("params".to_owned(), params); + Ok(req_json) + }; + + let task = $selff.client.post(params_fn); + crate::rpc::RUNTIME.block_on(task) + } + )* + } + ) +} + +#[macro_export] +macro_rules! jsonrpc_async { + ( + $(#[$struct_attr:meta])* + pub struct $struct_name:ident {$( + $(#[$attr:meta])* + pub fn $method:ident(& $selff:ident $(, $arg_name:ident: $arg_ty:ty)*) + -> $return_ty:ty; + )*} + ) => ( + $(#[$struct_attr])* + pub struct $struct_name { + client: crate::rpc::RpcClient, + id: std::sync::atomic::AtomicU64, + } + + impl Clone for $struct_name { + fn clone(&self) -> Self { + Self { + client: self.client.clone(), + id: 0.into() + } + } + } + + impl $struct_name { + pub fn new(uri: &str) -> Self { + $struct_name { id: 0.into(), client: crate::rpc::RpcClient::new(uri), } + } + + pub fn post(&self, method:&str, params: PARAM)->impl std::future::Future> + where + PARAM:serde::ser::Serialize, + RET: serde::de::DeserializeOwned, + { + let id = self.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let method = serde_json::json!(method); + + let params_fn = move || -> Result<_,_> { + let params = serde_json::to_value(params)?; let mut req_json = serde_json::Map::new(); req_json.insert("id".to_owned(), serde_json::json!(id)); req_json.insert("jsonrpc".to_owned(), serde_json::json!("2.0")); - req_json.insert("method".to_owned(), serde_json::json!(method)); + req_json.insert("method".to_owned(), method); req_json.insert("params".to_owned(), params); + Ok(req_json) + }; + + self.client.post(params_fn) + + } + + $( + $(#[$attr])* + pub fn $method(&$selff $(, $arg_name: $arg_ty)*) -> impl std::future::Future> { + let id = $selff.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + let params_fn = move || -> Result<_,_> { + let method = String::from(stringify!($method)); + let params = $crate::serialize_parameters!($($arg_name,)*); + let mut req_json = serde_json::Map::new(); + req_json.insert("id".to_owned(), serde_json::json!(id)); + req_json.insert("jsonrpc".to_owned(), serde_json::json!("2.0")); + req_json.insert("method".to_owned(), serde_json::json!(method)); + req_json.insert("params".to_owned(), params); + Ok(req_json) + }; - let resp = $selff.client.post($selff.url.clone()).json(&req_json).send()?; - let output = resp.json::()?; - match output { - jsonrpc_core::response::Output::Success(success) => { - serde_json::from_value(success.result).map_err(Into::into) - }, - jsonrpc_core::response::Output::Failure(failure) => { - Err(failure.error.into()) - } - } + $selff.client.post(params_fn) } )* } ) } +#[derive(Debug, Clone)] +pub(crate) struct RpcClient { + client: reqwest::Client, + url: reqwest::Url, +} + +impl RpcClient { + pub fn new(uri: &str) -> Self { + let url = reqwest::Url::parse(uri).expect("ckb uri, e.g. \"http://127.0.0.1:8114\""); + Self { + client: reqwest::Client::new(), + url, + } + } + + pub fn post( + &self, + json_post_params: T, + ) -> impl std::future::Future> + where + PARAM: serde::ser::Serialize, + RET: serde::de::DeserializeOwned, + T: FnOnce() -> Result, + { + let url = self.url.clone(); + let client = self.client.clone(); + + async move { + let resp = client.post(url).json(&json_post_params()?).send().await?; + let output = resp.json::().await?; + match output { + jsonrpc_core::response::Output::Success(success) => { + serde_json::from_value(success.result).map_err(Into::into) + } + jsonrpc_core::response::Output::Failure(failure) => Err(failure.error.into()), + } + } + } +} + #[macro_export] macro_rules! serialize_parameters { () => ( serde_json::Value::Null );