Skip to content

Commit

Permalink
feat: add squad to stats
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler committed Jan 10, 2025
1 parent 94b17b5 commit 65af13c
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 31 deletions.
24 changes: 4 additions & 20 deletions src/server/http/stats/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use std::collections::HashMap;

use axum::{
extract::{Query, State},
extract::{connect_info, Query, State},
http::StatusCode,
Json,
};
Expand Down Expand Up @@ -223,16 +223,12 @@ pub(crate) async fn handle_get_stats(State(state): State<AppState>) -> Result<Js
let timer = std::time::Instant::now();
info!(target: LOG_TARGET, "handle_get_stats");

let last_gossip_message = state.stats_client.get_last_gossip_message().await.map_err(|error| {
let (last_gossip_message, peer_id, squad) = state.stats_client.get_stats_info().await.map_err(|error| {
error!(target: LOG_TARGET, "Failed to get last gossip message: {error:?}");
StatusCode::INTERNAL_SERVER_ERROR
})?;

let (rx_stats, sha3x_stats) = get_chain_stats(state.clone()).await?;
// let peer_count = state.peer_store.peer_count().await;
// let peer_count = 0;
// let connected = peer_count > 0;
// let connected_since = state.peer_store.last_connected();
let connected_since = None;
let (tx, rx) = oneshot::channel();
state
Expand All @@ -248,26 +244,14 @@ pub(crate) async fn handle_get_stats(State(state): State<AppState>) -> Result<Js
error!(target: LOG_TARGET, "Failed to get connection info: {error:?}");
StatusCode::INTERNAL_SERVER_ERROR
})?;
// let connection_info = ConnectionInfo {
// listener_addresses: vec![],
// connected_peers: 0,
// network_info: NetworkInfo {
// num_peers: 0,
// connection_counters: ConnectionCounters {
// pending_incoming: 0,
// pending_outgoing: 0,
// established_incoming: 0,
// established_outgoing: 0,
// },
// },
// };

