Skip to content

Commit

Permalink
feat: consume ack in a loop
Browse files Browse the repository at this point in the history
  • Loading branch information
liyukun committed Nov 17, 2023
1 parent 7c8b359 commit 740ec6d
Showing 1 changed file with 63 additions and 55 deletions.
118 changes: 63 additions & 55 deletions examples/sudt-transfer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! An example cli for sending/receiving SUDT with the sudt transfer module.
use std::{collections::HashMap, fmt, fs, path::PathBuf, pin::pin, time::Duration};
use std::{collections::HashMap, fmt, fs, path::PathBuf, time::Duration};

use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
Expand Down Expand Up @@ -31,7 +31,6 @@ use forcerelay_ckb_sdk::{
assemble_send_packet_partial_transaction, assemble_write_ack_partial_transaction,
},
};
use futures::TryStreamExt;
use prost::Message;
use secp256k1::Secp256k1;
use serde::Deserialize;
Expand Down Expand Up @@ -151,7 +150,7 @@ async fn consume_ack_retry(
sender_lock_script: packed::Script,
) -> Result<()> {
loop {
match consume_ack(&config, sk, sender_lock_script.clone()).await {
match consume_ack(config, sk, sender_lock_script.clone()).await {
Ok(()) => return Ok(()),
Err(e) => {
if let Some(re) = e.downcast_ref::<ckb_sdk::RpcError>() {
Expand All @@ -177,62 +176,71 @@ async fn consume_ack(
let client = CkbRpcClient::new(config.ckb_rpc_url.clone());
let sender = sender_lock_script.calc_script_hash().as_bytes().slice(..20);

let mut ack_packets = pin!(
PacketCell::subscribe(client.clone(), config.sdk_config.clone())
.try_filter(|cell| futures::future::ready(cell.is_ack_packet()))
);
// Filter for a packet that is sent to us.
let (p, pd) = loop {
let p = ack_packets.try_next().await?.context("no packet cell")?;
let pd = FungibleTokenPacketData::decode(&p.packet.packet.data[..])?;
if pd.sender == sender {
if p.packet.ack.as_deref() != Some(&[1]) {
println!("skipping packet {pd}");
continue;
let packets = PacketCell::search(&client, &config.sdk_config, 100, &mut 0)
.await?
.into_iter()
.filter_map(|p| {
if !p.is_ack_packet() {
return None;
}
break (p, pd);
} else {
println!("skipping packet {pd}");
}
};
println!("consuming packet ack\n{pd}\n{:?}", p.packet.packet);
let pd = FungibleTokenPacketData::decode(&p.packet.packet.data[..]).ok()?;
if pd.sender == sender {
if p.packet.ack.as_deref() != Some(&[1]) {
println!("skipping packet {pd}");
return None;
}
Some((p, pd))
} else {
println!("skipping packet\n{pd:?}\n{:?}", p.packet.packet);
None
}
});

let sudt_type_hash = hex::decode(pd.denom).context("decode base denom")?;
let (sudt_transfer_dep, st_cell, st_cell_amount, _) =
get_st_cell_by_sudt_type_hash(&client, config, sudt_type_hash).await?;
let sudt_dep = get_type_dep_from_cell(&client, &st_cell)
.await
.context("get sudt dep")?;
let mut consumed = 0;
for (p, pd) in packets {
println!("consuming packet ack\n{pd}\n{:?}", p.packet.packet);

let user_input = get_capacity_input(&client, &sender_lock_script).await?;
let sudt_type_hash = hex::decode(pd.denom).context("decode base denom")?;
let (sudt_transfer_dep, st_cell, st_cell_amount, _) =
get_st_cell_by_sudt_type_hash(&client, config, sudt_type_hash).await?;
let sudt_dep = get_type_dep_from_cell(&client, &st_cell)
.await
.context("get sudt dep")?;

let packet_contract_cell = get_latest_cell_by_type_script(
&client,
config.sdk_config.packet_contract_type_script().into(),
)
.await?;
let user_input = get_capacity_input(&client, &sender_lock_script).await?;

let (tx, envelope) = assemble_consume_ack_packet_partial_transaction(
simple_dep(packet_contract_cell.out_point.into()),
p,
)?;
// sighash placeholder witness.
let placeholder_witness = packed::WitnessArgs::new_builder()
.lock(Some(Bytes::from_static(&[0u8; 65])).pack())
.build();
let tx = tx
.input(simple_input(st_cell.out_point.into()))
.output(packed::CellOutput::from(st_cell.output))
.output_data(sudt_amount_data(st_cell_amount).pack())
.witness([].pack())
.cell_dep(sudt_transfer_dep)
.cell_dep(sudt_dep)
// capacity input and witness.
.input(simple_input(user_input.out_point.into()))
.witness(placeholder_witness.as_bytes().pack());
let tx = add_ibc_envelope(tx, &envelope).build();
let tx = complete_tx(&config.ckb_rpc_url, &tx, sender_lock_script, sk)?;
send_transaction(&config.ckb_rpc_url, tx)?;
let packet_contract_cell = get_latest_cell_by_type_script(
&client,
config.sdk_config.packet_contract_type_script().into(),
)
.await?;

let (tx, envelope) = assemble_consume_ack_packet_partial_transaction(
simple_dep(packet_contract_cell.out_point.into()),
p,
)?;
// sighash placeholder witness.
let placeholder_witness = packed::WitnessArgs::new_builder()
.lock(Some(Bytes::from_static(&[0u8; 65])).pack())
.build();
let tx = tx
.input(simple_input(st_cell.out_point.into()))
.output(packed::CellOutput::from(st_cell.output))
.output_data(sudt_amount_data(st_cell_amount).pack())
.witness([].pack())
.cell_dep(sudt_transfer_dep)
.cell_dep(sudt_dep)
// capacity input and witness.
.input(simple_input(user_input.out_point.into()))
.witness(placeholder_witness.as_bytes().pack());
let tx = add_ibc_envelope(tx, &envelope).build();
let tx = complete_tx(&config.ckb_rpc_url, &tx, sender_lock_script.clone(), sk)?;
send_transaction(&config.ckb_rpc_url, tx)?;

consumed += 1;
}

println!("consumed {consumed} packets");

Ok(())
}
Expand Down Expand Up @@ -560,7 +568,7 @@ async fn receive(
let sudt_type_hash = hex::decode(base_denom).context("decode base denom")?;

let (sudt_transfer_dep, st_cell, st_cell_amount, sudt_type_script) =
get_st_cell_by_sudt_type_hash(&client, &config, sudt_type_hash).await?;
get_st_cell_by_sudt_type_hash(&client, config, sudt_type_hash).await?;

let sudt_dep = get_type_dep_from_cell(&client, &st_cell)
.await
Expand Down

0 comments on commit 740ec6d

Please sign in to comment.