
wip
andrewjstone committed Oct 2, 2023
1 parent 9a1e916 commit 30e16c8
Showing 1 changed file with 150 additions and 13 deletions.
163 changes: 150 additions & 13 deletions sled-storage/src/manager.rs
@@ -17,14 +17,14 @@ use omicron_common::disk::DiskIdentity;
use sled_hardware::{DiskVariant, UnparsedDisk};
use slog::{debug, error, info, o, warn, Logger};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::{interval, Duration, MissedTickBehavior};

// The size of the mpsc bounded channel used to communicate
// between the `StorageHandle` and `StorageManager`.
const QUEUE_SIZE: usize = 256;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StorageManagerState {
WaitingForBootDisk,
WaitingForKeyManager,
QueuingDisks,
Normal,
@@ -37,6 +37,10 @@ enum StorageRequest {
DisksChanged(HashSet<UnparsedDisk>),
// NewFilesystem(NewFilesystemRequest),
KeyManagerReady,
/// This will always grab the latest state after any new updates, as it
/// serializes through the `StorageManager` task.
/// This serialization is particularly useful for tests.
GetLatestResources(oneshot::Sender<StorageResources>),
}
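
Aside (not part of the commit): `GetLatestResources` gives a linearized read because the reply is computed by the same task that applies every mutation, so a caller observes every request queued ahead of its own. A minimal standalone sketch of the mpsc-plus-oneshot round-trip, with all names invented for illustration:

use tokio::sync::{mpsc, oneshot};

enum Request {
    Set(u64),
    // The caller supplies a oneshot sender; the task replies on it.
    Get(oneshot::Sender<u64>),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Request>(8);

    // The "manager" task owns the state; all access is serialized here.
    tokio::spawn(async move {
        let mut state = 0u64;
        while let Some(req) = rx.recv().await {
            match req {
                Request::Set(v) => state = v,
                Request::Get(reply) => {
                    let _ = reply.send(state);
                }
            }
        }
    });

    tx.send(Request::Set(42)).await.unwrap();
    // Queued after the Set above, so it must observe 42.
    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send(Request::Get(reply_tx)).await.unwrap();
    assert_eq!(reply_rx.await.unwrap(), 42);
}

The `watch`-based `wait_for_changes` below trades this linearized read for a cheap subscription to the most recently published value.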

/// A mechanism for interacting with the [`StorageManager`]
@@ -107,6 +111,20 @@ impl StorageHandle {
}
}
}

/// Wait for any storage resource changes
pub async fn wait_for_changes(&mut self) -> StorageResources {
self.resource_updates.changed().await.unwrap();
self.resource_updates.borrow().clone()
}

/// Retrieve the latest value of `StorageResources` from the
/// `StorageManager` task.
pub async fn get_latest_resources(&mut self) -> StorageResources {
let (tx, rx) = oneshot::channel();
self.tx.send(StorageRequest::GetLatestResources(tx)).await.unwrap();
rx.await.unwrap()
}
}

/// The storage manager responsible for the state of the storage
@@ -134,7 +152,7 @@ impl StorageManager {
(
StorageManager {
log: log.new(o!("component" => "StorageManager")),
state: StorageManagerState::WaitingForBootDisk,
state: StorageManagerState::WaitingForKeyManager,
rx,
resources,
queued_u2_drives: HashSet::new(),
@@ -151,8 +169,22 @@ impl StorageManager {
/// This should be spawned into a tokio task
pub async fn run(&mut self) {
loop {
if let Err(e) = self.step().await {
warn!(self.log, "{e}");
const QUEUED_DISK_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
let mut interval = interval(QUEUED_DISK_RETRY_TIMEOUT);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
tokio::select! {
res = self.step() => {
if let Err(e) = res {
warn!(self.log, "{e}");
}
}
_ = interval.tick(),
if self.state == StorageManagerState::QueuingDisks =>
{
// We are going to try to configure these disks again
self.state = StorageManagerState::Normal;
self.add_queued_disks().await;
}
}
}
}
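
Aside (not part of the commit): the retry arm above uses `tokio::select!`'s branch precondition syntax (`_ = interval.tick(), if <cond>`), so the timer arm is only polled while disks are actually queued. A self-contained sketch of that guarded-interval pattern, with invented identifiers:

use tokio::sync::mpsc;
use tokio::time::{interval, Duration, MissedTickBehavior};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<&'static str>(8);
    tokio::spawn(async move {
        for msg in ["a", "b"] {
            tx.send(msg).await.unwrap();
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    });

    let mut have_queued_work = true;
    // An interval's first tick completes immediately; `Delay` spaces each
    // subsequent tick out from the previous one rather than keeping a
    // fixed schedule after a missed tick.
    let mut retry = interval(Duration::from_millis(100));
    retry.set_missed_tick_behavior(MissedTickBehavior::Delay);

    loop {
        tokio::select! {
            msg = rx.recv() => {
                match msg {
                    Some(m) => println!("request: {m}"),
                    None => break, // all senders dropped; shut down
                }
            }
            // This arm is only polled while the precondition holds.
            _ = retry.tick(), if have_queued_work => {
                println!("retrying queued work");
                have_queued_work = false;
            }
        }
    }
}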
@@ -181,11 +213,83 @@ impl StorageManager {
}
StorageRequest::RemoveDisk(_unparsed_disk) => todo!(),
StorageRequest::DisksChanged(_unparsed_disks) => todo!(),
StorageRequest::KeyManagerReady => todo!(),
StorageRequest::KeyManagerReady => {
self.state = StorageManagerState::Normal;
self.add_queued_disks().await;
}
StorageRequest::GetLatestResources(tx) => {
let _ = tx.send(self.resources.clone());
}
}
Ok(())
}

// Loop through all queued disks, inserting them into [`StorageResources`]
// unless we hit a transient error. If we hit a transient error, we return
// and wait for the next retry window to re-call this method. If we hit a
// permanent error we log it, but we continue inserting queued disks.
async fn add_queued_disks(&mut self) {
// Operate on queued real disks

// Disks that should be requeued.
let mut saved = HashSet::new();
let queued = std::mem::take(&mut self.queued_u2_drives);
let mut iter = queued.into_iter();
while let Some(disk) = iter.next() {
if self.state == StorageManagerState::QueuingDisks {
// We hit a transient error in a prior iteration.
saved.insert(disk);
} else {
// Try to add the disk. If there was a transient error the disk will
// have been requeued. If there was a permanent error, it will have
// been dropped. If there is another unexpected error, we will handle
// it and requeue ourselves.
if let Err(err) = self.add_u2_disk(disk.clone()).await {
warn!(
self.log,
"Potentially transient error: {err}: - requeing disk {:?}",
disk
);
saved.insert(disk);
}
}
}
// Merge any requeued disks from transient errors with saved disks here
self.queued_u2_drives.extend(saved);

// Operate on queued synthetic disks
if self.state == StorageManagerState::QueuingDisks {
return;
}

let mut saved = HashSet::new();
let queued = std::mem::take(&mut self.queued_synthetic_u2_drives);
let mut iter = queued.into_iter();
while let Some(zpool_name) = iter.next() {
if self.state == StorageManagerState::QueuingDisks {
// We hit a transient error in a prior iteration.
saved.insert(zpool_name);
} else {
// Try to add the disk. If there was a transient error the disk will
// have been requeued. If there was a permanent error, it will have
// been dropped. If there is another unexpected error, we will handle
// it and requeue ourselves.
if let Err(err) =
self.add_synthetic_u2_disk(zpool_name.clone()).await
{
warn!(
self.log,
"Potentially transient error: {err}: - requeing synthetic disk {:?}",
zpool_name
);
saved.insert(zpool_name);
}
}
}
// Merge any requeued disks from transient errors with saved disks here
self.queued_synthetic_u2_drives.extend(saved);
}
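
Aside (not part of the commit): the drain above relies on `std::mem::take` to move the queue out of `self`, so `&mut self` methods can be called while iterating; failures collect into `saved` and are merged back at the end. The same idiom in isolation, with an invented failure rule:

use std::collections::HashSet;

struct Worker {
    queued: HashSet<u32>,
}

impl Worker {
    fn try_add(&mut self, item: u32) -> Result<(), String> {
        // Invented rule: odd items fail "transiently" and must be retried.
        if item % 2 == 1 {
            Err(format!("transient failure on {item}"))
        } else {
            Ok(())
        }
    }

    fn process_queued(&mut self) {
        let mut saved = HashSet::new();
        // Move the queue out so we can call `&mut self` methods while iterating.
        let queued = std::mem::take(&mut self.queued);
        for item in queued {
            if let Err(e) = self.try_add(item) {
                eprintln!("{e}: requeuing {item}");
                saved.insert(item);
            }
        }
        // Merge failures back into the (possibly re-populated) queue.
        self.queued.extend(saved);
    }
}

fn main() {
    let mut w = Worker { queued: HashSet::from([1, 2, 3, 4]) };
    w.process_queued();
    assert_eq!(w.queued, HashSet::from([1, 3]));
}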

// Add a real U.2 disk to [`StorageResources`] or queue it to be added later
async fn add_u2_disk(
&mut self,
@@ -218,15 +322,15 @@ impl StorageManager {
);
self.queued_u2_drives.insert(unparsed_disk);
self.state = StorageManagerState::QueuingDisks;
Err(err.into())
Ok(())
}
Err(err) => {
error!(
self.log,
"Persistent error: {err} - not queueing disk {:?}",
unparsed_disk
);
Err(err.into())
Ok(())
}
}
}
@@ -406,7 +510,7 @@ mod tests {
KeyManager::new(&logctx.log, HardcodedSecretRetriever {});
let (mut manager, _) = StorageManager::new(&logctx.log, key_requester);
let zpool_name = ZpoolName::new_external(Uuid::new_v4());
assert_eq!(StorageManagerState::WaitingForBootDisk, manager.state);
assert_eq!(StorageManagerState::WaitingForKeyManager, manager.state);
manager.add_synthetic_u2_disk(zpool_name.clone()).await.unwrap();
assert!(manager.resources.all_u2_zpools().is_empty());
assert_eq!(
@@ -415,10 +519,7 @@
);

// Walk through other non-normal stages and ensure the disk gets queued
for stage in [
StorageManagerState::WaitingForKeyManager,
StorageManagerState::QueuingDisks,
] {
for stage in [StorageManagerState::QueuingDisks] {
manager.queued_synthetic_u2_drives.clear();
manager.state = stage;
manager.add_synthetic_u2_disk(zpool_name.clone()).await.unwrap();
@@ -454,7 +555,7 @@

#[tokio::test]
async fn wait_for_bootdisk() {
let logctx = test_setup_log("ensure_u2_gets_added_to_resources");
let logctx = test_setup_log("wait_for_bootdisk");
let (mut key_manager, key_requester) =
KeyManager::new(&logctx.log, HardcodedSecretRetriever {});
let (mut manager, mut handle) =
Expand All @@ -474,5 +575,41 @@ mod tests {

handle.upsert_synthetic_disk(zpool_name.clone()).await;
handle.wait_for_boot_disk().await;
Zpool::destroy(&zpool_name).unwrap();
logctx.cleanup_successful();
}

#[tokio::test]
async fn queued_disks_get_added_as_resources() {
let logctx = test_setup_log("queued_disks_get_added_as_resources");
let (mut key_manager, key_requester) =
KeyManager::new(&logctx.log, HardcodedSecretRetriever {});
let (mut manager, mut handle) =
StorageManager::new(&logctx.log, key_requester);

// Spawn the key_manager so that it will respond to requests for encryption keys
tokio::spawn(async move { key_manager.run().await });

// Spawn the storage manager as done by sled-agent
tokio::spawn(async move {
manager.run().await;
});

// Queue up a disk, as we haven't told the `StorageManager` that
// the `KeyManager` is ready yet.
let zpool_name = ZpoolName::new_external(Uuid::new_v4());
let dir = tempdir().unwrap();
let _ = new_disk(dir.path(), &zpool_name);
handle.upsert_synthetic_disk(zpool_name.clone()).await;
let resources = handle.get_latest_resources().await;
assert!(resources.all_u2_zpools().is_empty());

// Now inform the storage manager that the key manager is ready
// The queued disk should be successfully added
handle.key_manager_ready().await;
let resources = handle.get_latest_resources().await;
assert_eq!(resources.all_u2_zpools().len(), 1);
Zpool::destroy(&zpool_name).unwrap();
logctx.cleanup_successful();
}
}
