Skip to content

Commit

Permalink
feat(sink): add sink throughput( mb/s) metrics (#19587)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
Co-authored-by: Richard Chien <[email protected]>
  • Loading branch information
xxhZs and stdrc authored Dec 10, 2024
1 parent 152cab2 commit 2b819a3
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 4 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

45 changes: 43 additions & 2 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@ def section_streaming(outer_panels):
# TODO: These 2 metrics should be deprecated because they are unaware of Log Store
# Let's remove them when all sinks are migrated to Log Store
panels.timeseries_rowsps(
"Sink Throughput(rows/s) *",
"Sink Throughput(rows/s)",
"The number of rows streamed into each sink per second. For sinks with 'sink_decouple = true', please refer to the 'Sink Metrics' section",
[
panels.target(
Expand All @@ -961,7 +961,7 @@ def section_streaming(outer_panels):
],
),
panels.timeseries_rowsps(
"Sink Throughput(rows/s) per Partition *",
"Sink Throughput(rows/s) per Partition",
"The number of rows streamed into each sink per second. For sinks with 'sink_decouple = true', please refer to the 'Sink Metrics' section",
[
panels.target(
Expand All @@ -970,6 +970,26 @@ def section_streaming(outer_panels):
),
],
),
panels.timeseries_bytesps(
"Sink Throughput(MB/s)",
"The figure shows the number of bytes written each sink per second. For sinks with 'sink_decouple = true', please refer to the 'Sink Metrics' section",
[
panels.target(
f"(sum(rate({metric('stream_sink_input_bytes')}[$__rate_interval])) by (sink_id) * on(sink_id) group_left(sink_name) group({metric('sink_info')}) by (sink_id, sink_name)) / (1000*1000)",
"sink {{sink_id}} {{sink_name}}",
),
],
),
panels.timeseries_bytesps(
"Sink Throughput(MB/s) per Partition",
"The number of bytes streamed into each sink per second. For sinks with 'sink_decouple = true', please refer to the 'Sink Metrics' section",
[
panels.target(
f"(sum(rate({metric('stream_sink_input_bytes')}[$__rate_interval])) by (sink_id, actor_id) * on(actor_id) group_left(sink_name) {metric('sink_info')}) / (1000*1000)",
"sink {{sink_id}} {{sink_name}} - actor {{actor_id}}",
),
],
),
panels.timeseries_rowsps(
"Materialized View Throughput(rows/s)",
"The figure shows the number of rows written into each materialized view per second.",
Expand Down Expand Up @@ -4433,6 +4453,27 @@ def section_sink_metrics(outer_panels):
),
],
),
panels.timeseries_bytesps(
"Log Store Consume Throughput(MB/s)",
"",
[
panels.target(
f"sum(rate({metric('log_store_read_bytes')}[$__rate_interval])) by (connector, sink_id, sink_name) / (1000*1000)",
"{{sink_id}} {{sink_name}} ({{connector}})",
),
],
),
panels.timeseries_bytesps(
"Executor Log Store Consume Throughput(MB/s)",
"",
[
panels.target(
f"sum(rate({metric('log_store_read_bytes')}[$__rate_interval])) by ({NODE_LABEL}, connector, sink_id, actor_id, sink_name) / (1000*1000)",
"{{sink_id}} {{sink_name}} ({{connector}}) actor {{actor_id}} @ {{%s}}"
% NODE_LABEL,
),
],
),
panels.timeseries_rowsps(
"Log Store Write Throughput(rows)",
"",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions src/connector/src/sink/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH};
use risingwave_common_estimate_size::EstimateSize;

pub type LogStoreResult<T> = Result<T, anyhow::Error>;
pub type ChunkId = usize;
Expand Down Expand Up @@ -272,6 +273,7 @@ pub struct MonitoredLogReader<R: LogReader> {
/// Bundle of per-reader metric handles for a log store reader.
///
/// Each field is a label-guarded handle with four label values (the `<4>` parameter).
/// NOTE(review): this appears to be the type behind `self.metrics` in the
/// `MonitoredLogReader` implementation of `LogReader` — the field names match, but the
/// field declaration is not visible here; confirm.
pub struct LogReaderMetrics {
/// Gauge for the most recently read epoch.
/// NOTE(review): inferred from the name; the update site is not visible in this view.
pub log_store_latest_read_epoch: LabelGuardedIntGauge<4>,
/// Counter of rows read; incremented by `chunk.cardinality()` in the monitored read path.
pub log_store_read_rows: LabelGuardedIntCounter<4>,
/// Counter of bytes read; incremented by `chunk.estimated_size()` (an estimate, not an
/// exact wire size). The matching metric family is registered as `log_store_read_bytes`
/// with labels `actor_id`, `connector`, `sink_id`, `sink_name`.
pub log_store_read_bytes: LabelGuardedIntCounter<4>,
/// Counter of time, presumably in nanoseconds, spent waiting for a new future.
/// NOTE(review): unit and meaning are inferred from the name only — confirm at the update site.
pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter<4>,
}

Expand Down Expand Up @@ -304,6 +306,9 @@ impl<R: LogReader> LogReader for MonitoredLogReader<R> {
self.metrics
.log_store_read_rows
.inc_by(chunk.cardinality() as _);
self.metrics
.log_store_read_bytes
.inc_by(chunk.estimated_size() as u64);
}
})
}
Expand Down
10 changes: 10 additions & 0 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ pub struct SinkMetrics {
// Log store reader metrics
pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec<4>,
pub log_store_read_rows: LabelGuardedIntCounterVec<4>,
pub log_store_read_bytes: LabelGuardedIntCounterVec<4>,
pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec<4>,

// Iceberg metrics
Expand Down Expand Up @@ -386,6 +387,14 @@ impl SinkMetrics {
)
.unwrap();

let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
"log_store_read_bytes",
"Total size of chunks read by log reader",
&["actor_id", "connector", "sink_id", "sink_name"],
registry
)
.unwrap();

