Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add (optional) statsd metrics reporting #51

Merged
merged 4 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ Line wrap the file at 100 chars. Th


## [Unreleased]
### Changed
- Add (optional) statsd metrics reporting support to `tcp2udp` binary and library module when the
`statsd` cargo feature is enabled.


## [0.3.1] - 2023-10-25
Expand Down
29 changes: 29 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ opt-level = 3
lto = true
codegen-units = 1

[features]
# Enable this feature to make it possible to have tcp2udp report metrics over statsd
statsd = ["cadence"]

[dependencies]
tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "net", "time", "io-util"] }
err-context = "0.1.0"
Expand All @@ -32,6 +36,7 @@ lazy_static = "1.4.0"
# Only used by the binaries in src/bin/ and is optional so it's not
# pulled in when built as a library.
env_logger = { version = "0.10.0", optional = true }
cadence = { version = "1.0.0", optional = true }

[target.'cfg(target_os = "linux")'.dependencies]
nix = { version = "0.27.1", features = ["socket"] }
1 change: 1 addition & 0 deletions build-static-bins.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ RUSTFLAGS="-C target-feature=+crt-static" \
--target x86_64-unknown-linux-gnu \
--features env_logger \
--features clap \
--features statsd \
--bins
146 changes: 146 additions & 0 deletions src/statsd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
#[cfg(feature = "statsd")]
pub use real::Error;

pub struct StatsdMetrics(StatsdMetricsChooser);

enum StatsdMetricsChooser {
Dummy,
#[cfg(feature = "statsd")]
Real(real::StatsdMetrics),
}

impl StatsdMetrics {
/// Creates a dummy statsd metrics instance. Does not actually connect to any statds
/// server, nor emits any events. Used as an API compatible drop in when metrics
/// should not be emitted.
pub fn dummy() -> Self {
Self(StatsdMetricsChooser::Dummy)
}

/// Creates a statsd metric reporting instance connecting to the given host addr.
#[cfg(feature = "statsd")]
pub fn real(host: std::net::SocketAddr) -> Result<Self, Error> {
let statsd = real::StatsdMetrics::new(host)?;
Ok(Self(StatsdMetricsChooser::Real(statsd)))
}

/// Emit a metric saying we failed to accept an incoming TCP connection (probably ran out of file descriptors)
pub fn accept_error(&self) {
#[cfg(feature = "statsd")]
if let StatsdMetricsChooser::Real(statsd) = &self.0 {
statsd.accept_error()
}
}

/// Increment the connection counter inside this metrics instance and emit that new gauge value
pub fn incr_connections(&self) {
#[cfg(feature = "statsd")]
if let StatsdMetricsChooser::Real(statsd) = &self.0 {
statsd.incr_connections()
}
}

/// Decrement the connection counter inside this metrics instance and emit that new gauge value
pub fn decr_connections(&self) {
#[cfg(feature = "statsd")]
if let StatsdMetricsChooser::Real(statsd) = &self.0 {
statsd.decr_connections()
}
}
}

#[cfg(feature = "statsd")]
mod real {
use cadence::{CountedExt, Gauged, QueuingMetricSink, StatsdClient, UdpMetricSink};
use std::sync::atomic::{AtomicU64, Ordering};

/// Queue with a maximum capacity of 8K events.
/// This program is extremely unlikely to ever reach that upper bound.
/// The bound is still here so that if it ever were to happen, we drop events
/// instead of indefinitely filling the memory with unsent events.
const QUEUE_SIZE: usize = 8 * 1024;

const PREFIX: &str = "tcp2udp";

#[derive(Debug)]
pub enum Error {
/// Failed to create + bind the statsd UDP socket.
BindUdpSocket(std::io::Error),
/// Failed to create statsd client.
CreateStatsdClient(cadence::MetricError),
}

impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use Error::*;
match self {
BindUdpSocket(_) => "Failed to bind the UDP socket".fmt(f),
CreateStatsdClient(e) => e.fmt(f),
}
}
}

impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
use Error::*;
match self {
BindUdpSocket(e) => Some(e),
CreateStatsdClient(e) => e.source(),
}
}
}

pub struct StatsdMetrics {
client: StatsdClient,
num_connections: AtomicU64,
}

impl StatsdMetrics {
pub fn new(host: std::net::SocketAddr) -> Result<Self, Error> {
let socket = std::net::UdpSocket::bind("0.0.0.0:0").map_err(Error::BindUdpSocket)?;
log::debug!(
"Statsd socket bound to {}",
socket
.local_addr()
.map(|a| a.to_string())
.unwrap_or_else(|_| "Unknown".to_owned())
);

// Create a non-buffered blocking metrics sink. It is important that it's not buffered,
// so events are emitted instantly when they happen (this program does not emit a lot of
// events, nor does it attach timestamps to the events.
// The fact that it's blocking does not matter, since the `QueuingMetricSink` will make sure
// the `UdpMetricSink` runs in its own thread anyway.
let udp_sink = UdpMetricSink::from(host, socket).map_err(Error::CreateStatsdClient)?;
let queuing_sink = QueuingMetricSink::with_capacity(udp_sink, QUEUE_SIZE);
let statds_client = StatsdClient::from_sink(PREFIX, queuing_sink);
Ok(Self {
client: statds_client,
num_connections: AtomicU64::new(0),
})
}

pub fn accept_error(&self) {
log::debug!("Sending statsd tcp_accept_errors");
if let Err(e) = self.client.incr("tcp_accept_errors") {
log::error!("Failed to emit statsd tcp_accept_errors: {e}");
}
}

pub fn incr_connections(&self) {
let num_connections = self.num_connections.fetch_add(1, Ordering::Relaxed) + 1;
log::debug!("Sending statsd num_connections = {num_connections}");
if let Err(e) = self.client.gauge("num_connections", num_connections) {
log::error!("Failed to emit statsd num_connections: {e}");
}
}

pub fn decr_connections(&self) {
let num_connections = self.num_connections.fetch_sub(1, Ordering::Relaxed) - 1;
log::debug!("Sending statsd num_connections = {num_connections}");
if let Err(e) = self.client.gauge("num_connections", num_connections) {
log::error!("Failed to emit statsd num_connections: {e}");
}
}
}
}
34 changes: 34 additions & 0 deletions src/tcp2udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ use std::convert::Infallible;
use std::fmt;
use std::io;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::{TcpListener, TcpSocket, TcpStream, UdpSocket};
use tokio::time::sleep;

#[path = "statsd.rs"]
mod statsd;

#[derive(Debug)]
#[cfg_attr(feature = "clap", derive(clap::Parser))]
#[cfg_attr(feature = "clap", group(skip))]
Expand All @@ -31,6 +35,11 @@ pub struct Options {

#[cfg_attr(feature = "clap", clap(flatten))]
pub tcp_options: crate::tcp_options::TcpOptions,

#[cfg(feature = "statsd")]
/// Host to send statsd metrics to.
#[cfg_attr(feature = "clap", clap(long))]
statsd_host: Option<SocketAddr>,
}

/// Error returned from [`run`] if something goes wrong.
Expand All @@ -47,6 +56,9 @@ pub enum Tcp2UdpError {
BindTcpSocket(io::Error, SocketAddr),
/// Failed to start listening on TCP socket
ListenTcpSocket(io::Error, SocketAddr),
#[cfg(feature = "statsd")]
/// Failed to initialize statsd client
CreateStatsdClient(statsd::Error),
}

