From 157b80dead4f943a2ee6548eb2245787fbe9c5bd Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Fri, 4 Nov 2022 11:39:41 -0400 Subject: [PATCH] Panic with consistent message on TCP failure (#486) To allow downstream crates to sniff out these panics and downgrade them, if appropriate. --- communication/src/allocator/zero_copy/tcp.rs | 45 +++++++++++++------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/communication/src/allocator/zero_copy/tcp.rs b/communication/src/allocator/zero_copy/tcp.rs index 5f3855b87..d9950fb83 100644 --- a/communication/src/allocator/zero_copy/tcp.rs +++ b/communication/src/allocator/zero_copy/tcp.rs @@ -1,6 +1,6 @@ //! -use std::io::{Read, Write}; +use std::io::{self, Read, Write}; use std::net::TcpStream; use crossbeam_channel::{Sender, Receiver}; @@ -13,12 +13,22 @@ use logging_core::Logger; use crate::logging::{CommunicationEvent, CommunicationSetup, MessageEvent, StateEvent}; +fn tcp_panic(context: &'static str, cause: io::Error) -> ! { + // NOTE: some downstream crates sniff out "timely communication error:" from + // the panic message. Avoid removing or rewording this message if possible. + // It'd be nice to instead use `panic_any` here with a structured error + // type, but the panic message for `panic_any` is no good (Box). + panic!("timely communication error: {}: {}", context, cause) +} + /// Repeatedly reads from a TcpStream and carves out messages. /// /// The intended communication pattern is a sequence of (header, message)^* for valid /// messages, followed by a header for a zero length message indicating the end of stream. -/// If the stream ends without being shut down, the receive thread panics in an attempt to -/// take down the computation and cause the failures to cascade. +/// +/// If the stream ends without being shut down, or if reading from the stream fails, the +/// receive thread panics with a message that starts with "timely communication error:" +/// in an attempt to take down the computation and cause the failures to cascade. pub fn recv_loop( mut reader: TcpStream, targets: Vec>, @@ -56,15 +66,16 @@ pub fn recv_loop( // Attempt to read some more bytes into self.buffer. let read = match reader.read(&mut buffer.empty()) { + Err(x) => tcp_panic("reading data", x), + Ok(n) if n == 0 => { + tcp_panic( + "reading data", + std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "socket closed"), + ); + } Ok(n) => n, - Err(x) => { - // We don't expect this, as socket closure results in Ok(0) reads. - println!("Error: {:?}", x); - 0 - }, }; - assert!(read > 0); buffer.make_valid(read); // Consume complete messages from the front of self.buffer. @@ -89,7 +100,7 @@ pub fn recv_loop( panic!("Clean shutdown followed by data."); } buffer.ensure_capacity(1); - if reader.read(&mut buffer.empty()).expect("read failure") > 0 { + if reader.read(&mut buffer.empty()).unwrap_or_else(|e| tcp_panic("reading EOF", e)) > 0 { panic!("Clean shutdown followed by data."); } } @@ -111,6 +122,10 @@ pub fn recv_loop( /// /// The intended communication pattern is a sequence of (header, message)^* for valid /// messages, followed by a header for a zero length message indicating the end of stream. +/// +/// If writing to the stream fails, the send thread panics with a message that starts with +/// "timely communication error:" in an attempt to take down the computation and cause the +/// failures to cascade. pub fn send_loop( // TODO: Maybe we don't need BufWriter with consolidation in writes. writer: TcpStream, @@ -148,7 +163,7 @@ pub fn send_loop( // still be a signal incoming. // // We could get awoken by more data, a channel closing, or spuriously perhaps. - writer.flush().expect("Failed to flush writer."); + writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e)); sources.retain(|source| !source.is_complete()); if !sources.is_empty() { std::thread::park(); @@ -167,7 +182,7 @@ pub fn send_loop( } }); - writer.write_all(&bytes[..]).expect("Write failure in send_loop."); + writer.write_all(&bytes[..]).unwrap_or_else(|e| tcp_panic("writing data", e)); } } } @@ -182,9 +197,9 @@ pub fn send_loop( length: 0, seqno: 0, }; - header.write_to(&mut writer).expect("Failed to write header!"); - writer.flush().expect("Failed to flush writer."); - writer.get_mut().shutdown(::std::net::Shutdown::Write).expect("Write shutdown failed"); + header.write_to(&mut writer).unwrap_or_else(|e| tcp_panic("writing data", e)); + writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e)); + writer.get_mut().shutdown(::std::net::Shutdown::Write).unwrap_or_else(|e| tcp_panic("shutting down writer", e)); logger.as_mut().map(|logger| logger.log(MessageEvent { is_send: true, header })); // Log the send thread's end.