Skip to content

Commit

Permalink
feat: Add listen timeouts to iroha cli (#5241)
Browse files Browse the repository at this point in the history
* feat: add timeouts to client cli

Signed-off-by: Lohachov Mykhailo <[email protected]>

* chore: remove unused macro

Signed-off-by: Lohachov Mykhailo <[email protected]>

* test: add tests for cli

Signed-off-by: Lohachov Mykhailo <[email protected]>

* fix: test codecover

Signed-off-by: Lohachov Mykhailo <[email protected]>

* fix: remove unused tests

Signed-off-by: Lohachov Mykhailo <[email protected]>

* chore: newlines

Signed-off-by: Lohachov Mykhailo <[email protected]>

* chore: make args global

Signed-off-by: Lohachov Mykhailo <[email protected]>

* chore: update docker-compose.single.yml

Signed-off-by: Mykhailo Lohachov <[email protected]>

* fix: use float for duration

Signed-off-by: Lohachov Mykhailo <[email protected]>

* fix: use milis in events streaming

Signed-off-by: Lohachov Mykhailo <[email protected]>

* chore: fix compose

Signed-off-by: Lohachov Mykhailo <[email protected]>

* fix: use humantime

Signed-off-by: Lohachov Mykhailo <[email protected]>

* test: add integration test

Signed-off-by: Lohachov Mykhailo <[email protected]>

* chore: fmt

Signed-off-by: Lohachov Mykhailo <[email protected]>

* chore: fmt

Signed-off-by: Lohachov Mykhailo <[email protected]>

* chore: update python test dependencies

Signed-off-by: Lohachov Mykhailo <[email protected]>

---------

Signed-off-by: Lohachov Mykhailo <[email protected]>
Signed-off-by: Mykhailo Lohachov <[email protected]>
  • Loading branch information
aoyako authored Nov 21, 2024
1 parent eb5be5b commit 8836304
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 125 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/iroha_cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ serde = { workspace = true }
serde_json = { workspace = true }
erased-serde = "0.4.5"
supports-color = { workspace = true }
tokio = { workspace = true, features = ["rt"] }
futures = { workspace = true }

[build-dependencies]
vergen = { version = "8.3.1", default-features = false }
Expand Down
110 changes: 88 additions & 22 deletions crates/iroha_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ use std::{
io::{stdin, stdout},
path::PathBuf,
str::FromStr,
time::Duration,
};

use erased_serde::Serialize;
use error_stack::{fmt::ColorMode, IntoReportCompat, ResultExt};
use eyre::{eyre, Error, Result, WrapErr};
use futures::TryStreamExt;
use iroha::{client::Client, config::Config, data_model::prelude::*};
use iroha_primitives::json::Json;
use thiserror::Error;
use tokio::runtime::Runtime;

/// Re-usable clap `--metadata <PATH>` (`-m`) argument.
/// Should be combined with `#[command(flatten)]` attr.
Expand Down Expand Up @@ -100,7 +103,6 @@ enum Subcommand {
#[clap(subcommand)]
Peer(peer::Args),
/// The subcommand related to event streaming
#[clap(subcommand)]
Events(events::Args),
/// The subcommand related to Wasm
Wasm(wasm::Args),
Expand Down Expand Up @@ -305,9 +307,18 @@ mod events {

use super::*;

#[derive(clap::Args, Debug, Clone, Copy)]
pub struct Args {
/// Wait timeout
#[clap(short, long, global = true)]
timeout: Option<humantime::Duration>,
#[clap(subcommand)]
command: Command,
}

/// Get event stream from Iroha peer
#[derive(clap::Subcommand, Debug, Clone, Copy)]
pub enum Args {
enum Command {
/// Gets block pipeline events
BlockPipeline,
/// Gets transaction pipeline events
Expand All @@ -322,24 +333,53 @@ mod events {

impl RunArgs for Args {
fn run(self, context: &mut dyn RunContext) -> Result<()> {
match self {
Args::TransactionPipeline => listen(TransactionEventFilter::default(), context),
Args::BlockPipeline => listen(BlockEventFilter::default(), context),
Args::Data => listen(DataEventFilter::Any, context),
Args::ExecuteTrigger => listen(ExecuteTriggerEventFilter::new(), context),
Args::TriggerCompleted => listen(TriggerCompletedEventFilter::new(), context),
let timeout: Option<Duration> = self.timeout.map(Into::into);

match self.command {
Command::TransactionPipeline => {
listen(TransactionEventFilter::default(), context, timeout)
}
Command::BlockPipeline => listen(BlockEventFilter::default(), context, timeout),
Command::Data => listen(DataEventFilter::Any, context, timeout),
Command::ExecuteTrigger => {
listen(ExecuteTriggerEventFilter::new(), context, timeout)
}
Command::TriggerCompleted => {
listen(TriggerCompletedEventFilter::new(), context, timeout)
}
}
}
}

fn listen(filter: impl Into<EventFilterBox>, context: &mut dyn RunContext) -> Result<()> {
fn listen(
filter: impl Into<EventFilterBox>,
context: &mut dyn RunContext,
timeout: Option<Duration>,
) -> Result<()> {
let filter = filter.into();
let client = context.client_from_config();
eprintln!("Listening to events with filter: {filter:?}");
client
.listen_for_events([filter])
.wrap_err("Failed to listen for events.")?
.try_for_each(|event| context.print_data(&event?))?;

if let Some(timeout) = timeout {
eprintln!("Listening to events with filter: {filter:?} and timeout: {timeout:?}");
let rt = Runtime::new().wrap_err("Failed to create runtime.")?;
rt.block_on(async {
let mut stream = client
.listen_for_events_async([filter])
.await
.expect("Failed to listen for events.");
while let Ok(event) = tokio::time::timeout(timeout, stream.try_next()).await {
context.print_data(&event?)?;
}
eprintln!("Timeout period has expired.");
Result::<()>::Ok(())
})?;
} else {
eprintln!("Listening to events with filter: {filter:?}");
client
.listen_for_events([filter])
.wrap_err("Failed to listen for events.")?
.try_for_each(|event| context.print_data(&event?))?;
}
Ok(())
}
}
Expand All @@ -354,22 +394,47 @@ mod blocks {
pub struct Args {
/// Block height from which to start streaming blocks
height: NonZeroU64,

/// Wait timeout
#[clap(short, long)]
timeout: Option<humantime::Duration>,
}

impl RunArgs for Args {
fn run(self, context: &mut dyn RunContext) -> Result<()> {
let Args { height } = self;
listen(height, context)
let Args { height, timeout } = self;
let timeout: Option<Duration> = timeout.map(Into::into);
listen(height, context, timeout)
}
}

fn listen(height: NonZeroU64, context: &mut dyn RunContext) -> Result<()> {
fn listen(
height: NonZeroU64,
context: &mut dyn RunContext,
timeout: Option<Duration>,
) -> Result<()> {
let client = context.client_from_config();
eprintln!("Listening to blocks from height: {height}");
client
.listen_for_blocks(height)
.wrap_err("Failed to listen for blocks.")?
.try_for_each(|event| context.print_data(&event?))?;
if let Some(timeout) = timeout {
eprintln!("Listening to blocks from height: {height} and timeout: {timeout:?}");
let rt = Runtime::new().wrap_err("Failed to create runtime.")?;
rt.block_on(async {
let mut stream = client
.listen_for_blocks_async(height)
.await
.expect("Failed to listen for blocks.");
while let Ok(event) = tokio::time::timeout(timeout, stream.try_next()).await {
context.print_data(&event?)?;
}
eprintln!("Timeout period has expired.");
Result::<()>::Ok(())
})?;
} else {
eprintln!("Listening to blocks from height: {height}");
client
.listen_for_blocks(height)
.wrap_err("Failed to listen for blocks.")?
.try_for_each(|event| context.print_data(&event?))?;
}
Ok(())
}
}
Expand Down Expand Up @@ -1377,6 +1442,7 @@ mod multisig {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit 8836304

Please sign in to comment.