Skip to content

Commit

Permalink
runtime: better error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
bastibl committed Jun 17, 2024
1 parent 0bb90be commit 88182f1
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 103 deletions.
1 change: 0 additions & 1 deletion check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ cd ${SCRIPTPATH}/examples/zigbee && cargo fmt --check
cd ${SCRIPTPATH} && cargo clippy --all-targets --workspace --features=vulkan,zeromq,audio,flow_scheduler,tpb_scheduler,soapy,lttng,zynq,wgpu -- -D warnings
cd ${SCRIPTPATH}/crates/futuredsp && cargo clippy --all-targets -- -D warnings
cd ${SCRIPTPATH}/crates/macros && cargo clippy --all-targets -- -D warnings
cd ${SCRIPTPATH}/crates/prophecy && cargo clippy --all-targets -- -D warnings
cd ${SCRIPTPATH}/crates/remote && cargo clippy --all-targets -- -D warnings
cd ${SCRIPTPATH}/crates/types && cargo clippy --all-targets -- -D warnings

Expand Down
2 changes: 1 addition & 1 deletion examples/custom-routes/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,6 @@ async fn start_fg(State(ws): State<WebState>) {
.build(),
);
let rt_handle = ws.rt.lock().unwrap().as_ref().unwrap().clone();
let mut fg_handle = rt_handle.start(fg).await;
let mut fg_handle = rt_handle.start(fg).await.unwrap();
dbg!(fg_handle.description().await.unwrap());
}
3 changes: 2 additions & 1 deletion perf/buffer_rand/buffer_rand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futuresdr::blocks::NullSource;
use futuresdr::runtime::buffer::slab::Slab;
use futuresdr::runtime::scheduler::FlowScheduler;
use futuresdr::runtime::scheduler::SmolScheduler;
use futuresdr::runtime::Error;
use futuresdr::runtime::Flowgraph;
use futuresdr::runtime::Runtime;

