diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs
index 7a3c951292086..85846557a3c4a 100644
--- a/src/stream/src/executor/actor.rs
+++ b/src/stream/src/executor/actor.rs
@@ -37,9 +37,11 @@ pub struct ActorContext {
     pub id: ActorId,
     pub fragment_id: u32,
 
+    // TODO(eric): these seem to be useless now?
     last_mem_val: Arc<AtomicUsize>,
     cur_mem_val: Arc<AtomicUsize>,
     total_mem_val: Arc<TrAdder<i64>>,
+    pub streaming_metrics: Arc<StreamingMetrics>,
 
     pub error_suppressor: Arc<Mutex<ErrorSuppressor>>,
 }
diff --git a/src/stream/src/executor/aggregation/distinct.rs b/src/stream/src/executor/aggregation/distinct.rs
index bcc12d065169e..9e1d8d66da848 100644
--- a/src/stream/src/executor/aggregation/distinct.rs
+++ b/src/stream/src/executor/aggregation/distinct.rs
@@ -69,6 +69,7 @@ impl ColumnDeduplicater {
             .map(|_| BitmapBuilder::zeroed(column.len()))
             .collect_vec();
         let actor_id_str = ctx.id.to_string();
+        let fragment_id_str = ctx.fragment_id.to_string();
         let table_id_str = dedup_table.table_id().to_string();
         for (datum_idx, (op, datum)) in ops.iter().zip_eq_fast(column.iter()).enumerate() {
             // skip if this item is hidden to all agg calls (this is likely to happen)
@@ -85,7 +86,7 @@ impl ColumnDeduplicater {
                 self.metrics_info
                     .metrics
                     .agg_distinct_total_cache_count
-                    .with_label_values(&[&table_id_str, &actor_id_str])
+                    .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
                     .inc();
                 // TODO(yuhao): avoid this `contains`.
                 // https://github.com/risingwavelabs/risingwave/issues/9233
@@ -95,7 +96,7 @@ impl ColumnDeduplicater {
                     self.metrics_info
                         .metrics
                         .agg_distinct_cache_miss_count
-                        .with_label_values(&[&table_id_str, &actor_id_str])
+                        .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
                         .inc();
                     // load from table into the cache
                     let counts = if let Some(counts_row) =
@@ -190,11 +191,12 @@ impl ColumnDeduplicater {
         // WARN: if you want to change to batching the write to table. please remember to change
         // `self.cache.evict()` too.
         let actor_id_str = ctx.id.to_string();
+        let fragment_id_str = ctx.fragment_id.to_string();
         let table_id_str = dedup_table.table_id().to_string();
         self.metrics_info
             .metrics
             .agg_distinct_cached_entry_count
-            .with_label_values(&[&table_id_str, &actor_id_str])
+            .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
             .set(self.cache.len() as i64);
         self.cache.evict();
     }
diff --git a/src/stream/src/executor/exchange/input.rs b/src/stream/src/executor/exchange/input.rs
index 576542ecfcecd..eb2fc97532c16 100644
--- a/src/stream/src/executor/exchange/input.rs
+++ b/src/stream/src/executor/exchange/input.rs
@@ -177,7 +177,7 @@ impl RemoteInput {
                         let msg_res = Message::from_protobuf(&msg);
                         metrics
                             .actor_sampled_deserialize_duration_ns
-                            .with_label_values(&[&down_actor_id])
+                            .with_label_values(&[&down_actor_id, &down_fragment_id])
                             .inc_by(start_time.elapsed().as_nanos() as u64);
                         msg_res
                     } else {
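A minimal sketch of the labeling pattern the hunks above follow (not part of the patch): the ids are stringified once per batch, outside the hot loop, and only the cheap `with_label_values` resolution happens per update. It assumes the prometheus crate; the `demo_*` metric name and the hard-coded ids are invented for illustration.

use prometheus::{register_int_counter_vec_with_registry, Registry};

fn main() {
    let registry = Registry::new();
    // One vec, registered with the widened label set, as in the diff above.
    let cache_queries = register_int_counter_vec_with_registry!(
        "demo_distinct_total_cache_count", // hypothetical name
        "Cache queries, keyed like the agg_distinct_* metrics",
        &["table_id", "actor_id", "fragment_id"],
        registry
    )
    .unwrap();

    // Stringify the ids once, before the per-row loop, exactly as the diff does.
    let (table_id, actor_id, fragment_id) = (42u32, 1001u32, 7u32);
    let table_id_str = table_id.to_string();
    let actor_id_str = actor_id.to_string();
    let fragment_id_str = fragment_id.to_string();

    for _row in 0..3 {
        cache_queries
            .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
            .inc();
    }
}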
diff --git a/src/stream/src/executor/hash_agg.rs b/src/stream/src/executor/hash_agg.rs
index e338341156091..6f2e6a490ad9c 100644
--- a/src/stream/src/executor/hash_agg.rs
+++ b/src/stream/src/executor/hash_agg.rs
@@ -405,15 +405,17 @@ impl HashAggExecutor {
             // Update the metrics.
             let actor_id_str = this.actor_ctx.id.to_string();
+            let fragment_id_str = this.actor_ctx.fragment_id.to_string();
             let table_id_str = this.intermediate_state_table.table_id().to_string();
-            let metric_dirty_count = this
-                .metrics
-                .agg_dirty_group_count
-                .with_label_values(&[&table_id_str, &actor_id_str]);
+            let metric_dirty_count = this.metrics.agg_dirty_group_count.with_label_values(&[
+                &table_id_str,
+                &actor_id_str,
+                &fragment_id_str,
+            ]);
             let metric_dirty_heap_size = this
                 .metrics
                 .agg_dirty_group_heap_size
-                .with_label_values(&[&table_id_str, &actor_id_str]);
+                .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]);
             let new_group_size = agg_group.estimated_size();
             if let Some(old_group_size) = old_group_size {
                 match new_group_size.cmp(&old_group_size) {
@@ -448,29 +450,30 @@ impl HashAggExecutor {
     ) {
         // Update metrics.
         let actor_id_str = this.actor_ctx.id.to_string();
+        let fragment_id_str = this.actor_ctx.fragment_id.to_string();
         let table_id_str = this.intermediate_state_table.table_id().to_string();
         this.metrics
             .agg_lookup_miss_count
-            .with_label_values(&[&table_id_str, &actor_id_str])
+            .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
             .inc_by(vars.stats.lookup_miss_count);
         vars.stats.lookup_miss_count = 0;
         this.metrics
             .agg_total_lookup_count
-            .with_label_values(&[&table_id_str, &actor_id_str])
+            .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
             .inc_by(vars.stats.total_lookup_count);
         vars.stats.total_lookup_count = 0;
         this.metrics
             .agg_cached_keys
-            .with_label_values(&[&table_id_str, &actor_id_str])
+            .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
             .set(vars.agg_group_cache.len() as i64);
         this.metrics
             .agg_chunk_lookup_miss_count
-            .with_label_values(&[&table_id_str, &actor_id_str])
+            .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
             .inc_by(vars.stats.chunk_lookup_miss_count);
         vars.stats.chunk_lookup_miss_count = 0;
         this.metrics
             .agg_chunk_total_lookup_count
-            .with_label_values(&[&table_id_str, &actor_id_str])
+            .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
             .inc_by(vars.stats.chunk_total_lookup_count);
         vars.stats.chunk_total_lookup_count = 0;
@@ -552,11 +555,11 @@ impl HashAggExecutor {
         vars.dirty_groups_heap_size.set(0);
         this.metrics
             .agg_dirty_group_count
-            .with_label_values(&[&table_id_str, &actor_id_str])
+            .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
             .set(0);
         this.metrics
             .agg_dirty_group_heap_size
-            .with_label_values(&[&table_id_str, &actor_id_str])
+            .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
             .set(0);
 
         // Yield the remaining rows in chunk builder.
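A sketch of the "resolve once, update many times" pattern that `metric_dirty_count` and `metric_dirty_heap_size` use above (not part of the patch; the `demo_*` name and ids are hypothetical): `with_label_values` returns a cheap, cloneable child metric, so hash_agg binds it once per barrier instead of re-hashing the label strings on every row.

use prometheus::{register_int_gauge_vec_with_registry, IntGauge, Registry};

fn main() {
    let registry = Registry::new();
    let dirty_groups = register_int_gauge_vec_with_registry!(
        "demo_agg_dirty_group_count",
        "Dirty groups per (table, actor, fragment)",
        &["table_id", "actor_id", "fragment_id"],
        registry
    )
    .unwrap();

    // Resolve the labeled child once; reuse it for every subsequent update.
    let metric_dirty_count: IntGauge = dirty_groups.with_label_values(&["42", "1001", "7"]);
    for n in 0..10 {
        metric_dirty_count.set(n);
    }
}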
diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs
index 8c1998811c159..ad380a56b3da0 100644
--- a/src/stream/src/executor/hash_join.rs
+++ b/src/stream/src/executor/hash_join.rs
@@ -639,6 +639,7 @@ impl HashJoinExecutor
@@ -761,7 +764,7 @@ impl HashJoinExecutor {
@@ -787,7 +790,7 @@ impl HashJoinExecutor {
@@ -817,17 +820,17 @@ impl HashJoinExecutor
@@ impl LookupExecutor {
             .into_owned_row();
         let table_id_str = self.arrangement.storage_table.table_id().to_string();
         let actor_id_str = self.ctx.id.to_string();
+        let fragment_id_str = self.ctx.fragment_id.to_string();
         self.ctx
             .streaming_metrics
             .lookup_total_query_cache_count
-            .with_label_values(&[&table_id_str, &actor_id_str])
+            .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
             .inc();
         if let Some(result) = self.lookup_cache.lookup(&lookup_row) {
             return Ok(result.iter().cloned().collect_vec());
@@ -384,7 +385,7 @@ impl LookupExecutor {
         self.ctx
             .streaming_metrics
             .lookup_cache_miss_count
-            .with_label_values(&[&table_id_str, &actor_id_str])
+            .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
             .inc();
         tracing::trace!(target: "events::stream::lookup::lookup_row", "{:?}", lookup_row);
@@ -433,7 +434,7 @@ impl LookupExecutor {
         self.ctx
             .streaming_metrics
             .lookup_cached_entry_count
-            .with_label_values(&[&table_id_str, &actor_id_str])
+            .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
             .set(self.lookup_cache.len() as i64);
 
         Ok(all_rows.into_inner())
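The lookup executor above maintains a paired total/miss counter per label set. A sketch (not part of the patch) of how such pairs are read together to derive a cache hit rate; the `demo_*` names and ids are invented, and the real dashboards do this in PromQL rather than in Rust.

use prometheus::{register_int_counter_vec_with_registry, Registry};

fn main() {
    let registry = Registry::new();
    let labels = &["table_id", "actor_id", "fragment_id"];
    let total = register_int_counter_vec_with_registry!(
        "demo_lookup_total_query_cache_count", "Total lookups", labels, registry
    )
    .unwrap();
    let miss = register_int_counter_vec_with_registry!(
        "demo_lookup_cache_miss_count", "Lookup misses", labels, registry
    )
    .unwrap();

    let lv = ["42", "1001", "7"];
    for i in 0..100u64 {
        total.with_label_values(&lv).inc();
        if i % 10 == 0 {
            miss.with_label_values(&lv).inc(); // every 10th lookup misses the cache
        }
    }
    let hit_rate =
        1.0 - miss.with_label_values(&lv).get() as f64 / total.with_label_values(&lv).get() as f64;
    assert!((hit_rate - 0.9).abs() < 1e-9);
    println!("cache hit rate: {hit_rate}");
}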
diff --git a/src/stream/src/executor/managed_state/join/mod.rs b/src/stream/src/executor/managed_state/join/mod.rs
index 7ee23c06a5631..cbb90a7cb5ae6 100644
--- a/src/stream/src/executor/managed_state/join/mod.rs
+++ b/src/stream/src/executor/managed_state/join/mod.rs
@@ -40,7 +40,7 @@ use crate::common::metrics::MetricsInfo;
 use crate::common::table::state_table::StateTable;
 use crate::executor::error::StreamExecutorResult;
 use crate::executor::monitor::StreamingMetrics;
-use crate::task::{ActorId, AtomicU64Ref};
+use crate::task::{ActorId, AtomicU64Ref, FragmentId};
 
 type DegreeType = u64;
 
@@ -161,6 +161,7 @@ pub struct JoinHashMapMetrics {
     metrics: Arc<StreamingMetrics>,
     /// Basic information
     actor_id: String,
+    fragment_id: String,
     join_table_id: String,
     degree_table_id: String,
     side: &'static str,
@@ -175,6 +176,7 @@ impl JoinHashMapMetrics {
     pub fn new(
         metrics: Arc<StreamingMetrics>,
         actor_id: ActorId,
+        fragment_id: FragmentId,
         side: &'static str,
         join_table_id: u32,
         degree_table_id: u32,
@@ -182,6 +184,7 @@ impl JoinHashMapMetrics {
         Self {
             metrics,
             actor_id: actor_id.to_string(),
+            fragment_id: fragment_id.to_string(),
             join_table_id: join_table_id.to_string(),
             degree_table_id: degree_table_id.to_string(),
             side,
@@ -199,6 +202,7 @@ impl JoinHashMapMetrics {
                 &self.join_table_id,
                 &self.degree_table_id,
                 &self.actor_id,
+                &self.fragment_id,
             ])
             .inc_by(self.lookup_miss_count as u64);
         self.metrics
@@ -208,6 +212,7 @@ impl JoinHashMapMetrics {
                 &self.join_table_id,
                 &self.degree_table_id,
                 &self.actor_id,
+                &self.fragment_id,
             ])
             .inc_by(self.total_lookup_count as u64);
         self.metrics
@@ -217,6 +222,7 @@ impl JoinHashMapMetrics {
                 &self.join_table_id,
                 &self.degree_table_id,
                 &self.actor_id,
+                &self.fragment_id,
             ])
             .inc_by(self.insert_cache_miss_count as u64);
         self.total_lookup_count = 0;
@@ -284,6 +290,7 @@ impl JoinHashMap {
         pk_contained_in_jk: bool,
         metrics: Arc<StreamingMetrics>,
         actor_id: ActorId,
+        fragment_id: FragmentId,
         side: &'static str,
     ) -> Self {
         let alloc = StatsAlloc::new(Global).shared();
@@ -335,6 +342,7 @@ impl JoinHashMap {
             metrics: JoinHashMapMetrics::new(
                 metrics,
                 actor_id,
+                fragment_id,
                 side,
                 join_table_id,
                 degree_table_id,
diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs
index 4df4ae4d3e302..9c71684f529b8 100644
--- a/src/stream/src/executor/merge.rs
+++ b/src/stream/src/executor/merge.rs
@@ -134,7 +134,7 @@ impl MergeExecutor {
                     Message::Chunk(chunk) => {
                         self.metrics
                             .actor_in_record_cnt
-                            .with_label_values(&[&actor_id_str])
+                            .with_label_values(&[&actor_id_str, &fragment_id_str])
                             .inc_by(chunk.cardinality() as _);
                     }
                     Message::Barrier(barrier) => {
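`JoinHashMapMetrics` above shows a third pattern: the ids are stringified once in the constructor, counts are buffered in plain integer fields on the hot path, and flushed with a single `inc_by` per barrier. A self-contained sketch of that shape (not part of the patch; `Demo*` names and ids are hypothetical):

use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry};

struct DemoJoinMetrics {
    lookup_miss: IntCounterVec,
    actor_id: String,
    fragment_id: String,
    lookup_miss_count: u64, // buffered locally, no atomics on the hot path
}

impl DemoJoinMetrics {
    fn new(lookup_miss: IntCounterVec, actor_id: u32, fragment_id: u32) -> Self {
        Self {
            lookup_miss,
            actor_id: actor_id.to_string(),
            fragment_id: fragment_id.to_string(),
            lookup_miss_count: 0,
        }
    }

    fn flush(&mut self) {
        self.lookup_miss
            .with_label_values(&[&self.actor_id, &self.fragment_id])
            .inc_by(self.lookup_miss_count);
        self.lookup_miss_count = 0;
    }
}

fn main() {
    let registry = Registry::new();
    let vec = register_int_counter_vec_with_registry!(
        "demo_join_lookup_miss_count",
        "Join lookup misses",
        &["actor_id", "fragment_id"],
        registry
    )
    .unwrap();
    let mut m = DemoJoinMetrics::new(vec, 1001, 7);
    m.lookup_miss_count += 3; // accumulated while processing rows
    m.flush(); // one atomic add at the barrier
}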
"stream_join_insert_cache_miss_count", "Join executor cache miss when insert operation", - &["side", "join_table_id", "degree_table_id", "actor_id"], + &[ + "side", + "join_table_id", + "degree_table_id", + "actor_id", + "fragment_id" + ], registry ) .unwrap(); @@ -384,7 +404,7 @@ impl StreamingMetrics { let join_actor_input_waiting_duration_ns = register_int_counter_vec_with_registry!( "stream_join_actor_input_waiting_duration_ns", "Total waiting duration (ns) of input buffer of join actor", - &["actor_id"], + &["actor_id", "fragment_id"], registry ) .unwrap(); @@ -392,7 +412,7 @@ impl StreamingMetrics { let join_match_duration_ns = register_int_counter_vec_with_registry!( "stream_join_match_duration_ns", "Matching duration for each side", - &["actor_id", "side"], + &["actor_id", "fragment_id", "side"], registry ) .unwrap(); @@ -419,7 +439,7 @@ impl StreamingMetrics { let join_cached_entries = register_int_gauge_vec_with_registry!( "stream_join_cached_entries", "Number of cached entries in streaming join operators", - &["actor_id", "side"], + &["actor_id", "fragment_id", "side"], registry ) .unwrap(); @@ -427,7 +447,7 @@ impl StreamingMetrics { let join_cached_rows = register_int_gauge_vec_with_registry!( "stream_join_cached_rows", "Number of cached rows in streaming join operators", - &["actor_id", "side"], + &["actor_id", "fragment_id", "side"], registry ) .unwrap(); @@ -435,7 +455,7 @@ impl StreamingMetrics { let join_cached_estimated_size = register_int_gauge_vec_with_registry!( "stream_join_cached_estimated_size", "Estimated size of all cached entries in streaming join operators", - &["actor_id", "side"], + &["actor_id", "fragment_id", "side"], registry ) .unwrap(); @@ -463,7 +483,7 @@ impl StreamingMetrics { let agg_lookup_miss_count = register_int_counter_vec_with_registry!( "stream_agg_lookup_miss_count", "Aggregation executor lookup miss duration", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -471,7 +491,7 @@ impl StreamingMetrics { let agg_total_lookup_count = register_int_counter_vec_with_registry!( "stream_agg_lookup_total_count", "Aggregation executor lookup total operation", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -479,7 +499,7 @@ impl StreamingMetrics { let agg_distinct_cache_miss_count = register_int_counter_vec_with_registry!( "stream_agg_distinct_cache_miss_count", "Aggregation executor dinsinct miss duration", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -487,7 +507,7 @@ impl StreamingMetrics { let agg_distinct_total_cache_count = register_int_counter_vec_with_registry!( "stream_agg_distinct_total_cache_count", "Aggregation executor distinct total operation", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -495,7 +515,7 @@ impl StreamingMetrics { let agg_distinct_cached_entry_count = register_int_gauge_vec_with_registry!( "stream_agg_distinct_cached_entry_count", "Total entry counts in distinct aggregation executor cache", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -503,7 +523,7 @@ impl StreamingMetrics { let agg_dirty_group_count = register_int_gauge_vec_with_registry!( "stream_agg_dirty_group_count", "Total dirty group counts in aggregation executor", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -511,7 +531,7 @@ impl StreamingMetrics { let 
agg_dirty_group_heap_size = register_int_gauge_vec_with_registry!( "stream_agg_dirty_group_heap_size", "Total dirty group heap size in aggregation executor", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -519,7 +539,7 @@ impl StreamingMetrics { let group_top_n_cache_miss_count = register_int_counter_vec_with_registry!( "stream_group_top_n_cache_miss_count", "Group top n executor cache miss count", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -527,7 +547,7 @@ impl StreamingMetrics { let group_top_n_total_query_cache_count = register_int_counter_vec_with_registry!( "stream_group_top_n_total_query_cache_count", "Group top n executor query cache total count", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -535,7 +555,7 @@ impl StreamingMetrics { let group_top_n_cached_entry_count = register_int_gauge_vec_with_registry!( "stream_group_top_n_cached_entry_count", "Total entry counts in group top n executor cache", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -543,7 +563,7 @@ impl StreamingMetrics { let group_top_n_appendonly_cache_miss_count = register_int_counter_vec_with_registry!( "stream_group_top_n_appendonly_cache_miss_count", "Group top n appendonly executor cache miss count", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -552,7 +572,7 @@ impl StreamingMetrics { register_int_counter_vec_with_registry!( "stream_group_top_n_appendonly_total_query_cache_count", "Group top n appendonly executor total cache count", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -560,7 +580,7 @@ impl StreamingMetrics { let group_top_n_appendonly_cached_entry_count = register_int_gauge_vec_with_registry!( "stream_group_top_n_appendonly_cached_entry_count", "Total entry counts in group top n appendonly executor cache", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -568,7 +588,7 @@ impl StreamingMetrics { let lookup_cache_miss_count = register_int_counter_vec_with_registry!( "stream_lookup_cache_miss_count", "Lookup executor cache miss count", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -576,7 +596,7 @@ impl StreamingMetrics { let lookup_total_query_cache_count = register_int_counter_vec_with_registry!( "stream_lookup_total_query_cache_count", "Lookup executor query cache total count", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -584,7 +604,7 @@ impl StreamingMetrics { let lookup_cached_entry_count = register_int_gauge_vec_with_registry!( "stream_lookup_cached_entry_count", "Total entry counts in lookup executor cache", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -592,7 +612,7 @@ impl StreamingMetrics { let temporal_join_cache_miss_count = register_int_counter_vec_with_registry!( "stream_temporal_join_cache_miss_count", "Temporal join executor cache miss count", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -600,7 +620,7 @@ impl StreamingMetrics { let temporal_join_total_query_cache_count = register_int_counter_vec_with_registry!( "stream_temporal_join_total_query_cache_count", "Temporal join executor query cache total count", - &["table_id", 
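The registrations above widen every label set in lockstep with the call sites in the other files, because `with_label_values` must receive exactly as many values as the vec was registered with. A sketch of that contract (not part of the patch; `demo_*` name and ids are invented), using the fallible lookup to show the arity check:

use prometheus::{register_int_counter_vec_with_registry, Registry};

fn main() {
    let registry = Registry::new();
    let counter = register_int_counter_vec_with_registry!(
        "demo_agg_lookup_miss_count",
        "Aggregation lookup misses",
        &["table_id", "actor_id", "fragment_id"],
        registry
    )
    .unwrap();

    // Correct arity: three labels registered, three values supplied.
    counter.with_label_values(&["42", "1001", "7"]).inc();

    // Wrong arity is a runtime error; the fallible form surfaces it, while
    // `with_label_values` would panic instead.
    assert!(counter.get_metric_with_label_values(&["42", "1001"]).is_err());
}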
"actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -608,7 +628,7 @@ impl StreamingMetrics { let temporal_join_cached_entry_count = register_int_gauge_vec_with_registry!( "stream_temporal_join_cached_entry_count", "Total entry count in temporal join executor cache", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -616,7 +636,7 @@ impl StreamingMetrics { let agg_cached_keys = register_int_gauge_vec_with_registry!( "stream_agg_cached_keys", "Number of cached keys in streaming aggregation operators", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -624,7 +644,7 @@ impl StreamingMetrics { let agg_chunk_lookup_miss_count = register_int_counter_vec_with_registry!( "stream_agg_chunk_lookup_miss_count", "Aggregation executor chunk-level lookup miss duration", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -632,7 +652,7 @@ impl StreamingMetrics { let agg_chunk_total_lookup_count = register_int_counter_vec_with_registry!( "stream_agg_chunk_lookup_total_count", "Aggregation executor chunk-level lookup total operation", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); diff --git a/src/stream/src/executor/receiver.rs b/src/stream/src/executor/receiver.rs index 64c4bfabc1b58..a99bbc2347a03 100644 --- a/src/stream/src/executor/receiver.rs +++ b/src/stream/src/executor/receiver.rs @@ -140,7 +140,7 @@ impl Executor for ReceiverExecutor { Message::Chunk(chunk) => { self.metrics .actor_in_record_cnt - .with_label_values(&[&actor_id_str]) + .with_label_values(&[&actor_id_str, &fragment_id_str]) .inc_by(chunk.cardinality() as _); } Message::Barrier(barrier) => { diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index 8267edc3154d5..3c8cde63c4ca9 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -154,10 +154,11 @@ impl TemporalSide { async fn lookup(&mut self, key: &K, epoch: HummockEpoch) -> StreamExecutorResult { let table_id_str = self.source.table_id().to_string(); let actor_id_str = self.ctx.id.to_string(); + let fragment_id_str = self.ctx.id.to_string(); self.ctx .streaming_metrics .temporal_join_total_query_cache_count - .with_label_values(&[&table_id_str, &actor_id_str]) + .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]) .inc(); let res = if self.cache.contains(key) { @@ -168,7 +169,7 @@ impl TemporalSide { self.ctx .streaming_metrics .temporal_join_cache_miss_count - .with_label_values(&[&table_id_str, &actor_id_str]) + .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]) .inc(); let pk_prefix = key.deserialize(&self.join_key_data_types)?; @@ -414,13 +415,14 @@ impl TemporalJoinExecutor let table_id_str = self.right_table.source.table_id().to_string(); let actor_id_str = self.ctx.id.to_string(); + let fragment_id_str = self.ctx.fragment_id.to_string(); #[for_await] for msg in align_input(self.left, self.right) { self.right_table.cache.evict(); self.ctx .streaming_metrics .temporal_join_cached_entry_count - .with_label_values(&[&table_id_str, &actor_id_str]) + .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]) .set(self.right_table.cache.len() as i64); match msg? 
diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs
index 78c12ee82f3cd..031517403b0b3 100644
--- a/src/stream/src/executor/top_n/group_top_n.rs
+++ b/src/stream/src/executor/top_n/group_top_n.rs
@@ -184,6 +184,7 @@ where
         let keys = K::build(&self.group_by, chunk.data_chunk())?;
         let table_id_str = self.managed_state.state_table.table_id().to_string();
         let actor_id_str = self.ctx.id.to_string();
+        let fragment_id_str = self.ctx.fragment_id.to_string();
         for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
             let Some((op, row_ref)) = r else {
                 continue;
@@ -196,7 +197,7 @@ where
             self.ctx
                 .streaming_metrics
                 .group_top_n_total_query_cache_count
-                .with_label_values(&[&table_id_str, &actor_id_str])
+                .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
                 .inc();
             // If 'self.caches' does not already have a cache for the current group, create a new
             // cache for it and insert it into `self.caches`
@@ -204,7 +205,7 @@ where
                 self.ctx
                     .streaming_metrics
                     .group_top_n_cache_miss_count
-                    .with_label_values(&[&table_id_str, &actor_id_str])
+                    .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
                     .inc();
                 let mut topn_cache =
                     TopNCache::new(self.offset, self.limit, self.schema().data_types());
@@ -241,7 +242,7 @@ where
         self.ctx
             .streaming_metrics
             .group_top_n_cached_entry_count
-            .with_label_values(&[&table_id_str, &actor_id_str])
+            .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
             .set(self.caches.len() as i64);
         generate_output(res_rows, res_ops, self.schema())
     }
diff --git a/src/stream/src/executor/top_n/group_top_n_appendonly.rs b/src/stream/src/executor/top_n/group_top_n_appendonly.rs
index 140a06984e586..45e05abfd21dc 100644
--- a/src/stream/src/executor/top_n/group_top_n_appendonly.rs
+++ b/src/stream/src/executor/top_n/group_top_n_appendonly.rs
@@ -181,6 +181,7 @@ where
         let row_deserializer = RowDeserializer::new(data_types.clone());
         let table_id_str = self.managed_state.state_table.table_id().to_string();
         let actor_id_str = self.ctx.id.to_string();
+        let fragment_id_str = self.ctx.fragment_id.to_string();
         for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
             let Some((op, row_ref)) = r else {
                 continue;
@@ -193,7 +194,7 @@ where
             self.ctx
                 .streaming_metrics
                 .group_top_n_appendonly_total_query_cache_count
-                .with_label_values(&[&table_id_str, &actor_id_str])
+                .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
                 .inc();
             // If 'self.caches' does not already have a cache for the current group, create a new
             // cache for it and insert it into `self.caches`
@@ -201,7 +202,7 @@ where
                 self.ctx
                     .streaming_metrics
                     .group_top_n_appendonly_cache_miss_count
-                    .with_label_values(&[&table_id_str, &actor_id_str])
+                    .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
                     .inc();
                 let mut topn_cache = TopNCache::new(self.offset, self.limit, data_types.clone());
                 self.managed_state
@@ -224,7 +225,7 @@ where
         self.ctx
             .streaming_metrics
             .group_top_n_appendonly_cached_entry_count
-            .with_label_values(&[&table_id_str, &actor_id_str])
+            .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
            .set(self.caches.len() as i64);
         generate_output(res_rows, res_ops, self.schema())
     }
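Both top-n executors above end a chunk by setting a gauge to the current cache size. A sketch of that pattern in isolation (not part of the patch; the `demo_*` name, toy cache, and ids are hypothetical):

use prometheus::{register_int_gauge_vec_with_registry, Registry};
use std::collections::HashMap;

fn main() {
    let registry = Registry::new();
    let cached_entries = register_int_gauge_vec_with_registry!(
        "demo_group_top_n_cached_entry_count",
        "Cached groups per (table, actor, fragment)",
        &["table_id", "actor_id", "fragment_id"],
        registry
    )
    .unwrap();

    // Toy stand-in for the per-group TopNCache map.
    let mut caches: HashMap<u64, Vec<i64>> = HashMap::new();
    for group_key in [1u64, 2, 2, 3] {
        caches.entry(group_key).or_default().push(0);
    }

    // After processing the chunk, report the cache size as a gauge.
    cached_entries
        .with_label_values(&["42", "1001", "7"])
        .set(caches.len() as i64);
}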
diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs
index eee41e62f321a..d1afaeb061809 100644
--- a/src/stream/src/task/stream_manager.rs
+++ b/src/stream/src/task/stream_manager.rs
@@ -709,13 +709,14 @@ impl LocalStreamManagerCore {
         {
             let metrics = self.streaming_metrics.clone();
             let actor_id_str = actor_id.to_string();
+            let fragment_id_str = actor_context.fragment_id.to_string();
             let allocation_stated = task_stats_alloc::allocation_stat(
                 instrumented,
                 Duration::from_millis(1000),
                 move |bytes| {
                     metrics
                         .actor_memory_usage
-                        .with_label_values(&[&actor_id_str])
+                        .with_label_values(&[&actor_id_str, &fragment_id_str])
                         .set(bytes as i64);
                     actor_context.store_mem_usage(bytes);
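The stream_manager.rs hunk builds the label strings up front and moves them into the closure that the allocation sampler invokes on a timer. A self-contained sketch of that callback shape (not part of the patch; the sampler is stubbed out and the `demo_*` name and ids are invented):

use prometheus::{register_int_gauge_vec_with_registry, Registry};

fn main() {
    let registry = Registry::new();
    let memory_usage = register_int_gauge_vec_with_registry!(
        "demo_actor_memory_usage",
        "Memory usage (bytes)",
        &["actor_id", "fragment_id"],
        registry
    )
    .unwrap();

    // Strings are built once and moved into the closure, as in the diff.
    let actor_id_str = 1001u32.to_string();
    let fragment_id_str = 7u32.to_string();
    let report = move |bytes: usize| {
        memory_usage
            .with_label_values(&[&actor_id_str, &fragment_id_str])
            .set(bytes as i64);
    };

    // In the real code, allocation_stat calls this roughly once per second.
    report(8 * 1024 * 1024);
}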