Skip to content

Commit

Permalink
feat(stream): Report compute_error_count to prometheus (#7832)
Browse files Browse the repository at this point in the history
Step 1: Report compute errors to Prometheus through the `user_error_count` counter.

![image](https://user-images.githubusercontent.com/9093549/218353166-4ae72621-6624-4967-99c3-a68331801b22.png)

Approved-By: TennyZhuang

Co-Authored-By: jon-chuang <[email protected]>
Co-Authored-By: jon-chuang <[email protected]>
  • Loading branch information
jon-chuang and jon-chuang authored Feb 20, 2023
1 parent 1c94d22 commit f5b7fd7
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 4 deletions.
22 changes: 22 additions & 0 deletions grafana/risingwave-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,27 @@ def section_streaming_exchange(outer_panels):
]


def section_streaming_errors(outer_panels):
    """Build the collapsed "Streaming Errors" dashboard row.

    The row holds a single count time series, "User Errors by Type", which
    sums the `user_error_count` metric grouped by error type, error message,
    fragment id and executor name.

    Args:
        outer_panels: panel factory of the enclosing dashboard; used both to
            derive the sub-panel factory and to create the collapsed row.

    Returns:
        A one-element list containing the collapsed row.
    """
    panels = outer_panels.sub_panel()

    # One series per (error_type, error_msg, fragment_id, executor_name) combination.
    user_error_query = f"sum({metric('user_error_count')}) by (error_type, error_msg, fragment_id, executor_name)"
    user_error_legend = "{{error_type}}: {{error_msg}} ({{executor_name}}: fragment_id={{fragment_id}})"

    user_errors_panel = panels.timeseries_count(
        "User Errors by Type",
        "",
        [panels.target(user_error_query, user_error_legend)],
    )
    return [outer_panels.row_collapsed("Streaming Errors", [user_errors_panel])]


def section_batch_exchange(outer_panels):
panels = outer_panels.sub_panel()
return [
Expand Down Expand Up @@ -2502,6 +2523,7 @@ def section_memory_manager(outer_panels):
*section_streaming(panels),
*section_streaming_actors(panels),
*section_streaming_exchange(panels),
*section_streaming_errors(panels),
*section_batch_exchange(panels),
*section_hummock(panels),
*section_compaction(panels),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

23 changes: 22 additions & 1 deletion src/stream/src/executor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ use crate::task::{ActorId, SharedContext};
/// Shared by all operators of an actor.
///
/// Besides the actor's identity it carries the per-executor compute-error log and the
/// metrics handle through which those errors are counted.
pub struct ActorContext {
/// Id of the actor this context belongs to.
pub id: ActorId,
/// Emitted as the `fragment_id` label of `user_error_count`; `create` sets it to 0.
fragment_id: u32,

// TODO: report errors and prompt the user.
/// Compute errors, keyed by the identity string of the executor that reported them
/// (see `on_compute_error`).
pub errors: Mutex<HashMap<String, Vec<ExprError>>>,

// NOTE(review): the three counters below look like memory-usage bookkeeping (last and
// current per-actor values plus a total that can be shared across actors); the code that
// updates them is not visible here — confirm before relying on this.
last_mem_val: Arc<AtomicUsize>,
cur_mem_val: Arc<AtomicUsize>,
total_mem_val: Arc<TrAdder<i64>>,
/// Streaming metrics handle; `on_compute_error` increments `user_error_count` through it.
streaming_metrics: Arc<StreamingMetrics>,
}

pub type ActorContextRef = Arc<ActorContext>;
Expand All @@ -50,25 +52,44 @@ impl ActorContext {
/// Creates a context for actor `id` that is not attached to a fragment or to shared metrics.
///
/// The fragment id is 0, the error map is empty, both per-actor counters start at zero, and
/// the total adder and metrics handle are freshly created (`TrAdder::new()` and
/// `StreamingMetrics::unused()`), i.e. not shared with any other context.
pub fn create(id: ActorId) -> ActorContextRef {
    // Standalone instances of the handles that `create_with_metrics` takes from its caller.
    let total_mem_val = Arc::new(TrAdder::new());
    let streaming_metrics = Arc::new(StreamingMetrics::unused());

    let context = Self {
        id,
        fragment_id: 0,
        errors: Default::default(),
        cur_mem_val: Arc::new(0.into()),
        last_mem_val: Arc::new(0.into()),
        total_mem_val,
        streaming_metrics,
    };
    Arc::new(context)
}

pub fn create_with_counter(id: ActorId, total_mem_val: Arc<TrAdder<i64>>) -> ActorContextRef {
/// Creates a context for actor `id` belonging to fragment `fragment_id`.
///
/// `total_mem_val` and `streaming_metrics` are stored as given, so the caller can share them
/// across contexts; the error map starts empty and both per-actor counters start at zero.
pub fn create_with_metrics(
    id: ActorId,
    fragment_id: u32,
    total_mem_val: Arc<TrAdder<i64>>,
    streaming_metrics: Arc<StreamingMetrics>,
) -> ActorContextRef {
    let context = Self {
        // Caller-provided identity and shared handles.
        id,
        fragment_id,
        total_mem_val,
        streaming_metrics,
        // Per-actor state always starts out empty / zeroed.
        errors: Default::default(),
        cur_mem_val: Arc::new(0.into()),
        last_mem_val: Arc::new(0.into()),
    };
    Arc::new(context)
}

pub fn on_compute_error(&self, err: ExprError, identity: &str) {
tracing::error!("Compute error: {}, executor: {identity}", err);
let executor_name = identity.split(' ').next().unwrap_or("name_not_found");
self.streaming_metrics
.user_error_count
.with_label_values(&[
"ExprError",
&err.to_string(),
executor_name,
&self.fragment_id.to_string(),
])
.inc();
self.errors
.lock()
.entry(identity.to_owned())
Expand Down
12 changes: 12 additions & 0 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ pub struct StreamingMetrics {
pub lru_runtime_loop_count: IntCounter,
pub lru_watermark_step: IntGauge,
pub jemalloc_allocated_bytes: IntGauge,

/// User error reporting
pub user_error_count: GenericCounterVec<AtomicU64>,
}

impl StreamingMetrics {
Expand Down Expand Up @@ -430,6 +433,14 @@ impl StreamingMetrics {
)
.unwrap();

let user_error_count = register_int_counter_vec_with_registry!(
"user_error_count",
"user errors in the system, queryable by tags",
&["error_type", "error_msg", "executor_name", "fragment_id"],
registry,
)
.unwrap();

Self {
registry,
executor_row_count,
Expand Down Expand Up @@ -476,6 +487,7 @@ impl StreamingMetrics {
lru_runtime_loop_count,
lru_watermark_step,
jemalloc_allocated_bytes,
user_error_count,
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -614,8 +614,12 @@ impl LocalStreamManagerCore {
StreamError::from(anyhow!("No such actor with actor id:{}", actor_id))
})?;
let mview_definition = &actor.mview_definition;
let actor_context =
ActorContext::create_with_counter(actor_id, self.total_mem_val.clone());
let actor_context = ActorContext::create_with_metrics(
actor_id,
actor.fragment_id,
self.total_mem_val.clone(),
self.streaming_metrics.clone(),
);
let vnode_bitmap = actor
.vnode_bitmap
.as_ref()
Expand Down

0 comments on commit f5b7fd7

Please sign in to comment.