Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: server health call implementation #81

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

All notable changes to this project will be documented in this file.

## [0.2.3] - 2024-10-15

### 🚀 Features

- Server health call implementation

## [0.2.2] - 2024-10-10

### 🚀 Features
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "kalatori"
authors = ["Alzymologist Oy <[email protected]>"]
version = "0.2.2"
version = "0.2.3"
edition = "2021"
description = "A gateway daemon for Kalatori."
license = "GPL-3.0-or-later"
Expand Down
3 changes: 2 additions & 1 deletion src/chain/definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
tracker::ChainWatcher,
},
definitions::{
api_v2::{OrderInfo, Timestamp},
api_v2::{OrderInfo, RpcInfo, Timestamp},
Balance,
},
error::{ChainError, NotHex},
Expand Down Expand Up @@ -89,6 +89,7 @@ pub enum ChainRequest {
WatchAccount(WatchAccount),
Reap(WatchAccount),
Shutdown(oneshot::Sender<()>),
GetConnectedRpcs(oneshot::Sender<Vec<RpcInfo>>),
}

#[derive(Debug)]
Expand Down
125 changes: 81 additions & 44 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Everything related to actual interaction with blockchain

use std::collections::HashMap;

use substrate_crypto_light::common::AccountId32;
use tokio::{
sync::{mpsc, oneshot},
Expand All @@ -23,6 +22,7 @@ pub mod rpc;
pub mod tracker;
pub mod utils;

use crate::definitions::api_v2::{Health, RpcInfo, ServerHealth};
use definitions::{ChainRequest, ChainTrackerRequest, WatchAccount};
use tracker::start_chain_watch;

Expand All @@ -35,7 +35,7 @@ const SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(120000);
/// RPC server handle
#[derive(Clone, Debug)]
pub struct ChainManager {
pub tx: tokio::sync::mpsc::Sender<ChainRequest>,
pub tx: mpsc::Sender<ChainRequest>,
}

impl ChainManager {
Expand All @@ -54,6 +54,9 @@ impl ChainManager {

let mut currency_map = HashMap::new();

// Create a channel for receiving RPC status updates
let (rpc_update_tx, mut rpc_update_rx) = mpsc::channel(1024);

// start network monitors
for c in chain {
if c.endpoints.is_empty() {
Expand Down Expand Up @@ -82,61 +85,86 @@ impl ChainManager {
signer.interface(),
task_tracker.clone(),
cancellation_token.clone(),
rpc_update_tx.clone(),
);
}

task_tracker
.clone()
.spawn("Blockchain connections manager", async move {
let mut rpc_statuses: HashMap<(String, String), Health> = HashMap::new();


// start requests engine
while let Some(request) = rx.recv().await {
match request {
ChainRequest::WatchAccount(request) => {
if let Some(chain) = currency_map.get(&request.currency) {
if let Some(receiver) = watch_chain.get(chain) {
let _unused = receiver
.send(ChainTrackerRequest::WatchAccount(request))
.await;
} else {
let _unused = request
.res
.send(Err(ChainError::InvalidChain(chain.to_string())));
loop {
tokio::select! {
Some(request) = rx.recv() => {
match request {
ChainRequest::WatchAccount(request) => {
if let Some(chain) = currency_map.get(&request.currency) {
if let Some(receiver) = watch_chain.get(chain) {
let _unused = receiver
.send(ChainTrackerRequest::WatchAccount(request))
.await;
} else {
let _unused = request
.res
.send(Err(ChainError::InvalidChain(chain.to_string())));
}
} else {
let _unused = request
.res
.send(Err(ChainError::InvalidCurrency(request.currency)));
}
}
} else {
let _unused = request
.res
.send(Err(ChainError::InvalidCurrency(request.currency)));
}
}
ChainRequest::Reap(request) => {
if let Some(chain) = currency_map.get(&request.currency) {
if let Some(receiver) = watch_chain.get(chain) {
let _unused =
receiver.send(ChainTrackerRequest::Reap(request)).await;
} else {
let _unused = request
.res
.send(Err(ChainError::InvalidChain(chain.to_string())));
ChainRequest::Reap(request) => {
if let Some(chain) = currency_map.get(&request.currency) {
if let Some(receiver) = watch_chain.get(chain) {
let _unused =
receiver.send(ChainTrackerRequest::Reap(request)).await;
} else {
let _unused = request
.res
.send(Err(ChainError::InvalidChain(chain.to_string())));
}
} else {
let _unused = request
.res
.send(Err(ChainError::InvalidCurrency(request.currency)));
}
}
} else {
let _unused = request
.res
.send(Err(ChainError::InvalidCurrency(request.currency)));
}
}
ChainRequest::Shutdown(res) => {
for (name, chain) in watch_chain.drain() {
let (tx, rx) = oneshot::channel();
if chain.send(ChainTrackerRequest::Shutdown(tx)).await.is_ok() {
if timeout(SHUTDOWN_TIMEOUT, rx).await.is_err() {
tracing::error!("Chain monitor for {name} took too much time to wind down, probably it was frozen. Discarding it.");
};
ChainRequest::Shutdown(res) => {
for (name, chain) in watch_chain.drain() {
let (tx, rx) = oneshot::channel();
if chain.send(ChainTrackerRequest::Shutdown(tx)).await.is_ok() {
if timeout(SHUTDOWN_TIMEOUT, rx).await.is_err() {
tracing::error!("Chain monitor for {name} took too much time to wind down, probably it was frozen. Discarding it.");
};
}
}
let _ = res.send(());
break;
}
ChainRequest::GetConnectedRpcs(res_tx) => {
// Collect the RpcInfo from rpc_statuses
let connected_rpcs: Vec<RpcInfo> = rpc_statuses.iter().map(|((chain_name, rpc_url), status)| {
RpcInfo {
chain_name: chain_name.clone(),
rpc_url: rpc_url.clone(),
status: *status,
}
}).collect();
let _ = res_tx.send(connected_rpcs);
}
}
let _ = res.send(());
break;
}
Some(rpc_update) = rpc_update_rx.recv() => {
rpc_statuses.insert(
(rpc_update.chain_name.clone(), rpc_update.rpc_url.clone()),
rpc_update.status,
);
}
else => break,
}
}

Expand All @@ -162,6 +190,15 @@ impl ChainManager {
rx.await.map_err(|_| ChainError::MessageDropped)?
}

pub async fn get_connected_rpcs(&self) -> Result<Vec<RpcInfo>, Error> {
let (res_tx, res_rx) = oneshot::channel();
self.tx
.send(ChainRequest::GetConnectedRpcs(res_tx))
.await
.map_err(|_| Error::Fatal)?;
res_rx.await.map_err(|_| Error::Fatal)
}

pub async fn reap(
&self,
id: String,
Expand Down
34 changes: 31 additions & 3 deletions src/chain/tracker.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
//! A tracker that follows individual chain

use std::{collections::HashMap, time::SystemTime};

use frame_metadata::v15::RuntimeMetadataV15;
use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
use serde_json::Value;
use std::{collections::HashMap, time::SystemTime};
use substrate_parser::{AsMetadata, ShortSpecs};
use tokio::{
sync::mpsc,
time::{timeout, Duration},
};
use tokio_util::sync::CancellationToken;

use crate::definitions::api_v2::{Health, RpcInfo};
use crate::{
chain::{
definitions::{BlockHash, ChainTrackerRequest, Invoice},
Expand All @@ -38,6 +38,7 @@ pub fn start_chain_watch(
signer: Signer,
task_tracker: TaskTracker,
cancellation_token: CancellationToken,
rpc_update_tx: mpsc::Sender<RpcInfo>,
) {
task_tracker
.clone()
Expand All @@ -51,9 +52,30 @@ pub fn start_chain_watch(
if shutdown || cancellation_token.is_cancelled() {
break;
}

let _ = rpc_update_tx.send(RpcInfo {
chain_name: chain.name.clone(),
rpc_url: endpoint.clone(),
status: Health::Degraded,
}).await;

if let Ok(client) = WsClientBuilder::default().build(endpoint).await {
let _ = rpc_update_tx.send(RpcInfo {
chain_name: chain.name.clone(),
rpc_url: endpoint.clone(),
status: Health::Ok,
}).await;

// prepare chain
let watcher = match ChainWatcher::prepare_chain(&client, chain.clone(), &mut watched_accounts, endpoint, chain_tx.clone(), state.interface(), task_tracker.clone())
let watcher = match ChainWatcher::prepare_chain(
&client,
chain.clone(),
&mut watched_accounts,
endpoint,
chain_tx.clone(),
state.interface(),
task_tracker.clone(),
)
.await
{
Ok(a) => a,
Expand Down Expand Up @@ -165,6 +187,12 @@ pub fn start_chain_watch(
}
}
}
} else {
let _ = rpc_update_tx.send(RpcInfo {
chain_name: chain.name.clone(),
rpc_url: endpoint.clone(),
status: Health::Critical,
}).await;
}
}
Ok(format!("Chain {} monitor shut down", chain.name).into())
Expand Down
24 changes: 12 additions & 12 deletions src/definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ pub mod api_v2 {
pub enum WithdrawalStatus {
Waiting,
Failed,
Forced,
Completed,
}

Expand All @@ -185,24 +186,23 @@ pub mod api_v2 {
pub supported_currencies: HashMap<std::string::String, CurrencyProperties>,
}

#[allow(dead_code)] // TODO: Use this for health response?
#[derive(Debug, Serialize)]
struct ServerHealth {
server_info: ServerInfo,
connected_rpcs: Vec<RpcInfo>,
status: Health,
pub struct ServerHealth {
pub server_info: ServerInfo,
pub connected_rpcs: Vec<RpcInfo>,
pub status: Health,
}

#[derive(Debug, Serialize)]
struct RpcInfo {
rpc_url: String,
chain_name: String,
status: Health,
#[derive(Debug, Serialize, Clone)]
pub struct RpcInfo {
pub rpc_url: String,
pub chain_name: String,
pub status: Health,
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Clone, PartialEq, Copy)]
#[serde(rename_all = "lowercase")]
enum Health {
pub enum Health {
Ok,
Degraded,
Critical,
Expand Down
14 changes: 10 additions & 4 deletions src/handlers/health.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::definitions::api_v2::ServerStatus;
use crate::definitions::api_v2::{ServerHealth, ServerStatus};
use crate::state::State;
use axum::{extract::State as ExtractState, http::StatusCode, Json};

Expand All @@ -18,12 +18,18 @@ pub async fn status(
}

pub async fn health(
ExtractState(_state): ExtractState<State>,
ExtractState(state): ExtractState<State>,
) -> (
[(axum::http::header::HeaderName, &'static str); 1],
Json<ServerStatus>,
Json<ServerHealth>,
) {
todo!();
match state.server_health().await {
Ok(status) => (
[(axum::http::header::CACHE_CONTROL, "no-store")],
Json(status),
),
Err(_) => panic!("db connection is down, state is lost"),
}
}

pub async fn audit(ExtractState(_state): ExtractState<State>) -> StatusCode {
Expand Down
Loading
Loading