diff --git a/amqprs/src/net/reader_handler.rs b/amqprs/src/net/reader_handler.rs index 8ae0b7b..442a991 100644 --- a/amqprs/src/net/reader_handler.rs +++ b/amqprs/src/net/reader_handler.rs @@ -21,6 +21,8 @@ use super::{ ///////////////////////////////////////////////////////////////////////////// +const MAX_HEARTBEAT_MISS: u64 = 3; + pub(crate) struct ReaderHandler { stream: BufIoReader, @@ -211,6 +213,7 @@ impl ReaderHandler { let max_interval: u64 = heartbeat.into(); let mut expiration = time::Instant::now() + time::Duration::from_secs(max_interval); let mut is_network_failure = false; + let mut heartbeat_miss = 0; loop { tokio::select! { biased; @@ -285,10 +288,15 @@ impl ReaderHandler { if expiration <= time::Instant::now() { expiration = time::Instant::now() + time::Duration::from_secs(max_interval); - // TODO: what to do with missing heartbeat? // should call self.io_failure_notify.notify_one();? #[cfg(feature="traces")] error!("missing heartbeat from server for {}", self.amqp_connection); + heartbeat_miss += 1; + if heartbeat_miss >= MAX_HEARTBEAT_MISS { + // Shutdown connection due to heartbeat timeout + is_network_failure = true; + break; + } } } else => {