Commit bd20f80
chore: update docs for the telemetry crate (#25431)
* chore: update docs for the telemetry crate

* chore: Update influxdb3_telemetry/src/stats.rs

Co-authored-by: Michael Gattozzi <[email protected]>

* chore: Update influxdb3_telemetry/src/sender.rs

Co-authored-by: Michael Gattozzi <[email protected]>

---------

Co-authored-by: Michael Gattozzi <[email protected]>
praveen-influx and mgattozzi authored Oct 8, 2024
1 parent 42672e0 commit bd20f80
Showing 5 changed files with 64 additions and 6 deletions.
4 changes: 4 additions & 0 deletions influxdb3_telemetry/src/bucket.rs
@@ -2,6 +2,10 @@ use observability_deps::tracing::debug;

use crate::stats::Stats;

/// This bucket holds all the event metrics, like reads and writes. As
/// each new read or write comes in, the stats for it are updated. Once
/// a minute, when a sample is taken, the metrics are reset to collect
/// events again until the next sample is taken.
#[derive(Debug, Default)]
pub(crate) struct EventsBucket {
pub writes: PerMinuteWrites,
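The collect-then-reset pattern the comment describes can be sketched as follows; the field and method names here are illustrative, not the bucket's real API:

```rust
// A hypothetical sketch of a per-minute events bucket (the real EventsBucket
// tracks richer per-event stats than a single counter).
#[derive(Debug, Default)]
struct SketchBucket {
    num_writes: u64,
}

impl SketchBucket {
    /// Called on every incoming write.
    fn add_write(&mut self) {
        self.num_writes += 1;
    }

    /// Called once a minute: hand the accumulated count to the sampler
    /// and start collecting afresh for the next interval.
    fn sample_and_reset(&mut self) -> u64 {
        std::mem::take(&mut self.num_writes)
    }
}
```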
2 changes: 2 additions & 0 deletions influxdb3_telemetry/src/sampler.rs
@@ -15,6 +15,8 @@ impl CpuAndMemorySampler {
Self { system }
}

/// Retrieves the memory and CPU usage for this process, looked up by
/// its pid.
pub fn get_cpu_and_mem_used(&mut self) -> Result<(f32, u64)> {
let pid = sysinfo::get_current_pid().map_err(TelemetryError::CannotGetPid)?;
self.system.refresh_pids_specifics(
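A hedged sketch of the same lookup done directly with the `sysinfo` crate; its API shifts between versions (this roughly follows 0.30), and it uses a blanket `refresh_all()` where the real method refreshes only this pid:

```rust
use sysinfo::System;

fn cpu_and_mem_for_current_process() -> Option<(f32, u64)> {
    let mut system = System::new();
    let pid = sysinfo::get_current_pid().ok()?;
    system.refresh_all();
    let process = system.process(pid)?;
    // cpu_usage() is only meaningful after a refresh; memory() is in bytes
    // in recent sysinfo versions.
    Some((process.cpu_usage(), process.memory()))
}
```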
4 changes: 4 additions & 0 deletions influxdb3_telemetry/src/sender.rs
@@ -30,6 +30,8 @@ impl TelemetrySender {
}
}

/// The actual payload that is sent to the telemetry
/// server.
#[derive(Serialize, Debug)]
pub(crate) struct TelemetryPayload {
pub os: Arc<str>,
@@ -68,6 +70,8 @@ pub(crate) struct TelemetryPayload {
pub parquet_row_count: u64,
}
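Since the struct derives `Serialize`, producing the JSON body is straightforward; a sketch with a reduced, hypothetical payload struct:

```rust
use serde::Serialize;

#[derive(Serialize, Debug)]
struct MiniPayload {
    os: String,
    cores: usize,
}

fn main() -> serde_json::Result<()> {
    let payload = MiniPayload { os: "linux".into(), cores: 8 };
    // Prints: {"os":"linux","cores":8}
    println!("{}", serde_json::to_string(&payload)?);
    Ok(())
}
```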

/// This function runs in the background; if any call fails, there is no
/// retry mechanism, and it is acceptable to lose a few samples.
pub(crate) async fn send_telemetry_in_background(
store: Arc<TelemetryStore>,
duration_secs: Duration,
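A hedged sketch of such a fire-and-forget loop, assuming `tokio`, `reqwest` (with its `json` feature), and `serde_json` as dependencies; the endpoint, interval, and payload are illustrative, not the crate's actual values:

```rust
use std::time::Duration;

async fn send_loop(client: reqwest::Client, endpoint: String) {
    let mut ticker = tokio::time::interval(Duration::from_secs(60));
    loop {
        ticker.tick().await;
        // Stand-in for the real TelemetryPayload snapshot.
        let payload = serde_json::json!({ "os": "linux", "cores": 8 });
        // No retry: a failed send just drops this sample, as described above.
        if let Err(err) = client.post(&endpoint).json(&payload).send().await {
            eprintln!("telemetry send failed, dropping sample: {err}");
        }
    }
}
```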
42 changes: 42 additions & 0 deletions influxdb3_telemetry/src/stats.rs
@@ -1,5 +1,17 @@
use num::{Num, NumCast};

/// This type is responsible for calculating stats in a rolling fashion.
/// Rolling means that some stats have already been calculated and need
/// to be aggregated further. This is commonly the case when sampling is
/// done at a higher-precision interval (say one minute) and then
/// aggregated further (say hourly).
///
/// For example, the number of lines written per hour is collected as new
/// write requests come in. The bucket [`crate::bucket::EventsBucket`]
/// holds `lines` as [`crate::stats::Stats<u64>`] to track the min/max/avg
/// lines written per minute. Then, when taking samples every minute to
/// calculate hourly aggregates, [`RollingStats<T>`] is used. To see how
/// it is calculated, see the [`RollingStats<T>::update`] method.
#[derive(Debug, Default)]
pub(crate) struct RollingStats<T> {
pub min: T,
@@ -13,6 +25,12 @@ impl<T: Default + Num + Copy + NumCast + PartialOrd> RollingStats<T> {
RollingStats::default()
}

/// Update the rolling stats [`Self::min`]/[`Self::max`]/[`Self::avg`]
/// using a reference to the higher-precision stats that are passed in,
/// usually stats for a per-minute interval. Note that
/// [`Self::num_samples`] is updated locally here to calculate the rolling
/// average for a metric, usually over an hour. Refer to
/// [`crate::metrics::Writes`] or [`crate::metrics::Queries`] to see how
/// it is used.
pub fn update(&mut self, higher_precision_stats: &Stats<T>) -> Option<()> {
if self.num_samples == 0 {
self.min = higher_precision_stats.min;
@@ -41,6 +59,8 @@ impl<T: Default + Num + Copy + NumCast + PartialOrd> RollingStats<T> {
}
}
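A hypothetical usage sketch (not from the crate), assuming a slice of per-minute `Stats<u64>` samples collected elsewhere:

```rust
// Fold per-minute samples into an hourly rolling aggregate.
fn hourly_rollup(per_minute_samples: &[Stats<u64>]) -> RollingStats<u64> {
    let mut hourly = RollingStats::new();
    for minute in per_minute_samples {
        // Each call merges min/max and advances the rolling average by one sample.
        hourly.update(minute);
    }
    hourly
}
```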

/// Basic stats to keep tabs on the min/max/avg for a specific
/// metric.
#[derive(Debug, Default)]
pub(crate) struct Stats<T> {
pub min: T,
@@ -54,6 +74,8 @@ impl<T: Default + Num + Copy + NumCast + PartialOrd> Stats<T> {
Stats::default()
}

/// Update [`Self::min`]/[`Self::max`]/[`Self::avg`] from a
/// newly sampled value.
pub fn update(&mut self, new_val: T) -> Option<()> {
if self.num_samples == 0 {
self.min = new_val;
@@ -75,6 +97,22 @@ impl<T: Default + Num + Copy + NumCast + PartialOrd> Stats<T> {
}
}

/// Generic function to calculate min/max/avg from another set of stats.
/// This function works for all numeric types (unsigned/signed/float).
/// It calculates min/max/avg from min/max/avg values that were already
/// calculated, possibly at a higher resolution.
///
/// For example, say we are looking at stats for the number of lines
/// written, and the 1st sample's minimum was 20 while the 3rd sample's
/// minimum was 10. That means that in the 1st sample, over a whole minute,
/// 20 was the minimum number of lines written in a single request, and in
/// the 3rd sample (3rd minute) the minimum was 10. These are already
/// per-minute stats, so to calculate the minimum number of lines for the
/// whole hour we compare the samples taken at per-minute intervals across
/// the hour. In this case, 10 becomes the new minimum for the whole hour.
fn rollup_stats<T: Num + Copy + NumCast + PartialOrd>(
current_min: T,
current_max: T,
@@ -91,6 +129,10 @@ fn rollup_stats<T: Num + Copy + NumCast + PartialOrd>(
Some((min, max, avg))
}
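A minimal concrete version of that rollup for `f64`, assuming the same weighted incremental averaging (the real function is generic over the `num` traits and returns an `Option` to guard against failed casts):

```rust
fn rollup_f64(
    current: (f64, f64, f64), // (min, max, avg) aggregated so far
    sample: (f64, f64, f64),  // (min, max, avg) of the new higher-precision sample
    num_samples: u64,         // samples already folded into `current`
) -> (f64, f64, f64) {
    let min = if sample.0 < current.0 { sample.0 } else { current.0 };
    let max = if sample.1 > current.1 { sample.1 } else { current.1 };
    // Fold the new sample's average in with weight 1/(n + 1).
    let avg = current.2 + (sample.2 - current.2) / (num_samples as f64 + 1.0);
    (min, max, avg)
}
```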

/// Generic function to calculate min/max/avg from a newly sampled value.
/// This function works for all numeric types (unsigned/signed/float).
/// Note that the average is computed incrementally, to avoid holding all
/// the samples in memory.
fn stats<T: Num + Copy + NumCast + PartialOrd>(
current_min: T,
current_max: T,
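The incremental average mentioned above is the standard running-mean recurrence; a sketch for `f64` (the crate's version is generic and casts through `NumCast`):

```rust
// avg_{n+1} = avg_n + (x - avg_n) / (n + 1): updates in O(1) memory,
// so no past samples need to be retained.
fn incremental_avg(current_avg: f64, num_samples: u64, new_val: f64) -> f64 {
    current_avg + (new_val - current_avg) / (num_samples as f64 + 1.0)
}
```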
18 changes: 12 additions & 6 deletions influxdb3_telemetry/src/store.rs
@@ -11,8 +11,18 @@ use crate::{
sender::{send_telemetry_in_background, TelemetryPayload},
};

/// This store is responsible for holding all the stats which will be sent
/// in the background to the server. There are primarily four different
/// types of data held in the store:
/// - static info (like instance ids, OS etc.): this is passed in when the
///   telemetry store is created.
/// - hourly samples (like parquet file metrics): these are sampled at the
///   point of creating the payload, just before sending it to the server.
/// - rates (cpu/mem): these are sampled every minute, and they are regular
///   time series data. These metrics are backed by the
///   [`crate::stats::Stats<T>`] type.
/// - events (reads/writes): these are raw events; to convert them into a
///   time series, they are collected in a bucket first and then sampled at
///   a per-minute interval. These metrics are usually backed by the
///   [`crate::stats::RollingStats<T>`] type. A couple of metrics, like the
///   number of writes/reads, are backed by the plain
///   [`crate::stats::Stats<T>`] type, as they are just per-minute counters.
#[derive(Debug)]
pub struct TelemetryStore {
inner: parking_lot::Mutex<TelemetryStoreInner>,
@@ -151,14 +161,10 @@ impl TelemetryStoreInner {
influx_version,
storage_type,
cores,
cpu: Cpu::default(),
memory: Memory::default(),
per_minute_events_bucket: EventsBucket::new(),
writes: Writes::default(),
reads: Queries::default(),
}
}
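The overall shape of the store, mutex-guarded inner state that event handlers update and a background task snapshots, can be sketched as follows; the names are illustrative, not the crate's actual types:

```rust
use std::sync::Arc;

#[derive(Default)]
struct Inner {
    write_count: u64,
}

struct SketchStore {
    inner: parking_lot::Mutex<Inner>,
}

impl SketchStore {
    /// Called on the write path: take the lock briefly and bump the counter.
    fn record_write(&self) {
        self.inner.lock().write_count += 1;
    }

    /// Called by the background sampler: read the current value.
    fn snapshot(&self) -> u64 {
        self.inner.lock().write_count
    }
}

fn main() {
    let store = Arc::new(SketchStore { inner: parking_lot::Mutex::new(Inner::default()) });
    store.record_write();
    assert_eq!(store.snapshot(), 1);
}
```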
