refactor(streaming): refine sink await-tree (#17242)
Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao authored Jun 13, 2024
1 parent a3da000 commit b2e1d15
Showing 9 changed files with 41 additions and 41 deletions.
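In short, the diff below standardizes the await-tree span names on the sink path to lower_snake_case (e.g. `log_reader_next_item`, `log_sinker_send_barrier (epoch {epoch})`), adds missing instrumentation to the monitored log reader and the in-memory log store writer, and changes `LogSinker::consume_log_and_sink` to return `Result<!>`, since a sinker only leaves that loop by returning an error. The following is a minimal, self-contained sketch of the instrumentation pattern only — it is not code from this repository; the future, span name, and omitted registry setup are illustrative, and it assumes the `await-tree` and `tokio` crates.

use std::time::Duration;

use await_tree::InstrumentAwait;

async fn wait_for_downstream() {
    // While this future is pending, a task registered with an await-tree
    // registry (registration omitted in this sketch) shows a node labeled
    // `log_sinker_wait_next_response` in its await-tree dump.
    tokio::time::sleep(Duration::from_secs(1))
        .instrument_await("log_sinker_wait_next_response")
        .await;
}

#[tokio::main]
async fn main() {
    wait_for_downstream().await;
}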
1 change: 1 addition & 0 deletions src/connector/src/lib.rs
@@ -34,6 +34,7 @@
 #![feature(negative_impls)]
 #![feature(register_tool)]
 #![feature(assert_matches)]
+#![feature(never_type)]
 #![register_tool(rw)]
 #![recursion_limit = "256"]

2 changes: 1 addition & 1 deletion src/connector/src/sink/decouple_checkpoint_log_sink.rs
@@ -48,7 +48,7 @@ impl<W> DecoupleCheckpointLogSinkerOf<W> {

 #[async_trait]
 impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSinkerOf<W> {
-    async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> {
+    async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<!> {
         let mut sink_writer = self.writer;
         let sink_metrics = self.sink_metrics;
         #[derive(Debug)]
31 changes: 18 additions & 13 deletions src/connector/src/sink/log_store.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
 use std::task::Poll;
 use std::time::Instant;
 
+use await_tree::InstrumentAwait;
 use futures::{TryFuture, TryFutureExt};
 use risingwave_common::array::StreamChunk;
 use risingwave_common::bail;
@@ -282,21 +283,25 @@ impl<R: LogReader> MonitoredLogReader<R> {

 impl<R: LogReader> LogReader for MonitoredLogReader<R> {
     async fn init(&mut self) -> LogStoreResult<()> {
-        self.inner.init().await
+        self.inner.init().instrument_await("log_reader_init").await
     }
 
     async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
-        self.inner.next_item().await.inspect(|(epoch, item)| {
-            if self.read_epoch != *epoch {
-                self.read_epoch = *epoch;
-                self.metrics.log_store_latest_read_epoch.set(*epoch as _);
-            }
-            if let LogStoreReadItem::StreamChunk { chunk, .. } = item {
-                self.metrics
-                    .log_store_read_rows
-                    .inc_by(chunk.cardinality() as _);
-            }
-        })
+        self.inner
+            .next_item()
+            .instrument_await("log_reader_next_item")
+            .await
+            .inspect(|(epoch, item)| {
+                if self.read_epoch != *epoch {
+                    self.read_epoch = *epoch;
+                    self.metrics.log_store_latest_read_epoch.set(*epoch as _);
+                }
+                if let LogStoreReadItem::StreamChunk { chunk, .. } = item {
+                    self.metrics
+                        .log_store_read_rows
+                        .inc_by(chunk.cardinality() as _);
+                }
+            })
     }
 
     fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
@@ -306,7 +311,7 @@ impl<R: LogReader> LogReader for MonitoredLogReader<R> {
     fn rewind(
         &mut self,
     ) -> impl Future<Output = LogStoreResult<(bool, Option<Bitmap>)>> + Send + '_ {
-        self.inner.rewind()
+        self.inner.rewind().instrument_await("log_reader_rewind")
     }
 }

2 changes: 1 addition & 1 deletion src/connector/src/sink/mod.rs
@@ -386,7 +386,7 @@ impl<R: LogReader> SinkLogReader for R {

 #[async_trait]
 pub trait LogSinker: 'static {
-    async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()>;
+    async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<!>;
 }
 
 #[async_trait]
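With the trait now returning `Result<!>` (the nightly never type enabled in `src/connector/src/lib.rs` above), the signature itself states that `consume_log_and_sink` never returns normally: it either loops forever or propagates an error. A hedged, self-contained sketch of the idiom follows, with illustrative names rather than the actual RisingWave types, and assuming the `anyhow` crate for the error type.

#![feature(never_type)] // nightly feature, as enabled in src/connector/src/lib.rs

/// Illustrative stand-in for a sinker loop: the only way out is the `?`.
async fn consume_forever() -> anyhow::Result<!> {
    loop {
        step().await?;
    }
}

async fn step() -> anyhow::Result<()> {
    // one unit of sink work; an error here terminates `consume_forever`
    Ok(())
}

Callers that need a concrete success type can convert the uninhabited value with an empty match, e.g. `.map_ok(|never| match never {})`, as done in `src/stream/src/executor/sink.rs` below.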
25 changes: 8 additions & 17 deletions src/connector/src/sink/remote.rs
@@ -301,7 +301,7 @@ impl RemoteLogSinker {

 #[async_trait]
 impl LogSinker for RemoteLogSinker {
-    async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> {
+    async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<!> {
         let mut request_tx = self.request_sender;
         let mut response_err_stream_rx = self.response_stream;
         let sink_metrics = self.sink_metrics;
@@ -313,7 +313,7 @@ impl LogSinker for RemoteLogSinker {
             let result = response_err_stream_rx
                 .stream
                 .try_next()
-                .instrument_await("Wait Response Stream")
+                .instrument_await("log_sinker_wait_next_response")
                 .await;
             match result {
                 Ok(Some(response)) => {
@@ -368,20 +368,12 @@
             let mut sent_offset_queue: VecDeque<(TruncateOffset, Option<Instant>)> =
                 VecDeque::new();
 
-            let mut curr_epoch = 0;
-
             loop {
                 let either_result: futures::future::Either<
                     Option<SinkWriterStreamResponse>,
                     LogStoreResult<(u64, LogStoreReadItem)>,
                 > = drop_either_future(
-                    select(
-                        pin!(response_rx.recv()),
-                        pin!(log_reader
-                            .next_item()
-                            .instrument_await(format!("Wait Next Item: {}", curr_epoch))),
-                    )
-                    .await,
+                    select(pin!(response_rx.recv()), pin!(log_reader.next_item())).await,
                 );
                 match either_result {
                     futures::future::Either::Left(opt) => {
@@ -435,7 +427,6 @@
                     }
                     futures::future::Either::Right(result) => {
                         let (epoch, item): (u64, LogStoreReadItem) = result?;
-                        curr_epoch = epoch;
 
                         match item {
                             LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
@@ -456,8 +447,7 @@
                                         chunk,
                                     })
                                     .instrument_await(format!(
-                                        "Send Chunk Request: {} {}",
-                                        curr_epoch, chunk_id
+                                        "log_sinker_send_chunk (chunk {chunk_id})"
                                     ))
                                     .await?;
                                 prev_offset = Some(offset);
@@ -473,15 +463,16 @@
                                 let start_time = Instant::now();
                                 request_tx
                                     .barrier(epoch, true)
-                                    .instrument_await(format!("Commit: {}", curr_epoch))
+                                    .instrument_await(format!(
+                                        "log_sinker_commit_checkpoint (epoch {epoch})"
+                                    ))
                                     .await?;
                                 Some(start_time)
                             } else {
                                 request_tx
                                     .barrier(epoch, false)
                                     .instrument_await(format!(
-                                        "Send Barrier Request: {}",
-                                        curr_epoch
+                                        "log_sinker_send_barrier (epoch {epoch})"
                                     ))
                                     .await?;
                                 None
2 changes: 1 addition & 1 deletion src/connector/src/sink/trivial.rs
@@ -75,7 +75,7 @@ impl<T: TrivialSinkName> Sink for TrivialSink<T> {

 #[async_trait]
 impl<T: TrivialSinkName> LogSinker for TrivialSink<T> {
-    async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> {
+    async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<!> {
         loop {
             let (epoch, item) = log_reader.next_item().await?;
             match item {
4 changes: 2 additions & 2 deletions src/connector/src/sink/writer.rs
@@ -126,7 +126,7 @@ impl<W> LogSinkerOf<W> {

 #[async_trait]
 impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for LogSinkerOf<W> {
-    async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> {
+    async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<!> {
         let mut sink_writer = self.writer;
         let sink_metrics = self.sink_metrics;
         #[derive(Debug)]
@@ -242,7 +242,7 @@

 #[async_trait]
 impl<W: AsyncTruncateSinkWriter> LogSinker for AsyncTruncateLogSinkerOf<W> {
-    async fn consume_log_and_sink(mut self, log_reader: &mut impl SinkLogReader) -> Result<()> {
+    async fn consume_log_and_sink(mut self, log_reader: &mut impl SinkLogReader) -> Result<!> {
         loop {
             let select_result = drop_either_future(
                 select(
5 changes: 5 additions & 0 deletions src/stream/src/common/log_store_impl/in_mem.rs
@@ -15,6 +15,7 @@
 use std::sync::Arc;
 
 use anyhow::{anyhow, Context};
+use await_tree::InstrumentAwait;
 use risingwave_common::array::StreamChunk;
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::util::epoch::{EpochExt, EpochPair, INVALID_EPOCH};
@@ -260,6 +261,7 @@ impl LogWriter for BoundedInMemLogStoreWriter {
     async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
         self.item_tx
             .send(InMemLogStoreItem::StreamChunk(chunk))
+            .instrument_await("in_mem_send_item_chunk")
             .await
             .map_err(|_| anyhow!("unable to send stream chunk"))?;
         Ok(())
@@ -275,6 +277,7 @@
                 next_epoch,
                 is_checkpoint,
             })
+            .instrument_await("in_mem_send_item_barrier")
             .await
             .map_err(|_| anyhow!("unable to send barrier"))?;

@@ -287,6 +290,7 @@
         let truncated_epoch = self
             .truncated_epoch_rx
             .recv()
+            .instrument_await("in_mem_recv_truncated_epoch")
             .await
             .ok_or_else(|| anyhow!("cannot get truncated epoch"))?;
         assert_eq!(truncated_epoch, prev_epoch);
@@ -298,6 +302,7 @@
     async fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> LogStoreResult<()> {
         self.item_tx
             .send(InMemLogStoreItem::UpdateVnodeBitmap(new_vnodes))
+            .instrument_await("in_mem_send_item_vnode_bitmap")
             .await
             .map_err(|_| anyhow!("unable to send vnode bitmap"))
     }
10 changes: 4 additions & 6 deletions src/stream/src/executor/sink.rs
@@ -125,7 +125,6 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
         let sink_id = self.sink_param.sink_id;
         let actor_id = self.actor_context.id;
         let fragment_id = self.actor_context.fragment_id;
-        let executor_id = self.sink_writer_param.executor_id;
 
         let stream_key = self.info.pk_indices.clone();
         let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
@@ -217,10 +216,9 @@
                     self.sink_writer_param,
                     self.actor_context,
                 )
-                .instrument_await(format!(
-                    "Consume Log: sink_id: {} actor_id: {}, executor_id: {}",
-                    sink_id, actor_id, executor_id,
-                ));
+                .instrument_await(format!("consume_log (sink_id {sink_id})"))
+                .map_ok(|never| match never {}); // unify return type to `Message`
+
                 // TODO: may try to remove the boxed
                 select(consume_log_stream.into_stream(), write_log_stream).boxed()
             })
@@ -403,7 +401,7 @@
         sink_param: SinkParam,
         mut sink_writer_param: SinkWriterParam,
         actor_context: ActorContextRef,
-    ) -> StreamExecutorResult<Message> {
+    ) -> StreamExecutorResult<!> {
         let metrics = sink_writer_param.sink_metrics.clone();
 
         let visible_columns = columns
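The `.map_ok(|never| match never {})` line above is what lets the `Result<!>` change compose with the rest of the executor: the consume-log future now succeeds with the uninhabited type `!`, and an empty match on that value type-checks as any type, so the future's items can be unified with the `Message` stream it is selected against. A rough, self-contained sketch with simplified stand-in types follows; it assumes the `futures` crate, and `Message` here is a placeholder, not the executor's real type.

#![feature(never_type)]

use std::future::Future;

use futures::TryFutureExt;

#[allow(dead_code)]
enum Message {
    Chunk(u64),
}

async fn consume_log() -> Result<!, String> {
    loop {
        // forward items to the external sink; only an error breaks the loop
        do_sink_work().await?;
    }
}

async fn do_sink_work() -> Result<(), String> {
    Ok(())
}

fn unified() -> impl Future<Output = Result<Message, String>> {
    // `match never {}` has no arms, so it can assume the item type of the
    // other branch (`Message`), letting both futures share one output type.
    consume_log().map_ok(|never| match never {})
}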