let log_store_reader_wait_new_future_duration_ns =
register_guarded_int_counter_vec_with_registry!(
"log_store_reader_wait_new_future_duration_ns",
Expand Down Expand Up @@ -451,6 +460,7 @@ impl SinkMetrics {
log_store_write_rows,
log_store_latest_read_epoch,
log_store_read_rows,
log_store_read_bytes,
log_store_reader_wait_new_future_duration_ns,
iceberg_write_qps,
iceberg_write_latency,
Expand Down
12 changes: 12 additions & 0 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub struct StreamingMetrics {

// Sink
sink_input_row_count: LabelGuardedIntCounterVec<3>,
sink_input_bytes: LabelGuardedIntCounterVec<3>,
sink_chunk_buffer_size: LabelGuardedIntGaugeVec<3>,

// Exchange (see also `compute::ExchangeServiceMetrics`)
Expand Down Expand Up @@ -244,6 +245,14 @@ impl StreamingMetrics {
)
.unwrap();

let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
"stream_sink_input_bytes",
"Total size of chunks streamed into sink executors",
&["sink_id", "actor_id", "fragment_id"],
registry
)
.unwrap();

let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
"stream_mview_input_row_count",
"Total number of rows streamed into materialize executors",
Expand Down Expand Up @@ -1065,6 +1074,7 @@ impl StreamingMetrics {
source_split_change_count,
source_backfill_row_count,
sink_input_row_count,
sink_input_bytes,
sink_chunk_buffer_size,
exchange_frag_recv_size,
merge_barrier_align_duration,
Expand Down Expand Up @@ -1239,6 +1249,7 @@ impl StreamingMetrics {
sink_input_row_count: self
.sink_input_row_count
.with_guarded_label_values(label_list),
sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
sink_chunk_buffer_size: self
.sink_chunk_buffer_size
.with_guarded_label_values(label_list),
Expand Down Expand Up @@ -1533,6 +1544,7 @@ pub struct ActorMetrics {

/// Per-executor metric handles for a sink executor.
///
/// Each field is a label-guarded handle with three label values (the `<3>` parameter),
/// obtained from the corresponding `StreamingMetrics` vector via
/// `with_guarded_label_values`.
pub struct SinkExecutorMetrics {
/// Counter of rows streamed into the sink executor; incremented by `c.capacity()` for
/// every `Message::Chunk` seen on the input stream.
/// NOTE(review): the log reader metric counts rows with `cardinality()` instead — confirm
/// that `capacity()` is the intended measure here.
pub sink_input_row_count: LabelGuardedIntCounter<3>,
/// Counter of bytes streamed into the sink executor; incremented by `c.estimated_size()`
/// for every `Message::Chunk`. The metric family is registered as
/// `stream_sink_input_bytes` with labels `sink_id`, `actor_id`, `fragment_id`.
pub sink_input_bytes: LabelGuardedIntCounter<3>,
/// Gauge for the sink's chunk buffer size.
/// NOTE(review): inferred from the name; the update site is not visible in this view.
pub sink_chunk_buffer_size: LabelGuardedIntGauge<3>,
}

Expand Down
5 changes: 5 additions & 0 deletions src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
let input = input.inspect_ok(move |msg| {
if let Message::Chunk(c) = msg {
metrics.sink_input_row_count.inc_by(c.capacity() as u64);
metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
}
});

Expand Down Expand Up @@ -482,12 +483,16 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
let log_store_read_rows = GLOBAL_SINK_METRICS
.log_store_read_rows
.with_guarded_label_values(&labels);
let log_store_read_bytes = GLOBAL_SINK_METRICS
.log_store_read_bytes
.with_guarded_label_values(&labels);
let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
.log_store_latest_read_epoch
.with_guarded_label_values(&labels);
let metrics = LogReaderMetrics {
log_store_latest_read_epoch,
log_store_read_rows,
log_store_read_bytes,
log_store_reader_wait_new_future_duration_ns,
};

Expand Down

0 comments on commit 2b819a3

Please sign in to comment.