Skip to content

Commit

Permalink
Add retry_until_included (iotaledger#395)
Browse files Browse the repository at this point in the history
* add retry_until_included

* fix: callback argument position

* fix parameter index

* use default for getAddressOutputs

Co-authored-by: Lucas Nogueira <[email protected]>
  • Loading branch information
Thoralf-M and lucasfernog authored Mar 8, 2021
1 parent f95470a commit be869fc
Show file tree
Hide file tree
Showing 20 changed files with 193 additions and 129 deletions.
13 changes: 13 additions & 0 deletions bindings/nodejs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,19 @@ Retries (promotes or reattaches) the message associated with the given id.

**Returns** A promise resolving to the new [Message](#message) instance.

#### retryUntilIncluded(messageId: string[, interval: int, max_attempts: int]): Promise<Message>

Retries (promotes or reattaches) the message associated with the given id until it's included in the Tangle.
Default interval is 5 seconds and max_attempts is 10.

| Param | Type | Description |
| ---------------------- | ------------------- | ------------------------------------------------------ |
| messageId | <code>string</code> | The id of the message to retry |
| [options.interval] | <code>int</code> | The interval in seconds in which we retry the message. |
| [options.max_attempts] | <code>int</code> | The maximum of attempts we retry the message. |

**Returns** the message ids and [Message](#message) of reattached messages.

#### getInfo(): Promise<NodeInfo>

Gets information about the node.
Expand Down
5 changes: 3 additions & 2 deletions bindings/nodejs/lib/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export declare interface AddressOutputsOptions {
}

export declare class Client {
networkInfo(): NetworkInfo
networkInfo(): Promise<NetworkInfo>
subscriber(): TopicSubscriber
message(): MessageSender
getUnspentAddress(seed: string): UnspentAddressGetter
Expand All @@ -85,6 +85,7 @@ export declare class Client {
getBalance(seed: string): BalanceGetter
getAddressBalances(addresses: string[]): Promise<AddressBalance[]>
retry(messageId: string): Promise<MessageWrapper>
retryUntilIncluded(messageId: string, interval?: number, maxAttempts?: number): Promise<MessageWrapper[]>

getInfo(): Promise<NodeInfo>
getTips(): Promise<string[]>
Expand All @@ -94,7 +95,7 @@ export declare class Client {
getOutput(outputId: string): Promise<OutputMetadata>
findOutputs(outputIds: string[], addresses: string[]): Promise<OutputMetadata[]>
getAddressOutputs(address: string, options?: AddressOutputsOptions): Promise<string[]>
getAddressBalance(address: string): Promise<number>
getAddressBalance(address: string): Promise<AddressBalance>
isAddressValid(address: string): boolean
getMilestone(index: number): Promise<MilestoneMetadata>
getMilestoneUTXOChanges(index: number): Promise<MilestoneUTXOChanges>
Expand Down
19 changes: 15 additions & 4 deletions bindings/nodejs/lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ Client.prototype.getOutput = promisify(Client.prototype.getOutput)
Client.prototype.findOutputs = promisify(Client.prototype.findOutputs)
const getAddressOutputs = Client.prototype.getAddressOutputs
Client.prototype.getAddressOutputs = function (address, options) {
if (options) {
return promisify(getAddressOutputs).apply(this, [address, JSON.stringify(options)])
} else {
return promisify(getAddressOutputs).apply(this, [address])
if (typeof options == 'undefined') {
options = {
includeSpent: false
}
}
return promisify(getAddressOutputs).apply(this, [address, JSON.stringify(options)])
}
Client.prototype.getAddressBalance = promisify(Client.prototype.getAddressBalance)
Client.prototype.getMilestone = promisify(Client.prototype.getMilestone)
Expand All @@ -97,6 +98,16 @@ Client.prototype.getReceipts = promisify(Client.prototype.getReceipts)
Client.prototype.getReceiptsMigratedAt = promisify(Client.prototype.getReceiptsMigratedAt)
Client.prototype.getTreasury = promisify(Client.prototype.getTreasury)
Client.prototype.retry = promisify(Client.prototype.retry)
const retryUntilIncluded = Client.prototype.retryUntilIncluded
Client.prototype.retryUntilIncluded = function (msg_id, interval, maxAttempts) {
if (typeof interval == 'undefined') {
interval = 5
}
if (typeof maxAttempts == 'undefined') {
maxAttempts = 10
}
return promisify(retryUntilIncluded).apply(this, [msg_id, interval, maxAttempts])
}
Client.prototype.reattach = promisify(Client.prototype.reattach)
Client.prototype.promote = promisify(Client.prototype.promote)

Expand Down
1 change: 1 addition & 0 deletions bindings/nodejs/lib/types/models.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export declare interface BrokerOptions {
export declare type Address = 'string'

export declare interface AddressBalance {
type: number
address: Address
balance: number
}
18 changes: 17 additions & 1 deletion bindings/nodejs/native/src/classes/client/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub(crate) enum Api {
GetReceiptsMigratedAt(u32),
GetTreasury(),
Retry(MessageId),
RetryUntilIncluded(MessageId, Option<u64>, Option<u64>),
Reattach(MessageId),
Promote(MessageId),
}
Expand Down Expand Up @@ -300,11 +301,26 @@ impl Task for ClientTask {
Api::Retry(message_id) => {
let message = client.retry(message_id).await?;
serde_json::to_string(&MessageWrapper {
message: message.1,
message_id: message.0,
message: message.1,
})
.unwrap()
}
Api::RetryUntilIncluded(message_id, interval, max_attempts) => {
let messages = client
.retry_until_included(message_id, *interval, *max_attempts)
.await?;
messages
.into_iter()
.map(|msg| {
serde_json::to_string(&MessageWrapper {
message_id: msg.0,
message: msg.1,
})
.unwrap()
})
.collect()
}
Api::Reattach(message_id) => {
let message = client.reattach(message_id).await?;
serde_json::to_string(&MessageWrapper {
Expand Down
32 changes: 31 additions & 1 deletion bindings/nodejs/native/src/classes/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,36 @@ declare_types! {
Ok(cx.undefined().upcast())
}

method retryUntilIncluded(mut cx) {
let message_id = cx.argument::<JsString>(0)?.value();
let message_id = MessageId::from_str(message_id.as_str()).expect("invalid message id");
let interval: Option<u64> = match cx.argument_opt(1) {
Some(arg) => {
Some(arg.downcast::<JsNumber>().or_throw(&mut cx)?.value() as u64)
},
None => None,
};
let max_attempts: Option<u64> = match cx.argument_opt(2) {
Some(arg) => {
Some(arg.downcast::<JsNumber>().or_throw(&mut cx)?.value() as u64)
},
None => None,
};
let cb = cx.argument::<JsFunction>(cx.len()-1)?;
{
let this = cx.this();
let guard = cx.lock();
let id = &this.borrow(&guard).0;
let client_task = ClientTask {
client_id: id.clone(),
api: Api::RetryUntilIncluded(message_id, interval, max_attempts),
};
client_task.schedule(cb);
}

Ok(cx.undefined().upcast())
}

method networkInfo(mut cx) {
let cb = cx.argument::<JsFunction>(0)?;
{
Expand Down Expand Up @@ -389,7 +419,7 @@ declare_types! {
None => Default::default(),
};

let cb = cx.argument::<JsFunction>(1)?;
let cb = cx.argument::<JsFunction>(cx.len()-1)?;
{
let this = cx.this();
let guard = cx.lock();
Expand Down
10 changes: 5 additions & 5 deletions bindings/nodejs/tests/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ describe('Client', () => {
})

it('sends a value transaction and checks output balance', async () => {
const depositAddress = 'atoi1qzt0nhsf38nh6rs4p6zs5knqp6psgha9wsv74uajqgjmwc75ugupx3y7x0r'
const depositAddress = 'atoi1qpnrumvaex24dy0duulp4q07lpa00w20ze6jfd0xly422kdcjxzakzsz5kf'
const message = await client
.message()
.seed(seed)
Expand All @@ -65,8 +65,8 @@ describe('Client', () => {
}
}

const depositBalance = await client.getAddressBalance(depositAddress)
assert.strictEqual(depositBalance >= 2, true)
const addressBalanceObject = await client.getAddressBalance(depositAddress)
assert.strictEqual(addressBalanceObject.balance >= 1000000, true)
})

it('gets an unspent address', async () => {
Expand All @@ -84,11 +84,11 @@ describe('Client', () => {
})

it('get milestone and message', async () => {
const milestone = await client.getMilestone(750)
const info = await client.getInfo()
const milestone = await client.getMilestone(info.confirmedMilestoneIndex)
assert.strictEqual(typeof milestone, 'object')
assert.strictEqual('message_id' in milestone, true)
assertMessageId(milestone.message_id)

const message = await client.getMessage().data(milestone.message_id)
assertMessage(message)

Expand Down
12 changes: 12 additions & 0 deletions bindings/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,18 @@ Retries (promotes or reattaches) the message associated with the given id.

**Returns** the message id and the retried [Message](#message).

#### retry_until_included(message_id, interval (optional), max_attempts (optional)): list[(str, Message)]

Retries (promotes or reattaches) the message associated with the given id.

| Param | Type | Default | Description |
| ------------ | ---------------- | ---------------------- | ------------------------------------------------------ |
| [message_id] | <code>str</code> | <code>undefined</code> | The message id |
| interval | <code>int</code> | <code>5</code> | The interval in seconds in which we retry the message. |
| max_attempts | <code>int</code> | <code>10</code> | The maximum of attempts we retry the message. |

**Returns** the message ids and [Message](#message) of reattached messages.

#### reattach(message_id): (str, Message)

Reattaches the message associated with the given id.
Expand Down
18 changes: 18 additions & 0 deletions bindings/python/native/src/client/high_level_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,24 @@ impl Client {
rt.block_on(async { self.client.retry(&RustMessageId::from_str(&message_id)?).await })?;
Ok((message_id_message.0.to_string(), message_id_message.1.try_into()?))
}
fn retry_until_included(
&self,
message_id: String,
interval: Option<u64>,
max_attempts: Option<u64>,
) -> Result<Vec<(String, Message)>> {
let rt = tokio::runtime::Runtime::new()?;
let messages = rt.block_on(async {
self.client
.retry_until_included(&RustMessageId::from_str(&message_id)?, interval, max_attempts)
.await
})?;
let mut res = Vec::new();
for msg in messages {
res.push((msg.0.to_string(), msg.1.try_into()?));
}
Ok(res)
}
fn reattach(&self, message_id: String) -> Result<(String, Message)> {
let rt = tokio::runtime::Runtime::new()?;
let message_id_message =
Expand Down
17 changes: 2 additions & 15 deletions examples/message_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@

//! cargo run --example message_time --release
use iota::{Client, MessageId};
use std::time::Duration;
use tokio::time::sleep;
use iota::Client;

#[tokio::main]
async fn main() {
Expand All @@ -28,7 +26,7 @@ async fn main() {
let message_id = message.id().0;
println!("Message ID: {}", message_id);

reattach_promote_until_confirmed(&message_id, &iota).await;
let _ = iota.retry_until_included(&message_id, None, None).await.unwrap();

let metadata = iota.get_message().metadata(&message_id).await.unwrap();
match metadata.referenced_by_milestone_index {
Expand All @@ -39,14 +37,3 @@ async fn main() {
_ => println!("Message is not referenced by a milestone"),
}
}

async fn reattach_promote_until_confirmed(message_id: &MessageId, iota: &Client) {
while let Ok(metadata) = iota.get_message().metadata(&message_id).await {
if metadata.referenced_by_milestone_index.is_some() {
break;
} else if let Ok(msg_id) = iota.reattach(&message_id).await {
println!("Reattached or promoted {}", msg_id.0);
}
sleep(Duration::from_secs(5)).await;
}
}
18 changes: 2 additions & 16 deletions examples/multiple_outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@

//! cargo run --example multiple_outputs --release
use iota::{Client, MessageId, Seed};
use std::time::Duration;
use tokio::time::sleep;
use iota::{Client, Seed};
extern crate dotenv;
use dotenv::dotenv;
use std::env;
Expand Down Expand Up @@ -64,17 +62,5 @@ async fn main() {
message.id().0
);

reattach_promote_until_confirmed(message.id().0, &iota).await;
}

async fn reattach_promote_until_confirmed(message_id: MessageId, iota: &Client) {
while let Ok(metadata) = iota.get_message().metadata(&message_id).await {
if let Some(state) = metadata.ledger_inclusion_state {
println!("Leder inclusion state: {:?}", state);
break;
} else if let Ok(msg_id) = iota.reattach(&message_id).await {
println!("Reattached or promoted {}", msg_id.0);
}
sleep(Duration::from_secs(5)).await;
}
let _ = iota.retry_until_included(&message.id().0, None, None).await.unwrap();
}
18 changes: 2 additions & 16 deletions examples/send_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@

//! cargo run --example send_all --release
use iota::{client::Result, Client, MessageId, Seed};
use std::time::Duration;
use tokio::time::sleep;
use iota::{client::Result, Client, Seed};
extern crate dotenv;
use dotenv::dotenv;
use std::env;
Expand Down Expand Up @@ -46,18 +44,6 @@ async fn main() -> Result<()> {
message.id().0
);

reattach_promote_until_confirmed(message.id().0, &iota).await;
let _ = iota.retry_until_included(&message.id().0, None, None).await.unwrap();
Ok(())
}

async fn reattach_promote_until_confirmed(message_id: MessageId, iota: &Client) {
while let Ok(metadata) = iota.get_message().metadata(&message_id).await {
if let Some(state) = metadata.ledger_inclusion_state {
println!("Leder inclusion state: {:?}", state);
break;
} else if let Ok(msg_id) = iota.reattach(&message_id).await {
println!("Reattached or promoted {}", msg_id.0);
}
sleep(Duration::from_secs(5)).await;
}
}
18 changes: 2 additions & 16 deletions examples/split_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@

//! cargo run --example split_all --release
use iota::{client::Result, Client, MessageId, Seed};
use std::time::Duration;
use tokio::time::sleep;
use iota::{client::Result, Client, Seed};
extern crate dotenv;
use dotenv::dotenv;
use std::env;
Expand Down Expand Up @@ -63,18 +61,6 @@ async fn main() -> Result<()> {
message.id().0
);

reattach_promote_until_confirmed(message.id().0, &iota).await;
let _ = iota.retry_until_included(&message.id().0, None, None).await.unwrap();
Ok(())
}

async fn reattach_promote_until_confirmed(message_id: MessageId, iota: &Client) {
while let Ok(metadata) = iota.get_message().metadata(&message_id).await {
if let Some(state) = metadata.ledger_inclusion_state {
println!("Leder inclusion state: {:?}", state);
break;
} else if let Ok(msg_id) = iota.reattach(&message_id).await {
println!("Reattached or promoted {}", msg_id.0);
}
sleep(Duration::from_secs(5)).await;
}
}
Loading

0 comments on commit be869fc

Please sign in to comment.