
Commit

feat: add source backfill metrics
xxchan committed Feb 28, 2024
1 parent d3f4864 commit 5adcd05
Showing 9 changed files with 76 additions and 4 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -733,6 +733,16 @@ def section_streaming(outer_panels):
)
],
),
panels.timeseries_rowsps(
"Source Backfill Throughput(rows/s)",
"The figure shows the number of rows read by each source per second.",
[
panels.target(
f"sum(rate({metric('stream_source_backfill_rows_counts')}[$__rate_interval])) by (source_id, source_name, fragment_id)",
"{{source_id}} {{source_name}} (fragment {{fragment_id}})",
),
],
),
panels.timeseries_count(
"Source Upstream Status",
"Monitor each source upstream, 0 means the upstream is not normal, 1 means the source is ready.",
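The new panel charts sum(rate(stream_source_backfill_rows_counts[$__rate_interval])) by (source_id, source_name, fragment_id): Prometheus turns each per-actor counter into a per-second rate, and the sum collapses the actor dimension while keeping the source and fragment labels. For illustration only (not part of this diff), here is a minimal, self-contained Rust sketch, assuming just the prometheus crate and invented label values, that registers a counter under the same name and prints the text exposition format such a query reads:

use prometheus::{register_int_counter_vec_with_registry, Encoder, Registry, TextEncoder};

fn main() {
    let registry = Registry::new();
    // Same metric name and label names as the commit; values below are made up.
    let rows = register_int_counter_vec_with_registry!(
        "stream_source_backfill_rows_counts",
        "Total number of rows that have been backfilled for the source",
        &["source_id", "source_name", "actor_id", "fragment_id"],
        registry
    )
    .unwrap();

    rows.with_label_values(&["1001", "orders_backfill", "7", "3"])
        .inc_by(4096);

    // Print the exposition format a Prometheus scrape would collect.
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&registry.gather(), &mut buf)
        .unwrap();
    print!("{}", String::from_utf8(buf).unwrap());
}
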
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions grafana/risingwave-user-dashboard.dashboard.py
@@ -685,6 +685,16 @@ def section_streaming(outer_panels):
)
],
),
panels.timeseries_rowsps(
"Source Backfill Throughput(rows/s)",
"The figure shows the number of rows read by each source per second.",
[
panels.target(
f"sum(rate({metric('stream_source_backfill_rows_counts')}[$__rate_interval])) by (source_id, source_name, fragment_id)",
"{{source_id}} {{source_name}} (fragment {{fragment_id}})",
),
],
),
panels.timeseries_rowsps(
"Materialized View Throughput(rows/s)",
"The figure shows the number of rows written into each materialized executor actor per second.",
2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions src/stream/src/executor/monitor/streaming_stats.rs
@@ -66,6 +66,7 @@ pub struct StreamingMetrics {
pub source_output_row_count: GenericCounterVec<AtomicU64>,
pub source_row_per_barrier: GenericCounterVec<AtomicU64>,
pub source_split_change_count: GenericCounterVec<AtomicU64>,
pub source_backfill_row_count: GenericCounterVec<AtomicU64>,

// Sink & materialized view
pub sink_input_row_count: LabelGuardedIntCounterVec<3>,
@@ -234,6 +235,14 @@ impl StreamingMetrics {
)
.unwrap();

let source_backfill_row_count = register_int_counter_vec_with_registry!(
"stream_source_backfill_rows_counts",
"Total number of rows that have been backfilled for source",
&["source_id", "source_name", "actor_id", "fragment_id"],
registry
)
.unwrap();

let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
"stream_sink_input_row_count",
"Total number of rows streamed into sink executors",
@@ -1079,6 +1088,7 @@ impl StreamingMetrics {
source_output_row_count,
source_row_per_barrier,
source_split_change_count,
source_backfill_row_count,
sink_input_row_count,
mview_input_row_count,
exchange_frag_recv_size,
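A side note on the registration above: with_label_values resolves the child counter through a lookup keyed on the label values, and the executor repeats that lookup on every chunk. A minimal sketch (again assuming only the prometheus crate and invented label values) of the alternative of binding the child once and reusing it:

use prometheus::{register_int_counter_vec_with_registry, Registry};

fn main() {
    let registry = Registry::new();
    let vec = register_int_counter_vec_with_registry!(
        "stream_source_backfill_rows_counts",
        "Total number of rows that have been backfilled for the source",
        &["source_id", "source_name", "actor_id", "fragment_id"],
        registry
    )
    .unwrap();

    // with_label_values finds or creates the child for this exact label set;
    // binding it once outside the hot loop avoids a per-chunk lookup.
    let child = vec.with_label_values(&["1001", "orders_backfill", "7", "3"]);
    for rows_in_chunk in [512u64, 1024, 4096] {
        child.inc_by(rows_in_chunk);
    }
    assert_eq!(child.get(), 5632);
}
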
41 changes: 41 additions & 0 deletions src/stream/src/executor/source/kafka_backfill_executor.rs
@@ -224,6 +224,17 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
Ok((futures::stream::select_all(streams).boxed(), abort_handles))
}

/// `source_id | source_name | actor_id | fragment_id`
#[inline]
fn get_metric_labels(&self) -> [String; 4] {
[
self.stream_source_core.source_id.to_string(),
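// Suffix the source name so backfill series stay distinct from the main source executor's metrics.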
format!("{}_backfill", self.stream_source_core.source_name.clone()),
self.actor_ctx.id.to_string(),
self.actor_ctx.fragment_id.to_string(),
]
}

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute(mut self, input: Executor) {
let mut input = input.execute();
@@ -564,6 +575,16 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
"Unexpected backfilling state, split_id: {split_id}"
);
});
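// Add this chunk's cardinality (its visible row count) to the backfill row counter.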
self.metrics
.source_backfill_row_count
.with_label_values(
&self
.get_metric_labels()
.iter()
.map(AsRef::as_ref)
.collect::<Vec<&str>>(),
)
.inc_by(chunk.cardinality() as u64);

yield Message::Chunk(chunk);
}
@@ -648,6 +669,16 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
stage: &mut BackfillStage,
should_trim_state: bool,
) -> StreamExecutorResult<bool> {
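// Reuses the shared source_split_change_count metric; labels carry the "_backfill" source-name suffix.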
self.metrics
.source_split_change_count
.with_label_values(
&self
.get_metric_labels()
.iter()
.map(AsRef::as_ref)
.collect::<Vec<&str>>(),
)
.inc();
if let Some(target_splits) = split_assignment.get(&self.actor_ctx.id).cloned() {
if self
.update_state_if_changed(target_splits, stage, should_trim_state)
@@ -754,6 +785,16 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
splits: &mut HashSet<SplitId>,
should_trim_state: bool,
) -> StreamExecutorResult<()> {
self.metrics
.source_split_change_count
.with_label_values(
&self
.get_metric_labels()
.iter()
.map(AsRef::as_ref)
.collect::<Vec<&str>>(),
)
.inc();
if let Some(target_splits) = split_assignment.get(&self.actor_ctx.id).cloned() {
self.update_state_if_changed_forward_stage(target_splits, splits, should_trim_state)
.await?;
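The repeated .iter().map(AsRef::as_ref).collect::<Vec<&str>>() at each call site exists because with_label_values takes &[&str] while get_metric_labels returns an owned [String; 4]. A standalone sketch of just that conversion, with a hypothetical stand-in for the helper (values invented, not RisingWave's actual code):

// Hypothetical stand-in for the executor's get_metric_labels.
fn get_metric_labels() -> [String; 4] {
    [
        "1001".to_owned(),            // source_id
        "orders_backfill".to_owned(), // source_name, with the backfill suffix
        "7".to_owned(),               // actor_id
        "3".to_owned(),               // fragment_id
    ]
}

fn main() {
    let labels = get_metric_labels();
    // Borrow each String down to &str so the slice satisfies &[&str].
    let label_refs: Vec<&str> = labels.iter().map(AsRef::as_ref).collect();
    assert_eq!(label_refs, ["1001", "orders_backfill", "7", "3"]);
}
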
1 change: 1 addition & 0 deletions src/stream/src/executor/source/source_executor.rs
@@ -117,6 +117,7 @@ impl<S: StateStore> SourceExecutor<S> {
.map_err(StreamExecutorError::connector_error)
}

/// `source_id | source_name | actor_id | fragment_id`
#[inline]
fn get_metric_labels(&self) -> [String; 4] {
[
