Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjstone committed Feb 28, 2024
1 parent 7bdf2e3 commit 7c4a2de
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 3 deletions.
175 changes: 174 additions & 1 deletion sled-agent/src/nexus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,21 @@ pub use nexus_client::Client as NexusClient;

use internal_dns::resolver::{ResolveError, Resolver};
use internal_dns::ServiceName;
use nexus_client::types::SledAgentInfo;
use omicron_common::address::NEXUS_INTERNAL_PORT;
use omicron_common::api::external::Generation;
use sled_hardware::HardwareManager;
use slog::Logger;
use std::future::Future;
use std::net::SocketAddrV6;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, Notify};
use tokio::task::JoinHandle;
use tokio::time::{interval, Duration, MissedTickBehavior};
use uuid::Uuid;

use crate::instance_manager::InstanceManager;

/// A thin wrapper over a progenitor-generated NexusClient.
///
Expand Down Expand Up @@ -202,3 +210,168 @@ impl ConvertInto<nexus_client::types::DatasetKind>
}
}
}

// Somewhat arbitrary bound size, large enough that we should never hit it.
const QUEUE_SIZE: usize = 256;

pub enum NexusNotifierMsg {
// Inform nexus about a change to this sled-agent. This is just a
// notification to perform a send. The request is constructed inside
// `NexusNotifierTask`.
NotifyNexusAboutSelf,
}

#[derive(Debug)]
pub struct NexusNotifierHandle {
tx: mpsc::Sender<NexusNotifierMsg>,
}

// A successful reply from nexus
pub enum NexusSuccess {
Get(SledAgentInfo),
Put,
}

// A mechanism owned by the `NexusNotifierTask` that allows it to access
// enough information to send a `SledAgentInfo` to Nexus.
pub struct NexusNotifierInput {
sled_id: Uuid,
sled_address: SocketAddrV6,
nexus_client: NexusClient,
hardware: HardwareManager,
instances: InstanceManager,
}

pub struct NexusNotifierTask {
input: NexusNotifierInput,
log: Logger,
rx: mpsc::Receiver<NexusNotifierMsg>,
// We only send `Get` requests if we haven't learned our generation yet
generation: Option<Generation>,

// Do we need to notify nexus about an update to our state.
pending_notification: bool,

// We only have one outstanding nexus request at a time.
// We spawn a task to manage this request so we don't block our main notifier task.
// We wait for a response on a channel.
outstanding_request: Option<
tokio::task::JoinHandle<
Result<
NexusSuccess,
nexus_client::Error<nexus_client::types::Error>,
>,
>,
>,
// A notification sent from the outstanding task when it has completed.
outstanding_req_ready: Arc<Notify>,
}

