From 57f0dbc8f3d6092489976436cf78393488b7646c Mon Sep 17 00:00:00 2001 From: aldenhu Date: Fri, 8 Sep 2023 18:19:21 +0000 Subject: [PATCH] trivial: add phase timers --- consensus/src/counters.rs | 14 ++++++++++++++ consensus/src/experimental/execution_phase.rs | 2 ++ consensus/src/experimental/persisting_phase.rs | 2 ++ consensus/src/experimental/pipeline_phase.rs | 15 +++++++++++++-- consensus/src/experimental/signing_phase.rs | 2 ++ 5 files changed, 33 insertions(+), 2 deletions(-) diff --git a/consensus/src/counters.rs b/consensus/src/counters.rs index 90a6cd0c687d5..33b2e1a893c49 100644 --- a/consensus/src/counters.rs +++ b/consensus/src/counters.rs @@ -792,3 +792,17 @@ pub static BATCH_WAIT_DURATION: Lazy = Lazy::new(|| { .unwrap(), ) }); + +/// Histogram of timers for each of the buffer manager phase processors. +pub static BUFFER_MANAGER_PHASE_PROCESS_SECONDS: Lazy = Lazy::new(|| { + register_histogram_vec!( + // metric name + "aptos_consensus_buffer_manager_phase_process_seconds", + // metric description + "Timer for buffer manager PipelinePhase::process()", + // metric labels (dimensions) + &["name"], + exponential_buckets(/*start=*/ 1e-6, /*factor=*/ 2.0, /*count=*/ 22).unwrap(), + ) + .unwrap() +}); diff --git a/consensus/src/experimental/execution_phase.rs b/consensus/src/experimental/execution_phase.rs index 60774f4a905e3..c27fbcf3e5be5 100644 --- a/consensus/src/experimental/execution_phase.rs +++ b/consensus/src/experimental/execution_phase.rs @@ -55,6 +55,8 @@ impl StatelessPipeline for ExecutionPhase { type Request = ExecutionRequest; type Response = ExecutionResponse; + const NAME: &'static str = "execution"; + async fn process(&self, req: ExecutionRequest) -> ExecutionResponse { let ExecutionRequest { ordered_blocks } = req; diff --git a/consensus/src/experimental/persisting_phase.rs b/consensus/src/experimental/persisting_phase.rs index be2317f7faca2..20c69f41301d3 100644 --- a/consensus/src/experimental/persisting_phase.rs +++ b/consensus/src/experimental/persisting_phase.rs @@ -59,6 +59,8 @@ impl StatelessPipeline for PersistingPhase { type Request = PersistingRequest; type Response = PersistingResponse; + const NAME: &'static str = "persisting"; + async fn process(&self, req: PersistingRequest) -> PersistingResponse { let PersistingRequest { blocks, diff --git a/consensus/src/experimental/pipeline_phase.rs b/consensus/src/experimental/pipeline_phase.rs index c4b7f47d44cf7..061a9b9e9b7b5 100644 --- a/consensus/src/experimental/pipeline_phase.rs +++ b/consensus/src/experimental/pipeline_phase.rs @@ -2,7 +2,10 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::experimental::buffer_manager::{Receiver, Sender}; +use crate::{ + counters::BUFFER_MANAGER_PHASE_PROCESS_SECONDS, + experimental::buffer_manager::{Receiver, Sender}, +}; use aptos_logger::debug; use async_trait::async_trait; use futures::{SinkExt, StreamExt}; @@ -15,6 +18,9 @@ use std::sync::{ pub trait StatelessPipeline: Send + Sync { type Request; type Response; + + const NAME: &'static str; + async fn process(&self, req: Self::Request) -> Self::Response; } @@ -70,7 +76,12 @@ impl PipelinePhase { // main loop while let Some(counted_req) = self.rx.next().await { let CountedRequest { req, guard: _guard } = counted_req; - let response = self.processor.process(req).await; + let response = { + let _timer = BUFFER_MANAGER_PHASE_PROCESS_SECONDS + .with_label_values(&[T::NAME]) + .start_timer(); + self.processor.process(req).await + }; if let Some(tx) = &mut self.maybe_tx { if tx.send(response).await.is_err() { debug!("Failed to send response, buffer manager probably dropped"); diff --git a/consensus/src/experimental/signing_phase.rs b/consensus/src/experimental/signing_phase.rs index d3fef1dc417fa..55411b0d2685b 100644 --- a/consensus/src/experimental/signing_phase.rs +++ b/consensus/src/experimental/signing_phase.rs @@ -61,6 +61,8 @@ impl StatelessPipeline for SigningPhase { type Request = SigningRequest; type Response = SigningResponse; + const NAME: &'static str = "signing"; + async fn process(&self, req: SigningRequest) -> SigningResponse { let SigningRequest { ordered_ledger_info,