From 88182f10c3d735d0183dfc2b627b00a007a3b50b Mon Sep 17 00:00:00 2001 From: Bastian Bloessl Date: Mon, 17 Jun 2024 15:22:45 +0200 Subject: [PATCH] runtime: better error handling --- check.sh | 1 - examples/custom-routes/src/main.rs | 2 +- perf/buffer_rand/buffer_rand.rs | 3 +- perf/buffer_size/buffer_size.rs | 4 +- src/runtime/block.rs | 9 +-- src/runtime/flowgraph.rs | 12 ++-- src/runtime/mod.rs | 25 ++++--- src/runtime/runtime.rs | 77 +++++++++++--------- src/runtime/topology.rs | 111 ++++++++++++++++++----------- tests/bad_block.rs | 6 +- tests/channel_source.rs | 8 +-- 11 files changed, 155 insertions(+), 103 deletions(-) diff --git a/check.sh b/check.sh index 4eda307ae..5c20bd703 100755 --- a/check.sh +++ b/check.sh @@ -63,7 +63,6 @@ cd ${SCRIPTPATH}/examples/zigbee && cargo fmt --check cd ${SCRIPTPATH} && cargo clippy --all-targets --workspace --features=vulkan,zeromq,audio,flow_scheduler,tpb_scheduler,soapy,lttng,zynq,wgpu -- -D warnings cd ${SCRIPTPATH}/crates/futuredsp && cargo clippy --all-targets -- -D warnings cd ${SCRIPTPATH}/crates/macros && cargo clippy --all-targets -- -D warnings -cd ${SCRIPTPATH}/crates/prophecy && cargo clippy --all-targets -- -D warnings cd ${SCRIPTPATH}/crates/remote && cargo clippy --all-targets -- -D warnings cd ${SCRIPTPATH}/crates/types && cargo clippy --all-targets -- -D warnings diff --git a/examples/custom-routes/src/main.rs b/examples/custom-routes/src/main.rs index cf33fbaf8..20e533266 100644 --- a/examples/custom-routes/src/main.rs +++ b/examples/custom-routes/src/main.rs @@ -73,6 +73,6 @@ async fn start_fg(State(ws): State) { .build(), ); let rt_handle = ws.rt.lock().unwrap().as_ref().unwrap().clone(); - let mut fg_handle = rt_handle.start(fg).await; + let mut fg_handle = rt_handle.start(fg).await.unwrap(); dbg!(fg_handle.description().await.unwrap()); } diff --git a/perf/buffer_rand/buffer_rand.rs b/perf/buffer_rand/buffer_rand.rs index e975b7813..a1b90683d 100644 --- a/perf/buffer_rand/buffer_rand.rs +++ b/perf/buffer_rand/buffer_rand.rs @@ -9,6 +9,7 @@ use futuresdr::blocks::NullSource; use futuresdr::runtime::buffer::slab::Slab; use futuresdr::runtime::scheduler::FlowScheduler; use futuresdr::runtime::scheduler::SmolScheduler; +use futuresdr::runtime::Error; use futuresdr::runtime::Flowgraph; use futuresdr::runtime::Runtime; @@ -19,7 +20,7 @@ fn connect( dst: usize, dst_port: &'static str, slab: bool, -) -> Result<()> { +) -> std::result::Result<(), Error> { if slab { fg.connect_stream_with_type(src, src_port, dst, dst_port, Slab::new()) } else { diff --git a/perf/buffer_size/buffer_size.rs b/perf/buffer_size/buffer_size.rs index dd191f609..2de386108 100644 --- a/perf/buffer_size/buffer_size.rs +++ b/perf/buffer_size/buffer_size.rs @@ -2,7 +2,6 @@ use clap::Parser; use std::time; use futuresdr::anyhow::{Context, Result}; -// use futuresdr::blocks::Copy; use futuresdr::blocks::CopyRand; use futuresdr::blocks::Head; use futuresdr::blocks::NullSink; @@ -11,6 +10,7 @@ use futuresdr::runtime::buffer::circular::Circular; use futuresdr::runtime::buffer::slab::Slab; use futuresdr::runtime::scheduler::FlowScheduler; use futuresdr::runtime::scheduler::SmolScheduler; +use futuresdr::runtime::Error; use futuresdr::runtime::Flowgraph; use futuresdr::runtime::Runtime; @@ -22,7 +22,7 @@ fn connect( dst_port: &'static str, slab: bool, min_bytes: usize, -) -> Result<()> { +) -> std::result::Result<(), Error> { if slab { fg.connect_stream_with_type(src, src_port, dst, dst_port, Slab::with_size(min_bytes)) } else { diff --git a/src/runtime/block.rs b/src/runtime/block.rs index ade277d7a..6f56cbc72 100644 --- a/src/runtime/block.rs +++ b/src/runtime/block.rs @@ -179,13 +179,13 @@ impl TypedBlockWrapper { if i < mio.inputs().len() { i } else { - return Err(Error::InvalidHandler(PortId::Index(i))); + return Err(Error::InvalidMessagePort(None, PortId::Index(i))); } } PortId::Name(n) => match mio.input_name_to_id(&n) { Some(s) => s, None => { - return Err(Error::InvalidHandler(PortId::Name(n))); + return Err(Error::InvalidMessagePort(None, PortId::Name(n))); } }, }; @@ -311,7 +311,7 @@ impl TypedBlockWrapper { ) .await { - Err(Error::InvalidHandler(port_id)) => { + Err(Error::InvalidMessagePort(_, port_id)) => { error!( "{}: BlockMessage::Call -> Invalid Handler {port_id:?}.", meta.instance_name().unwrap(), @@ -356,7 +356,8 @@ impl TypedBlockWrapper { "{}: Error in callback. Terminating.", meta.instance_name().unwrap(), ); - let _ = tx.send(Err(Error::InvalidHandler(port_id))); + let _ = tx + .send(Err(Error::InvalidMessagePort(Some(block_id), port_id))); main_inbox .send(FlowgraphMessage::BlockError { block_id, diff --git a/src/runtime/flowgraph.rs b/src/runtime/flowgraph.rs index be89a3fc8..88f485310 100644 --- a/src/runtime/flowgraph.rs +++ b/src/runtime/flowgraph.rs @@ -52,7 +52,7 @@ impl Flowgraph { src_port: impl Into, dst_block: usize, dst_port: impl Into, - ) -> Result<()> { + ) -> Result<(), Error> { self.topology.as_mut().unwrap().connect_stream( src_block, src_port.into(), @@ -70,7 +70,7 @@ impl Flowgraph { dst_block: usize, dst_port: impl Into, buffer: B, - ) -> Result<()> { + ) -> Result<(), Error> { self.topology.as_mut().unwrap().connect_stream( src_block, src_port.into(), @@ -87,7 +87,7 @@ impl Flowgraph { src_port: impl Into, dst_block: usize, dst_port: impl Into, - ) -> Result<()> { + ) -> Result<(), Error> { self.topology.as_mut().unwrap().connect_message( src_block, src_port.into(), @@ -152,8 +152,8 @@ impl FlowgraphHandle { tx, }) .await - .map_err(|_| Error::InvalidBlock)?; - rx.await.map_err(|_| Error::HandlerError)? + .or(Err(Error::InvalidBlock(block_id)))?; + rx.await.or(Err(Error::HandlerError))? } /// Call message handler @@ -172,7 +172,7 @@ impl FlowgraphHandle { tx, }) .await - .map_err(|_| Error::InvalidBlock)?; + .map_err(|_| Error::InvalidBlock(block_id))?; rx.await.map_err(|_| Error::HandlerError)? } diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index b2bf40f62..f9e3f0acb 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -205,21 +205,30 @@ pub enum BlockMessage { #[non_exhaustive] pub enum Error { /// Block does not exist - #[error("Block does not exist")] - InvalidBlock, + #[error("Block {0} does not exist")] + InvalidBlock(usize), /// Flowgraph does not exist or terminated #[error("Flowgraph terminated")] FlowgraphTerminated, - /// Handler does not exist - #[error("Handler does not exist (Id {0:?})")] - InvalidHandler(PortId), - /// Error in Handler + /// Message port does not exist + #[error("Block {0:?} does not have message port ({1:?})")] + InvalidMessagePort(Option, PortId), + /// Stream port does not exist + #[error("Block {0} does not have stream port ({1:?})")] + InvalidStreamPort(usize, PortId), + /// Connect Error + #[error("Connect Error {0}, {1:?} -> {2}, {3:?}")] + ConnectError(usize, PortId, usize, PortId), + /// Error in handler #[error("Error in handler")] HandlerError, /// Block is already terminated #[error("Block already terminated")] BlockTerminated, /// Runtime error - #[error("Error in runtime")] - RuntimeError, + #[error("Runtime error ({0})")] + RuntimeError(String), + /// Validation error + #[error("Validation error {0}")] + ValidationError(String), } diff --git a/src/runtime/runtime.rs b/src/runtime/runtime.rs index c40d9eaa3..0f41a3e55 100644 --- a/src/runtime/runtime.rs +++ b/src/runtime/runtime.rs @@ -9,13 +9,11 @@ use futures::FutureExt; use slab::Slab; use std::fmt; use std::pin::Pin; -use std::result; use std::sync::Arc; use std::sync::Mutex; use std::task; use std::task::Poll; -use crate::anyhow::{anyhow, bail, Context, Result}; use crate::runtime; use crate::runtime::config; use crate::runtime::scheduler::Scheduler; @@ -202,14 +200,14 @@ impl<'a, S: Scheduler + Sync> Runtime<'a, S> { pub async fn start<'b>( &'a self, fg: Flowgraph, - ) -> (TaskHandle<'b, Result>, FlowgraphHandle) + ) -> (TaskHandle<'b, Result>, FlowgraphHandle) where 'a: 'b, { let queue_size = config::config().queue_size; let (fg_inbox, fg_inbox_rx) = channel::(queue_size); - let (tx, rx) = oneshot::channel::>(); + let (tx, rx) = oneshot::channel::>(); let task = self.scheduler.spawn(run_flowgraph( fg, self.scheduler.clone(), @@ -227,19 +225,22 @@ impl<'a, S: Scheduler + Sync> Runtime<'a, S> { /// /// Blocks until the flowgraph is constructed and running. #[cfg(not(target_arch = "wasm32"))] - pub fn start_sync(&self, fg: Flowgraph) -> (TaskHandle>, FlowgraphHandle) { + pub fn start_sync( + &self, + fg: Flowgraph, + ) -> (TaskHandle>, FlowgraphHandle) { block_on(self.start(fg)) } /// Start a [`Flowgraph`] on the [`Runtime`] and block until it terminates. #[cfg(not(target_arch = "wasm32"))] - pub fn run(&self, fg: Flowgraph) -> Result { + pub fn run(&self, fg: Flowgraph) -> Result { let (handle, _) = block_on(self.start(fg)); block_on(handle) } /// Start a [`Flowgraph`] on the [`Runtime`] and await its termination. - pub async fn run_async(&self, fg: Flowgraph) -> Result { + pub async fn run_async(&self, fg: Flowgraph) -> Result { let (handle, _) = self.start(fg).await; handle.await } @@ -260,12 +261,12 @@ impl<'a, S: Scheduler + Sync> Runtime<'a, S> { #[async_trait] trait Spawn { - async fn start(&self, fg: Flowgraph) -> FlowgraphHandle; + async fn start(&self, fg: Flowgraph) -> Result; } #[async_trait] impl Spawn for S { - async fn start(&self, fg: Flowgraph) -> FlowgraphHandle { + async fn start(&self, fg: Flowgraph) -> Result { use crate::runtime::runtime::run_flowgraph; use crate::runtime::FlowgraphMessage; use futures::channel::mpsc::channel; @@ -274,16 +275,19 @@ impl Spawn for S { let queue_size = config::config().queue_size; let (fg_inbox, fg_inbox_rx) = channel::(queue_size); - let (tx, rx) = oneshot::channel::>(); + let (tx, rx) = oneshot::channel::>(); self.spawn(run_flowgraph( fg, self.clone(), fg_inbox.clone(), fg_inbox_rx, tx, - )).detach(); - rx.await.expect("run_flowgraph crashed").unwrap(); - FlowgraphHandle::new(fg_inbox) + )) + .detach(); + rx.await.or(Err(Error::RuntimeError( + "run_flowgraph crashed".to_string(), + )))??; + Ok(FlowgraphHandle::new(fg_inbox)) } } @@ -310,11 +314,11 @@ impl PartialEq for RuntimeHandle { impl RuntimeHandle { /// Start a [`Flowgraph`] on the runtime - pub async fn start(&self, fg: Flowgraph) -> FlowgraphHandle { - let handle = self.scheduler.start(fg).await; + pub async fn start(&self, fg: Flowgraph) -> Result { + let handle = self.scheduler.start(fg).await?; self.add_flowgraph(handle.clone()); - handle + Ok(handle) } /// Add a [`FlowgraphHandle`] to make it available to web handlers @@ -344,12 +348,14 @@ pub(crate) async fn run_flowgraph( scheduler: S, mut main_channel: Sender, mut main_rx: Receiver, - initialized: oneshot::Sender>, -) -> Result { + initialized: oneshot::Sender>, +) -> Result { debug!("in run_flowgraph"); - let mut topology = fg.topology.take().context("flowgraph not initialized")?; + let mut topology = fg.topology.take().ok_or(Error::RuntimeError( + "Flowgraph has no topology set".to_string(), + ))?; if let Err(e) = topology.validate() { - initialized.send(Err(anyhow!("{}", &e))).unwrap(); + initialized.send(Err(e.clone())).unwrap(); return Err(e); } @@ -368,24 +374,28 @@ pub(crate) async fn run_flowgraph( inboxes[*dst] .as_mut() - .context("did not find dst block")? + .unwrap() .send(BlockMessage::StreamInputInit { dst_port: *dst_port, reader: writer.add_reader(dst_inbox, *dst_port), }) .await - .context("could not connect stream input")?; + .or(Err(Error::RuntimeError( + "Could not connect stream input".to_string(), + )))?; } inboxes[*src] .as_mut() - .context("did not find src block")? + .unwrap() .send(BlockMessage::StreamOutputInit { src_port: *src_port, writer, }) .await - .context("could not connect stream output")?; + .or(Err(Error::RuntimeError( + "Could not connect stream output".to_string(), + )))?; } debug!("connect message io"); @@ -424,7 +434,7 @@ pub(crate) async fn run_flowgraph( break; } - let m = main_rx.next().await.context("no msg")?; + let m = main_rx.next().await.unwrap(); match m { FlowgraphMessage::Initialized => i -= 1, FlowgraphMessage::BlockError { block_id, block } => { @@ -477,7 +487,7 @@ pub(crate) async fn run_flowgraph( break; } - let m = main_rx.next().await.context("no msg")?; + let m = main_rx.next().await.unwrap(); match m { FlowgraphMessage::BlockCall { block_id, @@ -496,7 +506,7 @@ pub(crate) async fn run_flowgraph( let _ = tx.send(Err(Error::BlockTerminated)); } } else { - let _ = tx.send(Err(Error::InvalidBlock)); + let _ = tx.send(Err(Error::InvalidBlock(block_id))); } } FlowgraphMessage::BlockCallback { @@ -505,7 +515,7 @@ pub(crate) async fn run_flowgraph( data, tx, } => { - let (block_tx, block_rx) = oneshot::channel::>(); + let (block_tx, block_rx) = oneshot::channel::>(); if let Some(Some(inbox)) = inboxes.get_mut(block_id) { if inbox .send(BlockMessage::Callback { @@ -524,7 +534,7 @@ pub(crate) async fn run_flowgraph( let _ = tx.send(Err(Error::BlockTerminated)); } } else { - let _ = tx.send(Err(Error::InvalidBlock)); + let _ = tx.send(Err(Error::InvalidBlock(block_id))); } } FlowgraphMessage::BlockDone { block_id, block } => { @@ -549,13 +559,16 @@ pub(crate) async fn run_flowgraph( if let Ok(b) = rx.await { let _ = tx.send(Ok(b)); } else { - let _ = tx.send(Err(Error::RuntimeError)); + let _ = tx.send(Err(Error::RuntimeError(format!( + "Block {} terminated or crashed", + block_id + )))); } } else { let _ = tx.send(Err(Error::BlockTerminated)); } } else { - let _ = tx.send(Err(Error::InvalidBlock)); + let _ = tx.send(Err(Error::InvalidBlock(block_id))); } } FlowgraphMessage::FlowgraphDescription { tx } => { @@ -606,7 +619,7 @@ pub(crate) async fn run_flowgraph( fg.topology = Some(topology); if block_error { - bail!("flowgraph error"); + return Err(Error::RuntimeError("A block raised an error".to_string())); } Ok(fg) diff --git a/src/runtime/topology.rs b/src/runtime/topology.rs index f895d0331..47ee1d8eb 100644 --- a/src/runtime/topology.rs +++ b/src/runtime/topology.rs @@ -1,11 +1,11 @@ use futures::channel::mpsc::Sender; use std::collections::HashMap; -use crate::anyhow::{anyhow, bail, Context, Result}; use crate::runtime::buffer::BufferBuilder; use crate::runtime::buffer::BufferWriter; use crate::runtime::Block; use crate::runtime::BlockMessage; +use crate::runtime::Error; use crate::runtime::PortId; use slab::Slab; use std::any::{Any, TypeId}; @@ -167,50 +167,52 @@ impl Topology { dst_block: usize, dst_port: PortId, buffer_builder: B, - ) -> Result<()> { + ) -> Result<(), Error> { let src = self .blocks .get(src_block) - .context("src block invalid")? + .ok_or(Error::InvalidBlock(src_block))? .as_ref() - .context("src block not present")?; + .ok_or(Error::InvalidBlock(src_block))?; let dst = self .blocks .get(dst_block) - .context("dst block invalid")? + .ok_or(Error::InvalidBlock(dst_block))? .as_ref() - .context("dst block not present")?; + .ok_or(Error::InvalidBlock(dst_block))?; let src_port_id = match src_port { - PortId::Name(s) => src - .stream_output_name_to_id(&s) - .context("invalid src port name")?, + PortId::Name(ref s) => src + .stream_output_name_to_id(s) + .ok_or(Error::InvalidStreamPort(src_block, src_port.clone()))?, PortId::Index(i) => { if i < src.stream_outputs().len() { i } else { - bail!("invalid src port id {}", i) + return Err(Error::InvalidStreamPort(src_block, src_port)); } } }; let sp = src.stream_output(src_port_id); let dst_port_id = match dst_port { - PortId::Name(s) => dst - .stream_input_name_to_id(&s) - .context("invalid dst port name")?, + PortId::Name(ref s) => dst + .stream_input_name_to_id(s) + .ok_or(Error::InvalidStreamPort(dst_block, dst_port.clone()))?, PortId::Index(i) => { if i < dst.stream_inputs().len() { i } else { - bail!("invalid dst port id {}", i) + return Err(Error::InvalidStreamPort(dst_block, dst_port)); } } }; let dp = dst.stream_input(dst_port_id); if sp.type_id() != dp.type_id() { - bail!("item types do not match"); + return Err(Error::ConnectError( + src_block, src_port, dst_block, dst_port, + )); } let buffer_entry = BufferBuilderEntry { @@ -233,41 +235,41 @@ impl Topology { src_port: PortId, dst_block: usize, dst_port: PortId, - ) -> Result<()> { + ) -> Result<(), Error> { let src = self .blocks .get(src_block) - .context("invalid src block")? + .ok_or(Error::InvalidBlock(src_block))? .as_ref() - .context("src block not present")?; + .ok_or(Error::InvalidBlock(src_block))?; let dst = self .blocks .get(dst_block) - .context("invalid dst block")? + .ok_or(Error::InvalidBlock(dst_block))? .as_ref() - .context("dst block not present")?; + .ok_or(Error::InvalidBlock(dst_block))?; let src_port_id = match src_port { - PortId::Name(s) => src - .message_output_name_to_id(&s) - .context("invalid src port name")?, + PortId::Name(ref s) => src + .message_output_name_to_id(s) + .ok_or(Error::InvalidMessagePort(Some(src_block), src_port.clone()))?, PortId::Index(i) => { if i < src.message_outputs().len() { i } else { - bail!("wrong src port id {}", i) + return Err(Error::InvalidMessagePort(Some(src_block), src_port.clone())); } } }; let dst_port_id = match dst_port { - PortId::Name(s) => dst - .message_input_name_to_id(&s) - .context("invalid dst port name")?, + PortId::Name(ref s) => dst + .message_input_name_to_id(s) + .ok_or(Error::InvalidMessagePort(Some(dst_block), dst_port.clone()))?, PortId::Index(i) => { if i < dst.message_outputs().len() { i } else { - bail!("wrong dst port id {}", i) + return Err(Error::InvalidMessagePort(Some(dst_block), dst_port)); } } }; @@ -282,11 +284,11 @@ impl Topology { /// /// Make sure that all stream ports are connected. Check if connections are valid, e.g., every /// stream input has exactly one connection. - pub fn validate(&self) -> Result<()> { + pub fn validate(&self) -> Result<(), Error> { // check if all stream ports are connected (neither message inputs nor outputs have to be connected) for (block_id, e) in self.blocks.iter() { if let Some(block) = e { - for (out_id, _) in block.stream_outputs().iter().enumerate() { + for (out_id, out_port) in block.stream_outputs().iter().enumerate() { if self .stream_edges .iter() @@ -294,7 +296,11 @@ impl Topology { .count() == 0 { - return Err(anyhow!("unconnected stream output port of block {:?}", block.instance_name())); + return Err(Error::ValidationError(format!( + "unconnected stream output port {:?} of block {:?}", + out_port, + block.instance_name() + ))); } } @@ -308,24 +314,39 @@ impl Topology { .count() != 1 { - bail!("stream input port does not have exactly one input"); + return Err(Error::ValidationError(format!( + "Block {} stream input {} does not have exactly one input", + block_id, input_id + ))); } } } else { - bail!("block not owned by topology"); + return Err(Error::ValidationError(format!( + "Block {} not owned by topology", + block_id + ))); } } // check if all stream edges are valid for ((src, src_port, _), v) in self.stream_edges.iter() { - let src_block = self.block_ref(*src).expect("src block not found"); + let src_block = self.block_ref(*src).ok_or(Error::ValidationError(format!( + "Source block {} not found", + src + )))?; let output = src_block.stream_output(*src_port); for (dst, dst_port) in v.iter() { - let dst_block = self.block_ref(*dst).expect("dst block not found"); + let dst_block = self.block_ref(*dst).ok_or(Error::ValidationError(format!( + "Destination block {} not found", + dst + )))?; let input = dst_block.stream_input(*dst_port); - if output.item_size() != input.item_size() { - bail!("item size of stream connection does not match"); + if output.type_id() != input.type_id() { + return Err(Error::ValidationError(format!( + "Item size of stream connection does not match ({}, {:?} -> {}, {:?})", + src, src_port, dst, dst_port + ))); } } } @@ -334,16 +355,24 @@ impl Topology { // all instance names are Some // all instance names are unique let mut v = Vec::new(); - for (_, b) in self.blocks.iter() { - let c = b.as_ref().expect("block is not set"); - let name = c.instance_name().expect("block instance name not set"); + for (i, b) in self.blocks.iter() { + let c = b.as_ref().ok_or(Error::ValidationError(format!( + "Block {} not present/not owned by topology", + i + )))?; + let name = c.instance_name().ok_or(Error::ValidationError(format!( + "Block {}, {:?} has no instance name", + i, c + )))?; v.push(name.to_string()); } v.sort(); let len = v.len(); v.dedup(); if len != v.len() { - bail!("duplicate block instance names"); + return Err(Error::ValidationError( + "Duplicate block instance names".to_string(), + )); } Ok(()) diff --git a/tests/bad_block.rs b/tests/bad_block.rs index 1eb00938c..f0b2a6bed 100644 --- a/tests/bad_block.rs +++ b/tests/bad_block.rs @@ -1,12 +1,12 @@ -use futuresdr::anyhow::{bail, Error, Result}; +use futuresdr::anyhow::{bail, Result}; use futuresdr::async_io::block_on; use futuresdr::blocks::{Head, NullSink, NullSource, Throttle}; use futuresdr::log::debug; use futuresdr::macros::async_trait; use futuresdr::macros::connect; use futuresdr::runtime::{ - Block, BlockMeta, BlockMetaBuilder, Flowgraph, Kernel, MessageIo, MessageIoBuilder, Runtime, - StreamIo, StreamIoBuilder, WorkIo, + Block, BlockMeta, BlockMetaBuilder, Error, Flowgraph, Kernel, MessageIo, MessageIoBuilder, + Runtime, StreamIo, StreamIoBuilder, WorkIo, }; use std::cmp; use std::marker::PhantomData; diff --git a/tests/channel_source.rs b/tests/channel_source.rs index bd2550bed..246c0358a 100644 --- a/tests/channel_source.rs +++ b/tests/channel_source.rs @@ -1,4 +1,4 @@ -use futuresdr::anyhow::Result; +use futuresdr::anyhow::{anyhow, Result}; use futuresdr::async_io::block_on; use futuresdr::blocks::ChannelSource; use futuresdr::blocks::VectorSink; @@ -21,7 +21,7 @@ fn channel_source_min() -> Result<()> { let (fg, _) = rt.start(fg).await; tx.send(vec![0, 1, 2].into_boxed_slice()).await?; tx.close().await?; - fg.await as Result + fg.await.map_err(|e| anyhow!("Flowgraph error, {e}")) })?; let snk = fg.kernel::>(snk).unwrap(); @@ -48,7 +48,7 @@ fn channel_source_small() -> Result<()> { tx.send(vec![].into_boxed_slice()).await?; tx.send(vec![5].into_boxed_slice()).await?; tx.close().await?; - fg.await as Result + fg.await.map_err(|e| anyhow!("Flowgraph error, {e}")) })?; let snk = fg.kernel::>(snk).unwrap(); @@ -73,7 +73,7 @@ fn channel_source_big() -> Result<()> { tx.send(vec![0; 99999].into_boxed_slice()).await?; tx.send(vec![1; 88888].into_boxed_slice()).await?; tx.close().await?; - fg.await as Result + fg.await.map_err(|e| anyhow!("Flowgraph error, {e}")) })?; let snk = fg.kernel::>(snk).unwrap();