Skip to content

Commit

Permalink
adjust payload size limit per transport + change default flush interv…
Browse files Browse the repository at this point in the history
…al per agg mode
  • Loading branch information
tobz committed Jan 19, 2025
1 parent 1ce169e commit 587fccc
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 19 deletions.
129 changes: 110 additions & 19 deletions metrics-exporter-dogstatsd/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{net::SocketAddr, sync::Arc, time::Duration};
use std::{fmt, net::SocketAddr, sync::Arc, time::Duration};

use thiserror::Error;
use tracing::debug;

use crate::{
forwarder::{self, ForwarderConfiguration, RemoteAddr},
Expand All @@ -10,13 +11,13 @@ use crate::{

// Maximum data length for a UDP datagram.
//
// Realistically, users should basically never send payloads anywhere _near_ this large, but we're only trying to ensure
// we're not about to do anything that we _know_ is technically invalid.
// Realistically, users should never send payloads anywhere _near_ this large, but we're only trying to ensure we're not
// about to do anything that we _know_ is technically invalid.
const UDP_DATAGRAM_MAX_PAYLOAD_LEN: usize = (u16::MAX as usize) - 8;

// Default for the builder's `write_timeout` field (see the `Default` impl).
const DEFAULT_WRITE_TIMEOUT: Duration = Duration::from_secs(1);
// Fixed default for `max_payload_len`, in bytes.
// NOTE(review): the builder resolves an unset `max_payload_len` through `RemoteAddr::default_max_payload_len`;
// confirm whether this constant is still referenced anywhere.
const DEFAULT_MAX_PAYLOAD_LEN: usize = 8192;
// Fixed default for `flush_interval`.
// NOTE(review): `get_flush_interval` falls back to the per-mode constants below when no interval is set; confirm
// whether this constant is still referenced anywhere.
const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_secs(3);
// Flush interval used when none is configured and the aggregation mode is `AggregationMode::Conservative`.
const DEFAULT_FLUSH_INTERVAL_CONSERVATIVE: Duration = Duration::from_secs(3);
// Flush interval used when none is configured and the aggregation mode is `AggregationMode::Aggressive`.
const DEFAULT_FLUSH_INTERVAL_AGGRESSIVE: Duration = Duration::from_secs(10);
// Presumably the default for the builder's `histogram_reservoir_size` field; the initializer is not visible
// here -- TODO confirm.
const DEFAULT_HISTOGRAM_RESERVOIR_SIZE: usize = 1024;

/// Errors that could occur while building or installing a DogStatsD recorder/exporter.
Expand Down Expand Up @@ -65,13 +66,31 @@ pub enum AggregationMode {
Aggressive,
}

impl AggregationMode {
    /// Returns the flush interval to fall back on for this aggregation mode when no interval has been configured
    /// explicitly.
    fn default_flush_interval(&self) -> Duration {
        match *self {
            Self::Aggressive => DEFAULT_FLUSH_INTERVAL_AGGRESSIVE,
            Self::Conservative => DEFAULT_FLUSH_INTERVAL_CONSERVATIVE,
        }
    }
}

impl fmt::Display for AggregationMode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AggregationMode::Conservative => write!(f, "conservative"),
AggregationMode::Aggressive => write!(f, "aggressive"),
}
}
}

