diff --git a/src/runtime/block.rs b/src/runtime/block.rs index 1b548b01..7ddfc345 100644 --- a/src/runtime/block.rs +++ b/src/runtime/block.rs @@ -214,7 +214,7 @@ impl TypedBlock { } let h = mio.input(id).get_handler(); let f = (h)(kernel, io, mio, meta, p); - f.await.or(Err(Error::HandlerError)) + f.await.map_err(|e| Error::HandlerError(e.to_string())) } async fn run_impl( @@ -326,12 +326,12 @@ impl TypedBlock { meta.instance_name().unwrap(), ); } - Err(Error::HandlerError) => { + Err(e @ Error::HandlerError(..)) => { error!( - "{}: BlockMessage::Call -> HandlerError. Terminating.", + "{}: BlockMessage::Call -> {e}. Terminating.", meta.instance_name().unwrap(), ); - return Err(Error::HandlerError); + return Err(e); } _ => {} } @@ -347,16 +347,16 @@ impl TypedBlock { ) .await { - Err(Error::HandlerError) => { + Err(e @ Error::HandlerError(..)) => { error!( - "{}: Error in callback. Terminating.", + "{}: BlockMessage::Callback -> {e}. Terminating.", meta.instance_name().unwrap(), ); let _ = tx.send(Err(Error::InvalidMessagePort( BlockPortCtx::Id(block_id), port_id, ))); - return Err(Error::HandlerError); + return Err(e); } res => { let _ = tx.send(res); diff --git a/src/runtime/flowgraph.rs b/src/runtime/flowgraph.rs index a60f7432..ff0ecf61 100644 --- a/src/runtime/flowgraph.rs +++ b/src/runtime/flowgraph.rs @@ -157,7 +157,7 @@ impl FlowgraphHandle { }) .await .or(Err(Error::InvalidBlock(block_id)))?; - rx.await.or(Err(Error::HandlerError))? + rx.await? } /// Call message handler @@ -177,7 +177,7 @@ impl FlowgraphHandle { }) .await .map_err(|_| Error::InvalidBlock(block_id))?; - rx.await.map_err(|_| Error::HandlerError)? + rx.await? } /// Get [`FlowgraphDescription`] diff --git a/src/runtime/mocker.rs b/src/runtime/mocker.rs index b43a0952..151d9c41 100644 --- a/src/runtime/mocker.rs +++ b/src/runtime/mocker.rs @@ -113,7 +113,7 @@ impl Mocker { } = &mut self.block; let h = mio.input(id).get_handler(); let f = (h)(kernel, &mut io, mio, meta, p); - async_io::block_on(f).or(Err(Error::HandlerError)) + async_io::block_on(f).map_err(|e| Error::HandlerError(e.to_string())) } /// Get data from output buffer diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index cff41d94..f1a75d9f 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -237,8 +237,8 @@ pub enum Error { #[error("Connect error: {0}")] ConnectError(Box), /// Error in handler - #[error("Error in handler")] - HandlerError, + #[error("Error in message handler: {0}")] + HandlerError(String), /// Block is already terminated #[error("Block already terminated")] BlockTerminated, @@ -260,6 +260,9 @@ pub enum Error { /// Duplicate block name #[error("A Block with an instance name of '{0}' already exists")] DuplicateBlockName(String), + /// Error returned from a Receiver when the corresponding Sender is dropped + #[error(transparent)] + ChannelCanceled(#[from] oneshot::Canceled), } #[cfg(feature = "seify")] diff --git a/src/runtime/runtime.rs b/src/runtime/runtime.rs index b943ae48..581f7728 100644 --- a/src/runtime/runtime.rs +++ b/src/runtime/runtime.rs @@ -534,9 +534,9 @@ pub(crate) async fn run_flowgraph( .await .is_ok() { - match block_rx.await { - Ok(Ok(p)) => tx.send(Ok(p)).ok(), - _ => tx.send(Err(Error::HandlerError)).ok(), + match block_rx.await? { + Ok(p) => tx.send(Ok(p)).ok(), + Err(e) => tx.send(Err(Error::HandlerError(e.to_string()))).ok(), }; } else { let _ = tx.send(Err(Error::BlockTerminated));