diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index d69e27631..b939bbbc1 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -1286,4 +1286,36 @@ mod tests { tokio::time::sleep(Duration::from_millis(500)).await; } } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn tx_pipeline_closed() -> ZResult<()> { + // Pipeline + let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX))?; + let priorities = vec![tct]; + let (producer, mut consumer) = + TransmissionPipeline::make(CONFIG_NOT_STREAMED, priorities.as_slice()); + // Drop consumer to close the pipeline + drop(consumer); + // Push a message and verify that it's rejected with an error + let message: NetworkMessage = Push { + wire_expr: "test".into(), + ext_qos: ext::QoSType::new(Priority::Control, CongestionControl::Block, false), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + payload: PushBody::Put(Put { + timestamp: None, + encoding: Encoding::empty(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_attachment: None, + ext_unknown: vec![], + payload: vec![42u8].into(), + }), + } + .into(); + assert!(producer.push_network_message(message.clone()).is_err()); + + Ok(()) + } }