Skip to content

Commit

Permalink
Further improvements around kstat creation times
Browse files Browse the repository at this point in the history
- Update the kstat chain more frequently, including when targets are
  added and sampled.
- Handle situation when a target signals interest in zero kstats, and
  clarify documentation. This is actually what happens when a kstat
  itself disappears and we then update the chain (previously, it was an
  error because the sampling method did _not_ update the chain). Treat
  this like an error, and increment the expiration counters. Make clear
  in the documentation that this situation is included in those
  counters.
- Add a per-kstat (not target) mapping that stores the creation times.
  These are included whenever we add a target, and also at sampling
  time, if needed. This lets us track the creation time in UTC reliably,
  while also providing it accurately to the `KstatTarget::to_samples()`
  method. These are removed only when the kstat itself goes away, which
  we check for periodically in the main `run()` loop, to avoid keeping
  them around forever.
  • Loading branch information
bnaecker committed Nov 7, 2023
1 parent 4a5f6e4 commit 3ac753a
Show file tree
Hide file tree
Showing 3 changed files with 367 additions and 133 deletions.
115 changes: 106 additions & 9 deletions oximeter/instruments/src/kstat/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

//! Report metrics about Ethernet data links on the host system
use crate::kstat::hrtime_to_utc;
use crate::kstat::ConvertNamedData;
use crate::kstat::Error;
use crate::kstat::KstatList;
use crate::kstat::KstatTarget;
use crate::kstat::TimeReference;
use chrono::DateTime;
use chrono::Utc;
use kstat_rs::Data;
Expand Down Expand Up @@ -207,19 +207,19 @@ where

fn to_samples(
&self,
time_ref: &TimeReference,
kstats: KstatList<'_, '_>,
) -> Result<Vec<Sample>, Error> {
let (kstat, data) = kstats.first().unwrap();
let creation_time = time_ref.to_utc(kstat.ks_crtime)?;
let snapshot_time = time_ref.to_utc(kstat.ks_snaptime)?;
let Some((creation_time, kstat, data)) = kstats.first() else {
return Ok(vec![]);
};
let snapshot_time = hrtime_to_utc(kstat.ks_snaptime)?;
let Data::Named(named) = data else {
return Err(Error::ExpectedNamedKstat);
};
named
.iter()
.filter_map(|nd| {
extract_link_kstats(self, nd, creation_time, snapshot_time)
extract_link_kstats(self, nd, *creation_time, snapshot_time)
})
.collect()
}
Expand All @@ -228,6 +228,8 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::kstat::sampler::KstatPath;
use crate::kstat::sampler::CREATION_TIME_PRUNE_INTERVAL;
use crate::kstat::CollectionDetails;
use crate::kstat::KstatSampler;
use crate::kstat::TargetStatus;
Expand Down Expand Up @@ -336,9 +338,9 @@ mod tests {
.filter(Some("link"), Some(0), Some(dl.link_name.as_str()))
.next()
.unwrap();
let time_ref = TimeReference::new();
let creation_time = hrtime_to_utc(kstat.ks_crtime).unwrap();
let data = ctl.read(&mut kstat).unwrap();
let samples = dl.to_samples(&time_ref, &[(kstat, data)]).unwrap();
let samples = dl.to_samples(&[(creation_time, kstat, data)]).unwrap();
println!("{samples:#?}");
}

Expand Down Expand Up @@ -483,7 +485,7 @@ mod tests {
drop(link);
info!(log, "dropped test etherstub");

// Pause time, and advance until we shoudl have expired the target.
// Pause time, and advance until we should have expired the target.
tokio::time::pause();
const MAX_DURATION: Duration = Duration::from_secs(3);
let now = Instant::now();
Expand Down Expand Up @@ -553,4 +555,99 @@ mod tests {
t == first
}));
}

