From 31aa925988feba274d4335b1aa0b714320569774 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Tue, 26 Sep 2023 01:13:11 +0800 Subject: [PATCH] fix(sink): try poll response stream error while sending request (#12525) --- .../connector/SinkWriterStreamObserver.java | 14 +++++-- src/rpc_client/src/lib.rs | 38 ++++++++++++++----- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java index 5caa9e3533e5..132313351916 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java @@ -38,6 +38,8 @@ public class SinkWriterStreamObserver private TableSchema tableSchema; + private boolean finished = false; + private boolean epochStarted; private long currentEpoch; private Long currentBatchId; @@ -58,6 +60,9 @@ public SinkWriterStreamObserver( @Override public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) { + if (finished) { + throw new RuntimeException("unexpected onNext call on a finished writer stream"); + } try { if (sinkTask.hasStart()) { if (isInitialized()) { @@ -169,26 +174,27 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) { throw INVALID_ARGUMENT.withDescription("invalid sink task").asRuntimeException(); } } catch (Exception e) { - LOG.error("sink task error: ", e); + LOG.error("sink writer error: ", e); + cleanup(); responseObserver.onError(e); } } @Override public void onError(Throwable throwable) { - LOG.error("sink task error: ", throwable); + LOG.error("sink writer finishes with error: ", throwable); cleanup(); - responseObserver.onError(throwable); } @Override public void onCompleted() { - LOG.debug("sink task completed"); + LOG.info("sink writer completed"); cleanup(); responseObserver.onCompleted(); } private void cleanup() { + finished = true; if (sink != null) { sink.drop(); } diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index 2d8017e87aa0..6bbcbd2ebf3e 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -31,12 +31,13 @@ use std::any::type_name; use std::fmt::{Debug, Formatter}; use std::future::Future; use std::iter::repeat; +use std::pin::pin; use std::sync::Arc; use anyhow::anyhow; use async_trait::async_trait; -use futures::future::try_join_all; -use futures::stream::BoxStream; +use futures::future::{select, try_join_all, Either}; +use futures::stream::{BoxStream, Peekable}; use futures::{Stream, StreamExt}; use moka::future::Cache; use rand::prelude::SliceRandom; @@ -58,6 +59,8 @@ mod sink_coordinate_client; mod stream_client; mod tracing; +use std::pin::Pin; + pub use compactor_client::{CompactorClient, GrpcCompactorProxyClient}; pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef}; pub use connector_client::{ConnectorClient, SinkCoordinatorStreamHandle, SinkWriterStreamHandle}; @@ -173,7 +176,7 @@ macro_rules! meta_rpc_client_method_impl { pub struct BidiStreamHandle { request_sender: Sender, - response_stream: BoxStream<'static, std::result::Result>, + response_stream: Peekable>>, } impl Debug for BidiStreamHandle { @@ -189,7 +192,7 @@ impl BidiStreamHandle { ) -> Self { Self { request_sender, - response_stream, + response_stream: response_stream.peekable(), } } @@ -223,7 +226,7 @@ impl BidiStreamHandle { Ok(( Self { request_sender, - response_stream: response_stream.boxed(), + response_stream: response_stream.boxed().peekable(), }, first_response, )) @@ -238,10 +241,25 @@ impl BidiStreamHandle { } pub async fn send_request(&mut self, request: REQ) -> Result<()> { - Ok(self - .request_sender - .send(request) - .await - .map_err(|_| anyhow!("unable to send request {}", type_name::()))?) + // Poll the response stream to early see the error + let send_request_result = match select( + pin!(self.request_sender.send(request)), + pin!(Pin::new(&mut self.response_stream).peek()), + ) + .await + { + Either::Left((result, _)) => result, + Either::Right((response_result, send_future)) => match response_result { + None => { + return Err(anyhow!("end of response stream").into()); + } + Some(Err(e)) => { + return Err(e.clone().into()); + } + Some(Ok(_)) => send_future.await, + }, + }; + send_request_result + .map_err(|_| anyhow!("unable to send request {}", type_name::()).into()) } }