Skip to content

Commit

Permalink
fix(inx): wait for slots to be finalized in INX (#1395)
Browse files Browse the repository at this point in the history
Wait for slots to be finalized in INX
  • Loading branch information
DaughterOfMars authored May 14, 2024
1 parent 4a0ff69 commit 98d2f37
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 9 deletions.
100 changes: 92 additions & 8 deletions src/inx/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright 2023 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use futures::stream::{Stream, StreamExt};
use futures::{
stream::{Stream, StreamExt},
TryStreamExt,
};
use inx::{client::InxClient, proto};
use iota_sdk::types::block::{payload::signed_transaction::TransactionId, slot::SlotIndex};
use packable::PackableExt;
Expand Down Expand Up @@ -37,6 +40,20 @@ impl Inx {
self.inx.read_node_status(proto::NoParams {}).await?.try_convert()
}

/// Wait for the status of the node to change.
pub async fn listen_to_status_changes(
&mut self,
) -> Result<impl Stream<Item = Result<NodeStatus, InxError>>, InxError> {
Ok(self
.inx
.listen_to_node_status(proto::NodeStatusRequest {
cooldown_in_milliseconds: 100,
})
.await?
.into_inner()
.map(|msg| msg?.try_convert()))
}

/// Get the configuration of the node.
pub async fn get_node_configuration(&mut self) -> Result<NodeConfiguration, InxError> {
self.inx
Expand All @@ -45,17 +62,84 @@ impl Inx {
.try_convert()
}

/// Get a stream of committed slots.
pub async fn get_committed_slots(
/// Get a committed slot by index.
pub async fn get_committed_slot(&mut self, slot: SlotIndex) -> Result<Commitment, InxError> {
self.inx
.read_commitment(proto::CommitmentRequest {
commitment_slot: slot.0,
commitment_id: None,
})
.await?
.try_convert()
}

/// Get a stream of finalized slots.
pub async fn get_finalized_slots(
&mut self,
request: SlotRangeRequest,
) -> Result<impl Stream<Item = Result<Commitment, InxError>>, InxError> {
Ok(self
.inx
.listen_to_commitments(proto::SlotRangeRequest::from(request))
struct StreamState {
inx: Option<Inx>,
latest_finalized_slot: u32,
curr_slot: u32,
last_slot: u32,
}

let latest_finalized_slot = self
.get_node_status()
.await?
.into_inner()
.map(|msg| msg?.try_convert()))
.latest_finalized_commitment
.commitment_id
.slot_index()
.0;
Ok(futures::stream::unfold(
StreamState {
inx: Some(self.clone()),
latest_finalized_slot,
curr_slot: request.start_slot(),
last_slot: request.end_slot(),
},
|mut state| async move {
// Inner function definition to simplify result type
async fn next(state: &mut StreamState) -> Result<Option<Commitment>, InxError> {
let Some(inx) = state.inx.as_mut() else { return Ok(None) };

if state.last_slot != 0 && state.curr_slot > state.last_slot {
return Ok(None);
}

// If the current slot is not yet finalized, we will wait.
if state.latest_finalized_slot < state.curr_slot {
let mut status_changes = inx.listen_to_status_changes().await?;
loop {
match status_changes.try_next().await? {
Some(status) => {
// If the status change updated the latest finalized commitment, we can continue.
if status.latest_finalized_commitment.commitment_id.slot_index().0
> state.latest_finalized_slot
{
state.latest_finalized_slot =
status.latest_finalized_commitment.commitment_id.slot_index().0;
break;
}
}
None => {
return Ok(None);
}
}
}
}
let commitment = inx.get_committed_slot(state.curr_slot.into()).await?;
state.curr_slot += 1;
Ok(Some(commitment))
}
let res = next(&mut state).await;
if res.is_err() {
state.inx = None;
}
res.transpose().map(|res| (res, state))
},
))
}

/// Get accepted blocks for a given slot.
Expand Down
10 changes: 10 additions & 0 deletions src/inx/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ impl SlotRangeRequest {
{
Self(to_slot_range_request(range))
}

/// Get the start slot.
pub fn start_slot(&self) -> u32 {
self.0.start_slot
}

/// Get the end slot.
pub fn end_slot(&self) -> u32 {
self.0.end_slot
}
}

impl From<SlotRangeRequest> for proto::SlotRangeRequest {
Expand Down
2 changes: 1 addition & 1 deletion src/tangle/sources/inx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl InputSource for Inx {
) -> Result<BoxStream<Result<Commitment, Self::Error>>, Self::Error> {
let mut inx = self.clone();
Ok(Box::pin(
inx.get_committed_slots(SlotRangeRequest::from_range(range))
inx.get_finalized_slots(SlotRangeRequest::from_range(range))
.await?
.map_err(Self::Error::from),
))
Expand Down

0 comments on commit 98d2f37

Please sign in to comment.