Skip to content

Commit

Permalink
feat(sink): monitor error in remote sink response stream (#13028)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Oct 25, 2023
1 parent 272eff4 commit 8fdcfb8
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 72 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 35 additions & 2 deletions src/common/src/util/future_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
// limitations under the License.

use std::future::pending;
use std::pin::{pin, Pin};

use futures::future::Either;
use futures::{Future, FutureExt, Stream};
use futures::future::{select, Either};
use futures::stream::Peekable;
use futures::{Future, FutureExt, Stream, StreamExt};

/// Convert a list of streams into a [`Stream`] of results from the streams.
pub fn select_all<S: Stream + Unpin>(
Expand Down Expand Up @@ -43,3 +45,34 @@ pub fn drop_either_future<A, B>(
Either::Right((right, _)) => Either::Right(right),
}
}

/// Await on a future while monitoring on a peekable stream that may return error.
/// The peekable stream is polled at a higher priority than the future.
///
/// When the peekable stream returns with a error and end of stream, the future will
/// return the error immediately. Otherwise, it will keep polling the given future.
///
/// Return:
/// - Ok(output) as the output of the given future.
/// - Err(None) to indicate that the stream has reached the end.
/// - Err(e) to indicate that the stream returns an error.
pub async fn await_future_with_monitor_error_stream<T, E, F: Future>(
peek_stream: &mut Peekable<impl Stream<Item = Result<T, E>> + Unpin>,
future: F,
) -> Result<F::Output, Option<E>> {
// Poll the response stream to early see the error
match select(pin!(Pin::new(&mut *peek_stream).peek()), pin!(future)).await {
Either::Left((response_result, send_future)) => match response_result {
None => Err(None),
Some(Err(_)) => {
let err = match peek_stream.next().now_or_never() {
Some(Some(Err(err))) => err,
_ => unreachable!("peek has output, peek output not None, have check err"),
};
Err(Some(err))
}
Some(Ok(_)) => Ok(send_future.await),
},
Either::Right((output, _)) => Ok(output),
}
}
4 changes: 3 additions & 1 deletion src/common/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ pub mod tracing;
pub mod value_encoding;
pub mod worker_util;

pub use future_utils::{drop_either_future, pending_on_none, select_all};
pub use future_utils::{
await_future_with_monitor_error_stream, drop_either_future, pending_on_none, select_all,
};
#[macro_use]
pub mod match_util;

Expand Down
Loading

0 comments on commit 8fdcfb8

Please sign in to comment.