Commit

wip
andrewjstone committed Feb 28, 2024
1 parent f6116b6 commit cf185cf
Showing 4 changed files with 55 additions and 155 deletions.
2 changes: 1 addition & 1 deletion sled-agent/src/hardware_monitor.rs
@@ -172,7 +172,7 @@ impl HardwareMonitor {
}
HardwareUpdate::TofinoDeviceChange => {
if let Some(sled_agent) = &mut self.sled_agent {
sled_agent.notify_nexus_about_self(&self.log);
sled_agent.notify_nexus_about_self(&self.log).await;
}
}
HardwareUpdate::DiskAdded(disk) => {
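The only change in this file is the added `.await`: as the `nexus.rs` diff below shows, `notify_nexus_about_self` now just sends a message on a bounded channel to a background task, so it has to be `async`. A minimal, self-contained sketch of that calling convention, using hypothetical stand-in types (`Notifier`, `Msg`) rather than the real ones:

```rust
use tokio::sync::mpsc;

enum Msg {
    NotifyNexusAboutSelf,
}

struct Notifier {
    tx: mpsc::Sender<Msg>,
}

impl Notifier {
    // Sending on a bounded channel may suspend until there is capacity,
    // which is why the method is async and call sites need `.await`.
    async fn notify_nexus_about_self(&self) {
        let _ = self.tx.send(Msg::NotifyNexusAboutSelf).await;
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(8);
    let notifier = Notifier { tx };
    // Without `.await` this call would only build a future and never send.
    notifier.notify_nexus_about_self().await;
    assert!(matches!(rx.recv().await, Some(Msg::NotifyNexusAboutSelf)));
}
```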
66 changes: 21 additions & 45 deletions sled-agent/src/nexus.rs
@@ -21,6 +21,7 @@ use tokio::time::{interval, Duration, MissedTickBehavior};
use uuid::Uuid;

use crate::instance_manager::InstanceManager;
use crate::vmm_reservoir::VmmReservoirManagerHandle;

/// A thin wrapper over a progenitor-generated NexusClient.
///
@@ -80,45 +81,6 @@ impl NexusClientWithResolver {
}
}

type NexusRequestFut = dyn Future<Output = ()> + Send;
type NexusRequest = Pin<Box<NexusRequestFut>>;

/// A queue of futures which represent requests to Nexus.
pub struct NexusRequestQueue {
tx: mpsc::UnboundedSender<NexusRequest>,
_worker: JoinHandle<()>,
}

impl NexusRequestQueue {
/// Creates a new request queue, along with a worker which executes
/// any incoming tasks.
pub fn new() -> Self {
// TODO(https://github.com/oxidecomputer/omicron/issues/1917):
// In the future, this should basically just be a wrapper around a
// generation number, and we shouldn't be serializing requests to Nexus.
//
// In the meanwhile, we're using an unbounded_channel for simplicity, so
// that we don't need to cope with dropped notifications /
// retransmissions.
let (tx, mut rx) = mpsc::unbounded_channel();

let _worker = tokio::spawn(async move {
while let Some(fut) = rx.recv().await {
fut.await;
}
});

Self { tx, _worker }
}

/// Gets access to the sending portion of the request queue.
///
/// Callers can use this to add their own requests.
pub fn sender(&self) -> &mpsc::UnboundedSender<NexusRequest> {
&self.tx
}
}

pub fn d2n_params(
params: &dns_service_client::types::DnsConfigParams,
) -> nexus_client::types::DnsConfigParams {
@@ -226,6 +188,16 @@ pub struct NexusNotifierHandle {
tx: mpsc::Sender<NexusNotifierMsg>,
}

impl NexusNotifierHandle {
pub async fn notify_nexus_about_self(&self, log: &Logger) {
if let Err(_) =
self.tx.send(NexusNotifierMsg::NotifyNexusAboutSelf).await
{
warn!(log, "Failed to send to NexusNotifierTask: did it exit?");
}
}
}

// A successful reply from nexus
pub enum NexusSuccess {
Get(SledAgentInfo),
@@ -235,11 +207,11 @@ pub enum NexusSuccess {
// A mechanism owned by the `NexusNotifierTask` that allows it to access
// enough information to send a `SledAgentInfo` to Nexus.
pub struct NexusNotifierInput {
sled_id: Uuid,
sled_address: SocketAddrV6,
nexus_client: NexusClient,
hardware: HardwareManager,
instances: InstanceManager,
pub sled_id: Uuid,
pub sled_address: SocketAddrV6,
pub nexus_client: NexusClient,
pub hardware: HardwareManager,
pub vmm_reservoir_manager: VmmReservoirManagerHandle,
}

/// A mechanism for notifying nexus about this sled agent
@@ -377,7 +349,11 @@ impl NexusNotifierTask {
.hardware
.usable_physical_ram_bytes()
.into(),
reservoir_size: self.input.instances.reservoir_size().into(),
reservoir_size: self
.input
.vmm_reservoir_manager
.reservoir_size()
.into(),
generation: known_info.generation,
};
// We don't need to send a request if the info is identical to what
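The new `NexusNotifierTask` (only partially visible in this diff) owns everything needed to build a `SledAgentInfo` — the hardware manager, the `VmmReservoirManagerHandle`, and a Nexus client — and, per the comment at the end of the hunk, avoids sending a request when the info has not changed. A heavily simplified, self-contained sketch of that shape; the names (`Info`, `Msg`, `NotifierTask`) and the generation-bumping detail are hypothetical stand-ins, and the real task's error and retry handling is richer than shown:

```rust
use tokio::sync::mpsc;

// Hypothetical stand-in for the subset of SledAgentInfo used below.
#[derive(Clone, PartialEq, Debug)]
struct Info {
    usable_physical_ram: u64,
    reservoir_size: u64,
}

enum Msg {
    NotifyNexusAboutSelf,
}

struct NotifierTask {
    rx: mpsc::Receiver<Msg>,
    last_sent: Option<Info>,
    generation: u64,
}

impl NotifierTask {
    async fn run(mut self) {
        while let Some(Msg::NotifyNexusAboutSelf) = self.rx.recv().await {
            let current = self.read_current_info();
            // Skip the round trip when the info is identical to what was last
            // sent; otherwise bump the generation and send a fresh snapshot.
            if self.last_sent.as_ref() != Some(&current) {
                self.generation += 1;
                self.put_to_nexus(&current, self.generation).await;
                self.last_sent = Some(current);
            }
        }
    }

    fn read_current_info(&self) -> Info {
        // The real task reads HardwareManager and VmmReservoirManagerHandle;
        // fixed values keep this sketch runnable.
        Info { usable_physical_ram: 1 << 36, reservoir_size: 1 << 34 }
    }

    async fn put_to_nexus(&self, info: &Info, generation: u64) {
        println!("PUT sled-agent info {info:?} at generation {generation}");
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    let worker = tokio::spawn(NotifierTask { rx, last_sent: None, generation: 0 }.run());
    if tx.send(Msg::NotifyNexusAboutSelf).await.is_err() {
        eprintln!("notifier task exited early");
    }
    drop(tx); // closing the channel lets the task's loop finish
    worker.await.unwrap();
}
```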
135 changes: 29 additions & 106 deletions sled-agent/src/sled_agent.rs
@@ -14,7 +14,10 @@ use crate::config::Config;
use crate::instance_manager::InstanceManager;
use crate::long_running_tasks::LongRunningTaskHandles;
use crate::metrics::MetricsManager;
use crate::nexus::{ConvertInto, NexusClientWithResolver, NexusRequestQueue};
use crate::nexus::{
ConvertInto, NexusClientWithResolver, NexusNotifierHandle,
NexusNotifierInput, NexusNotifierTask,
};
use crate::params::{
DiskStateRequested, InstanceExternalIpBody, InstanceHardware,
InstanceMigrationSourceParams, InstancePutStateResponse,
@@ -286,8 +289,8 @@ struct SledAgentInner {
// Connection to Nexus.
nexus_client: NexusClientWithResolver,

// A serialized request queue for operations interacting with Nexus.
nexus_request_queue: NexusRequestQueue,
// A mechanism for notifying nexus about sled-agent updates
nexus_notifier: NexusNotifierHandle,

// The rack network config provided at RSS time.
rack_network_config: Option<RackNetworkConfig>,
@@ -436,7 +439,7 @@ impl SledAgent {
storage_manager.clone(),
long_running_task_handles.zone_bundler.clone(),
ZoneBuilderFactory::default(),
vmm_reservoir_manager,
vmm_reservoir_manager.clone(),
)?;

let update_config = ConfigUpdates {
@@ -541,6 +544,22 @@ impl SledAgent {
endpoint,
));

// Spawn a background task for managing notifications to nexus
// about this sled-agent.
let nexus_notifier_input = NexusNotifierInput {
sled_id: request.body.id,
sled_address: get_sled_address(request.body.subnet),
nexus_client: nexus_client.client().clone(),
hardware: long_running_task_handles.hardware_manager.clone(),
vmm_reservoir_manager,
};
let (nexus_notifier_task, nexus_notifier_handle) =
NexusNotifierTask::new(nexus_notifier_input, &log);

tokio::spawn(async move {
nexus_notifier_task.run().await;
});

let sled_agent = SledAgent {
inner: Arc::new(SledAgentInner {
id: request.body.id,
@@ -553,14 +572,7 @@ impl SledAgent {
port_manager,
services,
nexus_client,

// TODO(https://github.com/oxidecomputer/omicron/issues/1917):
// Propagate usage of this request queue throughout the Sled
// Agent.
//
// Also, we could maybe de-dup some of the backoff code in the
// request queue?
nexus_request_queue: NexusRequestQueue::new(),
nexus_notifier: nexus_notifier_handle,
rack_network_config,
zone_bundler: long_running_task_handles.zone_bundler.clone(),
bootstore: long_running_task_handles.bootstore.clone(),
Expand All @@ -574,7 +586,7 @@ impl SledAgent {
// existence. If inspection of the hardware later informs us that we're
// actually running on a scrimlet, that's fine, the updated value will
// be received by Nexus eventually.
sled_agent.notify_nexus_about_self(&log);
sled_agent.notify_nexus_about_self(&log).await;

Ok(sled_agent)
}
@@ -652,99 +664,10 @@ impl SledAgent {
Ok(())
}

/// Sends a request to Nexus informing it that the current sled exists,
/// with information about the existing set of hardware.
///
/// Does not block until Nexus is available -- the future created by this
/// function is retried in a queue that is polled in the background.
pub(crate) fn notify_nexus_about_self(&self, log: &Logger) {
let sled_id = self.inner.id;
let nexus_client = self.inner.nexus_client.clone();
let sled_address = self.inner.sled_address();
let is_scrimlet = self.inner.hardware.is_scrimlet();
let baseboard = self.inner.hardware.baseboard().convert();
let usable_hardware_threads =
self.inner.hardware.online_processor_count();
let usable_physical_ram =
self.inner.hardware.usable_physical_ram_bytes();
let reservoir_size = self.inner.instances.reservoir_size();

let log = log.clone();
let fut = async move {
// Notify the control plane that we're up, and continue trying this
// until it succeeds. We retry with a randomized, capped
// exponential backoff.
//
// TODO-robustness if this returns a 400 error, we probably want to
// return a permanent error from the `notify_nexus` closure.
let notify_nexus = || async {
info!(
log,
"contacting server nexus, registering sled";
"id" => ?sled_id,
"baseboard" => ?baseboard,
);
let role = if is_scrimlet {
nexus_client::types::SledRole::Scrimlet
} else {
nexus_client::types::SledRole::Gimlet
};

nexus_client
.client()
.sled_agent_put(
&sled_id,
&nexus_client::types::SledAgentInfo {
sa_address: sled_address.to_string(),
role,
baseboard: baseboard.clone(),
usable_hardware_threads,
usable_physical_ram: nexus_client::types::ByteCount(
usable_physical_ram,
),
reservoir_size: nexus_client::types::ByteCount(
reservoir_size.to_bytes(),
),
generation: Generation::new(),
},
)
.await
.map_err(|err| BackoffError::transient(err.to_string()))
};
// This notification is often invoked before Nexus has started
// running, so avoid flagging any errors as concerning until some
// time has passed.
let log_notification_failure = |err, call_count, total_duration| {
if call_count == 0 {
info!(
log,
"failed to notify nexus about sled agent";
"error" => %err,
);
} else if total_duration > std::time::Duration::from_secs(30) {
warn!(
log,
"failed to notify nexus about sled agent";
"error" => %err,
"total duration" => ?total_duration,
);
}
};
retry_notify_ext(
retry_policy_internal_service_aggressive(),
notify_nexus,
log_notification_failure,
)
.await
.expect("Expected an infinite retry loop contacting Nexus");
};
self.inner
.nexus_request_queue
.sender()
.send(Box::pin(fut))
.unwrap_or_else(|err| {
panic!("Failed to send future to request queue: {err}");
});
/// Trigger a request to Nexus informing it that the current sled exists,
/// with information about the existing set of hardware.
pub(crate) async fn notify_nexus_about_self(&self, log: &Logger) {
self.inner.nexus_notifier.notify_nexus_about_self(log).await;
}

/// List all zone bundles on the system, for any zones live or dead.
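The removed body of `notify_nexus_about_self` built the `SledAgentInfo` inline and retried the PUT with `retry_notify_ext` and `retry_policy_internal_service_aggressive`, always sending `Generation::new()`. That retry responsibility presumably now lives inside the background `NexusNotifierTask` (not shown in this diff); what remains here is a thin async delegate to the handle. For reference, the retry-until-success shape in isolation, as a self-contained sketch written against plain tokio with a hypothetical helper (`retry_until_ok`) rather than omicron's actual backoff helpers:

```rust
use std::time::Duration;
use tokio::time::sleep;

// Retry an async operation with capped exponential backoff until it succeeds.
// This mirrors the shape of the removed closure, not omicron's backoff API.
async fn retry_until_ok<T, E, F, Fut>(mut op: F) -> T
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut delay = Duration::from_millis(100);
    let cap = Duration::from_secs(30);
    loop {
        match op().await {
            Ok(value) => return value,
            Err(_) => {
                // The removed log_notification_failure callback logged quietly
                // at first and escalated to warnings only after ~30 seconds.
                sleep(delay).await;
                delay = (delay * 2).min(cap);
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let mut attempts = 0;
    let result = retry_until_ok(|| {
        attempts += 1;
        let attempt = attempts;
        async move {
            if attempt < 3 { Err("nexus not up yet") } else { Ok(attempt) }
        }
    })
    .await;
    println!("succeeded after {result} attempts");
}
```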
7 changes: 4 additions & 3 deletions sled-agent/src/vmm_reservoir.rs
@@ -69,11 +69,12 @@ enum ReservoirManagerMsg {
},
}

#[derive(Clone)]
/// A mechanism to interact with the [`VmmReservoirManager`]
pub struct VmmReservoirManagerHandle {
reservoir_size: Arc<AtomicU64>,
tx: flume::Sender<ReservoirManagerMsg>,
_manager_handle: thread::JoinHandle<()>,
_manager_handle: Arc<thread::JoinHandle<()>>,
}

impl VmmReservoirManagerHandle {
@@ -130,9 +131,9 @@ impl VmmReservoirManager {
rx,
log,
};
let _manager_handle = thread::spawn(move || {
let _manager_handle = Arc::new(thread::spawn(move || {
manager.run(hardware_manager, reservoir_mode)
});
}));
VmmReservoirManagerHandle { reservoir_size, tx, _manager_handle }
}

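`thread::JoinHandle` is not `Clone`, so the new `#[derive(Clone)]` on `VmmReservoirManagerHandle` only works once the join handle is wrapped in an `Arc`; the leading underscore suggests the field is kept purely to tie the worker thread's lifetime to the handles. A small self-contained sketch of the same pattern, with hypothetical names standing in for the real types:

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::thread;

#[derive(Clone)]
struct WorkerHandle {
    size: Arc<AtomicU64>,
    // JoinHandle is not Clone; Arc-wrapping it lets the handle derive Clone
    // while still keeping a reference to the spawned thread around.
    _worker: Arc<thread::JoinHandle<()>>,
}

impl WorkerHandle {
    fn spawn() -> Self {
        let size = Arc::new(AtomicU64::new(0));
        let size_for_thread = Arc::clone(&size);
        let _worker = Arc::new(thread::spawn(move || {
            size_for_thread.store(42, Ordering::SeqCst);
        }));
        WorkerHandle { size, _worker }
    }

    fn size(&self) -> u64 {
        self.size.load(Ordering::SeqCst)
    }
}

fn main() {
    let handle = WorkerHandle::spawn();
    let another = handle.clone(); // cheap: clones two Arcs
    std::thread::sleep(std::time::Duration::from_millis(50));
    assert_eq!(handle.size(), another.size());
}
```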
