diff --git a/src/inx/client.rs b/src/inx/client.rs index e64d41c76..35e73bf91 100644 --- a/src/inx/client.rs +++ b/src/inx/client.rs @@ -1,7 +1,10 @@ // Copyright 2023 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use futures::stream::{Stream, StreamExt}; +use futures::{ + stream::{Stream, StreamExt}, + TryStreamExt, +}; use inx::{client::InxClient, proto}; use iota_sdk::types::block::{payload::signed_transaction::TransactionId, slot::SlotIndex}; use packable::PackableExt; @@ -37,6 +40,20 @@ impl Inx { self.inx.read_node_status(proto::NoParams {}).await?.try_convert() } + /// Wait for the status of the node to change. + pub async fn listen_to_status_changes( + &mut self, + ) -> Result>, InxError> { + Ok(self + .inx + .listen_to_node_status(proto::NodeStatusRequest { + cooldown_in_milliseconds: 100, + }) + .await? + .into_inner() + .map(|msg| msg?.try_convert())) + } + /// Get the configuration of the node. pub async fn get_node_configuration(&mut self) -> Result { self.inx @@ -45,17 +62,84 @@ impl Inx { .try_convert() } - /// Get a stream of committed slots. - pub async fn get_committed_slots( + /// Get a committed slot by index. + pub async fn get_committed_slot(&mut self, slot: SlotIndex) -> Result { + self.inx + .read_commitment(proto::CommitmentRequest { + commitment_slot: slot.0, + commitment_id: None, + }) + .await? + .try_convert() + } + + /// Get a stream of finalized slots. + pub async fn get_finalized_slots( &mut self, request: SlotRangeRequest, ) -> Result>, InxError> { - Ok(self - .inx - .listen_to_commitments(proto::SlotRangeRequest::from(request)) + struct StreamState { + inx: Option, + latest_finalized_slot: u32, + curr_slot: u32, + last_slot: u32, + } + + let latest_finalized_slot = self + .get_node_status() .await? - .into_inner() - .map(|msg| msg?.try_convert())) + .latest_finalized_commitment + .commitment_id + .slot_index() + .0; + Ok(futures::stream::unfold( + StreamState { + inx: Some(self.clone()), + latest_finalized_slot, + curr_slot: request.start_slot(), + last_slot: request.end_slot(), + }, + |mut state| async move { + // Inner function definition to simplify result type + async fn next(state: &mut StreamState) -> Result, InxError> { + let Some(inx) = state.inx.as_mut() else { return Ok(None) }; + + if state.last_slot != 0 && state.curr_slot > state.last_slot { + return Ok(None); + } + + // If the current slot is not yet finalized, we will wait. + if state.latest_finalized_slot < state.curr_slot { + let mut status_changes = inx.listen_to_status_changes().await?; + loop { + match status_changes.try_next().await? { + Some(status) => { + // If the status change updated the latest finalized commitment, we can continue. + if status.latest_finalized_commitment.commitment_id.slot_index().0 + > state.latest_finalized_slot + { + state.latest_finalized_slot = + status.latest_finalized_commitment.commitment_id.slot_index().0; + break; + } + } + None => { + return Ok(None); + } + } + } + } + let commitment = inx.get_committed_slot(state.curr_slot.into()).await?; + state.curr_slot += 1; + Ok(Some(commitment)) + } + let res = next(&mut state).await; + if res.is_err() { + state.inx = None; + } + res.transpose().map(|res| (res, state)) + }, + )) } /// Get accepted blocks for a given slot. diff --git a/src/inx/request.rs b/src/inx/request.rs index b5f4f7764..60c2a2d69 100644 --- a/src/inx/request.rs +++ b/src/inx/request.rs @@ -47,6 +47,16 @@ impl SlotRangeRequest { { Self(to_slot_range_request(range)) } + + /// Get the start slot. + pub fn start_slot(&self) -> u32 { + self.0.start_slot + } + + /// Get the end slot. + pub fn end_slot(&self) -> u32 { + self.0.end_slot + } } impl From for proto::SlotRangeRequest { diff --git a/src/tangle/sources/inx.rs b/src/tangle/sources/inx.rs index 98db1add0..d0fe709e7 100644 --- a/src/tangle/sources/inx.rs +++ b/src/tangle/sources/inx.rs @@ -38,7 +38,7 @@ impl InputSource for Inx { ) -> Result>, Self::Error> { let mut inx = self.clone(); Ok(Box::pin( - inx.get_committed_slots(SlotRangeRequest::from_range(range)) + inx.get_finalized_slots(SlotRangeRequest::from_range(range)) .await? .map_err(Self::Error::from), ))