#[tokio::test]
async fn test_prune_creation_times_when_kstat_is_gone() {
// Create a test etherstub, register a target that tracks its kstat, then
// delete the etherstub and make sure the stored creation time for that
// kstat is pruned.
let log = test_logger();
let sampler = KstatSampler::new(&log).unwrap();
let sn = String::from("BRM000001");
let link = TestEtherstub::new();
// Key under which the sampler is expected to store this kstat's creation
// time; built before the link is dropped so the name is still available.
let path = KstatPath {
module: "link".to_string(),
instance: 0,
name: link.name.clone(),
};
info!(log, "created test etherstub"; "name" => &link.name);
let dl = PhysicalDataLink {
rack_id: RACK_ID,
sled_id: SLED_ID,
serial: sn.clone(),
hostname: sn,
link_name: link.name.to_string(),
};
let collection_interval = Duration::from_secs(1);
let expiry = Duration::from_secs(1);
let details = CollectionDetails::duration(collection_interval, expiry);
let id = sampler.add_target(dl, details).await.unwrap();
info!(log, "target added"; "id" => ?id);
// The target must start out healthy, since the link still exists.
assert!(matches!(
sampler.target_status(id).await.unwrap(),
TargetStatus::Ok { .. },
));

// Delete the link right away.
drop(link);
info!(log, "dropped test etherstub");

// Advance time through the prune interval, in steps of `expiry`, with
// tokio's clock paused.
tokio::time::pause();
let now = Instant::now();
while now.elapsed() < CREATION_TIME_PRUNE_INTERVAL + expiry {
tokio::time::advance(expiry).await;
}

// Now check that the creation times are pruned.
let times = sampler.creation_times().await;
assert!(!times.contains_key(&path));
}

#[tokio::test]
async fn test_prune_creation_times_when_target_is_removed() {
// Create a test etherstub, register a target that tracks its kstat, then
// remove the _target_ while leaving the etherstub in place. The kstat
// still exists, so its creation time must _not_ be pruned.
let log = test_logger();
let sampler = KstatSampler::new(&log).unwrap();
let sn = String::from("BRM000001");
let link = TestEtherstub::new();
// Key under which the sampler is expected to store this kstat's creation
// time.
let path = KstatPath {
module: "link".to_string(),
instance: 0,
name: link.name.clone(),
};
info!(log, "created test etherstub"; "name" => &link.name);
let dl = PhysicalDataLink {
rack_id: RACK_ID,
sled_id: SLED_ID,
serial: sn.clone(),
hostname: sn,
link_name: link.name.to_string(),
};
let collection_interval = Duration::from_secs(1);
let expiry = Duration::from_secs(1);
let details = CollectionDetails::duration(collection_interval, expiry);
let id = sampler.add_target(dl, details).await.unwrap();
info!(log, "target added"; "id" => ?id);
// The target must start out healthy, since the link still exists.
assert!(matches!(
sampler.target_status(id).await.unwrap(),
TargetStatus::Ok { .. },
));

// Remove the target, but do not drop the link. This will mean that the
// underlying kstat is still around, even though there's no target
// that's interested in it. We should keep it, in this case.
sampler.remove_target(id).await.unwrap();

// Advance time through the prune interval, in steps of `expiry`, with
// tokio's clock paused.
tokio::time::pause();
let now = Instant::now();
while now.elapsed() < CREATION_TIME_PRUNE_INTERVAL + expiry {
tokio::time::advance(expiry).await;
}

// Now check that the creation time is still around.
let times = sampler.creation_times().await;
assert!(times.contains_key(&path));
}
}
115 changes: 23 additions & 92 deletions oximeter/instruments/src/kstat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@
//!
//! When users register a target for sampling, they are required to include
//! details about how often their target should be sampled, and what to do if we
//! cannot produce samples due to an error. These two are captured in the
//! cannot produce samples due to an error, or if there are _no kstats_ that the
//! target is interested in. These details are captured in the
//! [`CollectionDetails`] type.
//!
//! After a configurable period of errors (expressed in either consecutive error
Expand Down Expand Up @@ -110,7 +111,7 @@ pub struct Expiration {
/// The reason for expiration.
pub reason: ExpirationReason,
/// The last error before expiration.
pub error: KstatError,
pub error: Box<Error>,
/// The time at which the expiration occurred.
#[cfg(test)]
pub expired_at: tokio::time::Instant,
Expand Down Expand Up @@ -161,11 +162,13 @@ pub enum Error {
Expired(Expiration),

#[error("Expired after unsucessfull collections for {duration:?}")]
ExpiredAfterDuration { duration: Duration, error: KstatError },
ExpiredAfterDuration { duration: Duration, error: Box<Error> },
}

/// Type alias for a list of Kstats and their data.
pub type KstatList<'a, 'k> = &'a [(Kstat<'k>, Data<'k>)];
/// Type alias for a list of kstats.
///
/// This includes the kstat's creation time, the kstat itself, and its data.
pub type KstatList<'a, 'k> = &'a [(DateTime<Utc>, Kstat<'k>, Data<'k>)];

/// A trait for generating oximeter samples from a kstat.
///
Expand All @@ -180,78 +183,25 @@ pub trait KstatTarget:
/// Convert from a kstat and its data to a list of samples.
fn to_samples(
&self,
time_ref: &TimeReference,
kstats: KstatList<'_, '_>,
) -> Result<Vec<Sample>, Error>;
}

