From cf185cf590dc3b92837a5218d9cbb7678e45e1a1 Mon Sep 17 00:00:00 2001 From: "Andrew J. Stone" Date: Wed, 28 Feb 2024 22:47:51 +0000 Subject: [PATCH] wip --- sled-agent/src/hardware_monitor.rs | 2 +- sled-agent/src/nexus.rs | 66 +++++--------- sled-agent/src/sled_agent.rs | 135 +++++++---------------------- sled-agent/src/vmm_reservoir.rs | 7 +- 4 files changed, 55 insertions(+), 155 deletions(-) diff --git a/sled-agent/src/hardware_monitor.rs b/sled-agent/src/hardware_monitor.rs index 698d2d4608..f8dc887ade 100644 --- a/sled-agent/src/hardware_monitor.rs +++ b/sled-agent/src/hardware_monitor.rs @@ -172,7 +172,7 @@ impl HardwareMonitor { } HardwareUpdate::TofinoDeviceChange => { if let Some(sled_agent) = &mut self.sled_agent { - sled_agent.notify_nexus_about_self(&self.log); + sled_agent.notify_nexus_about_self(&self.log).await; } } HardwareUpdate::DiskAdded(disk) => { diff --git a/sled-agent/src/nexus.rs b/sled-agent/src/nexus.rs index a3e607b2a7..f31609ccc3 100644 --- a/sled-agent/src/nexus.rs +++ b/sled-agent/src/nexus.rs @@ -21,6 +21,7 @@ use tokio::time::{interval, Duration, MissedTickBehavior}; use uuid::Uuid; use crate::instance_manager::InstanceManager; +use crate::vmm_reservoir::VmmReservoirManagerHandle; /// A thin wrapper over a progenitor-generated NexusClient. /// @@ -80,45 +81,6 @@ impl NexusClientWithResolver { } } -type NexusRequestFut = dyn Future + Send; -type NexusRequest = Pin>; - -/// A queue of futures which represent requests to Nexus. -pub struct NexusRequestQueue { - tx: mpsc::UnboundedSender, - _worker: JoinHandle<()>, -} - -impl NexusRequestQueue { - /// Creates a new request queue, along with a worker which executes - /// any incoming tasks. - pub fn new() -> Self { - // TODO(https://github.com/oxidecomputer/omicron/issues/1917): - // In the future, this should basically just be a wrapper around a - // generation number, and we shouldn't be serializing requests to Nexus. 
- // - // In the meanwhile, we're using an unbounded_channel for simplicity, so - // that we don't need to cope with dropped notifications / - // retransmissions. - let (tx, mut rx) = mpsc::unbounded_channel(); - - let _worker = tokio::spawn(async move { - while let Some(fut) = rx.recv().await { - fut.await; - } - }); - - Self { tx, _worker } - } - - /// Gets access to the sending portion of the request queue. - /// - /// Callers can use this to add their own requests. - pub fn sender(&self) -> &mpsc::UnboundedSender { - &self.tx - } -} - pub fn d2n_params( params: &dns_service_client::types::DnsConfigParams, ) -> nexus_client::types::DnsConfigParams { @@ -226,6 +188,16 @@ pub struct NexusNotifierHandle { tx: mpsc::Sender, } +impl NexusNotifierHandle { + pub async fn notify_nexus_about_self(&self, log: &Logger) { + if let Err(_) = + self.tx.send(NexusNotifierMsg::NotifyNexusAboutSelf).await + { + warn!(log, "Failed to send to NexusNotifierTask: did it exit?"); + } + } +} + // A successful reply from nexus pub enum NexusSuccess { Get(SledAgentInfo), @@ -235,11 +207,11 @@ pub enum NexusSuccess { // A mechanism owned by the `NexusNotifierTask` that allows it to access // enough information to send a `SledAgentInfo` to Nexus. 
pub struct NexusNotifierInput { - sled_id: Uuid, - sled_address: SocketAddrV6, - nexus_client: NexusClient, - hardware: HardwareManager, - instances: InstanceManager, + pub sled_id: Uuid, + pub sled_address: SocketAddrV6, + pub nexus_client: NexusClient, + pub hardware: HardwareManager, + pub vmm_reservoir_manager: VmmReservoirManagerHandle, } /// A mechanism for notifying nexus about this sled agent @@ -377,7 +349,11 @@ impl NexusNotifierTask { .hardware .usable_physical_ram_bytes() .into(), - reservoir_size: self.input.instances.reservoir_size().into(), + reservoir_size: self + .input + .vmm_reservoir_manager + .reservoir_size() + .into(), generation: known_info.generation, }; // We don't need to send a request if the info is identical to what diff --git a/sled-agent/src/sled_agent.rs b/sled-agent/src/sled_agent.rs index a633cbee3e..dbb3aee6ca 100644 --- a/sled-agent/src/sled_agent.rs +++ b/sled-agent/src/sled_agent.rs @@ -14,7 +14,10 @@ use crate::config::Config; use crate::instance_manager::InstanceManager; use crate::long_running_tasks::LongRunningTaskHandles; use crate::metrics::MetricsManager; -use crate::nexus::{ConvertInto, NexusClientWithResolver, NexusRequestQueue}; +use crate::nexus::{ + ConvertInto, NexusClientWithResolver, NexusNotifierHandle, + NexusNotifierInput, NexusNotifierTask, +}; use crate::params::{ DiskStateRequested, InstanceExternalIpBody, InstanceHardware, InstanceMigrationSourceParams, InstancePutStateResponse, @@ -286,8 +289,8 @@ struct SledAgentInner { // Connection to Nexus. nexus_client: NexusClientWithResolver, - // A serialized request queue for operations interacting with Nexus. - nexus_request_queue: NexusRequestQueue, + // A mechanism for notifying nexus about sled-agent updates + nexus_notifier: NexusNotifierHandle, // The rack network config provided at RSS time. 
rack_network_config: Option, @@ -436,7 +439,7 @@ impl SledAgent { storage_manager.clone(), long_running_task_handles.zone_bundler.clone(), ZoneBuilderFactory::default(), - vmm_reservoir_manager, + vmm_reservoir_manager.clone(), )?; let update_config = ConfigUpdates { @@ -541,6 +544,22 @@ impl SledAgent { endpoint, )); + // Spawn a background task for managing notifications to nexus + // about this sled-agent. + let nexus_notifier_input = NexusNotifierInput { + sled_id: request.body.id, + sled_address: get_sled_address(request.body.subnet), + nexus_client: nexus_client.client().clone(), + hardware: long_running_task_handles.hardware_manager.clone(), + vmm_reservoir_manager, + }; + let (nexus_notifier_task, nexus_notifier_handle) = + NexusNotifierTask::new(nexus_notifier_input, &log); + + tokio::spawn(async move { + nexus_notifier_task.run().await; + }); + let sled_agent = SledAgent { inner: Arc::new(SledAgentInner { id: request.body.id, @@ -553,14 +572,7 @@ impl SledAgent { port_manager, services, nexus_client, - - // TODO(https://github.com/oxidecomputer/omicron/issues/1917): - // Propagate usage of this request queue throughout the Sled - // Agent. - // - // Also, we could maybe de-dup some of the backoff code in the - // request queue? - nexus_request_queue: NexusRequestQueue::new(), + nexus_notifier: nexus_notifier_handle, rack_network_config, zone_bundler: long_running_task_handles.zone_bundler.clone(), bootstore: long_running_task_handles.bootstore.clone(), @@ -574,7 +586,7 @@ impl SledAgent { // existence. If inspection of the hardware later informs us that we're // actually running on a scrimlet, that's fine, the updated value will // be received by Nexus eventually. - sled_agent.notify_nexus_about_self(&log); + sled_agent.notify_nexus_about_self(&log).await; Ok(sled_agent) } @@ -652,99 +664,10 @@ impl SledAgent { Ok(()) } - /// Sends a request to Nexus informing it that the current sled exists, - /// with information abou the existing set of hardware. 
- /// - /// Does not block until Nexus is available -- the future created by this - /// function is retried in a queue that is polled in the background. - pub(crate) fn notify_nexus_about_self(&self, log: &Logger) { - let sled_id = self.inner.id; - let nexus_client = self.inner.nexus_client.clone(); - let sled_address = self.inner.sled_address(); - let is_scrimlet = self.inner.hardware.is_scrimlet(); - let baseboard = self.inner.hardware.baseboard().convert(); - let usable_hardware_threads = - self.inner.hardware.online_processor_count(); - let usable_physical_ram = - self.inner.hardware.usable_physical_ram_bytes(); - let reservoir_size = self.inner.instances.reservoir_size(); - - let log = log.clone(); - let fut = async move { - // Notify the control plane that we're up, and continue trying this - // until it succeeds. We retry with a randomized, capped - // exponential backoff. - // - // TODO-robustness if this returns a 400 error, we probably want to - // return a permanent error from the `notify_nexus` closure. - let notify_nexus = || async { - info!( - log, - "contacting server nexus, registering sled"; - "id" => ?sled_id, - "baseboard" => ?baseboard, - ); - let role = if is_scrimlet { - nexus_client::types::SledRole::Scrimlet - } else { - nexus_client::types::SledRole::Gimlet - }; - - nexus_client - .client() - .sled_agent_put( - &sled_id, - &nexus_client::types::SledAgentInfo { - sa_address: sled_address.to_string(), - role, - baseboard: baseboard.clone(), - usable_hardware_threads, - usable_physical_ram: nexus_client::types::ByteCount( - usable_physical_ram, - ), - reservoir_size: nexus_client::types::ByteCount( - reservoir_size.to_bytes(), - ), - generation: Generation::new(), - }, - ) - .await - .map_err(|err| BackoffError::transient(err.to_string())) - }; - // This notification is often invoked before Nexus has started - // running, so avoid flagging any errors as concerning until some - // time has passed. 
- let log_notification_failure = |err, call_count, total_duration| { - if call_count == 0 { - info!( - log, - "failed to notify nexus about sled agent"; - "error" => %err, - ); - } else if total_duration > std::time::Duration::from_secs(30) { - warn!( - log, - "failed to notify nexus about sled agent"; - "error" => %err, - "total duration" => ?total_duration, - ); - } - }; - retry_notify_ext( - retry_policy_internal_service_aggressive(), - notify_nexus, - log_notification_failure, - ) - .await - .expect("Expected an infinite retry loop contacting Nexus"); - }; - self.inner - .nexus_request_queue - .sender() - .send(Box::pin(fut)) - .unwrap_or_else(|err| { - panic!("Failed to send future to request queue: {err}"); - }); + /// Trigger a request to Nexus informing it that the current sled exists, + /// with information about the existing set of hardware. + pub(crate) async fn notify_nexus_about_self(&self, log: &Logger) { + self.inner.nexus_notifier.notify_nexus_about_self(log).await; } /// List all zone bundles on the system, for any zones live or dead. diff --git a/sled-agent/src/vmm_reservoir.rs b/sled-agent/src/vmm_reservoir.rs index a60dd97437..8c2980c51a 100644 --- a/sled-agent/src/vmm_reservoir.rs +++ b/sled-agent/src/vmm_reservoir.rs @@ -69,11 +69,12 @@ enum ReservoirManagerMsg { }, } +#[derive(Clone)] /// A mechanism to interact with the [`VmmReservoirManager`] pub struct VmmReservoirManagerHandle { reservoir_size: Arc, tx: flume::Sender, - _manager_handle: thread::JoinHandle<()>, + _manager_handle: Arc>, } impl VmmReservoirManagerHandle { @@ -130,9 +131,9 @@ impl VmmReservoirManager { rx, log, }; - let _manager_handle = thread::spawn(move || { + let _manager_handle = Arc::new(thread::spawn(move || { manager.run(hardware_manager, reservoir_mode) - }); + })); VmmReservoirManagerHandle { reservoir_size, tx, _manager_handle } }