Skip to content

Commit

Permalink
fix(sink): try poll response stream error while sending request (#12525)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Sep 25, 2023
1 parent d0484eb commit 31aa925
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class SinkWriterStreamObserver

private TableSchema tableSchema;

private boolean finished = false;

private boolean epochStarted;
private long currentEpoch;
private Long currentBatchId;
Expand All @@ -58,6 +60,9 @@ public SinkWriterStreamObserver(

@Override
public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) {
if (finished) {
throw new RuntimeException("unexpected onNext call on a finished writer stream");
}
try {
if (sinkTask.hasStart()) {
if (isInitialized()) {
Expand Down Expand Up @@ -169,26 +174,27 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) {
throw INVALID_ARGUMENT.withDescription("invalid sink task").asRuntimeException();
}
} catch (Exception e) {
LOG.error("sink task error: ", e);
LOG.error("sink writer error: ", e);
cleanup();
responseObserver.onError(e);
}
}

@Override
public void onError(Throwable throwable) {
LOG.error("sink task error: ", throwable);
LOG.error("sink writer finishes with error: ", throwable);
cleanup();
responseObserver.onError(throwable);
}

@Override
public void onCompleted() {
LOG.debug("sink task completed");
LOG.info("sink writer completed");
cleanup();
responseObserver.onCompleted();
}

private void cleanup() {
finished = true;
if (sink != null) {
sink.drop();
}
Expand Down
38 changes: 28 additions & 10 deletions src/rpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ use std::any::type_name;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::iter::repeat;
use std::pin::pin;
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use futures::future::try_join_all;
use futures::stream::BoxStream;
use futures::future::{select, try_join_all, Either};
use futures::stream::{BoxStream, Peekable};
use futures::{Stream, StreamExt};
use moka::future::Cache;
use rand::prelude::SliceRandom;
Expand All @@ -58,6 +59,8 @@ mod sink_coordinate_client;
mod stream_client;
mod tracing;

use std::pin::Pin;

pub use compactor_client::{CompactorClient, GrpcCompactorProxyClient};
pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef};
pub use connector_client::{ConnectorClient, SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
Expand Down Expand Up @@ -173,7 +176,7 @@ macro_rules! meta_rpc_client_method_impl {

pub struct BidiStreamHandle<REQ: 'static, RSP: 'static> {
request_sender: Sender<REQ>,
response_stream: BoxStream<'static, std::result::Result<RSP, Status>>,
response_stream: Peekable<BoxStream<'static, std::result::Result<RSP, Status>>>,
}

impl<REQ: 'static, RSP: 'static> Debug for BidiStreamHandle<REQ, RSP> {
Expand All @@ -189,7 +192,7 @@ impl<REQ: 'static, RSP: 'static> BidiStreamHandle<REQ, RSP> {
) -> Self {
Self {
request_sender,
response_stream,
response_stream: response_stream.peekable(),
}
}

Expand Down Expand Up @@ -223,7 +226,7 @@ impl<REQ: 'static, RSP: 'static> BidiStreamHandle<REQ, RSP> {
Ok((
Self {
request_sender,
response_stream: response_stream.boxed(),
response_stream: response_stream.boxed().peekable(),
},
first_response,
))
Expand All @@ -238,10 +241,25 @@ impl<REQ: 'static, RSP: 'static> BidiStreamHandle<REQ, RSP> {
}

pub async fn send_request(&mut self, request: REQ) -> Result<()> {
Ok(self
.request_sender
.send(request)
.await
.map_err(|_| anyhow!("unable to send request {}", type_name::<REQ>()))?)
// Poll the response stream to early see the error
let send_request_result = match select(
pin!(self.request_sender.send(request)),
pin!(Pin::new(&mut self.response_stream).peek()),
)
.await
{
Either::Left((result, _)) => result,
Either::Right((response_result, send_future)) => match response_result {
None => {
return Err(anyhow!("end of response stream").into());
}
Some(Err(e)) => {
return Err(e.clone().into());
}
Some(Ok(_)) => send_future.await,
},
};
send_request_result
.map_err(|_| anyhow!("unable to send request {}", type_name::<REQ>()).into())
}
}

0 comments on commit 31aa925

Please sign in to comment.