/// Builder for a DogStatsD exporter.
#[derive(Debug)]
pub struct DogStatsDBuilder {
remote_addr: RemoteAddr,
write_timeout: Duration,
max_payload_len: usize,
flush_interval: Duration,
max_payload_len: Option<usize>,
flush_interval: Option<Duration>,
synchronous: bool,
agg_mode: AggregationMode,
telemetry: bool,
Expand All @@ -81,20 +100,30 @@ pub struct DogStatsDBuilder {
}

impl DogStatsDBuilder {
/// Resolves the effective maximum payload length: the explicitly configured value if there is one, otherwise the
/// default for the configured remote address.
fn get_max_payload_len(&self) -> usize {
    match self.max_payload_len {
        Some(configured) => configured,
        None => self.remote_addr.default_max_payload_len(),
    }
}

/// Resolves the effective flush interval: the explicitly configured value if there is one, otherwise the default
/// for the configured aggregation mode.
fn get_flush_interval(&self) -> Duration {
    match self.flush_interval {
        Some(configured) => configured,
        None => self.agg_mode.default_flush_interval(),
    }
}

fn validate_max_payload_len(&self) -> Result<(), BuildError> {
let max_payload_len = self.get_max_payload_len();

if let RemoteAddr::Udp(_) = &self.remote_addr {
if self.max_payload_len > UDP_DATAGRAM_MAX_PAYLOAD_LEN {
if max_payload_len > UDP_DATAGRAM_MAX_PAYLOAD_LEN {
return Err(BuildError::InvalidConfiguration {
reason: format!("maximum payload length ({} bytes) exceeds UDP datagram maximum length ({} bytes)", self.max_payload_len, UDP_DATAGRAM_MAX_PAYLOAD_LEN),
reason: format!("maximum payload length ({} bytes) exceeds UDP datagram maximum length ({} bytes)", max_payload_len, UDP_DATAGRAM_MAX_PAYLOAD_LEN),
});
}
}

if self.max_payload_len > u32::MAX as usize {
if max_payload_len > u32::MAX as usize {
return Err(BuildError::InvalidConfiguration {
reason: format!(
"maximum payload length ({} bytes) exceeds theoretical upper bound ({} bytes)",
self.max_payload_len,
max_payload_len,
u32::MAX
),
});
Expand Down Expand Up @@ -146,7 +175,7 @@ impl DogStatsDBuilder {
/// Setting a higher value is likely to lead to invalid metric payloads that are discarded by the Datadog Agent when
/// received.
///
/// Defaults to 8192 bytes.
/// Defaults to 1432 bytes for UDP, and 8192 bytes for Unix domain sockets.
///
/// # Errors
///
Expand All @@ -155,7 +184,7 @@ impl DogStatsDBuilder {
mut self,
max_payload_len: usize,
) -> Result<Self, BuildError> {
self.max_payload_len = max_payload_len;
self.max_payload_len = Some(max_payload_len);
self.validate_max_payload_len()?;

Ok(self)
Expand Down Expand Up @@ -193,10 +222,10 @@ impl DogStatsDBuilder {
/// aggregation. A shorter interval will provide more frequent updates to the remote server, but will result in more
/// network traffic and processing overhead.
///
/// Defaults to 3 seconds.
/// Defaults to 3 seconds in conservative mode, and 10 seconds in aggressive mode.
#[must_use]
pub fn with_flush_interval(mut self, flush_interval: Duration) -> Self {
    // Store the interval as an explicit override. While the field stays `None`, `get_flush_interval` falls back
    // to the aggregation mode's default instead.
    self.flush_interval = Some(flush_interval);
    self
}

Expand Down Expand Up @@ -276,25 +305,45 @@ impl DogStatsDBuilder {
pub fn build(self) -> Result<DogStatsDRecorder, BuildError> {
self.validate_max_payload_len()?;

let max_payload_len = self.get_max_payload_len();
let flush_interval = self.get_flush_interval();

debug!(
agg_mode = %self.agg_mode,
histogram_sampling = self.histogram_sampling,
histogram_reservoir_size = self.histogram_reservoir_size,
histograms_as_distributions = self.histograms_as_distributions,
"Building DogStatsD exporter."
);
let state_config = StateConfiguration {
agg_mode: self.agg_mode,
telemetry: self.telemetry,
histogram_sampling: self.histogram_sampling,
histogram_reservoir_size: self.histogram_reservoir_size,
histograms_as_distributions: self.histograms_as_distributions,
};

let state = Arc::new(State::new(state_config));

let recorder = DogStatsDRecorder::new(Arc::clone(&state));

debug!(
remote_addr = %self.remote_addr,
max_payload_len,
?flush_interval,
write_timeout = ?self.write_timeout,
"Building DogStatsD forwarder."
);
let forwarder_config = ForwarderConfiguration {
remote_addr: self.remote_addr,
max_payload_len: self.max_payload_len,
flush_interval: self.flush_interval,
max_payload_len,
flush_interval,
write_timeout: self.write_timeout,
};

if self.synchronous {
debug!("Spawning synchronous forwarder backend.");

let forwarder = forwarder::sync::Forwarder::new(forwarder_config, state);

std::thread::Builder::new()
Expand Down Expand Up @@ -330,8 +379,8 @@ impl Default for DogStatsDBuilder {
DogStatsDBuilder {
remote_addr: RemoteAddr::Udp(vec![SocketAddr::from(([127, 0, 0, 1], 8125))]),
write_timeout: DEFAULT_WRITE_TIMEOUT,
max_payload_len: DEFAULT_MAX_PAYLOAD_LEN,
flush_interval: DEFAULT_FLUSH_INTERVAL,
max_payload_len: None,
flush_interval: None,
synchronous: true,
agg_mode: AggregationMode::Conservative,
telemetry: true,
Expand All @@ -346,6 +395,31 @@ impl Default for DogStatsDBuilder {
mod tests {
use super::*;

#[test]
fn default_flush_interval_agg_mode() {
    // With no explicit interval, each aggregation mode resolves to its own default.
    let per_mode_defaults = [
        (AggregationMode::Conservative, DEFAULT_FLUSH_INTERVAL_CONSERVATIVE),
        (AggregationMode::Aggressive, DEFAULT_FLUSH_INTERVAL_AGGRESSIVE),
    ];
    for (mode, expected) in per_mode_defaults {
        let resolved = DogStatsDBuilder::default().with_aggregation_mode(mode).get_flush_interval();
        assert_eq!(resolved, expected);
    }

    // An explicitly configured interval is returned as-is.
    let explicit = Duration::from_millis(123456789);
    let resolved = DogStatsDBuilder::default().with_flush_interval(explicit).get_flush_interval();
    assert_eq!(resolved, explicit);
}

#[test]
fn default_max_payload_len_udp() {
    // A UDP remote address with no explicit payload limit should resolve to the UDP default and still build.
    let udp_builder =
        DogStatsDBuilder::default().with_remote_address("127.0.0.1:9999").expect("address should be valid");

    let resolved_len = udp_builder.get_max_payload_len();
    assert_eq!(resolved_len, 1432);

    let build_result = udp_builder.build();
    assert!(build_result.is_ok());
}

#[test]
fn max_payload_len_exceeds_udp_max_len() {
let builder =
Expand All @@ -367,6 +441,23 @@ mod tests {
mod linux {
use super::*;

#[test]
fn default_max_payload_len_uds() {
    // Both Unix socket schemes should resolve to the same default payload limit and still build.
    for socket_addr in ["unix:///tmp/dogstatsd.sock", "unixgram:///tmp/dogstatsd.sock"] {
        let uds_builder =
            DogStatsDBuilder::default().with_remote_address(socket_addr).expect("address should be valid");

        assert_eq!(uds_builder.get_max_payload_len(), 8192);
        assert!(uds_builder.build().is_ok());
    }
}

#[test]
fn max_payload_len_exceeds_udp_max_len_transport_change() {
let builder = DogStatsDBuilder::default()
Expand Down
36 changes: 36 additions & 0 deletions metrics-exporter-dogstatsd/src/forwarder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[cfg(target_os = "linux")]
use std::path::PathBuf;
use std::{
fmt,
net::{SocketAddr, ToSocketAddrs as _},
time::Duration,
};
Expand Down Expand Up @@ -32,6 +33,14 @@ impl RemoteAddr {
RemoteAddr::Unixgram(_) => "uds",
}
}

pub(crate) fn default_max_payload_len(&self) -> usize {
match self {
RemoteAddr::Udp(_) => 1432,
#[cfg(target_os = "linux")]
RemoteAddr::Unix(_) | RemoteAddr::Unixgram(_) => 8192,
}
}
}

impl<'a> TryFrom<&'a str> for RemoteAddr {
Expand Down Expand Up @@ -61,6 +70,33 @@ impl<'a> TryFrom<&'a str> for RemoteAddr {
}
}

/// Renders the remote address in URL-like form, prefixed with the scheme of its own variant.
///
/// - A single UDP address is written as `udp://<addr>`.
/// - Any other number of UDP addresses is written as a comma-separated list in brackets: `udp://[<a>,<b>]`.
/// - Unix stream sockets are written as `unix://<path>` and Unix datagram sockets as `unixgram://<path>`, so the
///   two socket kinds stay distinguishable in log output.
impl fmt::Display for RemoteAddr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            RemoteAddr::Udp(addrs) => match addrs.as_slice() {
                [addr] => write!(f, "udp://{addr}"),
                addrs => {
                    f.write_str("udp://[")?;

                    for (idx, addr) in addrs.iter().enumerate() {
                        // Separator goes before every entry except the first.
                        if idx > 0 {
                            f.write_str(",")?;
                        }
                        write!(f, "{addr}")?;
                    }

                    f.write_str("]")
                }
            },
            #[cfg(target_os = "linux")]
            RemoteAddr::Unix(path) => write!(f, "unix://{}", path.display()),
            #[cfg(target_os = "linux")]
            RemoteAddr::Unixgram(path) => write!(f, "unixgram://{}", path.display()),
        }
    }
}

/// Builds the error message reported when an address uses a scheme other than `udp`, `unix`, or `unixgram`.
fn unknown_scheme_error_str(scheme: &str) -> String {
    ["invalid scheme '", scheme, "' (expected 'udp', 'unix', or 'unixgram')"].concat()
}
Expand Down

0 comments on commit 587fccc

Please sign in to comment.