Skip to content

Commit

Permalink
refactor: implement event-driven connection statistics
Browse files Browse the repository at this point in the history
- Extract connection statistics into dedicated StatsManager
- Add configurable stats management with StatsConfig
- Implement event-based stats collection using StatEvent enum
- Move connection tracking logic from ConnectionManager to StatsManager
- Add comprehensive test coverage for connection lifecycle
- Replace direct counter updates with async event channel
- Improve error handling and logging for stats operations
- Fix connection cleanup by separating stats and connection state
  • Loading branch information
aljen committed Dec 6, 2024
1 parent 2fb18ba commit 08100d0
Show file tree
Hide file tree
Showing 13 changed files with 952 additions and 470 deletions.
2 changes: 2 additions & 0 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod http;
mod logging;
mod relay;
mod rtu;
mod stats;
mod tcp;
mod types;

Expand All @@ -13,5 +14,6 @@ pub use http::Config as HttpConfig;
pub use logging::Config as LoggingConfig;
pub use relay::Config as RelayConfig;
pub use rtu::Config as RtuConfig;
pub use stats::Config as StatsConfig;
pub use tcp::Config as TcpConfig;
pub use types::{DataBits, Parity, RtsType, StopBits};
25 changes: 25 additions & 0 deletions src/config/stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use std::time::Duration;

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
#[serde(with = "humantime_serde")]
pub cleanup_interval: Duration,
#[serde(with = "humantime_serde")]
pub idle_timeout: Duration,
#[serde(with = "humantime_serde")]
pub error_timeout: Duration,
pub max_events_per_second: u32,
}

impl Default for Config {
fn default() -> Self {
Self {
cleanup_interval: Duration::from_secs(60),
idle_timeout: Duration::from_secs(300),
error_timeout: Duration::from_secs(300),
max_events_per_second: 10000,
}
}
}
27 changes: 23 additions & 4 deletions src/connection/events.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,26 @@
use std::time::Duration;
use super::{stats::ClientStats, ConnectionStats};
use std::net::SocketAddr;
use tokio::sync::oneshot;

#[derive(Debug, Clone)]
#[derive(Debug)]
pub enum StatEvent {
Request { success: bool },
ResponseTime(Duration),
/// Client connected from address
ClientConnected(SocketAddr),
/// Client disconnected from address
ClientDisconnected(SocketAddr),
/// Request processed with success/failure and duration
RequestProcessed {
addr: SocketAddr,
success: bool,
duration_ms: u64,
},
/// Query stats for specific address
QueryStats {
addr: SocketAddr,
response_tx: oneshot::Sender<ClientStats>,
},
/// Query global connection stats
QueryConnectionStats {
response_tx: oneshot::Sender<ConnectionStats>,
},
}
12 changes: 2 additions & 10 deletions src/connection/guard.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{net::SocketAddr, sync::Arc};

use tokio::sync::OwnedSemaphorePermit;
use tracing::debug;

Expand All @@ -16,20 +15,13 @@ pub struct ConnectionGuard {

impl Drop for ConnectionGuard {
fn drop(&mut self) {
let manager = Arc::clone(&self.manager);
let manager = self.manager.clone();
let addr = self.addr;

debug!("Closing connection from {}", addr);

tokio::spawn(async move {
let mut stats = manager.stats.lock().await;
if let Some(client_stats) = stats.get_mut(&addr) {
client_stats.active_connections -= 1;
debug!(
"Connection from {} closed, active connections: {}",
addr, client_stats.active_connections
);
}
manager.decrease_connection_count(addr).await;
});
}
}
Loading

0 comments on commit 08100d0

Please sign in to comment.