Skip to content

Commit

Permalink
Handle "kstat-based metrics produce samples from the 1980's" (#6589)
Browse files Browse the repository at this point in the history
Closes #5899.

Instead of checking for dates or anything based on the data sampled,
this update starts collecting kstat samples (i.e. being interested in
them) once the sled agent is synchronized with NTP.

We leverage the metrics manager associated with the agent to now look
for a new message and update the tracked links we've added/tracked thus
far.
zeeshanlakhani authored Sep 18, 2024
1 parent 96813ae commit 18b8ab3
Showing 5 changed files with 417 additions and 89 deletions.
136 changes: 112 additions & 24 deletions oximeter/instruments/src/kstat/link.rs
Original file line number Diff line number Diff line change
@@ -15,20 +15,22 @@ use kstat_rs::Data;
use kstat_rs::Kstat;
use kstat_rs::Named;
use oximeter::types::Cumulative;
use oximeter::FieldType;
use oximeter::FieldValue;
use oximeter::Sample;
use oximeter::Target;
use uuid::Uuid;

oximeter::use_timeseries!("sled-data-link.toml");
pub use self::sled_data_link::SledDataLink as SledDataLinkTarget;

/// Helper function to extract the same kstat metrics from all link targets.
fn extract_link_kstats<T>(
target: &T,
fn extract_link_kstats(
target: &SledDataLink,
named_data: &Named,
creation_time: DateTime<Utc>,
snapshot_time: DateTime<Utc>,
) -> Option<Result<Sample, Error>>
where
T: KstatTarget,
{
) -> Option<Result<Sample, Error>> {
let Named { name, value } = named_data;
if *name == "rbytes64" {
Some(value.as_u64().and_then(|x| {
@@ -83,23 +85,53 @@ where
}
}

// Helper trait for defining `KstatTarget` for all the link-based stats.
trait LinkKstatTarget: KstatTarget {
fn link_name(&self) -> &str;
#[derive(Clone, Debug)]
pub struct SledDataLink {
/// The target for this link.
pub target: SledDataLinkTarget,
/// Flag indicating whether the sled associated with this link is synced with
/// NTP.
pub time_synced: bool,
}

impl LinkKstatTarget for sled_data_link::SledDataLink {
fn link_name(&self) -> &str {
&self.link_name
impl SledDataLink {
/// Create a new `SledDataLink` with the given target and synchronization
/// flag.
pub fn new(target: SledDataLinkTarget, time_synced: bool) -> Self {
Self { target, time_synced }
}

/// Create a new `SledDataLink` with the given target .
#[cfg(test)]
pub fn unsynced(target: SledDataLinkTarget) -> Self {
Self { target, time_synced: false }
}

/// Return the name of the link.
pub fn link_name(&self) -> &str {
&self.target.link_name
}

/// Return the zone name of the link.
pub fn zone_name(&self) -> &str {
&self.target.zone_name
}

/// Return the kind of link.
pub fn kind(&self) -> &str {
&self.target.kind
}

/// Return the idenity of the sled.
pub fn sled_id(&self) -> Uuid {
self.target.sled_id
}
}

impl<T> KstatTarget for T
where
T: LinkKstatTarget,
{
impl KstatTarget for SledDataLink {
fn interested(&self, kstat: &Kstat<'_>) -> bool {
kstat.ks_module == "link"
self.time_synced
&& kstat.ks_module == "link"
&& kstat.ks_instance == 0
&& kstat.ks_name == self.link_name()
}
@@ -124,6 +156,25 @@ where
}
}

// NOTE: Delegate to the inner target type for this implementation.
impl Target for SledDataLink {
fn name(&self) -> &'static str {
self.target.name()
}

fn field_names(&self) -> &'static [&'static str] {
self.target.field_names()
}

fn field_types(&self) -> Vec<FieldType> {
self.target.field_types()
}

fn field_values(&self) -> Vec<FieldValue> {
self.target.field_values()
}
}

#[cfg(all(test, target_os = "illumos"))]
mod tests {
use super::*;
@@ -225,10 +276,40 @@ mod tests {
}
}

#[test]
fn test_kstat_interested() {
let link = TestEtherstub::new();
let target = SledDataLinkTarget {
rack_id: RACK_ID,
sled_id: SLED_ID,
sled_serial: SLED_SERIAL.into(),
link_name: link.name.clone().into(),
kind: KIND.into(),
sled_model: SLED_MODEL.into(),
sled_revision: SLED_REVISION,
zone_name: ZONE_NAME.into(),
};
// not with a synced sled (by default)
let mut dl = SledDataLink::unsynced(target);

let ctl = Ctl::new().unwrap();
let ctl = ctl.update().unwrap();
let kstat = ctl
.filter(Some("link"), Some(0), Some(link.name.as_str()))
.next()
.unwrap();

assert!(!dl.interested(&kstat));

// with a synced sled
dl.time_synced = true;
assert!(dl.interested(&kstat));
}

#[test]
fn test_sled_datalink() {
let link = TestEtherstub::new();
let dl = sled_data_link::SledDataLink {
let target = SledDataLinkTarget {
rack_id: RACK_ID,
sled_id: SLED_ID,
sled_serial: SLED_SERIAL.into(),
@@ -238,6 +319,7 @@ mod tests {
sled_revision: SLED_REVISION,
zone_name: ZONE_NAME.into(),
};
let dl = SledDataLink::new(target, true);
let ctl = Ctl::new().unwrap();
let ctl = ctl.update().unwrap();
let mut kstat = ctl
@@ -254,7 +336,7 @@ mod tests {
async fn test_kstat_sampler() {
let mut sampler = KstatSampler::new(&test_logger()).unwrap();
let link = TestEtherstub::new();
let dl = sled_data_link::SledDataLink {
let target = SledDataLinkTarget {
rack_id: RACK_ID,
sled_id: SLED_ID,
sled_serial: SLED_SERIAL.into(),
@@ -264,6 +346,7 @@ mod tests {
sled_revision: SLED_REVISION,
zone_name: ZONE_NAME.into(),
};
let dl = SledDataLink::new(target, true);
let details = CollectionDetails::never(Duration::from_secs(1));
let id = sampler.add_target(dl, details).await.unwrap();
let samples: Vec<_> = sampler.produce().unwrap().collect();
@@ -304,7 +387,7 @@ mod tests {
let mut sampler =
KstatSampler::with_sample_limit(&test_logger(), limit).unwrap();
let link = TestEtherstub::new();
let dl = sled_data_link::SledDataLink {
let target = SledDataLinkTarget {
rack_id: RACK_ID,
sled_id: SLED_ID,
sled_serial: SLED_SERIAL.into(),
@@ -314,6 +397,7 @@ mod tests {
sled_revision: SLED_REVISION,
zone_name: ZONE_NAME.into(),
};
let dl = SledDataLink::new(target, true);
let details = CollectionDetails::never(Duration::from_secs(1));
sampler.add_target(dl, details).await.unwrap();
let samples: Vec<_> = sampler.produce().unwrap().collect();
@@ -373,7 +457,7 @@ mod tests {
let mut sampler = KstatSampler::new(&log).unwrap();
let link = TestEtherstub::new();
info!(log, "created test etherstub"; "name" => &link.name);
let dl = sled_data_link::SledDataLink {
let target = SledDataLinkTarget {
rack_id: RACK_ID,
sled_id: SLED_ID,
sled_serial: SLED_SERIAL.into(),
@@ -383,6 +467,7 @@ mod tests {
sled_revision: SLED_REVISION,
zone_name: ZONE_NAME.into(),
};
let dl = SledDataLink::new(target, true);
let collection_interval = Duration::from_secs(1);
let expiry = Duration::from_secs(1);
let details = CollectionDetails::duration(collection_interval, expiry);
@@ -432,7 +517,7 @@ mod tests {
let mut sampler = KstatSampler::new(&log).unwrap();
let link = TestEtherstub::new();
info!(log, "created test etherstub"; "name" => &link.name);
let dl = sled_data_link::SledDataLink {
let target = SledDataLinkTarget {
rack_id: RACK_ID,
sled_id: SLED_ID,
sled_serial: SLED_SERIAL.into(),
@@ -442,6 +527,7 @@ mod tests {
sled_revision: SLED_REVISION,
zone_name: ZONE_NAME.into(),
};
let dl = SledDataLink::new(target, true);
let collection_interval = Duration::from_secs(1);
let expiry = Duration::from_secs(1);
let details = CollectionDetails::duration(collection_interval, expiry);
@@ -483,7 +569,7 @@ mod tests {
name: link.name.clone(),
};
info!(log, "created test etherstub"; "name" => &link.name);
let dl = sled_data_link::SledDataLink {
let target = SledDataLinkTarget {
rack_id: RACK_ID,
sled_id: SLED_ID,
sled_serial: SLED_SERIAL.into(),
@@ -493,6 +579,7 @@ mod tests {
sled_revision: SLED_REVISION,
zone_name: ZONE_NAME.into(),
};
let dl = SledDataLink::new(target, true);
let collection_interval = Duration::from_secs(1);
let expiry = Duration::from_secs(1);
let details = CollectionDetails::duration(collection_interval, expiry);
@@ -532,7 +619,7 @@ mod tests {
name: link.name.clone(),
};
info!(log, "created test etherstub"; "name" => &link.name);
let dl = sled_data_link::SledDataLink {
let target = SledDataLinkTarget {
rack_id: RACK_ID,
sled_id: SLED_ID,
sled_serial: SLED_SERIAL.into(),
@@ -542,6 +629,7 @@ mod tests {
sled_revision: SLED_REVISION,
zone_name: ZONE_NAME.into(),
};
let dl = SledDataLink::new(target, true);
let collection_interval = Duration::from_secs(1);
let expiry = Duration::from_secs(1);
let details = CollectionDetails::duration(collection_interval, expiry);
1 change: 1 addition & 0 deletions oximeter/instruments/src/kstat/mod.rs
Original file line number Diff line number Diff line change
@@ -91,6 +91,7 @@ use std::time::Duration;
pub mod link;
mod sampler;

pub use link::SledDataLink;
pub use sampler::CollectionDetails;
pub use sampler::ExpirationBehavior;
pub use sampler::KstatSampler;
118 changes: 108 additions & 10 deletions oximeter/instruments/src/kstat/sampler.rs
Original file line number Diff line number Diff line change
@@ -173,6 +173,12 @@ enum Request {
id: TargetId,
reply_tx: oneshot::Sender<Result<TargetStatus, Error>>,
},
/// Update a target.
UpdateTarget {
target: Box<dyn KstatTarget>,
details: CollectionDetails,
reply_tx: oneshot::Sender<Result<TargetId, Error>>,
},
/// Remove a target.
RemoveTarget { id: TargetId, reply_tx: oneshot::Sender<Result<(), Error>> },
/// Return the creation times of all tracked / extant kstats.
@@ -550,6 +556,45 @@ impl KstatSamplerWorker {
}
}
}
Request::UpdateTarget { target, details, reply_tx } => {
match self.update_target(target, details) {
Ok(id) => {
let timeout = YieldIdAfter::new(id, details.interval);
sample_timeouts.push(timeout);
trace!(
self.log,
"updated target with timeout";
"id" => ?id,
"details" => ?details,
);
match reply_tx.send(Ok(id)) {
Ok(_) => trace!(self.log, "sent reply"),
Err(e) => error!(
self.log,
"failed to send reply";
"id" => ?id,
"error" => ?e,
)
}
}
Err(e) => {
error!(
self.log,
"failed to update target";
"error" => ?e,
);
match reply_tx.send(Err(e)) {
Ok(_) => trace!(self.log, "sent reply"),
Err(e) => error!(
self.log,
"failed to send reply";
"error" => ?e,
)
}
}
}

}
Request::RemoveTarget { id, reply_tx } => {
self.targets.remove(&id);
if let Some(remaining_samples) = self.samples.lock().unwrap().remove(&id) {
@@ -966,6 +1011,52 @@ impl KstatSamplerWorker {
}
None => {}
}

self.insert_target(id, target, details)
}

fn update_target(
&mut self,
target: Box<dyn KstatTarget>,
details: CollectionDetails,
) -> Result<TargetId, Error> {
let id = hash_target(&*target);
match self.targets.get(&id) {
// If the target is already expired, we'll replace it with the new
// target and start sampling it again.
Some(SampledObject::Expired(e)) => {
warn!(
self.log,
"replacing expired kstat target";
"id" => ?id,
"expiration_reason" => ?e.reason,
"error" => ?e.error,
"expired_at" => ?e.expired_at,
);
}
Some(_) => {}
None => return Err(Error::NoSuchTarget),
}

self.insert_target(id, target, details)
}

fn update_chain(&mut self) -> Result<(), Error> {
let new_ctl = match self.ctl.take() {
None => Ctl::new(),
Some(old) => old.update(),
}
.map_err(Error::Kstat)?;
let _ = self.ctl.insert(new_ctl);
Ok(())
}

fn insert_target(
&mut self,
id: TargetId,
target: Box<dyn KstatTarget>,
details: CollectionDetails,
) -> Result<TargetId, Error> {
self.ensure_creation_times_for_target(&*target)?;
let item = SampledKstat {
target,
@@ -995,17 +1086,8 @@ impl KstatSamplerWorker {
"n_samples" => n,
),
}
Ok(id)
}

fn update_chain(&mut self) -> Result<(), Error> {
let new_ctl = match self.ctl.take() {
None => Ctl::new(),
Some(old) => old.update(),
}
.map_err(Error::Kstat)?;
let _ = self.ctl.insert(new_ctl);
Ok(())
Ok(id)
}
}

@@ -1094,6 +1176,22 @@ impl KstatSampler {
reply_rx.await.map_err(|_| Error::RecvError)?
}

/// Update the details for a target.
pub async fn update_target(
&self,
target: impl KstatTarget,
details: CollectionDetails,
) -> Result<TargetId, Error> {
let (reply_tx, reply_rx) = oneshot::channel();
let request = Request::UpdateTarget {
target: Box::new(target),
details,
reply_tx,
};
self.outbox.send(request).await.map_err(|_| Error::SendError)?;
reply_rx.await.map_err(|_| Error::RecvError)?
}

/// Fetch the status for a target.
///
/// If the target is being collected normally, then `TargetStatus::Ok` is
119 changes: 93 additions & 26 deletions sled-agent/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -8,7 +8,8 @@ use illumos_utils::running_zone::RunningZone;
use omicron_common::api::internal::nexus::ProducerEndpoint;
use omicron_common::api::internal::nexus::ProducerKind;
use omicron_common::api::internal::shared::SledIdentifiers;
use oximeter_instruments::kstat::link::sled_data_link::SledDataLink;
use oximeter_instruments::kstat::link::SledDataLink;
use oximeter_instruments::kstat::link::SledDataLinkTarget;
use oximeter_instruments::kstat::CollectionDetails;
use oximeter_instruments::kstat::Error as KstatError;
use oximeter_instruments::kstat::KstatSampler;
@@ -24,6 +25,8 @@ use std::time::Duration;
use tokio::sync::mpsc;
use uuid::Uuid;

type TrackedLinks = HashMap<String, Target>;

/// The interval on which we ask `oximeter` to poll us for metric data.
const METRIC_COLLECTION_INTERVAL: Duration = Duration::from_secs(30);

@@ -84,6 +87,8 @@ pub(crate) enum Message {
TrackOptePort { zone_name: String, name: String },
/// Stop tracking the named OPTE port.
UntrackOptePort { name: String },
/// Notify the task that a sled has been synced with NTP.
TimeSynced { sled_id: Uuid },
// TODO-completeness: We will probably want to track other kinds of
// statistics here too. For example, we could send messages when a zone is
// created / destroyed to track zonestats; we might also want to support
@@ -100,6 +105,22 @@ impl LinkKind {
const OPTE: &'static str = "opte";
}

struct Target {
id: TargetId,
sled_datalink: SledDataLink,
}

fn get_collection_details(kind: &str) -> CollectionDetails {
if is_transient_link(kind) {
CollectionDetails::duration(
LINK_SAMPLE_INTERVAL,
TRANSIENT_LINK_EXPIRATION_INTERVAL,
)
} else {
CollectionDetails::never(LINK_SAMPLE_INTERVAL)
}
}

/// The main task used to collect and publish sled-agent metrics.
async fn metrics_task(
sled_identifiers: SledIdentifiers,
@@ -108,7 +129,8 @@ async fn metrics_task(
log: Logger,
mut rx: mpsc::Receiver<Message>,
) {
let mut tracked_links: HashMap<String, TargetId> = HashMap::new();
let mut tracked_links: TrackedLinks = HashMap::new();
let mut sled_time_synced: bool = false;

// Main polling loop, waiting for messages from other pieces of the code to
// track various statistics.
@@ -118,9 +140,10 @@ async fn metrics_task(
return;
};
trace!(log, "received message"; "message" => ?message);

match message {
Message::TrackPhysical { zone_name, name } => {
let link = SledDataLink {
let target = SledDataLinkTarget {
kind: LinkKind::PHYSICAL.into(),
link_name: name.into(),
rack_id: sled_identifiers.rack_id,
@@ -130,11 +153,12 @@ async fn metrics_task(
sled_serial: sled_identifiers.serial.clone().into(),
zone_name: zone_name.into(),
};
let link = SledDataLink::new(target, sled_time_synced);
add_datalink(&log, &mut tracked_links, &kstat_sampler, link)
.await;
}
Message::TrackVnic { zone_name, name } => {
let link = SledDataLink {
let target = SledDataLinkTarget {
kind: LinkKind::VNIC.into(),
link_name: name.into(),
rack_id: sled_identifiers.rack_id,
@@ -144,6 +168,7 @@ async fn metrics_task(
sled_serial: sled_identifiers.serial.clone().into(),
zone_name: zone_name.into(),
};
let link = SledDataLink::new(target, sled_time_synced);
add_datalink(&log, &mut tracked_links, &kstat_sampler, link)
.await;
}
@@ -152,7 +177,7 @@ async fn metrics_task(
.await
}
Message::TrackOptePort { zone_name, name } => {
let link = SledDataLink {
let target = SledDataLinkTarget {
kind: LinkKind::OPTE.into(),
link_name: name.into(),
rack_id: sled_identifiers.rack_id,
@@ -162,26 +187,39 @@ async fn metrics_task(
sled_serial: sled_identifiers.serial.clone().into(),
zone_name: zone_name.into(),
};
let link = SledDataLink::new(target, sled_time_synced);
add_datalink(&log, &mut tracked_links, &kstat_sampler, link)
.await;
}
Message::UntrackOptePort { name } => {
remove_datalink(&log, &mut tracked_links, &kstat_sampler, name)
.await
}
Message::TimeSynced { sled_id } => {
assert!(!sled_time_synced, "This message should only be sent once (on first synchronization with NTP)");
if sled_id == sled_identifiers.sled_id {
sled_time_synced = true;
sync_sled_datalinks(
&log,
&mut tracked_links,
&kstat_sampler,
)
.await
}
}
}
}
}

/// Stop tracking a link by name.
async fn remove_datalink(
log: &Logger,
tracked_links: &mut HashMap<String, TargetId>,
tracked_links: &mut HashMap<String, Target>,
kstat_sampler: &KstatSampler,
name: String,
) {
match tracked_links.remove(&name) {
Some(id) => match kstat_sampler.remove_target(id).await {
Some(target) => match kstat_sampler.remove_target(target.id).await {
Ok(_) => {
debug!(
log,
@@ -213,41 +251,33 @@ async fn remove_datalink(
/// Start tracking a new link of the specified kind.
async fn add_datalink(
log: &Logger,
tracked_links: &mut HashMap<String, TargetId>,
tracked_links: &mut HashMap<String, Target>,
kstat_sampler: &KstatSampler,
link: SledDataLink,
) {
match tracked_links.entry(link.link_name.to_string()) {
match tracked_links.entry(link.link_name().to_string()) {
Entry::Vacant(entry) => {
let details = if is_transient_link(&link.kind) {
CollectionDetails::duration(
LINK_SAMPLE_INTERVAL,
TRANSIENT_LINK_EXPIRATION_INTERVAL,
)
} else {
CollectionDetails::never(LINK_SAMPLE_INTERVAL)
};
let kind = link.kind.clone();
let zone_name = link.zone_name.clone();
match kstat_sampler.add_target(link, details).await {
let details = get_collection_details(link.kind());
let link_to_add = link.clone();
match kstat_sampler.add_target(link_to_add, details).await {
Ok(id) => {
debug!(
log,
"Added new link to kstat sampler";
"link_name" => entry.key(),
"link_kind" => %kind,
"zone_name" => %zone_name,
"link_kind" => %link.kind(),
"zone_name" => %link.zone_name(),
);
entry.insert(id);
entry.insert(Target { id, sled_datalink: link });
}
Err(err) => {
error!(
log,
"Failed to add VNIC to kstat sampler, \
no metrics will be collected for it";
"link_name" => entry.key(),
"link_kind" => %kind,
"zone_name" => %zone_name,
"link_kind" => %link.kind(),
"zone_name" => %link.zone_name(),
"error" => ?err,
);
}
@@ -264,6 +294,38 @@ async fn add_datalink(
}
}

/// Update tracked links when a sled is synced.
async fn sync_sled_datalinks(
log: &Logger,
tracked_links: &mut TrackedLinks,
kstat_sampler: &KstatSampler,
) {
for (link_name, target) in tracked_links.iter_mut() {
target.sled_datalink.time_synced = true;
let details = get_collection_details(target.sled_datalink.kind());
match kstat_sampler
.update_target(target.sled_datalink.clone(), details)
.await
{
Ok(_) => {
debug!(
log,
"Updated link already tracked by kstat sampler";
"link_name" => link_name,
);
}
Err(err) => {
error!(
log,
"Failed to update link already tracked by kstat sampler";
"link_name" => link_name,
"error" => ?err,
);
}
}
}
}

/// Return true if this is considered a transient link, from the perspective of
/// its expiration behavior.
fn is_transient_link(kind: &str) -> bool {
@@ -278,7 +340,7 @@ fn is_transient_link(kind: &str) -> bool {
/// `MetricsHandle`.
#[derive(Debug)]
pub struct MetricsManager {
/// Receive-side of a channel used to pass the background task messages.
/// Sender-side of a channel used to pass the background task messages.
#[cfg_attr(test, allow(dead_code))]
tx: mpsc::Sender<Message>,
/// The background task itself.
@@ -438,6 +500,11 @@ impl MetricsRequestQueue {
}
success
}

/// Notify the task that a sled's state has been synchronized with NTP.
pub async fn notify_time_synced_sled(&self, sled_id: Uuid) -> bool {
self.0.send(Message::TimeSynced { sled_id }).await.is_ok()
}
}

/// Start a metric producer server.
132 changes: 103 additions & 29 deletions sled-agent/src/services.rs
Original file line number Diff line number Diff line change
@@ -954,6 +954,26 @@ impl ServiceManager {
self.inner.sled_mode
}

/// Returns the sled's identifier
fn sled_id(&self) -> Uuid {
self.inner
.sled_info
.get()
.expect("sled agent not started")
.config
.sled_id
}

/// Returns the metrics queue for the sled agent if it is running.
fn maybe_metrics_queue(&self) -> Option<&MetricsRequestQueue> {
self.inner.sled_info.get().map(|info| &info.metrics_queue)
}

/// Returns the metrics queue for the sled agent.
fn metrics_queue(&self) -> &MetricsRequestQueue {
&self.maybe_metrics_queue().expect("Sled agent should have started")
}

// Advertise the /64 prefix of `address`, unless we already have.
//
// This method only blocks long enough to check our HashSet of
@@ -3105,9 +3125,7 @@ impl ServiceManager {
// point. The only exception is the switch zone, during bootstrapping
// but before we've either run RSS or unlocked the rack. In both those
// cases, we have a `StartSledAgentRequest`, and so a metrics queue.
if let Some(queue) =
self.inner.sled_info.get().map(|sa| &sa.metrics_queue)
{
if let Some(queue) = self.maybe_metrics_queue() {
if !queue.track_zone_links(&running_zone).await {
error!(
self.inner.log,
@@ -3484,9 +3502,7 @@ impl ServiceManager {
};
// Ensure that the sled agent's metrics task is not tracking the zone's
// VNICs or OPTE ports.
if let Some(queue) =
self.inner.sled_info.get().map(|sa| &sa.metrics_queue)
{
if let Some(queue) = self.maybe_metrics_queue() {
queue.untrack_zone_links(&zone.runtime).await;
}
debug!(
@@ -3712,17 +3728,8 @@ impl ServiceManager {
Ok(())
}

pub fn boottime_rewrite(&self) {
if self
.inner
.time_synced
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
// Already done.
return;
}

/// Adjust the system boot time to the latest boot time of all zones.
fn boottime_rewrite(&self) {
// Call out to the 'tmpx' utility program which will rewrite the wtmpx
// and utmpx databases in every zone, including the global zone, to
// reflect the adjusted system boot time.
@@ -3754,7 +3761,7 @@ impl ServiceManager {

if skip_timesync {
info!(self.inner.log, "Configured to skip timesync checks");
self.boottime_rewrite();
self.on_time_sync().await;
return Ok(TimeSync {
sync: true,
ref_id: 0,
@@ -3809,7 +3816,7 @@ impl ServiceManager {
&& correction.abs() <= 0.05;

if sync {
self.boottime_rewrite();
self.on_time_sync().await;
}

Ok(TimeSync {
@@ -3831,6 +3838,39 @@ impl ServiceManager {
}
}

/// Check if the synchronization state of the sled has shifted to true and
/// if so, execute the any out-of-band actions that need to be taken.
///
/// This function only executes the out-of-band actions once, once the
/// synchronization state has shifted to true.
async fn on_time_sync(&self) {
if self
.inner
.time_synced
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
debug!(self.inner.log, "Time is now synchronized");
// We only want to rewrite the boot time once, so we do it here
// when we know the time is synchronized.
self.boottime_rewrite();

// We expect to have a metrics queue by this point, so
// we can safely send a message on it to say the sled has
// been synchronized.
let queue = self.metrics_queue();
if !queue.notify_time_synced_sled(self.sled_id()).await {
error!(
self.inner.log,
"Failed to notify metrics queue of sled \
time synchronization, metrics may not be produced."
);
}
} else {
debug!(self.inner.log, "Time was already synchronized");
}
}

/// Ensures that a switch zone exists with the provided IP adddress.
pub async fn activate_switch(
&self,
@@ -4867,7 +4907,7 @@ mod test {

// Also send a message to the metrics task that the VNIC has been
// deleted.
let queue = &mgr.inner.sled_info.get().unwrap().metrics_queue;
let queue = mgr.metrics_queue();
for zone in mgr.inner.zones.lock().await.values() {
queue.untrack_zone_links(&zone.runtime).await;
}
@@ -5047,8 +5087,19 @@ mod test {
assert_eq!(found.zones.len(), 1);
assert_eq!(found.zones[0].id, id);

// Check that we received a message about the zone's VNIC.
let message = tokio::time::timeout(
// First check that we received the synced sled notification
let synced_message = tokio::time::timeout(
LINK_NOTIFICATION_TIMEOUT,
metrics_rx.recv(),
).await.expect("Should have received a message about the sled being synced within the timeout")
.expect("Should have received a message about the sled being synced");
assert_eq!(
synced_message,
metrics::Message::TimeSynced { sled_id: mgr.sled_id() },
);

// Then, check that we received a message about the zone's VNIC.
let vnic_message = tokio::time::timeout(
LINK_NOTIFICATION_TIMEOUT,
metrics_rx.recv(),
)
@@ -5059,7 +5110,7 @@ mod test {
.expect("Should have received a message about the zone's VNIC");
let zone_name = format!("oxz_ntp_{}", id);
assert_eq!(
message,
vnic_message,
metrics::Message::TrackVnic {
zone_name,
name: "oxControlService0".into()
@@ -5189,10 +5240,21 @@ mod test {
assert_eq!(found.zones.len(), 1);
assert_eq!(found.zones[0].id, id);

// First, we will get a message about the sled being synced.
let synced_message = tokio::time::timeout(
LINK_NOTIFICATION_TIMEOUT,
metrics_rx.recv(),
).await.expect("Should have received a message about the sled being synced within the timeout")
.expect("Should have received a message about the sled being synced");
assert_eq!(
synced_message,
metrics::Message::TimeSynced { sled_id: mgr.sled_id() }
);

// In this case, the manager creates the zone once, and then "ensuring"
// it a second time is a no-op. So we simply expect the same message
// sequence as starting a zone for the first time.
let message = tokio::time::timeout(
let vnic_message = tokio::time::timeout(
LINK_NOTIFICATION_TIMEOUT,
metrics_rx.recv(),
)
@@ -5203,7 +5265,7 @@ mod test {
.expect("Should have received a message about the zone's VNIC");
let zone_name = format!("oxz_ntp_{}", id);
assert_eq!(
message,
vnic_message,
metrics::Message::TrackVnic {
zone_name,
name: "oxControlService0".into()
@@ -5247,21 +5309,33 @@ mod test {
String::from(test_config.config_dir.path().as_str()),
)
.await;

let sled_id = mgr.sled_id();
drop_service_manager(mgr).await;

// First, we will get a message about the sled being synced.
let synced_message = tokio::time::timeout(
LINK_NOTIFICATION_TIMEOUT,
metrics_rx.recv(),
).await.expect("Should have received a message about the sled being synced within the timeout")
.expect("Should have received a message about the sled being synced");
assert_eq!(synced_message, metrics::Message::TimeSynced { sled_id });

// Check that we received a message about the zone's VNIC. Since the
// manager is being dropped, it should also send a message about the
// VNIC being deleted.
let zone_name = format!("oxz_ntp_{}", id);
for expected_message in [
for expected_vnic_message in [
metrics::Message::TrackVnic {
zone_name,
name: "oxControlService0".into(),
},
metrics::Message::UntrackVnic { name: "oxControlService0".into() },
] {
println!("Expecting message from manager: {expected_message:#?}");
let message = tokio::time::timeout(
println!(
"Expecting message from manager: {expected_vnic_message:#?}"
);
let vnic_message = tokio::time::timeout(
LINK_NOTIFICATION_TIMEOUT,
metrics_rx.recv(),
)
@@ -5270,7 +5344,7 @@ mod test {
"Should have received a message about the zone's VNIC within the timeout"
)
.expect("Should have received a message about the zone's VNIC");
assert_eq!(message, expected_message,);
assert_eq!(vnic_message, expected_vnic_message,);
}
// Note that the manager has been dropped, so we should get
// disconnected, not empty.

0 comments on commit 18b8ab3

Please sign in to comment.