Skip to content

Commit

Permalink
feat(sink): add metrics to monitor sink back pressure (#16593)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored May 7, 2024
1 parent 746faae commit 4030ddc
Show file tree
Hide file tree
Showing 16 changed files with 118 additions and 69 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -3849,6 +3849,16 @@ def section_sink_metrics(outer_panels):
),
],
),
panels.timeseries_percentage(
"Log Store Backpressure Ratio",
"",
[
panels.target(
f"avg(rate({metric('log_store_reader_wait_new_future_duration_ns')}[$__rate_interval])) by (connector, sink_id, executor_id) / 1000000000",
"Backpressure @ {{connector}} {{sink_id}} {{executor_id}}",
),
],
),
panels.timeseries_latency(
"Log Store Consume Persistent Log Lag",
"",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/bench/sink_bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl LogReader for MockRangeLogReader {
}
}

async fn truncate(&mut self, _offset: TruncateOffset) -> LogStoreResult<()> {
fn truncate(&mut self, _offset: TruncateOffset) -> LogStoreResult<()> {
Ok(())
}

Expand Down
4 changes: 1 addition & 3 deletions src/connector/src/sink/iceberg/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,7 @@ impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for IcebergLogSinkerOf<W> {
sink_metrics
.sink_commit_duration_metrics
.observe(start_time.elapsed().as_millis() as f64);
log_reader
.truncate(TruncateOffset::Barrier { epoch })
.await?;
log_reader.truncate(TruncateOffset::Barrier { epoch })?;
current_checkpoint = 0;
} else {
sink_writer.barrier(false).await?;
Expand Down
87 changes: 71 additions & 16 deletions src/connector/src/sink/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ use std::fmt::Debug;
use std::future::{poll_fn, Future};
use std::sync::Arc;
use std::task::Poll;
use std::time::Instant;

use futures::{TryFuture, TryFutureExt};
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::buffer::Bitmap;
use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH};

use crate::sink::SinkMetrics;
Expand Down Expand Up @@ -161,10 +163,7 @@ pub trait LogReader: Send + Sized + 'static {

/// Mark that all items emitted so far have been consumed and it is safe to truncate the log
/// from the current offset.
fn truncate(
&mut self,
offset: TruncateOffset,
) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;

/// Reset the log reader to after the latest truncate offset
///
Expand Down Expand Up @@ -205,10 +204,7 @@ impl<F: Fn(StreamChunk) -> StreamChunk + Send + 'static, R: LogReader> LogReader
Ok((epoch, item))
}

fn truncate(
&mut self,
offset: TruncateOffset,
) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
self.inner.truncate(offset)
}

Expand All @@ -219,12 +215,71 @@ impl<F: Fn(StreamChunk) -> StreamChunk + Send + 'static, R: LogReader> LogReader
}
}

/// A [`LogReader`] wrapper that measures how long the consumer takes between receiving an
/// item and asking for the next one.
///
/// When the future returned by `next_item` resolves with `Ok`, the current time is recorded.
/// The following call to `next_item` adds the nanoseconds elapsed since then to
/// `wait_new_future_duration_ns`. `init` and a successful `rewind` discard the recorded time,
/// so the interval spanning them is not counted.
pub struct BackpressureMonitoredLogReader<R: LogReader> {
/// The wrapped reader; every `LogReader` call is forwarded to it.
inner: R,
/// Time at which the most recent `next_item` future resolved with `Ok`.
///
/// `None` until an item has been returned, and again once the value has been consumed by
/// the next `next_item` call or cleared by `init` / a successful `rewind`.
wait_new_future_start_time: Option<Instant>,
/// Counter accumulating, in nanoseconds, the time between an item being returned and the
/// next `next_item` call.
wait_new_future_duration_ns: LabelGuardedIntCounter<3>,
}

impl<R: LogReader> BackpressureMonitoredLogReader<R> {
    /// Wraps `inner` and accumulates the measured wait time into
    /// `wait_new_future_duration_ns`.
    ///
    /// The wrapper starts without a recorded start time, so the counter is left untouched
    /// until an item has been returned at least once.
    fn new(inner: R, wait_new_future_duration_ns: LabelGuardedIntCounter<3>) -> Self {
        // Nothing has been handed to the consumer yet, so there is no interval to time.
        let wait_new_future_start_time = None;
        Self {
            wait_new_future_duration_ns,
            wait_new_future_start_time,
            inner,
        }
    }
}

