Commit 3513fc8

improve more

Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao committed Aug 30, 2024
1 parent 606bb71 commit 3513fc8
Showing 6 changed files with 45 additions and 57 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

50 changes: 21 additions & 29 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -1134,6 +1134,9 @@ def section_streaming_actors(outer_panels: Panels):
"much time it takes an actor to process a message, i.e. a barrier, a watermark or rows of data, "
"on average. Then we divide this duration by 1 second and show it as a percentage.",
[
# The metrics might be pre-aggregated locally on each compute node when `actor_id` is masked due to metrics level settings.
# Thus to calculate the average, we need to manually divide the actor count.
#
# Note: actor_count is equal to the number of dispatchers for a given downstream fragment,
# this holds true as long as we don't support multiple edges between two fragments.
panels.target(
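(The body of this panels.target call is collapsed in this view. Judging from the new comment, and from the analogous barrier-align change later in this file, it presumably divides a per-fragment sum(rate(...)) by sum({metric('stream_actor_count')}) by (fragment_id) to recover the per-actor average; the exact committed expression is not shown on this page.)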
@@ -1188,11 +1191,11 @@ def section_streaming_actors(outer_panels: Panels):
                 [
                     panels.target(
                         f"sum({metric('stream_memory_usage')}) by (table_id, desc)",
-                        "table {{table_id}} desc: {{desc}}",
+                        "table total {{table_id}}: {{desc}}",
                     ),
                     panels.target(
                         f"{metric('stream_memory_usage', actor_level_filter)}",
-                        "actor {{actor_id}} table {{table_id}} desc: {{desc}}",
+                        "actor {{actor_id}} table {{table_id}}: {{desc}}",
                     ),
                 ],
             ),
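(actor_level_filter is defined elsewhere in this file and does not appear in this diff; it is presumably a label selector along the lines of actor_id!="" that keeps only the fine-grained, non-pre-aggregated series for the per-actor targets.)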
@@ -1232,13 +1235,13 @@ def section_streaming_actors(outer_panels: Panels):
f"sum(rate({table_metric('stream_materialize_cache_total_count')}[$__rate_interval])) by (table_id, fragment_id)",
"total cached count - table {{table_id}} fragment {{fragment_id}}",
),
panels.target_hidden(
f"rate({table_metric('stream_materialize_cache_hit_count')}[$__rate_interval])",
"cache hit count - table {{table_id}} actor {{actor_id}}",
panels.target(
f"rate({table_metric('stream_materialize_cache_hit_count', actor_level_filter)}[$__rate_interval])",
"cache hit count - actor {{actor_id}} table {{table_id}} fragment {{fragment_id}}",
),
panels.target_hidden(
f"rate({table_metric('stream_materialize_cache_total_count')}[$__rate_interval])",
"total cached count - table {{table_id}} actor {{actor_id}}",
panels.target(
f"rate({table_metric('stream_materialize_cache_total_count', actor_level_filter)}[$__rate_interval])",
"total cached count - actor {{actor_id}} table {{table_id}} fragment {{fragment_id}}",
),
],
),
@@ -1331,12 +1334,15 @@ def section_streaming_actors(outer_panels: Panels):
"Executor Barrier Align Per Second",
"",
[
# The metrics might be pre-aggregated locally on each compute node when `actor_id` is masked due to metrics level settings.
# Thus to calculate the average, we need to manually divide the actor count.
panels.target(
f"avg(rate({metric('stream_barrier_align_duration_ns')}[$__rate_interval]) / 1000000000) by (fragment_id,wait_side, executor)",
"fragment {{fragment_id}} {{wait_side}} {{executor}}",
f"sum(rate({metric('stream_barrier_align_duration_ns')}[$__rate_interval]) / 1000000000) by (fragment_id, wait_side, executor) \
/ ignoring (wait_side, executor) group_left sum({metric('stream_actor_count')}) by (fragment_id)",
"fragment avg {{fragment_id}} {{wait_side}} {{executor}}",
),
panels.target_hidden(
f"rate({metric('stream_barrier_align_duration_ns')}[$__rate_interval]) / 1000000000",
panels.target(
f"rate({metric('stream_barrier_align_duration_ns', actor_level_filter)}[$__rate_interval]) / 1000000000",
"actor {{actor_id}} fragment {{fragment_id}} {{wait_side}} {{executor}}",
),
],
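Why the fragment-level query switches from avg(...) to a sum divided by the actor count: once a compute node pre-aggregates its actors into a single series, avg() averages over series, not actors. As a worked example, suppose a fragment runs 3 actors on node A (pre-aggregated align time 3 s/s) and 1 actor on node B (1 s/s): avg() gives (3 + 1) / 2 = 2 s/s, while the true per-actor average is (3 + 1) / 4 = 1 s/s, which is exactly what dividing the per-fragment sum by stream_actor_count yields. The ignoring (wait_side, executor) group_left clause is PromQL many-to-one matching: each (fragment_id, wait_side, executor) series on the left is divided by the single per-fragment actor count on the right.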
@@ -1566,25 +1572,11 @@ def section_streaming_actors(outer_panels: Panels):
                 [
                     panels.target(
                         f"sum(rate({metric('stream_executor_row_count')}[$__rate_interval])) by (executor_identity, fragment_id)",
-                        "{{executor_identity}} fragment {{fragment_id}}",
-                    ),
-                    panels.target_hidden(
-                        f"rate({metric('stream_executor_row_count')}[$__rate_interval])",
-                        "{{executor_identity}} actor {{actor_id}}",
+                        "{{executor_identity}} fragment total {{fragment_id}}",
                     ),
-                ],
-            ),
-            panels.timeseries_bytes(
-                "Actor Memory Usage (TaskLocalAlloc)",
-                "The actor-level memory usage statistics reported by TaskLocalAlloc. (Disabled by default)",
-                [
-                    panels.target(
-                        f"sum({metric('actor_memory_usage')}) by (fragment_id)",
-                        "fragment {{fragment_id}}",
-                    ),
-                    panels.target_hidden(
-                        f"{metric('actor_memory_usage')}",
-                        "actor {{actor_id}}",
+                    panels.target(
+                        f"rate({metric('stream_executor_row_count', actor_level_filter)}[$__rate_interval])",
+                        "{{executor_identity}} actor {{actor_id}}",
                     ),
                 ],
             ),
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

9 changes: 0 additions & 9 deletions src/common/src/jemalloc.rs
@@ -21,12 +21,3 @@ macro_rules! enable_jemalloc {
         static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
     };
 }
-
-#[macro_export]
-macro_rules! enable_task_local_jemalloc {
-    () => {
-        #[global_allocator]
-        static GLOBAL: task_stats_alloc::TaskLocalAlloc<tikv_jemallocator::Jemalloc> =
-            task_stats_alloc::TaskLocalAlloc(tikv_jemallocator::Jemalloc);
-    };
-}
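(This removal lines up with dropping the "Actor Memory Usage (TaskLocalAlloc)" panel in the dashboard change above: with no panel consuming TaskLocalAlloc statistics, the macro that installed task_stats_alloc::TaskLocalAlloc as the global allocator is deleted as well. This reading is inferred from the diff; the commit message does not state it.)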
2 changes: 1 addition & 1 deletion src/stream/src/cache/managed_lru.rs
@@ -67,8 +67,8 @@ where
             .metrics
             .stream_memory_usage
             .with_guarded_label_values(&[
-                &metrics_info.table_id,
                 &metrics_info.actor_id,
+                &metrics_info.table_id,
                 &metrics_info.desc,
             ]);
         memory_usage_metrics.set(0.into());
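(The swap moves actor_id to the first position in the label list, matching the new &["actor_id", ...] registration order in streaming_stats.rs below. If the relabeling helper masks the first label, actor_id must occupy that slot for per-actor series to collapse correctly when metrics are pre-aggregated; that reading of relabel_debug_1 is an assumption, illustrated in the sketch at the end of this page.)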
37 changes: 21 additions & 16 deletions src/stream/src/executor/monitor/streaming_stats.rs
@@ -24,7 +24,7 @@ use risingwave_common::config::MetricLevel;
 use risingwave_common::metrics::{
     LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounter,
     LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
-    RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec,
+    RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
 };
 use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
 use risingwave_common::util::epoch::Epoch;
@@ -46,7 +46,7 @@ pub struct StreamingMetrics {
     pub level: MetricLevel,

     // Executor metrics (disabled by default)
-    pub executor_row_count: LabelGuardedIntCounterVec<3>,
+    pub executor_row_count: RelabeledGuardedIntCounterVec<3>,

     // Streaming actor metrics from tokio (disabled by default)
     actor_execution_time: LabelGuardedGaugeVec<1>,
@@ -199,12 +199,12 @@ pub struct StreamingMetrics {
     pub jemalloc_metadata_bytes: IntGauge,
     pub jvm_allocated_bytes: IntGauge,
     pub jvm_active_bytes: IntGauge,
-    pub stream_memory_usage: LabelGuardedIntGaugeVec<3>,
+    pub stream_memory_usage: RelabeledGuardedIntGaugeVec<3>,

     // Materialized view
-    materialize_cache_hit_count: LabelGuardedIntCounterVec<3>,
-    materialize_cache_total_count: LabelGuardedIntCounterVec<3>,
-    materialize_input_row_count: LabelGuardedIntCounterVec<3>,
+    materialize_cache_hit_count: RelabeledGuardedIntCounterVec<3>,
+    materialize_cache_total_count: RelabeledGuardedIntCounterVec<3>,
+    materialize_input_row_count: RelabeledGuardedIntCounterVec<3>,
 }

 pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
@@ -223,7 +223,8 @@ impl StreamingMetrics {
&["actor_id", "fragment_id", "executor_identity"],
registry
)
.unwrap();
.unwrap()
.relabel_debug_1(level);

let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
"stream_source_output_rows_counts",
Expand Down Expand Up @@ -260,10 +261,11 @@ impl StreamingMetrics {
         let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
             "stream_mview_input_row_count",
             "Total number of rows streamed into materialize executors",
-            &["table_id", "actor_id", "fragment_id"],
+            &["actor_id", "table_id", "fragment_id"],
             registry
         )
-        .unwrap();
+        .unwrap()
+        .relabel_debug_1(level);

         let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
             "stream_sink_chunk_buffer_size",
Expand Down Expand Up @@ -1057,26 +1059,29 @@ impl StreamingMetrics {
         let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
             "stream_materialize_cache_hit_count",
             "Materialize executor cache hit count",
-            &["table_id", "actor_id", "fragment_id"],
+            &["actor_id", "table_id", "fragment_id"],
             registry
         )
-        .unwrap();
+        .unwrap()
+        .relabel_debug_1(level);

         let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
             "stream_materialize_cache_total_count",
             "Materialize executor cache total operation",
-            &["table_id", "actor_id", "fragment_id"],
+            &["actor_id", "table_id", "fragment_id"],
             registry
         )
-        .unwrap();
+        .unwrap()
+        .relabel_debug_1(level);

         let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
             "stream_memory_usage",
             "Memory usage for stream executors",
-            &["table_id", "actor_id", "desc"],
+            &["actor_id", "table_id", "desc"],
             registry
         )
-        .unwrap();
+        .unwrap()
+        .relabel_debug_1(level);

         let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
             "iceberg_write_qps",
Expand Down Expand Up @@ -1634,8 +1639,8 @@ impl StreamingMetrics {
         fragment_id: FragmentId,
     ) -> MaterializeMetrics {
         let label_list: &[&str; 3] = &[
-            &table_id.to_string(),
             &actor_id.to_string(),
+            &table_id.to_string(),
             &fragment_id.to_string(),
         ];
         MaterializeMetrics {
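Taken together, the pattern in streaming_stats.rs is: move actor_id to the front of each metric's label list, widen the field type from LabelGuarded*Vec to Relabeled*Vec, and chain .relabel_debug_1(level) after registration. The diff itself does not show what relabel_debug_1 does. The following self-contained Rust sketch illustrates the assumed behavior: below the Debug metric level, the first label is blanked so all actors on a compute node share one locally pre-aggregated series. Every name in the sketch (RelabeledCounterVec, relabel_num, inc_by) is illustrative, not RisingWave's actual API.

use std::collections::HashMap;

// Metric verbosity levels, mirroring the `MetricLevel` idea from the diff.
#[derive(Clone, Copy, PartialEq, PartialOrd)]
enum MetricLevel {
    Info,
    Debug,
}

// Hypothetical stand-in for `RelabeledGuardedIntCounterVec<3>`: a counter
// keyed by three labels that blanks the first `relabel_num` labels (here
// `actor_id`) whenever the configured level is below the Debug threshold.
struct RelabeledCounterVec {
    level: MetricLevel,
    relabel_num: usize,
    counters: HashMap<[String; 3], u64>,
}

impl RelabeledCounterVec {
    // Analogous to `.unwrap().relabel_debug_1(level)` in the diff: remember
    // the deployment's metric level and mask one leading label below Debug.
    fn new(level: MetricLevel) -> Self {
        Self { level, relabel_num: 1, counters: HashMap::new() }
    }

    fn inc_by(&mut self, labels: [&str; 3], delta: u64) {
        let mut key = labels.map(|s| s.to_string());
        if self.level < MetricLevel::Debug {
            // Mask the leading label(s): every actor on this node now writes
            // to the same series, i.e. the metric is pre-aggregated locally.
            for label in key.iter_mut().take(self.relabel_num) {
                label.clear();
            }
        }
        *self.counters.entry(key).or_insert(0) += delta;
    }
}

fn main() {
    // Labels are [actor_id, table_id, fragment_id], matching the new order.
    // At Info level, rows from actors 1 and 2 of the same table and fragment
    // collapse into one series with an empty `actor_id`.
    let mut row_count = RelabeledCounterVec::new(MetricLevel::Info);
    row_count.inc_by(["1", "100", "5"], 10);
    row_count.inc_by(["2", "100", "5"], 32);
    for (labels, count) in &row_count.counters {
        println!("{labels:?} => {count}"); // prints: ["", "100", "5"] => 42
    }
}

With that masking in place, the dashboard can no longer assume one series per actor, which is why the fragment-level queries above now divide summed rates by stream_actor_count instead of calling avg().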