impl NexusNotifierTask {
pub fn new(
input: NexusNotifierInput,
log: &Logger,
) -> (NexusNotifierTask, NexusNotifierHandle) {
let (tx, rx) = mpsc::channel(QUEUE_SIZE);
(
NexusNotifierTask {
input,
log: log.new(o!("component" => "NexusNotifierTask")),
rx,
generation: None,
// We start with pending true, because we always want to attempt
// to retrieve the current generation number before we upsert
// ourselves.
pending_notification: true,
outstanding_request: None,
outstanding_req_ready: Arc::new(Notify::new()),
},
NexusNotifierHandle { tx },
)
}

/// Run the main receive loop of the `NexusNotifierTask`
///
/// This should be spawned into a tokio task
pub async fn run(mut self) {
loop {
const RETRY_TIMEOUT: Duration = Duration::from_secs(5);
let mut interval = interval(RETRY_TIMEOUT);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
tokio::select! {
req = self.rx.recv() => {
let Some(req) = req else {
warn!(self.log, "All senders dropped. Exiting.");
break;
};
match req {
NexusNotifierMsg::NotifyNexusAboutSelf => {
// We'll contact nexus on the next timeout
self.pending_notification = true;
}
}
}
_ = self.outstanding_req_ready.notified() => {
// Our request task has completed. Let's check the result.
self.handle_nexus_reply().await;

}
_ = interval.tick(), if self.pending_notification => {
self.contact_nexus().await;
}
}
}
}

/// Notify nexus about self
async fn contact_nexus(&mut self) {
// Is there already an outstanding request to nexus?
if self.outstanding_request.is_some() {
return;
}

let client = self.input.nexus_client.clone();
let sled_id = self.input.sled_id;

// Have we learned about any generations stored in CRDB yet?
if let Some(generation) = self.generation {
let role = if self.input.hardware.is_scrimlet() {
nexus_client::types::SledRole::Scrimlet
} else {
nexus_client::types::SledRole::Gimlet
};
let info = SledAgentInfo {
sa_address: self.input.sled_address.to_string(),
role,
baseboard: self.input.hardware.baseboard().convert(),
usable_hardware_threads: self
.input
.hardware
.online_processor_count(),
usable_physical_ram: self
.input
.hardware
.usable_physical_ram_bytes()
.into(),
reservoir_size: self.input.instances.reservoir_size().into(),
generation,
};
self.outstanding_request = Some(tokio::spawn(async move {
client
.sled_agent_put(&sled_id, &info)
.await
.map(|_| NexusSuccess::Put)
}));
} else {
self.outstanding_request = Some(tokio::spawn(async move {
client
.sled_agent_get(&sled_id)
.await
.map(|info| NexusSuccess::Get(info.into_inner()))
}));
}
}

/// Handle a reply from nexus by extracting the value from a `JoinHandle`
async fn handle_nexus_reply(&mut self) {}
}
4 changes: 3 additions & 1 deletion sled-agent/src/sim/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use nexus_client::types as NexusTypes;
use nexus_client::types::{IpRange, Ipv4Range, Ipv6Range};
use omicron_common::address::DNS_OPTE_IPV4_SUBNET;
use omicron_common::address::NEXUS_OPTE_IPV4_SUBNET;
use omicron_common::api::external::Generation;
use omicron_common::api::external::MacAddr;
use omicron_common::backoff::{
retry_notify, retry_policy_internal_service_aggressive, BackoffError,
Expand Down Expand Up @@ -101,7 +102,7 @@ impl Server {
nexus_client
.sled_agent_put(
&config.id,
&NexusTypes::SledAgentStartupInfo {
&NexusTypes::SledAgentInfo {
sa_address: sa_address.to_string(),
role: NexusTypes::SledRole::Scrimlet,
baseboard: NexusTypes::Baseboard {
Expand All @@ -124,6 +125,7 @@ impl Server {
config.hardware.reservoir_ram,
)
.unwrap(),
generation: Generation::new(),
},
)
.await
Expand Down
4 changes: 3 additions & 1 deletion sled-agent/src/sled_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use illumos_utils::zone::ZONE_PREFIX;
use omicron_common::address::{
get_sled_address, get_switch_zone_address, Ipv6Subnet, SLED_PREFIX,
};
use omicron_common::api::external::Generation;
use omicron_common::api::external::{ByteCount, ByteCountRangeError, Vni};
use omicron_common::api::internal::nexus::ProducerEndpoint;
use omicron_common::api::internal::nexus::ProducerKind;
Expand Down Expand Up @@ -693,7 +694,7 @@ impl SledAgent {
.client()
.sled_agent_put(
&sled_id,
&nexus_client::types::SledAgentStartupInfo {
&nexus_client::types::SledAgentInfo {
sa_address: sled_address.to_string(),
role,
baseboard: baseboard.clone(),
Expand All @@ -704,6 +705,7 @@ impl SledAgent {
reservoir_size: nexus_client::types::ByteCount(
reservoir_size.to_bytes(),
),
generation: Generation::new(),
},
)
.await
Expand Down

0 comments on commit 7c4a2de

Please sign in to comment.