From 64a2d929aa941475a7f9eeee5b15eb0f9cee5fcd Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Thu, 14 Dec 2023 21:22:06 +0800 Subject: [PATCH] fix(stream): reuse label guarded metrics for actor input output metrics (#13994) Co-authored-by: Eric Fu --- src/stream/src/executor/dispatch.rs | 17 ++++++---- src/stream/src/executor/merge.rs | 30 ++++++++-------- .../src/executor/monitor/streaming_stats.rs | 8 ++--- src/stream/src/executor/receiver.rs | 34 +++++++++---------- src/stream/src/executor/utils.rs | 31 +++++++++++++++++ 5 files changed, 78 insertions(+), 42 deletions(-) diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 321c177e153b1..c9bb30f7dfdd5 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -25,6 +25,7 @@ use itertools::Itertools; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::BitmapBuilder; use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode}; +use risingwave_common::metrics::LabelGuardedIntCounter; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate; use risingwave_pb::stream_plan::PbDispatcher; @@ -54,6 +55,7 @@ struct DispatchExecutorInner { fragment_id_str: String, context: Arc, metrics: Arc, + actor_out_record_cnt: LabelGuardedIntCounter<2>, } impl DispatchExecutorInner { @@ -97,10 +99,7 @@ impl DispatchExecutorInner { }) .await?; - self.metrics - .actor_out_record_cnt - .with_label_values(&[&self.actor_id_str, &self.fragment_id_str]) - .inc_by(chunk.cardinality() as _); + self.actor_out_record_cnt.inc_by(chunk.cardinality() as _); } Message::Barrier(barrier) => { let mutation = barrier.mutation.clone(); @@ -313,15 +312,21 @@ impl DispatchExecutor { context: Arc, metrics: Arc, ) -> Self { + let actor_id_str = actor_id.to_string(); + let fragment_id_str = fragment_id.to_string(); + let actor_out_record_cnt = metrics + .actor_out_record_cnt + .with_label_values(&[&actor_id_str, &fragment_id_str]); Self { input, inner: DispatchExecutorInner { dispatchers, actor_id, - actor_id_str: actor_id.to_string(), - fragment_id_str: fragment_id.to_string(), + actor_id_str, + fragment_id_str, context, metrics, + actor_out_record_cnt, }, } } diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs index a975c3830d042..79a764e984667 100644 --- a/src/stream/src/executor/merge.rs +++ b/src/stream/src/executor/merge.rs @@ -29,6 +29,7 @@ use super::watermark::*; use super::*; use crate::executor::exchange::input::new_input; use crate::executor::monitor::StreamingMetrics; +use crate::executor::utils::ActorInputMetrics; use crate::task::{FragmentId, SharedContext}; /// `MergeExecutor` merges data from multiple channels. Dataflow from one channel @@ -109,17 +110,20 @@ impl MergeExecutor { // Futures of all active upstreams. let select_all = SelectReceivers::new(self.actor_context.id, self.upstreams); let actor_id = self.actor_context.id; - let actor_id_str = actor_id.to_string(); - let fragment_id_str = self.fragment_id.to_string(); - let mut upstream_fragment_id_str = self.upstream_fragment_id.to_string(); + + let mut metrics = ActorInputMetrics::new( + &self.metrics, + actor_id, + self.fragment_id, + self.upstream_fragment_id, + ); // Channels that're blocked by the barrier to align. let mut start_time = Instant::now(); pin_mut!(select_all); while let Some(msg) = select_all.next().await { - self.metrics + metrics .actor_input_buffer_blocking_duration_ns - .with_label_values(&[&actor_id_str, &fragment_id_str, &upstream_fragment_id_str]) .inc_by(start_time.elapsed().as_nanos() as u64); let mut msg: Message = msg?; @@ -128,14 +132,7 @@ impl MergeExecutor { // Do nothing. } Message::Chunk(chunk) => { - self.metrics - .actor_in_record_cnt - .with_label_values(&[ - &actor_id_str, - &fragment_id_str, - &upstream_fragment_id_str, - ]) - .inc_by(chunk.cardinality() as _); + metrics.actor_in_record_cnt.inc_by(chunk.cardinality() as _); } Message::Barrier(barrier) => { tracing::debug!( @@ -230,7 +227,12 @@ impl MergeExecutor { } self.upstream_fragment_id = new_upstream_fragment_id; - upstream_fragment_id_str = new_upstream_fragment_id.to_string(); + metrics = ActorInputMetrics::new( + &self.metrics, + actor_id, + self.fragment_id, + self.upstream_fragment_id, + ); select_all.update_actor_ids(); } diff --git a/src/stream/src/executor/monitor/streaming_stats.rs b/src/stream/src/executor/monitor/streaming_stats.rs index dacd663eb0f9b..4d824072fbe6a 100644 --- a/src/stream/src/executor/monitor/streaming_stats.rs +++ b/src/stream/src/executor/monitor/streaming_stats.rs @@ -55,8 +55,8 @@ pub struct StreamingMetrics { // Streaming actor pub actor_memory_usage: GenericGaugeVec, - pub actor_in_record_cnt: GenericCounterVec, - pub actor_out_record_cnt: GenericCounterVec, + pub actor_in_record_cnt: LabelGuardedIntCounterVec<3>, + pub actor_out_record_cnt: LabelGuardedIntCounterVec<2>, // Source pub source_output_row_count: GenericCounterVec, @@ -353,7 +353,7 @@ impl StreamingMetrics { ) .unwrap(); - let actor_in_record_cnt = register_int_counter_vec_with_registry!( + let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!( "stream_actor_in_record_cnt", "Total number of rows actor received", &["actor_id", "fragment_id", "upstream_fragment_id"], @@ -361,7 +361,7 @@ impl StreamingMetrics { ) .unwrap(); - let actor_out_record_cnt = register_int_counter_vec_with_registry!( + let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!( "stream_actor_out_record_cnt", "Total number of rows actor sent", &["actor_id", "fragment_id"], diff --git a/src/stream/src/executor/receiver.rs b/src/stream/src/executor/receiver.rs index 7ca0a8312ccc2..9ecb9d3f45371 100644 --- a/src/stream/src/executor/receiver.rs +++ b/src/stream/src/executor/receiver.rs @@ -24,6 +24,7 @@ use super::exchange::input::BoxedInput; use super::ActorContextRef; use crate::executor::exchange::input::new_input; use crate::executor::monitor::StreamingMetrics; +use crate::executor::utils::ActorInputMetrics; use crate::executor::{ expect_first_barrier, BoxedMessageStream, Executor, ExecutorInfo, Message, PkIndicesRef, }; @@ -112,21 +113,20 @@ impl ReceiverExecutor { impl Executor for ReceiverExecutor { fn execute(mut self: Box) -> BoxedMessageStream { let actor_id = self.actor_context.id; - let actor_id_str = actor_id.to_string(); - let fragment_id_str = self.fragment_id.to_string(); - let mut upstream_fragment_id_str = self.upstream_fragment_id.to_string(); + + let mut metrics = ActorInputMetrics::new( + &self.metrics, + actor_id, + self.fragment_id, + self.upstream_fragment_id, + ); let stream = #[try_stream] async move { let mut start_time = Instant::now(); while let Some(msg) = self.input.next().await { - self.metrics + metrics .actor_input_buffer_blocking_duration_ns - .with_label_values(&[ - &actor_id_str, - &fragment_id_str, - &upstream_fragment_id_str, - ]) .inc_by(start_time.elapsed().as_nanos() as u64); let mut msg: Message = msg?; @@ -135,14 +135,7 @@ impl Executor for ReceiverExecutor { // Do nothing. } Message::Chunk(chunk) => { - self.metrics - .actor_in_record_cnt - .with_label_values(&[ - &actor_id_str, - &fragment_id_str, - &upstream_fragment_id_str, - ]) - .inc_by(chunk.cardinality() as _); + metrics.actor_in_record_cnt.inc_by(chunk.cardinality() as _); } Message::Barrier(barrier) => { tracing::debug!( @@ -197,7 +190,12 @@ impl Executor for ReceiverExecutor { self.input = new_upstream; self.upstream_fragment_id = new_upstream_fragment_id; - upstream_fragment_id_str = new_upstream_fragment_id.to_string(); + metrics = ActorInputMetrics::new( + &self.metrics, + actor_id, + self.fragment_id, + self.upstream_fragment_id, + ); } } }; diff --git a/src/stream/src/executor/utils.rs b/src/stream/src/executor/utils.rs index 03a815d292c13..fa81077f65b52 100644 --- a/src/stream/src/executor/utils.rs +++ b/src/stream/src/executor/utils.rs @@ -14,8 +14,11 @@ use futures::StreamExt; use risingwave_common::catalog::Schema; +use risingwave_common::metrics::LabelGuardedIntCounter; +use crate::executor::monitor::StreamingMetrics; use crate::executor::{BoxedMessageStream, Executor, ExecutorInfo, PkIndicesRef}; +use crate::task::{ActorId, FragmentId}; #[derive(Default)] pub struct DummyExecutor { @@ -45,3 +48,31 @@ impl Executor for DummyExecutor { &self.info.identity } } + +pub(crate) struct ActorInputMetrics { + pub(crate) actor_in_record_cnt: LabelGuardedIntCounter<3>, + pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter<3>, +} + +impl ActorInputMetrics { + pub(crate) fn new( + metrics: &StreamingMetrics, + actor_id: ActorId, + fragment_id: FragmentId, + upstream_fragment_id: FragmentId, + ) -> Self { + let actor_id_str = actor_id.to_string(); + let fragment_id_str = fragment_id.to_string(); + let upstream_fragment_id_str = upstream_fragment_id.to_string(); + Self { + actor_in_record_cnt: metrics.actor_in_record_cnt.with_label_values(&[ + &actor_id_str, + &fragment_id_str, + &upstream_fragment_id_str, + ]), + actor_input_buffer_blocking_duration_ns: metrics + .actor_input_buffer_blocking_duration_ns + .with_label_values(&[&actor_id_str, &fragment_id_str, &upstream_fragment_id_str]), + } + } +}