Skip to content

Commit

Permalink
feat: support async api
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Dec 23, 2024
1 parent 8e6217d commit 9319b5d
Show file tree
Hide file tree
Showing 27 changed files with 658 additions and 261 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ parking_lot = "0.12"
lru = "0.7.1"
dashmap = "5.4"
dyn-clone = "1.0"
async-trait = "0.1"

ckb-types = "0.119.0"
ckb-dao-utils = "0.119.0"
Expand All @@ -50,7 +51,6 @@ ckb-mock-tx-types = { version = "0.119.0" }
ckb-chain-spec = "0.119.0"

sparse-merkle-tree = "0.6.1"
lazy_static = "1.3.0"

[features]
default = ["default-tls"]
Expand Down
7 changes: 4 additions & 3 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ ignore = [
#{ id = "RUSTSEC-0000-0000", reason = "you can specify a reason the advisory is ignored" },
#"[email protected]", # you can also ignore yanked crate versions if you wish
#{ crate = "[email protected]", reason = "you can specify why you are ignoring the yanked crate"
"RUSTSEC-2024-0370" # proc-macro-error's maintainer seems to be unreachable, ignore this
"RUSTSEC-2024-0370", # proc-macro-error's maintainer seems to be unreachable, ignore this
"RUSTSEC-2024-0384", # instant is no longer maintained, ignore this
]
# If this is true, then cargo deny will use the git executable to fetch advisory database.
# If this is false, then it uses a built-in git library.
Expand All @@ -97,8 +98,8 @@ allow = [
"ISC",
"MIT",
"Unicode-DFS-2016",
"BSL-1.0", # xxhash-rust 0.8.10

"BSL-1.0", # xxhash-rust 0.8.10
"Unicode-3.0",
#"MIT",
#"Apache-2.0",
#"Apache-2.0 WITH LLVM-exception",
Expand Down
19 changes: 11 additions & 8 deletions examples/script_unlocker_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ use std::collections::HashMap;
/// [CapacityDiff]: https://github.com/doitian/ckb-sdk-examples-capacity-diff
struct CapacityDiffUnlocker {}

#[async_trait::async_trait]
impl ScriptUnlocker for CapacityDiffUnlocker {
// This works for any args
fn match_args(&self, _args: &[u8]) -> bool {
true
}

fn unlock(
async fn unlock_async(
&self,
tx: &TransactionView,
script_group: &ScriptGroup,
Expand All @@ -45,12 +46,14 @@ impl ScriptUnlocker for CapacityDiffUnlocker {

let mut total = 0i64;
for i in &script_group.input_indices {
let cell = tx_dep_provider.get_cell(
&tx.inputs()
.get(*i)
.ok_or_else(|| other_unlock_error("input index out of bound"))?
.previous_output(),
)?;
let cell = tx_dep_provider
.get_cell_async(
&tx.inputs()
.get(*i)
.ok_or_else(|| other_unlock_error("input index out of bound"))?
.previous_output(),
)
.await?;
let capacity: u64 = cell.capacity().unpack();
total -= capacity as i64;
}
Expand All @@ -71,7 +74,7 @@ impl ScriptUnlocker for CapacityDiffUnlocker {

// This is called before balancer. It's responsible to fill witness for inputs added manually
// by users.
fn fill_placeholder_witness(
async fn fill_placeholder_witness_async(
&self,
tx: &TransactionView,
script_group: &ScriptGroup,
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.75.0
1.81.0
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub mod test_util;
#[cfg(test)]
mod tests;

pub use rpc::{CkbRpcClient, IndexerRpcClient, RpcError};
pub use rpc::{CkbRpcAsyncClient, CkbRpcClient, IndexerRpcAsyncClient, IndexerRpcClient, RpcError};
pub use types::{
Address, AddressPayload, AddressType, CodeHashIndex, HumanCapacity, NetworkInfo, NetworkType,
OldAddress, OldAddressFormat, ScriptGroup, ScriptGroupType, ScriptId, Since, SinceType,
Expand Down
11 changes: 10 additions & 1 deletion src/rpc/ckb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ crate::jsonrpc!(pub struct CkbRpcClient {
pub fn calculate_dao_maximum_withdraw(&self, out_point: OutPoint, kind: DaoWithdrawingCalculationKind) -> Capacity;
});

crate::jsonrpc_async!(pub struct CkbRpcAyncClient {
crate::jsonrpc_async!(pub struct CkbRpcAsyncClient {
// Chain
pub fn get_block(&self, hash: H256) -> Option<BlockView>;
pub fn get_block_by_number(&self, number: BlockNumber) -> Option<BlockView>;
Expand Down Expand Up @@ -212,6 +212,15 @@ fn transform_cycles(cycles: Option<Vec<ckb_jsonrpc_types::Cycle>>) -> Vec<Cycle>
.unwrap_or_default()
}

impl From<&CkbRpcClient> for CkbRpcAsyncClient {
fn from(value: &CkbRpcClient) -> Self {
Self {
client: value.client.clone(),
id: 0.into(),
}
}
}

impl CkbRpcClient {
pub fn get_packed_block(&self, hash: H256) -> Result<Option<JsonBytes>, crate::RpcError> {
self.post("get_block", (hash, Some(Uint32::from(0u32))))
Expand Down
9 changes: 9 additions & 0 deletions src/rpc/ckb_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,12 @@ crate::jsonrpc_async!(pub struct IndexerRpcAsyncClient {
pub fn get_transactions(&self, search_key: SearchKey, order: Order, limit: Uint32, after: Option<JsonBytes>) -> Pagination<Tx>;
pub fn get_cells_capacity(&self, search_key: SearchKey) -> Option<CellsCapacity>;
});

impl From<&IndexerRpcClient> for IndexerRpcAsyncClient {
fn from(value: &IndexerRpcClient) -> Self {
Self {
client: value.client.clone(),
id: 0.into(),
}
}
}
9 changes: 9 additions & 0 deletions src/rpc/ckb_light_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,3 +199,12 @@ crate::jsonrpc_async!(pub struct LightClientRpcAsyncClient {
pub fn get_peers(&self) -> Vec<RemoteNode>;
pub fn local_node_info(&self) -> LocalNode;
});

impl From<&LightClientRpcClient> for LightClientRpcAsyncClient {
fn from(value: &LightClientRpcClient) -> Self {
Self {
client: value.client.clone(),
id: 0.into(),
}
}
}
62 changes: 43 additions & 19 deletions src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,40 @@ pub mod ckb_indexer;
pub mod ckb_light_client;

use anyhow::anyhow;
pub use ckb::CkbRpcClient;
pub use ckb_indexer::IndexerRpcClient;
pub use ckb::{CkbRpcAsyncClient, CkbRpcClient};
pub use ckb_indexer::{IndexerRpcAsyncClient, IndexerRpcClient};
use ckb_jsonrpc_types::{JsonBytes, ResponseFormat};
pub use ckb_light_client::LightClientRpcClient;
pub use ckb_light_client::{LightClientRpcAsyncClient, LightClientRpcClient};

use std::sync::LazyLock;
use std::{cell::LazyCell, future::Future};
use thiserror::Error;

static RUNTIME: LazyLock<tokio::runtime::Runtime> =
LazyLock::new(|| tokio::runtime::Runtime::new().unwrap());
thread_local! {
pub static RUNTIME: LazyCell<tokio::runtime::Runtime> =
LazyCell::new(|| tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap());
}

pub(crate) fn block_on<F: Send>(future: impl Future<Output = F> + Send) -> F {
match tokio::runtime::Handle::try_current() {
Ok(h)
if matches!(
h.runtime_flavor(),
tokio::runtime::RuntimeFlavor::MultiThread
) =>
{
tokio::task::block_in_place(|| h.block_on(future))
}
// if we on the current runtime, it must use another thread to poll this future,
// can't block on current runtime, it will block current reactor to stop forever
// in tokio runtime, this time will panic
Ok(_) => std::thread::scope(|s| {
s.spawn(|| RUNTIME.with(|rt| rt.block_on(future)))
.join()
.unwrap()
}),
Err(_) => RUNTIME.with(|rt| rt.block_on(future)),
}
}

#[derive(Error, Debug)]
pub enum RpcError {
Expand All @@ -38,8 +62,8 @@ macro_rules! jsonrpc {
) => (
$(#[$struct_attr])*
pub struct $struct_name {
client: crate::rpc::RpcClient,
id: std::sync::atomic::AtomicU64,
pub(crate) client: crate::rpc::RpcClient,
pub(crate) id: std::sync::atomic::AtomicU64,
}

impl Clone for $struct_name {
Expand All @@ -58,8 +82,8 @@ macro_rules! jsonrpc {

pub fn post<PARAM, RET>(&self, method:&str, params: PARAM)->Result<RET, $crate::rpc::RpcError>
where
PARAM:serde::ser::Serialize,
RET: serde::de::DeserializeOwned,
PARAM:serde::ser::Serialize + Send + 'static,
RET: serde::de::DeserializeOwned + Send + 'static,
{
let id = self.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let params_fn = || -> Result<_,_> {
Expand All @@ -73,7 +97,7 @@ macro_rules! jsonrpc {
};

let task = self.client.post(params_fn);
crate::rpc::RUNTIME.block_on(task)
crate::rpc::block_on(task)

}

Expand All @@ -94,7 +118,7 @@ macro_rules! jsonrpc {
};

let task = $selff.client.post(params_fn);
crate::rpc::RUNTIME.block_on(task)
crate::rpc::block_on(task)
}
)*
}
Expand All @@ -113,8 +137,8 @@ macro_rules! jsonrpc_async {
) => (
$(#[$struct_attr])*
pub struct $struct_name {
client: crate::rpc::RpcClient,
id: std::sync::atomic::AtomicU64,
pub(crate) client: crate::rpc::RpcClient,
pub(crate) id: std::sync::atomic::AtomicU64,
}

impl Clone for $struct_name {
Expand All @@ -131,10 +155,10 @@ macro_rules! jsonrpc_async {
$struct_name { id: 0.into(), client: crate::rpc::RpcClient::new(uri), }
}

pub fn post<PARAM, RET>(&self, method:&str, params: PARAM)->impl std::future::Future<Output =Result<RET, $crate::rpc::RpcError>>
pub fn post<PARAM, RET>(&self, method:&str, params: PARAM)->impl std::future::Future<Output =Result<RET, $crate::rpc::RpcError>> + Send + 'static
where
PARAM:serde::ser::Serialize,
RET: serde::de::DeserializeOwned,
PARAM:serde::ser::Serialize + Send + 'static,
RET: serde::de::DeserializeOwned + Send + 'static,
{
let id = self.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let method = serde_json::json!(method);
Expand Down Expand Up @@ -196,8 +220,8 @@ impl RpcClient {
json_post_params: T,
) -> impl std::future::Future<Output = Result<RET, crate::rpc::RpcError>>
where
PARAM: serde::ser::Serialize,
RET: serde::de::DeserializeOwned,
PARAM: serde::ser::Serialize + Send + 'static,
RET: serde::de::DeserializeOwned + Send + 'static,
T: FnOnce() -> Result<PARAM, crate::rpc::RpcError>,
{
let url = self.url.clone();
Expand Down
34 changes: 26 additions & 8 deletions src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,10 @@ impl Context {
}
}

#[async_trait::async_trait]
impl TransactionDependencyProvider for Context {
// For verify certain cell belong to certain transaction
fn get_transaction(
async fn get_transaction_async(
&self,
tx_hash: &Byte32,
) -> Result<TransactionView, TransactionDependencyError> {
Expand All @@ -413,25 +414,34 @@ impl TransactionDependencyProvider for Context {
})
}
// For get the output information of inputs or cell_deps, those cell should be live cell
fn get_cell(&self, out_point: &OutPoint) -> Result<CellOutput, TransactionDependencyError> {
async fn get_cell_async(
&self,
out_point: &OutPoint,
) -> Result<CellOutput, TransactionDependencyError> {
self.get_live_cell(out_point)
.map(|(output, _)| output)
.ok_or_else(|| TransactionDependencyError::NotFound("cell not found".to_string()))
}
// For get the output data information of inputs or cell_deps
fn get_cell_data(&self, out_point: &OutPoint) -> Result<Bytes, TransactionDependencyError> {
async fn get_cell_data_async(
&self,
out_point: &OutPoint,
) -> Result<Bytes, TransactionDependencyError> {
self.get_live_cell(out_point)
.map(|(_, data)| data)
.ok_or_else(|| TransactionDependencyError::NotFound("cell data not found".to_string()))
}
// For get the header information of header_deps
fn get_header(&self, _block_hash: &Byte32) -> Result<HeaderView, TransactionDependencyError> {
async fn get_header_async(
&self,
_block_hash: &Byte32,
) -> Result<HeaderView, TransactionDependencyError> {
Err(TransactionDependencyError::NotFound(
"header not found".to_string(),
))
}

fn get_block_extension(
async fn get_block_extension_async(
&self,
_block_hash: &Byte32,
) -> Result<Option<ckb_types::packed::Bytes>, TransactionDependencyError> {
Expand All @@ -441,8 +451,12 @@ impl TransactionDependencyProvider for Context {
}
}

#[async_trait::async_trait]
impl HeaderDepResolver for Context {
fn resolve_by_tx(&self, tx_hash: &Byte32) -> Result<Option<HeaderView>, anyhow::Error> {
async fn resolve_by_tx_async(
&self,
tx_hash: &Byte32,
) -> Result<Option<HeaderView>, anyhow::Error> {
let mut header_opt = None;
for item in &self.inputs {
if item.input.previous_output().tx_hash() == *tx_hash {
Expand All @@ -465,7 +479,10 @@ impl HeaderDepResolver for Context {
}
Ok(None)
}
fn resolve_by_number(&self, number: u64) -> Result<Option<HeaderView>, anyhow::Error> {
async fn resolve_by_number_async(
&self,
number: u64,
) -> Result<Option<HeaderView>, anyhow::Error> {
for mock_header in &self.header_deps {
if number == mock_header.number() {
return Ok(Some(mock_header.clone()));
Expand Down Expand Up @@ -503,8 +520,9 @@ impl CellDepResolver for Context {
}
}

#[async_trait::async_trait]
impl CellCollector for LiveCellsContext {
fn collect_live_cells(
async fn collect_live_cells_async(
&mut self,
query: &CellQueryOptions,
apply_changes: bool,
Expand Down
12 changes: 6 additions & 6 deletions src/tests/ckb_rpc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::rpc::{CkbRpcClient, ResponseFormatGetter};
use ckb_types::{core, h256, prelude::*, H256};
// use serde_json;
use std::sync::LazyLock;

const TEST_CKB_RPC_URL: &str = "https://testnet.ckb.dev";

Expand All @@ -13,9 +13,9 @@ const BLOCK_HASH_NOT_EXIST: H256 =
h256!("0x626c6f636b5f686173685f746861745f646f65735f6e6f745f65786973740000");
const BLOCK_NUMBER_NOT_EXIST: u64 = u64::max_value();
// transaction hash in block 0xd88eb0cf9f6e6f123c733e9aba29dec9cb449965a8adc98216c50d5083b909b1
lazy_static::lazy_static! {
pub static ref TRANSACTION_HASH_VEC : Vec<H256> =
vec! [

pub static TRANSACTION_HASH_VEC: LazyLock<Vec<H256>> = LazyLock::new(|| {
vec![
h256!("0x9ecdbaf1ac656c0e48ab66e7c539b43ad6073c85d17fa590d1d3d9e9525767d2"),
h256!("0xb8ba38f579b0aeedc7b9dd5c4c14806079bf7c232f63435e6aa08cca1c100826"),
h256!("0xd76f85fb9f87cf3e906846bf32eb34a796b5a3c19dbae9fc3bff0b498974c274"),
Expand All @@ -27,8 +27,8 @@ lazy_static::lazy_static! {
h256!("0xdbcc925afcd73e91c0d91b93943580bbb7a03241d7baef4089d736a1e7b0a4ae"),
h256!("0x9f91c8e5c1b6853b5f129eaba6631f5ebb887ef83faae5f5e1801bf2c5515ec0"),
h256!("0x56aa6d7ae97c4b2f59790c8856701a75352cd05772155595df07f13682cf5e50"),
];
}
]
});

#[test]
fn test_get_block() {
Expand Down
Loading

0 comments on commit 9319b5d

Please sign in to comment.