From 47dfc209aaa16f15e9e45fab41e5e5682b8d6639 Mon Sep 17 00:00:00 2001 From: "Nathan (Blaise) Bruer" Date: Fri, 30 Aug 2024 13:41:19 -0500 Subject: [PATCH] Fix rare case where eof was sent on buf_channel when retry happens (#1295) In a rare case if an EOF is sent, then a retry is triggered by contract an EOF needs to be sent again, but without this patch we are not allowed to send and EOF after one is already sent. --- nativelink-util/src/buf_channel.rs | 14 +++++++++----- nativelink-util/tests/buf_channel_test.rs | 10 ++++++++++ 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/nativelink-util/src/buf_channel.rs b/nativelink-util/src/buf_channel.rs index d797cb765..ed1f9b0ec 100644 --- a/nativelink-util/src/buf_channel.rs +++ b/nativelink-util/src/buf_channel.rs @@ -23,6 +23,7 @@ use futures::task::Context; use futures::{Future, Stream, TryFutureExt}; use nativelink_error::{error_if, make_err, make_input_err, Code, Error, ResultExt}; use tokio::sync::mpsc; +use tracing::{event, Level}; const ZERO_DATA: Bytes = Bytes::new(); @@ -151,12 +152,15 @@ impl DropCloserWriteHalf { /// Sends an EOF (End of File) message to the receiver which will gracefully let the /// stream know it has no more data. This will close the stream. pub fn send_eof(&mut self) -> Result<(), Error> { - error_if!( - self.tx.is_none(), - "Tried to send an EOF when pipe is broken" - ); // Flag that we have sent the EOF. - self.eof_sent.store(true, Ordering::Release); + let eof_was_sent = self.eof_sent.swap(true, Ordering::Release); + if eof_was_sent { + event!( + Level::WARN, + "Stream already closed when eof already was sent. This is often ok for retry was triggered, but should not happen on happy path." + ); + return Ok(()); + } // Now close our stream. self.tx = None; diff --git a/nativelink-util/tests/buf_channel_test.rs b/nativelink-util/tests/buf_channel_test.rs index e908a2340..08a792ccb 100644 --- a/nativelink-util/tests/buf_channel_test.rs +++ b/nativelink-util/tests/buf_channel_test.rs @@ -301,3 +301,13 @@ async fn bind_buffered_test() -> Result<(), Error> { .unwrap(); Ok(()) } + +#[nativelink_test] +async fn eof_can_send_twice() -> Result<(), Error> { + let (mut tx, _rx) = make_buf_channel_pair(); + tx.send(DATA1.into()).await.unwrap(); + tx.send_eof().unwrap(); + // EOF needs to be able to be sent twice just in case a "retry" is triggered. + tx.send_eof().unwrap(); + Ok(()) +}