From 40eb90793dcbc8bb94678fb25347d9868ab01a8b Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 13 Jun 2024 16:45:55 +0800 Subject: [PATCH] refine sink await-tree Signed-off-by: Bugen Zhao --- .../src/sink/decouple_checkpoint_log_sink.rs | 3 +- src/connector/src/sink/log_store.rs | 31 +++++++++++-------- src/connector/src/sink/mod.rs | 3 +- src/connector/src/sink/remote.rs | 26 ++++++---------- src/connector/src/sink/trivial.rs | 3 +- src/connector/src/sink/writer.rs | 8 +++-- .../src/common/log_store_impl/in_mem.rs | 5 +++ src/stream/src/executor/sink.rs | 10 +++--- 8 files changed, 49 insertions(+), 40 deletions(-) diff --git a/src/connector/src/sink/decouple_checkpoint_log_sink.rs b/src/connector/src/sink/decouple_checkpoint_log_sink.rs index 9eaba2a10f121..72edad9608bf6 100644 --- a/src/connector/src/sink/decouple_checkpoint_log_sink.rs +++ b/src/connector/src/sink/decouple_checkpoint_log_sink.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::convert::Infallible; use std::num::NonZeroU64; use std::time::Instant; @@ -48,7 +49,7 @@ impl DecoupleCheckpointLogSinkerOf { #[async_trait] impl> LogSinker for DecoupleCheckpointLogSinkerOf { - async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> { + async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { let mut sink_writer = self.writer; let sink_metrics = self.sink_metrics; #[derive(Debug)] diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs index d696feb1d8617..d609aa7170d0d 100644 --- a/src/connector/src/sink/log_store.rs +++ b/src/connector/src/sink/log_store.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use std::task::Poll; use std::time::Instant; +use await_tree::InstrumentAwait; use futures::{TryFuture, TryFutureExt}; use risingwave_common::array::StreamChunk; use risingwave_common::bail; @@ -282,21 +283,25 @@ impl MonitoredLogReader { impl LogReader for MonitoredLogReader { async fn init(&mut self) -> LogStoreResult<()> { - self.inner.init().await + self.inner.init().instrument_await("log_reader_init").await } async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> { - self.inner.next_item().await.inspect(|(epoch, item)| { - if self.read_epoch != *epoch { - self.read_epoch = *epoch; - self.metrics.log_store_latest_read_epoch.set(*epoch as _); - } - if let LogStoreReadItem::StreamChunk { chunk, .. } = item { - self.metrics - .log_store_read_rows - .inc_by(chunk.cardinality() as _); - } - }) + self.inner + .next_item() + .instrument_await("log_reader_next_item") + .await + .inspect(|(epoch, item)| { + if self.read_epoch != *epoch { + self.read_epoch = *epoch; + self.metrics.log_store_latest_read_epoch.set(*epoch as _); + } + if let LogStoreReadItem::StreamChunk { chunk, .. } = item { + self.metrics + .log_store_read_rows + .inc_by(chunk.cardinality() as _); + } + }) } fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> { @@ -306,7 +311,7 @@ impl LogReader for MonitoredLogReader { fn rewind( &mut self, ) -> impl Future)>> + Send + '_ { - self.inner.rewind() + self.inner.rewind().instrument_await("log_reader_rewind") } } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index e5a5f6143e419..de0daab48b19f 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -46,6 +46,7 @@ pub mod utils; pub mod writer; use std::collections::BTreeMap; +use std::convert::Infallible; use std::future::Future; use ::clickhouse::error::Error as ClickHouseError; @@ -386,7 +387,7 @@ impl SinkLogReader for R { #[async_trait] pub trait LogSinker: 'static { - async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()>; + async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result; } #[async_trait] diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index ea247991f9eb8..b61db79792848 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::{BTreeMap, VecDeque}; +use std::convert::Infallible; use std::marker::PhantomData; use std::ops::Deref; use std::pin::pin; @@ -301,7 +302,7 @@ impl RemoteLogSinker { #[async_trait] impl LogSinker for RemoteLogSinker { - async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> { + async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { let mut request_tx = self.request_sender; let mut response_err_stream_rx = self.response_stream; let sink_metrics = self.sink_metrics; @@ -313,7 +314,7 @@ impl LogSinker for RemoteLogSinker { let result = response_err_stream_rx .stream .try_next() - .instrument_await("Wait Response Stream") + .instrument_await("log_sinker_wait_next_response") .await; match result { Ok(Some(response)) => { @@ -368,20 +369,12 @@ impl LogSinker for RemoteLogSinker { let mut sent_offset_queue: VecDeque<(TruncateOffset, Option)> = VecDeque::new(); - let mut curr_epoch = 0; - loop { let either_result: futures::future::Either< Option, LogStoreResult<(u64, LogStoreReadItem)>, > = drop_either_future( - select( - pin!(response_rx.recv()), - pin!(log_reader - .next_item() - .instrument_await(format!("Wait Next Item: {}", curr_epoch))), - ) - .await, + select(pin!(response_rx.recv()), pin!(log_reader.next_item())).await, ); match either_result { futures::future::Either::Left(opt) => { @@ -435,7 +428,6 @@ impl LogSinker for RemoteLogSinker { } futures::future::Either::Right(result) => { let (epoch, item): (u64, LogStoreReadItem) = result?; - curr_epoch = epoch; match item { LogStoreReadItem::StreamChunk { chunk, chunk_id } => { @@ -456,8 +448,7 @@ impl LogSinker for RemoteLogSinker { chunk, }) .instrument_await(format!( - "Send Chunk Request: {} {}", - curr_epoch, chunk_id + "log_sinker_send_chunk (chunk {chunk_id})" )) .await?; prev_offset = Some(offset); @@ -473,15 +464,16 @@ impl LogSinker for RemoteLogSinker { let start_time = Instant::now(); request_tx .barrier(epoch, true) - .instrument_await(format!("Commit: {}", curr_epoch)) + .instrument_await(format!( + "log_sinker_commit_checkpoint (epoch {epoch})" + )) .await?; Some(start_time) } else { request_tx .barrier(epoch, false) .instrument_await(format!( - "Send Barrier Request: {}", - curr_epoch + "log_sinker_send_barrier (epoch {epoch})" )) .await?; None diff --git a/src/connector/src/sink/trivial.rs b/src/connector/src/sink/trivial.rs index 60e47949a8fdf..e78cfe936247c 100644 --- a/src/connector/src/sink/trivial.rs +++ b/src/connector/src/sink/trivial.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::convert::Infallible; use std::marker::PhantomData; use async_trait::async_trait; @@ -75,7 +76,7 @@ impl Sink for TrivialSink { #[async_trait] impl LogSinker for TrivialSink { - async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> { + async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { loop { let (epoch, item) = log_reader.next_item().await?; match item { diff --git a/src/connector/src/sink/writer.rs b/src/connector/src/sink/writer.rs index ba7df74495272..dd7c8aef8d204 100644 --- a/src/connector/src/sink/writer.rs +++ b/src/connector/src/sink/writer.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::convert::Infallible; use std::future::{Future, Ready}; use std::pin::pin; use std::sync::Arc; @@ -126,7 +127,7 @@ impl LogSinkerOf { #[async_trait] impl> LogSinker for LogSinkerOf { - async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> { + async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { let mut sink_writer = self.writer; let sink_metrics = self.sink_metrics; #[derive(Debug)] @@ -242,7 +243,10 @@ impl AsyncTruncateLogSinkerOf { #[async_trait] impl LogSinker for AsyncTruncateLogSinkerOf { - async fn consume_log_and_sink(mut self, log_reader: &mut impl SinkLogReader) -> Result<()> { + async fn consume_log_and_sink( + mut self, + log_reader: &mut impl SinkLogReader, + ) -> Result { loop { let select_result = drop_either_future( select( diff --git a/src/stream/src/common/log_store_impl/in_mem.rs b/src/stream/src/common/log_store_impl/in_mem.rs index 3aafb865f09ac..d1ce2e6e5aa35 100644 --- a/src/stream/src/common/log_store_impl/in_mem.rs +++ b/src/stream/src/common/log_store_impl/in_mem.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use anyhow::{anyhow, Context}; +use await_tree::InstrumentAwait; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::Bitmap; use risingwave_common::util::epoch::{EpochExt, EpochPair, INVALID_EPOCH}; @@ -260,6 +261,7 @@ impl LogWriter for BoundedInMemLogStoreWriter { async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> { self.item_tx .send(InMemLogStoreItem::StreamChunk(chunk)) + .instrument_await("in_mem_send_item_chunk") .await .map_err(|_| anyhow!("unable to send stream chunk"))?; Ok(()) @@ -275,6 +277,7 @@ impl LogWriter for BoundedInMemLogStoreWriter { next_epoch, is_checkpoint, }) + .instrument_await("in_mem_send_item_barrier") .await .map_err(|_| anyhow!("unable to send barrier"))?; @@ -287,6 +290,7 @@ impl LogWriter for BoundedInMemLogStoreWriter { let truncated_epoch = self .truncated_epoch_rx .recv() + .instrument_await("in_mem_recv_truncated_epoch") .await .ok_or_else(|| anyhow!("cannot get truncated epoch"))?; assert_eq!(truncated_epoch, prev_epoch); @@ -298,6 +302,7 @@ impl LogWriter for BoundedInMemLogStoreWriter { async fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> LogStoreResult<()> { self.item_tx .send(InMemLogStoreItem::UpdateVnodeBitmap(new_vnodes)) + .instrument_await("in_mem_send_item_vnode_bitmap") .await .map_err(|_| anyhow!("unable to send vnode bitmap")) } diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 52377c49c322a..80a7235137e52 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::convert::Infallible; use std::mem; use anyhow::anyhow; @@ -217,10 +218,9 @@ impl SinkExecutor { self.sink_writer_param, self.actor_context, ) - .instrument_await(format!( - "Consume Log: sink_id: {} actor_id: {}, executor_id: {}", - sink_id, actor_id, executor_id, - )); + .instrument_await(format!("consume_log (sink_id {sink_id})")) + .map_ok(|f| match f {}); // unify return type to `Message` + // TODO: may try to remove the boxed select(consume_log_stream.into_stream(), write_log_stream).boxed() }) @@ -403,7 +403,7 @@ impl SinkExecutor { sink_param: SinkParam, mut sink_writer_param: SinkWriterParam, actor_context: ActorContextRef, - ) -> StreamExecutorResult { + ) -> StreamExecutorResult { let metrics = sink_writer_param.sink_metrics.clone(); let visible_columns = columns