Skip to content

Commit

Permalink
feat: add metrics of stream join cached rows & entries (#4891)
Browse files Browse the repository at this point in the history
* feat: add metrics of cached join rows/entries

* minor improve

* add dashboard panel

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
fuyufjh and mergify[bot] authored Aug 25, 2022
1 parent cf2ddd4 commit 3f97643
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 1 deletion.
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions grafana/risingwave-dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,16 @@ def section_streaming_actors(outer_panels):
"rate(stream_join_actor_input_waiting_duration_ns[$__rate_interval]) / 1000000000", "{{actor_id}}"
),
]),
panels.timeseries_count("Stream Join Cached Entries", [
panels.target(
"stream_join_cached_entries", "{{actor_id}} {{side}}"
),
]),
panels.timeseries_count("Stream Join Cached Rows", [
panels.target(
"stream_join_cached_rows", "{{actor_id}} {{side}}"
),
])
])
]

Expand Down
20 changes: 20 additions & 0 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,26 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
self.side_r.ht.state_table.update_vnode_bitmap(vnode_bitmap);
}

// Report metrics of cached join rows/entries
let cached_rows_l: usize = self.side_l.ht.values().map(|e| e.size()).sum();
let cached_rows_r: usize = self.side_r.ht.values().map(|e| e.size()).sum();
self.metrics
.join_cached_rows
.with_label_values(&[&actor_id_str, "left"])
.set(cached_rows_l as i64);
self.metrics
.join_cached_rows
.with_label_values(&[&actor_id_str, "right"])
.set(cached_rows_r as i64);
self.metrics
.join_cached_entries
.with_label_values(&[&actor_id_str, "left"])
.set(self.side_l.ht.len() as i64);
self.metrics
.join_cached_entries
.with_label_values(&[&actor_id_str, "right"])
.set(self.side_r.ht.len() as i64);

yield Message::Barrier(barrier);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type JoinEntryStateValuesMut<'a> = btree_map::ValuesMut<'a, PkType, StateValueTy

/// We manages a `BTreeMap` in memory for all entries belonging to a join key.
/// When evicted, `cached` does not hold any entries.
///
/// If a `JoinEntryState` exists for a join key, the all records under this
/// join key will be presented in the cache.
pub struct JoinEntryState {
Expand Down Expand Up @@ -67,6 +68,10 @@ impl JoinEntryState {
(encoded, decoded)
})
}

pub fn size(&self) -> usize {
self.cached.len()
}
}

#[cfg(test)]
Expand Down
23 changes: 23 additions & 0 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,15 @@ pub struct StreamingMetrics {
pub source_output_row_count: GenericCounterVec<AtomicU64>,
pub exchange_recv_size: GenericCounterVec<AtomicU64>,
pub exchange_frag_recv_size: GenericCounterVec<AtomicU64>,

// Streaming Join
pub join_lookup_miss_count: GenericCounterVec<AtomicU64>,
pub join_total_lookup_count: GenericCounterVec<AtomicU64>,
pub join_actor_input_waiting_duration_ns: GenericCounterVec<AtomicU64>,
pub join_barrier_align_duration: HistogramVec,
pub join_cached_entries: GenericGaugeVec<AtomicI64>,
pub join_cached_rows: GenericGaugeVec<AtomicI64>,

/// The duration from receipt of barrier to all actors collection.
/// And the max of all node `barrier_inflight_latency` is the latency for a barrier
/// to flow through the graph.
Expand Down Expand Up @@ -251,6 +256,22 @@ impl StreamingMetrics {
register_histogram_vec_with_registry!(opts, &["actor_id", "wait_side"], registry)
.unwrap();

let join_cached_entries = register_int_gauge_vec_with_registry!(
"stream_join_cached_entries",
"Number of cached entries in streaming join operators",
&["actor_id", "side"],
registry
)
.unwrap();

let join_cached_rows = register_int_gauge_vec_with_registry!(
"stream_join_cached_rows",
"Number of cached rows in streaming join operators",
&["actor_id", "side"],
registry
)
.unwrap();

let opts = histogram_opts!(
"stream_barrier_inflight_duration_seconds",
"barrier_inflight_latency",
Expand Down Expand Up @@ -298,6 +319,8 @@ impl StreamingMetrics {
join_total_lookup_count,
join_actor_input_waiting_duration_ns,
join_barrier_align_duration,
join_cached_entries,
join_cached_rows,
barrier_inflight_latency,
barrier_sync_latency,
sink_commit_duration,
Expand Down

0 comments on commit 3f97643

Please sign in to comment.