Skip to content

Commit

Permalink
refactor(metrics): display actor-level metrics based on configuration
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Aug 30, 2024
1 parent 04619f6 commit 606bb71
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 15 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion e2e_test/nexmark/create_sources.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ CREATE SOURCE nexmark (
) WITH (
connector = 'nexmark',
nexmark.split.num = '2',
nexmark.min.event.gap.in.ns = '100'
nexmark.min.event.gap.in.ns = '100000000'
);

statement ok
Expand Down
21 changes: 11 additions & 10 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1121,7 +1121,8 @@ def section_streaming_cdc(outer_panels):
]


def section_streaming_actors(outer_panels):
def section_streaming_actors(outer_panels: Panels):
actor_level_filter = "actor_id!=''"
panels = outer_panels.sub_panel()
return [
outer_panels.row_collapsed(
Expand Down Expand Up @@ -1159,10 +1160,10 @@ def section_streaming_actors(outer_panels):
[
panels.target(
f"sum(rate({metric('stream_actor_in_record_cnt')}[$__rate_interval])) by (fragment_id, upstream_fragment_id)",
"fragment {{fragment_id}}<-{{upstream_fragment_id}}",
"fragment total {{fragment_id}}<-{{upstream_fragment_id}}",
),
panels.target_hidden(
f"rate({metric('stream_actor_in_record_cnt')}[$__rate_interval])",
panels.target(
f"rate({metric('stream_actor_in_record_cnt', actor_level_filter)}[$__rate_interval])",
"actor {{actor_id}}",
),
],
Expand All @@ -1173,10 +1174,10 @@ def section_streaming_actors(outer_panels):
[
panels.target(
f"sum(rate({metric('stream_actor_out_record_cnt')}[$__rate_interval])) by (fragment_id)",
"fragment {{fragment_id}}",
"fragment total {{fragment_id}}",
),
panels.target_hidden(
f"rate({metric('stream_actor_out_record_cnt')}[$__rate_interval])",
panels.target(
f"rate({metric('stream_actor_out_record_cnt', actor_level_filter)}[$__rate_interval])",
"actor {{actor_id}}",
),
],
Expand All @@ -1189,9 +1190,9 @@ def section_streaming_actors(outer_panels):
f"sum({metric('stream_memory_usage')}) by (table_id, desc)",
"table {{table_id}} desc: {{desc}}",
),
panels.target_hidden(
f"{metric('stream_memory_usage')}",
"table {{table_id}} actor {{actor_id}} desc: {{desc}}",
panels.target(
f"{metric('stream_memory_usage', actor_level_filter)}",
"actor {{actor_id}} table {{table_id}} desc: {{desc}}",
),
],
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

17 changes: 15 additions & 2 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ pub struct StreamingMetrics {
pub actor_count: LabelGuardedIntGaugeVec<1>,
#[expect(dead_code)]
actor_memory_usage: LabelGuardedIntGaugeVec<2>,
actor_in_record_cnt: LabelGuardedIntCounterVec<3>,
pub actor_out_record_cnt: LabelGuardedIntCounterVec<2>,
actor_in_record_cnt: RelabeledGuardedIntCounterVec<3>,
pub actor_out_record_cnt: RelabeledGuardedIntCounterVec<2>,

// Source
pub source_output_row_count: LabelGuardedIntCounterVec<4>,
Expand Down Expand Up @@ -396,6 +396,12 @@ impl StreamingMetrics {
registry
)
.unwrap();
let actor_in_record_cnt = RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
MetricLevel::Debug,
actor_in_record_cnt,
level,
1,
);

let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
"stream_actor_out_record_cnt",
Expand All @@ -404,6 +410,12 @@ impl StreamingMetrics {
registry
)
.unwrap();
let actor_out_record_cnt = RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
MetricLevel::Debug,
actor_out_record_cnt,
level,
1,
);

let actor_count = register_guarded_int_gauge_vec_with_registry!(
"stream_actor_count",
Expand All @@ -413,6 +425,7 @@ impl StreamingMetrics {
)
.unwrap();

// dead code
let actor_memory_usage = register_guarded_int_gauge_vec_with_registry!(
"actor_memory_usage",
"Memory usage (bytes)",
Expand Down

0 comments on commit 606bb71

Please sign in to comment.