Skip to content

Commit

Permalink
fix(stream): reuse label guarded metrics for actor input output metrics (#13994)
Browse files Browse the repository at this point in the history

Co-authored-by: Eric Fu <[email protected]>
  • Loading branch information
wenym1 and fuyufjh authored Dec 14, 2023
1 parent 59e56c1 commit 64a2d92
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 42 deletions.
17 changes: 11 additions & 6 deletions src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use itertools::Itertools;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
use risingwave_pb::stream_plan::PbDispatcher;
Expand Down Expand Up @@ -54,6 +55,7 @@ struct DispatchExecutorInner {
fragment_id_str: String,
context: Arc<SharedContext>,
metrics: Arc<StreamingMetrics>,
actor_out_record_cnt: LabelGuardedIntCounter<2>,
}

impl DispatchExecutorInner {
Expand Down Expand Up @@ -97,10 +99,7 @@ impl DispatchExecutorInner {
})
.await?;

self.metrics
.actor_out_record_cnt
.with_label_values(&[&self.actor_id_str, &self.fragment_id_str])
.inc_by(chunk.cardinality() as _);
self.actor_out_record_cnt.inc_by(chunk.cardinality() as _);
}
Message::Barrier(barrier) => {
let mutation = barrier.mutation.clone();
Expand Down Expand Up @@ -313,15 +312,21 @@ impl DispatchExecutor {
context: Arc<SharedContext>,
metrics: Arc<StreamingMetrics>,
) -> Self {
let actor_id_str = actor_id.to_string();
let fragment_id_str = fragment_id.to_string();
let actor_out_record_cnt = metrics
.actor_out_record_cnt
.with_label_values(&[&actor_id_str, &fragment_id_str]);
Self {
input,
inner: DispatchExecutorInner {
dispatchers,
actor_id,
actor_id_str: actor_id.to_string(),
fragment_id_str: fragment_id.to_string(),
actor_id_str,
fragment_id_str,
context,
metrics,
actor_out_record_cnt,
},
}
}
Expand Down
30 changes: 16 additions & 14 deletions src/stream/src/executor/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use super::watermark::*;
use super::*;
use crate::executor::exchange::input::new_input;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::utils::ActorInputMetrics;
use crate::task::{FragmentId, SharedContext};

/// `MergeExecutor` merges data from multiple channels. Dataflow from one channel
Expand Down Expand Up @@ -109,17 +110,20 @@ impl MergeExecutor {
// Futures of all active upstreams.
let select_all = SelectReceivers::new(self.actor_context.id, self.upstreams);
let actor_id = self.actor_context.id;
let actor_id_str = actor_id.to_string();
let fragment_id_str = self.fragment_id.to_string();
let mut upstream_fragment_id_str = self.upstream_fragment_id.to_string();

let mut metrics = ActorInputMetrics::new(
&self.metrics,
actor_id,
self.fragment_id,
self.upstream_fragment_id,
);

// Channels that're blocked by the barrier to align.
let mut start_time = Instant::now();
pin_mut!(select_all);
while let Some(msg) = select_all.next().await {
self.metrics
metrics
.actor_input_buffer_blocking_duration_ns
.with_label_values(&[&actor_id_str, &fragment_id_str, &upstream_fragment_id_str])
.inc_by(start_time.elapsed().as_nanos() as u64);
let mut msg: Message = msg?;

Expand All @@ -128,14 +132,7 @@ impl MergeExecutor {
// Do nothing.
}
Message::Chunk(chunk) => {
self.metrics
.actor_in_record_cnt
.with_label_values(&[
&actor_id_str,
&fragment_id_str,
&upstream_fragment_id_str,
])
.inc_by(chunk.cardinality() as _);
metrics.actor_in_record_cnt.inc_by(chunk.cardinality() as _);
}
Message::Barrier(barrier) => {
tracing::debug!(
Expand Down Expand Up @@ -230,7 +227,12 @@ impl MergeExecutor {
}

self.upstream_fragment_id = new_upstream_fragment_id;
upstream_fragment_id_str = new_upstream_fragment_id.to_string();
metrics = ActorInputMetrics::new(
&self.metrics,
actor_id,
self.fragment_id,
self.upstream_fragment_id,
);

select_all.update_actor_ids();
}
Expand Down
8 changes: 4 additions & 4 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ pub struct StreamingMetrics {

// Streaming actor
pub actor_memory_usage: GenericGaugeVec<AtomicI64>,
pub actor_in_record_cnt: GenericCounterVec<AtomicU64>,
pub actor_out_record_cnt: GenericCounterVec<AtomicU64>,
pub actor_in_record_cnt: LabelGuardedIntCounterVec<3>,
pub actor_out_record_cnt: LabelGuardedIntCounterVec<2>,

// Source
pub source_output_row_count: GenericCounterVec<AtomicU64>,
Expand Down Expand Up @@ -353,15 +353,15 @@ impl StreamingMetrics {
)
.unwrap();

let actor_in_record_cnt = register_int_counter_vec_with_registry!(
let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
"stream_actor_in_record_cnt",
"Total number of rows actor received",
&["actor_id", "fragment_id", "upstream_fragment_id"],
registry
)
.unwrap();

let actor_out_record_cnt = register_int_counter_vec_with_registry!(
let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
"stream_actor_out_record_cnt",
"Total number of rows actor sent",
&["actor_id", "fragment_id"],
Expand Down
34 changes: 16 additions & 18 deletions src/stream/src/executor/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use super::exchange::input::BoxedInput;
use super::ActorContextRef;
use crate::executor::exchange::input::new_input;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::utils::ActorInputMetrics;
use crate::executor::{
expect_first_barrier, BoxedMessageStream, Executor, ExecutorInfo, Message, PkIndicesRef,
};
Expand Down Expand Up @@ -112,21 +113,20 @@ impl ReceiverExecutor {
impl Executor for ReceiverExecutor {
fn execute(mut self: Box<Self>) -> BoxedMessageStream {
let actor_id = self.actor_context.id;
let actor_id_str = actor_id.to_string();
let fragment_id_str = self.fragment_id.to_string();
let mut upstream_fragment_id_str = self.upstream_fragment_id.to_string();

let mut metrics = ActorInputMetrics::new(
&self.metrics,
actor_id,
self.fragment_id,
self.upstream_fragment_id,
);

let stream = #[try_stream]
async move {
let mut start_time = Instant::now();
while let Some(msg) = self.input.next().await {
self.metrics
metrics
.actor_input_buffer_blocking_duration_ns
.with_label_values(&[
&actor_id_str,
&fragment_id_str,
&upstream_fragment_id_str,
])
.inc_by(start_time.elapsed().as_nanos() as u64);
let mut msg: Message = msg?;

Expand All @@ -135,14 +135,7 @@ impl Executor for ReceiverExecutor {
// Do nothing.
}
Message::Chunk(chunk) => {
self.metrics
.actor_in_record_cnt
.with_label_values(&[
&actor_id_str,
&fragment_id_str,
&upstream_fragment_id_str,
])
.inc_by(chunk.cardinality() as _);
metrics.actor_in_record_cnt.inc_by(chunk.cardinality() as _);
}
Message::Barrier(barrier) => {
tracing::debug!(
Expand Down Expand Up @@ -197,7 +190,12 @@ impl Executor for ReceiverExecutor {
self.input = new_upstream;

self.upstream_fragment_id = new_upstream_fragment_id;
upstream_fragment_id_str = new_upstream_fragment_id.to_string();
metrics = ActorInputMetrics::new(
&self.metrics,
actor_id,
self.fragment_id,
self.upstream_fragment_id,
);
}
}
};
Expand Down
31 changes: 31 additions & 0 deletions src/stream/src/executor/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@

use futures::StreamExt;
use risingwave_common::catalog::Schema;
use risingwave_common::metrics::LabelGuardedIntCounter;

use crate::executor::monitor::StreamingMetrics;
use crate::executor::{BoxedMessageStream, Executor, ExecutorInfo, PkIndicesRef};
use crate::task::{ActorId, FragmentId};

#[derive(Default)]
pub struct DummyExecutor {
Expand Down Expand Up @@ -45,3 +48,31 @@ impl Executor for DummyExecutor {
&self.info.identity
}
}

/// Pre-resolved counter handles for the input side of a single actor.
///
/// Both counters are looked up once in [`ActorInputMetrics::new`] using the
/// actor id, fragment id and upstream fragment id as label values, so callers
/// can increment them directly instead of calling `with_label_values` on every
/// message.
pub(crate) struct ActorInputMetrics {
    /// Counter that callers bump by the cardinality of each received chunk.
    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter<3>,
    /// Counter that callers bump by the nanoseconds elapsed while waiting for
    /// the next input message.
    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter<3>,
}

impl ActorInputMetrics {
    /// Resolves both input-side counters for one actor / fragment / upstream
    /// fragment combination.
    ///
    /// The three ids are rendered to strings once and passed, in the order
    /// actor id, fragment id, upstream fragment id, as the label values of
    /// each counter looked up on `metrics`.
    pub(crate) fn new(
        metrics: &StreamingMetrics,
        actor_id: ActorId,
        fragment_id: FragmentId,
        upstream_fragment_id: FragmentId,
    ) -> Self {
        let actor = actor_id.to_string();
        let fragment = fragment_id.to_string();
        let upstream = upstream_fragment_id.to_string();

        // One shared label array: both counters use the same three label values.
        let labels = [actor.as_str(), fragment.as_str(), upstream.as_str()];

        let actor_in_record_cnt = metrics.actor_in_record_cnt.with_label_values(&labels);
        let actor_input_buffer_blocking_duration_ns = metrics
            .actor_input_buffer_blocking_duration_ns
            .with_label_values(&labels);

        Self {
            actor_in_record_cnt,
            actor_input_buffer_blocking_duration_ns,
        }
    }
}

0 comments on commit 64a2d92

Please sign in to comment.