Skip to content

Commit

Permalink
better error handling
Browse files Browse the repository at this point in the history
(cherry picked from commit 665e9eb)
  • Loading branch information
bastibl committed Nov 14, 2024
1 parent 1572791 commit 3a987de
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 21 deletions.
8 changes: 5 additions & 3 deletions crates/macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,19 +163,21 @@ pub fn connect(attr: proc_macro::TokenStream) -> proc_macro::TokenStream {

out.extend(quote! {
use futuresdr::runtime::Block;
use futuresdr::runtime::Error;
use futuresdr::runtime::Flowgraph;
use std::result::Result;

struct FgOp;
trait Add<T> {
fn add(fg: &mut Flowgraph, b: T) -> Result<usize>;
fn add(fg: &mut Flowgraph, b: T) -> Result<usize, Error>;
}
impl Add<usize> for FgOp {
fn add(_fg: &mut Flowgraph, b: usize) -> Result<usize> {
fn add(_fg: &mut Flowgraph, b: usize) -> Result<usize, Error> {
Ok(b)
}
}
impl Add<Block> for FgOp {
fn add(fg: &mut Flowgraph, b: Block) -> Result<usize> {
fn add(fg: &mut Flowgraph, b: Block) -> Result<usize, Error> {
fg.add_block(b)
}
}
Expand Down
38 changes: 21 additions & 17 deletions src/runtime/flowgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ use futures::SinkExt;
use std::cmp::PartialEq;
use std::fmt::Debug;
use std::hash::Hash;
use std::result;

use crate::anyhow::Result;
#[cfg(not(target_arch = "wasm32"))]
use crate::runtime::buffer::circular::Circular;
#[cfg(target_arch = "wasm32")]
Expand Down Expand Up @@ -41,8 +39,8 @@ impl Flowgraph {
}

/// Add [`Block`] to flowgraph
pub fn add_block(&mut self, block: Block) -> Result<usize> {
Ok(self.topology.as_mut().unwrap().add_block(block)?)
pub fn add_block(&mut self, block: Block) -> Result<usize, Error> {
self.topology.as_mut().unwrap().add_block(block)
}

/// Make stream connection
Expand Down Expand Up @@ -148,8 +146,8 @@ impl FlowgraphHandle {
block_id: usize,
port_id: impl Into<PortId>,
data: Pmt,
) -> result::Result<(), Error> {
let (tx, rx) = oneshot::channel::<result::Result<(), Error>>();
) -> Result<(), Error> {
let (tx, rx) = oneshot::channel::<Result<(), Error>>();
self.inbox
.send(FlowgraphMessage::BlockCall {
block_id,
Expand All @@ -168,8 +166,8 @@ impl FlowgraphHandle {
block_id: usize,
port_id: impl Into<PortId>,
data: Pmt,
) -> result::Result<Pmt, Error> {
let (tx, rx) = oneshot::channel::<result::Result<Pmt, Error>>();
) -> Result<Pmt, Error> {
let (tx, rx) = oneshot::channel::<Result<Pmt, Error>>();
self.inbox
.send(FlowgraphMessage::BlockCallback {
block_id,
Expand All @@ -183,7 +181,7 @@ impl FlowgraphHandle {
}

/// Get [`FlowgraphDescription`]
pub async fn description(&mut self) -> result::Result<FlowgraphDescription, Error> {
pub async fn description(&mut self) -> Result<FlowgraphDescription, Error> {
let (tx, rx) = oneshot::channel::<FlowgraphDescription>();
self.inbox
.send(FlowgraphMessage::FlowgraphDescription { tx })
Expand All @@ -194,28 +192,34 @@ impl FlowgraphHandle {
}

/// Get [`BlockDescription`]
pub async fn block_description(&mut self, block_id: usize) -> Result<BlockDescription> {
let (tx, rx) = oneshot::channel::<result::Result<BlockDescription, Error>>();
pub async fn block_description(&mut self, block_id: usize) -> Result<BlockDescription, Error> {
let (tx, rx) = oneshot::channel::<Result<BlockDescription, Error>>();
self.inbox
.send(FlowgraphMessage::BlockDescription { block_id, tx })
.await?;
let d = rx.await??;
.await
.map_err(|_| Error::InvalidBlock(block_id))?;
let d = rx.await.map_err(|_| Error::InvalidBlock(block_id))??;
Ok(d)
}

/// Send a terminate message to the [`Flowgraph`]
///
/// Does not wait until the [`Flowgraph`] is actually terminated.
pub async fn terminate(&mut self) -> Result<()> {
self.inbox.send(FlowgraphMessage::Terminate).await?;
pub async fn terminate(&mut self) -> Result<(), Error> {
self.inbox
.send(FlowgraphMessage::Terminate)
.await
.map_err(|_| Error::FlowgraphTerminated)?;
Ok(())
}

/// Terminate the [`Flowgraph`]
///
/// Send a terminate message to the [`Flowgraph`] and wait until it is shutdown.
pub async fn terminate_and_wait(&mut self) -> Result<()> {
self.terminate().await?;
pub async fn terminate_and_wait(&mut self) -> Result<(), Error> {
self.terminate()
.await
.map_err(|_| Error::FlowgraphTerminated)?;
while !self.inbox.is_closed() {
#[cfg(not(target_arch = "wasm32"))]
async_io::Timer::after(std::time::Duration::from_millis(200)).await;
Expand Down
2 changes: 1 addition & 1 deletion src/runtime/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl Topology {
}

/// Adds a [Block] to the [Topology] returning the `id` of the [Block] in the [Topology].
pub fn add_block(&mut self, mut block: Block) -> Result<usize, crate::runtime::Error> {
pub fn add_block(&mut self, mut block: Block) -> Result<usize, Error> {
if let Some(name) = block.instance_name() {
if self.block_id(name).is_some() {
return Err(Error::DuplicateBlockName(name.to_string()));
Expand Down

0 comments on commit 3a987de

Please sign in to comment.