Expand All @@ -19,7 +20,7 @@ fn connect(
dst: usize,
dst_port: &'static str,
slab: bool,
) -> Result<()> {
) -> std::result::Result<(), Error> {
if slab {
fg.connect_stream_with_type(src, src_port, dst, dst_port, Slab::new())
} else {
Expand Down
4 changes: 2 additions & 2 deletions perf/buffer_size/buffer_size.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use clap::Parser;
use std::time;

use futuresdr::anyhow::{Context, Result};
// use futuresdr::blocks::Copy;
use futuresdr::blocks::CopyRand;
use futuresdr::blocks::Head;
use futuresdr::blocks::NullSink;
Expand All @@ -11,6 +10,7 @@ use futuresdr::runtime::buffer::circular::Circular;
use futuresdr::runtime::buffer::slab::Slab;
use futuresdr::runtime::scheduler::FlowScheduler;
use futuresdr::runtime::scheduler::SmolScheduler;
use futuresdr::runtime::Error;
use futuresdr::runtime::Flowgraph;
use futuresdr::runtime::Runtime;

Expand All @@ -22,7 +22,7 @@ fn connect(
dst_port: &'static str,
slab: bool,
min_bytes: usize,
) -> Result<()> {
) -> std::result::Result<(), Error> {
if slab {
fg.connect_stream_with_type(src, src_port, dst, dst_port, Slab::with_size(min_bytes))
} else {
Expand Down
9 changes: 5 additions & 4 deletions src/runtime/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,13 @@ impl<T: Kernel + Send + 'static> TypedBlockWrapper<T> {
if i < mio.inputs().len() {
i
} else {
return Err(Error::InvalidHandler(PortId::Index(i)));
return Err(Error::InvalidMessagePort(None, PortId::Index(i)));
}
}
PortId::Name(n) => match mio.input_name_to_id(&n) {
Some(s) => s,
None => {
return Err(Error::InvalidHandler(PortId::Name(n)));
return Err(Error::InvalidMessagePort(None, PortId::Name(n)));
}
},
};
Expand Down Expand Up @@ -311,7 +311,7 @@ impl<T: Kernel + Send + 'static> TypedBlockWrapper<T> {
)
.await
{
Err(Error::InvalidHandler(port_id)) => {
Err(Error::InvalidMessagePort(_, port_id)) => {
error!(
"{}: BlockMessage::Call -> Invalid Handler {port_id:?}.",
meta.instance_name().unwrap(),
Expand Down Expand Up @@ -356,7 +356,8 @@ impl<T: Kernel + Send + 'static> TypedBlockWrapper<T> {
"{}: Error in callback. Terminating.",
meta.instance_name().unwrap(),
);
let _ = tx.send(Err(Error::InvalidHandler(port_id)));
let _ = tx
.send(Err(Error::InvalidMessagePort(Some(block_id), port_id)));
main_inbox
.send(FlowgraphMessage::BlockError {
block_id,
Expand Down
12 changes: 6 additions & 6 deletions src/runtime/flowgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl Flowgraph {
src_port: impl Into<PortId>,
dst_block: usize,
dst_port: impl Into<PortId>,
) -> Result<()> {
) -> Result<(), Error> {
self.topology.as_mut().unwrap().connect_stream(
src_block,
src_port.into(),
Expand All @@ -70,7 +70,7 @@ impl Flowgraph {
dst_block: usize,
dst_port: impl Into<PortId>,
buffer: B,
) -> Result<()> {
) -> Result<(), Error> {
self.topology.as_mut().unwrap().connect_stream(
src_block,
src_port.into(),
Expand All @@ -87,7 +87,7 @@ impl Flowgraph {
src_port: impl Into<PortId>,
dst_block: usize,
dst_port: impl Into<PortId>,
) -> Result<()> {
) -> Result<(), Error> {
self.topology.as_mut().unwrap().connect_message(
src_block,
src_port.into(),
Expand Down Expand Up @@ -152,8 +152,8 @@ impl FlowgraphHandle {
tx,
})
.await
.map_err(|_| Error::InvalidBlock)?;
rx.await.map_err(|_| Error::HandlerError)?
.or(Err(Error::InvalidBlock(block_id)))?;
rx.await.or(Err(Error::HandlerError))?
}

/// Call message handler
Expand All @@ -172,7 +172,7 @@ impl FlowgraphHandle {
tx,
})
.await
.map_err(|_| Error::InvalidBlock)?;
.map_err(|_| Error::InvalidBlock(block_id))?;
rx.await.map_err(|_| Error::HandlerError)?
}

Expand Down
25 changes: 17 additions & 8 deletions src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,21 +205,30 @@ pub enum BlockMessage {
#[non_exhaustive]
pub enum Error {
/// Block does not exist
#[error("Block does not exist")]
InvalidBlock,
#[error("Block {0} does not exist")]
InvalidBlock(usize),
/// Flowgraph does not exist or terminated
#[error("Flowgraph terminated")]
FlowgraphTerminated,
/// Handler does not exist
#[error("Handler does not exist (Id {0:?})")]
InvalidHandler(PortId),
/// Error in Handler
/// Message port does not exist
#[error("Block {0:?} does not have message port ({1:?})")]
InvalidMessagePort(Option<usize>, PortId),
/// Stream port does not exist
#[error("Block {0} does not have stream port ({1:?})")]
InvalidStreamPort(usize, PortId),
/// Connect Error
#[error("Connect Error {0}, {1:?} -> {2}, {3:?}")]
ConnectError(usize, PortId, usize, PortId),
/// Error in handler
#[error("Error in handler")]
HandlerError,
/// Block is already terminated
#[error("Block already terminated")]
BlockTerminated,
/// Runtime error
#[error("Error in runtime")]
RuntimeError,
#[error("Runtime error ({0})")]
RuntimeError(String),
/// Validation error
#[error("Validation error {0}")]
ValidationError(String),
}
Loading

0 comments on commit 88182f1

Please sign in to comment.