Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cp][aptos-release-v1.25] [Inspection Service] Add simple consensus health check endpoint. #15555

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions crates/aptos-inspection-service/src/server/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
server::utils::CONTENT_TYPE_TEXT, CONFIGURATION_PATH, FORGE_METRICS_PATH, JSON_METRICS_PATH,
METRICS_PATH, PEER_INFORMATION_PATH, SYSTEM_INFORMATION_PATH,
server::utils::CONTENT_TYPE_TEXT, CONFIGURATION_PATH, CONSENSUS_HEALTH_CHECK_PATH,
FORGE_METRICS_PATH, JSON_METRICS_PATH, METRICS_PATH, PEER_INFORMATION_PATH,
SYSTEM_INFORMATION_PATH,
};
use hyper::{Body, StatusCode};

Expand All @@ -25,6 +26,7 @@ fn get_index_response() -> String {
index_response.push("Welcome to the Aptos Inspection Service!".into());
index_response.push("The following endpoints are available:".into());
index_response.push(format!("\t- {}", CONFIGURATION_PATH));
index_response.push(format!("\t- {}", CONSENSUS_HEALTH_CHECK_PATH));
index_response.push(format!("\t- {}", FORGE_METRICS_PATH));
index_response.push(format!("\t- {}", JSON_METRICS_PATH));
index_response.push(format!("\t- {}", METRICS_PATH));
Expand Down
38 changes: 38 additions & 0 deletions crates/aptos-inspection-service/src/server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,47 @@ use crate::server::{
utils,
utils::{CONTENT_TYPE_JSON, CONTENT_TYPE_TEXT},
};
use aptos_config::config::NodeConfig;
use hyper::{Body, StatusCode};
use prometheus::TextEncoder;

// The metric key for the consensus execution gauge, as looked up in the
// map returned by `utils::get_all_metrics()`.
// NOTE(review): the trailing `{}` presumably mirrors how that map renders the
// (empty) label set of a label-less metric -- confirm against `get_all_metrics`.
const CONSENSUS_EXECUTION_GAUGE: &str = "aptos_state_sync_consensus_executing_gauge{}";

/// Serves the consensus health check endpoint.
///
/// The response is a `(status, body, content type)` tuple, always plain text:
/// - `400 Bad Request` if the node's configured role is not validator.
/// - `200 OK` if the consensus execution gauge is present and reads `"1"`.
/// - `500 Internal Server Error` otherwise (gauge missing or any other value).
///
/// Note: we assume that this endpoint will only be used every few seconds.
pub async fn handle_consensus_health_check(node_config: &NodeConfig) -> (StatusCode, Body, String) {
    // Non-validator nodes are rejected up front
    if !node_config.base.role.is_validator() {
        return (
            StatusCode::BAD_REQUEST,
            Body::from("This node is not a validator!"),
            CONTENT_TYPE_TEXT.into(),
        );
    }

    // Consensus counts as executing only when the gauge exists and equals "1"
    let consensus_executing = utils::get_all_metrics()
        .get(CONSENSUS_EXECUTION_GAUGE)
        .map_or(false, |gauge_value| gauge_value == "1");

    // Pick the status and message, then build the response once
    let (status_code, message) = if consensus_executing {
        (StatusCode::OK, "Consensus health check passed!")
    } else {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            "Consensus health check failed! Consensus is not executing!",
        )
    };
    (status_code, Body::from(message), CONTENT_TYPE_TEXT.into())
}

