Skip to content

Commit

Permalink
Tweak to message handler error propagation to ensure cause is logged.
Browse files Browse the repository at this point in the history
  • Loading branch information
metasim authored and bastibl committed Dec 6, 2024
1 parent c22bad3 commit 9d264b2
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 15 deletions.
14 changes: 7 additions & 7 deletions src/runtime/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl<T: Kernel + Send + 'static> TypedBlock<T> {
}
let h = mio.input(id).get_handler();
let f = (h)(kernel, io, mio, meta, p);
f.await.or(Err(Error::HandlerError))
f.await.map_err(|e| Error::HandlerError(e.to_string()))
}

async fn run_impl(
Expand Down Expand Up @@ -326,12 +326,12 @@ impl<T: Kernel + Send + 'static> TypedBlock<T> {
meta.instance_name().unwrap(),
);
}
Err(Error::HandlerError) => {
Err(e @ Error::HandlerError(..)) => {
error!(
"{}: BlockMessage::Call -> HandlerError. Terminating.",
"{}: BlockMessage::Call -> {e}. Terminating.",
meta.instance_name().unwrap(),
);
return Err(Error::HandlerError);
return Err(e);
}
_ => {}
}
Expand All @@ -347,16 +347,16 @@ impl<T: Kernel + Send + 'static> TypedBlock<T> {
)
.await
{
Err(Error::HandlerError) => {
Err(e @ Error::HandlerError(..)) => {
error!(
"{}: Error in callback. Terminating.",
"{}: BlockMessage::Callback -> {e}. Terminating.",
meta.instance_name().unwrap(),
);
let _ = tx.send(Err(Error::InvalidMessagePort(
BlockPortCtx::Id(block_id),
port_id,
)));
return Err(Error::HandlerError);
return Err(e);
}
res => {
let _ = tx.send(res);
Expand Down
4 changes: 2 additions & 2 deletions src/runtime/flowgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl FlowgraphHandle {
})
.await
.or(Err(Error::InvalidBlock(block_id)))?;
rx.await.or(Err(Error::HandlerError))?
rx.await?
}

/// Call message handler
Expand All @@ -177,7 +177,7 @@ impl FlowgraphHandle {
})
.await
.map_err(|_| Error::InvalidBlock(block_id))?;
rx.await.map_err(|_| Error::HandlerError)?
rx.await?
}

/// Get [`FlowgraphDescription`]
Expand Down
2 changes: 1 addition & 1 deletion src/runtime/mocker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl<K: Kernel + 'static> Mocker<K> {
} = &mut self.block;
let h = mio.input(id).get_handler();
let f = (h)(kernel, &mut io, mio, meta, p);
async_io::block_on(f).or(Err(Error::HandlerError))
async_io::block_on(f).map_err(|e| Error::HandlerError(e.to_string()))
}

/// Get data from output buffer
Expand Down
7 changes: 5 additions & 2 deletions src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ pub enum Error {
#[error("Connect error: {0}")]
ConnectError(Box<ConnectCtx>),
/// Error in handler
#[error("Error in handler")]
HandlerError,
#[error("Error in message handler: {0}")]
HandlerError(String),
/// Block is already terminated
#[error("Block already terminated")]
BlockTerminated,
Expand All @@ -260,6 +260,9 @@ pub enum Error {
/// Duplicate block name
#[error("A Block with an instance name of '{0}' already exists")]
DuplicateBlockName(String),
/// Error returned from a Receiver when the corresponding Sender is dropped
#[error(transparent)]
ChannelCanceled(#[from] oneshot::Canceled),
}

#[cfg(feature = "seify")]
Expand Down
6 changes: 3 additions & 3 deletions src/runtime/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,9 +534,9 @@ pub(crate) async fn run_flowgraph<S: Scheduler>(
.await
.is_ok()
{
match block_rx.await {
Ok(Ok(p)) => tx.send(Ok(p)).ok(),
_ => tx.send(Err(Error::HandlerError)).ok(),
match block_rx.await? {
Ok(p) => tx.send(Ok(p)).ok(),
Err(e) => tx.send(Err(Error::HandlerError(e.to_string()))).ok(),
};
} else {
let _ = tx.send(Err(Error::BlockTerminated));
Expand Down

0 comments on commit 9d264b2

Please sign in to comment.