diff --git a/nativelink-util/src/buf_channel.rs b/nativelink-util/src/buf_channel.rs index d797cb7650..ed1f9b0ec2 100644 --- a/nativelink-util/src/buf_channel.rs +++ b/nativelink-util/src/buf_channel.rs @@ -23,6 +23,7 @@ use futures::task::Context; use futures::{Future, Stream, TryFutureExt}; use nativelink_error::{error_if, make_err, make_input_err, Code, Error, ResultExt}; use tokio::sync::mpsc; +use tracing::{event, Level}; const ZERO_DATA: Bytes = Bytes::new(); @@ -151,12 +152,15 @@ impl DropCloserWriteHalf { /// Sends an EOF (End of File) message to the receiver which will gracefully let the /// stream know it has no more data. This will close the stream. pub fn send_eof(&mut self) -> Result<(), Error> { - error_if!( - self.tx.is_none(), - "Tried to send an EOF when pipe is broken" - ); // Flag that we have sent the EOF. - self.eof_sent.store(true, Ordering::Release); + let eof_was_sent = self.eof_sent.swap(true, Ordering::Release); + if eof_was_sent { + event!( + Level::WARN, + "Stream already closed when eof already was sent. This is often ok for retry was triggered, but should not happen on happy path." + ); + return Ok(()); + } // Now close our stream. self.tx = None;