feat(sink): add sink throughput( mb/s) metrics #19587

Merged
merged 5 commits on Dec 10, 2024
Changes from 4 commits
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

45 changes: 43 additions & 2 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -951,7 +951,7 @@ def section_streaming(outer_panels):
# TODO: These 2 metrics should be deprecated because they are unaware of Log Store
# Let's remove them when all sinks are migrated to Log Store
panels.timeseries_rowsps(
"Sink Throughput(rows/s) *",
"Sink Throughput(rows/s)",
"The number of rows streamed into each sink per second. For sinks with 'sink_decouple = true', please refer to the 'Sink Metrics' section",
[
panels.target(
@@ -961,7 +961,7 @@ def section_streaming(outer_panels):
],
),
panels.timeseries_rowsps(
"Sink Throughput(rows/s) per Partition *",
"Sink Throughput(rows/s) per Partition",
"The number of rows streamed into each sink per second. For sinks with 'sink_decouple = true', please refer to the 'Sink Metrics' section",
[
panels.target(
@@ -970,6 +970,26 @@ def section_streaming(outer_panels):
),
],
),
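# The two panels below derive MB/s from the new `stream_sink_input_bytes`
# counter, joining with `sink_info` to attach the sink name and dividing by
# 1000*1000 to convert bytes to MB.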
panels.timeseries_bytesps(
"Sink Throughput(MB/s)",
"The figure shows the number of bytes written each sink per second. For sinks with 'sink_decouple = true', please refer to the 'Sink Metrics' section",
[
panels.target(
f"(sum(rate({metric('stream_sink_input_bytes')}[$__rate_interval])) by (sink_id) * on(sink_id) group_left(sink_name) group({metric('sink_info')}) by (sink_id, sink_name)) / (1000*1000)",
"sink {{sink_id}} {{sink_name}}",
),
],
),
panels.timeseries_bytesps(
"Sink Throughput(MB/s) per Partition",
"The number of bytes streamed into each sink per second. For sinks with 'sink_decouple = true', please refer to the 'Sink Metrics' section",
[
panels.target(
f"(sum(rate({metric('stream_sink_input_bytes')}[$__rate_interval])) by (sink_id, actor_id) * on(actor_id) group_left(sink_name) {metric('sink_info')}) / (1000*1000)",
"sink {{sink_id}} {{sink_name}} - actor {{actor_id}}",
),
],
),
panels.timeseries_rowsps(
"Materialized View Throughput(rows/s)",
"The figure shows the number of rows written into each materialized view per second.",
@@ -4433,6 +4453,27 @@ def section_sink_metrics(outer_panels):
),
],
),
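# The two panels below convert the `log_store_read_bytes` counter into MB/s,
# aggregated per sink and broken down per actor and node.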
panels.timeseries_bytesps(
"Log Store Consume Throughput(MB/s)",
"",
[
panels.target(
f"sum(rate({metric('log_store_read_bytes')}[$__rate_interval])) by (connector, sink_id, sink_name) / (1000*1000)",
"{{sink_id}} {{sink_name}} ({{connector}})",
),
],
),
panels.timeseries_bytesps(
"Executor Log Store Consume Throughput(MB/s)",
"",
[
panels.target(
f"sum(rate({metric('log_store_read_bytes')}[$__rate_interval])) by ({NODE_LABEL}, connector, sink_id, actor_id, sink_name) / (1000*1000)",
"{{sink_id}} {{sink_name}} ({{connector}}) actor {{actor_id}} @ {{%s}}"
% NODE_LABEL,
),
],
),
panels.timeseries_rowsps(
"Log Store Write Throughput(rows)",
"",
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions src/connector/src/sink/log_store.rs
@@ -27,6 +27,7 @@ use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH};
use risingwave_common_estimate_size::EstimateSize;

pub type LogStoreResult<T> = Result<T, anyhow::Error>;
pub type ChunkId = usize;
@@ -272,6 +273,7 @@ pub struct MonitoredLogReader<R: LogReader> {
pub struct LogReaderMetrics {
pub log_store_latest_read_epoch: LabelGuardedIntGauge<4>,
pub log_store_read_rows: LabelGuardedIntCounter<4>,
pub log_store_read_bytes: LabelGuardedIntCounter<4>,
pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter<4>,
}

@@ -304,6 +306,9 @@ impl<R: LogReader> LogReader for MonitoredLogReader<R> {
self.metrics
.log_store_read_rows
.inc_by(chunk.cardinality() as _);
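// Record the chunk's estimated in-memory size in bytes, backing the
// Log Store Consume Throughput(MB/s) panels.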
self.metrics
.log_store_read_bytes
.inc_by(chunk.estimated_size() as u64);
}
})
}
10 changes: 10 additions & 0 deletions src/connector/src/sink/mod.rs
@@ -317,6 +317,7 @@ pub struct SinkMetrics {
// Log store reader metrics
pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec<4>,
pub log_store_read_rows: LabelGuardedIntCounterVec<4>,
pub log_store_read_bytes: LabelGuardedIntCounterVec<4>,
pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec<4>,

// Iceberg metrics
@@ -386,6 +387,14 @@ impl SinkMetrics {
)
.unwrap();

let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
"log_store_read_bytes",
"Total size of chunks read by log reader",
&["actor_id", "connector", "sink_id", "sink_name"],
registry
)
.unwrap();

let log_store_reader_wait_new_future_duration_ns =
register_guarded_int_counter_vec_with_registry!(
"log_store_reader_wait_new_future_duration_ns",
@@ -451,6 +460,7 @@ impl SinkMetrics {
log_store_write_rows,
log_store_latest_read_epoch,
log_store_read_rows,
log_store_read_bytes,
log_store_reader_wait_new_future_duration_ns,
iceberg_write_qps,
iceberg_write_latency,
12 changes: 12 additions & 0 deletions src/stream/src/executor/monitor/streaming_stats.rs
@@ -72,6 +72,7 @@ pub struct StreamingMetrics {

// Sink
sink_input_row_count: LabelGuardedIntCounterVec<3>,
sink_input_bytes: LabelGuardedIntCounterVec<3>,
sink_chunk_buffer_size: LabelGuardedIntGaugeVec<3>,

// Exchange (see also `compute::ExchangeServiceMetrics`)
@@ -244,6 +245,14 @@ impl StreamingMetrics {
)
.unwrap();

let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
"stream_sink_input_bytes",
"Total size of chunks streamed into sink executors",
&["sink_id", "actor_id", "fragment_id"],
registry
)
.unwrap();

let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
"stream_mview_input_row_count",
"Total number of rows streamed into materialize executors",
@@ -1065,6 +1074,7 @@ impl StreamingMetrics {
source_split_change_count,
source_backfill_row_count,
sink_input_row_count,
sink_input_bytes,
sink_chunk_buffer_size,
exchange_frag_recv_size,
merge_barrier_align_duration,
@@ -1239,6 +1249,7 @@ impl StreamingMetrics {
sink_input_row_count: self
.sink_input_row_count
.with_guarded_label_values(label_list),
sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
sink_chunk_buffer_size: self
.sink_chunk_buffer_size
.with_guarded_label_values(label_list),
@@ -1533,6 +1544,7 @@ pub struct ActorMetrics {

pub struct SinkExecutorMetrics {
pub sink_input_row_count: LabelGuardedIntCounter<3>,
pub sink_input_bytes: LabelGuardedIntCounter<3>,
pub sink_chunk_buffer_size: LabelGuardedIntGauge<3>,
}

5 changes: 5 additions & 0 deletions src/stream/src/executor/sink.rs
@@ -200,6 +200,7 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
let input = input.inspect_ok(move |msg| {
if let Message::Chunk(c) = msg {
metrics.sink_input_row_count.inc_by(c.capacity() as u64);
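// Also track the chunk's estimated size in bytes, backing the Sink Throughput(MB/s) panels.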
metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
}
});

@@ -482,12 +483,16 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
let log_store_read_rows = GLOBAL_SINK_METRICS
.log_store_read_rows
.with_guarded_label_values(&labels);
let log_store_read_bytes = GLOBAL_SINK_METRICS
.log_store_read_bytes
.with_guarded_label_values(&labels);
let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
.log_store_latest_read_epoch
.with_guarded_label_values(&labels);
let metrics = LogReaderMetrics {
log_store_latest_read_epoch,
log_store_read_rows,
log_store_read_bytes,
log_store_reader_wait_new_future_duration_ns,
};
