From 1d194b808f481436ca9f72ffb352cd592554945f Mon Sep 17 00:00:00 2001 From: Josh Lind Date: Thu, 5 Dec 2024 09:45:30 -0500 Subject: [PATCH] [Inspection Service] Add simple consensus health check endpoint. --- .../src/server/index.rs | 6 +- .../src/server/metrics.rs | 38 +++++++++++ .../src/server/mod.rs | 6 ++ state-sync/state-sync-driver/src/driver.rs | 63 +++++++++++-------- state-sync/state-sync-driver/src/metrics.rs | 15 ++++- 5 files changed, 99 insertions(+), 29 deletions(-) diff --git a/crates/aptos-inspection-service/src/server/index.rs b/crates/aptos-inspection-service/src/server/index.rs index ef8ccfea8f24a..d57f80bfde383 100644 --- a/crates/aptos-inspection-service/src/server/index.rs +++ b/crates/aptos-inspection-service/src/server/index.rs @@ -2,8 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - server::utils::CONTENT_TYPE_TEXT, CONFIGURATION_PATH, FORGE_METRICS_PATH, JSON_METRICS_PATH, - METRICS_PATH, PEER_INFORMATION_PATH, SYSTEM_INFORMATION_PATH, + server::utils::CONTENT_TYPE_TEXT, CONFIGURATION_PATH, CONSENSUS_HEALTH_CHECK_PATH, + FORGE_METRICS_PATH, JSON_METRICS_PATH, METRICS_PATH, PEER_INFORMATION_PATH, + SYSTEM_INFORMATION_PATH, }; use hyper::{Body, StatusCode}; @@ -25,6 +26,7 @@ fn get_index_response() -> String { index_response.push("Welcome to the Aptos Inspection Service!".into()); index_response.push("The following endpoints are available:".into()); index_response.push(format!("\t- {}", CONFIGURATION_PATH)); + index_response.push(format!("\t- {}", CONSENSUS_HEALTH_CHECK_PATH)); index_response.push(format!("\t- {}", FORGE_METRICS_PATH)); index_response.push(format!("\t- {}", JSON_METRICS_PATH)); index_response.push(format!("\t- {}", METRICS_PATH)); diff --git a/crates/aptos-inspection-service/src/server/metrics.rs b/crates/aptos-inspection-service/src/server/metrics.rs index a762688cd9525..9857927e5afd8 100644 --- a/crates/aptos-inspection-service/src/server/metrics.rs +++ b/crates/aptos-inspection-service/src/server/metrics.rs @@ -6,9 +6,47 @@ use crate::server::{ utils, utils::{CONTENT_TYPE_JSON, CONTENT_TYPE_TEXT}, }; +use aptos_config::config::NodeConfig; use hyper::{Body, StatusCode}; use prometheus::TextEncoder; +// The metric key for the consensus execution gauge +const CONSENSUS_EXECUTION_GAUGE: &str = "aptos_state_sync_consensus_executing_gauge{}"; + +/// Handles a consensus health check request. This method returns +/// 200 if the node is currently participating in consensus. +/// +/// Note: we assume that this endpoint will only be used every few seconds. +pub async fn handle_consensus_health_check(node_config: &NodeConfig) -> (StatusCode, Body, String) { + // Verify the node is a validator. If not, return an error. + if !node_config.base.role.is_validator() { + return ( + StatusCode::BAD_REQUEST, + Body::from("This node is not a validator!"), + CONTENT_TYPE_TEXT.into(), + ); + } + + // Check the value of the consensus execution gauge + let metrics = utils::get_all_metrics(); + if let Some(gauge_value) = metrics.get(CONSENSUS_EXECUTION_GAUGE) { + if gauge_value == "1" { + return ( + StatusCode::OK, + Body::from("Consensus health check passed!"), + CONTENT_TYPE_TEXT.into(), + ); + } + } + + // Otherwise, consensus is not executing + ( + StatusCode::INTERNAL_SERVER_ERROR, + Body::from("Consensus health check failed! Consensus is not executing!"), + CONTENT_TYPE_TEXT.into(), + ) +} + /// Handles a new forge metrics request pub fn handle_forge_metrics() -> (StatusCode, Body, String) { // Get and encode the metrics diff --git a/crates/aptos-inspection-service/src/server/mod.rs b/crates/aptos-inspection-service/src/server/mod.rs index c352d4de373a8..5b442e3a3518e 100644 --- a/crates/aptos-inspection-service/src/server/mod.rs +++ b/crates/aptos-inspection-service/src/server/mod.rs @@ -30,6 +30,7 @@ mod tests; // The list of endpoints offered by the inspection service pub const CONFIGURATION_PATH: &str = "/configuration"; +pub const CONSENSUS_HEALTH_CHECK_PATH: &str = "/consensus_health_check"; pub const FORGE_METRICS_PATH: &str = "/forge_metrics"; pub const INDEX_PATH: &str = "/"; pub const JSON_METRICS_PATH: &str = "/json_metrics"; @@ -111,6 +112,11 @@ async fn serve_requests( // Exposes the node configuration configuration::handle_configuration_request(&node_config) }, + CONSENSUS_HEALTH_CHECK_PATH => { + // /consensus_health_check + // Exposes the consensus health check + metrics::handle_consensus_health_check(&node_config).await + }, FORGE_METRICS_PATH => { // /forge_metrics // Exposes forge encoded metrics diff --git a/state-sync/state-sync-driver/src/driver.rs b/state-sync/state-sync-driver/src/driver.rs index 2ed74268af4cf..2542665851a25 100644 --- a/state-sync/state-sync-driver/src/driver.rs +++ b/state-sync/state-sync-driver/src/driver.rs @@ -655,6 +655,9 @@ impl< /// Checks that state sync is making progress async fn drive_progress(&mut self) { + // Update the executing component metrics + self.update_executing_component_metrics(); + // Fetch the global data summary and verify we have active peers let global_data_summary = self.aptos_data_client.get_global_data_summary(); if global_data_summary.is_empty() { @@ -673,15 +676,6 @@ impl< // If consensus or consensus observer is executing, there's nothing to do if self.check_if_consensus_or_observer_executing() { - let executing_component = if self.driver_configuration.role.is_validator() { - ExecutingComponent::Consensus - } else { - ExecutingComponent::ConsensusObserver - }; - metrics::increment_counter( - &metrics::EXECUTING_COMPONENT, - executing_component.get_label(), - ); return; } @@ -691,10 +685,6 @@ impl< let consensus_sync_request = self.consensus_notification_handler.get_sync_request(); // Attempt to continuously sync - metrics::increment_counter( - &metrics::EXECUTING_COMPONENT, - ExecutingComponent::ContinuousSyncer.get_label(), - ); if let Err(error) = self .continuous_syncer .drive_progress(consensus_sync_request) @@ -708,20 +698,43 @@ impl< ); metrics::increment_counter(&metrics::CONTINUOUS_SYNCER_ERRORS, error.get_label()); } - } else { - metrics::increment_counter( - &metrics::EXECUTING_COMPONENT, - ExecutingComponent::Bootstrapper.get_label(), + } else if let Err(error) = self.bootstrapper.drive_progress(&global_data_summary).await { + sample!( + SampleRate::Duration(Duration::from_secs(DRIVER_ERROR_LOG_FREQ_SECS)), + warn!(LogSchema::new(LogEntry::Driver) + .error(&error) + .message("Error found when checking the bootstrapper progress!")); ); - if let Err(error) = self.bootstrapper.drive_progress(&global_data_summary).await { - sample!( - SampleRate::Duration(Duration::from_secs(DRIVER_ERROR_LOG_FREQ_SECS)), - warn!(LogSchema::new(LogEntry::Driver) - .error(&error) - .message("Error found when checking the bootstrapper progress!")); - ); - metrics::increment_counter(&metrics::BOOTSTRAPPER_ERRORS, error.get_label()); + metrics::increment_counter(&metrics::BOOTSTRAPPER_ERRORS, error.get_label()); + }; + } + + /// Updates the executing component metrics for the driver + fn update_executing_component_metrics(&self) { + // Determine the executing component + let executing_component = if self.check_if_consensus_or_observer_executing() { + if self.driver_configuration.role.is_validator() { + ExecutingComponent::Consensus + } else { + ExecutingComponent::ConsensusObserver } + } else if self.bootstrapper.is_bootstrapped() { + ExecutingComponent::ContinuousSyncer + } else { + ExecutingComponent::Bootstrapper }; + + // Increment the executing component counter + metrics::increment_counter( + &metrics::EXECUTING_COMPONENT, + executing_component.get_label(), + ); + + // Set the consensus executing gauge + if executing_component == ExecutingComponent::Consensus { + metrics::CONSENSUS_EXECUTING_GAUGE.set(1); + } else { + metrics::CONSENSUS_EXECUTING_GAUGE.set(0); + } } } diff --git a/state-sync/state-sync-driver/src/metrics.rs b/state-sync/state-sync-driver/src/metrics.rs index dae906ff5016a..777c506e05e11 100644 --- a/state-sync/state-sync-driver/src/metrics.rs +++ b/state-sync/state-sync-driver/src/metrics.rs @@ -3,7 +3,8 @@ use aptos_metrics_core::{ exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter_vec, - register_int_gauge_vec, HistogramTimer, HistogramVec, IntCounterVec, IntGaugeVec, + register_int_gauge, register_int_gauge_vec, HistogramTimer, HistogramVec, IntCounterVec, + IntGauge, IntGaugeVec, }; use once_cell::sync::Lazy; use std::time::Instant; @@ -42,6 +43,7 @@ pub const STORAGE_SYNCHRONIZER_COMMIT_POST_PROCESSOR: &str = "commit_post_proces pub const STORAGE_SYNCHRONIZER_STATE_SNAPSHOT_RECEIVER: &str = "state_snapshot_receiver"; /// An enum representing the component currently executing +#[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum ExecutingComponent { Bootstrapper, Consensus, @@ -105,6 +107,15 @@ pub static BOOTSTRAPPER_ERRORS: Lazy = Lazy::new(|| { .unwrap() }); +/// Gauge indicating whether consensus is currently executing +pub static CONSENSUS_EXECUTING_GAUGE: Lazy = Lazy::new(|| { + register_int_gauge!( + "aptos_state_sync_consensus_executing_gauge", + "Gauge indicating whether consensus is currently executing" + ) + .unwrap() +}); + /// Gauge for state sync continuous syncer fallback mode pub static CONTINUOUS_SYNCER_ERRORS: Lazy = Lazy::new(|| { register_int_counter_vec!( @@ -146,7 +157,7 @@ pub static DRIVER_FALLBACK_MODE: Lazy = Lazy::new(|| { .unwrap() }); -/// Counters related to the currently executing component +/// Counters related to the currently executing component in the main driver loop pub static EXECUTING_COMPONENT: Lazy = Lazy::new(|| { register_int_counter_vec!( "aptos_state_sync_executing_component_counters",