Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sink): try poll response stream error while sending request #12525

Merged
merged 1 commit into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class SinkWriterStreamObserver

private TableSchema tableSchema;

private boolean finished = false;

private boolean epochStarted;
private long currentEpoch;
private Long currentBatchId;
Expand All @@ -58,6 +60,9 @@ public SinkWriterStreamObserver(

@Override
public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) {
if (finished) {
throw new RuntimeException("unexpected onNext call on a finished writer stream");
}
try {
if (sinkTask.hasStart()) {
if (isInitialized()) {
Expand Down Expand Up @@ -169,26 +174,27 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) {
throw INVALID_ARGUMENT.withDescription("invalid sink task").asRuntimeException();
}
} catch (Exception e) {
LOG.error("sink task error: ", e);
LOG.error("sink writer error: ", e);
cleanup();
responseObserver.onError(e);
}
}

@Override
public void onError(Throwable throwable) {
LOG.error("sink task error: ", throwable);
LOG.error("sink writer finishes with error: ", throwable);
cleanup();
responseObserver.onError(throwable);
}

@Override
public void onCompleted() {
LOG.debug("sink task completed");
LOG.info("sink writer completed");
cleanup();
responseObserver.onCompleted();
}

private void cleanup() {
finished = true;
if (sink != null) {
sink.drop();
}
Expand Down
38 changes: 28 additions & 10 deletions src/rpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ use std::any::type_name;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::iter::repeat;
use std::pin::pin;
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use futures::future::try_join_all;
use futures::stream::BoxStream;
use futures::future::{select, try_join_all, Either};
use futures::stream::{BoxStream, Peekable};
use futures::{Stream, StreamExt};
use moka::future::Cache;
use rand::prelude::SliceRandom;
Expand All @@ -58,6 +59,8 @@ mod sink_coordinate_client;
mod stream_client;
mod tracing;

use std::pin::Pin;

pub use compactor_client::{CompactorClient, GrpcCompactorProxyClient};
pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef};
pub use connector_client::{ConnectorClient, SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
Expand Down Expand Up @@ -173,7 +176,7 @@ macro_rules! meta_rpc_client_method_impl {

pub struct BidiStreamHandle<REQ: 'static, RSP: 'static> {
request_sender: Sender<REQ>,
response_stream: BoxStream<'static, std::result::Result<RSP, Status>>,
response_stream: Peekable<BoxStream<'static, std::result::Result<RSP, Status>>>,
}

impl<REQ: 'static, RSP: 'static> Debug for BidiStreamHandle<REQ, RSP> {
Expand All @@ -189,7 +192,7 @@ impl<REQ: 'static, RSP: 'static> BidiStreamHandle<REQ, RSP> {
) -> Self {
Self {
request_sender,
response_stream,
response_stream: response_stream.peekable(),
}
}

Expand Down Expand Up @@ -223,7 +226,7 @@ impl<REQ: 'static, RSP: 'static> BidiStreamHandle<REQ, RSP> {
Ok((
Self {
request_sender,
response_stream: response_stream.boxed(),
response_stream: response_stream.boxed().peekable(),
},
first_response,
))
Expand All @@ -238,10 +241,25 @@ impl<REQ: 'static, RSP: 'static> BidiStreamHandle<REQ, RSP> {
}

pub async fn send_request(&mut self, request: REQ) -> Result<()> {
Ok(self
.request_sender
.send(request)
.await
.map_err(|_| anyhow!("unable to send request {}", type_name::<REQ>()))?)
// Poll the response stream to early see the error
let send_request_result = match select(
pin!(self.request_sender.send(request)),
pin!(Pin::new(&mut self.response_stream).peek()),
)
.await
{
Either::Left((result, _)) => result,
Either::Right((response_result, send_future)) => match response_result {
None => {
return Err(anyhow!("end of response stream").into());
}
Some(Err(e)) => {
return Err(e.clone().into());
}
Some(Ok(_)) => send_future.await,
},
};
send_request_result
.map_err(|_| anyhow!("unable to send request {}", type_name::<REQ>()).into())
}
}
Loading