From 8e6217d26eda3d94685ec117165a9d76bc0f7c7c Mon Sep 17 00:00:00 2001 From: driftluo Date: Wed, 18 Dec 2024 17:05:16 +0800 Subject: [PATCH 1/2] feat: add new macro to support async client --- Cargo.toml | 9 +- src/rpc/ckb.rs | 96 +++++++++++++++++++ src/rpc/ckb_indexer.rs | 7 ++ src/rpc/ckb_light_client.rs | 32 +++++++ src/rpc/mod.rs | 177 +++++++++++++++++++++++++++++------- 5 files changed, 284 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 77114fdc..f6acd1f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,10 @@ [package] name = "ckb-sdk" version = "3.5.0" -authors = [ "Linfeng Qian ", "Nervos Core Dev " ] +authors = [ + "Linfeng Qian ", + "Nervos Core Dev ", +] edition = "2018" license = "MIT" description = "Rust SDK for CKB" @@ -17,7 +20,7 @@ anyhow = "1.0.63" bech32 = "0.8.1" derive-getters = "0.2.1" log = "0.4.6" -reqwest = { version = "0.11", default-features = false, features = [ "json", "blocking" ] } +reqwest = { version = "0.12", default-features = false, features = ["json"] } secp256k1 = { version = "0.29.0", features = ["recovery"] } tokio-util = { version = "0.7.7", features = ["codec"] } tokio = { version = "1" } @@ -57,7 +60,7 @@ rustls-tls = ["reqwest/rustls-tls"] test = [] [dev-dependencies] -clap = { version = "=4.4.18", features = [ "derive" ] } # TODO clap v4.5 requires rustc v1.74.0+ +clap = { version = "4.4.18", features = ["derive"] } httpmock = "0.6" async-global-executor = "2.3.1" hex = "0.4" diff --git a/src/rpc/ckb.rs b/src/rpc/ckb.rs index a435430a..958e0adc 100644 --- a/src/rpc/ckb.rs +++ b/src/rpc/ckb.rs @@ -110,6 +110,102 @@ crate::jsonrpc!(pub struct CkbRpcClient { pub fn calculate_dao_maximum_withdraw(&self, out_point: OutPoint, kind: DaoWithdrawingCalculationKind) -> Capacity; }); +crate::jsonrpc_async!(pub struct CkbRpcAyncClient { + // Chain + pub fn get_block(&self, hash: H256) -> Option; + pub fn get_block_by_number(&self, number: BlockNumber) -> Option; + pub fn get_block_hash(&self, number: BlockNumber) -> Option; + pub fn get_block_filter(&self, block_hash: H256) -> Option; + pub fn get_current_epoch(&self) -> EpochView; + pub fn get_epoch_by_number(&self, number: EpochNumber) -> Option; + pub fn get_header(&self, hash: H256) -> Option; + pub fn get_header_by_number(&self, number: BlockNumber) -> Option; + pub fn get_live_cell(&self, out_point: OutPoint, with_data: bool) -> CellWithStatus; + pub fn get_tip_block_number(&self) -> BlockNumber; + pub fn get_tip_header(&self) -> HeaderView; + pub fn get_transaction(&self, hash: H256) -> Option; + pub fn get_transaction_proof( + &self, + tx_hashes: Vec, + block_hash: Option + ) -> TransactionProof; + pub fn verify_transaction_proof(&self, tx_proof: TransactionProof) -> Vec; + pub fn get_transaction_and_witness_proof(&self, tx_hashes: Vec, + block_hash: Option) -> TransactionAndWitnessProof; + pub fn verify_transaction_and_witness_proof(&self, tx_proof: TransactionAndWitnessProof) -> Vec; + pub fn get_fork_block(&self, block_hash: H256) -> Option; + pub fn get_consensus(&self) -> Consensus; + pub fn get_deployments_info(&self) -> DeploymentsInfo; + pub fn get_block_median_time(&self, block_hash: H256) -> Option; + pub fn get_block_economic_state(&self, block_hash: H256) -> Option; + pub fn estimate_cycles(&self, tx: Transaction)-> EstimateCycles; + pub fn get_fee_rate_statics(&self, target:Option) -> Option; + pub fn get_fee_rate_statistics(&self, target:Option) -> Option; + + // Indexer + pub fn get_indexer_tip(&self) -> Option; + pub fn get_cells(&self, search_key: SearchKey, order: Order, limit: Uint32, after: Option) -> Pagination; + pub fn get_transactions(&self, search_key: SearchKey, order: Order, limit: Uint32, after: Option) -> Pagination; + pub fn get_cells_capacity(&self, search_key: SearchKey) -> Option; + + // Net + pub fn get_banned_addresses(&self) -> Vec; + pub fn get_peers(&self) -> Vec; + pub fn local_node_info(&self) -> LocalNode; + pub fn set_ban( + &self, + address: String, + command: String, + ban_time: Option, + absolute: Option, + reason: Option + ) -> (); + pub fn sync_state(&self) -> SyncState; + pub fn set_network_active(&self, state: bool) -> (); + pub fn add_node(&self, peer_id: String, address: String) -> (); + pub fn remove_node(&self, peer_id: String) -> (); + pub fn clear_banned_addresses(&self) -> (); + pub fn ping_peers(&self) -> (); + + // Pool + pub fn send_transaction(&self, tx: Transaction, outputs_validator: Option) -> H256; + pub fn remove_transaction(&self, tx_hash: H256) -> bool; + pub fn tx_pool_info(&self) -> TxPoolInfo; + pub fn get_pool_tx_detail_info(&self, tx_hash: H256) -> PoolTxDetailInfo; + pub fn clear_tx_pool(&self) -> (); + pub fn get_raw_tx_pool(&self, verbose: Option) -> RawTxPool; + pub fn tx_pool_ready(&self) -> bool; + pub fn test_tx_pool_accept(&self, tx: Transaction, outputs_validator: Option) -> EntryCompleted; + pub fn clear_tx_verify_queue(&self) -> (); + + // Stats + pub fn get_blockchain_info(&self) -> ChainInfo; + + // Miner + pub fn get_block_template(&self, bytes_limit: Option, proposals_limit: Option, max_version: Option) -> BlockTemplate; + pub fn submit_block(&self, _work_id: String, _data: Block) -> H256; + + // Alert + pub fn send_alert(&self, alert: Alert) -> (); + + // IntegrationTest + pub fn process_block_without_verify(&self, data: Block, broadcast: bool) -> Option; + pub fn truncate(&self, target_tip_hash: H256) -> (); + pub fn generate_block(&self) -> H256; + pub fn generate_epochs(&self, num_epochs: EpochNumberWithFraction) -> EpochNumberWithFraction; + pub fn notify_transaction(&self, tx: Transaction) -> H256; + pub fn calculate_dao_field(&self, block_template: BlockTemplate) -> JsonBytes; + pub fn generate_block_with_template(&self, block_template: BlockTemplate) -> H256; + + // Debug + pub fn jemalloc_profiling_dump(&self) -> String; + pub fn update_main_logger(&self, config: MainLoggerConfig) -> (); + pub fn set_extra_logger(&self, name: String, config_opt: Option) -> (); + + // Experimental + pub fn calculate_dao_maximum_withdraw(&self, out_point: OutPoint, kind: DaoWithdrawingCalculationKind) -> Capacity; +}); + fn transform_cycles(cycles: Option>) -> Vec { cycles .map(|c| c.into_iter().map(Into::into).collect()) diff --git a/src/rpc/ckb_indexer.rs b/src/rpc/ckb_indexer.rs index 5599acaa..f80eec32 100644 --- a/src/rpc/ckb_indexer.rs +++ b/src/rpc/ckb_indexer.rs @@ -195,3 +195,10 @@ crate::jsonrpc!(pub struct IndexerRpcClient { pub fn get_transactions(&self, search_key: SearchKey, order: Order, limit: Uint32, after: Option) -> Pagination; pub fn get_cells_capacity(&self, search_key: SearchKey) -> Option; }); + +crate::jsonrpc_async!(pub struct IndexerRpcAsyncClient { + pub fn get_indexer_tip(&self) -> Option; + pub fn get_cells(&self, search_key: SearchKey, order: Order, limit: Uint32, after: Option) -> Pagination; + pub fn get_transactions(&self, search_key: SearchKey, order: Order, limit: Uint32, after: Option) -> Pagination; + pub fn get_cells_capacity(&self, search_key: SearchKey) -> Option; +}); diff --git a/src/rpc/ckb_light_client.rs b/src/rpc/ckb_light_client.rs index 3d7c6670..3d3369d4 100644 --- a/src/rpc/ckb_light_client.rs +++ b/src/rpc/ckb_light_client.rs @@ -167,3 +167,35 @@ crate::jsonrpc!(pub struct LightClientRpcClient { pub fn get_peers(&self) -> Vec; pub fn local_node_info(&self) -> LocalNode; }); + +crate::jsonrpc_async!(pub struct LightClientRpcAsyncClient { + // BlockFilter + pub fn set_scripts(&self, scripts: Vec, command: Option) -> (); + pub fn get_scripts(&self) -> Vec; + pub fn get_cells(&self, search_key: SearchKey, order: Order, limit: Uint32, after: Option) -> Pagination; + pub fn get_transactions(&self, search_key: SearchKey, order: Order, limit: Uint32, after: Option) -> Pagination; + pub fn get_cells_capacity(&self, search_key: SearchKey) -> CellsCapacity; + + // Transaction + pub fn send_transaction(&self, tx: Transaction) -> H256; + + // Chain + pub fn get_tip_header(&self) -> HeaderView; + pub fn get_genesis_block(&self) -> BlockView; + pub fn get_header(&self, block_hash: H256) -> Option; + pub fn get_transaction(&self, tx_hash: H256) -> Option; + pub fn estimate_cycles(&self, tx: Transaction)-> EstimateCycles; + /// Fetch a header from remote node. If return status is `not_found` will re-sent fetching request immediately. + /// + /// Returns: FetchStatus + pub fn fetch_header(&self, block_hash: H256) -> FetchStatus; + + /// Fetch a transaction from remote node. If return status is `not_found` will re-sent fetching request immediately. + /// + /// Returns: FetchStatus + pub fn fetch_transaction(&self, tx_hash: H256) -> FetchStatus; + + // Net + pub fn get_peers(&self) -> Vec; + pub fn local_node_info(&self) -> LocalNode; +}); diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 1ba4b879..222e0ced 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -8,8 +8,12 @@ pub use ckb_indexer::IndexerRpcClient; use ckb_jsonrpc_types::{JsonBytes, ResponseFormat}; pub use ckb_light_client::LightClientRpcClient; +use std::sync::LazyLock; use thiserror::Error; +static RUNTIME: LazyLock = + LazyLock::new(|| tokio::runtime::Runtime::new().unwrap()); + #[derive(Error, Debug)] pub enum RpcError { #[error("parse json error: `{0}`")] @@ -34,21 +38,22 @@ macro_rules! jsonrpc { ) => ( $(#[$struct_attr])* pub struct $struct_name { - pub client: reqwest::blocking::Client, - pub url: reqwest::Url, - pub id: std::sync::atomic::AtomicU64, + client: crate::rpc::RpcClient, + id: std::sync::atomic::AtomicU64, } impl Clone for $struct_name { fn clone(&self) -> Self { - Self::new(&self.url.to_string()) + Self { + client: self.client.clone(), + id: 0.into() + } } } impl $struct_name { pub fn new(uri: &str) -> Self { - let url = reqwest::Url::parse(uri).expect("ckb uri, e.g. \"http://127.0.0.1:8114\""); - $struct_name { url, id: 0.into(), client: reqwest::blocking::Client::new(), } + $struct_name { id: 0.into(), client: crate::rpc::RpcClient::new(uri), } } pub fn post(&self, method:&str, params: PARAM)->Result @@ -56,25 +61,19 @@ macro_rules! jsonrpc { PARAM:serde::ser::Serialize, RET: serde::de::DeserializeOwned, { - let params = serde_json::to_value(params)?; let id = self.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let params_fn = || -> Result<_,_> { + let params = serde_json::to_value(params)?; + let mut req_json = serde_json::Map::new(); + req_json.insert("id".to_owned(), serde_json::json!(id)); + req_json.insert("jsonrpc".to_owned(), serde_json::json!("2.0")); + req_json.insert("method".to_owned(), serde_json::json!(method)); + req_json.insert("params".to_owned(), params); + Ok(req_json) + }; - let mut req_json = serde_json::Map::new(); - req_json.insert("id".to_owned(), serde_json::json!(id)); - req_json.insert("jsonrpc".to_owned(), serde_json::json!("2.0")); - req_json.insert("method".to_owned(), serde_json::json!(method)); - req_json.insert("params".to_owned(), params); - - let resp = self.client.post(self.url.clone()).json(&req_json).send()?; - let output = resp.json::()?; - match output { - jsonrpc_core::response::Output::Success(success) => { - serde_json::from_value(success.result).map_err(Into::into) - }, - jsonrpc_core::response::Output::Failure(failure) => { - Err(failure.error.into()) - } - } + let task = self.client.post(params_fn); + crate::rpc::RUNTIME.block_on(task) } @@ -85,28 +84,138 @@ macro_rules! jsonrpc { let params = $crate::serialize_parameters!($($arg_name,)*); let id = $selff.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let params_fn = || -> Result<_,_> { + let mut req_json = serde_json::Map::new(); + req_json.insert("id".to_owned(), serde_json::json!(id)); + req_json.insert("jsonrpc".to_owned(), serde_json::json!("2.0")); + req_json.insert("method".to_owned(), serde_json::json!(method)); + req_json.insert("params".to_owned(), params); + Ok(req_json) + }; + + let task = $selff.client.post(params_fn); + crate::rpc::RUNTIME.block_on(task) + } + )* + } + ) +} + +#[macro_export] +macro_rules! jsonrpc_async { + ( + $(#[$struct_attr:meta])* + pub struct $struct_name:ident {$( + $(#[$attr:meta])* + pub fn $method:ident(& $selff:ident $(, $arg_name:ident: $arg_ty:ty)*) + -> $return_ty:ty; + )*} + ) => ( + $(#[$struct_attr])* + pub struct $struct_name { + client: crate::rpc::RpcClient, + id: std::sync::atomic::AtomicU64, + } + + impl Clone for $struct_name { + fn clone(&self) -> Self { + Self { + client: self.client.clone(), + id: 0.into() + } + } + } + + impl $struct_name { + pub fn new(uri: &str) -> Self { + $struct_name { id: 0.into(), client: crate::rpc::RpcClient::new(uri), } + } + + pub fn post(&self, method:&str, params: PARAM)->impl std::future::Future> + where + PARAM:serde::ser::Serialize, + RET: serde::de::DeserializeOwned, + { + let id = self.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let method = serde_json::json!(method); + + let params_fn = move || -> Result<_,_> { + let params = serde_json::to_value(params)?; let mut req_json = serde_json::Map::new(); req_json.insert("id".to_owned(), serde_json::json!(id)); req_json.insert("jsonrpc".to_owned(), serde_json::json!("2.0")); - req_json.insert("method".to_owned(), serde_json::json!(method)); + req_json.insert("method".to_owned(), method); req_json.insert("params".to_owned(), params); + Ok(req_json) + }; + + self.client.post(params_fn) + + } + + $( + $(#[$attr])* + pub fn $method(&$selff $(, $arg_name: $arg_ty)*) -> impl std::future::Future> { + let id = $selff.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + let params_fn = move || -> Result<_,_> { + let method = String::from(stringify!($method)); + let params = $crate::serialize_parameters!($($arg_name,)*); + let mut req_json = serde_json::Map::new(); + req_json.insert("id".to_owned(), serde_json::json!(id)); + req_json.insert("jsonrpc".to_owned(), serde_json::json!("2.0")); + req_json.insert("method".to_owned(), serde_json::json!(method)); + req_json.insert("params".to_owned(), params); + Ok(req_json) + }; - let resp = $selff.client.post($selff.url.clone()).json(&req_json).send()?; - let output = resp.json::()?; - match output { - jsonrpc_core::response::Output::Success(success) => { - serde_json::from_value(success.result).map_err(Into::into) - }, - jsonrpc_core::response::Output::Failure(failure) => { - Err(failure.error.into()) - } - } + $selff.client.post(params_fn) } )* } ) } +#[derive(Debug, Clone)] +pub(crate) struct RpcClient { + client: reqwest::Client, + url: reqwest::Url, +} + +impl RpcClient { + pub fn new(uri: &str) -> Self { + let url = reqwest::Url::parse(uri).expect("ckb uri, e.g. \"http://127.0.0.1:8114\""); + Self { + client: reqwest::Client::new(), + url, + } + } + + pub fn post( + &self, + json_post_params: T, + ) -> impl std::future::Future> + where + PARAM: serde::ser::Serialize, + RET: serde::de::DeserializeOwned, + T: FnOnce() -> Result, + { + let url = self.url.clone(); + let client = self.client.clone(); + + async move { + let resp = client.post(url).json(&json_post_params()?).send().await?; + let output = resp.json::().await?; + match output { + jsonrpc_core::response::Output::Success(success) => { + serde_json::from_value(success.result).map_err(Into::into) + } + jsonrpc_core::response::Output::Failure(failure) => Err(failure.error.into()), + } + } + } +} + #[macro_export] macro_rules! serialize_parameters { () => ( serde_json::Value::Null ); From f484708cfda60f1f574bb7a44122d8f345e4e154 Mon Sep 17 00:00:00 2001 From: driftluo Date: Mon, 23 Dec 2024 15:18:14 +0800 Subject: [PATCH 2/2] feat: support async api --- Cargo.toml | 2 +- deny.toml | 7 +- examples/script_unlocker_example.rs | 19 +-- rust-toolchain | 2 +- src/lib.rs | 2 +- src/rpc/ckb.rs | 205 +++++++++++++++++++++++++++- src/rpc/ckb_indexer.rs | 9 ++ src/rpc/ckb_light_client.rs | 9 ++ src/rpc/mod.rs | 71 +++++++--- src/test_util.rs | 36 +++-- src/tests/ckb_rpc.rs | 15 +- src/tests/cycle.rs | 5 +- src/tests/mod.rs | 8 +- src/traits/default_impls.rs | 112 +++++++++++---- src/traits/dummy_impls.rs | 34 +++-- src/traits/light_client_impls.rs | 65 ++++++--- src/traits/mod.rs | 89 ++++++++++-- src/traits/offchain_impls.rs | 38 ++++-- src/tx_builder/acp.rs | 7 +- src/tx_builder/cheque.rs | 28 ++-- src/tx_builder/dao.rs | 53 ++++--- src/tx_builder/mod.rs | 58 ++++++-- src/tx_builder/omni_lock.rs | 5 +- src/tx_builder/transfer.rs | 3 +- src/tx_builder/udt/mod.rs | 36 +++-- src/unlock/rc_data.rs | 18 +-- src/unlock/unlocker.rs | 192 ++++++++++++++++---------- src/util.rs | 9 +- 28 files changed, 866 insertions(+), 271 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f6acd1f6..65bb4c04 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ parking_lot = "0.12" lru = "0.7.1" dashmap = "5.4" dyn-clone = "1.0" +async-trait = "0.1" ckb-types = "0.119.0" ckb-dao-utils = "0.119.0" @@ -50,7 +51,6 @@ ckb-mock-tx-types = { version = "0.119.0" } ckb-chain-spec = "0.119.0" sparse-merkle-tree = "0.6.1" -lazy_static = "1.3.0" [features] default = ["default-tls"] diff --git a/deny.toml b/deny.toml index df53c8cc..314be59f 100644 --- a/deny.toml +++ b/deny.toml @@ -74,7 +74,8 @@ ignore = [ #{ id = "RUSTSEC-0000-0000", reason = "you can specify a reason the advisory is ignored" }, #"a-crate-that-is-yanked@0.1.1", # you can also ignore yanked crate versions if you wish #{ crate = "a-crate-that-is-yanked@0.1.1", reason = "you can specify why you are ignoring the yanked crate" - "RUSTSEC-2024-0370" # proc-macro-error's maintainer seems to be unreachable, ignore this + "RUSTSEC-2024-0370", # proc-macro-error's maintainer seems to be unreachable, ignore this + "RUSTSEC-2024-0384", # instant is no longer maintained, ignore this ] # If this is true, then cargo deny will use the git executable to fetch advisory database. # If this is false, then it uses a built-in git library. @@ -97,8 +98,8 @@ allow = [ "ISC", "MIT", "Unicode-DFS-2016", - "BSL-1.0", # xxhash-rust 0.8.10 - + "BSL-1.0", # xxhash-rust 0.8.10 + "Unicode-3.0", #"MIT", #"Apache-2.0", #"Apache-2.0 WITH LLVM-exception", diff --git a/examples/script_unlocker_example.rs b/examples/script_unlocker_example.rs index a4c3dca6..28b73122 100644 --- a/examples/script_unlocker_example.rs +++ b/examples/script_unlocker_example.rs @@ -17,13 +17,14 @@ use std::collections::HashMap; /// [CapacityDiff]: https://github.com/doitian/ckb-sdk-examples-capacity-diff struct CapacityDiffUnlocker {} +#[async_trait::async_trait] impl ScriptUnlocker for CapacityDiffUnlocker { // This works for any args fn match_args(&self, _args: &[u8]) -> bool { true } - fn unlock( + async fn unlock_async( &self, tx: &TransactionView, script_group: &ScriptGroup, @@ -45,12 +46,14 @@ impl ScriptUnlocker for CapacityDiffUnlocker { let mut total = 0i64; for i in &script_group.input_indices { - let cell = tx_dep_provider.get_cell( - &tx.inputs() - .get(*i) - .ok_or_else(|| other_unlock_error("input index out of bound"))? - .previous_output(), - )?; + let cell = tx_dep_provider + .get_cell_async( + &tx.inputs() + .get(*i) + .ok_or_else(|| other_unlock_error("input index out of bound"))? + .previous_output(), + ) + .await?; let capacity: u64 = cell.capacity().unpack(); total -= capacity as i64; } @@ -71,7 +74,7 @@ impl ScriptUnlocker for CapacityDiffUnlocker { // This is called before balancer. It's responsible to fill witness for inputs added manually // by users. - fn fill_placeholder_witness( + async fn fill_placeholder_witness_async( &self, tx: &TransactionView, script_group: &ScriptGroup, diff --git a/rust-toolchain b/rust-toolchain index 7c7053aa..dbd41264 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -1.75.0 +1.81.0 diff --git a/src/lib.rs b/src/lib.rs index f4745518..393a13df 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,7 @@ pub mod test_util; #[cfg(test)] mod tests; -pub use rpc::{CkbRpcClient, IndexerRpcClient, RpcError}; +pub use rpc::{CkbRpcAsyncClient, CkbRpcClient, IndexerRpcAsyncClient, IndexerRpcClient, RpcError}; pub use types::{ Address, AddressPayload, AddressType, CodeHashIndex, HumanCapacity, NetworkInfo, NetworkType, OldAddress, OldAddressFormat, ScriptGroup, ScriptGroupType, ScriptId, Since, SinceType, diff --git a/src/rpc/ckb.rs b/src/rpc/ckb.rs index 958e0adc..abbf988f 100644 --- a/src/rpc/ckb.rs +++ b/src/rpc/ckb.rs @@ -110,7 +110,7 @@ crate::jsonrpc!(pub struct CkbRpcClient { pub fn calculate_dao_maximum_withdraw(&self, out_point: OutPoint, kind: DaoWithdrawingCalculationKind) -> Capacity; }); -crate::jsonrpc_async!(pub struct CkbRpcAyncClient { +crate::jsonrpc_async!(pub struct CkbRpcAsyncClient { // Chain pub fn get_block(&self, hash: H256) -> Option; pub fn get_block_by_number(&self, number: BlockNumber) -> Option; @@ -212,6 +212,15 @@ fn transform_cycles(cycles: Option>) -> Vec .unwrap_or_default() } +impl From<&CkbRpcClient> for CkbRpcAsyncClient { + fn from(value: &CkbRpcClient) -> Self { + Self { + client: value.client.clone(), + id: 0.into(), + } + } +} + impl CkbRpcClient { pub fn get_packed_block(&self, hash: H256) -> Result, crate::RpcError> { self.post("get_block", (hash, Some(Uint32::from(0u32)))) @@ -386,3 +395,197 @@ impl CkbRpcClient { self.post::<_, Option>("get_fork_block", (block_hash, Some(Uint32::from(0u32)))) } } + +impl CkbRpcAsyncClient { + pub async fn get_packed_block(&self, hash: H256) -> Result, crate::RpcError> { + self.post("get_block", (hash, Some(Uint32::from(0u32)))) + .await + } + + // turn block response into BlockView and cycle vec + fn transform_block_view_with_cycle( + opt_resp: Option, + ) -> Result)>, crate::rpc::RpcError> { + opt_resp + .map(|resp| match resp { + BlockResponse::Regular(block_view) => Ok((block_view.get_value()?, vec![])), + BlockResponse::WithCycles(block_cycles) => { + let cycles = transform_cycles(block_cycles.cycles); + Ok((block_cycles.block.get_value()?, cycles)) + } + }) + .transpose() + } + /// Same as get_block except with parameter with_cycles and return BlockResponse + pub async fn get_block_with_cycles( + &self, + hash: H256, + ) -> Result)>, crate::rpc::RpcError> { + let res = self + .post::<_, Option>("get_block", (hash, None::, true)) + .await?; + Self::transform_block_view_with_cycle(res) + } + + // turn BlockResponse to JsonBytes and Cycle tuple + fn blockresponse2bytes( + opt_resp: Option, + ) -> Result)>, crate::rpc::RpcError> { + opt_resp + .map(|resp| match resp { + BlockResponse::Regular(block_view) => Ok((block_view.get_json_bytes()?, vec![])), + BlockResponse::WithCycles(block_cycles) => { + let cycles = transform_cycles(block_cycles.cycles); + Ok((block_cycles.block.get_json_bytes()?, cycles)) + } + }) + .transpose() + } + + pub async fn get_packed_block_with_cycles( + &self, + hash: H256, + ) -> Result)>, crate::rpc::RpcError> { + let res = self + .post::<_, Option>("get_block", (hash, Some(Uint32::from(0u32)), true)) + .await?; + Self::blockresponse2bytes(res) + } + + /// Same as get_block_by_number except with parameter with_cycles and return BlockResponse + pub async fn get_packed_block_by_number( + &self, + number: BlockNumber, + ) -> Result, crate::rpc::RpcError> { + self.post("get_block_by_number", (number, Some(Uint32::from(0u32)))) + .await + } + + pub async fn get_block_by_number_with_cycles( + &self, + number: BlockNumber, + ) -> Result)>, crate::rpc::RpcError> { + let res = self + .post::<_, Option>("get_block_by_number", (number, None::, true)) + .await?; + Self::transform_block_view_with_cycle(res) + } + + pub async fn get_packed_block_by_number_with_cycles( + &self, + number: BlockNumber, + ) -> Result)>, crate::rpc::RpcError> { + let res = self + .post::<_, Option>( + "get_block_by_number", + (number, Some(Uint32::from(0u32)), true), + ) + .await?; + Self::blockresponse2bytes(res) + } + + pub async fn get_packed_header( + &self, + hash: H256, + ) -> Result, crate::rpc::RpcError> { + self.post::<_, Option>("get_header", (hash, Some(Uint32::from(0u32)))) + .await + } + + pub async fn get_packed_header_by_number( + &self, + number: BlockNumber, + ) -> Result, crate::rpc::RpcError> { + self.post::<_, Option>( + "get_header_by_number", + (number, Some(Uint32::from(0u32))), + ) + .await + } + + pub async fn get_live_cell_with_include_tx_pool( + &self, + out_point: OutPoint, + with_data: bool, + include_tx_pool: bool, + ) -> Result { + self.post::<_, CellWithStatus>( + "get_live_cell", + (out_point, with_data, Some(include_tx_pool)), + ) + .await + } + + // get transaction with only_committed=true + pub async fn get_only_committed_transaction( + &self, + hash: H256, + ) -> Result { + self.post::<_, TransactionWithStatusResponse>( + "get_transaction", + (hash, Some(Uint32::from(2u32)), true), + ) + .await + } + + // get transaction with verbosity=0 + pub async fn get_packed_transaction( + &self, + hash: H256, + ) -> Result { + self.post::<_, TransactionWithStatusResponse>( + "get_transaction", + (hash, Some(Uint32::from(0u32))), + ) + .await + } + + // get transaction with verbosity=0 and only_committed=true + pub async fn get_only_committed_packed_transaction( + &self, + hash: H256, + ) -> Result { + self.post::<_, TransactionWithStatusResponse>( + "get_transaction", + (hash, Some(Uint32::from(0u32)), true), + ) + .await + } + + // get transaction with verbosity=1, so the result transaction field is None + pub async fn get_transaction_status( + &self, + hash: H256, + ) -> Result { + self.post::<_, TransactionWithStatusResponse>( + "get_transaction", + (hash, Some(Uint32::from(1u32))), + ) + .await + } + + // get transaction with verbosity=1 and only_committed=true, so the result transaction field is None + pub async fn get_only_committed_transaction_status( + &self, + hash: H256, + ) -> Result { + self.post::<_, TransactionWithStatusResponse>( + "get_transaction", + (hash, Some(Uint32::from(1u32)), true), + ) + .await + } + + pub async fn get_packed_tip_header(&self) -> Result { + self.post::<_, JsonBytes>("get_tip_header", (Some(Uint32::from(0u32)),)) + .await + } + + pub async fn get_packed_fork_block( + &self, + block_hash: H256, + ) -> Result, crate::rpc::RpcError> { + self.post::<_, Option>("get_fork_block", (block_hash, Some(Uint32::from(0u32)))) + .await + } +} diff --git a/src/rpc/ckb_indexer.rs b/src/rpc/ckb_indexer.rs index f80eec32..79caa5fc 100644 --- a/src/rpc/ckb_indexer.rs +++ b/src/rpc/ckb_indexer.rs @@ -202,3 +202,12 @@ crate::jsonrpc_async!(pub struct IndexerRpcAsyncClient { pub fn get_transactions(&self, search_key: SearchKey, order: Order, limit: Uint32, after: Option) -> Pagination; pub fn get_cells_capacity(&self, search_key: SearchKey) -> Option; }); + +impl From<&IndexerRpcClient> for IndexerRpcAsyncClient { + fn from(value: &IndexerRpcClient) -> Self { + Self { + client: value.client.clone(), + id: 0.into(), + } + } +} diff --git a/src/rpc/ckb_light_client.rs b/src/rpc/ckb_light_client.rs index 3d3369d4..4460a2c6 100644 --- a/src/rpc/ckb_light_client.rs +++ b/src/rpc/ckb_light_client.rs @@ -199,3 +199,12 @@ crate::jsonrpc_async!(pub struct LightClientRpcAsyncClient { pub fn get_peers(&self) -> Vec; pub fn local_node_info(&self) -> LocalNode; }); + +impl From<&LightClientRpcClient> for LightClientRpcAsyncClient { + fn from(value: &LightClientRpcClient) -> Self { + Self { + client: value.client.clone(), + id: 0.into(), + } + } +} diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 222e0ced..8c12e4c3 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -3,16 +3,45 @@ pub mod ckb_indexer; pub mod ckb_light_client; use anyhow::anyhow; -pub use ckb::CkbRpcClient; -pub use ckb_indexer::IndexerRpcClient; +pub use ckb::{CkbRpcAsyncClient, CkbRpcClient}; +pub use ckb_indexer::{IndexerRpcAsyncClient, IndexerRpcClient}; use ckb_jsonrpc_types::{JsonBytes, ResponseFormat}; -pub use ckb_light_client::LightClientRpcClient; +pub use ckb_light_client::{LightClientRpcAsyncClient, LightClientRpcClient}; -use std::sync::LazyLock; +use std::future::Future; use thiserror::Error; -static RUNTIME: LazyLock = - LazyLock::new(|| tokio::runtime::Runtime::new().unwrap()); +pub(crate) fn block_on(future: impl Future + Send) -> F { + match tokio::runtime::Handle::try_current() { + Ok(h) + if matches!( + h.runtime_flavor(), + tokio::runtime::RuntimeFlavor::MultiThread + ) => + { + tokio::task::block_in_place(|| h.block_on(future)) + } + // if we on the current runtime, it must use another thread to poll this future, + // can't block on current runtime, it will block current reactor to stop forever + // in tokio runtime, this time will panic + Ok(_) => std::thread::scope(|s| { + s.spawn(|| { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(future) + }) + .join() + .unwrap() + }), + Err(_) => tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(future), + } +} #[derive(Error, Debug)] pub enum RpcError { @@ -38,8 +67,8 @@ macro_rules! jsonrpc { ) => ( $(#[$struct_attr])* pub struct $struct_name { - client: crate::rpc::RpcClient, - id: std::sync::atomic::AtomicU64, + pub(crate) client: $crate::rpc::RpcClient, + pub(crate) id: std::sync::atomic::AtomicU64, } impl Clone for $struct_name { @@ -53,13 +82,13 @@ macro_rules! jsonrpc { impl $struct_name { pub fn new(uri: &str) -> Self { - $struct_name { id: 0.into(), client: crate::rpc::RpcClient::new(uri), } + $struct_name { id: 0.into(), client: $crate::rpc::RpcClient::new(uri), } } pub fn post(&self, method:&str, params: PARAM)->Result where - PARAM:serde::ser::Serialize, - RET: serde::de::DeserializeOwned, + PARAM:serde::ser::Serialize + Send + 'static, + RET: serde::de::DeserializeOwned + Send + 'static, { let id = self.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let params_fn = || -> Result<_,_> { @@ -73,7 +102,7 @@ macro_rules! jsonrpc { }; let task = self.client.post(params_fn); - crate::rpc::RUNTIME.block_on(task) + $crate::rpc::block_on(task) } @@ -94,7 +123,7 @@ macro_rules! jsonrpc { }; let task = $selff.client.post(params_fn); - crate::rpc::RUNTIME.block_on(task) + $crate::rpc::block_on(task) } )* } @@ -113,8 +142,8 @@ macro_rules! jsonrpc_async { ) => ( $(#[$struct_attr])* pub struct $struct_name { - client: crate::rpc::RpcClient, - id: std::sync::atomic::AtomicU64, + pub(crate) client: $crate::rpc::RpcClient, + pub(crate) id: std::sync::atomic::AtomicU64, } impl Clone for $struct_name { @@ -128,13 +157,13 @@ macro_rules! jsonrpc_async { impl $struct_name { pub fn new(uri: &str) -> Self { - $struct_name { id: 0.into(), client: crate::rpc::RpcClient::new(uri), } + $struct_name { id: 0.into(), client: $crate::rpc::RpcClient::new(uri), } } - pub fn post(&self, method:&str, params: PARAM)->impl std::future::Future> + pub fn post(&self, method:&str, params: PARAM)->impl std::future::Future> + Send + 'static where - PARAM:serde::ser::Serialize, - RET: serde::de::DeserializeOwned, + PARAM:serde::ser::Serialize + Send + 'static, + RET: serde::de::DeserializeOwned + Send + 'static, { let id = self.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let method = serde_json::json!(method); @@ -196,8 +225,8 @@ impl RpcClient { json_post_params: T, ) -> impl std::future::Future> where - PARAM: serde::ser::Serialize, - RET: serde::de::DeserializeOwned, + PARAM: serde::ser::Serialize + Send + 'static, + RET: serde::de::DeserializeOwned + Send + 'static, T: FnOnce() -> Result, { let url = self.url.clone(); diff --git a/src/test_util.rs b/src/test_util.rs index 77f5f1bc..ec8da58b 100644 --- a/src/test_util.rs +++ b/src/test_util.rs @@ -378,7 +378,7 @@ impl Context { println!("script: {:x}, debug: {}", script_hash, message); }); verifier - .verify(u64::max_value()) + .verify(u64::MAX) .map_err(|err| Error::VerifyScript(format!("Verify script error: {:?}", err))) } @@ -391,9 +391,10 @@ impl Context { } } +#[async_trait::async_trait] impl TransactionDependencyProvider for Context { // For verify certain cell belong to certain transaction - fn get_transaction( + async fn get_transaction_async( &self, tx_hash: &Byte32, ) -> Result { @@ -413,25 +414,34 @@ impl TransactionDependencyProvider for Context { }) } // For get the output information of inputs or cell_deps, those cell should be live cell - fn get_cell(&self, out_point: &OutPoint) -> Result { + async fn get_cell_async( + &self, + out_point: &OutPoint, + ) -> Result { self.get_live_cell(out_point) .map(|(output, _)| output) .ok_or_else(|| TransactionDependencyError::NotFound("cell not found".to_string())) } // For get the output data information of inputs or cell_deps - fn get_cell_data(&self, out_point: &OutPoint) -> Result { + async fn get_cell_data_async( + &self, + out_point: &OutPoint, + ) -> Result { self.get_live_cell(out_point) .map(|(_, data)| data) .ok_or_else(|| TransactionDependencyError::NotFound("cell data not found".to_string())) } // For get the header information of header_deps - fn get_header(&self, _block_hash: &Byte32) -> Result { + async fn get_header_async( + &self, + _block_hash: &Byte32, + ) -> Result { Err(TransactionDependencyError::NotFound( "header not found".to_string(), )) } - fn get_block_extension( + async fn get_block_extension_async( &self, _block_hash: &Byte32, ) -> Result, TransactionDependencyError> { @@ -441,8 +451,12 @@ impl TransactionDependencyProvider for Context { } } +#[async_trait::async_trait] impl HeaderDepResolver for Context { - fn resolve_by_tx(&self, tx_hash: &Byte32) -> Result, anyhow::Error> { + async fn resolve_by_tx_async( + &self, + tx_hash: &Byte32, + ) -> Result, anyhow::Error> { let mut header_opt = None; for item in &self.inputs { if item.input.previous_output().tx_hash() == *tx_hash { @@ -465,7 +479,10 @@ impl HeaderDepResolver for Context { } Ok(None) } - fn resolve_by_number(&self, number: u64) -> Result, anyhow::Error> { + async fn resolve_by_number_async( + &self, + number: u64, + ) -> Result, anyhow::Error> { for mock_header in &self.header_deps { if number == mock_header.number() { return Ok(Some(mock_header.clone())); @@ -503,8 +520,9 @@ impl CellDepResolver for Context { } } +#[async_trait::async_trait] impl CellCollector for LiveCellsContext { - fn collect_live_cells( + async fn collect_live_cells_async( &mut self, query: &CellQueryOptions, apply_changes: bool, diff --git a/src/tests/ckb_rpc.rs b/src/tests/ckb_rpc.rs index f468fd08..73826863 100644 --- a/src/tests/ckb_rpc.rs +++ b/src/tests/ckb_rpc.rs @@ -1,6 +1,6 @@ use crate::rpc::{CkbRpcClient, ResponseFormatGetter}; use ckb_types::{core, h256, prelude::*, H256}; -// use serde_json; +use std::sync::LazyLock; const TEST_CKB_RPC_URL: &str = "https://testnet.ckb.dev"; @@ -11,11 +11,11 @@ const BLOCK_NUMBER: u64 = 7981482; // output '626c6f636b5f686173685f746861745f646f65735f6e6f745f6578697374' const BLOCK_HASH_NOT_EXIST: H256 = h256!("0x626c6f636b5f686173685f746861745f646f65735f6e6f745f65786973740000"); -const BLOCK_NUMBER_NOT_EXIST: u64 = u64::max_value(); +const BLOCK_NUMBER_NOT_EXIST: u64 = u64::MAX; // transaction hash in block 0xd88eb0cf9f6e6f123c733e9aba29dec9cb449965a8adc98216c50d5083b909b1 -lazy_static::lazy_static! { - pub static ref TRANSACTION_HASH_VEC : Vec = - vec! [ + +pub static TRANSACTION_HASH_VEC: LazyLock> = LazyLock::new(|| { + vec![ h256!("0x9ecdbaf1ac656c0e48ab66e7c539b43ad6073c85d17fa590d1d3d9e9525767d2"), h256!("0xb8ba38f579b0aeedc7b9dd5c4c14806079bf7c232f63435e6aa08cca1c100826"), h256!("0xd76f85fb9f87cf3e906846bf32eb34a796b5a3c19dbae9fc3bff0b498974c274"), @@ -27,8 +27,8 @@ lazy_static::lazy_static! { h256!("0xdbcc925afcd73e91c0d91b93943580bbb7a03241d7baef4089d736a1e7b0a4ae"), h256!("0x9f91c8e5c1b6853b5f129eaba6631f5ebb887ef83faae5f5e1801bf2c5515ec0"), h256!("0x56aa6d7ae97c4b2f59790c8856701a75352cd05772155595df07f13682cf5e50"), - ]; -} + ] +}); #[test] fn test_get_block() { @@ -62,7 +62,6 @@ fn test_get_block() { let block_from_types_n = ckb_types::packed::Block::new_unchecked(block.into_bytes()).into_view(); - assert_eq!(block_from_json_n, block_from_types_n); assert_eq!(block_from_json, block_from_json_n); } diff --git a/src/tests/cycle.rs b/src/tests/cycle.rs index 921c3f9f..dc047aab 100644 --- a/src/tests/cycle.rs +++ b/src/tests/cycle.rs @@ -27,12 +27,13 @@ const CYCLE_BIN: &[u8] = include_bytes!("../test-data/cycle"); pub struct CycleUnlocker { loops: u64, } +#[async_trait::async_trait] impl ScriptUnlocker for CycleUnlocker { fn match_args(&self, _args: &[u8]) -> bool { true } - fn unlock( + async fn unlock_async( &self, tx: &TransactionView, script_group: &ScriptGroup, @@ -47,7 +48,7 @@ impl ScriptUnlocker for CycleUnlocker { Ok(tx.as_advanced_builder().set_witnesses(witnesses).build()) } - fn fill_placeholder_witness( + async fn fill_placeholder_witness_async( &self, tx: &TransactionView, _script_group: &ScriptGroup, diff --git a/src/tests/mod.rs b/src/tests/mod.rs index ac3d50b6..84e83f4d 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, u64}; +use std::collections::HashMap; use ckb_dao_utils::pack_dao_data; use ckb_hash::blake2b_256; @@ -382,7 +382,7 @@ fn test_transfer_to_acp() { assert_eq!(tx.header_deps().len(), 0); assert_eq!(tx.cell_deps().len(), 2); assert_eq!(tx.inputs().len(), 3); - let input_cells = vec![ + let input_cells = [ CellOutput::new_builder() .capacity((99 * ONE_CKB).pack()) .lock(receiver.clone()) @@ -496,7 +496,7 @@ fn test_cheque_claim() { assert_eq!(tx.header_deps().len(), 0); assert_eq!(tx.cell_deps().len(), 3); assert_eq!(tx.inputs().len(), 3); - let input_cells = vec![ + let input_cells = [ cheque_output, receiver_output.clone(), CellOutput::new_builder() @@ -599,7 +599,7 @@ fn test_cheque_withdraw() { assert_eq!(tx.header_deps().len(), 0); assert_eq!(tx.cell_deps().len(), 3); assert_eq!(tx.inputs().len(), 2); - let input_cells = vec![ + let input_cells = [ cheque_output.clone(), CellOutput::new_builder() .capacity((100 * ONE_CKB).pack()) diff --git a/src/traits/default_impls.rs b/src/traits/default_impls.rs index dea184fc..a844c5b8 100644 --- a/src/traits/default_impls.rs +++ b/src/traits/default_impls.rs @@ -6,8 +6,8 @@ use std::time::Duration; use anyhow::anyhow; use ckb_crypto::secp::Pubkey; use lru::LruCache; -use parking_lot::Mutex; use thiserror::Error; +use tokio::sync::Mutex; use ckb_hash::blake2b_256; use ckb_jsonrpc_types::{self as json_types, Either}; @@ -24,14 +24,14 @@ use super::{ OffchainTransactionDependencyProvider, }; use crate::rpc::ckb_indexer::{Order, SearchKey, Tip}; -use crate::rpc::{CkbRpcClient, IndexerRpcClient}; +use crate::rpc::{CkbRpcAsyncClient, IndexerRpcClient}; use crate::traits::{ CellCollector, CellCollectorError, CellDepResolver, CellQueryOptions, HeaderDepResolver, LiveCell, QueryOrder, Signer, SignerError, TransactionDependencyError, TransactionDependencyProvider, }; use crate::types::ScriptId; -use crate::util::{get_max_mature_number, serialize_signature, zeroize_privkey}; +use crate::util::{get_max_mature_number_async, serialize_signature, zeroize_privkey}; use crate::SECP256K1; use crate::{ constants::{ @@ -204,35 +204,46 @@ impl CellDepResolver for DefaultCellDepResolver { /// A header_dep resolver use ckb jsonrpc client as backend pub struct DefaultHeaderDepResolver { - ckb_client: CkbRpcClient, + ckb_client: CkbRpcAsyncClient, } impl DefaultHeaderDepResolver { pub fn new(ckb_client: &str) -> DefaultHeaderDepResolver { - let ckb_client = CkbRpcClient::new(ckb_client); + let ckb_client = CkbRpcAsyncClient::new(ckb_client); DefaultHeaderDepResolver { ckb_client } } } + +#[async_trait::async_trait] impl HeaderDepResolver for DefaultHeaderDepResolver { - fn resolve_by_tx(&self, tx_hash: &Byte32) -> Result, anyhow::Error> { + async fn resolve_by_tx_async( + &self, + tx_hash: &Byte32, + ) -> Result, anyhow::Error> { if let Some(block_hash) = self .ckb_client .get_transaction(tx_hash.unpack()) + .await .map_err(|e| anyhow!(e))? .and_then(|tx_with_status| tx_with_status.tx_status.block_hash) { Ok(self .ckb_client .get_header(block_hash) + .await .map_err(Box::new)? .map(Into::into)) } else { Ok(None) } } - fn resolve_by_number(&self, number: u64) -> Result, anyhow::Error> { + async fn resolve_by_number_async( + &self, + number: u64, + ) -> Result, anyhow::Error> { Ok(self .ckb_client .get_header_by_number(number.into()) + .await .map_err(|e| anyhow!(e))? .map(Into::into)) } @@ -242,7 +253,7 @@ impl HeaderDepResolver for DefaultHeaderDepResolver { #[derive(Clone)] pub struct DefaultCellCollector { indexer_client: IndexerRpcClient, - ckb_client: CkbRpcClient, + ckb_client: CkbRpcAsyncClient, offchain: OffchainCellCollector, acceptable_indexer_leftbehind: u64, } @@ -250,7 +261,7 @@ pub struct DefaultCellCollector { impl DefaultCellCollector { pub fn new(ckb_client: &str) -> DefaultCellCollector { let indexer_client = IndexerRpcClient::new(ckb_client); - let ckb_client = CkbRpcClient::new(ckb_client); + let ckb_client = CkbRpcAsyncClient::new(ckb_client); DefaultCellCollector { indexer_client, ckb_client, @@ -268,11 +279,16 @@ impl DefaultCellCollector { self.acceptable_indexer_leftbehind = value; } + /// wrapper check_ckb_chain_async future + pub async fn check_ckb_chain(&mut self) -> Result<(), CellCollectorError> { + crate::rpc::block_on(self.check_ckb_chain_async()) + } /// Check if ckb-indexer synced with ckb node. This will check every 50ms for 100 times (more than 5s in total, since ckb-indexer's poll interval is 2.0s). - pub fn check_ckb_chain(&mut self) -> Result<(), CellCollectorError> { + pub async fn check_ckb_chain_async(&mut self) -> Result<(), CellCollectorError> { let tip_number = self .ckb_client .get_tip_block_number() + .await .map_err(|err| CellCollectorError::Internal(err.into()))?; for _ in 0..100 { @@ -303,19 +319,22 @@ impl DefaultCellCollector { } } +#[async_trait::async_trait] impl CellCollector for DefaultCellCollector { - fn collect_live_cells( + async fn collect_live_cells_async( &mut self, query: &CellQueryOptions, apply_changes: bool, ) -> Result<(Vec, u64), CellCollectorError> { - let max_mature_number = get_max_mature_number(&self.ckb_client) + let max_mature_number = get_max_mature_number_async(&self.ckb_client) + .await .map_err(|err| CellCollectorError::Internal(anyhow!(err)))?; self.offchain.max_mature_number = max_mature_number; let tip_num = self .ckb_client .get_tip_block_number() + .await .map_err(|err| CellCollectorError::Internal(anyhow!(err)))? .value(); let CollectResult { @@ -326,7 +345,7 @@ impl CellCollector for DefaultCellCollector { let mut cells: Vec<_> = cells.into_iter().map(|c| c.0).collect(); if total_capacity < query.min_total_capacity { - self.check_ckb_chain()?; + self.check_ckb_chain().await?; let order = match query.order { QueryOrder::Asc => Order::Asc, QueryOrder::Desc => Order::Desc, @@ -406,7 +425,7 @@ impl CellCollector for DefaultCellCollector { } struct DefaultTxDepProviderInner { - rpc_client: CkbRpcClient, + rpc_client: CkbRpcAsyncClient, tx_cache: LruCache, cell_cache: LruCache, header_cache: LruCache, @@ -431,7 +450,7 @@ impl DefaultTransactionDependencyProvider { /// * `url` is the ckb http jsonrpc server url /// * When `cache_capacity` is 0 for not using cache. pub fn new(url: &str, cache_capacity: usize) -> DefaultTransactionDependencyProvider { - let rpc_client = CkbRpcClient::new(url); + let rpc_client = CkbRpcAsyncClient::new(url); let inner = DefaultTxDepProviderInner { rpc_client, tx_cache: LruCache::new(cache_capacity), @@ -449,7 +468,15 @@ impl DefaultTransactionDependencyProvider { tx: Transaction, tip_block_number: u64, ) -> Result<(), TransactionDependencyError> { - let mut inner = self.inner.lock(); + crate::rpc::block_on(self.apply_tx_async(tx, tip_block_number)) + } + + pub async fn apply_tx_async( + &mut self, + tx: Transaction, + tip_block_number: u64, + ) -> Result<(), TransactionDependencyError> { + let mut inner = self.inner.lock().await; inner.offchain_cache.apply_tx(tx, tip_block_number)?; Ok(()) } @@ -458,7 +485,14 @@ impl DefaultTransactionDependencyProvider { &self, out_point: &OutPoint, ) -> Result<(CellOutput, Bytes), TransactionDependencyError> { - let mut inner = self.inner.lock(); + crate::rpc::block_on(self.get_cell_with_data_async(out_point)) + } + + pub async fn get_cell_with_data_async( + &self, + out_point: &OutPoint, + ) -> Result<(CellOutput, Bytes), TransactionDependencyError> { + let mut inner = self.inner.lock().await; if let Some(pair) = inner.cell_cache.get(out_point) { return Ok(pair.clone()); } @@ -466,6 +500,7 @@ impl DefaultTransactionDependencyProvider { let cell_with_status = inner .rpc_client .get_live_cell(out_point.clone().into(), true) + .await .map_err(|err| TransactionDependencyError::Other(err.into()))?; if cell_with_status.status != "live" { return Err(TransactionDependencyError::Other(anyhow!( @@ -483,22 +518,25 @@ impl DefaultTransactionDependencyProvider { } } +#[async_trait::async_trait] impl TransactionDependencyProvider for DefaultTransactionDependencyProvider { - fn get_transaction( + async fn get_transaction_async( &self, tx_hash: &Byte32, ) -> Result { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().await; if let Some(tx) = inner.tx_cache.get(tx_hash) { return Ok(tx.clone()); } - let ret = inner.offchain_cache.get_transaction(tx_hash); + let ret: Result = + inner.offchain_cache.get_transaction(tx_hash); if ret.is_ok() { return ret; } let tx_with_status = inner .rpc_client .get_transaction(tx_hash.unpack()) + .await .map_err(|err| TransactionDependencyError::Other(err.into()))? .ok_or_else(|| TransactionDependencyError::NotFound("transaction".to_string()))?; if tx_with_status.tx_status.status != json_types::Status::Committed { @@ -516,35 +554,48 @@ impl TransactionDependencyProvider for DefaultTransactionDependencyProvider { inner.tx_cache.put(tx_hash.clone(), tx.clone()); Ok(tx) } - fn get_cell(&self, out_point: &OutPoint) -> Result { + async fn get_cell_async( + &self, + out_point: &OutPoint, + ) -> Result { { - let inner = self.inner.lock(); + let inner = self.inner.lock().await; let ret = inner.offchain_cache.get_cell(out_point); if ret.is_ok() { return ret; } } - self.get_cell_with_data(out_point).map(|(output, _)| output) + self.get_cell_with_data_async(out_point) + .await + .map(|(output, _)| output) } - fn get_cell_data(&self, out_point: &OutPoint) -> Result { + async fn get_cell_data_async( + &self, + out_point: &OutPoint, + ) -> Result { { - let inner = self.inner.lock(); + let inner = self.inner.lock().await; let ret = inner.offchain_cache.get_cell_data(out_point); if ret.is_ok() { return ret; } } - self.get_cell_with_data(out_point) + self.get_cell_with_data_async(out_point) + .await .map(|(_, output_data)| output_data) } - fn get_header(&self, block_hash: &Byte32) -> Result { - let mut inner = self.inner.lock(); + async fn get_header_async( + &self, + block_hash: &Byte32, + ) -> Result { + let mut inner = self.inner.lock().await; if let Some(header) = inner.header_cache.get(block_hash) { return Ok(header.clone()); } let header = inner .rpc_client .get_header(block_hash.unpack()) + .await .map_err(|err| TransactionDependencyError::Other(err.into()))? .map(HeaderView::from) .ok_or_else(|| TransactionDependencyError::NotFound("header".to_string()))?; @@ -552,15 +603,16 @@ impl TransactionDependencyProvider for DefaultTransactionDependencyProvider { Ok(header) } - fn get_block_extension( + async fn get_block_extension_async( &self, block_hash: &Byte32, ) -> Result, TransactionDependencyError> { - let inner = self.inner.lock(); + let inner = self.inner.lock().await; let block = inner .rpc_client .get_block(block_hash.unpack()) + .await .map_err(|err| TransactionDependencyError::Other(err.into()))?; match block { Some(block) => Ok(block.extension.map(ckb_types::packed::Bytes::from)), diff --git a/src/traits/dummy_impls.rs b/src/traits/dummy_impls.rs index df999003..5dd6d01e 100644 --- a/src/traits/dummy_impls.rs +++ b/src/traits/dummy_impls.rs @@ -14,8 +14,9 @@ use anyhow::anyhow; #[derive(Clone, Default)] pub struct DummyCellCollector; +#[async_trait::async_trait] impl CellCollector for DummyCellCollector { - fn collect_live_cells( + async fn collect_live_cells_async( &mut self, _query: &CellQueryOptions, _apply_changes: bool, @@ -47,11 +48,18 @@ impl CellCollector for DummyCellCollector { #[derive(Default)] pub struct DummyHeaderDepResolver; +#[async_trait::async_trait] impl HeaderDepResolver for DummyHeaderDepResolver { - fn resolve_by_tx(&self, _tx_hash: &Byte32) -> Result, anyhow::Error> { + async fn resolve_by_tx_async( + &self, + _tx_hash: &Byte32, + ) -> Result, anyhow::Error> { Err(anyhow!("dummy resolve_by_tx")) } - fn resolve_by_number(&self, _number: u64) -> Result, anyhow::Error> { + async fn resolve_by_number_async( + &self, + _number: u64, + ) -> Result, anyhow::Error> { Err(anyhow!("dummy resolve_by_number")) } } @@ -60,9 +68,10 @@ impl HeaderDepResolver for DummyHeaderDepResolver { #[derive(Default)] pub struct DummyTransactionDependencyProvider; +#[async_trait::async_trait] impl TransactionDependencyProvider for DummyTransactionDependencyProvider { // For verify certain cell belong to certain transaction - fn get_transaction( + async fn get_transaction_async( &self, _tx_hash: &Byte32, ) -> Result { @@ -71,22 +80,31 @@ impl TransactionDependencyProvider for DummyTransactionDependencyProvider { ))) } // For get the output information of inputs or cell_deps, those cell should be live cell - fn get_cell(&self, _out_point: &OutPoint) -> Result { + async fn get_cell_async( + &self, + _out_point: &OutPoint, + ) -> Result { Err(TransactionDependencyError::Other(anyhow!("dummy get_cell"))) } // For get the output data information of inputs or cell_deps - fn get_cell_data(&self, _out_point: &OutPoint) -> Result { + async fn get_cell_data_async( + &self, + _out_point: &OutPoint, + ) -> Result { Err(TransactionDependencyError::Other(anyhow!( "dummy get_cell_data" ))) } // For get the header information of header_deps - fn get_header(&self, _block_hash: &Byte32) -> Result { + async fn get_header_async( + &self, + _block_hash: &Byte32, + ) -> Result { Err(TransactionDependencyError::Other(anyhow!( "dummy get_header" ))) } - fn get_block_extension( + async fn get_block_extension_async( &self, _block_hash: &Byte32, ) -> Result, TransactionDependencyError> { diff --git a/src/traits/light_client_impls.rs b/src/traits/light_client_impls.rs index 95481d0f..d96e1329 100644 --- a/src/traits/light_client_impls.rs +++ b/src/traits/light_client_impls.rs @@ -14,7 +14,7 @@ use ckb_types::{ use super::{offchain_impls::CollectResult, OffchainCellCollector}; use crate::rpc::{ ckb_light_client::{FetchStatus, Order, SearchKey}, - LightClientRpcClient, + LightClientRpcAsyncClient, }; use crate::traits::{ CellCollector, CellCollectorError, CellQueryOptions, HeaderDepResolver, LiveCell, QueryOrder, @@ -22,14 +22,14 @@ use crate::traits::{ }; pub struct LightClientHeaderDepResolver { - client: LightClientRpcClient, + client: LightClientRpcAsyncClient, // tx_hash => HeaderView headers: DashMap>, } impl LightClientHeaderDepResolver { pub fn new(url: &str) -> LightClientHeaderDepResolver { - let client = LightClientRpcClient::new(url); + let client = LightClientRpcAsyncClient::new(url); LightClientHeaderDepResolver { client, headers: DashMap::new(), @@ -42,15 +42,19 @@ impl LightClientHeaderDepResolver { } } +#[async_trait::async_trait] impl HeaderDepResolver for LightClientHeaderDepResolver { - fn resolve_by_tx(&self, tx_hash: &Byte32) -> Result, anyhow::Error> { + async fn resolve_by_tx_async( + &self, + tx_hash: &Byte32, + ) -> Result, anyhow::Error> { if let Some(Some(header)) = self.headers.get(tx_hash).as_ref().map(|pair| pair.value()) { return Ok(Some(header.clone())); } - match self.client.fetch_transaction(tx_hash.unpack())? { + match self.client.fetch_transaction(tx_hash.unpack()).await? { FetchStatus::Fetched { data } => { if let Some(block_hash) = data.tx_status.block_hash { - match self.client.fetch_header(block_hash)? { + match self.client.fetch_header(block_hash).await? { FetchStatus::Fetched { data } => { let header: HeaderView = data.into(); self.headers.insert(tx_hash.clone(), Some(header.clone())); @@ -73,7 +77,10 @@ impl HeaderDepResolver for LightClientHeaderDepResolver { } } - fn resolve_by_number(&self, number: u64) -> Result, anyhow::Error> { + async fn resolve_by_number_async( + &self, + number: u64, + ) -> Result, anyhow::Error> { for pair in self.headers.iter() { if let Some(header) = pair.value() { if header.number() == number { @@ -82,13 +89,13 @@ impl HeaderDepResolver for LightClientHeaderDepResolver { } } Err(anyhow!( - "unable to resolver header by number directly when use light client as backend, you can call resolve_by_tx(tx_hash) to load the header first." - )) + "unable to resolver header by number directly when use light client as backend, you can call resolve_by_tx(tx_hash) to load the header first." + )) } } pub struct LightClientTransactionDependencyProvider { - client: LightClientRpcClient, + client: LightClientRpcAsyncClient, // headers to load headers: DashMap>, // transactions to load @@ -98,7 +105,7 @@ pub struct LightClientTransactionDependencyProvider { impl LightClientTransactionDependencyProvider { pub fn new(url: &str) -> LightClientTransactionDependencyProvider { LightClientTransactionDependencyProvider { - client: LightClientRpcClient::new(url), + client: LightClientRpcAsyncClient::new(url), headers: DashMap::new(), txs: DashMap::new(), } @@ -112,8 +119,9 @@ impl LightClientTransactionDependencyProvider { } } +#[async_trait::async_trait] impl TransactionDependencyProvider for LightClientTransactionDependencyProvider { - fn get_transaction( + async fn get_transaction_async( &self, tx_hash: &Byte32, ) -> Result { @@ -123,6 +131,7 @@ impl TransactionDependencyProvider for LightClientTransactionDependencyProvider match self .client .fetch_transaction(tx_hash.unpack()) + .await .map_err(|err| TransactionDependencyError::Other(anyhow!(err)))? { FetchStatus::Fetched { data } => { @@ -130,6 +139,7 @@ impl TransactionDependencyProvider for LightClientTransactionDependencyProvider match self .client .fetch_header(block_hash) + .await .map_err(|err| TransactionDependencyError::Other(anyhow!(err)))? { FetchStatus::Fetched { data: header_view } => { @@ -174,15 +184,21 @@ impl TransactionDependencyProvider for LightClientTransactionDependencyProvider } } - fn get_cell(&self, out_point: &OutPoint) -> Result { - let tx = self.get_transaction(&out_point.tx_hash())?; + async fn get_cell_async( + &self, + out_point: &OutPoint, + ) -> Result { + let tx = self.get_transaction_async(&out_point.tx_hash()).await?; let output_index: u32 = out_point.index().unpack(); tx.outputs().get(output_index as usize).ok_or_else(|| { TransactionDependencyError::NotFound(format!("invalid output index: {}", output_index)) }) } - fn get_cell_data(&self, out_point: &OutPoint) -> Result { - let tx = self.get_transaction(&out_point.tx_hash())?; + async fn get_cell_data_async( + &self, + out_point: &OutPoint, + ) -> Result { + let tx = self.get_transaction_async(&out_point.tx_hash()).await?; let output_index: u32 = out_point.index().unpack(); tx.outputs_data() .get(output_index as usize) @@ -194,7 +210,10 @@ impl TransactionDependencyProvider for LightClientTransactionDependencyProvider )) }) } - fn get_header(&self, block_hash: &Byte32) -> Result { + async fn get_header_async( + &self, + block_hash: &Byte32, + ) -> Result { if let Some(Some(header)) = self .headers .get(block_hash) @@ -206,6 +225,7 @@ impl TransactionDependencyProvider for LightClientTransactionDependencyProvider match self .client .fetch_header(block_hash.unpack()) + .await .map_err(|err| TransactionDependencyError::Other(anyhow!(err)))? { FetchStatus::Fetched { data } => { @@ -224,7 +244,7 @@ impl TransactionDependencyProvider for LightClientTransactionDependencyProvider } } - fn get_block_extension( + async fn get_block_extension_async( &self, _block_hash: &Byte32, ) -> Result, TransactionDependencyError> { @@ -236,13 +256,13 @@ impl TransactionDependencyProvider for LightClientTransactionDependencyProvider #[derive(Clone)] pub struct LightClientCellCollector { - light_client: LightClientRpcClient, + light_client: LightClientRpcAsyncClient, offchain: OffchainCellCollector, } impl LightClientCellCollector { pub fn new(url: &str) -> LightClientCellCollector { - let light_client = LightClientRpcClient::new(url); + let light_client = LightClientRpcAsyncClient::new(url); LightClientCellCollector { light_client, offchain: OffchainCellCollector::default(), @@ -250,8 +270,9 @@ impl LightClientCellCollector { } } +#[async_trait::async_trait] impl CellCollector for LightClientCellCollector { - fn collect_live_cells( + async fn collect_live_cells_async( &mut self, query: &CellQueryOptions, apply_changes: bool, @@ -261,6 +282,7 @@ impl CellCollector for LightClientCellCollector { let tip_num = self .light_client .get_tip_header() + .await .map_err(|err| CellCollectorError::Internal(anyhow!(err)))? .inner .number @@ -290,6 +312,7 @@ impl CellCollector for LightClientCellCollector { let page = self .light_client .get_cells(search_key.clone(), order.clone(), limit.into(), last_cursor) + .await .map_err(|err| CellCollectorError::Internal(err.into()))?; if page.objects.is_empty() { break; diff --git a/src/traits/mod.rs b/src/traits/mod.rs index 82d68282..0258f510 100644 --- a/src/traits/mod.rs +++ b/src/traits/mod.rs @@ -59,7 +59,7 @@ pub enum SignerError { /// * secp256k1 eth signer /// * RSA signer /// * Hardware wallet signer -pub trait Signer { +pub trait Signer: Send + Sync { /// typecial id are blake160(pubkey) and keccak256(pubkey)[12..20] fn match_id(&self, id: &[u8]) -> bool; @@ -90,24 +90,60 @@ pub enum TransactionDependencyError { /// * inputs /// * cell_deps /// * header_deps +#[async_trait::async_trait] pub trait TransactionDependencyProvider: Sync + Send { + async fn get_transaction_async( + &self, + tx_hash: &Byte32, + ) -> Result; + /// For get the output information of inputs or cell_deps, those cell should be live cell + async fn get_cell_async( + &self, + out_point: &OutPoint, + ) -> Result; + /// For get the output data information of inputs or cell_deps + async fn get_cell_data_async( + &self, + out_point: &OutPoint, + ) -> Result; + /// For get the header information of header_deps + async fn get_header_async( + &self, + block_hash: &Byte32, + ) -> Result; + + /// For get_block_extension + async fn get_block_extension_async( + &self, + block_hash: &Byte32, + ) -> Result, TransactionDependencyError>; /// For verify certain cell belong to certain transaction fn get_transaction( &self, tx_hash: &Byte32, - ) -> Result; + ) -> Result { + crate::rpc::block_on(self.get_transaction_async(tx_hash)) + } /// For get the output information of inputs or cell_deps, those cell should be live cell - fn get_cell(&self, out_point: &OutPoint) -> Result; + fn get_cell(&self, out_point: &OutPoint) -> Result { + crate::rpc::block_on(self.get_cell_async(out_point)) + } /// For get the output data information of inputs or cell_deps - fn get_cell_data(&self, out_point: &OutPoint) -> Result; + fn get_cell_data(&self, out_point: &OutPoint) -> Result { + crate::rpc::block_on(self.get_cell_data_async(out_point)) + } /// For get the header information of header_deps - fn get_header(&self, block_hash: &Byte32) -> Result; + fn get_header(&self, block_hash: &Byte32) -> Result { + crate::rpc::block_on(self.get_header_async(block_hash)) + } /// For get_block_extension fn get_block_extension( &self, block_hash: &Byte32, - ) -> Result, TransactionDependencyError>; + ) -> Result, TransactionDependencyError> { + crate::rpc::block_on(self.get_block_extension_async(block_hash)) + } } // Implement CellDataProvider trait is currently for `DaoCalculator` @@ -384,14 +420,25 @@ impl CellQueryOptions { } } } -pub trait CellCollector: DynClone { + +#[async_trait::async_trait] +pub trait CellCollector: DynClone + Send + Sync { /// Collect live cells by query options, if `apply_changes` is true will /// mark all collected cells as dead cells. - fn collect_live_cells( + async fn collect_live_cells_async( &mut self, query: &CellQueryOptions, apply_changes: bool, ) -> Result<(Vec, u64), CellCollectorError>; + /// Collect live cells by query options, if `apply_changes` is true will + /// mark all collected cells as dead cells. + fn collect_live_cells( + &mut self, + query: &CellQueryOptions, + apply_changes: bool, + ) -> Result<(Vec, u64), CellCollectorError> { + crate::rpc::block_on(self.collect_live_cells_async(query, apply_changes)) + } /// Mark this cell as dead cell fn lock_cell( @@ -410,18 +457,36 @@ pub trait CellCollector: DynClone { fn reset(&mut self); } -pub trait CellDepResolver { +pub trait CellDepResolver: Send + Sync { /// Resolve cell dep by script. /// /// When a new script is added, transaction builders use CellDepResolver to find the corresponding cell deps and add them to the transaction. fn resolve(&self, script: &Script) -> Option; } -pub trait HeaderDepResolver { + +#[async_trait::async_trait] +pub trait HeaderDepResolver: Send + Sync { + /// Resolve header dep by trancation hash + async fn resolve_by_tx_async( + &self, + tx_hash: &Byte32, + ) -> Result, anyhow::Error>; + + /// Resolve header dep by block number + async fn resolve_by_number_async( + &self, + number: u64, + ) -> Result, anyhow::Error>; + /// Resolve header dep by trancation hash - fn resolve_by_tx(&self, tx_hash: &Byte32) -> Result, anyhow::Error>; + fn resolve_by_tx(&self, tx_hash: &Byte32) -> Result, anyhow::Error> { + crate::rpc::block_on(self.resolve_by_tx_async(tx_hash)) + } /// Resolve header dep by block number - fn resolve_by_number(&self, number: u64) -> Result, anyhow::Error>; + fn resolve_by_number(&self, number: u64) -> Result, anyhow::Error> { + crate::rpc::block_on(self.resolve_by_number_async(number)) + } } // test cases make sure new added exception won't breadk `anyhow!(e_variable)` usage, diff --git a/src/traits/offchain_impls.rs b/src/traits/offchain_impls.rs index 4c8c6ceb..8fc551aa 100644 --- a/src/traits/offchain_impls.rs +++ b/src/traits/offchain_impls.rs @@ -37,13 +37,23 @@ pub struct OffchainHeaderDepResolver { pub by_number: HashMap, } +#[async_trait::async_trait] impl HeaderDepResolver for OffchainHeaderDepResolver { - fn resolve_by_tx(&self, tx_hash: &Byte32) -> Result, anyhow::Error> { + async fn resolve_by_tx_async( + &self, + tx_hash: &Byte32, + ) -> Result, anyhow::Error> { let tx_hash: H256 = tx_hash.unpack(); - Ok(self.by_tx_hash.get(&tx_hash).cloned()) + let header = self.by_tx_hash.get(&tx_hash).cloned(); + Ok(header) } - fn resolve_by_number(&self, number: u64) -> Result, anyhow::Error> { - Ok(self.by_number.get(&number).cloned()) + async fn resolve_by_number_async( + &self, + number: u64, + ) -> Result, anyhow::Error> { + let header = self.by_number.get(&number).cloned(); + + Ok(header) } } @@ -218,9 +228,10 @@ impl OffchainTransactionDependencyProvider { } } +#[async_trait::async_trait] impl TransactionDependencyProvider for OffchainTransactionDependencyProvider { // For verify certain cell belong to certain transaction - fn get_transaction( + async fn get_transaction_async( &self, tx_hash: &Byte32, ) -> Result { @@ -231,7 +242,10 @@ impl TransactionDependencyProvider for OffchainTransactionDependencyProvider { .ok_or_else(|| TransactionDependencyError::Other(anyhow!("offchain get_transaction"))) } // For get the output information of inputs or cell_deps, those cell should be live cell - fn get_cell(&self, out_point: &OutPoint) -> Result { + async fn get_cell_async( + &self, + out_point: &OutPoint, + ) -> Result { let tx_hash: H256 = out_point.tx_hash().unpack(); let index: u32 = out_point.index().unpack(); self.cells @@ -240,7 +254,10 @@ impl TransactionDependencyProvider for OffchainTransactionDependencyProvider { .ok_or_else(|| TransactionDependencyError::Other(anyhow!("offchain get_cell"))) } // For get the output data information of inputs or cell_deps - fn get_cell_data(&self, out_point: &OutPoint) -> Result { + async fn get_cell_data_async( + &self, + out_point: &OutPoint, + ) -> Result { let tx_hash: H256 = out_point.tx_hash().unpack(); let index: u32 = out_point.index().unpack(); self.cells @@ -249,13 +266,16 @@ impl TransactionDependencyProvider for OffchainTransactionDependencyProvider { .ok_or_else(|| TransactionDependencyError::Other(anyhow!("offchain get_cell_data"))) } // For get the header information of header_deps - fn get_header(&self, _block_hash: &Byte32) -> Result { + async fn get_header_async( + &self, + _block_hash: &Byte32, + ) -> Result { Err(TransactionDependencyError::Other(anyhow!( "get_header not supported" ))) } - fn get_block_extension( + async fn get_block_extension_async( &self, _block_hash: &Byte32, ) -> Result, TransactionDependencyError> { diff --git a/src/tx_builder/acp.rs b/src/tx_builder/acp.rs index 9370418a..29f414cd 100644 --- a/src/tx_builder/acp.rs +++ b/src/tx_builder/acp.rs @@ -37,8 +37,9 @@ impl AcpTransferBuilder { } } +#[async_trait::async_trait] impl TxBuilder for AcpTransferBuilder { - fn build_base( + async fn build_base_async( &self, cell_collector: &mut dyn CellCollector, cell_dep_resolver: &dyn CellDepResolver, @@ -52,7 +53,9 @@ impl TxBuilder for AcpTransferBuilder { let mut outputs_data = Vec::new(); for receiver in &self.receivers { let query = CellQueryOptions::new_lock(receiver.lock_script.clone()); - let (cells, input_capacity) = cell_collector.collect_live_cells(&query, true)?; + let (cells, input_capacity) = cell_collector + .collect_live_cells_async(&query, true) + .await?; if cells.is_empty() { return Err(TxBuilderError::Other(anyhow!( "can not found cell by lock script: {:?}", diff --git a/src/tx_builder/cheque.rs b/src/tx_builder/cheque.rs index bfd62797..745a57a7 100644 --- a/src/tx_builder/cheque.rs +++ b/src/tx_builder/cheque.rs @@ -43,8 +43,9 @@ impl ChequeClaimBuilder { } } +#[async_trait::async_trait] impl TxBuilder for ChequeClaimBuilder { - fn build_base( + async fn build_base_async( &self, _cell_collector: &mut dyn CellCollector, cell_dep_resolver: &dyn CellDepResolver, @@ -62,10 +63,12 @@ impl TxBuilder for ChequeClaimBuilder { let mut inputs = self.inputs.clone(); inputs.push(self.receiver_input.clone()); - let receiver_input_cell = - tx_dep_provider.get_cell(&self.receiver_input.previous_output())?; - let receiver_input_data = - tx_dep_provider.get_cell_data(&self.receiver_input.previous_output())?; + let receiver_input_cell = tx_dep_provider + .get_cell_async(&self.receiver_input.previous_output()) + .await?; + let receiver_input_data = tx_dep_provider + .get_cell_data_async(&self.receiver_input.previous_output()) + .await?; let receiver_type_script = receiver_input_cell.type_().to_opt().ok_or_else(|| { TxBuilderError::InvalidParameter(anyhow!("receiver input missing type script")) })?; @@ -97,8 +100,8 @@ impl TxBuilder for ChequeClaimBuilder { let mut last_lock_script = None; for input in &self.inputs { let out_point = input.previous_output(); - let input_cell = tx_dep_provider.get_cell(&out_point)?; - let input_data = tx_dep_provider.get_cell_data(&out_point)?; + let input_cell = tx_dep_provider.get_cell_async(&out_point).await?; + let input_data = tx_dep_provider.get_cell_data_async(&out_point).await?; let type_script = receiver_input_cell.type_().to_opt().ok_or_else(|| { TxBuilderError::InvalidParameter(anyhow!( "cheque input missing type script: {}", @@ -207,8 +210,9 @@ impl ChequeWithdrawBuilder { } } +#[async_trait::async_trait] impl TxBuilder for ChequeWithdrawBuilder { - fn build_base( + async fn build_base_async( &self, cell_collector: &mut dyn CellCollector, cell_dep_resolver: &dyn CellDepResolver, @@ -227,8 +231,8 @@ impl TxBuilder for ChequeWithdrawBuilder { let mut cheque_total_amount: u128 = 0; let mut cheque_total_capacity: u64 = 0; for out_point in &self.out_points { - let input_cell = tx_dep_provider.get_cell(out_point)?; - let input_data = tx_dep_provider.get_cell_data(out_point)?; + let input_cell = tx_dep_provider.get_cell_async(out_point).await?; + let input_data = tx_dep_provider.get_cell_data_async(out_point).await?; let lock_script = input_cell.lock(); let type_script = input_cell.type_().to_opt().ok_or_else(|| { TxBuilderError::InvalidParameter(anyhow!( @@ -309,7 +313,9 @@ impl TxBuilder for ChequeWithdrawBuilder { let mut query = CellQueryOptions::new_lock(acp_lock.clone()); query.secondary_script = Some(type_script.clone()); query.data_len_range = Some(ValueRangeOption::new_min(16)); - let (acp_cells, _) = cell_collector.collect_live_cells(&query, true)?; + let (acp_cells, _) = cell_collector + .collect_live_cells_async(&query, true) + .await?; if acp_cells.is_empty() { return Err(TxBuilderError::Other(anyhow!( "can not find acp cell by lock script: {:?}", diff --git a/src/tx_builder/dao.rs b/src/tx_builder/dao.rs index 37c514c6..a5e8ce91 100644 --- a/src/tx_builder/dao.rs +++ b/src/tx_builder/dao.rs @@ -44,8 +44,9 @@ impl DaoDepositBuilder { } } +#[async_trait::async_trait] impl TxBuilder for DaoDepositBuilder { - fn build_base( + async fn build_base_async( &self, _cell_collector: &mut dyn CellCollector, cell_dep_resolver: &dyn CellDepResolver, @@ -120,8 +121,9 @@ impl From> for DaoPrepareBuilder { } } +#[async_trait::async_trait] impl TxBuilder for DaoPrepareBuilder { - fn build_base( + async fn build_base_async( &self, _cell_collector: &mut dyn CellCollector, cell_dep_resolver: &dyn CellDepResolver, @@ -153,10 +155,11 @@ impl TxBuilder for DaoPrepareBuilder { let out_point = input.previous_output(); let tx_hash = out_point.tx_hash(); let deposit_header = header_dep_resolver - .resolve_by_tx(&tx_hash) + .resolve_by_tx_async(&tx_hash) + .await .map_err(TxBuilderError::Other)? .ok_or_else(|| TxBuilderError::ResolveHeaderDepByTxHashFailed(tx_hash.clone()))?; - let input_cell = tx_dep_provider.get_cell(&out_point)?; + let input_cell = tx_dep_provider.get_cell_async(&out_point).await?; if input_cell.type_().to_opt().as_ref() != Some(&dao_type_script) { return Err(TxBuilderError::InvalidParameter(anyhow!( "the input cell has invalid type script" @@ -198,7 +201,7 @@ pub enum DaoWithdrawReceiver { LockScript { script: Script, /// * `fee_rate`: If fee_rate is given, the fee is from withdraw capacity so - /// that no additional input and change cell is needed. + /// that no additional input and change cell is needed. fee_rate: Option, }, Custom { @@ -239,8 +242,9 @@ impl DaoWithdrawBuilder { } } +#[async_trait::async_trait] impl TxBuilder for DaoWithdrawBuilder { - fn build_base( + async fn build_base_async( &self, _cell_collector: &mut dyn CellCollector, cell_dep_resolver: &dyn CellDepResolver, @@ -276,11 +280,12 @@ impl TxBuilder for DaoWithdrawBuilder { { let tx_hash = out_point.tx_hash(); let prepare_header = header_dep_resolver - .resolve_by_tx(&tx_hash) + .resolve_by_tx_async(&tx_hash) + .await .map_err(TxBuilderError::Other)? .ok_or_else(|| TxBuilderError::ResolveHeaderDepByTxHashFailed(tx_hash.clone()))?; prepare_block_hashes.push(prepare_header.hash()); - let input_cell = tx_dep_provider.get_cell(out_point)?; + let input_cell = tx_dep_provider.get_cell_async(out_point).await?; if input_cell.type_().to_opt().as_ref() != Some(&dao_type_script) { return Err(TxBuilderError::InvalidParameter(anyhow!( "the input cell has invalid type script" @@ -289,7 +294,7 @@ impl TxBuilder for DaoWithdrawBuilder { let input_lock_cell_dep = cell_dep_resolver .resolve(&input_cell.lock()) .ok_or_else(|| TxBuilderError::ResolveCellDepFailed(input_cell.lock()))?; - let data = tx_dep_provider.get_cell_data(out_point)?; + let data = tx_dep_provider.get_cell_data_async(out_point).await?; if data.len() != 8 { return Err(TxBuilderError::InvalidParameter(anyhow!( "the input cell has invalid data length, expected: 8, got: {}", @@ -301,21 +306,29 @@ impl TxBuilder for DaoWithdrawBuilder { number_bytes.copy_from_slice(data.as_ref()); u64::from_le_bytes(number_bytes) }; - let deposit_header = header_dep_resolver - .resolve_by_number(deposit_number) - .or_else(|_err| { + let deposit_header = match header_dep_resolver + .resolve_by_number_async(deposit_number) + .await + { + Err(_) => { // for light client - let prepare_tx = tx_dep_provider.get_transaction(&tx_hash)?; + let prepare_tx = tx_dep_provider.get_transaction_async(&tx_hash).await?; for input in prepare_tx.inputs() { let _ = header_dep_resolver - .resolve_by_tx(&input.previous_output().tx_hash())?; + .resolve_by_tx_async(&input.previous_output().tx_hash()) + .await + .map_err(TxBuilderError::Other)?; } - header_dep_resolver.resolve_by_number(deposit_number) - }) - .map_err(TxBuilderError::Other)? - .ok_or(TxBuilderError::ResolveHeaderDepByNumberFailed( - deposit_number, - ))?; + header_dep_resolver + .resolve_by_number_async(deposit_number) + .await + .map_err(TxBuilderError::Other)? + } + Ok(i) => i, + } + .ok_or(TxBuilderError::ResolveHeaderDepByNumberFailed( + deposit_number, + ))?; let input = { let unlock_point = minimal_unlock_point(&deposit_header, &prepare_header); let since = Since::new( diff --git a/src/tx_builder/mod.rs b/src/tx_builder/mod.rs index d6482f85..e1f0da42 100644 --- a/src/tx_builder/mod.rs +++ b/src/tx_builder/mod.rs @@ -81,9 +81,10 @@ pub enum TxBuilderError { } /// Transaction Builder interface -pub trait TxBuilder { +#[async_trait::async_trait] +pub trait TxBuilder: Send + Sync { /// Build base transaction - fn build_base( + async fn build_base_async( &self, cell_collector: &mut dyn CellCollector, cell_dep_resolver: &dyn CellDepResolver, @@ -91,11 +92,27 @@ pub trait TxBuilder { tx_dep_provider: &dyn TransactionDependencyProvider, ) -> Result; + /// Build base transaction + fn build_base( + &self, + cell_collector: &mut dyn CellCollector, + cell_dep_resolver: &dyn CellDepResolver, + header_dep_resolver: &dyn HeaderDepResolver, + tx_dep_provider: &dyn TransactionDependencyProvider, + ) -> Result { + crate::rpc::block_on(self.build_base_async( + cell_collector, + cell_dep_resolver, + header_dep_resolver, + tx_dep_provider, + )) + } + /// Build balanced transaction that ready to sign: /// * Build base transaction /// * Fill placeholder witness for lock script /// * balance the capacity - fn build_balanced( + async fn build_balanced_async( &self, cell_collector: &mut dyn CellCollector, cell_dep_resolver: &dyn CellDepResolver, @@ -104,12 +121,14 @@ pub trait TxBuilder { balancer: &CapacityBalancer, unlockers: &HashMap>, ) -> Result { - let base_tx = self.build_base( - cell_collector, - cell_dep_resolver, - header_dep_resolver, - tx_dep_provider, - )?; + let base_tx = self + .build_base_async( + cell_collector, + cell_dep_resolver, + header_dep_resolver, + tx_dep_provider, + ) + .await?; let (tx_filled_witnesses, _) = fill_placeholder_witnesses(base_tx, tx_dep_provider, unlockers)?; Ok(balance_tx_capacity( @@ -122,6 +141,25 @@ pub trait TxBuilder { )?) } + fn build_balanced( + &self, + cell_collector: &mut dyn CellCollector, + cell_dep_resolver: &dyn CellDepResolver, + header_dep_resolver: &dyn HeaderDepResolver, + tx_dep_provider: &dyn TransactionDependencyProvider, + balancer: &CapacityBalancer, + unlockers: &HashMap>, + ) -> Result { + crate::rpc::block_on(self.build_balanced_async( + cell_collector, + cell_dep_resolver, + header_dep_resolver, + tx_dep_provider, + balancer, + unlockers, + )) + } + /// Build unlocked transaction that ready to send or for further unlock: /// * build base transaction /// * balance the capacity @@ -642,7 +680,7 @@ impl< verifier.set_debug_printer(|script_hash, message| { println!("script: {:x}, debug: {}", script_hash, message); }); - verifier.verify(u64::max_value()).map_err(|err| { + verifier.verify(u64::MAX).map_err(|err| { BalanceTxCapacityError::VerifyScript(format!("Verify script error : {:?}", err)) }) } diff --git a/src/tx_builder/omni_lock.rs b/src/tx_builder/omni_lock.rs index 2662f030..68ef04fc 100644 --- a/src/tx_builder/omni_lock.rs +++ b/src/tx_builder/omni_lock.rs @@ -35,8 +35,9 @@ impl OmniLockTransferBuilder { } } +#[async_trait::async_trait] impl TxBuilder for OmniLockTransferBuilder { - fn build_base( + async fn build_base_async( &self, _cell_collector: &mut dyn CellCollector, cell_dep_resolver: &dyn CellDepResolver, @@ -70,7 +71,7 @@ impl TxBuilder for OmniLockTransferBuilder { .previous_output(cell.clone()) .build(); inputs.insert(input); - let cell_output = tx_dep_provider.get_cell(cell)?; + let cell_output = tx_dep_provider.get_cell_async(cell).await?; // extract lock dep let lock = cell_output.lock(); if let Some(cell_dep) = cell_dep_resolver.resolve(&lock) { diff --git a/src/tx_builder/transfer.rs b/src/tx_builder/transfer.rs index 7f960e6b..649ace72 100644 --- a/src/tx_builder/transfer.rs +++ b/src/tx_builder/transfer.rs @@ -25,8 +25,9 @@ impl CapacityTransferBuilder { } } +#[async_trait::async_trait] impl TxBuilder for CapacityTransferBuilder { - fn build_base( + async fn build_base_async( &self, _cell_collector: &mut dyn CellCollector, cell_dep_resolver: &dyn CellDepResolver, diff --git a/src/tx_builder/udt/mod.rs b/src/tx_builder/udt/mod.rs index 4c600f1e..60479455 100644 --- a/src/tx_builder/udt/mod.rs +++ b/src/tx_builder/udt/mod.rs @@ -84,6 +84,15 @@ impl UdtTargetReceiver { type_script: &Script, cell_collector: &mut dyn CellCollector, cell_dep_resolver: &dyn CellDepResolver, + ) -> Result { + crate::rpc::block_on(self.build_async(type_script, cell_collector, cell_dep_resolver)) + } + + pub async fn build_async( + &self, + type_script: &Script, + cell_collector: &mut dyn CellCollector, + cell_dep_resolver: &dyn CellDepResolver, ) -> Result { match self.action { TransferAction::Create => { @@ -137,8 +146,9 @@ impl UdtTargetReceiver { query.data_len_range = Some(ValueRangeOption::new_min(16)); query }; - let (receiver_cells, _) = - cell_collector.collect_live_cells(&receiver_query, true)?; + let (receiver_cells, _) = cell_collector + .collect_live_cells_async(&receiver_query, true) + .await?; if receiver_cells.is_empty() { return Err(TxBuilderError::Other(anyhow!( "update receiver cell failed, cell not found, lock={:?}", @@ -191,8 +201,9 @@ pub struct UdtIssueBuilder { pub receivers: Vec, } +#[async_trait::async_trait] impl TxBuilder for UdtIssueBuilder { - fn build_base( + async fn build_base_async( &self, cell_collector: &mut dyn CellCollector, cell_dep_resolver: &dyn CellDepResolver, @@ -207,7 +218,9 @@ impl TxBuilder for UdtIssueBuilder { query }; - let (owner_cells, _) = cell_collector.collect_live_cells(&owner_query, true)?; + let (owner_cells, _) = cell_collector + .collect_live_cells_async(&owner_query, true) + .await?; if owner_cells.is_empty() { return Err(TxBuilderError::Other(anyhow!("owner cell not found"))); } @@ -238,7 +251,9 @@ impl TxBuilder for UdtIssueBuilder { input, output, output_data, - } = receiver.build(&type_script, cell_collector, cell_dep_resolver)?; + } = receiver + .build_async(&type_script, cell_collector, cell_dep_resolver) + .await?; if let Some((input, input_lock_cell_dep)) = input { inputs.push(input); cell_deps.insert(input_lock_cell_dep); @@ -267,8 +282,9 @@ pub struct UdtTransferBuilder { pub receivers: Vec, } +#[async_trait::async_trait] impl TxBuilder for UdtTransferBuilder { - fn build_base( + async fn build_base_async( &self, cell_collector: &mut dyn CellCollector, cell_dep_resolver: &dyn CellDepResolver, @@ -281,7 +297,9 @@ impl TxBuilder for UdtTransferBuilder { query.data_len_range = Some(ValueRangeOption::new_min(16)); query }; - let (sender_cells, _) = cell_collector.collect_live_cells(&sender_query, true)?; + let (sender_cells, _) = cell_collector + .collect_live_cells_async(&sender_query, true) + .await?; if sender_cells.is_empty() { return Err(TxBuilderError::Other(anyhow!("sender cell not found"))); } @@ -326,7 +344,9 @@ impl TxBuilder for UdtTransferBuilder { input, output, output_data, - } = receiver.build(&self.type_script, cell_collector, cell_dep_resolver)?; + } = receiver + .build_async(&self.type_script, cell_collector, cell_dep_resolver) + .await?; if let Some((input, input_lock_cell_dep)) = input { inputs.push(input); cell_deps.insert(input_lock_cell_dep); diff --git a/src/unlock/rc_data.rs b/src/unlock/rc_data.rs index 09d6c84d..f572dbaa 100644 --- a/src/unlock/rc_data.rs +++ b/src/unlock/rc_data.rs @@ -1,5 +1,3 @@ -use lazy_static::lazy_static; - use sparse_merkle_tree::{default_store::DefaultStore, SparseMerkleTree, H256 as SmtH256}; use crate::types::xudt_rce_mol::{ @@ -12,16 +10,20 @@ use ckb_types::{molecule, prelude::*}; use sparse_merkle_tree::traits::Hasher; use thiserror::Error; -lazy_static! { - pub static ref SMT_EXISTING: SmtH256 = SmtH256::from([ +use std::sync::LazyLock; + +pub static SMT_EXISTING: LazyLock = LazyLock::new(|| { + SmtH256::from([ 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - ]); - pub static ref SMT_NOT_EXISTING: SmtH256 = SmtH256::from([ + ]) +}); +pub static SMT_NOT_EXISTING: LazyLock = LazyLock::new(|| { + SmtH256::from([ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - ]); -} + ]) +}); #[allow(clippy::upper_case_acronyms)] type SMT = SparseMerkleTree>; diff --git a/src/unlock/unlocker.rs b/src/unlock/unlocker.rs index 9ff74160..6caec562 100644 --- a/src/unlock/unlocker.rs +++ b/src/unlock/unlocker.rs @@ -48,11 +48,12 @@ pub enum UnlockError { /// * Put extra unlock information into transaction (e.g. SMT proof in omni-lock case) /// /// See example in `examples/script_unlocker_example.rs` -pub trait ScriptUnlocker { +#[async_trait::async_trait] +pub trait ScriptUnlocker: Sync + Send { fn match_args(&self, args: &[u8]) -> bool; /// Check if the script group is already unlocked - fn is_unlocked( + async fn is_unlocked_async( &self, _tx: &TransactionView, _script_group: &ScriptGroup, @@ -61,15 +62,33 @@ pub trait ScriptUnlocker { Ok(false) } + fn is_unlocked( + &self, + tx: &TransactionView, + script_group: &ScriptGroup, + tx_dep_provider: &dyn TransactionDependencyProvider, + ) -> Result { + crate::rpc::block_on(self.is_unlocked_async(tx, script_group, tx_dep_provider)) + } + /// Add signature or other information to witnesses, when the script is /// already unlocked should reset the witness instead. - fn unlock( + async fn unlock_async( &self, tx: &TransactionView, script_group: &ScriptGroup, tx_dep_provider: &dyn TransactionDependencyProvider, ) -> Result; + fn unlock( + &self, + tx: &TransactionView, + script_group: &ScriptGroup, + tx_dep_provider: &dyn TransactionDependencyProvider, + ) -> Result { + crate::rpc::block_on(self.unlock_async(tx, script_group, tx_dep_provider)) + } + fn clear_placeholder_witness( &self, tx: &TransactionView, @@ -79,12 +98,21 @@ pub trait ScriptUnlocker { } /// Fill a placehodler witness before balance the transaction capacity - fn fill_placeholder_witness( + async fn fill_placeholder_witness_async( &self, tx: &TransactionView, script_group: &ScriptGroup, tx_dep_provider: &dyn TransactionDependencyProvider, ) -> Result; + + fn fill_placeholder_witness( + &self, + tx: &TransactionView, + script_group: &ScriptGroup, + tx_dep_provider: &dyn TransactionDependencyProvider, + ) -> Result { + crate::rpc::block_on(self.fill_placeholder_witness_async(tx, script_group, tx_dep_provider)) + } } pub fn fill_witness_lock( @@ -151,12 +179,13 @@ impl From> for SecpSighashUnlocker { SecpSighashUnlocker::new(SecpSighashScriptSigner::new(signer)) } } +#[async_trait::async_trait] impl ScriptUnlocker for SecpSighashUnlocker { fn match_args(&self, args: &[u8]) -> bool { self.signer.match_args(args) } - fn unlock( + async fn unlock_async( &self, tx: &TransactionView, script_group: &ScriptGroup, @@ -165,7 +194,7 @@ impl ScriptUnlocker for SecpSighashUnlocker { Ok(self.signer.sign_tx(tx, script_group)?) } - fn fill_placeholder_witness( + async fn fill_placeholder_witness_async( &self, tx: &TransactionView, script_group: &ScriptGroup, @@ -188,12 +217,13 @@ impl From<(Box, MultisigConfig)> for SecpMultisigUnlocker { SecpMultisigUnlocker::new(SecpMultisigScriptSigner::new(signer, config)) } } +#[async_trait::async_trait] impl ScriptUnlocker for SecpMultisigUnlocker { fn match_args(&self, args: &[u8]) -> bool { (args.len() == 20 || args.len() == 28) && self.signer.match_args(args) } - fn unlock( + async fn unlock_async( &self, tx: &TransactionView, script_group: &ScriptGroup, @@ -202,7 +232,7 @@ impl ScriptUnlocker for SecpMultisigUnlocker { Ok(self.signer.sign_tx(tx, script_group)?) } - fn fill_placeholder_witness( + async fn fill_placeholder_witness_async( &self, tx: &TransactionView, script_group: &ScriptGroup, @@ -231,7 +261,7 @@ impl From> for AcpUnlocker { } } -fn acp_is_unlocked( +async fn acp_is_unlocked( tx: &TransactionView, script_group: &ScriptGroup, tx_dep_provider: &dyn TransactionDependencyProvider, @@ -288,42 +318,44 @@ fn acp_is_unlocked( udt_amount: u128, output_cnt: usize, } - let mut input_wallets = script_group - .input_indices - .iter() - .map(|idx| { - let input = tx - .inputs() - .get(*idx) - .ok_or_else(|| anyhow!("input index in script group is out of bound: {}", idx))?; - let output = tx_dep_provider.get_cell(&input.previous_output())?; - let output_data = tx_dep_provider.get_cell_data(&input.previous_output())?; - - let type_hash_opt = output - .type_() - .to_opt() - .map(|script| script.calc_script_hash()); - if type_hash_opt.is_some() && output_data.len() < 16 { - return Err(UnlockError::Other(anyhow!( - "invalid udt output data in input cell: {:?}", - input - ))); - } - let udt_amount = if type_hash_opt.is_some() { - let mut amount_bytes = [0u8; 16]; - amount_bytes.copy_from_slice(&output_data[0..16]); - u128::from_le_bytes(amount_bytes) - } else { - 0 - }; - Ok(InputWallet { - type_hash_opt, - ckb_amount: output.capacity().unpack(), - udt_amount, - output_cnt: 0, - }) + let mut input_wallets = Vec::new(); + + for idx in script_group.input_indices.iter() { + let input = tx + .inputs() + .get(*idx) + .ok_or_else(|| anyhow!("input index in script group is out of bound: {}", idx))?; + let output = tx_dep_provider + .get_cell_async(&input.previous_output()) + .await?; + let output_data = tx_dep_provider + .get_cell_data_async(&input.previous_output()) + .await?; + + let type_hash_opt = output + .type_() + .to_opt() + .map(|script| script.calc_script_hash()); + if type_hash_opt.is_some() && output_data.len() < 16 { + return Err(UnlockError::Other(anyhow!( + "invalid udt output data in input cell: {:?}", + input + ))); + } + let udt_amount = if type_hash_opt.is_some() { + let mut amount_bytes = [0u8; 16]; + amount_bytes.copy_from_slice(&output_data[0..16]); + u128::from_le_bytes(amount_bytes) + } else { + 0 + }; + input_wallets.push(InputWallet { + type_hash_opt, + ckb_amount: output.capacity().unpack(), + udt_amount, + output_cnt: 0, }) - .collect::, UnlockError>>()?; + } for (output_idx, output) in tx.outputs().into_iter().enumerate() { if output.lock() != script_group.script { @@ -402,12 +434,13 @@ fn acp_is_unlocked( Ok(true) } +#[async_trait::async_trait] impl ScriptUnlocker for AcpUnlocker { fn match_args(&self, args: &[u8]) -> bool { self.signer.match_args(args) } - fn is_unlocked( + async fn is_unlocked_async( &self, tx: &TransactionView, script_group: &ScriptGroup, @@ -422,16 +455,19 @@ impl ScriptUnlocker for AcpUnlocker { &[] } }; - acp_is_unlocked(tx, script_group, tx_dep_provider, acp_args) + acp_is_unlocked(tx, script_group, tx_dep_provider, acp_args).await } - fn unlock( + async fn unlock_async( &self, tx: &TransactionView, script_group: &ScriptGroup, tx_dep_provider: &dyn TransactionDependencyProvider, ) -> Result { - if self.is_unlocked(tx, script_group, tx_dep_provider)? { + if self + .is_unlocked_async(tx, script_group, tx_dep_provider) + .await? + { self.clear_placeholder_witness(tx, script_group) } else { Ok(self.signer.sign_tx(tx, script_group)?) @@ -447,13 +483,16 @@ impl ScriptUnlocker for AcpUnlocker { .map_err(UnlockError::InvalidWitnessArgs) } - fn fill_placeholder_witness( + async fn fill_placeholder_witness_async( &self, tx: &TransactionView, script_group: &ScriptGroup, tx_dep_provider: &dyn TransactionDependencyProvider, ) -> Result { - if self.is_unlocked(tx, script_group, tx_dep_provider)? { + if self + .is_unlocked_async(tx, script_group, tx_dep_provider) + .await? + { Ok(tx.clone()) } else { fill_witness_lock(tx, script_group, Bytes::from(vec![0u8; 65])) @@ -475,12 +514,13 @@ impl From<(Box, ChequeAction)> for ChequeUnlocker { } } +#[async_trait::async_trait] impl ScriptUnlocker for ChequeUnlocker { fn match_args(&self, args: &[u8]) -> bool { self.signer.match_args(args) } - fn is_unlocked( + async fn is_unlocked_async( &self, tx: &TransactionView, script_group: &ScriptGroup, @@ -506,7 +546,9 @@ impl ScriptUnlocker for ChequeUnlocker { let mut receiver_lock_witness = None; let mut sender_lock_witness = None; for (input_idx, input) in inputs.into_iter().enumerate() { - let output = tx_dep_provider.get_cell(&input.previous_output())?; + let output = tx_dep_provider + .get_cell_async(&input.previous_output()) + .await?; let lock_hash = output.lock().calc_script_hash(); let lock_hash_prefix = &lock_hash.as_slice()[0..20]; let witness = tx @@ -571,13 +613,16 @@ impl ScriptUnlocker for ChequeUnlocker { Ok(false) } - fn unlock( + async fn unlock_async( &self, tx: &TransactionView, script_group: &ScriptGroup, tx_dep_provider: &dyn TransactionDependencyProvider, ) -> Result { - if self.is_unlocked(tx, script_group, tx_dep_provider)? { + if self + .is_unlocked_async(tx, script_group, tx_dep_provider) + .await? + { self.clear_placeholder_witness(tx, script_group) } else { Ok(self.signer.sign_tx(tx, script_group)?) @@ -593,13 +638,16 @@ impl ScriptUnlocker for ChequeUnlocker { .map_err(UnlockError::InvalidWitnessArgs) } - fn fill_placeholder_witness( + async fn fill_placeholder_witness_async( &self, tx: &TransactionView, script_group: &ScriptGroup, tx_dep_provider: &dyn TransactionDependencyProvider, ) -> Result { - if self.is_unlocked(tx, script_group, tx_dep_provider)? { + if self + .is_unlocked_async(tx, script_group, tx_dep_provider) + .await? + { Ok(tx.clone()) } else { fill_witness_lock(tx, script_group, Bytes::from(vec![0u8; 65])) @@ -624,13 +672,14 @@ impl From<(Box, OmniLockConfig, OmniUnlockMode)> for OmniLockUnlocke OmniLockUnlocker::new(OmniLockScriptSigner::new(signer, config, unlock_mode), cfg) } } +#[async_trait::async_trait] impl ScriptUnlocker for OmniLockUnlocker { fn match_args(&self, args: &[u8]) -> bool { self.signer.match_args(args) } /// Check if the script group is already unlocked - fn is_unlocked( + async fn is_unlocked_async( &self, tx: &TransactionView, script_group: &ScriptGroup, @@ -650,7 +699,7 @@ impl ScriptUnlocker for OmniLockUnlocker { &[] } }; - let acp_unlocked = acp_is_unlocked(tx, script_group, tx_dep_provider, acp_args)?; + let acp_unlocked = acp_is_unlocked(tx, script_group, tx_dep_provider, acp_args).await?; if acp_unlocked { return Ok(true); } @@ -683,20 +732,25 @@ impl ScriptUnlocker for OmniLockUnlocker { inputs.len() ))); } - let matched = tx + let mut matched = false; + for (_idx, input) in tx .inputs() .into_iter() .enumerate() .filter(|(idx, _input)| !script_group.input_indices.contains(idx)) - .any(|(_idx, input)| { - if let Ok(output) = tx_dep_provider.get_cell(&input.previous_output()) { - let lock_hash = output.calc_lock_hash(); - let h = &lock_hash.as_slice()[0..20]; - h == auth_content.as_bytes() - } else { - false + { + if let Ok(output) = tx_dep_provider + .get_cell_async(&input.previous_output()) + .await + { + let lock_hash = output.calc_lock_hash(); + let h = &lock_hash.as_slice()[0..20]; + if h == auth_content.as_bytes() { + matched = true; + break; } - }); + } + } if !matched { return Err(UnlockError::Other(anyhow!( "can not find according owner lock input" @@ -705,7 +759,7 @@ impl ScriptUnlocker for OmniLockUnlocker { Ok(matched) } - fn unlock( + async fn unlock_async( &self, tx: &TransactionView, script_group: &ScriptGroup, @@ -714,7 +768,7 @@ impl ScriptUnlocker for OmniLockUnlocker { Ok(self.signer.sign_tx(tx, script_group)?) } - fn fill_placeholder_witness( + async fn fill_placeholder_witness_async( &self, tx: &TransactionView, script_group: &ScriptGroup, diff --git a/src/util.rs b/src/util.rs index 9148f4ab..cc754182 100644 --- a/src/util.rs +++ b/src/util.rs @@ -9,7 +9,7 @@ use ckb_types::{ }; use sha3::{Digest, Keccak256}; -use crate::rpc::CkbRpcClient; +use crate::rpc::{CkbRpcAsyncClient, CkbRpcClient}; use crate::traits::LiveCell; use secp256k1::ffi::CPtr; @@ -30,15 +30,21 @@ pub fn zeroize_slice(data: &mut [u8]) { } pub fn get_max_mature_number(rpc_client: &CkbRpcClient) -> Result { + crate::rpc::block_on(get_max_mature_number_async(&rpc_client.into())) +} + +pub async fn get_max_mature_number_async(rpc_client: &CkbRpcAsyncClient) -> Result { let cellbase_maturity = EpochNumberWithFraction::from_full_value( rpc_client .get_consensus() + .await .map_err(|err| err.to_string())? .cellbase_maturity .value(), ); let tip_epoch = rpc_client .get_tip_header() + .await .map(|header| EpochNumberWithFraction::from_full_value(header.inner.epoch.value())) .map_err(|err| err.to_string())?; @@ -61,6 +67,7 @@ pub fn get_max_mature_number(rpc_client: &CkbRpcClient) -> Result { .into(); let max_mature_epoch = rpc_client .get_epoch_by_number(epoch_number) + .await .map_err(|err| err.to_string())? .ok_or_else(|| "Can not get epoch less than current epoch number".to_string())?;