Skip to content

Commit

Permalink
test: add test
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Dec 11, 2024
1 parent f862ece commit fa339b7
Showing 1 changed file with 32 additions and 0 deletions.
32 changes: 32 additions & 0 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1286,4 +1286,36 @@ mod tests {
tokio::time::sleep(Duration::from_millis(500)).await;
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tx_pipeline_closed() -> ZResult<()> {
// Pipeline
let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX))?;
let priorities = vec![tct];
let (producer, mut consumer) =
TransmissionPipeline::make(CONFIG_NOT_STREAMED, priorities.as_slice());
// Drop consumer to close the pipeline
drop(consumer);
// Push a message and verify that it's rejected with an error
let message: NetworkMessage = Push {
wire_expr: "test".into(),
ext_qos: ext::QoSType::new(Priority::Control, CongestionControl::Block, false),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
payload: PushBody::Put(Put {
timestamp: None,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: None,
ext_unknown: vec![],
payload: vec![42u8].into(),
}),
}
.into();
assert!(producer.push_network_message(message.clone()).is_err());

Ok(())
}
}

0 comments on commit fa339b7

Please sign in to comment.