diff --git a/oximeter/instruments/src/kstat/link.rs b/oximeter/instruments/src/kstat/link.rs index 4d045131da..18f05f5cfe 100644 --- a/oximeter/instruments/src/kstat/link.rs +++ b/oximeter/instruments/src/kstat/link.rs @@ -15,20 +15,22 @@ use kstat_rs::Data; use kstat_rs::Kstat; use kstat_rs::Named; use oximeter::types::Cumulative; +use oximeter::FieldType; +use oximeter::FieldValue; use oximeter::Sample; +use oximeter::Target; +use uuid::Uuid; oximeter::use_timeseries!("sled-data-link.toml"); +pub use self::sled_data_link::SledDataLink as SledDataLinkTarget; /// Helper function to extract the same kstat metrics from all link targets. -fn extract_link_kstats( - target: &T, +fn extract_link_kstats( + target: &SledDataLink, named_data: &Named, creation_time: DateTime, snapshot_time: DateTime, -) -> Option> -where - T: KstatTarget, -{ +) -> Option> { let Named { name, value } = named_data; if *name == "rbytes64" { Some(value.as_u64().and_then(|x| { @@ -83,23 +85,53 @@ where } } -// Helper trait for defining `KstatTarget` for all the link-based stats. -trait LinkKstatTarget: KstatTarget { - fn link_name(&self) -> &str; +#[derive(Clone, Debug)] +pub struct SledDataLink { + /// The target for this link. + pub target: SledDataLinkTarget, + /// Flag indicating whether the sled associated with this link is synced with + /// NTP. + pub time_synced: bool, } -impl LinkKstatTarget for sled_data_link::SledDataLink { - fn link_name(&self) -> &str { - &self.link_name +impl SledDataLink { + /// Create a new `SledDataLink` with the given target and synchronization + /// flag. + pub fn new(target: SledDataLinkTarget, time_synced: bool) -> Self { + Self { target, time_synced } + } + + /// Create a new `SledDataLink` with the given target . + #[cfg(test)] + pub fn unsynced(target: SledDataLinkTarget) -> Self { + Self { target, time_synced: false } + } + + /// Return the name of the link. + pub fn link_name(&self) -> &str { + &self.target.link_name + } + + /// Return the zone name of the link. + pub fn zone_name(&self) -> &str { + &self.target.zone_name + } + + /// Return the kind of link. + pub fn kind(&self) -> &str { + &self.target.kind + } + + /// Return the idenity of the sled. + pub fn sled_id(&self) -> Uuid { + self.target.sled_id } } -impl KstatTarget for T -where - T: LinkKstatTarget, -{ +impl KstatTarget for SledDataLink { fn interested(&self, kstat: &Kstat<'_>) -> bool { - kstat.ks_module == "link" + self.time_synced + && kstat.ks_module == "link" && kstat.ks_instance == 0 && kstat.ks_name == self.link_name() } @@ -124,6 +156,25 @@ where } } +// NOTE: Delegate to the inner target type for this implementation. 
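Note on the delegation below: `time_synced` is deliberately not an oximeter field. Because the `Target` implementation forwards `name`, `field_names`, `field_types`, and `field_values` to the inner `SledDataLinkTarget`, the produced timeseries schema is unchanged, and the sampler's target identity (computed by hashing the target, see `hash_target` in `sampler.rs`) remains the same whether or not the sled is synced. The flag only gates `interested()`, so flipping it later lets `update_target` resolve the existing `TargetId` and replace the tracked entry in place rather than registering a duplicate.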
+impl Target for SledDataLink { + fn name(&self) -> &'static str { + self.target.name() + } + + fn field_names(&self) -> &'static [&'static str] { + self.target.field_names() + } + + fn field_types(&self) -> Vec { + self.target.field_types() + } + + fn field_values(&self) -> Vec { + self.target.field_values() + } +} + #[cfg(all(test, target_os = "illumos"))] mod tests { use super::*; @@ -225,10 +276,40 @@ mod tests { } } + #[test] + fn test_kstat_interested() { + let link = TestEtherstub::new(); + let target = SledDataLinkTarget { + rack_id: RACK_ID, + sled_id: SLED_ID, + sled_serial: SLED_SERIAL.into(), + link_name: link.name.clone().into(), + kind: KIND.into(), + sled_model: SLED_MODEL.into(), + sled_revision: SLED_REVISION, + zone_name: ZONE_NAME.into(), + }; + // not with a synced sled (by default) + let mut dl = SledDataLink::unsynced(target); + + let ctl = Ctl::new().unwrap(); + let ctl = ctl.update().unwrap(); + let kstat = ctl + .filter(Some("link"), Some(0), Some(link.name.as_str())) + .next() + .unwrap(); + + assert!(!dl.interested(&kstat)); + + // with a synced sled + dl.time_synced = true; + assert!(dl.interested(&kstat)); + } + #[test] fn test_sled_datalink() { let link = TestEtherstub::new(); - let dl = sled_data_link::SledDataLink { + let target = SledDataLinkTarget { rack_id: RACK_ID, sled_id: SLED_ID, sled_serial: SLED_SERIAL.into(), @@ -238,6 +319,7 @@ mod tests { sled_revision: SLED_REVISION, zone_name: ZONE_NAME.into(), }; + let dl = SledDataLink::new(target, true); let ctl = Ctl::new().unwrap(); let ctl = ctl.update().unwrap(); let mut kstat = ctl @@ -254,7 +336,7 @@ mod tests { async fn test_kstat_sampler() { let mut sampler = KstatSampler::new(&test_logger()).unwrap(); let link = TestEtherstub::new(); - let dl = sled_data_link::SledDataLink { + let target = SledDataLinkTarget { rack_id: RACK_ID, sled_id: SLED_ID, sled_serial: SLED_SERIAL.into(), @@ -264,6 +346,7 @@ mod tests { sled_revision: SLED_REVISION, zone_name: ZONE_NAME.into(), }; + let dl = SledDataLink::new(target, true); let details = CollectionDetails::never(Duration::from_secs(1)); let id = sampler.add_target(dl, details).await.unwrap(); let samples: Vec<_> = sampler.produce().unwrap().collect(); @@ -304,7 +387,7 @@ mod tests { let mut sampler = KstatSampler::with_sample_limit(&test_logger(), limit).unwrap(); let link = TestEtherstub::new(); - let dl = sled_data_link::SledDataLink { + let target = SledDataLinkTarget { rack_id: RACK_ID, sled_id: SLED_ID, sled_serial: SLED_SERIAL.into(), @@ -314,6 +397,7 @@ mod tests { sled_revision: SLED_REVISION, zone_name: ZONE_NAME.into(), }; + let dl = SledDataLink::new(target, true); let details = CollectionDetails::never(Duration::from_secs(1)); sampler.add_target(dl, details).await.unwrap(); let samples: Vec<_> = sampler.produce().unwrap().collect(); @@ -373,7 +457,7 @@ mod tests { let mut sampler = KstatSampler::new(&log).unwrap(); let link = TestEtherstub::new(); info!(log, "created test etherstub"; "name" => &link.name); - let dl = sled_data_link::SledDataLink { + let target = SledDataLinkTarget { rack_id: RACK_ID, sled_id: SLED_ID, sled_serial: SLED_SERIAL.into(), @@ -383,6 +467,7 @@ mod tests { sled_revision: SLED_REVISION, zone_name: ZONE_NAME.into(), }; + let dl = SledDataLink::new(target, true); let collection_interval = Duration::from_secs(1); let expiry = Duration::from_secs(1); let details = CollectionDetails::duration(collection_interval, expiry); @@ -432,7 +517,7 @@ mod tests { let mut sampler = KstatSampler::new(&log).unwrap(); let link = 
TestEtherstub::new(); info!(log, "created test etherstub"; "name" => &link.name); - let dl = sled_data_link::SledDataLink { + let target = SledDataLinkTarget { rack_id: RACK_ID, sled_id: SLED_ID, sled_serial: SLED_SERIAL.into(), @@ -442,6 +527,7 @@ mod tests { sled_revision: SLED_REVISION, zone_name: ZONE_NAME.into(), }; + let dl = SledDataLink::new(target, true); let collection_interval = Duration::from_secs(1); let expiry = Duration::from_secs(1); let details = CollectionDetails::duration(collection_interval, expiry); @@ -483,7 +569,7 @@ mod tests { name: link.name.clone(), }; info!(log, "created test etherstub"; "name" => &link.name); - let dl = sled_data_link::SledDataLink { + let target = SledDataLinkTarget { rack_id: RACK_ID, sled_id: SLED_ID, sled_serial: SLED_SERIAL.into(), @@ -493,6 +579,7 @@ mod tests { sled_revision: SLED_REVISION, zone_name: ZONE_NAME.into(), }; + let dl = SledDataLink::new(target, true); let collection_interval = Duration::from_secs(1); let expiry = Duration::from_secs(1); let details = CollectionDetails::duration(collection_interval, expiry); @@ -532,7 +619,7 @@ mod tests { name: link.name.clone(), }; info!(log, "created test etherstub"; "name" => &link.name); - let dl = sled_data_link::SledDataLink { + let target = SledDataLinkTarget { rack_id: RACK_ID, sled_id: SLED_ID, sled_serial: SLED_SERIAL.into(), @@ -542,6 +629,7 @@ mod tests { sled_revision: SLED_REVISION, zone_name: ZONE_NAME.into(), }; + let dl = SledDataLink::new(target, true); let collection_interval = Duration::from_secs(1); let expiry = Duration::from_secs(1); let details = CollectionDetails::duration(collection_interval, expiry); diff --git a/oximeter/instruments/src/kstat/mod.rs b/oximeter/instruments/src/kstat/mod.rs index a5020b9b61..7b0082a396 100644 --- a/oximeter/instruments/src/kstat/mod.rs +++ b/oximeter/instruments/src/kstat/mod.rs @@ -91,6 +91,7 @@ use std::time::Duration; pub mod link; mod sampler; +pub use link::SledDataLink; pub use sampler::CollectionDetails; pub use sampler::ExpirationBehavior; pub use sampler::KstatSampler; diff --git a/oximeter/instruments/src/kstat/sampler.rs b/oximeter/instruments/src/kstat/sampler.rs index 92466758c1..c6101ac06a 100644 --- a/oximeter/instruments/src/kstat/sampler.rs +++ b/oximeter/instruments/src/kstat/sampler.rs @@ -173,6 +173,12 @@ enum Request { id: TargetId, reply_tx: oneshot::Sender>, }, + /// Update a target. + UpdateTarget { + target: Box, + details: CollectionDetails, + reply_tx: oneshot::Sender>, + }, /// Remove a target. RemoveTarget { id: TargetId, reply_tx: oneshot::Sender> }, /// Return the creation times of all tracked / extant kstats. 
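The `UpdateTarget` request above follows the sampler's existing pattern: the caller boxes a `KstatTarget`, sends it over the mpsc channel together with new `CollectionDetails`, and awaits the outcome on a oneshot reply channel. From client code, the public wrapper (`KstatSampler::update_target`, added further down in this diff) would be used roughly as in the sketch below. The helper name `mark_link_synced`, and the assumptions that success carries the existing `TargetId` and that the error type is this module's `Error`, are illustrative and not part of this change.

```rust
use std::time::Duration;

use oximeter_instruments::kstat::{
    CollectionDetails, Error, KstatSampler, SledDataLink,
};

/// Hypothetical helper: re-submit an already-tracked link once the sled's
/// clock is trusted. The timeseries fields are unchanged, so the sampler
/// resolves the same target id and replaces the tracked entry in place.
async fn mark_link_synced(
    sampler: &KstatSampler,
    mut link: SledDataLink,
    interval: Duration,
) -> Result<(), Error> {
    link.time_synced = true;
    let details = CollectionDetails::never(interval);
    // `update_target` fails with `Error::NoSuchTarget` if the link was
    // never added, so callers should add targets before updating them.
    sampler.update_target(link, details).await.map(|_id| ())
}
```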
@@ -550,6 +556,45 @@ impl KstatSamplerWorker { } } } + Request::UpdateTarget { target, details, reply_tx } => { + match self.update_target(target, details) { + Ok(id) => { + let timeout = YieldIdAfter::new(id, details.interval); + sample_timeouts.push(timeout); + trace!( + self.log, + "updated target with timeout"; + "id" => ?id, + "details" => ?details, + ); + match reply_tx.send(Ok(id)) { + Ok(_) => trace!(self.log, "sent reply"), + Err(e) => error!( + self.log, + "failed to send reply"; + "id" => ?id, + "error" => ?e, + ) + } + } + Err(e) => { + error!( + self.log, + "failed to update target"; + "error" => ?e, + ); + match reply_tx.send(Err(e)) { + Ok(_) => trace!(self.log, "sent reply"), + Err(e) => error!( + self.log, + "failed to send reply"; + "error" => ?e, + ) + } + } + } + + } Request::RemoveTarget { id, reply_tx } => { self.targets.remove(&id); if let Some(remaining_samples) = self.samples.lock().unwrap().remove(&id) { @@ -966,6 +1011,52 @@ impl KstatSamplerWorker { } None => {} } + + self.insert_target(id, target, details) + } + + fn update_target( + &mut self, + target: Box, + details: CollectionDetails, + ) -> Result { + let id = hash_target(&*target); + match self.targets.get(&id) { + // If the target is already expired, we'll replace it with the new + // target and start sampling it again. + Some(SampledObject::Expired(e)) => { + warn!( + self.log, + "replacing expired kstat target"; + "id" => ?id, + "expiration_reason" => ?e.reason, + "error" => ?e.error, + "expired_at" => ?e.expired_at, + ); + } + Some(_) => {} + None => return Err(Error::NoSuchTarget), + } + + self.insert_target(id, target, details) + } + + fn update_chain(&mut self) -> Result<(), Error> { + let new_ctl = match self.ctl.take() { + None => Ctl::new(), + Some(old) => old.update(), + } + .map_err(Error::Kstat)?; + let _ = self.ctl.insert(new_ctl); + Ok(()) + } + + fn insert_target( + &mut self, + id: TargetId, + target: Box, + details: CollectionDetails, + ) -> Result { self.ensure_creation_times_for_target(&*target)?; let item = SampledKstat { target, @@ -995,17 +1086,8 @@ impl KstatSamplerWorker { "n_samples" => n, ), } - Ok(id) - } - fn update_chain(&mut self) -> Result<(), Error> { - let new_ctl = match self.ctl.take() { - None => Ctl::new(), - Some(old) => old.update(), - } - .map_err(Error::Kstat)?; - let _ = self.ctl.insert(new_ctl); - Ok(()) + Ok(id) } } @@ -1094,6 +1176,22 @@ impl KstatSampler { reply_rx.await.map_err(|_| Error::RecvError)? } + /// Update the details for a target. + pub async fn update_target( + &self, + target: impl KstatTarget, + details: CollectionDetails, + ) -> Result { + let (reply_tx, reply_rx) = oneshot::channel(); + let request = Request::UpdateTarget { + target: Box::new(target), + details, + reply_tx, + }; + self.outbox.send(request).await.map_err(|_| Error::SendError)?; + reply_rx.await.map_err(|_| Error::RecvError)? + } + /// Fetch the status for a target. 
/// /// If the target is being collected normally, then `TargetStatus::Ok` is diff --git a/sled-agent/src/metrics.rs b/sled-agent/src/metrics.rs index 1039302248..ef9edd9a07 100644 --- a/sled-agent/src/metrics.rs +++ b/sled-agent/src/metrics.rs @@ -8,7 +8,8 @@ use illumos_utils::running_zone::RunningZone; use omicron_common::api::internal::nexus::ProducerEndpoint; use omicron_common::api::internal::nexus::ProducerKind; use omicron_common::api::internal::shared::SledIdentifiers; -use oximeter_instruments::kstat::link::sled_data_link::SledDataLink; +use oximeter_instruments::kstat::link::SledDataLink; +use oximeter_instruments::kstat::link::SledDataLinkTarget; use oximeter_instruments::kstat::CollectionDetails; use oximeter_instruments::kstat::Error as KstatError; use oximeter_instruments::kstat::KstatSampler; @@ -24,6 +25,8 @@ use std::time::Duration; use tokio::sync::mpsc; use uuid::Uuid; +type TrackedLinks = HashMap; + /// The interval on which we ask `oximeter` to poll us for metric data. const METRIC_COLLECTION_INTERVAL: Duration = Duration::from_secs(30); @@ -84,6 +87,8 @@ pub(crate) enum Message { TrackOptePort { zone_name: String, name: String }, /// Stop tracking the named OPTE port. UntrackOptePort { name: String }, + /// Notify the task that a sled has been synced with NTP. + TimeSynced { sled_id: Uuid }, // TODO-completeness: We will probably want to track other kinds of // statistics here too. For example, we could send messages when a zone is // created / destroyed to track zonestats; we might also want to support @@ -100,6 +105,22 @@ impl LinkKind { const OPTE: &'static str = "opte"; } +struct Target { + id: TargetId, + sled_datalink: SledDataLink, +} + +fn get_collection_details(kind: &str) -> CollectionDetails { + if is_transient_link(kind) { + CollectionDetails::duration( + LINK_SAMPLE_INTERVAL, + TRANSIENT_LINK_EXPIRATION_INTERVAL, + ) + } else { + CollectionDetails::never(LINK_SAMPLE_INTERVAL) + } +} + /// The main task used to collect and publish sled-agent metrics. async fn metrics_task( sled_identifiers: SledIdentifiers, @@ -108,7 +129,8 @@ async fn metrics_task( log: Logger, mut rx: mpsc::Receiver, ) { - let mut tracked_links: HashMap = HashMap::new(); + let mut tracked_links: TrackedLinks = HashMap::new(); + let mut sled_time_synced: bool = false; // Main polling loop, waiting for messages from other pieces of the code to // track various statistics. 
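Two details in the hunks above are worth spelling out. First, `TrackedLinks` now maps a link name to a `Target` holding both the sampler-assigned `TargetId` and the full `SledDataLink`; keeping the link itself around is what allows the task to flip `time_synced` and re-submit it via `update_target` when the `TimeSynced` message arrives (see `sync_sled_datalinks` below). Second, `get_collection_details` simply factors out the expiration policy that `add_datalink` previously computed inline, since the same policy is now needed on the update path as well. The `assert!(!sled_time_synced, ..)` in the `TimeSynced` handler relies on the sender notifying at most once per boot, which `ServiceManager::on_time_sync` guarantees with its `compare_exchange` guard.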
@@ -118,9 +140,10 @@ async fn metrics_task( return; }; trace!(log, "received message"; "message" => ?message); + match message { Message::TrackPhysical { zone_name, name } => { - let link = SledDataLink { + let target = SledDataLinkTarget { kind: LinkKind::PHYSICAL.into(), link_name: name.into(), rack_id: sled_identifiers.rack_id, @@ -130,11 +153,12 @@ async fn metrics_task( sled_serial: sled_identifiers.serial.clone().into(), zone_name: zone_name.into(), }; + let link = SledDataLink::new(target, sled_time_synced); add_datalink(&log, &mut tracked_links, &kstat_sampler, link) .await; } Message::TrackVnic { zone_name, name } => { - let link = SledDataLink { + let target = SledDataLinkTarget { kind: LinkKind::VNIC.into(), link_name: name.into(), rack_id: sled_identifiers.rack_id, @@ -144,6 +168,7 @@ async fn metrics_task( sled_serial: sled_identifiers.serial.clone().into(), zone_name: zone_name.into(), }; + let link = SledDataLink::new(target, sled_time_synced); add_datalink(&log, &mut tracked_links, &kstat_sampler, link) .await; } @@ -152,7 +177,7 @@ async fn metrics_task( .await } Message::TrackOptePort { zone_name, name } => { - let link = SledDataLink { + let target = SledDataLinkTarget { kind: LinkKind::OPTE.into(), link_name: name.into(), rack_id: sled_identifiers.rack_id, @@ -162,6 +187,7 @@ async fn metrics_task( sled_serial: sled_identifiers.serial.clone().into(), zone_name: zone_name.into(), }; + let link = SledDataLink::new(target, sled_time_synced); add_datalink(&log, &mut tracked_links, &kstat_sampler, link) .await; } @@ -169,6 +195,18 @@ async fn metrics_task( remove_datalink(&log, &mut tracked_links, &kstat_sampler, name) .await } + Message::TimeSynced { sled_id } => { + assert!(!sled_time_synced, "This message should only be sent once (on first synchronization with NTP)"); + if sled_id == sled_identifiers.sled_id { + sled_time_synced = true; + sync_sled_datalinks( + &log, + &mut tracked_links, + &kstat_sampler, + ) + .await + } + } } } } @@ -176,12 +214,12 @@ async fn metrics_task( /// Stop tracking a link by name. async fn remove_datalink( log: &Logger, - tracked_links: &mut HashMap, + tracked_links: &mut HashMap, kstat_sampler: &KstatSampler, name: String, ) { match tracked_links.remove(&name) { - Some(id) => match kstat_sampler.remove_target(id).await { + Some(target) => match kstat_sampler.remove_target(target.id).await { Ok(_) => { debug!( log, @@ -213,32 +251,24 @@ async fn remove_datalink( /// Start tracking a new link of the specified kind. 
async fn add_datalink( log: &Logger, - tracked_links: &mut HashMap, + tracked_links: &mut HashMap, kstat_sampler: &KstatSampler, link: SledDataLink, ) { - match tracked_links.entry(link.link_name.to_string()) { + match tracked_links.entry(link.link_name().to_string()) { Entry::Vacant(entry) => { - let details = if is_transient_link(&link.kind) { - CollectionDetails::duration( - LINK_SAMPLE_INTERVAL, - TRANSIENT_LINK_EXPIRATION_INTERVAL, - ) - } else { - CollectionDetails::never(LINK_SAMPLE_INTERVAL) - }; - let kind = link.kind.clone(); - let zone_name = link.zone_name.clone(); - match kstat_sampler.add_target(link, details).await { + let details = get_collection_details(link.kind()); + let link_to_add = link.clone(); + match kstat_sampler.add_target(link_to_add, details).await { Ok(id) => { debug!( log, "Added new link to kstat sampler"; "link_name" => entry.key(), - "link_kind" => %kind, - "zone_name" => %zone_name, + "link_kind" => %link.kind(), + "zone_name" => %link.zone_name(), ); - entry.insert(id); + entry.insert(Target { id, sled_datalink: link }); } Err(err) => { error!( @@ -246,8 +276,8 @@ async fn add_datalink( "Failed to add VNIC to kstat sampler, \ no metrics will be collected for it"; "link_name" => entry.key(), - "link_kind" => %kind, - "zone_name" => %zone_name, + "link_kind" => %link.kind(), + "zone_name" => %link.zone_name(), "error" => ?err, ); } @@ -264,6 +294,38 @@ async fn add_datalink( } } +/// Update tracked links when a sled is synced. +async fn sync_sled_datalinks( + log: &Logger, + tracked_links: &mut TrackedLinks, + kstat_sampler: &KstatSampler, +) { + for (link_name, target) in tracked_links.iter_mut() { + target.sled_datalink.time_synced = true; + let details = get_collection_details(target.sled_datalink.kind()); + match kstat_sampler + .update_target(target.sled_datalink.clone(), details) + .await + { + Ok(_) => { + debug!( + log, + "Updated link already tracked by kstat sampler"; + "link_name" => link_name, + ); + } + Err(err) => { + error!( + log, + "Failed to update link already tracked by kstat sampler"; + "link_name" => link_name, + "error" => ?err, + ); + } + } + } +} + /// Return true if this is considered a transient link, from the perspective of /// its expiration behavior. fn is_transient_link(kind: &str) -> bool { @@ -278,7 +340,7 @@ fn is_transient_link(kind: &str) -> bool { /// `MetricsHandle`. #[derive(Debug)] pub struct MetricsManager { - /// Receive-side of a channel used to pass the background task messages. + /// Sender-side of a channel used to pass the background task messages. #[cfg_attr(test, allow(dead_code))] tx: mpsc::Sender, /// The background task itself. @@ -438,6 +500,11 @@ impl MetricsRequestQueue { } success } + + /// Notify the task that a sled's state has been synchronized with NTP. + pub async fn notify_time_synced_sled(&self, sled_id: Uuid) -> bool { + self.0.send(Message::TimeSynced { sled_id }).await.is_ok() + } } /// Start a metric producer server. diff --git a/sled-agent/src/services.rs b/sled-agent/src/services.rs index f386fd1d0f..2c83fc5ea3 100644 --- a/sled-agent/src/services.rs +++ b/sled-agent/src/services.rs @@ -954,6 +954,26 @@ impl ServiceManager { self.inner.sled_mode } + /// Returns the sled's identifier + fn sled_id(&self) -> Uuid { + self.inner + .sled_info + .get() + .expect("sled agent not started") + .config + .sled_id + } + + /// Returns the metrics queue for the sled agent if it is running. 
+ fn maybe_metrics_queue(&self) -> Option<&MetricsRequestQueue> { + self.inner.sled_info.get().map(|info| &info.metrics_queue) + } + + /// Returns the metrics queue for the sled agent. + fn metrics_queue(&self) -> &MetricsRequestQueue { + &self.maybe_metrics_queue().expect("Sled agent should have started") + } + // Advertise the /64 prefix of `address`, unless we already have. // // This method only blocks long enough to check our HashSet of @@ -3105,9 +3125,7 @@ impl ServiceManager { // point. The only exception is the switch zone, during bootstrapping // but before we've either run RSS or unlocked the rack. In both those // cases, we have a `StartSledAgentRequest`, and so a metrics queue. - if let Some(queue) = - self.inner.sled_info.get().map(|sa| &sa.metrics_queue) - { + if let Some(queue) = self.maybe_metrics_queue() { if !queue.track_zone_links(&running_zone).await { error!( self.inner.log, @@ -3484,9 +3502,7 @@ impl ServiceManager { }; // Ensure that the sled agent's metrics task is not tracking the zone's // VNICs or OPTE ports. - if let Some(queue) = - self.inner.sled_info.get().map(|sa| &sa.metrics_queue) - { + if let Some(queue) = self.maybe_metrics_queue() { queue.untrack_zone_links(&zone.runtime).await; } debug!( @@ -3712,17 +3728,8 @@ impl ServiceManager { Ok(()) } - pub fn boottime_rewrite(&self) { - if self - .inner - .time_synced - .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) - .is_err() - { - // Already done. - return; - } - + /// Adjust the system boot time to the latest boot time of all zones. + fn boottime_rewrite(&self) { // Call out to the 'tmpx' utility program which will rewrite the wtmpx // and utmpx databases in every zone, including the global zone, to // reflect the adjusted system boot time. @@ -3754,7 +3761,7 @@ impl ServiceManager { if skip_timesync { info!(self.inner.log, "Configured to skip timesync checks"); - self.boottime_rewrite(); + self.on_time_sync().await; return Ok(TimeSync { sync: true, ref_id: 0, @@ -3809,7 +3816,7 @@ impl ServiceManager { && correction.abs() <= 0.05; if sync { - self.boottime_rewrite(); + self.on_time_sync().await; } Ok(TimeSync { @@ -3831,6 +3838,39 @@ impl ServiceManager { } } + /// Check if the synchronization state of the sled has shifted to true and + /// if so, execute the any out-of-band actions that need to be taken. + /// + /// This function only executes the out-of-band actions once, once the + /// synchronization state has shifted to true. + async fn on_time_sync(&self) { + if self + .inner + .time_synced + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + debug!(self.inner.log, "Time is now synchronized"); + // We only want to rewrite the boot time once, so we do it here + // when we know the time is synchronized. + self.boottime_rewrite(); + + // We expect to have a metrics queue by this point, so + // we can safely send a message on it to say the sled has + // been synchronized. + let queue = self.metrics_queue(); + if !queue.notify_time_synced_sled(self.sled_id()).await { + error!( + self.inner.log, + "Failed to notify metrics queue of sled \ + time synchronization, metrics may not be produced." + ); + } + } else { + debug!(self.inner.log, "Time was already synchronized"); + } + } + /// Ensures that a switch zone exists with the provided IP adddress. pub async fn activate_switch( &self, @@ -4867,7 +4907,7 @@ mod test { // Also send a message to the metrics task that the VNIC has been // deleted. 
- let queue = &mgr.inner.sled_info.get().unwrap().metrics_queue; + let queue = mgr.metrics_queue(); for zone in mgr.inner.zones.lock().await.values() { queue.untrack_zone_links(&zone.runtime).await; } @@ -5047,8 +5087,19 @@ mod test { assert_eq!(found.zones.len(), 1); assert_eq!(found.zones[0].id, id); - // Check that we received a message about the zone's VNIC. - let message = tokio::time::timeout( + // First check that we received the synced sled notification + let synced_message = tokio::time::timeout( + LINK_NOTIFICATION_TIMEOUT, + metrics_rx.recv(), + ).await.expect("Should have received a message about the sled being synced within the timeout") + .expect("Should have received a message about the sled being synced"); + assert_eq!( + synced_message, + metrics::Message::TimeSynced { sled_id: mgr.sled_id() }, + ); + + // Then, check that we received a message about the zone's VNIC. + let vnic_message = tokio::time::timeout( LINK_NOTIFICATION_TIMEOUT, metrics_rx.recv(), ) @@ -5059,7 +5110,7 @@ mod test { .expect("Should have received a message about the zone's VNIC"); let zone_name = format!("oxz_ntp_{}", id); assert_eq!( - message, + vnic_message, metrics::Message::TrackVnic { zone_name, name: "oxControlService0".into() @@ -5189,10 +5240,21 @@ mod test { assert_eq!(found.zones.len(), 1); assert_eq!(found.zones[0].id, id); + // First, we will get a message about the sled being synced. + let synced_message = tokio::time::timeout( + LINK_NOTIFICATION_TIMEOUT, + metrics_rx.recv(), + ).await.expect("Should have received a message about the sled being synced within the timeout") + .expect("Should have received a message about the sled being synced"); + assert_eq!( + synced_message, + metrics::Message::TimeSynced { sled_id: mgr.sled_id() } + ); + // In this case, the manager creates the zone once, and then "ensuring" // it a second time is a no-op. So we simply expect the same message // sequence as starting a zone for the first time. - let message = tokio::time::timeout( + let vnic_message = tokio::time::timeout( LINK_NOTIFICATION_TIMEOUT, metrics_rx.recv(), ) @@ -5203,7 +5265,7 @@ mod test { .expect("Should have received a message about the zone's VNIC"); let zone_name = format!("oxz_ntp_{}", id); assert_eq!( - message, + vnic_message, metrics::Message::TrackVnic { zone_name, name: "oxControlService0".into() @@ -5247,21 +5309,33 @@ mod test { String::from(test_config.config_dir.path().as_str()), ) .await; + + let sled_id = mgr.sled_id(); drop_service_manager(mgr).await; + // First, we will get a message about the sled being synced. + let synced_message = tokio::time::timeout( + LINK_NOTIFICATION_TIMEOUT, + metrics_rx.recv(), + ).await.expect("Should have received a message about the sled being synced within the timeout") + .expect("Should have received a message about the sled being synced"); + assert_eq!(synced_message, metrics::Message::TimeSynced { sled_id }); + // Check that we received a message about the zone's VNIC. Since the // manager is being dropped, it should also send a message about the // VNIC being deleted. 
let zone_name = format!("oxz_ntp_{}", id); - for expected_message in [ + for expected_vnic_message in [ metrics::Message::TrackVnic { zone_name, name: "oxControlService0".into(), }, metrics::Message::UntrackVnic { name: "oxControlService0".into() }, ] { - println!("Expecting message from manager: {expected_message:#?}"); - let message = tokio::time::timeout( + println!( + "Expecting message from manager: {expected_vnic_message:#?}" + ); + let vnic_message = tokio::time::timeout( LINK_NOTIFICATION_TIMEOUT, metrics_rx.recv(), ) @@ -5270,7 +5344,7 @@ mod test { "Should have received a message about the zone's VNIC within the timeout" ) .expect("Should have received a message about the zone's VNIC"); - assert_eq!(message, expected_message,); + assert_eq!(vnic_message, expected_vnic_message,); } // Note that the manager has been dropped, so we should get // disconnected, not empty.
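End to end, the flow introduced by this change is: `ServiceManager::on_time_sync` wins the `compare_exchange` on the `time_synced` atomic exactly once, rewrites the recorded boot times, and calls `MetricsRequestQueue::notify_time_synced_sled`; the metrics task handles `Message::TimeSynced` by setting its local `sled_time_synced` flag (so links tracked from then on are created synced) and re-submitting every already-tracked link through `KstatSampler::update_target`; and `SledDataLink::interested` only matches `link` kstats once `time_synced` is true, so no samples are collected against an unsynchronized clock. The once-only guard is the standard atomic pattern sketched below (a minimal standalone illustration, not the actual `ServiceManager` code, which also performs async work after winning the exchange):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Run `on_first_sync` only for the caller that wins the false -> true
/// transition; later (or concurrent) callers see `Err` and skip it.
fn run_once_on_sync(flag: &AtomicBool, on_first_sync: impl FnOnce()) {
    if flag
        .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
        .is_ok()
    {
        on_first_sync();
    }
}

// Usage sketch:
// static TIME_SYNCED: AtomicBool = AtomicBool::new(false);
// run_once_on_sync(&TIME_SYNCED, || println!("time is now synchronized"));
```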