impl<R: LogReader> LogReader for BackpressureMonitoredLogReader<R> {
    /// Drops any recorded start time, then initializes the wrapped reader.
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        // A start time left over from before (re-)initialization must not be counted.
        self.wait_new_future_start_time = None;
        self.inner.init()
    }

    /// Fetches the next item from the wrapped reader while timing the consumer.
    ///
    /// If a previous item was returned, the time elapsed since then is added to the counter
    /// when this method is called, i.e. before the returned future is polled. Once the
    /// returned future resolves with `Ok`, a new start time is recorded.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        // `take` leaves `None` behind, so each interval is counted at most once.
        if let Some(ready_at) = self.wait_new_future_start_time.take() {
            let waited = ready_at.elapsed();
            self.wait_new_future_duration_ns
                .inc_by(waited.as_nanos() as _);
        }

        // Borrow only the start-time field so the wrapped reader can be borrowed alongside it.
        let start_time_slot = &mut self.wait_new_future_start_time;
        self.inner.next_item().inspect_ok(move |_| {
            // The item is ready: start timing until the consumer asks for the next one.
            *start_time_slot = Some(Instant::now());
        })
    }

    /// Forwards the truncate request to the wrapped reader unchanged.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        self.inner.truncate(offset)
    }

    /// Rewinds the wrapped reader; on success any recorded start time is discarded.
    fn rewind(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(bool, Option<Bitmap>)>> + Send + '_ {
        let start_time_slot = &mut self.wait_new_future_start_time;
        self.inner.rewind().inspect_ok(move |_| {
            *start_time_slot = None;
        })
    }
}

/// A [`LogReader`] wrapper that carries [`SinkMetrics`] alongside the wrapped reader.
pub struct MonitoredLogReader<R: LogReader> {
/// The wrapped reader that calls are forwarded to.
inner: R,
/// Starts as `INVALID_EPOCH` when built through `new`.
/// NOTE(review): presumably the epoch of the most recently read item; the code that
/// updates it is not visible here, so confirm against `next_item`.
read_epoch: u64,
/// Sink metrics handle supplied by the caller.
/// NOTE(review): presumably reported from `next_item`; confirm which metrics are touched.
metrics: SinkMetrics,
}

impl<R: LogReader> MonitoredLogReader<R> {
    /// Wraps `inner` together with `metrics`.
    ///
    /// The read epoch starts out as `INVALID_EPOCH`.
    pub fn new(inner: R, metrics: SinkMetrics) -> Self {
        let read_epoch = INVALID_EPOCH;
        Self {
            metrics,
            read_epoch,
            inner,
        }
    }
}

impl<R: LogReader> LogReader for MonitoredLogReader<R> {
async fn init(&mut self) -> LogStoreResult<()> {
self.inner.init().await
Expand All @@ -244,8 +299,8 @@ impl<R: LogReader> LogReader for MonitoredLogReader<R> {
})
}

async fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
self.inner.truncate(offset).await
fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
self.inner.truncate(offset)
}

fn rewind(
Expand All @@ -267,12 +322,12 @@ where
TransformChunkLogReader { f, inner: self }
}

pub fn monitored(self, metrics: SinkMetrics) -> MonitoredLogReader<T> {
MonitoredLogReader {
read_epoch: INVALID_EPOCH,
inner: self,
metrics,
}
/// Wraps this reader in the monitoring layers.
///
/// The reader is first wrapped in a `MonitoredLogReader` holding `metrics`, and that is in
/// turn wrapped in a `BackpressureMonitoredLogReader` fed by the
/// `log_store_reader_wait_new_future_duration_ns` counter taken from the same `metrics`.
pub fn monitored(self, metrics: SinkMetrics) -> impl LogReader {
    // Clone the counter first: `metrics` is moved into the inner wrapper just below.
    let backpressure_counter = metrics
        .log_store_reader_wait_new_future_duration_ns
        .clone();
    let metrics_monitored = MonitoredLogReader::new(self, metrics);
    BackpressureMonitoredLogReader::new(metrics_monitored, backpressure_counter)
}
}