/// A time reference point, for mapping between `hrtime_t`s and UTC.
///
/// Kernel statistics contain two timestamps, the creation time (`ks_crtime`)
/// and snapshot time (`ks_snaptime`). Both of these are high-resolution
/// timestamps (`hrtime_t`s), which give the time in nanoseconds. However,
/// from `man gethrtime`:
///
/// > Time is expressed as nanoseconds since some arbitrary time in the past;
/// it is not correlated in any way to the time of day
///
/// In other words, the zero or reference time is arbitrary, and so only
/// _differences_ between `hrtime_t`s are useful.
///
/// On the other hand, `oximeter` records all timestamps in UTC. This type is
/// used to convert between the two, **at a particular point in time**. This
/// does not matter so much for gauges or computing the current sample
/// timestamp, but it is important for constructing the _start time_ of a
/// cumulative metric. Without a reliable estimate, creation times will differ
/// slightly as the mapping from high-res time and UTC changes. This can
/// complicate querying substantially.
///
/// The mapping between UTC and an hrtime can change over time. For example, as
/// updates from NTP adjust the local system time, the relationship between
/// these two will also change. High-res times are monotonic, but UTC is not
/// necessarily so.
///
/// # Usage
///
/// When a `KstatTarget` is added to the sampler, with
/// [`KstatSampler::add_target()`], a time reference is created. This is passed
/// to every call to `KstatTarget::to_samples()`. That allows the consumer to
/// reliably compute a single point in time, such as mapping the kstat creation
/// time to UTC. That mapping can be ignored for types which do not need it.
///
/// # Examples
///
/// See [`crate::kstat::link`] for examples of how this is used to reliably
/// compute a start time for a cumulative metric.
#[derive(Clone, Copy, Debug)]
pub struct TimeReference {
// The zero time in terms of `hrtime`, i.e., nanoseconds since boot.
hrtime: i64,
// The zero time in UTC.
utc: DateTime<Utc>,
}

impl TimeReference {
pub fn new() -> Self {
let utc = Utc::now();
let hrtime = unsafe { gethrtime() };
Self { hrtime, utc }
}

/// Convert from a high-res timestamp into UTC, if possible.
pub fn to_utc(&self, timestamp: i64) -> Result<DateTime<Utc>, Error> {
match self.hrtime.cmp(&timestamp) {
Ordering::Equal => Ok(self.utc),
Ordering::Less => {
let offset = u64::try_from(timestamp - self.hrtime)
.map_err(|_| Error::TimestampOverflow)?;
Ok(self.utc + Duration::from_nanos(offset))
}
Ordering::Greater => {
let offset = u64::try_from(self.hrtime - timestamp)
.map_err(|_| Error::TimestampOverflow)?;
Ok(self.utc - Duration::from_nanos(offset))
}
/// Convert from a high-res timestamp into UTC, if possible.
pub fn hrtime_to_utc(hrtime: i64) -> Result<DateTime<Utc>, Error> {
let utc_now = Utc::now();
let hrtime_now = unsafe { gethrtime() };
match hrtime_now.cmp(&hrtime) {
Ordering::Equal => Ok(utc_now),
Ordering::Less => {
let offset = u64::try_from(hrtime - hrtime_now)
.map_err(|_| Error::TimestampOverflow)?;
Ok(utc_now + Duration::from_nanos(offset))
}
Ordering::Greater => {
let offset = u64::try_from(hrtime_now - hrtime)
.map_err(|_| Error::TimestampOverflow)?;
Ok(utc_now - Duration::from_nanos(offset))
}
}
}
Expand Down Expand Up @@ -315,22 +265,3 @@ impl<'a> ConvertNamedData for NamedData<'a> {
extern "C" {
fn gethrtime() -> i64;
}

#[cfg(test)]
mod tests {
use super::Duration;
use super::TimeReference;

// Checks that `TimeReference::to_utc()` maps its own reference point
// exactly, and that equal hrtime offsets become equal UTC offsets.
#[test]
fn test_time_reference() {
let tr = TimeReference::new();

// Converting the reference's own hrtime must return its UTC reference.
let utc = tr.to_utc(tr.hrtime).unwrap();
assert_eq!(utc, tr.utc);

// Two hrtimes one second apart must map to UTC times one second apart.
let offset = Duration::from_secs(1);
let t0 = tr.to_utc(0).unwrap();
let t1 = tr.to_utc(offset.as_nanos() as _).unwrap();
assert_eq!(t0 + offset, t1);
}
}
Loading

0 comments on commit 3ac753a

Please sign in to comment.