Skip to content

Commit

Permalink
chore(streaming): remove unused metrics (#12878)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Oct 17, 2023
1 parent bff674b commit e995ea0
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 60 deletions.
10 changes: 0 additions & 10 deletions src/compute/src/rpc/service/exchange_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
#[derive(Clone)]
pub struct ExchangeServiceMetrics {
pub stream_fragment_exchange_bytes: GenericCounterVec<AtomicU64>,
pub actor_sampled_serialize_duration_ns: GenericCounterVec<AtomicU64>,
}

pub static GLOBAL_EXCHANGE_SERVICE_METRICS: LazyLock<ExchangeServiceMetrics> =
Expand All @@ -37,17 +36,8 @@ impl ExchangeServiceMetrics {
)
.unwrap();

let actor_sampled_serialize_duration_ns = register_int_counter_vec_with_registry!(
"actor_sampled_serialize_duration_ns",
"Duration (ns) of sampled chunk serialization",
&["actor_id"],
registry
)
.unwrap();

Self {
stream_fragment_exchange_bytes,
actor_sampled_serialize_duration_ns,
}
}
}
23 changes: 1 addition & 22 deletions src/compute/src/rpc/service/exchange_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;

use either::Either;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
Expand Down Expand Up @@ -96,7 +95,6 @@ impl ExchangeService for ExchangeServiceImpl {
};

let up_down_actor_ids = (get_req.up_actor_id, get_req.down_actor_id);
let up_down_fragment_ids = (get_req.up_fragment_id, get_req.down_fragment_id);
let receiver = self.stream_mgr.take_receiver(up_down_actor_ids).await?;

// Map the remaining stream to add-permits.
Expand All @@ -111,7 +109,6 @@ impl ExchangeService for ExchangeServiceImpl {
receiver,
add_permits_stream,
up_down_actor_ids,
up_down_fragment_ids,
)))
}
}
Expand All @@ -135,11 +132,9 @@ impl ExchangeServiceImpl {
peer_addr: SocketAddr,
mut receiver: Receiver,
add_permits_stream: impl Stream<Item = std::result::Result<permits::Value, tonic::Status>>,
up_down_actor_ids: (u32, u32),
up_down_fragment_ids: (u32, u32),
) {
tracing::debug!(target: "events::compute::exchange", peer_addr = %peer_addr, "serve stream exchange RPC");
let up_actor_id = up_down_actor_ids.0.to_string();
let up_fragment_id = up_down_fragment_ids.0.to_string();
let down_fragment_id = up_down_fragment_ids.1.to_string();

Expand All @@ -157,29 +152,13 @@ impl ExchangeServiceImpl {
);
pin_mut!(select_stream);

let mut rr = 0;
const SAMPLING_FREQUENCY: u64 = 100;

while let Some(r) = select_stream.try_next().await? {
match r {
Either::Left(permits_to_add) => {
permits.add_permits(permits_to_add);
}
Either::Right(MessageWithPermits { message, permits }) => {
// add serialization duration metric with given sampling frequency
let proto = if rr % SAMPLING_FREQUENCY == 0 {
let start_time = Instant::now();
let proto = message.to_protobuf();
metrics
.actor_sampled_serialize_duration_ns
.with_label_values(&[&up_actor_id])
.inc_by(start_time.elapsed().as_nanos() as u64);
proto
} else {
message.to_protobuf()
};
rr += 1;

let proto = message.to_protobuf();
// forward the acquired permit to the downstream
let response = GetStreamResponse {
message: Some(proto),
Expand Down
19 changes: 1 addition & 18 deletions src/stream/src/executor/exchange/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;

use anyhow::Context as _;
use futures::{pin_mut, Stream};
Expand Down Expand Up @@ -149,12 +148,9 @@ impl RemoteInput {
.await?;

let up_actor_id = up_down_ids.0.to_string();
let down_actor_id = up_down_ids.1.to_string();
let up_fragment_id = up_down_frag.0.to_string();
let down_fragment_id = up_down_frag.1.to_string();

let mut rr = 0;
const SAMPLING_FREQUENCY: u64 = 100;
let span: await_tree::Span = format!("RemoteInput (actor {up_actor_id})").into();

let mut batched_permits_accumulated = 0;
Expand All @@ -171,20 +167,7 @@ impl RemoteInput {
.with_label_values(&[&up_fragment_id, &down_fragment_id])
.inc_by(bytes as u64);

// add deserialization duration metric with given sampling frequency
let msg_res = if rr % SAMPLING_FREQUENCY == 0 {
let start_time = Instant::now();
let msg_res = Message::from_protobuf(&msg);
metrics
.actor_sampled_deserialize_duration_ns
.with_label_values(&[&down_actor_id, &down_fragment_id])
.inc_by(start_time.elapsed().as_nanos() as u64);
msg_res
} else {
Message::from_protobuf(&msg)
};
rr += 1;

let msg_res = Message::from_protobuf(&msg);
if let Some(add_back_permits) = match permits.unwrap().value {
// For records, batch the permits we received to reduce the backward
// `AddPermits` messages.
Expand Down
10 changes: 0 additions & 10 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ pub struct StreamingMetrics {
pub actor_memory_usage: GenericGaugeVec<AtomicI64>,
pub actor_in_record_cnt: GenericCounterVec<AtomicU64>,
pub actor_out_record_cnt: GenericCounterVec<AtomicU64>,
pub actor_sampled_deserialize_duration_ns: GenericCounterVec<AtomicU64>,

// Source
pub source_output_row_count: GenericCounterVec<AtomicU64>,
Expand Down Expand Up @@ -359,14 +358,6 @@ impl StreamingMetrics {
)
.unwrap();

let actor_sampled_deserialize_duration_ns = register_int_counter_vec_with_registry!(
"actor_sampled_deserialize_duration_ns",
"Duration (ns) of sampled chunk deserialization",
&["actor_id", "fragment_id"],
registry
)
.unwrap();

let actor_memory_usage = register_int_gauge_vec_with_registry!(
"actor_memory_usage",
"Memory usage (bytes)",
Expand Down Expand Up @@ -938,7 +929,6 @@ impl StreamingMetrics {
actor_memory_usage,
actor_in_record_cnt,
actor_out_record_cnt,
actor_sampled_deserialize_duration_ns,
source_output_row_count,
source_row_per_barrier,
source_split_change_count,
Expand Down

0 comments on commit e995ea0

Please sign in to comment.