Skip to content

Commit

Permalink
feat: support async api
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Dec 23, 2024
1 parent 8e6217d commit 8cd654b
Show file tree
Hide file tree
Showing 28 changed files with 669 additions and 271 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ log = "0.4.6"
reqwest = { version = "0.12", default-features = false, features = ["json"] }
secp256k1 = { version = "0.29.0", features = ["recovery"] }
tokio-util = { version = "0.7.7", features = ["codec"] }
tokio = { version = "1" }
tokio = { version = "1", features = ["full"] }
bytes = "1"
futures = "0.3"
jsonrpc-core = "18"
parking_lot = "0.12"
lru = "0.7.1"
dashmap = "5.4"
dyn-clone = "1.0"
async-trait = "0.1"

ckb-types = "0.119.0"
ckb-dao-utils = "0.119.0"
Expand All @@ -50,7 +51,6 @@ ckb-mock-tx-types = { version = "0.119.0" }
ckb-chain-spec = "0.119.0"

sparse-merkle-tree = "0.6.1"
lazy_static = "1.3.0"

[features]
default = ["default-tls"]
Expand Down
7 changes: 4 additions & 3 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ ignore = [
#{ id = "RUSTSEC-0000-0000", reason = "you can specify a reason the advisory is ignored" },
#"[email protected]", # you can also ignore yanked crate versions if you wish
#{ crate = "[email protected]", reason = "you can specify why you are ignoring the yanked crate"
"RUSTSEC-2024-0370" # proc-macro-error's maintainer seems to be unreachable, ignore this
"RUSTSEC-2024-0370", # proc-macro-error's maintainer seems to be unreachable, ignore this
"RUSTSEC-2024-0384", # instant is no longer maintained, ignore this
]
# If this is true, then cargo deny will use the git executable to fetch advisory database.
# If this is false, then it uses a built-in git library.
Expand All @@ -97,8 +98,8 @@ allow = [
"ISC",
"MIT",
"Unicode-DFS-2016",
"BSL-1.0", # xxhash-rust 0.8.10

"BSL-1.0", # xxhash-rust 0.8.10
"Unicode-3.0",
#"MIT",
#"Apache-2.0",
#"Apache-2.0 WITH LLVM-exception",
Expand Down
19 changes: 11 additions & 8 deletions examples/script_unlocker_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ use std::collections::HashMap;
/// [CapacityDiff]: https://github.com/doitian/ckb-sdk-examples-capacity-diff
struct CapacityDiffUnlocker {}

#[async_trait::async_trait]
impl ScriptUnlocker for CapacityDiffUnlocker {
// This works for any args
fn match_args(&self, _args: &[u8]) -> bool {
true
}

fn unlock(
async fn unlock_async(
&self,
tx: &TransactionView,
script_group: &ScriptGroup,
Expand All @@ -45,12 +46,14 @@ impl ScriptUnlocker for CapacityDiffUnlocker {

let mut total = 0i64;
for i in &script_group.input_indices {
let cell = tx_dep_provider.get_cell(
&tx.inputs()
.get(*i)
.ok_or_else(|| other_unlock_error("input index out of bound"))?
.previous_output(),
)?;
let cell = tx_dep_provider
.get_cell_async(
&tx.inputs()
.get(*i)
.ok_or_else(|| other_unlock_error("input index out of bound"))?
.previous_output(),
)
.await?;
let capacity: u64 = cell.capacity().unpack();
total -= capacity as i64;
}
Expand All @@ -71,7 +74,7 @@ impl ScriptUnlocker for CapacityDiffUnlocker {

// This is called before balancer. It's responsible to fill witness for inputs added manually
// by users.
fn fill_placeholder_witness(
async fn fill_placeholder_witness_async(
&self,
tx: &TransactionView,
script_group: &ScriptGroup,
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.75.0
1.81.0
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub mod test_util;
#[cfg(test)]
mod tests;

pub use rpc::{CkbRpcClient, IndexerRpcClient, RpcError};
pub use rpc::{CkbRpcAsyncClient, CkbRpcClient, IndexerRpcAsyncClient, IndexerRpcClient, RpcError};
pub use types::{
Address, AddressPayload, AddressType, CodeHashIndex, HumanCapacity, NetworkInfo, NetworkType,
OldAddress, OldAddressFormat, ScriptGroup, ScriptGroupType, ScriptId, Since, SinceType,
Expand Down
11 changes: 10 additions & 1 deletion src/rpc/ckb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ crate::jsonrpc!(pub struct CkbRpcClient {
pub fn calculate_dao_maximum_withdraw(&self, out_point: OutPoint, kind: DaoWithdrawingCalculationKind) -> Capacity;
});

crate::jsonrpc_async!(pub struct CkbRpcAyncClient {
crate::jsonrpc_async!(pub struct CkbRpcAsyncClient {
// Chain
pub fn get_block(&self, hash: H256) -> Option<BlockView>;
pub fn get_block_by_number(&self, number: BlockNumber) -> Option<BlockView>;
Expand Down Expand Up @@ -212,6 +212,15 @@ fn transform_cycles(cycles: Option<Vec<ckb_jsonrpc_types::Cycle>>) -> Vec<Cycle>
.unwrap_or_default()
}

impl From<&CkbRpcClient> for CkbRpcAsyncClient {
fn from(value: &CkbRpcClient) -> Self {
Self {
client: value.client.clone(),
id: 0.into(),
}
}
}

impl CkbRpcClient {
pub fn get_packed_block(&self, hash: H256) -> Result<Option<JsonBytes>, crate::RpcError> {
self.post("get_block", (hash, Some(Uint32::from(0u32))))
Expand Down
9 changes: 9 additions & 0 deletions src/rpc/ckb_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,12 @@ crate::jsonrpc_async!(pub struct IndexerRpcAsyncClient {
pub fn get_transactions(&self, search_key: SearchKey, order: Order, limit: Uint32, after: Option<JsonBytes>) -> Pagination<Tx>;
pub fn get_cells_capacity(&self, search_key: SearchKey) -> Option<CellsCapacity>;
});

impl From<&IndexerRpcClient> for IndexerRpcAsyncClient {
fn from(value: &IndexerRpcClient) -> Self {
Self {
client: value.client.clone(),
id: 0.into(),
}
}
}
9 changes: 9 additions & 0 deletions src/rpc/ckb_light_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,3 +199,12 @@ crate::jsonrpc_async!(pub struct LightClientRpcAsyncClient {
pub fn get_peers(&self) -> Vec<RemoteNode>;
pub fn local_node_info(&self) -> LocalNode;
});

impl From<&LightClientRpcClient> for LightClientRpcAsyncClient {
fn from(value: &LightClientRpcClient) -> Self {
Self {
client: value.client.clone(),
id: 0.into(),
}
}
}
67 changes: 46 additions & 21 deletions src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,41 @@ pub mod ckb_indexer;
pub mod ckb_light_client;

use anyhow::anyhow;
pub use ckb::CkbRpcClient;
pub use ckb_indexer::IndexerRpcClient;
pub use ckb::{CkbRpcAsyncClient, CkbRpcClient};
pub use ckb_indexer::{IndexerRpcAsyncClient, IndexerRpcClient};
use ckb_jsonrpc_types::{JsonBytes, ResponseFormat};
pub use ckb_light_client::LightClientRpcClient;
pub use ckb_light_client::{LightClientRpcAsyncClient, LightClientRpcClient};

use std::sync::LazyLock;
use std::{cell::LazyCell, future::Future};
use thiserror::Error;

static RUNTIME: LazyLock<tokio::runtime::Runtime> =
LazyLock::new(|| tokio::runtime::Runtime::new().unwrap());
thread_local! {
pub static RUNTIME: LazyCell<tokio::runtime::Runtime> =
LazyCell::new(|| tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap());
}

pub(crate) fn block_on<F: Send>(future: impl Future<Output = F> + Send) -> F {
let timeout = async { tokio::time::timeout(std::time::Duration::from_secs(10), future).await };
match tokio::runtime::Handle::try_current() {
Ok(h)
if matches!(
h.runtime_flavor(),
tokio::runtime::RuntimeFlavor::MultiThread
) =>
{
tokio::task::block_in_place(|| h.block_on(timeout).unwrap())
}
// if we on the current runtime, it must use another thread to poll this future,
// can't block on current runtime, it will block current reactor to stop forever
// in tokio runtime, this time will panic
Ok(_) => std::thread::scope(|s| {
s.spawn(|| RUNTIME.with(|rt| rt.block_on(timeout).unwrap()))
.join()
.unwrap()
}),
Err(_) => RUNTIME.with(|rt| rt.block_on(timeout).unwrap()),
}
}

#[derive(Error, Debug)]
pub enum RpcError {
Expand All @@ -38,8 +63,8 @@ macro_rules! jsonrpc {
) => (
$(#[$struct_attr])*
pub struct $struct_name {
client: crate::rpc::RpcClient,
id: std::sync::atomic::AtomicU64,
pub(crate) client: $crate::rpc::RpcClient,
pub(crate) id: std::sync::atomic::AtomicU64,
}

impl Clone for $struct_name {
Expand All @@ -53,13 +78,13 @@ macro_rules! jsonrpc {

impl $struct_name {
pub fn new(uri: &str) -> Self {
$struct_name { id: 0.into(), client: crate::rpc::RpcClient::new(uri), }
$struct_name { id: 0.into(), client: $crate::rpc::RpcClient::new(uri), }
}

pub fn post<PARAM, RET>(&self, method:&str, params: PARAM)->Result<RET, $crate::rpc::RpcError>
where
PARAM:serde::ser::Serialize,
RET: serde::de::DeserializeOwned,
PARAM:serde::ser::Serialize + Send + 'static,
RET: serde::de::DeserializeOwned + Send + 'static,
{
let id = self.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let params_fn = || -> Result<_,_> {
Expand All @@ -73,7 +98,7 @@ macro_rules! jsonrpc {
};

let task = self.client.post(params_fn);
crate::rpc::RUNTIME.block_on(task)
$crate::rpc::block_on(task)

}

Expand All @@ -94,7 +119,7 @@ macro_rules! jsonrpc {
};

let task = $selff.client.post(params_fn);
crate::rpc::RUNTIME.block_on(task)
$crate::rpc::block_on(task)
}
)*
}
Expand All @@ -113,8 +138,8 @@ macro_rules! jsonrpc_async {
) => (
$(#[$struct_attr])*
pub struct $struct_name {
client: crate::rpc::RpcClient,
id: std::sync::atomic::AtomicU64,
pub(crate) client: $crate::rpc::RpcClient,
pub(crate) id: std::sync::atomic::AtomicU64,
}

impl Clone for $struct_name {
Expand All @@ -128,13 +153,13 @@ macro_rules! jsonrpc_async {

impl $struct_name {
pub fn new(uri: &str) -> Self {
$struct_name { id: 0.into(), client: crate::rpc::RpcClient::new(uri), }
$struct_name { id: 0.into(), client: $crate::rpc::RpcClient::new(uri), }
}

pub fn post<PARAM, RET>(&self, method:&str, params: PARAM)->impl std::future::Future<Output =Result<RET, $crate::rpc::RpcError>>
pub fn post<PARAM, RET>(&self, method:&str, params: PARAM)->impl std::future::Future<Output =Result<RET, $crate::rpc::RpcError>> + Send + 'static
where
PARAM:serde::ser::Serialize,
RET: serde::de::DeserializeOwned,
PARAM:serde::ser::Serialize + Send + 'static,
RET: serde::de::DeserializeOwned + Send + 'static,
{
let id = self.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let method = serde_json::json!(method);
Expand Down Expand Up @@ -196,8 +221,8 @@ impl RpcClient {
json_post_params: T,
) -> impl std::future::Future<Output = Result<RET, crate::rpc::RpcError>>
where
PARAM: serde::ser::Serialize,
RET: serde::de::DeserializeOwned,
PARAM: serde::ser::Serialize + Send + 'static,
RET: serde::de::DeserializeOwned + Send + 'static,
T: FnOnce() -> Result<PARAM, crate::rpc::RpcError>,
{
let url = self.url.clone();
Expand Down
36 changes: 27 additions & 9 deletions src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ impl Context {
println!("script: {:x}, debug: {}", script_hash, message);
});
verifier
.verify(u64::max_value())
.verify(u64::MAX)
.map_err(|err| Error::VerifyScript(format!("Verify script error: {:?}", err)))
}

Expand All @@ -391,9 +391,10 @@ impl Context {
}
}

#[async_trait::async_trait]
impl TransactionDependencyProvider for Context {
// For verify certain cell belong to certain transaction
fn get_transaction(
async fn get_transaction_async(
&self,
tx_hash: &Byte32,
) -> Result<TransactionView, TransactionDependencyError> {
Expand All @@ -413,25 +414,34 @@ impl TransactionDependencyProvider for Context {
})
}
// For get the output information of inputs or cell_deps, those cell should be live cell
fn get_cell(&self, out_point: &OutPoint) -> Result<CellOutput, TransactionDependencyError> {
async fn get_cell_async(
&self,
out_point: &OutPoint,
) -> Result<CellOutput, TransactionDependencyError> {
self.get_live_cell(out_point)
.map(|(output, _)| output)
.ok_or_else(|| TransactionDependencyError::NotFound("cell not found".to_string()))
}
// For get the output data information of inputs or cell_deps
fn get_cell_data(&self, out_point: &OutPoint) -> Result<Bytes, TransactionDependencyError> {
async fn get_cell_data_async(
&self,
out_point: &OutPoint,
) -> Result<Bytes, TransactionDependencyError> {
self.get_live_cell(out_point)
.map(|(_, data)| data)
.ok_or_else(|| TransactionDependencyError::NotFound("cell data not found".to_string()))
}
// For get the header information of header_deps
fn get_header(&self, _block_hash: &Byte32) -> Result<HeaderView, TransactionDependencyError> {
async fn get_header_async(
&self,
_block_hash: &Byte32,
) -> Result<HeaderView, TransactionDependencyError> {
Err(TransactionDependencyError::NotFound(
"header not found".to_string(),
))
}

fn get_block_extension(
async fn get_block_extension_async(
&self,
_block_hash: &Byte32,
) -> Result<Option<ckb_types::packed::Bytes>, TransactionDependencyError> {
Expand All @@ -441,8 +451,12 @@ impl TransactionDependencyProvider for Context {
}
}

#[async_trait::async_trait]
impl HeaderDepResolver for Context {
fn resolve_by_tx(&self, tx_hash: &Byte32) -> Result<Option<HeaderView>, anyhow::Error> {
async fn resolve_by_tx_async(
&self,
tx_hash: &Byte32,
) -> Result<Option<HeaderView>, anyhow::Error> {
let mut header_opt = None;
for item in &self.inputs {
if item.input.previous_output().tx_hash() == *tx_hash {
Expand All @@ -465,7 +479,10 @@ impl HeaderDepResolver for Context {
}
Ok(None)
}
fn resolve_by_number(&self, number: u64) -> Result<Option<HeaderView>, anyhow::Error> {
async fn resolve_by_number_async(
&self,
number: u64,
) -> Result<Option<HeaderView>, anyhow::Error> {
for mock_header in &self.header_deps {
if number == mock_header.number() {
return Ok(Some(mock_header.clone()));
Expand Down Expand Up @@ -503,8 +520,9 @@ impl CellDepResolver for Context {
}
}

#[async_trait::async_trait]
impl CellCollector for LiveCellsContext {
fn collect_live_cells(
async fn collect_live_cells_async(
&mut self,
query: &CellQueryOptions,
apply_changes: bool,
Expand Down
Loading

0 comments on commit 8cd654b

Please sign in to comment.