Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjstone committed Feb 23, 2024
1 parent bbbe079 commit bdea2f6
Showing 1 changed file with 67 additions and 60 deletions.
127 changes: 67 additions & 60 deletions sled-agent/src/vmm_reservoir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use slog::Logger;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use tokio::sync::oneshot;
use tokio::sync::{oneshot, watch};

use sled_hardware::HardwareManager;

Expand All @@ -23,32 +23,23 @@ pub enum Error {
ReservoirConfig(String),

#[error("VmmReservoirManager is currently busy")]
Busy(ReservoirMode),
Busy,

#[error("VmmReservoirManager has shutdown")]
Shutdown(ReservoirMode),
Shutdown,

#[error(
"Communication error with VmmReservoirManager: ReplySenderDropped"
)]
ReplySenderDropped,
}

#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub enum ReservoirMode {
None,
Size(u32),
Percentage(u8),
}

/// A message sent from [`VmmReservoirManagerHandle`] to [`VmmReservoirManager`]
pub enum ReservoirManagerMsg {
SetReservoirSize {
mode: ReservoirMode,
reply_tx: oneshot::Sender<Result<(), Error>>,
},
}

impl ReservoirMode {
/// Return a configuration of the VMM reservoir as either a percentage of
/// DRAM or as an exact size in MiB.
Expand All @@ -57,11 +48,11 @@ impl ReservoirMode {
pub fn from_config(
percentage: Option<u8>,
size_mb: Option<u32>,
) -> ReservoirMode {
) -> Option<ReservoirMode> {
match (percentage, size_mb) {
(None, None) => ReservoirMode::None,
(Some(p), None) => ReservoirMode::Percentage(p),
(None, Some(mb)) => ReservoirMode::Size(mb),
(None, None) => None,
(Some(p), None) => Some(ReservoirMode::Percentage(p)),
(None, Some(mb)) => Some(ReservoirMode::Size(mb)),
(Some(_), Some(_)) => panic!(
"only one of vmm_reservoir_percentage and \
vmm_reservoir_size_mb is allowed"
Expand All @@ -70,10 +61,19 @@ impl ReservoirMode {
}
}

/// A message sent from [`VmmReservoirManagerHandle`] to [`VmmReservoirManager`]
enum ReservoirManagerMsg {
/// Request that the reservoir be resized according to `mode`.
SetReservoirSize {
/// How the requested reservoir size is expressed.
mode: ReservoirMode,
/// One-shot channel on which the requester awaits the outcome of the
/// request.
reply_tx: oneshot::Sender<Result<(), Error>>,
},
}

/// A mechanism to interact with the [`VmmReservoirManager`]
pub struct VmmReservoirManagerHandle {
/// Reservoir size in bytes, shared with the manager, which stores a new
/// value after each successful resize.
reservoir_size: Arc<AtomicU64>,
/// Sending half of the request channel to the manager.
tx: flume::Sender<ReservoirManagerMsg>,
/// Watch receiver for reservoir size updates; `watcher()` returns clones
/// of it.
watch_rx: watch::Receiver<Option<ByteCount>>,
/// Join handle for the thread started by `VmmReservoirManager::spawn`.
_manager_handle: thread::JoinHandle<()>,
}

Expand All @@ -83,6 +83,13 @@ impl VmmReservoirManagerHandle {
self.reservoir_size.load(Ordering::SeqCst).try_into().unwrap()
}

/// Hand out a new [`tokio::sync::watch::Receiver`] that observes reservoir
/// size updates.
///
/// The watched value is `None` until the manager has set a reservoir size,
/// and `Some(size)` after each successful resize.
#[allow(unused)]
pub fn watcher(&self) -> watch::Receiver<Option<ByteCount>> {
    watch::Receiver::clone(&self.watch_rx)
}

/// Tell the [`VmmReservoirManager`] to set the reservoir size and wait for
/// a response.
///
Expand All @@ -98,16 +105,10 @@ impl VmmReservoirManagerHandle {
let (tx, rx) = oneshot::channel();
let msg = ReservoirManagerMsg::SetReservoirSize { mode, reply_tx: tx };
if let Err(e) = self.tx.try_send(msg) {
let (e, is_shutdown) = match e {
flume::TrySendError::Full(e) => (e, false),
flume::TrySendError::Disconnected(e) => (e, true),
};
let ReservoirManagerMsg::SetReservoirSize { mode, .. } = e;
if is_shutdown {
return Err(Error::Shutdown(mode));
} else {
return Err(Error::Busy(mode));
}
return Err(match e {
flume::TrySendError::Full(e) => Error::Busy,
flume::TrySendError::Disconnected(e) => Error::Shutdown,
});
}
rx.await.map_err(|_| Error::ReplySenderDropped)?
}
Expand All @@ -117,26 +118,56 @@ impl VmmReservoirManagerHandle {
pub struct VmmReservoirManager {
/// Reservoir size in bytes; stored after each successful resize and
/// shared with the handle.
reservoir_size: Arc<AtomicU64>,
/// Receiving half of the request channel fed by the handle.
rx: flume::Receiver<ReservoirManagerMsg>,
/// Publishes `Some(size)` to watchers after each successful resize.
watch_tx: watch::Sender<Option<ByteCount>>,
/// Logger tagged with the `VmmReservoirManager` component name.
log: Logger,
}

impl VmmReservoirManager {
pub fn run(
/// Start a [`VmmReservoirManager`] on its own OS thread and return the
/// [`VmmReservoirManagerHandle`] connected to it.
///
/// `reservoir_mode` is forwarded to the manager's `run` method together with
/// `hardware_manager`.
pub fn spawn(
    log: &Logger,
    hardware_manager: HardwareManager,
    reservoir_mode: Option<ReservoirMode>,
) -> VmmReservoirManagerHandle {
    // State observable from the handle: the size in bytes (starting at 0)
    // and a watch channel that starts out as `None`.
    let reservoir_size = Arc::new(AtomicU64::new(0));
    let (watch_tx, watch_rx) = watch::channel(None);

    // A zero-capacity (rendezvous) channel admits only one request at a
    // time. Resizing a reservoir may block the thread for up to two
    // minutes, so a resize must complete before another call is accepted.
    let (tx, rx) = flume::bounded(0);

    let manager = VmmReservoirManager {
        reservoir_size: Arc::clone(&reservoir_size),
        rx,
        watch_tx,
        log: log.new(o!("component" => "VmmReservoirManager")),
    };

    // The manager is moved onto its own thread.
    let _manager_handle =
        thread::spawn(move || manager.run(hardware_manager, reservoir_mode));

    VmmReservoirManagerHandle { reservoir_size, tx, watch_rx, _manager_handle }
}

fn run(
self,
hardware_manager: HardwareManager,
reservoir_mode: ReservoirMode,
reservoir_mode: Option<ReservoirMode>,
) {
match reservoir_mode {
ReservoirMode::None => warn!(self.log, "Not using VMM reservoir"),
ReservoirMode::Size(0) | ReservoirMode::Percentage(0) => {
None => warn!(self.log, "Not using VMM reservoir"),
Some(ReservoirMode::Size(0))
| Some(ReservoirMode::Percentage(0)) => {
warn!(
self.log,
"Not using VMM reservoir (size 0 bytes requested)"
)
}
_ => {
if let Err(e) =
self.set_reservoir_size(&hardware_manager, reservoir_mode)
Some(mode) => {
if let Err(e) = self.set_reservoir_size(&hardware_manager, mode)
{
error!(self.log, "Failed to setup VMM reservoir: {e}");
}
Expand All @@ -160,14 +191,13 @@ impl VmmReservoirManager {
/// Sets the VMM reservoir to the requested percentage of usable physical
/// RAM or to a size in MiB. Either mode will round down to the nearest
/// aligned size required by the control plane.
pub fn set_reservoir_size(
fn set_reservoir_size(
&self,
hardware: &sled_hardware::HardwareManager,
mode: ReservoirMode,
) -> Result<(), Error> {
let hardware_physical_ram_bytes = hardware.usable_physical_ram_bytes();
let req_bytes = match mode {
ReservoirMode::None => return Ok(()),
ReservoirMode::Size(mb) => {
let bytes = ByteCount::from_mebibytes_u32(mb).to_bytes();
if bytes > hardware_physical_ram_bytes {
Expand Down Expand Up @@ -221,6 +251,7 @@ impl VmmReservoirManager {
vmm_reservoir::ReservoirControl::set(reservoir_size)?;

self.reservoir_size.store(reservoir_size.to_bytes(), Ordering::SeqCst);
self.watch_tx.send_replace(Some(reservoir_size));
info!(
self.log,
"Finished setting reservoir size to {reservoir_size} bytes"
Expand All @@ -229,27 +260,3 @@ impl VmmReservoirManager {
Ok(())
}
}

// NOTE(review): this `impl` looks like the earlier copy of `spawn` that the
// commit removes. It builds the manager and the handle without the
// `watch_tx` / `watch_rx` fields that the struct definitions in this file
// declare. Confirm before relying on it.
impl VmmReservoirManager {
/// Create a [`VmmReservoirManager`], run it on a new thread, and return the
/// [`VmmReservoirManagerHandle`] connected to it.
pub fn spawn(
log: &Logger,
hardware_manager: HardwareManager,
reservoir_mode: ReservoirMode,
) -> VmmReservoirManagerHandle {
let log = log.new(o!("component" => "VmmReservoirManager"));
// We use a rendezvous channel to only allow one request at a time.
// Resizing a reservoir may block the thread for up to two minutes, so
// we want to ensure it is complete before allowing another call.
let (tx, rx) = flume::bounded(0);
// The size in bytes starts at 0 and is shared between manager and handle.
let reservoir_size = Arc::new(AtomicU64::new(0));
let manager = VmmReservoirManager {
reservoir_size: reservoir_size.clone(),
rx,
log,
};
// The manager is moved onto its own thread; the join handle is stored in
// the returned handle.
let _manager_handle = thread::spawn(move || {
manager.run(hardware_manager, reservoir_mode)
});
VmmReservoirManagerHandle { reservoir_size, tx, _manager_handle }
}
}

0 comments on commit bdea2f6

Please sign in to comment.