let stats = Stats {
connection_info,
connected_since,
randomx_stats: rx_stats,
sha3x_stats,
last_gossip_message,
peer_id: peer_id.map(|p| p.to_base58()).unwrap_or_default(),
squad,
};
if timer.elapsed() > MAX_ACCEPTABLE_HTTP_TIMEOUT {
error!(target: LOG_TARGET, "handle_get_stats took too long: {}ms", timer.elapsed().as_millis());
Expand Down
2 changes: 2 additions & 0 deletions src/server/http/stats/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub struct Stats {
pub randomx_stats: GetStatsResponse,
pub sha3x_stats: GetStatsResponse,
pub last_gossip_message: EpochTime,
pub peer_id: String,
pub squad: String,
}

#[derive(Serialize, Deserialize, Clone)]
Expand Down
31 changes: 21 additions & 10 deletions src/server/http/stats_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::time::Duration;

use human_format::Formatter;
use libp2p::PeerId;
use log::{debug, error, info};
use serde::Serialize;
use tari_core::proof_of_work::{Difficulty, PowAlgorithm};
Expand All @@ -14,6 +15,8 @@ use tokio::{
time::MissedTickBehavior,
};

use crate::server::p2p::ConnectionInfo;

const LOG_TARGET: &str = "tari::p2pool::server::stats_collector";
pub(crate) struct StatsCollector {
shutdown_signal: ShutdownSignal,
Expand All @@ -22,6 +25,7 @@ pub(crate) struct StatsCollector {
request_rx: tokio::sync::mpsc::Receiver<StatsRequest>,
first_stat_received: Option<EpochTime>,
last_squad: Option<String>,
local_peer_id: Option<PeerId>,
miner_rx_accepted: u64,
miner_sha_accepted: u64,
// miner_rejected: u64,
Expand Down Expand Up @@ -55,6 +59,7 @@ impl StatsCollector {
request_rx: rx,
request_tx: tx,
last_squad: None,
local_peer_id: None,
first_stat_received: None,
miner_rx_accepted: 0,
miner_sha_accepted: 0,
Expand Down Expand Up @@ -89,8 +94,11 @@ impl StatsCollector {

fn handle_stat(&mut self, sample: StatData) {
match sample {
StatData::SquadChanged { squad, .. } => {
StatData::InfoChanged {
squad, local_peer_id, ..
} => {
self.last_squad = Some(squad);
self.local_peer_id = Some(local_peer_id);
},
StatData::MinerBlockAccepted { pow_algo, .. } => match pow_algo {
PowAlgorithm::Sha3x => {
Expand Down Expand Up @@ -240,8 +248,9 @@ impl StatsCollector {
},
}
},
Some(StatsRequest::GetLastGossipMessage(tx)) => {
let _ = tx.send(self.last_gossip_message).inspect_err(|e| error!(target: LOG_TARGET, "ShareChainError sending last gossip message: {:?}", e));
Some(StatsRequest::GetLatestStats(tx)) => {
let res = (self.last_gossip_message, self.local_peer_id.clone(), self.last_squad.clone().unwrap_or_default());
let _ = tx.send(res).inspect_err(|e| error!(target: LOG_TARGET, "ShareChainError sending latest stats message: {:?}", e));
},
None => {
break;
Expand Down Expand Up @@ -276,7 +285,7 @@ impl StatsCollector {

pub(crate) enum StatsRequest {
GetStats(PowAlgorithm, tokio::sync::oneshot::Sender<GetStatsResponse>),
GetLastGossipMessage(tokio::sync::oneshot::Sender<EpochTime>),
GetLatestStats(tokio::sync::oneshot::Sender<(EpochTime, Option<PeerId>, String)>),
}

#[derive(Serialize, Clone, Debug)]
Expand All @@ -289,8 +298,9 @@ pub(crate) struct GetStatsResponse {

#[derive(Clone)]
pub(crate) enum StatData {
SquadChanged {
InfoChanged {
squad: String,
local_peer_id: PeerId,
timestamp: EpochTime,
},
TargetDifficultyChanged {
Expand Down Expand Up @@ -338,7 +348,7 @@ pub(crate) enum StatData {
impl StatData {
pub fn timestamp(&self) -> EpochTime {
match self {
StatData::SquadChanged { timestamp, .. } => *timestamp,
StatData::InfoChanged { timestamp, .. } => *timestamp,
StatData::MinerBlockAccepted { timestamp, .. } => *timestamp,
StatData::PoolBlockAccepted { timestamp, .. } => *timestamp,
StatData::ChainChanged { timestamp, .. } => *timestamp,
Expand All @@ -363,9 +373,9 @@ impl StatsClient {
Ok(rx.await?)
}

pub async fn get_last_gossip_message(&self) -> Result<EpochTime, anyhow::Error> {
pub async fn get_stats_info(&self) -> Result<(EpochTime, Option<PeerId>, String), anyhow::Error> {
let (tx, rx) = oneshot::channel();
self.request_tx.send(StatsRequest::GetLastGossipMessage(tx)).await?;
self.request_tx.send(StatsRequest::GetLatestStats(tx)).await?;
Ok(rx.await?)
}
}
Expand Down Expand Up @@ -420,9 +430,10 @@ impl StatsBroadcastClient {
self.broadcast(data)
}

pub fn send_squad_changed(&self, squad: String) -> Result<(), anyhow::Error> {
let data = StatData::SquadChanged {
pub fn send_info_changed(&self, squad: String, local_peer_id: PeerId) -> Result<(), anyhow::Error> {
let data = StatData::InfoChanged {
squad,
local_peer_id,
timestamp: EpochTime::now(),
};
self.broadcast(data)
Expand Down
2 changes: 1 addition & 1 deletion src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ where S: ShareChain
(*swarm.local_peer_id().to_bytes().last().unwrap_or(&0) as usize) % config.p2p_service.num_squads;
let squad = format!("{}_{}", config.p2p_service.squad_prefix.clone(), squad_id);
info!(target: LOG_TARGET, "Swarm created. Our id: {}, our squad:{}", swarm.local_peer_id(), squad);
let _res = stats_broadcast_client.send_squad_changed(squad.clone());
let _res = stats_broadcast_client.send_info_changed(squad.clone(), swarm.local_peer_id().clone());

let network_peer_store = PeerStore::new(stats_broadcast_client.clone(), squad.clone());
// client related channels
Expand Down

0 comments on commit 65af13c

Please sign in to comment.