diff --git a/sled-storage/src/manager.rs b/sled-storage/src/manager.rs
index 499d8edee2..2855345c3b 100644
--- a/sled-storage/src/manager.rs
+++ b/sled-storage/src/manager.rs
@@ -23,7 +23,7 @@ use tokio::sync::{mpsc, oneshot, watch};
 const QUEUE_SIZE: usize = 256;
 
 #[derive(Debug, Clone, PartialEq, Eq)]
-pub enum StorageManagerStage {
+pub enum StorageManagerState {
     WaitingForBootDisk,
     WaitingForKeyManager,
     QueuingDisks,
@@ -34,7 +34,7 @@ enum StorageRequest {
     AddDisk(UnparsedDisk),
     AddSyntheticDisk(ZpoolName),
     RemoveDisk(UnparsedDisk),
-    DisksChanged(Vec<UnparsedDisk>),
+    DisksChanged(HashSet<UnparsedDisk>),
     // NewFilesystem(NewFilesystemRequest),
     KeyManagerReady,
 }
@@ -45,12 +45,76 @@ pub struct StorageHandle {
     resource_updates: watch::Receiver<StorageResources>,
 }
 
+impl StorageHandle {
+    /// Adds a disk and associated zpool to the storage manager.
+    pub async fn upsert_disk(&self, disk: UnparsedDisk) {
+        self.tx.send(StorageRequest::AddDisk(disk)).await.unwrap();
+    }
+
+    /// Adds a synthetic disk backed by a zpool to the storage manager.
+    pub async fn upsert_synthetic_disk(&self, pool: ZpoolName) {
+        self.tx.send(StorageRequest::AddSyntheticDisk(pool)).await.unwrap();
+    }
+
+    /// Removes a disk, if it's tracked by the storage manager, as well
+    /// as any associated zpools.
+    pub async fn delete_disk(&self, disk: UnparsedDisk) {
+        self.tx.send(StorageRequest::RemoveDisk(disk)).await.unwrap();
+    }
+
+    /// Ensures that the storage manager tracks exactly the provided disks.
+    ///
+    /// This acts similarly to a batch [Self::upsert_disk] for all new disks,
+    /// and [Self::delete_disk] for all removed disks.
+    ///
+    /// If errors occur, an arbitrary "one" of them will be returned, but a
+    /// best-effort attempt to add all disks will still be made.
+    pub async fn ensure_using_exactly_these_disks<I>(&self, unparsed_disks: I)
+    where
+        I: IntoIterator<Item = UnparsedDisk>,
+    {
+        self.tx
+            .send(StorageRequest::DisksChanged(
+                unparsed_disks.into_iter().collect(),
+            ))
+            .await
+            .unwrap();
+    }
+
+    /// Notify the [`StorageManager`] that the [`key_manager::KeyManager`]
+    /// has determined what [`key_manager::SecretRetriever`] to use, and
+    /// that it is now possible to retrieve secrets and construct keys. Note
+    /// that when the trust quorum is in use, it is possible for the
+    /// [`key_manager::SecretRetriever`] to be ready while enough key shares
+    /// cannot yet be retrieved from other sleds. In that case we will still
+    /// be unable to add the disks successfully. In the common case this is
+    /// a transient error; in other cases it may be fatal. However, that is
+    /// outside the scope of this module's concerns.
+    pub async fn key_manager_ready(&self) {
+        self.tx.send(StorageRequest::KeyManagerReady).await.unwrap();
+    }
+
+    /// Wait for a boot disk to be initialized
+    pub async fn wait_for_boot_disk(&mut self) -> (DiskIdentity, ZpoolName) {
+        loop {
+            // We panic if the sender is dropped, as this means
+            // the StorageManager has gone away, which it should not do.
+            self.resource_updates.changed().await.unwrap();
+            // Limit any RwLock-related cancellation issues by immediately cloning
+            let resources = self.resource_updates.borrow().clone();
+            if let Some((disk_id, zpool_name)) = resources.boot_disk() {
+                return (disk_id, zpool_name);
+            }
+        }
+    }
+}
+
 /// The storage manager responsible for the state of the storage
 /// on a sled. The storage manager runs in its own task and is interacted
 /// with via the [`StorageHandle`].
 pub struct StorageManager {
     log: Logger,
-    stage: StorageManagerStage,
+    state: StorageManagerState,
     rx: mpsc::Receiver<StorageRequest>,
     resources: StorageResources,
     queued_u2_drives: HashSet<UnparsedDisk>,
@@ -70,7 +134,7 @@ impl StorageManager {
         (
             StorageManager {
                 log: log.new(o!("component" => "StorageManager")),
-                stage: StorageManagerStage::WaitingForBootDisk,
+                state: StorageManagerState::WaitingForBootDisk,
                 rx,
                 resources,
                 queued_u2_drives: HashSet::new(),
@@ -127,7 +191,7 @@ impl StorageManager {
         &mut self,
         unparsed_disk: UnparsedDisk,
     ) -> Result<(), Error> {
-        if self.stage != StorageManagerStage::Normal {
+        if self.state != StorageManagerState::Normal {
             self.queued_u2_drives.insert(unparsed_disk);
             return Ok(());
         }
@@ -139,14 +203,21 @@ impl StorageManager {
         )
         .await
         {
-            Ok(disk) => self.resources.insert_real_disk(disk),
+            Ok(disk) => {
+                if self.resources.insert_real_disk(disk)? {
+                    let _ = self
+                        .resource_updates
+                        .send_replace(self.resources.clone());
+                }
+                Ok(())
+            }
             Err(err @ DiskError::Dataset(DatasetError::KeyManager(_))) => {
                 warn!(
                     self.log,
                     "Transient error: {err} - queuing disk {:?}", unparsed_disk
                 );
                 self.queued_u2_drives.insert(unparsed_disk);
-                self.stage = StorageManagerStage::QueuingDisks;
+                self.state = StorageManagerState::QueuingDisks;
                 Err(err.into())
             }
             Err(err) => {
@@ -175,7 +246,9 @@ impl StorageManager {
             Some(&self.key_requester),
         )
         .await?;
-        self.resources.insert_real_disk(disk)?;
+        if self.resources.insert_real_disk(disk)? {
+            let _ = self.resource_updates.send_replace(self.resources.clone());
+        }
         Ok(())
     }
@@ -201,7 +274,9 @@ impl StorageManager {
             Some(&self.key_requester),
         )
         .await?;
-        self.resources.insert_synthetic_disk(zpool_name)?;
+        if self.resources.insert_synthetic_disk(zpool_name)? {
+            let _ = self.resource_updates.send_replace(self.resources.clone());
+        }
         Ok(())
     }
@@ -211,7 +286,7 @@ impl StorageManager {
         &mut self,
         zpool_name: ZpoolName,
     ) -> Result<(), Error> {
-        if self.stage != StorageManagerStage::Normal {
+        if self.state != StorageManagerState::Normal {
             info!(self.log, "Queuing synthetic U.2 drive: {zpool_name}");
             self.queued_synthetic_u2_drives.insert(zpool_name);
             return Ok(());
         }
@@ -232,14 +307,21 @@ impl StorageManager {
         )
         .await
         {
-            Ok(()) => self.resources.insert_synthetic_disk(zpool_name),
+            Ok(()) => {
+                if self.resources.insert_synthetic_disk(zpool_name)? {
+                    let _ = self
+                        .resource_updates
+                        .send_replace(self.resources.clone());
+                }
+                Ok(())
+            }
             Err(err @ DatasetError::KeyManager(_)) => {
                 warn!(
                     self.log,
                     "Transient error: {err} - queuing disk {:?}", synthetic_id
                 );
                 self.queued_synthetic_u2_drives.insert(zpool_name);
-                self.stage = StorageManagerStage::QueuingDisks;
+                self.state = StorageManagerState::QueuingDisks;
                 Ok(())
             }
             Err(err) => {
@@ -324,7 +406,7 @@ mod tests {
             KeyManager::new(&logctx.log, HardcodedSecretRetriever {});
         let (mut manager, _) = StorageManager::new(&logctx.log, key_requester);
         let zpool_name = ZpoolName::new_external(Uuid::new_v4());
-        assert_eq!(StorageManagerStage::WaitingForBootDisk, manager.stage);
+        assert_eq!(StorageManagerState::WaitingForBootDisk, manager.state);
         manager.add_synthetic_u2_disk(zpool_name.clone()).await.unwrap();
         assert!(manager.resources.all_u2_zpools().is_empty());
         assert_eq!(
@@ -334,11 +416,11 @@ mod tests {
 
         // Walk through other non-normal stages and ensure disk gets queued
         for stage in [
-            StorageManagerStage::WaitingForKeyManager,
-            StorageManagerStage::QueuingDisks,
+            StorageManagerState::WaitingForKeyManager,
+            StorageManagerState::QueuingDisks,
         ] {
             manager.queued_synthetic_u2_drives.clear();
-            manager.stage = stage;
+            manager.state = stage;
             manager.add_synthetic_u2_disk(zpool_name.clone()).await.unwrap();
             assert!(manager.resources.all_u2_zpools().is_empty());
             assert_eq!(
@@ -363,10 +445,34 @@ mod tests {
         tokio::spawn(async move { key_manager.run().await });
 
         // Set the stage to pretend we've progressed enough to have a key_manager available.
-        manager.stage = StorageManagerStage::Normal;
+        manager.state = StorageManagerState::Normal;
         manager.add_synthetic_u2_disk(zpool_name.clone()).await.unwrap();
         assert_eq!(manager.resources.all_u2_zpools().len(), 1);
         Zpool::destroy(&zpool_name).unwrap();
         logctx.cleanup_successful();
     }
+
+    #[tokio::test]
+    async fn wait_for_bootdisk() {
+        let logctx = test_setup_log("wait_for_bootdisk");
+        let (mut key_manager, key_requester) =
+            KeyManager::new(&logctx.log, HardcodedSecretRetriever {});
+        let (mut manager, mut handle) =
+            StorageManager::new(&logctx.log, key_requester);
+        // Spawn the key_manager so that it will respond to requests for encryption keys
+        tokio::spawn(async move { key_manager.run().await });
+
+        // Spawn the storage manager as done by sled-agent
+        tokio::spawn(async move {
+            manager.run().await;
+        });
+
+        // Create a synthetic internal disk
+        let zpool_name = ZpoolName::new_internal(Uuid::new_v4());
+        let dir = tempdir().unwrap();
+        let _ = new_disk(dir.path(), &zpool_name);
+
+        handle.upsert_synthetic_disk(zpool_name.clone()).await;
+        handle.wait_for_boot_disk().await;
+    }
 }
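
Taken together, the manager.rs changes turn direct mutation of the StorageManager into a message-passing API: every StorageHandle method sends a StorageRequest down the mpsc channel, and state flows back through the watch channel. The following is a hedged sketch, not part of the diff, of how a consumer such as sled-agent might drive that API end to end. It reuses the test helpers already shown above (HardcodedSecretRetriever, test_setup_log, new_disk, tempdir); the placement of the key_manager_ready call is an assumption about the production call order.

// A hedged sketch of the intended startup flow; not part of the diff.
#[tokio::test]
async fn storage_handle_startup_flow() {
    let logctx = test_setup_log("storage_handle_startup_flow");
    let (mut key_manager, key_requester) =
        KeyManager::new(&logctx.log, HardcodedSecretRetriever {});
    let (mut manager, mut handle) =
        StorageManager::new(&logctx.log, key_requester);

    // The key manager and the storage manager each run in their own task;
    // all further interaction goes through `handle`.
    tokio::spawn(async move { key_manager.run().await });
    tokio::spawn(async move { manager.run().await });

    // Unblock any disks queued while the manager was in
    // `WaitingForKeyManager` or `QueuingDisks`.
    handle.key_manager_ready().await;

    // Back a synthetic M.2 with a file-based vdev and hand it over.
    let zpool_name = ZpoolName::new_internal(Uuid::new_v4());
    let dir = tempdir().unwrap();
    let _ = new_disk(dir.path(), &zpool_name);
    handle.upsert_synthetic_disk(zpool_name.clone()).await;

    // Resolves once `StorageResources::boot_disk()` reports `Some(..)`.
    let (_disk_id, _pool) = handle.wait_for_boot_disk().await;
    logctx.cleanup_successful();
}
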
diff --git a/sled-storage/src/resources.rs b/sled-storage/src/resources.rs
index 7601ac7b86..fb57d742e3 100644
--- a/sled-storage/src/resources.rs
+++ b/sled-storage/src/resources.rs
@@ -49,30 +49,47 @@ pub struct StorageResources {
 impl StorageResources {
     /// Insert a disk and its zpool
-    pub(crate) fn insert_real_disk(&mut self, disk: Disk) -> Result<(), Error> {
+    ///
+    /// Returns true if data was changed, false otherwise
+    pub(crate) fn insert_real_disk(
+        &mut self,
+        disk: Disk,
+    ) -> Result<bool, Error> {
         let parent = disk.identity().clone();
         let zpool_name = disk.zpool_name().clone();
         let disk = DiskWrapper::Real {
             disk: disk.clone(),
             devfs_path: disk.devfs_path().clone(),
         };
+        if let Some(stored) = self.disks.get(&parent) {
+            if stored == &disk {
+                return Ok(false);
+            }
+        }
         Arc::make_mut(&mut self.disks).insert(disk.identity(), disk);
         let zpool = Pool::new(zpool_name, parent)?;
         Arc::make_mut(&mut self.pools).insert(zpool.name.id(), zpool);
-        Ok(())
+        Ok(true)
     }
 
     /// Insert a synthetic disk and its zpool
+    ///
+    /// Returns true if data was changed, false otherwise
     pub(crate) fn insert_synthetic_disk(
         &mut self,
         zpool_name: ZpoolName,
-    ) -> Result<(), Error> {
+    ) -> Result<bool, Error> {
         let disk = DiskWrapper::Synthetic { zpool_name: zpool_name.clone() };
         let parent = disk.identity().clone();
+        if let Some(stored) = self.disks.get(&parent) {
+            if stored == &disk {
+                return Ok(false);
+            }
+        }
         Arc::make_mut(&mut self.disks).insert(disk.identity(), disk);
         let zpool = Pool::new(zpool_name, parent)?;
         Arc::make_mut(&mut self.pools).insert(zpool.name.id(), zpool);
-        Ok(())
+        Ok(true)
     }
 
     /// Returns the identity of the boot disk.
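
The resources.rs side is what makes the send_replace calls above meaningful: both insert functions now compare the incoming DiskWrapper against what is already stored and return Ok(false) on a no-op, so the manager publishes a new StorageResources snapshot only when data actually changed, and watchers blocked in changed() are not woken spuriously. Below is a minimal, self-contained sketch of that compare-then-publish pattern using simplified stand-in types (not omicron code):

// A self-contained illustration of the pattern resources.rs adopts:
// mutate a snapshot, report whether anything actually changed, and only
// publish on the watch channel when it did.
use std::collections::BTreeMap;
use tokio::sync::watch;

#[derive(Clone, Default, PartialEq, Eq)]
struct Resources {
    // disk identity -> zpool name
    disks: BTreeMap<String, String>,
}

impl Resources {
    // Mirrors `insert_real_disk`: returns true only when the stored data
    // changed, so callers can skip redundant notifications.
    fn insert_disk(&mut self, id: String, zpool: String) -> bool {
        if self.disks.get(&id) == Some(&zpool) {
            return false; // already tracked with identical data
        }
        self.disks.insert(id, zpool);
        true
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(Resources::default());
    let mut current = Resources::default();

    for (id, pool) in [("d1", "p1"), ("d1", "p1"), ("d2", "p2")] {
        if current.insert_disk(id.into(), pool.into()) {
            // `send_replace` publishes unconditionally and hands back the
            // previous snapshot, which we discard, just as the diff does.
            let _ = tx.send_replace(current.clone());
        }
    }

    // The duplicate upsert of "d1" published nothing, so waiters blocked
    // in `changed()` were only woken for the two real changes.
    assert!(rx.has_changed().unwrap());
}
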