diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index b701a0b8be43..8c0ade401cb4 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -34,6 +34,7 @@ #![feature(negative_impls)] #![feature(register_tool)] #![feature(assert_matches)] +#![feature(never_type)] #![register_tool(rw)] #![recursion_limit = "256"] diff --git a/src/connector/src/sink/decouple_checkpoint_log_sink.rs b/src/connector/src/sink/decouple_checkpoint_log_sink.rs index 9eaba2a10f12..26576cf3e366 100644 --- a/src/connector/src/sink/decouple_checkpoint_log_sink.rs +++ b/src/connector/src/sink/decouple_checkpoint_log_sink.rs @@ -48,7 +48,7 @@ impl DecoupleCheckpointLogSinkerOf { #[async_trait] impl> LogSinker for DecoupleCheckpointLogSinkerOf { - async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> { + async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { let mut sink_writer = self.writer; let sink_metrics = self.sink_metrics; #[derive(Debug)] diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs index d696feb1d861..d609aa7170d0 100644 --- a/src/connector/src/sink/log_store.rs +++ b/src/connector/src/sink/log_store.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use std::task::Poll; use std::time::Instant; +use await_tree::InstrumentAwait; use futures::{TryFuture, TryFutureExt}; use risingwave_common::array::StreamChunk; use risingwave_common::bail; @@ -282,21 +283,25 @@ impl MonitoredLogReader { impl LogReader for MonitoredLogReader { async fn init(&mut self) -> LogStoreResult<()> { - self.inner.init().await + self.inner.init().instrument_await("log_reader_init").await } async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> { - self.inner.next_item().await.inspect(|(epoch, item)| { - if self.read_epoch != *epoch { - self.read_epoch = *epoch; - self.metrics.log_store_latest_read_epoch.set(*epoch as _); - } - if let LogStoreReadItem::StreamChunk { chunk, .. } = item { - self.metrics - .log_store_read_rows - .inc_by(chunk.cardinality() as _); - } - }) + self.inner + .next_item() + .instrument_await("log_reader_next_item") + .await + .inspect(|(epoch, item)| { + if self.read_epoch != *epoch { + self.read_epoch = *epoch; + self.metrics.log_store_latest_read_epoch.set(*epoch as _); + } + if let LogStoreReadItem::StreamChunk { chunk, .. } = item { + self.metrics + .log_store_read_rows + .inc_by(chunk.cardinality() as _); + } + }) } fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> { @@ -306,7 +311,7 @@ impl LogReader for MonitoredLogReader { fn rewind( &mut self, ) -> impl Future)>> + Send + '_ { - self.inner.rewind() + self.inner.rewind().instrument_await("log_reader_rewind") } } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index e5a5f6143e41..bdd923f786ec 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -386,7 +386,7 @@ impl SinkLogReader for R { #[async_trait] pub trait LogSinker: 'static { - async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()>; + async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result; } #[async_trait] diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index ea247991f9eb..f8b84fc64eb8 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -301,7 +301,7 @@ impl RemoteLogSinker { #[async_trait] impl LogSinker for RemoteLogSinker { - async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> { + async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { let mut request_tx = self.request_sender; let mut response_err_stream_rx = self.response_stream; let sink_metrics = self.sink_metrics; @@ -313,7 +313,7 @@ impl LogSinker for RemoteLogSinker { let result = response_err_stream_rx .stream .try_next() - .instrument_await("Wait Response Stream") + .instrument_await("log_sinker_wait_next_response") .await; match result { Ok(Some(response)) => { @@ -368,20 +368,12 @@ impl LogSinker for RemoteLogSinker { let mut sent_offset_queue: VecDeque<(TruncateOffset, Option)> = VecDeque::new(); - let mut curr_epoch = 0; - loop { let either_result: futures::future::Either< Option, LogStoreResult<(u64, LogStoreReadItem)>, > = drop_either_future( - select( - pin!(response_rx.recv()), - pin!(log_reader - .next_item() - .instrument_await(format!("Wait Next Item: {}", curr_epoch))), - ) - .await, + select(pin!(response_rx.recv()), pin!(log_reader.next_item())).await, ); match either_result { futures::future::Either::Left(opt) => { @@ -435,7 +427,6 @@ impl LogSinker for RemoteLogSinker { } futures::future::Either::Right(result) => { let (epoch, item): (u64, LogStoreReadItem) = result?; - curr_epoch = epoch; match item { LogStoreReadItem::StreamChunk { chunk, chunk_id } => { @@ -456,8 +447,7 @@ impl LogSinker for RemoteLogSinker { chunk, }) .instrument_await(format!( - "Send Chunk Request: {} {}", - curr_epoch, chunk_id + "log_sinker_send_chunk (chunk {chunk_id})" )) .await?; prev_offset = Some(offset); @@ -473,15 +463,16 @@ impl LogSinker for RemoteLogSinker { let start_time = Instant::now(); request_tx .barrier(epoch, true) - .instrument_await(format!("Commit: {}", curr_epoch)) + .instrument_await(format!( + "log_sinker_commit_checkpoint (epoch {epoch})" + )) .await?; Some(start_time) } else { request_tx .barrier(epoch, false) .instrument_await(format!( - "Send Barrier Request: {}", - curr_epoch + "log_sinker_send_barrier (epoch {epoch})" )) .await?; None diff --git a/src/connector/src/sink/trivial.rs b/src/connector/src/sink/trivial.rs index 60e47949a8fd..0cfa82c5c4d1 100644 --- a/src/connector/src/sink/trivial.rs +++ b/src/connector/src/sink/trivial.rs @@ -75,7 +75,7 @@ impl Sink for TrivialSink { #[async_trait] impl LogSinker for TrivialSink { - async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> { + async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { loop { let (epoch, item) = log_reader.next_item().await?; match item { diff --git a/src/connector/src/sink/writer.rs b/src/connector/src/sink/writer.rs index ba7df7449527..00917f26c416 100644 --- a/src/connector/src/sink/writer.rs +++ b/src/connector/src/sink/writer.rs @@ -126,7 +126,7 @@ impl LogSinkerOf { #[async_trait] impl> LogSinker for LogSinkerOf { - async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> { + async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { let mut sink_writer = self.writer; let sink_metrics = self.sink_metrics; #[derive(Debug)] @@ -242,7 +242,7 @@ impl AsyncTruncateLogSinkerOf { #[async_trait] impl LogSinker for AsyncTruncateLogSinkerOf { - async fn consume_log_and_sink(mut self, log_reader: &mut impl SinkLogReader) -> Result<()> { + async fn consume_log_and_sink(mut self, log_reader: &mut impl SinkLogReader) -> Result { loop { let select_result = drop_either_future( select( diff --git a/src/stream/src/common/log_store_impl/in_mem.rs b/src/stream/src/common/log_store_impl/in_mem.rs index 3aafb865f09a..d1ce2e6e5aa3 100644 --- a/src/stream/src/common/log_store_impl/in_mem.rs +++ b/src/stream/src/common/log_store_impl/in_mem.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use anyhow::{anyhow, Context}; +use await_tree::InstrumentAwait; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::Bitmap; use risingwave_common::util::epoch::{EpochExt, EpochPair, INVALID_EPOCH}; @@ -260,6 +261,7 @@ impl LogWriter for BoundedInMemLogStoreWriter { async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> { self.item_tx .send(InMemLogStoreItem::StreamChunk(chunk)) + .instrument_await("in_mem_send_item_chunk") .await .map_err(|_| anyhow!("unable to send stream chunk"))?; Ok(()) @@ -275,6 +277,7 @@ impl LogWriter for BoundedInMemLogStoreWriter { next_epoch, is_checkpoint, }) + .instrument_await("in_mem_send_item_barrier") .await .map_err(|_| anyhow!("unable to send barrier"))?; @@ -287,6 +290,7 @@ impl LogWriter for BoundedInMemLogStoreWriter { let truncated_epoch = self .truncated_epoch_rx .recv() + .instrument_await("in_mem_recv_truncated_epoch") .await .ok_or_else(|| anyhow!("cannot get truncated epoch"))?; assert_eq!(truncated_epoch, prev_epoch); @@ -298,6 +302,7 @@ impl LogWriter for BoundedInMemLogStoreWriter { async fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> LogStoreResult<()> { self.item_tx .send(InMemLogStoreItem::UpdateVnodeBitmap(new_vnodes)) + .instrument_await("in_mem_send_item_vnode_bitmap") .await .map_err(|_| anyhow!("unable to send vnode bitmap")) } diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 52377c49c322..e8dfc09e9130 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -125,7 +125,6 @@ impl SinkExecutor { let sink_id = self.sink_param.sink_id; let actor_id = self.actor_context.id; let fragment_id = self.actor_context.fragment_id; - let executor_id = self.sink_writer_param.executor_id; let stream_key = self.info.pk_indices.clone(); let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics( @@ -217,10 +216,9 @@ impl SinkExecutor { self.sink_writer_param, self.actor_context, ) - .instrument_await(format!( - "Consume Log: sink_id: {} actor_id: {}, executor_id: {}", - sink_id, actor_id, executor_id, - )); + .instrument_await(format!("consume_log (sink_id {sink_id})")) + .map_ok(|never| match never {}); // unify return type to `Message` + // TODO: may try to remove the boxed select(consume_log_stream.into_stream(), write_log_stream).boxed() }) @@ -403,7 +401,7 @@ impl SinkExecutor { sink_param: SinkParam, mut sink_writer_param: SinkWriterParam, actor_context: ActorContextRef, - ) -> StreamExecutorResult { + ) -> StreamExecutorResult { let metrics = sink_writer_param.sink_metrics.clone(); let visible_columns = columns