diff --git a/sled-storage/src/manager.rs b/sled-storage/src/manager.rs
index 2855345c3b..96119bd74e 100644
--- a/sled-storage/src/manager.rs
+++ b/sled-storage/src/manager.rs
@@ -17,6 +17,7 @@ use omicron_common::disk::DiskIdentity;
 use sled_hardware::{DiskVariant, UnparsedDisk};
 use slog::{debug, error, info, o, warn, Logger};
 use tokio::sync::{mpsc, oneshot, watch};
+use tokio::time::{interval, Duration, MissedTickBehavior};
 
 // The size of the mpsc bounded channel used to communicate
 // between the `StorageHandle` and `StorageManager`.
@@ -24,7 +25,6 @@ const QUEUE_SIZE: usize = 256;
 
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum StorageManagerState {
-    WaitingForBootDisk,
     WaitingForKeyManager,
     QueuingDisks,
     Normal,
@@ -37,6 +37,10 @@ enum StorageRequest {
     DisksChanged(HashSet<UnparsedDisk>),
     // NewFilesystem(NewFilesystemRequest),
     KeyManagerReady,
+    /// This will always grab the latest state after any new updates, as it
+    /// serializes through the `StorageManager` task.
+    /// This serialization is particularly useful for tests.
+    GetLatestResources(oneshot::Sender<StorageResources>),
 }
 
 /// A mechanism for interacting with the [`StorageManager`]
@@ -107,6 +111,20 @@ impl StorageHandle {
             }
         }
     }
+
+    /// Wait for any storage resource changes
+    pub async fn wait_for_changes(&mut self) -> StorageResources {
+        self.resource_updates.changed().await.unwrap();
+        self.resource_updates.borrow().clone()
+    }
+
+    /// Retrieve the latest value of `StorageResources` from the
+    /// `StorageManager` task.
+    pub async fn get_latest_resources(&mut self) -> StorageResources {
+        let (tx, rx) = oneshot::channel();
+        self.tx.send(StorageRequest::GetLatestResources(tx)).await.unwrap();
+        rx.await.unwrap()
+    }
 }
 
 /// The storage manager responsible for the state of the storage
@@ -134,7 +152,7 @@ impl StorageManager {
         (
             StorageManager {
                 log: log.new(o!("component" => "StorageManager")),
-                state: StorageManagerState::WaitingForBootDisk,
+                state: StorageManagerState::WaitingForKeyManager,
                 rx,
                 resources,
                 queued_u2_drives: HashSet::new(),
@@ -151,8 +169,22 @@ impl StorageManager {
     /// This should be spawned into a tokio task
     pub async fn run(&mut self) {
         loop {
-            if let Err(e) = self.step().await {
-                warn!(self.log, "{e}");
+            const QUEUED_DISK_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
+            let mut interval = interval(QUEUED_DISK_RETRY_TIMEOUT);
+            interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
+            tokio::select! {
+                res = self.step() => {
+                    if let Err(e) = res {
+                        warn!(self.log, "{e}");
+                    }
+                }
+                _ = interval.tick(),
+                    if self.state == StorageManagerState::QueuingDisks =>
+                {
+                    // We are going to try to configure these disks again
+                    self.state = StorageManagerState::Normal;
+                    self.add_queued_disks().await;
+                }
             }
         }
     }
@@ -181,11 +213,83 @@ impl StorageManager {
             }
             StorageRequest::RemoveDisk(_unparsed_disk) => todo!(),
             StorageRequest::DisksChanged(_unparsed_disks) => todo!(),
-            StorageRequest::KeyManagerReady => todo!(),
+            StorageRequest::KeyManagerReady => {
+                self.state = StorageManagerState::Normal;
+                self.add_queued_disks().await;
+            }
+            StorageRequest::GetLatestResources(tx) => {
+                let _ = tx.send(self.resources.clone());
+            }
         }
         Ok(())
     }
 
+    // Loop through all queued disks, inserting them into [`StorageResources`]
+    // unless we hit a transient error. If we hit a transient error, we return
+    // and wait for the next retry window to re-call this method. If we hit a
+    // permanent error, we log it, but we continue inserting queued disks.
+    async fn add_queued_disks(&mut self) {
+        // Operate on queued real disks
+
+        // Disks that should be requeued.
+        let mut saved = HashSet::new();
+        let queued = std::mem::take(&mut self.queued_u2_drives);
+        let mut iter = queued.into_iter();
+        while let Some(disk) = iter.next() {
+            if self.state == StorageManagerState::QueuingDisks {
+                // We hit a transient error in a prior iteration.
+                saved.insert(disk);
+            } else {
+                // Try to add the disk. If there was a transient error, the disk
+                // will have been requeued. If there was a permanent error, it
+                // will have been dropped. If there is another unexpected error,
+                // we will handle it and requeue the disk ourselves.
+                if let Err(err) = self.add_u2_disk(disk.clone()).await {
+                    warn!(
+                        self.log,
+                        "Potentially transient error: {err} - requeuing disk {:?}",
+                        disk
+                    );
+                    saved.insert(disk);
+                }
+            }
+        }
+        // Merge any requeued disks from transient errors with saved disks here
+        self.queued_u2_drives.extend(saved);
+
+        // Operate on queued synthetic disks
+        if self.state == StorageManagerState::QueuingDisks {
+            return;
+        }
+
+        let mut saved = HashSet::new();
+        let queued = std::mem::take(&mut self.queued_synthetic_u2_drives);
+        let mut iter = queued.into_iter();
+        while let Some(zpool_name) = iter.next() {
+            if self.state == StorageManagerState::QueuingDisks {
+                // We hit a transient error in a prior iteration.
+                saved.insert(zpool_name);
+            } else {
+                // Try to add the disk. If there was a transient error, the disk
+                // will have been requeued. If there was a permanent error, it
+                // will have been dropped. If there is another unexpected error,
+                // we will handle it and requeue the disk ourselves.
+                if let Err(err) =
+                    self.add_synthetic_u2_disk(zpool_name.clone()).await
+                {
+                    warn!(
+                        self.log,
+                        "Potentially transient error: {err} - requeuing synthetic disk {:?}",
+                        zpool_name
+                    );
+                    saved.insert(zpool_name);
+                }
+            }
+        }
+        // Merge any requeued disks from transient errors with saved disks here
+        self.queued_synthetic_u2_drives.extend(saved);
+    }
+
     // Add a real U.2 disk to [`StorageResources`] or queue it to be added later
     async fn add_u2_disk(
         &mut self,
@@ -218,7 +322,7 @@
                 );
                 self.queued_u2_drives.insert(unparsed_disk);
                 self.state = StorageManagerState::QueuingDisks;
-                Err(err.into())
+                Ok(())
             }
             Err(err) => {
                 error!(
@@ -226,7 +330,7 @@
                     "Persistent error: {err} - not queueing disk {:?}",
                     unparsed_disk
                 );
-                Err(err.into())
+                Ok(())
             }
         }
     }
@@ -406,7 +510,7 @@ mod tests {
             KeyManager::new(&logctx.log, HardcodedSecretRetriever {});
         let (mut manager, _) = StorageManager::new(&logctx.log, key_requester);
         let zpool_name = ZpoolName::new_external(Uuid::new_v4());
-        assert_eq!(StorageManagerState::WaitingForBootDisk, manager.state);
+        assert_eq!(StorageManagerState::WaitingForKeyManager, manager.state);
        manager.add_synthetic_u2_disk(zpool_name.clone()).await.unwrap();
         assert!(manager.resources.all_u2_zpools().is_empty());
         assert_eq!(
@@ -415,10 +519,7 @@
         );
 
         // Walk through other non-normal stages and enusre disk gets queued
-        for stage in [
-            StorageManagerState::WaitingForKeyManager,
-            StorageManagerState::QueuingDisks,
-        ] {
+        for stage in [StorageManagerState::QueuingDisks] {
             manager.queued_synthetic_u2_drives.clear();
             manager.state = stage;
             manager.add_synthetic_u2_disk(zpool_name.clone()).await.unwrap();
@@ -454,7 +555,7 @@
 
     #[tokio::test]
     async fn wait_for_bootdisk() {
-        let logctx = test_setup_log("ensure_u2_gets_added_to_resources");
+        let logctx = test_setup_log("wait_for_bootdisk");
         let (mut key_manager, key_requester) =
             KeyManager::new(&logctx.log, HardcodedSecretRetriever {});
         let (mut manager, mut handle) =
@@ -474,5 +575,41 @@
         handle.upsert_synthetic_disk(zpool_name.clone()).await;
         handle.wait_for_boot_disk().await;
+        Zpool::destroy(&zpool_name).unwrap();
+        logctx.cleanup_successful();
+    }
+
+    #[tokio::test]
+    async fn queued_disks_get_added_as_resources() {
+        let logctx = test_setup_log("queued_disks_get_added_as_resources");
+        let (mut key_manager, key_requester) =
+            KeyManager::new(&logctx.log, HardcodedSecretRetriever {});
+        let (mut manager, mut handle) =
+            StorageManager::new(&logctx.log, key_requester);
+
+        // Spawn the key_manager so that it will respond to requests for encryption keys
+        tokio::spawn(async move { key_manager.run().await });
+
+        // Spawn the storage manager as done by sled-agent
+        tokio::spawn(async move {
+            manager.run().await;
+        });
+
+        // Queue up a disk, as we haven't told the `StorageManager` that
+        // the `KeyManager` is ready yet.
+        let zpool_name = ZpoolName::new_external(Uuid::new_v4());
+        let dir = tempdir().unwrap();
+        let _ = new_disk(dir.path(), &zpool_name);
+        handle.upsert_synthetic_disk(zpool_name.clone()).await;
+        let resources = handle.get_latest_resources().await;
+        assert!(resources.all_u2_zpools().is_empty());
+
+        // Now inform the storage manager that the key manager is ready
+        // The queued disk should be successfully added
+        handle.key_manager_ready().await;
+        let resources = handle.get_latest_resources().await;
+        assert_eq!(resources.all_u2_zpools().len(), 1);
+        Zpool::destroy(&zpool_name).unwrap();
+        logctx.cleanup_successful();
+    }
 }
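
A note on the retry loop added to `run()` above: the `tokio::select!` branch is gated by an `if self.state == StorageManagerState::QueuingDisks` precondition, and the interval uses `MissedTickBehavior::Delay` so a long-running `step()` does not cause a burst of catch-up ticks. The following is a minimal, self-contained sketch of that pattern, not code from the PR; `Worker`, `poll_requests`, and `retry_queued` are illustrative stand-ins for `StorageManager`, `step`, and `add_queued_disks`.

```rust
use std::time::Duration;
use tokio::time::{interval, MissedTickBehavior};

#[derive(PartialEq)]
enum State {
    Normal,
    QueuingDisks,
}

struct Worker {
    state: State,
}

impl Worker {
    // Stand-in for `StorageManager::step`: await and handle the next request.
    async fn poll_requests(&mut self) {
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    // Stand-in for `add_queued_disks`: retry whatever work was queued.
    async fn retry_queued(&mut self) {
        self.state = State::Normal;
    }

    async fn run(&mut self) {
        let mut retry = interval(Duration::from_secs(10));
        // If a tick is missed because the other branch ran long, fire the next
        // tick one full period later instead of bursting to catch up.
        retry.set_missed_tick_behavior(MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                _ = self.poll_requests() => {}
                // The `if` guard disables this branch unless there is queued
                // work, so an idle worker does not wake up just to tick.
                _ = retry.tick(), if self.state == State::QueuingDisks => {
                    self.retry_queued().await;
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let mut worker = Worker { state: State::QueuingDisks };
    // Run the loop briefly just to exercise both branches.
    let _ = tokio::time::timeout(Duration::from_millis(100), worker.run()).await;
}
```

One subtlety worth keeping in mind: the first `tick()` of a `tokio::time::interval` completes immediately, so where the interval is constructed relative to the loop determines whether the first retry fires right away or only after a full period.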
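
The `GetLatestResources` request relies on a common tokio shape: the caller creates a `oneshot` channel, ships the sender through the same bounded mpsc queue as every other request, and the manager task replies with a snapshot of its current state. Because the reply is produced by the same task that applies updates, it reflects every request sent before it, which is the serialization the PR's doc comment calls out as useful for tests. Below is a minimal sketch of that request/response shape, using illustrative `Request` and `Snapshot` types rather than the PR's.

```rust
use tokio::sync::{mpsc, oneshot};

#[derive(Clone, Debug, Default)]
struct Snapshot {
    disks: usize,
}

enum Request {
    AddDisk,
    GetSnapshot(oneshot::Sender<Snapshot>),
}

// The single consumer task: all state lives here and is only touched in order.
async fn task(mut rx: mpsc::Receiver<Request>) {
    let mut snapshot = Snapshot::default();
    while let Some(req) = rx.recv().await {
        match req {
            Request::AddDisk => snapshot.disks += 1,
            // This arm runs on the same task that mutates `snapshot`, so the
            // reply reflects every request received before it.
            Request::GetSnapshot(tx) => {
                let _ = tx.send(snapshot.clone());
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(256);
    tokio::spawn(task(rx));

    tx.send(Request::AddDisk).await.unwrap();
    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send(Request::GetSnapshot(reply_tx)).await.unwrap();
    // The snapshot is guaranteed to include the `AddDisk` sent just before it.
    assert_eq!(reply_rx.await.unwrap().disks, 1);
}
```

Ignoring the send result with `let _ = tx.send(...)` mirrors the PR's handler: if the requester has already dropped its receiving end, there is nobody left to notify and the error can be discarded.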
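
`add_queued_disks` drains each queue with `std::mem::take`, retries every entry, and merges transient failures back in at the end. Taking the set first matters because a retry can itself requeue a disk onto `self.queued_u2_drives` while the old entries are still being processed. A rough sketch of that drain-and-requeue shape, assuming a hypothetical `try_add` callback in place of `add_u2_disk`:

```rust
use std::collections::HashSet;

// Drain `queue`, attempt each entry, and put back only the ones that failed.
fn retry_queued(
    queue: &mut HashSet<String>,
    try_add: impl Fn(&str) -> Result<(), ()>,
) {
    let mut requeue = HashSet::new();
    // `std::mem::take` leaves an empty set behind, so new entries can be
    // queued independently while the old ones are drained here.
    for item in std::mem::take(queue) {
        if try_add(&item).is_err() {
            requeue.insert(item);
        }
    }
    // Merge our own failures back with anything queued in the meantime.
    queue.extend(requeue);
}

fn main() {
    let mut queue: HashSet<String> =
        ["a", "b"].into_iter().map(String::from).collect();
    // Pretend "b" hits a transient error and must be retried later.
    retry_queued(&mut queue, |disk| if disk == "b" { Err(()) } else { Ok(()) });
    assert!(queue.contains("b") && !queue.contains("a"));
}
```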