Expand Down
14 changes: 6 additions & 8 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ pub struct SinkMetrics {
pub log_store_latest_read_epoch: LabelGuardedIntGauge<3>,
pub log_store_read_rows: LabelGuardedIntCounter<3>,

pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter<3>,

pub iceberg_write_qps: LabelGuardedIntCounter<2>,
pub iceberg_write_latency: LabelGuardedHistogram<2>,
pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGauge<2>,
Expand All @@ -271,6 +273,8 @@ impl SinkMetrics {
log_store_latest_read_epoch: LabelGuardedIntGauge::test_int_gauge(),
log_store_write_rows: LabelGuardedIntCounter::test_int_counter(),
log_store_read_rows: LabelGuardedIntCounter::test_int_counter(),
log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter::test_int_counter(
),
iceberg_write_qps: LabelGuardedIntCounter::test_int_counter(),
iceberg_write_latency: LabelGuardedHistogram::test_histogram(),
iceberg_rolling_unflushed_data_file: LabelGuardedIntGauge::test_int_gauge(),
Expand Down Expand Up @@ -361,10 +365,7 @@ pub trait SinkLogReader: Send + Sized + 'static {

/// Mark that all items emitted so far have been consumed and it is safe to truncate the log
/// from the current offset.
fn truncate(
&mut self,
offset: TruncateOffset,
) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}

impl<R: LogReader> SinkLogReader for R {
Expand All @@ -374,10 +375,7 @@ impl<R: LogReader> SinkLogReader for R {
<Self as LogReader>::next_item(self)
}

fn truncate(
&mut self,
offset: TruncateOffset,
) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
<Self as LogReader>::truncate(self, offset)
}
}
Expand Down
10 changes: 4 additions & 6 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ impl LogSinker for RemoteLogSinker {
};

let poll_consume_log_and_sink = async move {
async fn truncate_matched_offset(
fn truncate_matched_offset(
queue: &mut VecDeque<(TruncateOffset, Option<Instant>)>,
persisted_offset: TruncateOffset,
log_reader: &mut impl SinkLogReader,
Expand Down Expand Up @@ -361,7 +361,7 @@ impl LogSinker for RemoteLogSinker {
.observe(start_time.elapsed().as_millis() as f64);
}

log_reader.truncate(persisted_offset).await?;
log_reader.truncate(persisted_offset)?;
Ok(())
}

Expand Down Expand Up @@ -406,8 +406,7 @@ impl LogSinker for RemoteLogSinker {
},
log_reader,
&sink_metrics,
)
.await?;
)?;
}
SinkWriterStreamResponse {
response:
Expand All @@ -426,8 +425,7 @@ impl LogSinker for RemoteLogSinker {
TruncateOffset::Barrier { epoch },
log_reader,
&sink_metrics,
)
.await?;
)?;
}
response => {
return Err(SinkError::Remote(anyhow!(
Expand Down
8 changes: 2 additions & 6 deletions src/connector/src/sink/trivial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,10 @@ impl<T: TrivialSinkName> LogSinker for TrivialSink<T> {
let (epoch, item) = log_reader.next_item().await?;
match item {
LogStoreReadItem::StreamChunk { chunk_id, .. } => {
log_reader
.truncate(TruncateOffset::Chunk { epoch, chunk_id })
.await?;
log_reader.truncate(TruncateOffset::Chunk { epoch, chunk_id })?;
}
LogStoreReadItem::Barrier { .. } => {
log_reader
.truncate(TruncateOffset::Barrier { epoch })
.await?;
log_reader.truncate(TruncateOffset::Barrier { epoch })?;
}
LogStoreReadItem::UpdateVnodeBitmap(_) => {}
}
Expand Down
6 changes: 2 additions & 4 deletions src/connector/src/sink/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,7 @@ impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for LogSinkerOf<W> {
sink_metrics
.sink_commit_duration_metrics
.observe(start_time.elapsed().as_millis() as f64);
log_reader
.truncate(TruncateOffset::Barrier { epoch })
.await?;
log_reader.truncate(TruncateOffset::Barrier { epoch })?;
} else {
sink_writer.barrier(false).await?;
}
Expand Down Expand Up @@ -270,7 +268,7 @@ impl<W: AsyncTruncateSinkWriter> LogSinker for AsyncTruncateLogSinkerOf<W> {
}
Either::Right(offset_result) => {
let offset = offset_result?;
log_reader.truncate(offset).await?;
log_reader.truncate(offset)?;
}
}
}
Expand Down
5 changes: 1 addition & 4 deletions src/stream/src/common/log_store_impl/in_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl LogReader for BoundedInMemLogStoreReader {
}
}

async fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
// check the truncate offset is higher than prev truncate offset
if self.truncate_offset >= offset {
return Err(anyhow!(
Expand Down Expand Up @@ -421,7 +421,6 @@ mod tests {
epoch: init_epoch,
chunk_id: chunk_id1_2,
})
.await
.unwrap();
assert!(poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx)))
.await
Expand All @@ -431,14 +430,12 @@ mod tests {
epoch: epoch1,
chunk_id: chunk_id2_1,
})
.await
.unwrap();
assert!(poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx)))
.await
.is_pending());
reader
.truncate(TruncateOffset::Barrier { epoch: epoch1 })
.await
.unwrap();
join_handle.await.unwrap();
}
Expand Down
Loading

0 comments on commit 4030ddc

Please sign in to comment.