From 8e643c6982a487beb04fdc03c04f47568e8417ac Mon Sep 17 00:00:00 2001 From: Bastian Bloessl Date: Fri, 15 Nov 2024 10:09:25 +0100 Subject: [PATCH] mocker: message IO --- crates/types/src/port_id.rs | 2 +- src/blocks/message_copy.rs | 8 +++- src/runtime/mocker.rs | 74 ++++++++++++++++++++++++++++++++- src/runtime/mod.rs | 6 +-- tests/mocker.rs | 81 +++++++++++++++++++++++++++++++++++++ tests/tag.rs | 63 ----------------------------- 6 files changed, 164 insertions(+), 70 deletions(-) delete mode 100644 tests/tag.rs diff --git a/crates/types/src/port_id.rs b/crates/types/src/port_id.rs index 103b7f907..061eda7ef 100644 --- a/crates/types/src/port_id.rs +++ b/crates/types/src/port_id.rs @@ -1,7 +1,7 @@ use std::fmt; /// Port Identifier -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum PortId { /// Index Index(usize), diff --git a/src/blocks/message_copy.rs b/src/blocks/message_copy.rs index e317403e7..607c83f03 100644 --- a/src/blocks/message_copy.rs +++ b/src/blocks/message_copy.rs @@ -6,6 +6,7 @@ use crate::runtime::MessageIo; use crate::runtime::MessageIoBuilder; use crate::runtime::Pmt; use crate::runtime::StreamIoBuilder; +use crate::runtime::TypedBlock; use crate::runtime::WorkIo; /// Forward messages. @@ -14,7 +15,12 @@ pub struct MessageCopy {} impl MessageCopy { /// Create MessageCopy block pub fn new() -> Block { - Block::new( + Self::new_typed().into() + } + + /// Create MessageCopy block + pub fn new_typed() -> TypedBlock { + TypedBlock::new( BlockMetaBuilder::new("MessageCopy").build(), StreamIoBuilder::new().build(), MessageIoBuilder::new() diff --git a/src/runtime/mocker.rs b/src/runtime/mocker.rs index e47bb2b6f..14869e1c9 100644 --- a/src/runtime/mocker.rs +++ b/src/runtime/mocker.rs @@ -1,14 +1,21 @@ +use futures::channel::mpsc::channel; +use futures::channel::mpsc::Receiver; use futures::channel::mpsc::Sender; use std::any::Any; use std::fmt::Debug; use crate::runtime::buffer::BufferReaderHost; use crate::runtime::buffer::BufferWriterHost; +use crate::runtime::config::config; use crate::runtime::BlockMessage; +use crate::runtime::BlockPortCtx; use crate::runtime::BufferReader; use crate::runtime::BufferWriter; +use crate::runtime::Error; use crate::runtime::ItemTag; use crate::runtime::Kernel; +use crate::runtime::Pmt; +use crate::runtime::PortId; use crate::runtime::TypedBlock; use crate::runtime::WorkIo; @@ -17,12 +24,28 @@ use crate::runtime::WorkIo; /// A harness to run a block without a runtime. Used for unit tests and benchmarking. pub struct Mocker { block: TypedBlock, + message_sinks: Vec>, + messages: Vec>, } impl Mocker { /// Create mocker - pub fn new(block: TypedBlock) -> Self { - Mocker { block } + pub fn new(mut block: TypedBlock) -> Self { + let mut messages = Vec::new(); + let mut message_sinks = Vec::new(); + let msg_len = config().queue_size; + for (n, p) in block.mio.outputs_mut().iter_mut().enumerate() { + messages.push(Vec::new()); + let (tx, rx) = channel(msg_len); + message_sinks.push(rx); + p.connect(n, tx); + } + + Mocker { + block, + message_sinks, + messages, + } } /// Add input buffer with given data @@ -68,6 +91,31 @@ impl Mocker { .init(BufferWriter::Host(Box::new(MockWriter::::new(size)))); } + /// Post a PMT to a message handler of the block. + pub fn post(&mut self, id: PortId, p: Pmt) -> Result { + let id = match id { + PortId::Name(ref n) => self + .block + .mio + .input_name_to_id(n) + .ok_or(Error::InvalidMessagePort(BlockPortCtx::None, id))?, + PortId::Index(id) => id, + }; + + let mut io = WorkIo { + call_again: false, + finished: false, + block_on: None, + }; + + let TypedBlock { + meta, mio, kernel, .. + } = &mut self.block; + let h = mio.input(id).get_handler(); + let f = (h)(kernel, &mut io, mio, meta, p); + async_io::block_on(f).or(Err(Error::HandlerError)) + } + /// Get data from output buffer pub fn output(&mut self, id: usize) -> (Vec, Vec) where @@ -132,6 +180,16 @@ impl Mocker { }); } + /// Get produced PMTs from output message ports. + pub fn messages(&self) -> Vec> { + self.messages.clone() + } + + /// Take produced PMTs from output message ports. + pub fn take_messages(&mut self) -> Vec> { + std::mem::take(&mut self.messages) + } + /// Run the mocker async pub async fn run_async(&mut self) { let mut io = WorkIo { @@ -152,6 +210,18 @@ impl Mocker { .await .unwrap(); self.block.sio.commit(); + + for (n, r) in self.message_sinks.iter_mut().enumerate() { + while let Ok(Some(m)) = r.try_next() { + match m { + BlockMessage::Call { data, .. } => { + self.messages[n].push(data); + } + _ => panic!("Mocked Block produced unexpected BlockMessage {:?}", m), + } + } + } + if !io.call_again { break; } else { diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 6c72c6d72..2e65337ae 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -207,7 +207,7 @@ pub enum BlockMessage { } /// FutureSDR Error -#[derive(Error, Debug, Clone)] +#[derive(Error, Debug, Clone, PartialEq, Eq)] #[non_exhaustive] pub enum Error { /// Block does not exist @@ -268,7 +268,7 @@ impl From for Error { } /// Container for information supporting `ConnectError` -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct ConnectCtx { /// Source block ID pub src_block_id: usize, @@ -330,7 +330,7 @@ impl Display for ConnectCtx { /// Description of the [`Block`] under which an [`InvalidMessagePort`] or /// [`InvalidStreamPort`] error occurred. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum BlockPortCtx { /// BlockId is not specified None, diff --git a/tests/mocker.rs b/tests/mocker.rs index 17599010e..a71a3d847 100644 --- a/tests/mocker.rs +++ b/tests/mocker.rs @@ -1,5 +1,12 @@ +use futuresdr::anyhow::Result; use futuresdr::blocks::Apply; +use futuresdr::blocks::MessageCopy; +use futuresdr::runtime::copy_tag_propagation; +use futuresdr::runtime::ItemTag; use futuresdr::runtime::Mocker; +use futuresdr::runtime::Pmt; +use futuresdr::runtime::PortId; +use futuresdr::runtime::Tag; use rand::Rng; #[test] @@ -24,3 +31,77 @@ fn multi_input_mock() { assert_eq!(a + 1, *b); } } + +#[test] +fn tags_through_mock() -> Result<()> { + let mut noop = Apply::<_, f32, f32>::new_typed(|x| *x); + noop.sio.set_tag_propagation(Box::new(copy_tag_propagation)); + + let mut mock = Mocker::new(noop); + let input = vec![0.0_f32; 1024]; + let tags = vec![ + ItemTag { + index: 0, + tag: Tag::Id(0), + }, + ItemTag { + index: 256, + tag: Tag::Id(256), + }, + ItemTag { + index: 555, + tag: Tag::Id(555), + }, + ]; + mock.init(); + mock.init_output::(0, input.len() * 2); + mock.input(0, input.clone()); + mock.run(); + + let (out_buffer, out_tags) = mock.output::(0); + assert_eq!(out_buffer.len(), 1024); + assert_eq!(out_tags.len(), 0); + + mock.input_with_tags(0, input, tags.clone()); + mock.run(); + mock.deinit(); + + let (out_buffer, out_tags) = mock.output::(0); + assert_eq!(out_buffer.len(), 2048); + assert_eq!(out_tags.len(), 3); + + for (i, tag) in tags.iter().enumerate() { + assert_eq!(out_tags[i].index, tag.index + 1024); + let Tag::Id(tag_id) = tag.tag else { + unreachable!() + }; + assert!(matches!(out_tags[i].tag, Tag::Id(t) if t == tag_id)); + } + + let (out_buffer, out_tags) = mock.take_output::(0); + assert_eq!(out_buffer.len(), 2048); + assert_eq!(out_tags.len(), 3); + + let (out_buffer, out_tags) = mock.output::(0); + assert_eq!(out_buffer.len(), 0); + assert_eq!(out_tags.len(), 0); + + Ok(()) +} + +#[test] +fn mock_pmts() -> Result<()> { + let copy = MessageCopy::new_typed(); + + let mut mock = Mocker::new(copy); + mock.init(); + + let ret = mock.post(PortId::Index(0), Pmt::Usize(123)); + assert_eq!(ret, Ok(Pmt::Ok)); + mock.run(); + + let pmts = mock.take_messages(); + assert_eq!(pmts, vec![vec![Pmt::Usize(123)]]); + + Ok(()) +} diff --git a/tests/tag.rs b/tests/tag.rs deleted file mode 100644 index 293c0a002..000000000 --- a/tests/tag.rs +++ /dev/null @@ -1,63 +0,0 @@ -use futuresdr::anyhow::Result; -use futuresdr::blocks::Apply; -use futuresdr::runtime::copy_tag_propagation; -use futuresdr::runtime::ItemTag; -use futuresdr::runtime::Mocker; -use futuresdr::runtime::Tag; - -#[test] -fn tags_through_mock() -> Result<()> { - let mut noop = Apply::<_, f32, f32>::new_typed(|x| *x); - noop.sio.set_tag_propagation(Box::new(copy_tag_propagation)); - - let mut mock = Mocker::new(noop); - let input = vec![0.0_f32; 1024]; - let tags = vec![ - ItemTag { - index: 0, - tag: Tag::Id(0), - }, - ItemTag { - index: 256, - tag: Tag::Id(256), - }, - ItemTag { - index: 555, - tag: Tag::Id(555), - }, - ]; - mock.init(); - mock.init_output::(0, input.len() * 2); - mock.input(0, input.clone()); - mock.run(); - - let (out_buffer, out_tags) = mock.output::(0); - assert_eq!(out_buffer.len(), 1024); - assert_eq!(out_tags.len(), 0); - - mock.input_with_tags(0, input, tags.clone()); - mock.run(); - mock.deinit(); - - let (out_buffer, out_tags) = mock.output::(0); - assert_eq!(out_buffer.len(), 2048); - assert_eq!(out_tags.len(), 3); - - for (i, tag) in tags.iter().enumerate() { - assert_eq!(out_tags[i].index, tag.index + 1024); - let Tag::Id(tag_id) = tag.tag else { - unreachable!() - }; - assert!(matches!(out_tags[i].tag, Tag::Id(t) if t == tag_id)); - } - - let (out_buffer, out_tags) = mock.take_output::(0); - assert_eq!(out_buffer.len(), 2048); - assert_eq!(out_tags.len(), 3); - - let (out_buffer, out_tags) = mock.output::(0); - assert_eq!(out_buffer.len(), 0); - assert_eq!(out_tags.len(), 0); - - Ok(()) -}