Skip to content

Commit

Permalink
trivial: add phase timers
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse committed Sep 9, 2023
1 parent 30f83e9 commit 57f0dbc
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 2 deletions.
14 changes: 14 additions & 0 deletions consensus/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,3 +792,17 @@ pub static BATCH_WAIT_DURATION: Lazy<DurationHistogram> = Lazy::new(|| {
.unwrap(),
)
});

/// Histogram of timers for each of the buffer manager phase processors.
pub static BUFFER_MANAGER_PHASE_PROCESS_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
// metric name
"aptos_consensus_buffer_manager_phase_process_seconds",
// metric description
"Timer for buffer manager PipelinePhase::process()",
// metric labels (dimensions)
&["name"],
exponential_buckets(/*start=*/ 1e-6, /*factor=*/ 2.0, /*count=*/ 22).unwrap(),
)
.unwrap()
});
2 changes: 2 additions & 0 deletions consensus/src/experimental/execution_phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ impl StatelessPipeline for ExecutionPhase {
type Request = ExecutionRequest;
type Response = ExecutionResponse;

const NAME: &'static str = "execution";

async fn process(&self, req: ExecutionRequest) -> ExecutionResponse {
let ExecutionRequest { ordered_blocks } = req;

Expand Down
2 changes: 2 additions & 0 deletions consensus/src/experimental/persisting_phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ impl StatelessPipeline for PersistingPhase {
type Request = PersistingRequest;
type Response = PersistingResponse;

const NAME: &'static str = "persisting";

async fn process(&self, req: PersistingRequest) -> PersistingResponse {
let PersistingRequest {
blocks,
Expand Down
15 changes: 13 additions & 2 deletions consensus/src/experimental/pipeline_phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::experimental::buffer_manager::{Receiver, Sender};
use crate::{
counters::BUFFER_MANAGER_PHASE_PROCESS_SECONDS,
experimental::buffer_manager::{Receiver, Sender},
};
use aptos_logger::debug;
use async_trait::async_trait;
use futures::{SinkExt, StreamExt};
Expand All @@ -15,6 +18,9 @@ use std::sync::{
pub trait StatelessPipeline: Send + Sync {
type Request;
type Response;

const NAME: &'static str;

async fn process(&self, req: Self::Request) -> Self::Response;
}

Expand Down Expand Up @@ -70,7 +76,12 @@ impl<T: StatelessPipeline> PipelinePhase<T> {
// main loop
while let Some(counted_req) = self.rx.next().await {
let CountedRequest { req, guard: _guard } = counted_req;
let response = self.processor.process(req).await;
let response = {
let _timer = BUFFER_MANAGER_PHASE_PROCESS_SECONDS
.with_label_values(&[T::NAME])
.start_timer();
self.processor.process(req).await
};
if let Some(tx) = &mut self.maybe_tx {
if tx.send(response).await.is_err() {
debug!("Failed to send response, buffer manager probably dropped");
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/experimental/signing_phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ impl StatelessPipeline for SigningPhase {
type Request = SigningRequest;
type Response = SigningResponse;

const NAME: &'static str = "signing";

async fn process(&self, req: SigningRequest) -> SigningResponse {
let SigningRequest {
ordered_ledger_info,
Expand Down

0 comments on commit 57f0dbc

Please sign in to comment.