diff --git a/src/runtime/block.rs b/src/runtime/block.rs index cdbd6dc2..f5a7f147 100644 --- a/src/runtime/block.rs +++ b/src/runtime/block.rs @@ -10,8 +10,6 @@ use std::fmt; use std::future::Future; use std::pin::Pin; -use crate::anyhow::Context; -use crate::anyhow::Result; use crate::runtime::BlockDescription; use crate::runtime::BlockMessage; use crate::runtime::BlockMeta; @@ -78,7 +76,7 @@ pub trait Kernel: Send { _s: &mut StreamIo, _m: &mut MessageIo, _b: &mut BlockMeta, - ) -> Result<()> { + ) -> anyhow::Result<()> { Ok(()) } /// Initialize kernel @@ -87,7 +85,7 @@ pub trait Kernel: Send { _s: &mut StreamIo, _m: &mut MessageIo, _b: &mut BlockMeta, - ) -> Result<()> { + ) -> anyhow::Result<()> { Ok(()) } /// De-initialize kernel @@ -96,7 +94,7 @@ pub trait Kernel: Send { _s: &mut StreamIo, _m: &mut MessageIo, _b: &mut BlockMeta, - ) -> Result<()> { + ) -> anyhow::Result<()> { Ok(()) } } @@ -111,7 +109,7 @@ pub trait BlockT: Send + Any { block_id: usize, main_inbox: Sender, inbox: Receiver, - ); + ) -> Result<(), Error>; // ##### META fn instance_name(&self) -> Option<&str>; @@ -161,19 +159,11 @@ impl TypedBlock { } } - /// Set Instance name of Block. + /// Set instance name of block. pub fn set_instance_name(&mut self, name: impl Into) { self.meta.set_instance_name(name) } -} - -pub(crate) struct TypedBlockWrapper { - pub(crate) inner: Option>, -} -// clippy bug -#[allow(clippy::needless_pass_by_ref_mut)] -impl TypedBlockWrapper { async fn call_handler( io: &mut WorkIo, mio: &mut MessageIo, @@ -212,16 +202,18 @@ impl TypedBlockWrapper { } async fn run_impl( - TypedBlock { - mut meta, - mut sio, - mut mio, - mut kernel, - }: TypedBlock, + &mut self, block_id: usize, mut main_inbox: Sender, mut inbox: Receiver, - ) -> Result<()> { + ) -> Result<(), Error> { + let TypedBlock { + meta, + sio, + mio, + kernel, + } = self; + // init work io let mut work_io = WorkIo { call_again: false, @@ -231,29 +223,23 @@ impl TypedBlockWrapper { // setup phase loop { - match inbox.next().await.context("no msg")? { + match inbox + .next() + .await + .ok_or_else(|| Error::RuntimeError("no msg".to_string()))? + { BlockMessage::Initialize => { - if let Err(e) = kernel.init(&mut sio, &mut mio, &mut meta).await { + if let Err(e) = kernel.init(sio, mio, meta).await { error!( "{}: Error during initialization. Terminating.", meta.instance_name().unwrap() ); - main_inbox - .send(FlowgraphMessage::BlockError { - block_id, - block: Block(Box::new(TypedBlockWrapper { - inner: Some(TypedBlock { - sio, - mio, - meta, - kernel, - }), - })), - }) - .await?; - return Err(e); + return Err(Error::RuntimeError(e.to_string())); } else { - main_inbox.send(FlowgraphMessage::Initialized).await?; + main_inbox + .send(FlowgraphMessage::Initialized) + .await + .map_err(|e| Error::RuntimeError(e.to_string()))?; } break; } @@ -315,15 +301,8 @@ impl TypedBlockWrapper { work_io.finished = true; } Some(Some(BlockMessage::Call { port_id, data })) => { - match Self::call_handler( - &mut work_io, - &mut mio, - &mut meta, - &mut kernel, - port_id, - data, - ) - .await + match Self::call_handler(&mut work_io, mio, meta, kernel, port_id, data) + .await { Err(Error::InvalidMessagePort(_, port_id)) => { error!( @@ -336,20 +315,7 @@ impl TypedBlockWrapper { "{}: BlockMessage::Call -> HandlerError. Terminating.", meta.instance_name().unwrap(), ); - main_inbox - .send(FlowgraphMessage::BlockError { - block_id, - block: Block(Box::new(TypedBlockWrapper { - inner: Some(TypedBlock { - sio, - mio, - meta, - kernel, - }), - })), - }) - .await?; - return Err(Error::HandlerError.into()); + return Err(Error::HandlerError); } _ => {} } @@ -357,9 +323,9 @@ impl TypedBlockWrapper { Some(Some(BlockMessage::Callback { port_id, data, tx })) => { match Self::call_handler( &mut work_io, - &mut mio, - &mut meta, - &mut kernel, + mio, + meta, + kernel, port_id.clone(), data, ) @@ -374,20 +340,7 @@ impl TypedBlockWrapper { BlockPortCtx::Id(block_id), port_id, ))); - main_inbox - .send(FlowgraphMessage::BlockError { - block_id, - block: Block(Box::new(TypedBlockWrapper { - inner: Some(TypedBlock { - sio, - mio, - meta, - kernel, - }), - })), - }) - .await?; - return Err(Error::HandlerError.into()); + return Err(Error::HandlerError); } res => { let _ = tx.send(res); @@ -409,21 +362,8 @@ impl TypedBlockWrapper { join_all(sio.outputs_mut().iter_mut().map(|o| o.notify_finished())).await; join_all(mio.outputs_mut().iter_mut().map(|o| o.notify_finished())).await; - match kernel.deinit(&mut sio, &mut mio, &mut meta).await { + match kernel.deinit(sio, mio, meta).await { Ok(_) => { - let _ = main_inbox - .send(FlowgraphMessage::BlockDone { - block_id, - block: Block(Box::new(TypedBlockWrapper { - inner: Some(TypedBlock { - sio, - mio, - meta, - kernel, - }), - })), - }) - .await; break; } Err(e) => { @@ -432,20 +372,7 @@ impl TypedBlockWrapper { meta.instance_name().unwrap(), e ); - main_inbox - .send(FlowgraphMessage::BlockError { - block_id, - block: Block(Box::new(TypedBlockWrapper { - inner: Some(TypedBlock { - sio, - mio, - meta, - kernel, - }), - })), - }) - .await?; - return Err(e); + return Err(Error::RuntimeError(e.to_string())); } }; } @@ -472,29 +399,13 @@ impl TypedBlockWrapper { // ================== work work_io.call_again = false; - if let Err(e) = kernel - .work(&mut work_io, &mut sio, &mut mio, &mut meta) - .await - { + if let Err(e) = kernel.work(&mut work_io, sio, mio, meta).await { error!( "{}: Error in work(). Terminating. ({:?})", meta.instance_name().unwrap(), e ); - main_inbox - .send(FlowgraphMessage::BlockError { - block_id, - block: Block(Box::new(TypedBlockWrapper { - inner: Some(TypedBlock { - sio, - mio, - meta, - kernel, - }), - })), - }) - .await?; - return Err(e); + return Err(Error::RuntimeError(e.to_string())); } sio.commit(); @@ -506,7 +417,7 @@ impl TypedBlockWrapper { } #[async_trait] -impl BlockT for TypedBlockWrapper { +impl BlockT for TypedBlock { // ##### Block fn as_any(&self) -> &dyn Any { self @@ -517,18 +428,16 @@ impl BlockT for TypedBlockWrapper { // ##### META fn instance_name(&self) -> Option<&str> { - self.inner.as_ref().and_then(|i| i.meta.instance_name()) + self.meta.instance_name() } fn set_instance_name(&mut self, name: &str) { - if let Some(i) = self.inner.as_mut() { - i.meta.set_instance_name(name) - } + self.meta.set_instance_name(name) } fn type_name(&self) -> &str { - self.inner.as_ref().map(|i| i.meta.type_name()).unwrap() + self.meta.type_name() } fn is_blocking(&self) -> bool { - self.inner.as_ref().map(|i| i.meta.is_blocking()).unwrap() + self.meta.is_blocking() } // ##### KERNEL @@ -537,17 +446,8 @@ impl BlockT for TypedBlockWrapper { block_id: usize, main_inbox: Sender, inbox: Receiver, - ) { - let block = self.inner.take().unwrap(); - - let instance_name = block - .meta - .instance_name() - .unwrap_or("") - .to_string(); - if let Err(e) = Self::run_impl(block, block_id, main_inbox, inbox).await { - error!("{}: Error in Block.run() {:?}", instance_name, e); - } + ) -> Result<(), Error> { + self.run_impl(block_id, main_inbox, inbox).await } // ##### STREAM IO @@ -555,50 +455,36 @@ impl BlockT for TypedBlockWrapper { &mut self, f: Box, ) { - if let Some(i) = self.inner.as_mut() { - i.sio.set_tag_propagation(f) - } + self.sio.set_tag_propagation(f) } fn stream_inputs(&self) -> &Vec { - self.inner.as_ref().map(|i| i.sio.inputs()).unwrap() + self.sio.inputs() } fn stream_input(&self, id: usize) -> &StreamInput { - self.inner.as_ref().map(|i| i.sio.input_ref(id)).unwrap() + self.sio.input_ref(id) } fn stream_input_name_to_id(&self, name: &str) -> Option { - self.inner - .as_ref() - .map(|i| i.sio.input_name_to_id(name)) - .unwrap() + self.sio.input_name_to_id(name) } fn stream_outputs(&self) -> &Vec { - self.inner.as_ref().map(|i| i.sio.outputs()).unwrap() + self.sio.outputs() } fn stream_output(&self, id: usize) -> &StreamOutput { - self.inner.as_ref().map(|i| i.sio.output_ref(id)).unwrap() + self.sio.output_ref(id) } fn stream_output_name_to_id(&self, name: &str) -> Option { - self.inner - .as_ref() - .map(|i| i.sio.output_name_to_id(name)) - .unwrap() + self.sio.output_name_to_id(name) } // ##### MESSAGE IO fn message_input_name_to_id(&self, name: &str) -> Option { - self.inner - .as_ref() - .map(|i| i.mio.input_name_to_id(name)) - .unwrap() + self.mio.input_name_to_id(name) } fn message_outputs(&self) -> &Vec { - self.inner.as_ref().map(|i| i.mio.outputs()).unwrap() + self.mio.outputs() } fn message_output_name_to_id(&self, name: &str) -> Option { - self.inner - .as_ref() - .map(|i| i.mio.output_name_to_id(name)) - .unwrap() + self.mio.output_name_to_id(name) } } @@ -616,33 +502,29 @@ impl Block { mio: MessageIo, kernel: T, ) -> Block { - Self(Box::new(TypedBlockWrapper { - inner: Some(TypedBlock { - meta, - sio, - mio, - kernel, - }), + Self(Box::new(TypedBlock { + meta, + sio, + mio, + kernel, })) } /// Create block by wrapping a [`TypedBlock`]. pub fn from_typed(b: TypedBlock) -> Block { - Self(Box::new(TypedBlockWrapper { inner: Some(b) })) + Self(Box::new(b)) } /// Try to cast to a given kernel type pub fn kernel(&self) -> Option<&T> { self.0 .as_any() - .downcast_ref::>() - .and_then(|b| b.inner.as_ref()) + .downcast_ref::>() .map(|b| &b.kernel) } /// Try to mutably cast to a given kernel type pub fn kernel_mut(&mut self) -> Option<&T> { self.0 .as_any_mut() - .downcast_mut::>() - .and_then(|b| b.inner.as_mut()) + .downcast_mut::>() .map(|b| &b.kernel) } @@ -667,10 +549,32 @@ impl Block { pub(crate) async fn run( mut self, block_id: usize, - main_inbox: Sender, + mut main_inbox: Sender, inbox: Receiver, ) { - self.0.run(block_id, main_inbox, inbox).await + match self.0.run(block_id, main_inbox.clone(), inbox).await { + Ok(_) => { + let _ = main_inbox + .send(FlowgraphMessage::BlockDone { + block_id, + block: self, + }) + .await; + } + Err(e) => { + let instance_name = self + .instance_name() + .unwrap_or("") + .to_string(); + error!("{}: Error in Block.run() {:?}", instance_name, e); + let _ = main_inbox + .send(FlowgraphMessage::BlockError { + block_id, + block: self, + }) + .await; + } + } } // ##### STREAM IO @@ -728,14 +632,6 @@ impl From> for Block { } } -impl fmt::Debug for TypedBlockWrapper { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("AsyncBlock") - .field("type_name", &self.type_name().to_string()) - .finish() - } -} - impl fmt::Debug for dyn BlockT { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BlockT")