impl fmt::Display for Tcp2UdpError {
Expand All @@ -63,6 +75,8 @@ impl fmt::Display for Tcp2UdpError {
"Failed to start listening on TCP socket bound to {}",
addr
),
#[cfg(feature = "statsd")]
CreateStatsdClient(_) => "Failed to init metrics client".fmt(f),
}
}
}
Expand All @@ -77,6 +91,8 @@ impl std::error::Error for Tcp2UdpError {
SetReuseAddr(e) => Some(e),
BindTcpSocket(e, _) => Some(e),
ListenTcpSocket(e, _) => Some(e),
#[cfg(feature = "statsd")]
CreateStatsdClient(e) => Some(e),
}
}
}
Expand All @@ -98,6 +114,16 @@ pub async fn run(options: Options) -> Result<Infallible, Tcp2UdpError> {
}
});

#[cfg(not(feature = "statsd"))]
let statsd = Arc::new(statsd::StatsdMetrics::dummy());
#[cfg(feature = "statsd")]
let statsd = Arc::new(match options.statsd_host {
None => statsd::StatsdMetrics::dummy(),
Some(statsd_host) => {
statsd::StatsdMetrics::real(statsd_host).map_err(Tcp2UdpError::CreateStatsdClient)?
}
});

let mut join_handles = Vec::with_capacity(options.tcp_listen_addrs.len());
for tcp_listen_addr in options.tcp_listen_addrs {
let tcp_listener = create_listening_socket(tcp_listen_addr, &options.tcp_options)?;
Expand All @@ -106,13 +132,15 @@ pub async fn run(options: Options) -> Result<Infallible, Tcp2UdpError> {
let udp_forward_addr = options.udp_forward_addr;
let tcp_recv_timeout = options.tcp_options.recv_timeout;
let tcp_nodelay = options.tcp_options.nodelay;
let statsd = Arc::clone(&statsd);
join_handles.push(tokio::spawn(async move {
process_tcp_listener(
tcp_listener,
udp_bind_ip,
udp_forward_addr,
tcp_recv_timeout,
tcp_nodelay,
statsd,
)
.await;
}));
Expand Down Expand Up @@ -150,6 +178,7 @@ async fn process_tcp_listener(
udp_forward_addr: SocketAddr,
tcp_recv_timeout: Option<Duration>,
tcp_nodelay: bool,
statsd: Arc<statsd::StatsdMetrics>,
) -> ! {
let mut cooldown =
ExponentialBackoff::new(Duration::from_millis(50), Duration::from_millis(5000));
Expand All @@ -160,7 +189,9 @@ async fn process_tcp_listener(
if let Err(error) = crate::tcp_options::set_nodelay(&tcp_stream, tcp_nodelay) {
log::error!("Error: {}", error.display("\nCaused by: "));
}
let statsd = statsd.clone();
tokio::spawn(async move {
statsd.incr_connections();
if let Err(error) = process_socket(
tcp_stream,
tcp_peer_addr,
Expand All @@ -172,12 +203,15 @@ async fn process_tcp_listener(
{
log::error!("Error: {}", error.display("\nCaused by: "));
}
statsd.decr_connections();
});
cooldown.reset();
}
Err(error) => {
log::error!("Error when accepting incoming TCP connection: {}", error);

statsd.accept_error();

// If the process runs out of file descriptors, it will fail to accept a socket.
// But that socket will also remain in the queue, so it will fail again immediately.
// This will busy loop consuming the CPU and filling any logs. To prevent this,
Expand Down
2 changes: 1 addition & 1 deletion tcp2udp.service
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ LimitNOFILE=16384
# Uncomment this to have the logs not contain the IPs of the peers using this service
#Environment=REDACT_LOGS=1
Environment=RUST_LOG=debug
ExecStart=/usr/local/bin/tcp2udp --threads=2 --tcp-listen 0.0.0.0:443 --udp-bind=127.0.0.1 --udp-forward 127.0.0.1:51820 --tcp-recv-timeout=130 --nodelay
ExecStart=/usr/local/bin/tcp2udp --threads=2 --statsd-host 127.0.0.1:8125 --tcp-listen 0.0.0.0:443 --udp-bind=127.0.0.1 --udp-forward 127.0.0.1:51820 --tcp-recv-timeout=130 --nodelay

Restart=always
RestartSec=2
Expand Down
Loading