From ce5014b34c094dc92c36d506cf32dd50304facd3 Mon Sep 17 00:00:00 2001 From: Arnaud Fontaine <33485997+github-af@users.noreply.github.com> Date: Wed, 10 Jul 2024 15:46:31 +0200 Subject: [PATCH] try to fix issue #3 --- src/protocol.rs | 13 ++++++--- src/receive/decoding.rs | 58 +++++++++++++++++++---------------------- src/receive/dispatch.rs | 16 ++++++++++-- 3 files changed, 50 insertions(+), 37 deletions(-) diff --git a/src/protocol.rs b/src/protocol.rs index 83435d3..27fb799 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -56,6 +56,7 @@ impl From for Error { pub(crate) enum MessageType { Heartbeat, + LostSynchronization, Start, Data, Abort, @@ -66,6 +67,7 @@ impl MessageType { fn serialized(self) -> u8 { match self { Self::Heartbeat => ID_HEARTBEAT, + Self::LostSynchronization => ID_LOST_SYNCHRONIZATION, Self::Start => ID_START, Self::Data => ID_DATA, Self::Abort => ID_ABORT, @@ -78,6 +80,7 @@ impl fmt::Display for MessageType { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { match self { Self::Heartbeat => write!(fmt, "Heartbeat"), + Self::LostSynchronization => write!(fmt, "LostSynchronization"), Self::Start => write!(fmt, "Start"), Self::Data => write!(fmt, "Data"), Self::Abort => write!(fmt, "Abort"), @@ -87,10 +90,11 @@ impl fmt::Display for MessageType { } const ID_HEARTBEAT: u8 = 0x00; -const ID_START: u8 = 0x01; -const ID_DATA: u8 = 0x02; -const ID_ABORT: u8 = 0x03; -const ID_END: u8 = 0x04; +const ID_LOST_SYNCHRONIZATION: u8 = 0x01; +const ID_START: u8 = 0x02; +const ID_DATA: u8 = 0x03; +const ID_ABORT: u8 = 0x04; +const ID_END: u8 = 0x05; pub(crate) type ClientId = u32; @@ -150,6 +154,7 @@ impl Message { pub(crate) fn message_type(&self) -> Result { match self.0.get(4) { Some(&ID_HEARTBEAT) => Ok(MessageType::Heartbeat), + Some(&ID_LOST_SYNCHRONIZATION) => Ok(MessageType::LostSynchronization), Some(&ID_START) => Ok(MessageType::Start), Some(&ID_DATA) => Ok(MessageType::Data), Some(&ID_ABORT) => Ok(MessageType::Abort), diff --git a/src/receive/decoding.rs b/src/receive/decoding.rs index 8e1e989..64e5e72 100644 --- a/src/receive/decoding.rs +++ b/src/receive/decoding.rs @@ -1,7 +1,5 @@ //! Worker that decodes RaptorQ packets into protocol messages -use std::{cmp::Ordering, thread::yield_now}; - use crate::{protocol, receive}; pub(crate) fn start(receiver: &receive::Receiver) -> Result<(), receive::Error> { @@ -23,38 +21,36 @@ pub(crate) fn start(receiver: &receive::Receiver) -> Result<(), receive::E match decoder.decode(packets) { None => { - log::warn!("lost block {block_id}"); + log::error!("lost block {block_id}, synchronization lost"); + + let message = + protocol::Message::new(protocol::MessageType::LostSynchronization, 0, 0, None); + + receiver.to_dispatch.send(message)?; + continue; } Some(block) => { - log::trace!("block {} decoded with {} bytes!", block_id, block.len()); - - let mut retry_cnt = 0; - - loop { - let mut to_receive = receiver.block_to_receive.lock().expect("acquire lock"); - match block_id.cmp(&to_receive) { - Ordering::Equal => { - receiver - .to_dispatch - .send(protocol::Message::deserialize(block))?; - *to_receive = to_receive.wrapping_add(1); - break; - } - Ordering::Greater => { - // Thread is too late, drop the packet and kill the current job - log::warn!("Dropping the packet {block_id}"); - break; - } - Ordering::Less => { - if retry_cnt < 10 { - retry_cnt +=1; - yield_now(); - } else { - break; - } - - } + log::trace!("block {block_id} decoded with {} bytes!", block.len()); + + let mut to_receive = receiver.block_to_receive.lock().expect("acquire lock"); + + if *to_receive == block_id { + receiver + .to_dispatch + .send(protocol::Message::deserialize(block))?; + *to_receive = to_receive.wrapping_add(1); + break; + } else { + if to_receive.wrapping_sub(1) == block_id { + log::warn!("receiving block {block_id} from the past, assuming it was already decoded successfully"); + break; + } else { + log::error!("receiving unknown block {block_id}, synchronization lost"); + let message = + protocol::Message::new(protocol::MessageType::LostSynchronization, 0, 0, None); + receiver.to_dispatch.send(message)?; + break; } } } diff --git a/src/receive/dispatch.rs b/src/receive/dispatch.rs index ade2cfc..917a156 100644 --- a/src/receive/dispatch.rs +++ b/src/receive/dispatch.rs @@ -56,12 +56,24 @@ pub(crate) fn start(receiver: &receive::Receiver) -> Result<(), receive::E continue; } + protocol::MessageType::LostSynchronization => { + log::error!("synchronization lost, aborting all transfers"); + for (client_id, client_sendq) in active_transfers.into_iter() { + let message = + protocol::Message::new(protocol::MessageType::Abort, 0, client_id, None); + if let Err(e) = client_sendq.send(message) { + log::warn!("failed to send abnort to client {client_id:x}: {e}"); + } + failed_transfers.insert(client_id); + } + active_transfers = BTreeMap::new(); + continue; + } + protocol::MessageType::Start => { let (client_sendq, client_recvq) = crossbeam_channel::unbounded::(); - active_transfers.insert(client_id, client_sendq); - receiver.to_clients.send((client_id, client_recvq))?; }