Skip to content

Commit

Permalink
health response validation
Browse files Browse the repository at this point in the history
  • Loading branch information
ermalkaleci committed Apr 5, 2024
1 parent 08f44b8 commit 74919fb
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 32 deletions.
5 changes: 5 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ extensions:
health_check:
interval_sec: 10 # check interval, default is 10s
healthy_response_time_ms: 500 # max response time to be considered healthy, default is 500ms
health_method: system_health
expected_response: # response contains { isSyncing: false }
!object
- - isSyncing
- !value false
event_bus:
substrate_api:
stale_timeout_seconds: 180 # rotate endpoint if no new blocks for 3 minutes
Expand Down
12 changes: 12 additions & 0 deletions eth_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,18 @@ extensions:
client:
endpoints:
- wss://eth-rpc-karura-testnet.aca-staging.network
health_check:
interval_sec: 10 # check interval, default is 10s
healthy_response_time_ms: 500 # max response time to be considered healthy, default is 500ms
health_method: net_health # eth-rpc-adapter bodhijs
expected_response: # response contains { isHealthy: true, isRPCOK: true }
!object
- - isHealthy
- !value true
- - isRPCOK
- !value true
# health_method: eth_syncing # eth node
# expected_response: !value false
event_bus:
eth_api:
stale_timeout_seconds: 180 # rotate endpoint if no new blocks for 3 minutes
Expand Down
38 changes: 13 additions & 25 deletions src/extensions/client/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,16 @@ impl Health {
on_client_ready: Arc<tokio::sync::Notify>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
// no health method
if health.config.health_method.is_none() {
return;
}

// Wait for the client to be ready before starting the health check
on_client_ready.notified().await;

let method_name = health.config.health_method.as_str();
let method_name = health.config.health_method.as_ref().expect("checked above");
let expected_response = health.config.expected_response.clone();
let interval = Duration::from_secs(health.config.interval_sec);
let healthy_response_time = Duration::from_millis(health.config.healthy_response_time_ms);

Expand All @@ -127,31 +133,13 @@ impl Health {
Ok(response) => {
let duration = request_start.elapsed();

// Check known health responses
match method_name {
"system_health" => {
// Substrate node
if let Some(true) = response.get("isSyncing").and_then(|x| x.as_bool()) {
health.update(Event::StaleChain);
continue;
}
}
"net_health" => {
// Eth-RPC-Adapter (bodhijs)
if let Some(false) = response.get("isHealthy").and_then(|x| x.as_bool()) {
health.update(Event::StaleChain);
continue;
}
// Check response
if let Some(ref expected) = expected_response {
if !expected.validate(&response) {
health.update(Event::StaleChain);
continue;
}
"eth_syncing" => {
// Ethereum node
if response.as_bool().unwrap_or(true) {
health.update(Event::StaleChain);
continue;
}
}
_ => {}
};
}

// Check response time
if duration > healthy_response_time {
Expand Down
145 changes: 139 additions & 6 deletions src/extensions/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use async_trait::async_trait;
use jsonrpsee::core::{client::Subscription, Error, JsonValue};
use opentelemetry::trace::FutureExt;
use rand::{seq::SliceRandom, thread_rng};
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use tokio::sync::Notify;

use super::ExtensionRegistry;
Expand Down Expand Up @@ -60,16 +60,17 @@ pub struct HealthCheckConfig {
pub interval_sec: u64,
#[serde(default = "healthy_response_time_ms")]
pub healthy_response_time_ms: u64,
#[serde(default = "system_health")]
pub health_method: String,
pub health_method: Option<String>,
pub expected_response: Option<HealthResponse>,
}

impl Default for HealthCheckConfig {
fn default() -> Self {
Self {
interval_sec: interval_sec(),
healthy_response_time_ms: healthy_response_time_ms(),
health_method: system_health(),
health_method: None,
expected_response: None,
}
}
}
Expand All @@ -82,8 +83,32 @@ pub fn healthy_response_time_ms() -> u64 {
500
}

pub fn system_health() -> String {
"system_health".to_string()
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HealthResponse {
Value(JsonValue),
Object(Vec<(String, Box<HealthResponse>)>),
}

impl HealthResponse {
pub fn validate(&self, response: &JsonValue) -> bool {
match (self, response) {
(HealthResponse::Value(value), response) => value.eq(response),
(HealthResponse::Object(items), response) => {
for (key, expected) in items {
if let Some(response) = response.get(key) {
if !expected.validate(response) {
return false;
}
} else {
// key missing
return false;
}
}
true
}
}
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -466,3 +491,111 @@ fn test_get_backoff_time() {
vec![100, 200, 500, 1000, 1700, 2600, 3700, 5000, 6500, 8200, 10100, 10100]
);
}

#[test]
fn health_response_serialize_deserialize_works() {
let response = HealthResponse::Object(vec![(
"isSyncing".to_string(),
Box::new(HealthResponse::Value(false.into())),
)]);

let expected = serde_yaml::from_str::<HealthResponse>(
&r#"
!object
- - isSyncing
- !value false
"#,
)
.unwrap();

assert_eq!(response, expected);
}

#[test]
fn health_response_validation_works() {
use serde_json::json;

let expected = serde_yaml::from_str::<HealthResponse>(
&r#"
!value true
"#,
)
.unwrap();
assert!(expected.validate(&json!(true)));
assert!(!expected.validate(&json!(false)));

let expected = serde_yaml::from_str::<HealthResponse>(
&r#"
!object
- - isSyncing
- !value false
"#,
)
.unwrap();
let cases = [
(json!({ "isSyncing": false }), true),
(json!({ "isSyncing": true }), false),
(json!({ "isSyncing": false, "peers": 2 }), true),
(json!({ "isSyncing": true, "peers": 2 }), false),
(json!({}), false),
(json!(true), false),
];
for (input, output) in cases {
assert_eq!(expected.validate(&input), output);
}

// multiple items
let expected = serde_yaml::from_str::<HealthResponse>(
&r#"
!object
- - isSyncing
- !value false
- - peers
- !value 3
"#,
)
.unwrap();
let cases = [
(json!({ "isSyncing": false, "peers": 3 }), true),
(json!({ "isSyncing": false, "peers": 2 }), false),
(json!({ "isSyncing": true, "peers": 3 }), false),
];
for (input, output) in cases {
assert_eq!(expected.validate(&input), output);
}

// works with strings
let expected = serde_yaml::from_str::<HealthResponse>(
&r#"
!object
- - foo
- !value bar
"#,
)
.unwrap();
assert!(expected.validate(&json!({ "foo": "bar" })));
assert!(!expected.validate(&json!({ "foo": "bar bar" })));

// multiple nested items
let expected = serde_yaml::from_str::<HealthResponse>(
&r#"
!object
- - foo
- !object
- - one
- !value subway
- - two
- !value subway
"#,
)
.unwrap();
let cases = [
(json!({ "foo": { "one": "subway", "two": "subway" } }), true),
(json!({ "foo": { "subway": "one" } }), false),
(json!({ "bar" : { "foo": { "subway": "one", "two": "subway" } }}), false),
(json!({ "foo": "subway" }), false),
];
for (input, output) in cases {
assert_eq!(expected.validate(&input), output);
}
}
6 changes: 5 additions & 1 deletion src/extensions/client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,11 @@ async fn health_check_works() {
Some(HealthCheckConfig {
interval_sec: 1,
healthy_response_time_ms: 250,
health_method: "system_health".into(),
health_method: Some("system_health".into()),
expected_response: Some(HealthResponse::Object(vec![(
"isSyncing".to_string(),
Box::new(HealthResponse::Value(false.into())),
)])),
}),
)
.unwrap();
Expand Down

0 comments on commit 74919fb

Please sign in to comment.