Skip to content

Commit

Permalink
fix(metrics): remove Source Throughput(rows) per barrier (#15484)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan authored Mar 6, 2024
1 parent 4cba127 commit 0dad818
Show file tree
Hide file tree
Showing 6 changed files with 2 additions and 53 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

13 changes: 0 additions & 13 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -720,19 +720,6 @@ def section_streaming(outer_panels):
)
],
),
panels.timeseries_rowsps(
"Source Throughput(rows) per barrier",
"RisingWave ingests barriers periodically to trigger computation and checkpoints. The frequency of "
"barrier can be set by barrier_interval_ms. This metric shows how many rows are ingested between two "
"consecutive barriers.",
[
panels.target(
f"rate({metric('stream_source_rows_per_barrier_counts')}[$__rate_interval])",
"actor={{actor_id}} source={{source_id}} @ {{%s}}"
% NODE_LABEL,
)
],
),
panels.timeseries_count(
"Source Upstream Status",
"Monitor each source upstream, 0 means the upstream is not normal, 1 means the source is ready.",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

10 changes: 0 additions & 10 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ pub struct StreamingMetrics {

// Source
pub source_output_row_count: GenericCounterVec<AtomicU64>,
pub source_row_per_barrier: GenericCounterVec<AtomicU64>,
pub source_split_change_count: GenericCounterVec<AtomicU64>,

// Sink & materialized view
Expand Down Expand Up @@ -218,14 +217,6 @@ impl StreamingMetrics {
)
.unwrap();

let source_row_per_barrier = register_int_counter_vec_with_registry!(
"stream_source_rows_per_barrier_counts",
"Total number of rows that have been output from source per barrier",
&["actor_id", "executor_id", "fragment_id"],
registry
)
.unwrap();

let source_split_change_count = register_int_counter_vec_with_registry!(
"stream_source_split_change_event_count",
"Total number of split change events that have been operated by source",
Expand Down Expand Up @@ -1077,7 +1068,6 @@ impl StreamingMetrics {
actor_in_record_cnt,
actor_out_record_cnt,
source_output_row_count,
source_row_per_barrier,
source_split_change_count,
sink_input_row_count,
mview_input_row_count,
Expand Down
11 changes: 0 additions & 11 deletions src/stream/src/executor/source/fs_source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,6 @@ impl<S: StateStore> FsSourceExecutor<S> {
self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
let mut last_barrier_time = Instant::now();
let mut self_paused = false;
let mut metric_row_per_barrier: u64 = 0;
while let Some(msg) = stream.next().await {
match msg? {
// This branch will be preferred.
Expand Down Expand Up @@ -395,16 +394,6 @@ impl<S: StateStore> FsSourceExecutor<S> {
}
self.take_snapshot_and_clear_cache(epoch).await?;

self.metrics
.source_row_per_barrier
.with_label_values(&[
self.actor_ctx.id.to_string().as_str(),
self.stream_source_core.source_id.to_string().as_ref(),
self.actor_ctx.fragment_id.to_string().as_str(),
])
.inc_by(metric_row_per_barrier);
metric_row_per_barrier = 0;

yield msg;
}
_ => {
Expand Down
17 changes: 0 additions & 17 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,6 @@ impl<S: StateStore> SourceExecutor<S> {
self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
let mut last_barrier_time = Instant::now();
let mut self_paused = false;
let mut metric_row_per_barrier: u64 = 0;

while let Some(msg) = stream.next().await {
let Ok(msg) = msg else {
Expand Down Expand Up @@ -490,21 +489,6 @@ impl<S: StateStore> SourceExecutor<S> {

self.persist_state_and_clear_cache(epoch).await?;

self.metrics
.source_row_per_barrier
.with_label_values(&[
self.actor_ctx.id.to_string().as_str(),
self.stream_source_core
.as_ref()
.unwrap()
.source_id
.to_string()
.as_ref(),
self.actor_ctx.fragment_id.to_string().as_str(),
])
.inc_by(metric_row_per_barrier);
metric_row_per_barrier = 0;

yield Message::Barrier(barrier);
}
Either::Left(_) => {
Expand Down Expand Up @@ -562,7 +546,6 @@ impl<S: StateStore> SourceExecutor<S> {
.updated_splits_in_epoch
.extend(state);
}
metric_row_per_barrier += chunk.cardinality() as u64;

self.metrics
.source_output_row_count
Expand Down

0 comments on commit 0dad818

Please sign in to comment.