Skip to content

Commit

Permalink
Add a time reference for converting hrtime to UTC
Browse files Browse the repository at this point in the history
- Adds a `TimeReference` type for consistently converting from hrtime to
  UTC, using the mapping from a single point in time. This is stored in
  each `SampledKstat` now, and passed to `KstatTarget::to_samples()`.
  The implementer can use this to consistently derive a start time for
  their kstats now, though this isn't (can't be?) consistent if you
  remove and re-add a target.
- Add some quick sanity tests for `TimeReference`, more tests verifying
  start time consistency
- Cleanup unused code
  • Loading branch information
bnaecker committed Nov 6, 2023
1 parent 430df98 commit 31ce9ea
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 77 deletions.
63 changes: 55 additions & 8 deletions oximeter/instruments/src/kstat/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

//! Report metrics about Ethernet data links on the host system
use crate::kstat::kstat_snapshot_time;
use crate::kstat::ConvertNamedData;
use crate::kstat::Error;
use crate::kstat::KstatList;
use crate::kstat::KstatTarget;
use crate::kstat::TimeReference;
use chrono::DateTime;
use chrono::Utc;
use kstat_rs::Data;
Expand All @@ -21,7 +21,7 @@ use oximeter::Target;
use uuid::Uuid;

/// Information about a single physical Ethernet link on a host.
#[derive(Debug, Target)]
#[derive(Clone, Debug, Target)]
pub struct PhysicalDataLink {
/// The ID of the rack (cluster) containing this host.
pub rack_id: Uuid,
Expand All @@ -39,7 +39,7 @@ pub struct PhysicalDataLink {
///
/// Note that this is specifically for a VNIC in on the host system, not a guest
/// data link.
#[derive(Debug, Target)]
#[derive(Clone, Debug, Target)]
pub struct VirtualDataLink {
/// The ID of the rack (cluster) containing this host.
pub rack_id: Uuid,
Expand All @@ -54,7 +54,7 @@ pub struct VirtualDataLink {
}

/// Information about a guest virtual Ethernet link.
#[derive(Debug, Target)]
#[derive(Clone, Debug, Target)]
pub struct GuestDataLink {
/// The ID of the rack (cluster) containing this host.
pub rack_id: Uuid,
Expand Down Expand Up @@ -207,17 +207,19 @@ where

fn to_samples(
&self,
time_ref: &TimeReference,
kstats: KstatList<'_, '_>,
) -> Result<Vec<Sample>, Error> {
let (creation_time, kstat, data) = kstats.first().unwrap();
let snapshot_time = kstat_snapshot_time(kstat)?;
let (kstat, data) = kstats.first().unwrap();
let creation_time = time_ref.to_utc(kstat.ks_crtime)?;
let snapshot_time = time_ref.to_utc(kstat.ks_snaptime)?;
let Data::Named(named) = data else {
return Err(Error::ExpectedNamedKstat);
};
named
.iter()
.filter_map(|nd| {
extract_link_kstats(self, nd, *creation_time, snapshot_time)
extract_link_kstats(self, nd, creation_time, snapshot_time)
})
.collect()
}
Expand Down Expand Up @@ -334,8 +336,9 @@ mod tests {
.filter(Some("link"), Some(0), Some(dl.link_name.as_str()))
.next()
.unwrap();
let time_ref = TimeReference::new();
let data = ctl.read(&mut kstat).unwrap();
let samples = dl.to_samples(&[(Utc::now(), kstat, data)]).unwrap();
let samples = dl.to_samples(&time_ref, &[(kstat, data)]).unwrap();
println!("{samples:#?}");
}

Expand Down Expand Up @@ -506,4 +509,48 @@ mod tests {
.collect();
assert_eq!(expiration_samples.len(), 1);
}

// A sanity check that a cumulative start time does not change over time,
// since we've fixed the time reference at the time it was added.
#[tokio::test]
async fn test_kstat_start_time_is_equal() {
let log = test_logger();
let mut sampler = KstatSampler::new(&log).unwrap();
let sn = String::from("BRM000001");
let link = TestEtherstub::new();
info!(log, "created test etherstub"; "name" => &link.name);
let dl = PhysicalDataLink {
rack_id: RACK_ID,
sled_id: SLED_ID,
serial: sn.clone(),
hostname: sn,
link_name: link.name.to_string(),
};
let collection_interval = Duration::from_secs(1);
let expiry = Duration::from_secs(1);
let details = CollectionDetails::duration(collection_interval, expiry);
let id = sampler.add_target(dl, details).await.unwrap();
info!(log, "target added"; "id" => ?id);
assert!(matches!(
sampler.target_status(id).await.unwrap(),
TargetStatus::Ok { .. },
));
tokio::time::pause();
let now = Instant::now();
while now.elapsed() < (expiry * 10) {
tokio::time::advance(expiry).await;
}
let samples = sampler.produce().unwrap();
let mut start_times = samples
.filter(|sample| {
sample.timeseries_name.as_str().starts_with("physical")
})
.map(|sample| sample.measurement.start_time().unwrap());
let first = start_times.next().unwrap();
println!("{first}");
assert!(start_times.all(|t| {
println!("{t}");
t == first
}));
}
}
128 changes: 88 additions & 40 deletions oximeter/instruments/src/kstat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ pub enum Error {
}

/// Type alias for a list of Kstats and their data.
pub type KstatList<'a, 'k> = &'a [(DateTime<Utc>, Kstat<'k>, Data<'k>)];
pub type KstatList<'a, 'k> = &'a [(Kstat<'k>, Data<'k>)];

/// A trait for generating oximeter samples from a kstat.
///
Expand All @@ -180,10 +180,82 @@ pub trait KstatTarget:
/// Convert from a kstat and its data to a list of samples.
fn to_samples(
&self,
time_ref: &TimeReference,
kstats: KstatList<'_, '_>,
) -> Result<Vec<Sample>, Error>;
}

/// A time reference point, for mapping between `hrtime_t`s and UTC.
///
/// Kernel statistics contain two timestamps, the creation time (`ks_crtime`)
/// and snapshot time (`ks_snaptime`). Both of these are high-resolution
/// timestamps (`hrtime_t`s`), which give the time in nanoseconds. However,
/// from `man gethrtime`:
///
/// > Time is expressed as nanoseconds since some arbitrary time in the past;
/// it is not correlated in any way to the time of day
///
/// In other words, the zero or reference time is arbitrary, and so only
/// _differences_ between `hrtime_t`s is useful.
///
/// On the other hand, `oximeter` records all timestamps in UTC. This type is
/// used to convert between the two, **at a particular point in time**. This
/// does not matter so much for gauges or computing the current sample
/// timestamp, but it is important for constructing the _start time_ of a
/// cumulative metric. Without a reliable estimate, creation times will differ
/// slightly as the mapping from high-res time and UTC changes. This can
/// complicate querying substantially.
///
/// The mapping between UTC and an hrtime can change over time. For example, as
/// updates from NTP adjust the local system time, the relationship between
/// these two will also change. High-res times are monotonic, but UTC is not
/// necessarily so.
///
/// # Usage
///
/// When a `KstatTarget` is added to the sampler, with
/// [`KstatSampler::add_target()`], a time reference is created. This is passed
/// to every call to `KstatTarget::to_samples()`. That allows the consumer to
/// reliably compute a single point in time, such as mapping the kstat creation
/// time to UTC. That mapping can be ignored for types which do not need it.
///
/// # Examples
///
/// See [`crate::kstat::link`] for examples of how this is used to reliably
/// compute a start time for a cumulative metric.
#[derive(Clone, Copy, Debug)]
pub struct TimeReference {
// The zero time in terms of `hrtime`, i.e., nanoseconds since boot.
hrtime: i64,
// The zero time in UTC.
utc: DateTime<Utc>,
}

impl TimeReference {
pub fn new() -> Self {
let utc = Utc::now();
let hrtime = unsafe { gethrtime() };
Self { hrtime, utc }
}

/// Convert from a high-res timestamp into UTC, if possible.
pub fn to_utc(&self, timestamp: i64) -> Result<DateTime<Utc>, Error> {
match self.hrtime.cmp(&timestamp) {
Ordering::Equal => Ok(self.utc),
Ordering::Less => {
let offset = u64::try_from(timestamp - self.hrtime)
.map_err(|_| Error::TimestampOverflow)?;
Ok(self.utc + Duration::from_nanos(offset))
}
Ordering::Greater => {
let offset = u64::try_from(self.hrtime - timestamp)
.map_err(|_| Error::TimestampOverflow)?;
Ok(self.utc - Duration::from_nanos(offset))
}
}
}
}

// Helper trait for converting a `NamedData` item into a specific contained data
// type, if possible.
pub(crate) trait ConvertNamedData {
Expand Down Expand Up @@ -238,51 +310,27 @@ impl<'a> ConvertNamedData for NamedData<'a> {
}
}
}

#[link(name = "c")]
extern "C" {
fn gethrtime() -> i64;
}

// Return the creation time of a kstat, in UTC.
//
// Timestamps in the kstat framework are represented in nanoseconds, using the
// `gethrtime()` function. These timestamps are monotonic and linear, but
// referenced to an arbitrary time in the past. However, `oximeter` generally
// expects timestamps in UTC. This method attempts to derive an absolute time
// for the start of a kstat.
//
// We do this by taking a timestamp for "now", in both UTC and using
// `gethrtime()`. We then compute the difference from the kstat creation time,
// in nanoseconds, and offset UTC now by that same amount.
pub(crate) fn kstat_creation_time(
kstat: &Kstat<'_>,
) -> Result<DateTime<Utc>, Error> {
kstat_timestamp_to_utc(kstat.ks_crtime)
}
#[cfg(test)]
mod tests {
use super::Duration;
use super::TimeReference;

// Return the snapshot time of a kstat, in UTC.
//
// See [`kstat_creation_time`] for details.
pub(crate) fn kstat_snapshot_time(
kstat: &Kstat<'_>,
) -> Result<DateTime<Utc>, Error> {
kstat_timestamp_to_utc(kstat.ks_snaptime)
}
#[test]
fn test_time_reference() {
let tr = TimeReference::new();

fn kstat_timestamp_to_utc(timestamp: i64) -> Result<DateTime<Utc>, Error> {
let hrtime_now = unsafe { gethrtime() };
let utc_now = Utc::now();
match hrtime_now.cmp(&timestamp) {
Ordering::Equal => Ok(utc_now),
Ordering::Less => {
let offset = u64::try_from(timestamp - hrtime_now)
.map_err(|_| Error::TimestampOverflow)?;
Ok(utc_now + Duration::from_nanos(offset))
}
Ordering::Greater => {
let offset = u64::try_from(hrtime_now - timestamp)
.map_err(|_| Error::TimestampOverflow)?;
Ok(utc_now - Duration::from_nanos(offset))
}
let utc = tr.to_utc(tr.hrtime).unwrap();
assert_eq!(utc, tr.utc);

let offset = Duration::from_secs(1);
let t0 = tr.to_utc(0).unwrap();
let t1 = tr.to_utc(offset.as_nanos() as _).unwrap();
assert_eq!(t0 + offset, t1);
}
}
48 changes: 19 additions & 29 deletions oximeter/instruments/src/kstat/sampler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

//! Generate oximeter samples from kernel statistics.
use crate::kstat::kstat_creation_time;
use crate::kstat::Error;
use crate::kstat::Expiration;
use crate::kstat::ExpirationReason;
use crate::kstat::KstatTarget;
use chrono::DateTime;
use chrono::Utc;
use crate::kstat::TimeReference;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use kstat_rs::Ctl;
Expand All @@ -34,11 +34,14 @@ use tokio::sync::oneshot;
use tokio::time::sleep;
use tokio::time::Sleep;

#[cfg(test)]
use tokio::time::Instant;

use crate::kstat::Expiration;
use crate::kstat::ExpirationReason;
cfg_if::cfg_if! {
if #[cfg(test)] {
use tokio::time::Instant;
} else {
use chrono::DateTime;
use chrono::Utc;
}
}

// The `KstatSampler` generates some statistics about its own operation, mostly
// for surfacing failures to collect and dropped samples.
Expand Down Expand Up @@ -185,8 +188,8 @@ enum Request {
struct SampledKstat {
/// The target from which to collect.
target: Box<dyn KstatTarget>,
/// The time the interested kstats were created.
creation_times: Vec<DateTime<Utc>>,
/// The time reference at the time the kstat target was added.
time_ref: TimeReference,
/// The details around collection and expiration behavior.
details: CollectionDetails,
/// The time at which we _added_ this target to the sampler.
Expand Down Expand Up @@ -630,10 +633,7 @@ impl KstatSamplerWorker {
let kstats = ctl
.iter()
.filter(|kstat| sampled_kstat.target.interested(kstat))
.zip(sampled_kstat.creation_times.iter())
.map(|(mut kstat, creation_time)| {
ctl.read(&mut kstat).map(|d| (*creation_time, kstat, d))
})
.map(|mut kstat| ctl.read(&mut kstat).map(|d| (kstat, d)))
.collect::<Result<Vec<_>, _>>();
match kstats {
Ok(k) => {
Expand All @@ -645,7 +645,10 @@ impl KstatSamplerWorker {
}
}
sampled_kstat.attempts_since_last_collection = 0;
sampled_kstat.target.to_samples(&k).map(Option::Some)
sampled_kstat
.target
.to_samples(&sampled_kstat.time_ref, &k)
.map(Option::Some)
}
Err(e) => {
sampled_kstat.attempts_since_last_collection += 1;
Expand Down Expand Up @@ -821,19 +824,6 @@ impl KstatSamplerWorker {
}
None => {}
}
let creation_times = self
.ctl
.as_ref()
.unwrap()
.iter()
.filter_map(|kstat| {
if target.interested(&kstat) {
Some(kstat_creation_time(&kstat))
} else {
None
}
})
.collect::<Result<_, _>>()?;
cfg_if::cfg_if! {
if #[cfg(test)] {
let time_added = Instant::now();
Expand All @@ -844,7 +834,7 @@ impl KstatSamplerWorker {
let item = SampledKstat {
target,
details,
creation_times,
time_ref: TimeReference::new(),
time_added,
time_of_last_collection: None,
attempts_since_last_collection: 0,
Expand Down

0 comments on commit 31ce9ea

Please sign in to comment.