/// Handles a new forge metrics request
pub fn handle_forge_metrics() -> (StatusCode, Body, String) {
// Get and encode the metrics
Expand Down
6 changes: 6 additions & 0 deletions crates/aptos-inspection-service/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod tests;

// The list of endpoints offered by the inspection service
pub const CONFIGURATION_PATH: &str = "/configuration";
pub const CONSENSUS_HEALTH_CHECK_PATH: &str = "/consensus_health_check";
pub const FORGE_METRICS_PATH: &str = "/forge_metrics";
pub const INDEX_PATH: &str = "/";
pub const JSON_METRICS_PATH: &str = "/json_metrics";
Expand Down Expand Up @@ -111,6 +112,11 @@ async fn serve_requests(
// Exposes the node configuration
configuration::handle_configuration_request(&node_config)
},
CONSENSUS_HEALTH_CHECK_PATH => {
// /consensus_health_check
// Exposes the consensus health check
metrics::handle_consensus_health_check(&node_config).await
},
FORGE_METRICS_PATH => {
// /forge_metrics
// Exposes forge encoded metrics
Expand Down
63 changes: 38 additions & 25 deletions state-sync/state-sync-driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,9 @@ impl<

/// Checks that state sync is making progress
async fn drive_progress(&mut self) {
// Update the executing component metrics
self.update_executing_component_metrics();

// Fetch the global data summary and verify we have active peers
let global_data_summary = self.aptos_data_client.get_global_data_summary();
if global_data_summary.is_empty() {
Expand All @@ -673,15 +676,6 @@ impl<

// If consensus or consensus observer is executing, there's nothing to do
if self.check_if_consensus_or_observer_executing() {
let executing_component = if self.driver_configuration.role.is_validator() {
ExecutingComponent::Consensus
} else {
ExecutingComponent::ConsensusObserver
};
metrics::increment_counter(
&metrics::EXECUTING_COMPONENT,
executing_component.get_label(),
);
return;
}

Expand All @@ -691,10 +685,6 @@ impl<
let consensus_sync_request = self.consensus_notification_handler.get_sync_request();

// Attempt to continuously sync
metrics::increment_counter(
&metrics::EXECUTING_COMPONENT,
ExecutingComponent::ContinuousSyncer.get_label(),
);
if let Err(error) = self
.continuous_syncer
.drive_progress(consensus_sync_request)
Expand All @@ -708,20 +698,43 @@ impl<
);
metrics::increment_counter(&metrics::CONTINUOUS_SYNCER_ERRORS, error.get_label());
}
} else {
metrics::increment_counter(
&metrics::EXECUTING_COMPONENT,
ExecutingComponent::Bootstrapper.get_label(),
} else if let Err(error) = self.bootstrapper.drive_progress(&global_data_summary).await {
sample!(
SampleRate::Duration(Duration::from_secs(DRIVER_ERROR_LOG_FREQ_SECS)),
warn!(LogSchema::new(LogEntry::Driver)
.error(&error)
.message("Error found when checking the bootstrapper progress!"));
);
if let Err(error) = self.bootstrapper.drive_progress(&global_data_summary).await {
sample!(
SampleRate::Duration(Duration::from_secs(DRIVER_ERROR_LOG_FREQ_SECS)),
warn!(LogSchema::new(LogEntry::Driver)
.error(&error)
.message("Error found when checking the bootstrapper progress!"));
);
metrics::increment_counter(&metrics::BOOTSTRAPPER_ERRORS, error.get_label());
metrics::increment_counter(&metrics::BOOTSTRAPPER_ERRORS, error.get_label());
};
}

/// Refreshes the driver metrics describing which component is executing.
///
/// Bumps the `EXECUTING_COMPONENT` counter for the selected component, then
/// sets `CONSENSUS_EXECUTING_GAUGE` to 1 if that component is consensus and
/// to 0 in every other case.
fn update_executing_component_metrics(&self) {
    // Select the component: when consensus/observer is not executing, the
    // choice depends on whether bootstrapping has completed; otherwise it
    // depends on whether this node is a validator.
    let executing_component = if !self.check_if_consensus_or_observer_executing() {
        if self.bootstrapper.is_bootstrapped() {
            ExecutingComponent::ContinuousSyncer
        } else {
            ExecutingComponent::Bootstrapper
        }
    } else if self.driver_configuration.role.is_validator() {
        ExecutingComponent::Consensus
    } else {
        ExecutingComponent::ConsensusObserver
    };

    // Count one more iteration for the selected component
    metrics::increment_counter(
        &metrics::EXECUTING_COMPONENT,
        executing_component.get_label(),
    );

    // Mirror "is consensus executing?" into the gauge as 1 or 0
    let gauge_value = if matches!(executing_component, ExecutingComponent::Consensus) {
        1
    } else {
        0
    };
    metrics::CONSENSUS_EXECUTING_GAUGE.set(gauge_value);
}
}
15 changes: 13 additions & 2 deletions state-sync/state-sync-driver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

use aptos_metrics_core::{
exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter_vec,
register_int_gauge_vec, HistogramTimer, HistogramVec, IntCounterVec, IntGaugeVec,
register_int_gauge, register_int_gauge_vec, HistogramTimer, HistogramVec, IntCounterVec,
IntGauge, IntGaugeVec,
};
use once_cell::sync::Lazy;
use std::time::Instant;
Expand Down Expand Up @@ -42,6 +43,7 @@ pub const STORAGE_SYNCHRONIZER_COMMIT_POST_PROCESSOR: &str = "commit_post_proces
pub const STORAGE_SYNCHRONIZER_STATE_SNAPSHOT_RECEIVER: &str = "state_snapshot_receiver";

/// An enum representing the component currently executing
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutingComponent {
Bootstrapper,
Consensus,
Expand Down Expand Up @@ -105,6 +107,15 @@ pub static BOOTSTRAPPER_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

/// Gauge indicating whether consensus is currently executing.
///
/// The state sync driver sets this to 1 when the executing component is
/// consensus and to 0 otherwise. It is registered under the name
/// `aptos_state_sync_consensus_executing_gauge`; the `unwrap` panics if
/// that registration returns an error.
pub static CONSENSUS_EXECUTING_GAUGE: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"aptos_state_sync_consensus_executing_gauge",
"Gauge indicating whether consensus is currently executing"
)
.unwrap()
});

/// Gauge for state sync continuous syncer fallback mode
pub static CONTINUOUS_SYNCER_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
Expand Down Expand Up @@ -146,7 +157,7 @@ pub static DRIVER_FALLBACK_MODE: Lazy<IntGaugeVec> = Lazy::new(|| {
.unwrap()
});

/// Counters related to the currently executing component
/// Counters related to the currently executing component in the main driver loop
pub static EXECUTING_COMPONENT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"aptos_state_sync_executing_component_counters",
Expand Down