From 7c4a2de7e944ac2f702ebaff3df701bd05210e09 Mon Sep 17 00:00:00 2001
From: "Andrew J. Stone"
Date: Wed, 28 Feb 2024 20:31:08 +0000
Subject: [PATCH] wip

---
 sled-agent/src/nexus.rs      | 175 ++++++++++++++++++++++++++++++++++-
 sled-agent/src/sim/server.rs |   4 +-
 sled-agent/src/sled_agent.rs |   4 +-
 3 files changed, 180 insertions(+), 3 deletions(-)

diff --git a/sled-agent/src/nexus.rs b/sled-agent/src/nexus.rs
index cc715f4010..aa6fb2f3dc 100644
--- a/sled-agent/src/nexus.rs
+++ b/sled-agent/src/nexus.rs
@@ -6,13 +6,21 @@ pub use nexus_client::Client as NexusClient;
 
 use internal_dns::resolver::{ResolveError, Resolver};
 use internal_dns::ServiceName;
+use nexus_client::types::SledAgentInfo;
 use omicron_common::address::NEXUS_INTERNAL_PORT;
+use omicron_common::api::external::Generation;
+use sled_hardware::HardwareManager;
 use slog::Logger;
 use std::future::Future;
+use std::net::SocketAddrV6;
 use std::pin::Pin;
 use std::sync::Arc;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, Notify};
 use tokio::task::JoinHandle;
+use tokio::time::{interval, Duration, MissedTickBehavior};
+use uuid::Uuid;
+
+use crate::instance_manager::InstanceManager;
 
 /// A thin wrapper over a progenitor-generated NexusClient.
 ///
@@ -202,3 +210,168 @@ impl ConvertInto
         }
     }
 }
+
+// Somewhat arbitrary bound size, large enough that we should never hit it.
+const QUEUE_SIZE: usize = 256;
+
+pub enum NexusNotifierMsg {
+    // Inform nexus about a change to this sled-agent. This is just a
+    // notification to perform a send. The request is constructed inside
+    // `NexusNotifierTask`.
+    NotifyNexusAboutSelf,
+}
+
+#[derive(Debug)]
+pub struct NexusNotifierHandle {
+    tx: mpsc::Sender<NexusNotifierMsg>,
+}
+
+// A successful reply from nexus
+pub enum NexusSuccess {
+    Get(SledAgentInfo),
+    Put,
+}
+
+// A mechanism owned by the `NexusNotifierTask` that allows it to access
+// enough information to send a `SledAgentInfo` to Nexus.
+pub struct NexusNotifierInput {
+    sled_id: Uuid,
+    sled_address: SocketAddrV6,
+    nexus_client: NexusClient,
+    hardware: HardwareManager,
+    instances: InstanceManager,
+}
+
+pub struct NexusNotifierTask {
+    input: NexusNotifierInput,
+    log: Logger,
+    rx: mpsc::Receiver<NexusNotifierMsg>,
+
+    // We only send `Get` requests if we haven't learned our generation yet.
+    generation: Option<Generation>,
+
+    // Do we need to notify nexus about an update to our state?
+    pending_notification: bool,
+
+    // We only have one outstanding nexus request at a time. We spawn a task
+    // to manage this request so we don't block our main notifier task, and
+    // we wait for a response on a channel.
+    outstanding_request: Option<
+        tokio::task::JoinHandle<
+            Result<
+                NexusSuccess,
+                nexus_client::Error<nexus_client::types::Error>,
+            >,
+        >,
+    >,
+
+    // A notification sent from the outstanding task when it has completed.
+    outstanding_req_ready: Arc<Notify>,
+}
+
+impl NexusNotifierTask {
+    pub fn new(
+        input: NexusNotifierInput,
+        log: &Logger,
+    ) -> (NexusNotifierTask, NexusNotifierHandle) {
+        let (tx, rx) = mpsc::channel(QUEUE_SIZE);
+        (
+            NexusNotifierTask {
+                input,
+                log: log.new(o!("component" => "NexusNotifierTask")),
+                rx,
+                generation: None,
+                // We start with pending set to true, because we always want
+                // to attempt to retrieve the current generation number
+                // before we upsert ourselves.
+                pending_notification: true,
+                outstanding_request: None,
+                outstanding_req_ready: Arc::new(Notify::new()),
+            },
+            NexusNotifierHandle { tx },
+        )
+    }
+
+    /// Run the main receive loop of the `NexusNotifierTask`
+    ///
+    /// This should be spawned into a tokio task
+    pub async fn run(mut self) {
+        const RETRY_TIMEOUT: Duration = Duration::from_secs(5);
+        // Create the interval once, outside the loop, so the retry timeout
+        // actually elapses between attempts.
+        let mut interval = interval(RETRY_TIMEOUT);
+        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
+        loop {
+            tokio::select! {
+                req = self.rx.recv() => {
+                    let Some(req) = req else {
+                        warn!(self.log, "All senders dropped. Exiting.");
+                        break;
+                    };
+                    match req {
+                        NexusNotifierMsg::NotifyNexusAboutSelf => {
+                            // We'll contact nexus on the next timeout
+                            self.pending_notification = true;
+                        }
+                    }
+                }
+                _ = self.outstanding_req_ready.notified() => {
+                    // Our request task has completed. Let's check the result.
+                    self.handle_nexus_reply().await;
+                }
+                _ = interval.tick(), if self.pending_notification => {
+                    self.contact_nexus().await;
+                }
+            }
+        }
+    }
+
+    /// Notify nexus about self
+    async fn contact_nexus(&mut self) {
+        // Is there already an outstanding request to nexus?
+        if self.outstanding_request.is_some() {
+            return;
+        }
+
+        let client = self.input.nexus_client.clone();
+        let sled_id = self.input.sled_id;
+
+        // Have we learned about any generations stored in CRDB yet?
+        if let Some(generation) = self.generation {
+            let role = if self.input.hardware.is_scrimlet() {
+                nexus_client::types::SledRole::Scrimlet
+            } else {
+                nexus_client::types::SledRole::Gimlet
+            };
+            let info = SledAgentInfo {
+                sa_address: self.input.sled_address.to_string(),
+                role,
+                baseboard: self.input.hardware.baseboard().convert(),
+                usable_hardware_threads: self
+                    .input
+                    .hardware
+                    .online_processor_count(),
+                usable_physical_ram: self
+                    .input
+                    .hardware
+                    .usable_physical_ram_bytes()
+                    .into(),
+                reservoir_size: self.input.instances.reservoir_size().into(),
+                generation,
+            };
+            self.outstanding_request = Some(tokio::spawn(async move {
+                client
+                    .sled_agent_put(&sled_id, &info)
+                    .await
+                    .map(|_| NexusSuccess::Put)
+            }));
+        } else {
+            self.outstanding_request = Some(tokio::spawn(async move {
+                client
+                    .sled_agent_get(&sled_id)
+                    .await
+                    .map(|info| NexusSuccess::Get(info.into_inner()))
+            }));
+        }
+    }
+
+    /// Handle a reply from nexus by extracting the value from a `JoinHandle`
+    async fn handle_nexus_reply(&mut self) {}
+}
diff --git a/sled-agent/src/sim/server.rs b/sled-agent/src/sim/server.rs
index 8854aee05c..a02f545b20 100644
--- a/sled-agent/src/sim/server.rs
+++ b/sled-agent/src/sim/server.rs
@@ -17,6 +17,7 @@ use nexus_client::types as NexusTypes;
 use nexus_client::types::{IpRange, Ipv4Range, Ipv6Range};
 use omicron_common::address::DNS_OPTE_IPV4_SUBNET;
 use omicron_common::address::NEXUS_OPTE_IPV4_SUBNET;
+use omicron_common::api::external::Generation;
 use omicron_common::api::external::MacAddr;
 use omicron_common::backoff::{
     retry_notify, retry_policy_internal_service_aggressive, BackoffError,
@@ -101,7 +102,7 @@ impl Server {
             nexus_client
                 .sled_agent_put(
                     &config.id,
-                    &NexusTypes::SledAgentStartupInfo {
+                    &NexusTypes::SledAgentInfo {
                         sa_address: sa_address.to_string(),
                         role: NexusTypes::SledRole::Scrimlet,
                         baseboard: NexusTypes::Baseboard {
@@ -124,6 +125,7 @@ impl Server {
                             config.hardware.reservoir_ram,
                         )
                         .unwrap(),
+                        generation: Generation::new(),
                     },
                 )
                 .await
diff --git a/sled-agent/src/sled_agent.rs b/sled-agent/src/sled_agent.rs
index fc0115bc13..a633cbee3e 100644
--- a/sled-agent/src/sled_agent.rs
+++ b/sled-agent/src/sled_agent.rs
@@ -44,6 +44,7 @@ use illumos_utils::zone::ZONE_PREFIX;
 use omicron_common::address::{
     get_sled_address, get_switch_zone_address, Ipv6Subnet, SLED_PREFIX,
 };
+use omicron_common::api::external::Generation;
 use omicron_common::api::external::{ByteCount, ByteCountRangeError, Vni};
 use omicron_common::api::internal::nexus::ProducerEndpoint;
 use omicron_common::api::internal::nexus::ProducerKind;
@@ -693,7 +694,7 @@ impl SledAgent {
             .client()
             .sled_agent_put(
                 &sled_id,
-                &nexus_client::types::SledAgentStartupInfo {
+                &nexus_client::types::SledAgentInfo {
                     sa_address: sled_address.to_string(),
                     role,
                     baseboard: baseboard.clone(),
@@ -704,6 +705,7 @@ impl SledAgent {
                     reservoir_size: nexus_client::types::ByteCount(
                         reservoir_size.to_bytes(),
                    ),
+                    generation: Generation::new(),
                 },
             )
             .await
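
Two pieces of the notifier are left unwired in this WIP: the tasks spawned
in `contact_nexus` never signal `outstanding_req_ready`, so the
`notified()` arm in `run` can never fire, and `handle_nexus_reply` is an
empty stub, so a completed request is never drained from
`outstanding_request`. The sketch below shows one plausible way to close
the loop, consistent with the comments in the patch; the error handling
(leaving `pending_notification` set so the interval retries on its next
tick) is an assumption, not something this commit specifies.

    // Sketch: each task spawned in `contact_nexus` could wake the main
    // loop when it finishes (shown here for the `Put` case only).
    let notify = self.outstanding_req_ready.clone();
    self.outstanding_request = Some(tokio::spawn(async move {
        let res = client
            .sled_agent_put(&sled_id, &info)
            .await
            .map(|_| NexusSuccess::Put);
        // Wake the `select!` arm waiting on `outstanding_req_ready`. If
        // the main loop isn't waiting yet, `Notify` stores the permit, so
        // the wakeup is not lost.
        notify.notify_one();
        res
    }));

    // Sketch: a possible body for the `handle_nexus_reply` stub.
    async fn handle_nexus_reply(&mut self) {
        let Some(handle) = self.outstanding_request.take() else {
            // Spurious wakeup; no request is outstanding.
            return;
        };
        match handle.await {
            Ok(Ok(NexusSuccess::Get(info))) => {
                // We learned the generation stored in CRDB; since
                // `pending_notification` is still set, the next tick of
                // `contact_nexus` will upsert ourselves via a `Put`.
                self.generation = Some(info.generation);
            }
            Ok(Ok(NexusSuccess::Put)) => {
                // Nexus has our latest state; nothing more to send until
                // another `NotifyNexusAboutSelf` arrives.
                self.pending_notification = false;
            }
            Ok(Err(e)) => {
                // Leave `pending_notification` set so the interval in
                // `run` retries on its next tick.
                warn!(self.log, "Request to nexus failed: {}", e);
            }
            Err(e) => {
                warn!(self.log, "Nexus request task failed: {}", e);
            }
        }
    }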