Skip to content

Commit

Permalink
fix: fix dumb merge resolution
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Dec 11, 2024
1 parent 19b15a5 commit 9d051b1
Showing 1 changed file with 103 additions and 3 deletions.
106 changes: 103 additions & 3 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ impl TransmissionPipelineProducer {
let (wait_time, max_wait_time) = if msg.is_droppable() {
// Checked if we are blocked on the priority queue and we drop directly the message
if self.status.is_congested(priority) {
return false;
return Ok(false);
}
(self.wait_before_drop.0, Some(self.wait_before_drop.1))
} else {
Expand All @@ -806,11 +806,11 @@ impl TransmissionPipelineProducer {
let mut deadline = Deadline::new(wait_time, max_wait_time);
// Lock the channel. We are the only one that will be writing on it.
let mut queue = zlock!(self.stage_in[idx]);
let sent = queue.push_network_message(&mut msg, priority, &mut deadline);
let sent = queue.push_network_message(&mut msg, priority, &mut deadline)?;
if !sent {
self.status.set_congested(priority, true);
}
sent
Ok(sent)
}

#[inline]
Expand Down Expand Up @@ -1286,4 +1286,104 @@ mod tests {
tokio::time::sleep(Duration::from_millis(500)).await;
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tx_pipeline_closed() -> ZResult<()> {
fn schedule(queue: TransmissionPipelineProducer, counter: Arc<AtomicUsize>, id: usize) {
// Make sure to put only one message per batch: set the payload size
// to half of the batch in such a way the serialized zenoh message
// will be larger then half of the batch size (header + payload).
let payload_size = (CONFIG_STREAMED.batch.mtu / 2) as usize;

// Send reliable messages
let key = "test".into();
let payload = ZBuf::from(vec![0_u8; payload_size]);

let message: NetworkMessage = Push {
wire_expr: key,
ext_qos: ext::QoSType::new(Priority::Control, CongestionControl::Block, false),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
payload: PushBody::Put(Put {
timestamp: None,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: None,
ext_unknown: vec![],
payload,
}),
}
.into();

// The last push should block since there shouldn't any more batches
// available for serialization.
let num_msg = 1 + CONFIG_STREAMED.queue_size[0];
for i in 0..num_msg {
println!(
"Pipeline Blocking [>>>]: ({id}) Scheduling message #{i} with payload size of {payload_size} bytes"
);
queue.push_network_message(message.clone()).unwrap();
let c = counter.fetch_add(1, Ordering::AcqRel);
println!(
"Pipeline Blocking [>>>]: ({}) Scheduled message #{} (tot {}) with payload size of {} bytes",
id, i, c + 1,
payload_size
);
}
}

// Pipeline
let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX))?;
let priorities = vec![tct];
let (producer, mut consumer) =
TransmissionPipeline::make(CONFIG_NOT_STREAMED, priorities.as_slice());

let counter = Arc::new(AtomicUsize::new(0));

let c_producer = producer.clone();
let c_counter = counter.clone();
let h1 = task::spawn_blocking(move || {
schedule(c_producer, c_counter, 1);
});

let c_counter = counter.clone();
let h2 = task::spawn_blocking(move || {
schedule(producer, c_counter, 2);
});

// Wait to have sent enough messages and to have blocked
println!(
"Pipeline Blocking [---]: waiting to have {} messages being scheduled",
CONFIG_STREAMED.queue_size[Priority::MAX as usize]
);
let check = async {
while counter.load(Ordering::Acquire)
< CONFIG_STREAMED.queue_size[Priority::MAX as usize]
{
tokio::time::sleep(SLEEP).await;
}
};

timeout(TIMEOUT, check).await?;

// Disable and drain the queue
timeout(
TIMEOUT,
task::spawn_blocking(move || {
println!("Pipeline Blocking [---]: draining the queue");
let _ = consumer.drain();
}),
)
.await??;

// Make sure that the tasks scheduling have been unblocked
println!("Pipeline Blocking [---]: waiting for schedule (1) to be unblocked");
timeout(TIMEOUT, h1).await??;
println!("Pipeline Blocking [---]: waiting for schedule (2) to be unblocked");
timeout(TIMEOUT, h2).await??;

Ok(())
}
}

0 comments on commit 9d051b1

Please sign in to comment.