Skip to content

Commit

Permalink
mocker: message IO
Browse files Browse the repository at this point in the history
  • Loading branch information
bastibl committed Nov 15, 2024
1 parent 3a987de commit 8e643c6
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 70 deletions.
2 changes: 1 addition & 1 deletion crates/types/src/port_id.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt;

/// Port Identifier
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PortId {
/// Index
Index(usize),
Expand Down
8 changes: 7 additions & 1 deletion src/blocks/message_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::runtime::MessageIo;
use crate::runtime::MessageIoBuilder;
use crate::runtime::Pmt;
use crate::runtime::StreamIoBuilder;
use crate::runtime::TypedBlock;
use crate::runtime::WorkIo;

/// Forward messages.
Expand All @@ -14,7 +15,12 @@ pub struct MessageCopy {}
impl MessageCopy {
/// Create MessageCopy block
pub fn new() -> Block {
Block::new(
Self::new_typed().into()
}

/// Create MessageCopy block
pub fn new_typed() -> TypedBlock<Self> {
TypedBlock::new(
BlockMetaBuilder::new("MessageCopy").build(),
StreamIoBuilder::new().build(),
MessageIoBuilder::new()
Expand Down
74 changes: 72 additions & 2 deletions src/runtime/mocker.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
use futures::channel::mpsc::channel;
use futures::channel::mpsc::Receiver;
use futures::channel::mpsc::Sender;
use std::any::Any;
use std::fmt::Debug;

use crate::runtime::buffer::BufferReaderHost;
use crate::runtime::buffer::BufferWriterHost;
use crate::runtime::config::config;
use crate::runtime::BlockMessage;
use crate::runtime::BlockPortCtx;
use crate::runtime::BufferReader;
use crate::runtime::BufferWriter;
use crate::runtime::Error;
use crate::runtime::ItemTag;
use crate::runtime::Kernel;
use crate::runtime::Pmt;
use crate::runtime::PortId;
use crate::runtime::TypedBlock;
use crate::runtime::WorkIo;

Expand All @@ -17,12 +24,28 @@ use crate::runtime::WorkIo;
/// A harness to run a block without a runtime. Used for unit tests and benchmarking.
pub struct Mocker<K> {
block: TypedBlock<K>,
message_sinks: Vec<Receiver<BlockMessage>>,
messages: Vec<Vec<Pmt>>,
}

impl<K: Kernel + 'static> Mocker<K> {
/// Create mocker
pub fn new(block: TypedBlock<K>) -> Self {
Mocker { block }
pub fn new(mut block: TypedBlock<K>) -> Self {
let mut messages = Vec::new();
let mut message_sinks = Vec::new();
let msg_len = config().queue_size;
for (n, p) in block.mio.outputs_mut().iter_mut().enumerate() {
messages.push(Vec::new());
let (tx, rx) = channel(msg_len);
message_sinks.push(rx);
p.connect(n, tx);
}

Mocker {
block,
message_sinks,
messages,
}
}

/// Add input buffer with given data
Expand Down Expand Up @@ -68,6 +91,31 @@ impl<K: Kernel + 'static> Mocker<K> {
.init(BufferWriter::Host(Box::new(MockWriter::<T>::new(size))));
}

/// Post a PMT to a message handler of the block.
pub fn post(&mut self, id: PortId, p: Pmt) -> Result<Pmt, Error> {
let id = match id {
PortId::Name(ref n) => self
.block
.mio
.input_name_to_id(n)
.ok_or(Error::InvalidMessagePort(BlockPortCtx::None, id))?,
PortId::Index(id) => id,
};

let mut io = WorkIo {
call_again: false,
finished: false,
block_on: None,
};

let TypedBlock {
meta, mio, kernel, ..
} = &mut self.block;
let h = mio.input(id).get_handler();
let f = (h)(kernel, &mut io, mio, meta, p);
async_io::block_on(f).or(Err(Error::HandlerError))
}

/// Get data from output buffer
pub fn output<T>(&mut self, id: usize) -> (Vec<T>, Vec<ItemTag>)
where
Expand Down Expand Up @@ -132,6 +180,16 @@ impl<K: Kernel + 'static> Mocker<K> {
});
}

/// Get produced PMTs from output message ports.
pub fn messages(&self) -> Vec<Vec<Pmt>> {
self.messages.clone()
}

/// Take produced PMTs from output message ports.
pub fn take_messages(&mut self) -> Vec<Vec<Pmt>> {
std::mem::take(&mut self.messages)
}

/// Run the mocker async
pub async fn run_async(&mut self) {
let mut io = WorkIo {
Expand All @@ -152,6 +210,18 @@ impl<K: Kernel + 'static> Mocker<K> {
.await
.unwrap();
self.block.sio.commit();

for (n, r) in self.message_sinks.iter_mut().enumerate() {
while let Ok(Some(m)) = r.try_next() {
match m {
BlockMessage::Call { data, .. } => {
self.messages[n].push(data);
}
_ => panic!("Mocked Block produced unexpected BlockMessage {:?}", m),
}
}
}

if !io.call_again {
break;
} else {
Expand Down
6 changes: 3 additions & 3 deletions src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ pub enum BlockMessage {
}

/// FutureSDR Error
#[derive(Error, Debug, Clone)]
#[derive(Error, Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum Error {
/// Block does not exist
Expand Down Expand Up @@ -268,7 +268,7 @@ impl From<PmtConversionError> for Error {
}

/// Container for information supporting `ConnectError`
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnectCtx {
/// Source block ID
pub src_block_id: usize,
Expand Down Expand Up @@ -330,7 +330,7 @@ impl Display for ConnectCtx {

/// Description of the [`Block`] under which an [`InvalidMessagePort`] or
/// [`InvalidStreamPort`] error occurred.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlockPortCtx {
/// BlockId is not specified
None,
Expand Down
81 changes: 81 additions & 0 deletions tests/mocker.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
use futuresdr::anyhow::Result;
use futuresdr::blocks::Apply;
use futuresdr::blocks::MessageCopy;
use futuresdr::runtime::copy_tag_propagation;
use futuresdr::runtime::ItemTag;
use futuresdr::runtime::Mocker;
use futuresdr::runtime::Pmt;
use futuresdr::runtime::PortId;
use futuresdr::runtime::Tag;
use rand::Rng;

#[test]
Expand All @@ -24,3 +31,77 @@ fn multi_input_mock() {
assert_eq!(a + 1, *b);
}
}

#[test]
fn tags_through_mock() -> Result<()> {
let mut noop = Apply::<_, f32, f32>::new_typed(|x| *x);
noop.sio.set_tag_propagation(Box::new(copy_tag_propagation));

let mut mock = Mocker::new(noop);
let input = vec![0.0_f32; 1024];
let tags = vec![
ItemTag {
index: 0,
tag: Tag::Id(0),
},
ItemTag {
index: 256,
tag: Tag::Id(256),
},
ItemTag {
index: 555,
tag: Tag::Id(555),
},
];
mock.init();
mock.init_output::<f32>(0, input.len() * 2);
mock.input(0, input.clone());
mock.run();

let (out_buffer, out_tags) = mock.output::<f32>(0);
assert_eq!(out_buffer.len(), 1024);
assert_eq!(out_tags.len(), 0);

mock.input_with_tags(0, input, tags.clone());
mock.run();
mock.deinit();

let (out_buffer, out_tags) = mock.output::<f32>(0);
assert_eq!(out_buffer.len(), 2048);
assert_eq!(out_tags.len(), 3);

for (i, tag) in tags.iter().enumerate() {
assert_eq!(out_tags[i].index, tag.index + 1024);
let Tag::Id(tag_id) = tag.tag else {
unreachable!()
};
assert!(matches!(out_tags[i].tag, Tag::Id(t) if t == tag_id));
}

let (out_buffer, out_tags) = mock.take_output::<f32>(0);
assert_eq!(out_buffer.len(), 2048);
assert_eq!(out_tags.len(), 3);

let (out_buffer, out_tags) = mock.output::<f32>(0);
assert_eq!(out_buffer.len(), 0);
assert_eq!(out_tags.len(), 0);

Ok(())
}

#[test]
fn mock_pmts() -> Result<()> {
let copy = MessageCopy::new_typed();

let mut mock = Mocker::new(copy);
mock.init();

let ret = mock.post(PortId::Index(0), Pmt::Usize(123));
assert_eq!(ret, Ok(Pmt::Ok));
mock.run();

let pmts = mock.take_messages();
assert_eq!(pmts, vec![vec![Pmt::Usize(123)]]);

Ok(())
}
63 changes: 0 additions & 63 deletions tests/tag.rs

This file was deleted.

0 comments on commit 8e643c6

Please sign in to comment.