From ad6c92ede4835f6ab875927494f95e726d753ddd Mon Sep 17 00:00:00 2001
From: Sean Klein
Date: Mon, 15 Jul 2024 15:29:12 -0700
Subject: [PATCH] [Sled Agent] Expunged disks are not in use after
 omicron_physical_disks_ensure (#5965)

`omicron_physical_disks_ensure` is an API exposed by Sled Agent, which
allows Nexus to control the set of "active" control plane disks.
Although this API was exposed, it did not previously stop the Sled Agent
from using expunged disks in all circumstances. This PR adjusts the
endpoint to "flush out" all old usage of those disks before returning.

This PR:
- Ensures dump device management lets go of expunged U.2s
- Ensures zone bundles let go of expunged U.2s
- Removes any network probes allocated with a transient filesystem on an expunged U.2
- Removes any VMMs allocated with a transient filesystem on an expunged U.2

Fixes https://github.com/oxidecomputer/omicron/issues/5929
---
 illumos-utils/src/running_zone.rs    | 171 +++------------------
 illumos-utils/src/zone.rs            |   6 +-
 illumos-utils/src/zpool.rs           |  15 ++-
 sled-agent/src/common/instance.rs    |  13 +-
 sled-agent/src/dump_setup.rs         |  31 +++++
 sled-agent/src/instance.rs           |  68 ++++++++---
 sled-agent/src/instance_manager.rs   |  84 ++++++++++++-
 sled-agent/src/long_running_tasks.rs |  18 ++-
 sled-agent/src/probe_manager.rs      |  94 ++++++++++++---
 sled-agent/src/services.rs           |  33 ++++--
 sled-agent/src/sim/instance.rs       |   6 +-
 sled-agent/src/sled_agent.rs         |  66 ++++++++++-
 sled-agent/src/storage_monitor.rs    |  60 +++++++++-
 sled-agent/src/zone_bundle.rs        |  16 +++
 sled-storage/src/manager.rs          |  43 +++----
 sled-storage/src/resources.rs        |  91 ++++++++++----
 16 files changed, 556 insertions(+), 259 deletions(-)

diff --git a/illumos-utils/src/running_zone.rs b/illumos-utils/src/running_zone.rs
index c529a1b6d4..a66fa44e9c 100644
--- a/illumos-utils/src/running_zone.rs
+++ b/illumos-utils/src/running_zone.rs
@@ -10,6 +10,7 @@ use crate::link::{Link, VnicAllocator};
 use crate::opte::{Port, PortTicket};
 use crate::svc::wait_for_service;
 use crate::zone::{AddressRequest, IPADM, ZONE_PREFIX};
+use crate::zpool::{PathInPool, ZpoolName};
 use camino::{Utf8Path, Utf8PathBuf};
 use camino_tempfile::Utf8TempDir;
 use ipnetwork::IpNetwork;
@@ -101,60 +102,6 @@ pub enum EnsureAddressError {
     OpteGatewayConfig(#[from] RunCommandError),
 }
 
-/// Errors returned from [`RunningZone::get`].
-#[derive(thiserror::Error, Debug)] -pub enum GetZoneError { - #[error("While looking up zones with prefix '{prefix}', could not get zones: {err}")] - GetZones { - prefix: String, - #[source] - err: crate::zone::AdmError, - }, - - #[error("Invalid Utf8 path: {0}")] - FromPathBuf(#[from] camino::FromPathBufError), - - #[error("Zone with prefix '{prefix}' not found")] - NotFound { prefix: String }, - - #[error("Cannot get zone '{name}': it is in the {state:?} state instead of running")] - NotRunning { name: String, state: zone::State }, - - #[error( - "Cannot get zone '{name}': Failed to acquire control interface {err}" - )] - ControlInterface { - name: String, - #[source] - err: crate::zone::GetControlInterfaceError, - }, - - #[error("Cannot get zone '{name}': Failed to create addrobj: {err}")] - AddrObject { - name: String, - #[source] - err: crate::addrobj::ParseError, - }, - - #[error( - "Cannot get zone '{name}': Failed to ensure address exists: {err}" - )] - EnsureAddress { - name: String, - #[source] - err: crate::zone::EnsureAddressError, - }, - - #[error( - "Cannot get zone '{name}': Incorrect bootstrap interface access {err}" - )] - BootstrapInterface { - name: String, - #[source] - err: crate::zone::GetBootstrapInterfaceError, - }, -} - #[cfg(target_os = "illumos")] static REAPER_THREAD: OnceLock> = OnceLock::new(); @@ -407,6 +354,11 @@ impl RunningZone { self.inner.root() } + /// Returns the zpool on which the filesystem path has been placed. + pub fn root_zpool(&self) -> Option<&ZpoolName> { + self.inner.zonepath.pool.as_ref() + } + pub fn control_interface(&self) -> AddrObject { AddrObject::new(self.inner.get_control_vnic_name(), "omicron6").unwrap() } @@ -797,95 +749,6 @@ impl RunningZone { Ok(()) } - /// Looks up a running zone based on the `zone_prefix`, if one already exists. - /// - /// - If the zone was found, is running, and has a network interface, it is - /// returned. - /// - If the zone was not found `Error::NotFound` is returned. - /// - If the zone was found, but not running, `Error::NotRunning` is - /// returned. - /// - Other errors may be returned attempting to look up and accessing an - /// address on the zone. - pub async fn get( - log: &Logger, - vnic_allocator: &VnicAllocator, - zone_prefix: &str, - addrtype: AddressRequest, - ) -> Result { - let zone_info = Zones::get() - .await - .map_err(|err| GetZoneError::GetZones { - prefix: zone_prefix.to_string(), - err, - })? - .into_iter() - .find(|zone_info| zone_info.name().starts_with(&zone_prefix)) - .ok_or_else(|| GetZoneError::NotFound { - prefix: zone_prefix.to_string(), - })?; - - if zone_info.state() != zone::State::Running { - return Err(GetZoneError::NotRunning { - name: zone_info.name().to_string(), - state: zone_info.state(), - }); - } - - let zone_name = zone_info.name(); - let vnic_name = - Zones::get_control_interface(zone_name).map_err(|err| { - GetZoneError::ControlInterface { - name: zone_name.to_string(), - err, - } - })?; - let addrobj = AddrObject::new_control(&vnic_name).map_err(|err| { - GetZoneError::AddrObject { name: zone_name.to_string(), err } - })?; - Zones::ensure_address(Some(zone_name), &addrobj, addrtype).map_err( - |err| GetZoneError::EnsureAddress { - name: zone_name.to_string(), - err, - }, - )?; - - let control_vnic = vnic_allocator - .wrap_existing(vnic_name) - .expect("Failed to wrap valid control VNIC"); - - // The bootstrap address for a running zone never changes, - // so there's no need to call `Zones::ensure_address`. 
- // Currently, only the switch zone has a bootstrap interface. - let bootstrap_vnic = Zones::get_bootstrap_interface(zone_name) - .map_err(|err| GetZoneError::BootstrapInterface { - name: zone_name.to_string(), - err, - })? - .map(|name| { - vnic_allocator - .wrap_existing(name) - .expect("Failed to wrap valid bootstrap VNIC") - }); - - Ok(Self { - id: zone_info.id().map(|x| { - x.try_into().expect("zoneid_t is expected to be an i32") - }), - inner: InstalledZone { - log: log.new(o!("zone" => zone_name.to_string())), - zonepath: zone_info.path().to_path_buf().try_into()?, - name: zone_name.to_string(), - control_vnic, - // TODO(https://github.com/oxidecomputer/omicron/issues/725) - // - // Re-initialize guest_vnic state by inspecting the zone. - opte_ports: vec![], - links: vec![], - bootstrap_vnic, - }, - }) - } - /// Return references to the OPTE ports for this zone. pub fn opte_ports(&self) -> impl Iterator { self.inner.opte_ports() @@ -1081,7 +944,7 @@ pub struct InstalledZone { log: Logger, // Filesystem path of the zone - zonepath: Utf8PathBuf, + zonepath: PathInPool, // Name of the Zone. name: String, @@ -1131,7 +994,7 @@ impl InstalledZone { /// Returns the filesystem path to the zonepath pub fn zonepath(&self) -> &Utf8Path { - &self.zonepath + &self.zonepath.path } pub fn site_profile_xml_path(&self) -> Utf8PathBuf { @@ -1147,7 +1010,7 @@ impl InstalledZone { /// Returns the filesystem path to the zone's root in the GZ. pub fn root(&self) -> Utf8PathBuf { - self.zonepath.join(Self::ROOT_FS_PATH) + self.zonepath.path.join(Self::ROOT_FS_PATH) } } @@ -1198,7 +1061,7 @@ pub struct ZoneBuilder<'a> { /// Allocates the NIC used for control plane communication. underlay_vnic_allocator: Option<&'a VnicAllocator>, /// Filesystem path at which the installed zone will reside. - zone_root_path: Option<&'a Utf8Path>, + zone_root_path: Option, /// The directories that will be searched for the image tarball for the /// provided zone type ([`Self::with_zone_type`]). zone_image_paths: Option<&'a [Utf8PathBuf]>, @@ -1251,7 +1114,7 @@ impl<'a> ZoneBuilder<'a> { } /// Filesystem path at which the installed zone will reside. 
- pub fn with_zone_root_path(mut self, root_path: &'a Utf8Path) -> Self { + pub fn with_zone_root_path(mut self, root_path: PathInPool) -> Self { self.zone_root_path = Some(root_path); self } @@ -1345,8 +1208,11 @@ impl<'a> ZoneBuilder<'a> { self.zone_type?, self.unique_name, ); - let zonepath = temp_dir - .join(self.zone_root_path?.strip_prefix("/").unwrap()) + let mut zonepath = self.zone_root_path?; + zonepath.path = temp_dir + .join( + zonepath.path.strip_prefix("/").unwrap() + ) .join(&full_zone_name); let iz = InstalledZone { log: self.log?, @@ -1376,7 +1242,7 @@ impl<'a> ZoneBuilder<'a> { let Self { log: Some(log), underlay_vnic_allocator: Some(underlay_vnic_allocator), - zone_root_path: Some(zone_root_path), + zone_root_path: Some(mut zone_root_path), zone_image_paths: Some(zone_image_paths), zone_type: Some(zone_type), unique_name, @@ -1440,6 +1306,7 @@ impl<'a> ZoneBuilder<'a> { net_device_names.sort(); net_device_names.dedup(); + zone_root_path.path = zone_root_path.path.join(&full_zone_name); Zones::install_omicron_zone( &log, &zone_root_path, @@ -1460,7 +1327,7 @@ impl<'a> ZoneBuilder<'a> { Ok(InstalledZone { log: log.new(o!("zone" => full_zone_name.clone())), - zonepath: zone_root_path.join(&full_zone_name), + zonepath: zone_root_path, name: full_zone_name, control_vnic, bootstrap_vnic, diff --git a/illumos-utils/src/zone.rs b/illumos-utils/src/zone.rs index 3f749fc352..7ba40af043 100644 --- a/illumos-utils/src/zone.rs +++ b/illumos-utils/src/zone.rs @@ -14,6 +14,7 @@ use std::net::{IpAddr, Ipv6Addr}; use crate::addrobj::AddrObject; use crate::dladm::{EtherstubVnic, VNIC_PREFIX_BOOTSTRAP, VNIC_PREFIX_CONTROL}; +use crate::zpool::PathInPool; use crate::{execute, PFEXEC}; use omicron_common::address::SLED_PREFIX; @@ -282,7 +283,7 @@ impl Zones { #[allow(clippy::too_many_arguments)] pub async fn install_omicron_zone( log: &Logger, - zone_root_path: &Utf8Path, + zone_root_path: &PathInPool, zone_name: &str, zone_image: &Utf8Path, datasets: &[zone::Dataset], @@ -319,10 +320,9 @@ impl Zones { true, zone::CreationOptions::Blank, ); - let path = zone_root_path.join(zone_name); cfg.get_global() .set_brand("omicron1") - .set_path(&path) + .set_path(&zone_root_path.path) .set_autoboot(false) .set_ip_type(zone::IpType::Exclusive); if !limit_priv.is_empty() { diff --git a/illumos-utils/src/zpool.rs b/illumos-utils/src/zpool.rs index fa93760f99..5dabbdecc7 100644 --- a/illumos-utils/src/zpool.rs +++ b/illumos-utils/src/zpool.rs @@ -5,7 +5,7 @@ //! Utilities for managing Zpools. use crate::{execute, ExecutionError, PFEXEC}; -use camino::Utf8Path; +use camino::{Utf8Path, Utf8PathBuf}; use std::str::FromStr; pub use omicron_common::zpool_name::ZpoolName; @@ -181,6 +181,19 @@ impl FromStr for ZpoolInfo { /// Wraps commands for interacting with ZFS pools. pub struct Zpool {} +/// A path which exists within a pool. +/// +/// By storing these types together, it's possible to answer +/// whether or not a path exists on a particular device. +// Technically we could re-derive the pool name from the path, +// but that involves some string parsing, and honestly I'd just +// Rather Not. 
+#[derive(Debug, Clone, Eq, PartialEq)] +pub struct PathInPool { + pub pool: Option, + pub path: Utf8PathBuf, +} + #[cfg_attr(any(test, feature = "testing"), mockall::automock, allow(dead_code))] impl Zpool { pub fn create( diff --git a/sled-agent/src/common/instance.rs b/sled-agent/src/common/instance.rs index ed0aceff82..0fe2e27698 100644 --- a/sled-agent/src/common/instance.rs +++ b/sled-agent/src/common/instance.rs @@ -486,9 +486,15 @@ impl InstanceStates { /// instance's state in Nexus may become inconsistent. This routine should /// therefore only be invoked by callers who know that an instance is not /// migrating. - pub(crate) fn terminate_rudely(&mut self) { + pub(crate) fn terminate_rudely(&mut self, mark_failed: bool) { + let vmm_state = if mark_failed { + PropolisInstanceState(PropolisApiState::Failed) + } else { + PropolisInstanceState(PropolisApiState::Destroyed) + }; + let fake_observed = ObservedPropolisState { - vmm_state: PropolisInstanceState(PropolisApiState::Destroyed), + vmm_state, migration_status: if self.instance.migration_id.is_some() { ObservedMigrationStatus::Failed } else { @@ -893,7 +899,8 @@ mod test { assert_eq!(state.propolis_role(), PropolisRole::MigrationTarget); let prev = state.clone(); - state.terminate_rudely(); + let mark_failed = false; + state.terminate_rudely(mark_failed); assert_state_change_has_gen_change(&prev, &state); assert_eq!(state.instance.gen, prev.instance.gen); diff --git a/sled-agent/src/dump_setup.rs b/sled-agent/src/dump_setup.rs index 02d3d41dd7..02d40195cf 100644 --- a/sled-agent/src/dump_setup.rs +++ b/sled-agent/src/dump_setup.rs @@ -100,6 +100,7 @@ use std::ffi::OsString; use std::path::{Path, PathBuf}; use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH}; use tokio::sync::mpsc::Receiver; +use tokio::sync::oneshot; use zone::{Zone, ZoneError}; const ZFS_PROP_USED: &str = "used"; @@ -175,6 +176,7 @@ enum DumpSetupCmd { dump_slices: Vec, debug_datasets: Vec, core_datasets: Vec, + update_complete_tx: oneshot::Sender<()>, }, } @@ -222,6 +224,12 @@ impl DumpSetup { Self { tx, mount_config, _poller, log } } + /// Given the set of all managed disks, updates the dump device location + /// for logs and dumps. + /// + /// This function returns only once this request has been handled, which + /// can be used as a signal by callers that any "old disks" are no longer + /// being used by [DumpSetup]. pub(crate) async fn update_dumpdev_setup( &self, disks: impl Iterator, @@ -279,16 +287,22 @@ impl DumpSetup { } } + let (tx, rx) = oneshot::channel(); if let Err(err) = self .tx .send(DumpSetupCmd::UpdateDumpdevSetup { dump_slices: m2_dump_slices, debug_datasets: u2_debug_datasets, core_datasets: m2_core_datasets, + update_complete_tx: tx, }) .await { error!(log, "DumpSetup channel closed: {:?}", err.0); + }; + + if let Err(err) = rx.await { + error!(log, "DumpSetup failed to await update"; "err" => ?err); } } } @@ -504,6 +518,14 @@ impl DumpSetupWorker { async fn poll_file_archival(mut self) { info!(self.log, "DumpSetup poll loop started."); + + // A oneshot which helps callers track when updates have propagated. + // + // This is particularly useful for disk expungement, when a caller + // wants to ensure that the dump device is no longer accessing an + // old device. 
+ let mut evaluation_and_archiving_complete_tx = None; + loop { match tokio::time::timeout(ARCHIVAL_INTERVAL, self.rx.recv()).await { @@ -511,7 +533,10 @@ impl DumpSetupWorker { dump_slices, debug_datasets, core_datasets, + update_complete_tx, })) => { + evaluation_and_archiving_complete_tx = + Some(update_complete_tx); self.update_disk_loadout( dump_slices, debug_datasets, @@ -537,6 +562,12 @@ impl DumpSetupWorker { if let Err(err) = self.archive_files().await { error!(self.log, "Failed to archive debug/dump files: {err:?}"); } + + if let Some(tx) = evaluation_and_archiving_complete_tx.take() { + if let Err(err) = tx.send(()) { + error!(self.log, "DumpDevice failed to notify caller"; "err" => ?err); + } + } } } diff --git a/sled-agent/src/instance.rs b/sled-agent/src/instance.rs index ec4d503e7b..38b97173fc 100644 --- a/sled-agent/src/instance.rs +++ b/sled-agent/src/instance.rs @@ -39,9 +39,10 @@ use omicron_common::api::internal::shared::{ NetworkInterface, SourceNatConfig, }; use omicron_common::backoff; +use omicron_common::zpool_name::ZpoolName; use omicron_uuid_kinds::{GenericUuid, InstanceUuid, PropolisUuid}; use propolis_client::Client as PropolisClient; -use rand::prelude::SliceRandom; +use rand::prelude::IteratorRandom; use rand::SeedableRng; use sled_storage::dataset::ZONE_DATASET; use sled_storage::manager::StorageHandle; @@ -214,6 +215,9 @@ enum InstanceRequest { RequestZoneBundle { tx: oneshot::Sender>, }, + GetFilesystemPool { + tx: oneshot::Sender>, + }, CurrentState { tx: oneshot::Sender, }, @@ -227,6 +231,7 @@ enum InstanceRequest { tx: oneshot::Sender>, }, Terminate { + mark_failed: bool, tx: oneshot::Sender>, }, IssueSnapshotRequest { @@ -391,7 +396,8 @@ impl InstanceRunner { // of the sender alive in "self.tx_monitor". None => { warn!(self.log, "Instance 'VMM monitor' channel closed; shutting down"); - self.terminate().await; + let mark_failed = true; + self.terminate(mark_failed).await; }, } @@ -405,6 +411,10 @@ impl InstanceRunner { tx.send(self.request_zone_bundle().await) .map_err(|_| Error::FailedSendClientClosed) }, + Some(GetFilesystemPool { tx } ) => { + tx.send(self.get_filesystem_zpool()) + .map_err(|_| Error::FailedSendClientClosed) + }, Some(CurrentState{ tx }) => { tx.send(self.current_state()) .map_err(|_| Error::FailedSendClientClosed) @@ -424,9 +434,9 @@ impl InstanceRunner { ) .map_err(|_| Error::FailedSendClientClosed) }, - Some(Terminate { tx }) => { + Some(Terminate { mark_failed, tx }) => { tx.send(Ok(InstanceUnregisterResponse { - updated_runtime: Some(self.terminate().await) + updated_runtime: Some(self.terminate(mark_failed).await) })) .map_err(|_| Error::FailedSendClientClosed) }, @@ -449,7 +459,8 @@ impl InstanceRunner { }, None => { warn!(self.log, "Instance request channel closed; shutting down"); - self.terminate().await; + let mark_failed = false; + self.terminate(mark_failed).await; break; }, }; @@ -609,8 +620,8 @@ impl InstanceRunner { Some(InstanceAction::Destroy) => { info!(self.log, "terminating VMM that has exited"; "instance_id" => %self.id()); - - self.terminate().await; + let mark_failed = false; + self.terminate(mark_failed).await; Reaction::Terminate } None => Reaction::Continue, @@ -1059,6 +1070,17 @@ impl Instance { Ok(()) } + pub async fn get_filesystem_zpool( + &self, + ) -> Result, Error> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(InstanceRequest::GetFilesystemPool { tx }) + .await + .map_err(|_| Error::FailedSendChannelClosed)?; + Ok(rx.await?) 
+ } + pub async fn current_state(&self) -> Result { let (tx, rx) = oneshot::channel(); self.tx @@ -1113,9 +1135,10 @@ impl Instance { pub async fn terminate( &self, tx: oneshot::Sender>, + mark_failed: bool, ) -> Result<(), Error> { self.tx - .send(InstanceRequest::Terminate { tx }) + .send(InstanceRequest::Terminate { mark_failed, tx }) .await .map_err(|_| Error::FailedSendChannelClosed)?; Ok(()) @@ -1180,6 +1203,13 @@ impl InstanceRunner { } } + fn get_filesystem_zpool(&self) -> Option { + let Some(run_state) = &self.running_state else { + return None; + }; + run_state.running_zone.root_zpool().map(|p| p.clone()) + } + fn current_state(&self) -> SledInstanceState { self.state.sled_instance_state() } @@ -1228,7 +1258,8 @@ impl InstanceRunner { // This case is morally equivalent to starting Propolis and then // rudely terminating it before asking it to do anything. Update // the VMM and instance states accordingly. - self.state.terminate_rudely(); + let mark_failed = false; + self.state.terminate_rudely(mark_failed); } setup_result?; } @@ -1255,7 +1286,8 @@ impl InstanceRunner { // this happens, generate an instance record bearing the // "Destroyed" state and return it to the caller. if self.running_state.is_none() { - self.terminate().await; + let mark_failed = false; + self.terminate(mark_failed).await; (None, None) } else { ( @@ -1343,20 +1375,22 @@ impl InstanceRunner { // configured VNICs. let zname = propolis_zone_name(self.propolis_id()); let mut rng = rand::rngs::StdRng::from_entropy(); - let root = self + let latest_disks = self .storage .get_latest_disks() .await - .all_u2_mountpoints(ZONE_DATASET) + .all_u2_mountpoints(ZONE_DATASET); + + let root = latest_disks + .into_iter() .choose(&mut rng) - .ok_or_else(|| Error::U2NotFound)? - .clone(); + .ok_or_else(|| Error::U2NotFound)?; let installed_zone = self .zone_builder_factory .builder() .with_log(self.log.clone()) .with_underlay_vnic_allocator(&self.vnic_allocator) - .with_zone_root_path(&root) + .with_zone_root_path(root) .with_zone_image_paths(&["/opt/oxide".into()]) .with_zone_type("propolis-server") .with_unique_name(self.propolis_id().into_untyped_uuid()) @@ -1453,9 +1487,9 @@ impl InstanceRunner { Ok(PropolisSetup { client, running_zone }) } - async fn terminate(&mut self) -> SledInstanceState { + async fn terminate(&mut self, mark_failed: bool) -> SledInstanceState { self.terminate_inner().await; - self.state.terminate_rudely(); + self.state.terminate_rudely(mark_failed); // This causes the "run" task to exit on the next iteration. 
self.should_terminate = true; diff --git a/sled-agent/src/instance_manager.rs b/sled-agent/src/instance_manager.rs index beeb8377d2..cfb96fb8c9 100644 --- a/sled-agent/src/instance_manager.rs +++ b/sled-agent/src/instance_manager.rs @@ -24,14 +24,16 @@ use illumos_utils::dladm::Etherstub; use illumos_utils::link::VnicAllocator; use illumos_utils::opte::PortManager; use illumos_utils::running_zone::ZoneBuilderFactory; +use omicron_common::api::external::Generation; use omicron_common::api::internal::nexus::InstanceRuntimeState; use omicron_common::api::internal::nexus::SledInstanceState; use omicron_common::api::internal::nexus::VmmRuntimeState; use omicron_uuid_kinds::InstanceUuid; use omicron_uuid_kinds::PropolisUuid; use sled_storage::manager::StorageHandle; +use sled_storage::resources::AllDisks; use slog::Logger; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashSet}; use std::net::SocketAddr; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; @@ -119,6 +121,7 @@ impl InstanceManager { instances: BTreeMap::new(), vnic_allocator: VnicAllocator::new("Instance", etherstub), port_manager, + storage_generation: None, storage, zone_bundler, zone_builder_factory, @@ -325,6 +328,23 @@ impl InstanceManager { .map_err(|_| Error::FailedSendInstanceManagerClosed)?; rx.await? } + + /// Marks instances failed unless they're using storage from `disks`. + /// + /// This function looks for transient zone filesystem usage on expunged + /// zpools. + pub async fn use_only_these_disks( + &self, + disks: AllDisks, + ) -> Result<(), Error> { + let (tx, rx) = oneshot::channel(); + self.inner + .tx + .send(InstanceManagerRequest::OnlyUseDisks { disks, tx }) + .await + .map_err(|_| Error::FailedSendInstanceManagerClosed)?; + rx.await? + } } // Most requests that can be sent to the "InstanceManagerRunner" task. @@ -384,6 +404,10 @@ enum InstanceManagerRequest { instance_id: InstanceUuid, tx: oneshot::Sender>, }, + OnlyUseDisks { + disks: AllDisks, + tx: oneshot::Sender>, + }, } // Requests that the instance manager stop processing information about a @@ -420,6 +444,7 @@ struct InstanceManagerRunner { vnic_allocator: VnicAllocator, port_manager: PortManager, + storage_generation: Option, storage: StorageHandle, zone_bundler: ZoneBundler, zone_builder_factory: ZoneBuilderFactory, @@ -494,6 +519,10 @@ impl InstanceManagerRunner { // the state... self.get_instance_state(tx, instance_id).await }, + Some(OnlyUseDisks { disks, tx } ) => { + self.use_only_these_disks(disks).await; + tx.send(Ok(())).map_err(|_| Error::FailedSendClientClosed) + }, None => { warn!(self.log, "InstanceManager's request channel closed; shutting down"); break; @@ -638,7 +667,8 @@ impl InstanceManagerRunner { // Otherwise, we pipeline the request, and send it to the instance, // where it can receive an appropriate response. - instance.terminate(tx).await?; + let mark_failed = false; + instance.terminate(tx, mark_failed).await?; Ok(()) } @@ -775,6 +805,56 @@ impl InstanceManagerRunner { tx.send(Ok(state)).map_err(|_| Error::FailedSendClientClosed)?; Ok(()) } + + async fn use_only_these_disks(&mut self, disks: AllDisks) { + // Consider the generation number on the incoming request to avoid + // applying old requests. + let requested_generation = *disks.generation(); + if let Some(last_gen) = self.storage_generation { + if last_gen >= requested_generation { + // This request looks old, ignore it. 
+ info!(self.log, "use_only_these_disks: Ignoring request"; + "last_gen" => ?last_gen, "requested_gen" => ?requested_generation); + return; + } + } + self.storage_generation = Some(requested_generation); + info!(self.log, "use_only_these_disks: Processing new request"; + "gen" => ?requested_generation); + + let u2_set: HashSet<_> = disks.all_u2_zpools().into_iter().collect(); + + let mut to_remove = vec![]; + for (id, (_, instance)) in self.instances.iter() { + // If we can read the filesystem pool, consider it. Otherwise, move + // on, to prevent blocking the cleanup of other instances. + let Ok(Some(filesystem_pool)) = + instance.get_filesystem_zpool().await + else { + info!(self.log, "use_only_these_disks: Cannot read filesystem pool"; "instance_id" => ?id); + continue; + }; + if !u2_set.contains(&filesystem_pool) { + to_remove.push(*id); + } + } + + for id in to_remove { + info!(self.log, "use_only_these_disks: Removing instance"; "instance_id" => ?id); + if let Some((_, instance)) = self.instances.remove(&id) { + let (tx, rx) = oneshot::channel(); + let mark_failed = true; + if let Err(e) = instance.terminate(tx, mark_failed).await { + warn!(self.log, "use_only_these_disks: Failed to request instance removal"; "err" => ?e); + continue; + } + + if let Err(e) = rx.await { + warn!(self.log, "use_only_these_disks: Failed while removing instance"; "err" => ?e); + } + } + } + } } /// Represents membership of an instance in the [`InstanceManager`]. diff --git a/sled-agent/src/long_running_tasks.rs b/sled-agent/src/long_running_tasks.rs index faea94f552..e920ffc3fc 100644 --- a/sled-agent/src/long_running_tasks.rs +++ b/sled-agent/src/long_running_tasks.rs @@ -20,7 +20,7 @@ use crate::config::Config; use crate::hardware_monitor::HardwareMonitor; use crate::services::ServiceManager; use crate::sled_agent::SledAgent; -use crate::storage_monitor::StorageMonitor; +use crate::storage_monitor::{StorageMonitor, StorageMonitorHandle}; use crate::zone_bundle::{CleanupContext, ZoneBundler}; use bootstore::schemes::v0 as bootstore; use key_manager::{KeyManager, StorageKeyRequester}; @@ -46,6 +46,10 @@ pub struct LongRunningTaskHandles { /// for establishing zpools on disks and managing their datasets. pub storage_manager: StorageHandle, + /// A mechanism for talking to the [`StorageMonitor`], which reacts to disk + /// changes and updates the dump devices. 
+ pub storage_monitor_handle: StorageMonitorHandle, + /// A mechanism for interacting with the hardware device tree pub hardware_manager: HardwareManager, @@ -71,7 +75,8 @@ pub async fn spawn_all_longrunning_tasks( let mut storage_manager = spawn_storage_manager(log, storage_key_requester.clone()); - spawn_storage_monitor(log, storage_manager.clone()); + let storage_monitor_handle = + spawn_storage_monitor(log, storage_manager.clone()); let nongimlet_observed_disks = config.nongimlet_observed_disks.clone().unwrap_or(vec![]); @@ -106,6 +111,7 @@ pub async fn spawn_all_longrunning_tasks( LongRunningTaskHandles { storage_key_requester, storage_manager, + storage_monitor_handle, hardware_manager, bootstore, zone_bundler, @@ -137,13 +143,17 @@ fn spawn_storage_manager( handle } -fn spawn_storage_monitor(log: &Logger, storage_handle: StorageHandle) { +fn spawn_storage_monitor( + log: &Logger, + storage_handle: StorageHandle, +) -> StorageMonitorHandle { info!(log, "Starting StorageMonitor"); - let storage_monitor = + let (storage_monitor, handle) = StorageMonitor::new(log, MountConfig::default(), storage_handle); tokio::spawn(async move { storage_monitor.run().await; }); + handle } async fn spawn_hardware_manager( diff --git a/sled-agent/src/probe_manager.rs b/sled-agent/src/probe_manager.rs index 40af604645..9451484f21 100644 --- a/sled-agent/src/probe_manager.rs +++ b/sled-agent/src/probe_manager.rs @@ -10,20 +10,21 @@ use nexus_client::types::{ BackgroundTasksActivateRequest, ProbeExternalIp, ProbeInfo, }; use omicron_common::api::external::{ - VpcFirewallRuleAction, VpcFirewallRuleDirection, VpcFirewallRulePriority, - VpcFirewallRuleStatus, + Generation, VpcFirewallRuleAction, VpcFirewallRuleDirection, + VpcFirewallRulePriority, VpcFirewallRuleStatus, }; use omicron_common::api::internal::shared::NetworkInterface; -use rand::prelude::SliceRandom; +use rand::prelude::IteratorRandom; use rand::SeedableRng; use sled_storage::dataset::ZONE_DATASET; use sled_storage::manager::StorageHandle; +use sled_storage::resources::AllDisks; use slog::{error, warn, Logger}; use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; use std::sync::Arc; use std::time::Duration; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, MutexGuard}; use tokio::task::JoinHandle; use tokio::time::sleep; use uuid::Uuid; @@ -45,6 +46,11 @@ pub(crate) struct ProbeManager { inner: Arc, } +struct RunningProbes { + storage_generation: Option, + zones: HashMap, +} + pub(crate) struct ProbeManagerInner { join_handle: Mutex>>, nexus_client: NexusClientWithResolver, @@ -53,7 +59,7 @@ pub(crate) struct ProbeManagerInner { vnic_allocator: VnicAllocator, storage: StorageHandle, port_manager: PortManager, - running_probes: Mutex>, + running_probes: Mutex, } impl ProbeManager { @@ -72,7 +78,10 @@ impl ProbeManager { VNIC_ALLOCATOR_SCOPE, etherstub, ), - running_probes: Mutex::new(HashMap::new()), + running_probes: Mutex::new(RunningProbes { + storage_generation: None, + zones: HashMap::new(), + }), nexus_client, log, sled_id, @@ -85,6 +94,51 @@ impl ProbeManager { pub(crate) async fn run(&self) { self.inner.run().await; } + + /// Removes any probes using filesystem roots on zpools that are not + /// contained in the set of "disks". 
+ pub(crate) async fn use_only_these_disks(&self, disks: &AllDisks) { + let u2_set: HashSet<_> = disks.all_u2_zpools().into_iter().collect(); + let mut probes = self.inner.running_probes.lock().await; + + // Consider the generation number on the incoming request to avoid + // applying old requests. + let requested_generation = *disks.generation(); + if let Some(last_gen) = probes.storage_generation { + if last_gen >= requested_generation { + // This request looks old, ignore it. + info!(self.inner.log, "use_only_these_disks: Ignoring request"; + "last_gen" => ?last_gen, "requested_gen" => ?requested_generation); + return; + } + } + probes.storage_generation = Some(requested_generation); + info!(self.inner.log, "use_only_these_disks: Processing new request"; + "gen" => ?requested_generation); + + let to_remove = probes + .zones + .iter() + .filter_map(|(id, probe)| { + let Some(probe_pool) = probe.root_zpool() else { + // No known pool for this probe + info!(self.inner.log, "use_only_these_disks: Cannot read filesystem pool"; "id" => ?id); + return None; + }; + + if !u2_set.contains(probe_pool) { + Some(*id) + } else { + None + } + }) + .collect::>(); + + for probe_id in to_remove { + info!(self.inner.log, "use_only_these_disks: Removing probe"; "probe_id" => ?probe_id); + self.inner.remove_probe_locked(&mut probes, probe_id).await; + } + } } /// State information about a probe. This is a common representation that @@ -226,14 +280,15 @@ impl ProbeManagerInner { /// boots the probe zone. async fn add_probe(self: &Arc, probe: &ProbeState) -> Result<()> { let mut rng = rand::rngs::StdRng::from_entropy(); - let root = self + let current_disks = self .storage .get_latest_disks() .await - .all_u2_mountpoints(ZONE_DATASET) + .all_u2_mountpoints(ZONE_DATASET); + let zone_root_path = current_disks + .into_iter() .choose(&mut rng) - .ok_or_else(|| anyhow!("u2 not found"))? - .clone(); + .ok_or_else(|| anyhow!("u2 not found"))?; let nic = probe .interface @@ -268,7 +323,7 @@ impl ProbeManagerInner { .builder() .with_log(self.log.clone()) .with_underlay_vnic_allocator(&self.vnic_allocator) - .with_zone_root_path(&root) + .with_zone_root_path(zone_root_path) .with_zone_image_paths(&["/opt/oxide".into()]) .with_zone_type("probe") .with_unique_name(probe.id) @@ -290,13 +345,13 @@ impl ProbeManagerInner { rz.ensure_address_for_port("overlay", 0).await?; info!(self.log, "started probe {}", probe.id); - self.running_probes.lock().await.insert(probe.id, rz); + self.running_probes.lock().await.zones.insert(probe.id, rz); Ok(()) } /// Remove a set of probes from this sled. - async fn remove<'a, I>(self: &Arc, probes: I) + async fn remove<'a, I>(&self, probes: I) where I: Iterator, { @@ -308,8 +363,17 @@ impl ProbeManagerInner { /// Remove a probe from this sled. This tears down the zone and it's /// network resources. 
- async fn remove_probe(self: &Arc, id: Uuid) { - match self.running_probes.lock().await.remove(&id) { + async fn remove_probe(&self, id: Uuid) { + let mut probes = self.running_probes.lock().await; + self.remove_probe_locked(&mut probes, id).await + } + + async fn remove_probe_locked( + &self, + probes: &mut MutexGuard<'_, RunningProbes>, + id: Uuid, + ) { + match probes.zones.remove(&id) { Some(mut running_zone) => { for l in running_zone.links_mut() { if let Err(e) = l.delete() { diff --git a/sled-agent/src/services.rs b/sled-agent/src/services.rs index f4e9f8da0a..6bf8a4fbe5 100644 --- a/sled-agent/src/services.rs +++ b/sled-agent/src/services.rs @@ -57,7 +57,7 @@ use illumos_utils::running_zone::{ }; use illumos_utils::zfs::ZONE_ZFS_RAMDISK_DATASET_MOUNTPOINT; use illumos_utils::zone::AddressRequest; -use illumos_utils::zpool::ZpoolName; +use illumos_utils::zpool::{PathInPool, ZpoolName}; use illumos_utils::{execute, PFEXEC}; use internal_dns::resolver::Resolver; use itertools::Itertools; @@ -445,6 +445,11 @@ impl OmicronZonesConfigLocal { /// Combines the Nexus-provided `OmicronZoneConfig` (which describes what Nexus /// wants for this zone) with any locally-determined configuration (like the /// path to the root filesystem) +// +// NOTE: Although the path to the root filesystem is not exactly equal to the +// ZpoolName, it is derivable from it, and the ZpoolName for the root filesystem +// is now being supplied as a part of OmicronZoneConfig. Therefore, this struct +// is less necessary than it has been historically. #[derive( Clone, Debug, @@ -551,10 +556,15 @@ impl<'a> ZoneArgs<'a> { } /// Return the root filesystem path for this zone - pub fn root(&self) -> &Utf8Path { + pub fn root(&self) -> PathInPool { match self { - ZoneArgs::Omicron(zone_config) => &zone_config.root, - ZoneArgs::Switch(zone_request) => &zone_request.root, + ZoneArgs::Omicron(zone_config) => PathInPool { + pool: zone_config.zone.filesystem_pool.clone(), + path: zone_config.root.clone(), + }, + ZoneArgs::Switch(zone_request) => { + PathInPool { pool: None, path: zone_request.root.clone() } + } } } } @@ -1436,7 +1446,7 @@ impl ServiceManager { let all_disks = self.inner.storage.get_latest_disks().await; if let Some((_, boot_zpool)) = all_disks.boot_disk() { zone_image_paths.push(boot_zpool.dataset_mountpoint( - &all_disks.mount_config.root, + &all_disks.mount_config().root, INSTALL_DATASET, )); } @@ -1462,7 +1472,7 @@ impl ServiceManager { let installed_zone = zone_builder .with_log(self.inner.log.clone()) .with_underlay_vnic_allocator(&self.inner.underlay_vnic_allocator) - .with_zone_root_path(&request.root()) + .with_zone_root_path(request.root()) .with_zone_image_paths(zone_image_paths.as_slice()) .with_zone_type(&zone_type_str) .with_datasets(datasets.as_slice()) @@ -2904,7 +2914,8 @@ impl ServiceManager { ) .await?; - let config = OmicronZoneConfigLocal { zone: zone.clone(), root }; + let config = + OmicronZoneConfigLocal { zone: zone.clone(), root: root.path }; let runtime = self .initialize_zone( @@ -3172,7 +3183,7 @@ impl ServiceManager { // Collect information that's necessary to start new zones let storage = self.inner.storage.get_latest_disks().await; - let mount_config = &storage.mount_config; + let mount_config = storage.mount_config(); let all_u2_pools = storage.all_u2_zpools(); let time_is_synchronized = match self.timesync_get_locked(&existing_zones).await { @@ -3289,7 +3300,7 @@ impl ServiceManager { mount_config: &MountConfig, zone: &OmicronZoneConfig, all_u2_pools: &Vec, - ) -> Result { + ) 
-> Result { let name = zone.zone_name(); // If the caller has requested a specific durable dataset, @@ -3368,7 +3379,9 @@ impl ServiceManager { device: format!("zpool: {filesystem_pool}"), }); } - Ok(filesystem_pool.dataset_mountpoint(&mount_config.root, ZONE_DATASET)) + let path = filesystem_pool + .dataset_mountpoint(&mount_config.root, ZONE_DATASET); + Ok(PathInPool { pool: Some(filesystem_pool), path }) } pub async fn cockroachdb_initialize(&self) -> Result<(), Error> { diff --git a/sled-agent/src/sim/instance.rs b/sled-agent/src/sim/instance.rs index be6c63f53a..e94b3b4984 100644 --- a/sled-agent/src/sim/instance.rs +++ b/sled-agent/src/sim/instance.rs @@ -211,7 +211,8 @@ impl SimInstanceInner { InstanceStateRequested::Stopped => { match self.next_resting_state() { VmmState::Starting => { - self.state.terminate_rudely(); + let mark_failed = false; + self.state.terminate_rudely(mark_failed); } VmmState::Running => self.queue_graceful_stop(), // Idempotently allow requests to stop an instance that is @@ -363,7 +364,8 @@ impl SimInstanceInner { /// Simulates rude termination by moving the instance to the Destroyed state /// immediately and clearing the queue of pending state transitions. fn terminate(&mut self) -> SledInstanceState { - self.state.terminate_rudely(); + let mark_failed = false; + self.state.terminate_rudely(mark_failed); self.queue.clear(); self.destroyed = true; self.state.sled_instance_state() diff --git a/sled-agent/src/sled_agent.rs b/sled-agent/src/sled_agent.rs index 82c16b0b8d..9832144791 100644 --- a/sled-agent/src/sled_agent.rs +++ b/sled-agent/src/sled_agent.rs @@ -27,6 +27,7 @@ use crate::params::{ }; use crate::probe_manager::ProbeManager; use crate::services::{self, ServiceManager}; +use crate::storage_monitor::StorageMonitorHandle; use crate::updates::{ConfigUpdates, UpdateManager}; use crate::vmm_reservoir::{ReservoirMode, VmmReservoirManager}; use crate::zone_bundle; @@ -123,6 +124,9 @@ pub enum Error { #[error("Error managing storage: {0}")] Storage(#[from] sled_storage::error::Error), + #[error("Error monitoring storage: {0}")] + StorageMonitor(#[from] crate::storage_monitor::Error), + #[error("Error updating: {0}")] Download(#[from] crate::updates::Error), @@ -277,6 +281,10 @@ struct SledAgentInner { // Component of Sled Agent responsible for storage and dataset management. storage: StorageHandle, + // Component of Sled Agent responsible for monitoring storage and updating + // dump devices. + storage_monitor: StorageMonitorHandle, + // Component of Sled Agent responsible for managing Propolis instances. instances: InstanceManager, @@ -562,6 +570,9 @@ impl SledAgent { subnet: request.body.subnet, start_request: request, storage: long_running_task_handles.storage_manager.clone(), + storage_monitor: long_running_task_handles + .storage_monitor_handle + .clone(), instances, probes, hardware: long_running_task_handles.hardware_manager.clone(), @@ -808,7 +819,60 @@ impl SledAgent { &self, config: OmicronPhysicalDisksConfig, ) -> Result { - Ok(self.storage().omicron_physical_disks_ensure(config).await?) + info!(self.log, "physical disks ensure"); + // Tell the storage subsystem which disks should be managed. + let disk_result = + self.storage().omicron_physical_disks_ensure(config).await?; + info!(self.log, "physical disks ensure: Updated storage"); + + // Grab a view of the latest set of disks, alongside a generation + // number. + // + // This generation is at LEAST as high as our last call through + // omicron_physical_disks_ensure. 
It may actually be higher, if a + // concurrent operation occurred. + // + // "latest_disks" has a generation number, which is important for other + // subcomponents of Sled Agent to consider. If multiple requests to + // ensure disks arrive concurrently, it's important to "only advance + // forward" as requested by Nexus. + // + // For example: if we receive the following requests concurrently: + // - Use Disks {A, B, C}, generation = 1 + // - Use Disks {A, B, C, D}, generation = 2 + // + // If we ignore generation numbers, it's possible that we start using + // "disk D" -- e.g., for instance filesystems -- and then immediately + // delete it when we process the request with "generation 1". + // + // By keeping these requests ordered, we prevent this thrashing, and + // ensure that we always progress towards the last-requested state. + let latest_disks = self.storage().get_latest_disks().await; + let our_gen = latest_disks.generation(); + info!(self.log, "physical disks ensure: Propagating new generation of disks"; "generation" => ?our_gen); + + // Ensure that the StorageMonitor, and the dump devices, have committed + // to start using new disks and stop using old ones. + self.inner.storage_monitor.await_generation(*our_gen).await?; + info!(self.log, "physical disks ensure: Updated storage monitor"); + + // Ensure that the ZoneBundler, if it was creating a bundle referencing + // the old U.2s, has stopped using them. + self.inner.zone_bundler.await_completion_of_prior_bundles().await; + info!(self.log, "physical disks ensure: Updated zone bundler"); + + // Ensure that all probes, at least after our call to + // "omicron_physical_disks_ensure", stop using any disks that + // may have been in-service from before that request. + self.inner.probes.use_only_these_disks(&latest_disks).await; + info!(self.log, "physical disks ensure: Updated probes"); + + // Do the same for instances - mark them failed if they were using + // expunged disks. + self.inner.instances.use_only_these_disks(latest_disks).await?; + info!(self.log, "physical disks ensure: Updated instances"); + + Ok(disk_result) } /// List the Omicron zone configuration that's currently running diff --git a/sled-agent/src/storage_monitor.rs b/sled-agent/src/storage_monitor.rs index 8cb63e31f8..11883adcd2 100644 --- a/sled-agent/src/storage_monitor.rs +++ b/sled-agent/src/storage_monitor.rs @@ -7,10 +7,18 @@ //! code. use crate::dump_setup::DumpSetup; +use omicron_common::api::external::Generation; use sled_storage::config::MountConfig; use sled_storage::manager::StorageHandle; use sled_storage::resources::AllDisks; use slog::Logger; +use tokio::sync::watch; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Storage Monitor no longer running")] + NotRunning, +} pub struct StorageMonitor { log: Logger, @@ -18,6 +26,46 @@ pub struct StorageMonitor { // Invokes dumpadm(8) and savecore(8) when new disks are encountered dump_setup: DumpSetup, + + tx: watch::Sender, +} + +/// Emits status about storage monitoring. +#[derive(Debug, Clone)] +pub struct StorageMonitorStatus { + /// The latest generation of physical disks to be processed + /// by the storage monitor. 
+ pub latest_gen: Option, +} + +impl StorageMonitorStatus { + fn new() -> Self { + Self { latest_gen: None } + } +} + +#[derive(Clone)] +pub struct StorageMonitorHandle { + rx: watch::Receiver, +} + +impl StorageMonitorHandle { + pub async fn await_generation( + &self, + wanted: Generation, + ) -> Result<(), Error> { + self.rx + .clone() + .wait_for(|status| { + let Some(observed) = status.latest_gen else { + return false; + }; + return observed >= wanted; + }) + .await + .map_err(|_| Error::NotRunning)?; + Ok(()) + } } impl StorageMonitor { @@ -25,10 +73,14 @@ impl StorageMonitor { log: &Logger, mount_config: MountConfig, storage_manager: StorageHandle, - ) -> StorageMonitor { + ) -> (StorageMonitor, StorageMonitorHandle) { let dump_setup = DumpSetup::new(&log, mount_config); let log = log.new(o!("component" => "StorageMonitor")); - StorageMonitor { log, storage_manager, dump_setup } + let (tx, rx) = watch::channel(StorageMonitorStatus::new()); + ( + StorageMonitor { log, storage_manager, dump_setup, tx }, + StorageMonitorHandle { rx }, + ) } /// Run the main receive loop of the `StorageMonitor` @@ -50,10 +102,14 @@ impl StorageMonitor { } async fn handle_resource_update(&mut self, updated_disks: AllDisks) { + let generation = updated_disks.generation(); self.dump_setup .update_dumpdev_setup( updated_disks.iter_managed().map(|(_id, disk)| disk), ) .await; + self.tx.send_replace(StorageMonitorStatus { + latest_gen: Some(*generation), + }); } } diff --git a/sled-agent/src/zone_bundle.rs b/sled-agent/src/zone_bundle.rs index 16147e5957..088e7b356f 100644 --- a/sled-agent/src/zone_bundle.rs +++ b/sled-agent/src/zone_bundle.rs @@ -256,6 +256,9 @@ impl Inner { // exist; and returns those. async fn bundle_directories(&self) -> Vec { let resources = self.storage_handle.get_latest_disks().await; + // NOTE: These bundle directories are always stored on M.2s, so we don't + // need to worry about synchronizing with U.2 disk expungement at the + // callsite. let expected = resources.all_zone_bundle_directories(); let mut out = Vec::with_capacity(expected.len()); for each in expected.into_iter() { @@ -426,12 +429,17 @@ impl ZoneBundler { zone: &RunningZone, cause: ZoneBundleCause, ) -> Result { + // NOTE: [Self::await_completion_of_prior_bundles] relies on this lock + // being held across this whole function. If we want more concurrency, + // we'll need to add a barrier-like mechanism to let callers know when + // prior bundles have completed. let inner = self.inner.lock().await; let storage_dirs = inner.bundle_directories().await; let resources = inner.storage_handle.get_latest_disks().await; let extra_log_dirs = resources .all_u2_mountpoints(U2_DEBUG_DATASET) .into_iter() + .map(|pool_path| pool_path.path) .collect(); let context = ZoneBundleContext { cause, storage_dirs, extra_log_dirs }; info!( @@ -443,6 +451,14 @@ impl ZoneBundler { create(&self.log, zone, &context).await } + /// Awaits the completion of all prior calls to [ZoneBundler::create]. + /// + /// This is critical for disk expungement, which wants to ensure that the + /// Sled Agent is no longer using devices after they have been expunged. + pub async fn await_completion_of_prior_bundles(&self) { + let _ = self.inner.lock().await; + } + /// Return the paths for all bundles of the provided zone and ID. 
pub async fn bundle_paths( &self, diff --git a/sled-storage/src/manager.rs b/sled-storage/src/manager.rs index d374ab8e23..e081bc5034 100644 --- a/sled-storage/src/manager.rs +++ b/sled-storage/src/manager.rs @@ -584,7 +584,7 @@ impl StorageManager { // Identify which disks should be managed by the control // plane, and adopt all requested disks into the control plane // in a background task (see: [Self::manage_disks]). - self.resources.set_config(&ledger.data().disks); + self.resources.set_config(&ledger.data()); } else { info!(self.log, "KeyManager ready, but no ledger detected"); } @@ -681,7 +681,7 @@ impl StorageManager { // Identify which disks should be managed by the control // plane, and adopt all requested disks into the control plane. - self.resources.set_config(&config.disks); + self.resources.set_config(&config); // Actually try to "manage" those disks, which may involve formatting // zpools and conforming partitions to those expected by the control @@ -825,7 +825,7 @@ mod tests { use crate::dataset::DatasetKind; use crate::disk::RawSyntheticDisk; use crate::manager_test_harness::StorageManagerTestHarness; - use crate::resources::{DiskManagementError, ManagedDisk}; + use crate::resources::DiskManagementError; use super::*; use camino_tempfile::tempdir_in; @@ -999,21 +999,17 @@ mod tests { // Now let's verify we saw the correct firmware update. for rd in &raw_disks { - let managed = - all_disks_gen2.values.get(rd.identity()).expect("disk exists"); - match managed { - ManagedDisk::ExplicitlyManaged(disk) - | ManagedDisk::ImplicitlyManaged(disk) => { - assert_eq!( - disk.firmware(), - rd.firmware(), - "didn't see firmware update" - ); - } - ManagedDisk::Unmanaged(disk) => { - assert_eq!(disk, rd, "didn't see firmware update"); - } - } + let firmware = all_disks_gen2 + .iter_all() + .find_map(|(identity, _, _, fw)| { + if identity == rd.identity() { + Some(fw) + } else { + None + } + }) + .expect("disk exists"); + assert_eq!(firmware, rd.firmware(), "didn't see firmware update"); } harness.cleanup().await; @@ -1236,7 +1232,8 @@ mod tests { let expected: HashSet<_> = disks.iter().skip(1).take(3).map(|d| d.identity()).collect(); - let actual: HashSet<_> = all_disks.values.keys().collect(); + let actual: HashSet<_> = + all_disks.iter_all().map(|(identity, _, _, _)| identity).collect(); assert_eq!(expected, actual); // Ensure the same set of disks and make sure no change occurs @@ -1251,7 +1248,10 @@ mod tests { .await .unwrap(); let all_disks2 = harness.handle().get_latest_disks().await; - assert_eq!(all_disks.values, all_disks2.values); + assert_eq!( + all_disks.iter_all().collect::>(), + all_disks2.iter_all().collect::>() + ); // Add a disjoint set of disks and see that only they come through harness @@ -1266,7 +1266,8 @@ mod tests { let all_disks = harness.handle().get_latest_disks().await; let expected: HashSet<_> = disks.iter().skip(4).take(5).map(|d| d.identity()).collect(); - let actual: HashSet<_> = all_disks.values.keys().collect(); + let actual: HashSet<_> = + all_disks.iter_all().map(|(identity, _, _, _)| identity).collect(); assert_eq!(expected, actual); harness.cleanup().await; diff --git a/sled-storage/src/resources.rs b/sled-storage/src/resources.rs index 5cc4672e1e..f02f62e0a6 100644 --- a/sled-storage/src/resources.rs +++ b/sled-storage/src/resources.rs @@ -6,12 +6,16 @@ use crate::config::MountConfig; use crate::dataset::{DatasetError, M2_DEBUG_DATASET}; -use crate::disk::{Disk, DiskError, OmicronPhysicalDiskConfig, RawDisk}; +use crate::disk::{ + Disk, DiskError, 
OmicronPhysicalDiskConfig, OmicronPhysicalDisksConfig, + RawDisk, +}; use crate::error::Error; use camino::Utf8PathBuf; use cfg_if::cfg_if; -use illumos_utils::zpool::ZpoolName; +use illumos_utils::zpool::{PathInPool, ZpoolName}; use key_manager::StorageKeyRequester; +use omicron_common::api::external::Generation; use omicron_common::disk::DiskIdentity; use omicron_uuid_kinds::ZpoolUuid; use schemars::JsonSchema; @@ -102,7 +106,7 @@ impl DisksManagementResult { // the request of the broader control plane. This enum encompasses that duality, // by representing all disks that can exist, managed or not. #[derive(Debug, Clone, PartialEq, Eq)] -pub enum ManagedDisk { +pub(crate) enum ManagedDisk { // A disk explicitly managed by the control plane. // // This includes U.2s which Nexus has told us to format and use. @@ -121,6 +125,11 @@ pub enum ManagedDisk { Unmanaged(RawDisk), } +#[derive(Debug, Clone, Eq, PartialEq)] +struct AllDisksInner { + values: BTreeMap, +} + /// The disks, keyed by their identity, managed by the sled agent. /// /// This state is owned by [`crate::manager::StorageManager`], through @@ -139,16 +148,28 @@ pub enum ManagedDisk { /// gets cloned or dropped. #[derive(Debug, Clone, Eq, PartialEq)] pub struct AllDisks { - pub values: Arc>, - pub mount_config: MountConfig, + // This generation corresponds to the generation supplied in + // [OmicronPhysicalDisksConfig]. + generation: Generation, + inner: Arc, + mount_config: MountConfig, } impl AllDisks { + /// Returns the latest generation number of this set of disks. + pub fn generation(&self) -> &Generation { + &self.generation + } + + pub fn mount_config(&self) -> &MountConfig { + &self.mount_config + } + /// Returns the identity of the boot disk. /// /// If this returns `None`, we have not processed the boot disk yet. pub fn boot_disk(&self) -> Option<(DiskIdentity, ZpoolName)> { - for (id, disk) in self.values.iter() { + for (id, disk) in self.inner.values.iter() { if let ManagedDisk::ImplicitlyManaged(disk) = disk { if disk.is_boot_disk() { return Some((id.clone(), disk.zpool_name().clone())); @@ -179,18 +200,21 @@ impl AllDisks { } /// Returns all mountpoints within all U.2s for a particular dataset. - pub fn all_u2_mountpoints(&self, dataset: &str) -> Vec { + pub fn all_u2_mountpoints(&self, dataset: &str) -> Vec { self.all_u2_zpools() - .iter() - .map(|zpool| { - zpool.dataset_mountpoint(&self.mount_config.root, dataset) + .into_iter() + .map(|pool| { + let path = + pool.dataset_mountpoint(&self.mount_config.root, dataset); + PathInPool { pool: Some(pool), path } }) .collect() } /// Returns all zpools managed by the control plane pub fn get_all_zpools(&self) -> Vec<(ZpoolName, DiskVariant)> { - self.values + self.inner + .values .values() .filter_map(|disk| match disk { ManagedDisk::ExplicitlyManaged(disk) @@ -206,7 +230,8 @@ impl AllDisks { // // Only returns zpools from disks actively being managed. fn all_zpools(&self, variant: DiskVariant) -> Vec { - self.values + self.inner + .values .values() .filter_map(|disk| match disk { ManagedDisk::ExplicitlyManaged(disk) @@ -231,7 +256,7 @@ impl AllDisks { /// Returns an iterator over all managed disks. 
pub fn iter_managed(&self) -> impl Iterator { - self.values.iter().filter_map(|(identity, disk)| match disk { + self.inner.values.iter().filter_map(|(identity, disk)| match disk { ManagedDisk::ExplicitlyManaged(disk) => Some((identity, disk)), ManagedDisk::ImplicitlyManaged(disk) => Some((identity, disk)), _ => None, @@ -243,7 +268,7 @@ impl AllDisks { &self, ) -> impl Iterator { - self.values.iter().map(|(identity, disk)| match disk { + self.inner.values.iter().map(|(identity, disk)| match disk { ManagedDisk::ExplicitlyManaged(disk) => { (identity, disk.variant(), disk.slot(), disk.firmware()) } @@ -284,8 +309,11 @@ impl StorageResources { mount_config: MountConfig, key_requester: StorageKeyRequester, ) -> Self { - let disks = - AllDisks { values: Arc::new(BTreeMap::new()), mount_config }; + let disks = AllDisks { + generation: Generation::new(), + inner: Arc::new(AllDisksInner { values: BTreeMap::new() }), + mount_config, + }; Self { log: log.new(o!("component" => "StorageResources")), key_requester, @@ -310,8 +338,14 @@ impl StorageResources { /// Does not attempt to manage any of the physical disks previously /// observed. To synchronize the "set of requested disks" with the "set of /// observed disks", call [Self::synchronize_disk_management]. - pub fn set_config(&mut self, config: &Vec) { + pub fn set_config(&mut self, config: &OmicronPhysicalDisksConfig) { + let our_gen = &mut self.disks.generation; + if *our_gen > config.generation { + return; + } + *our_gen = config.generation; self.control_plane_disks = config + .disks .iter() .map(|disk| (disk.identity.clone(), disk.clone())) .collect(); @@ -336,14 +370,14 @@ impl StorageResources { &mut self, ) -> DisksManagementResult { let mut updated = false; - let disks = Arc::make_mut(&mut self.disks.values); + let disks = Arc::make_mut(&mut self.disks.inner); info!(self.log, "Synchronizing disk managment"); // "Unmanage" all disks no longer requested by the control plane. // // This updates the reported sets of "managed" disks, and performs no // other modifications to the underlying storage. - for (identity, managed_disk) in &mut *disks { + for (identity, managed_disk) in &mut disks.values { match managed_disk { // This leaves the presence of the disk still in "Self", but // downgrades the disk to an unmanaged status. @@ -365,7 +399,7 @@ impl StorageResources { // configuration. let mut result = DisksManagementResult::default(); for (identity, config) in &self.control_plane_disks { - let Some(managed_disk) = disks.get_mut(identity) else { + let Some(managed_disk) = disks.values.get_mut(identity) else { warn!( self.log, "Control plane disk requested, but not detected within sled"; @@ -496,11 +530,11 @@ impl StorageResources { // This is a trade-off for simplicity even though we may be potentially // cloning data before we know if there is a write action to perform. - let disks = Arc::make_mut(&mut self.disks.values); + let disks = Arc::make_mut(&mut self.disks.inner); // First check if there are any updates we need to apply to existing // managed disks. - if let Some(managed) = disks.get_mut(&disk_identity) { + if let Some(managed) = disks.values.get_mut(&disk_identity) { let mut updated = false; match managed { ManagedDisk::ExplicitlyManaged(mdisk) @@ -532,7 +566,9 @@ impl StorageResources { // If there's no update then we are inserting a new disk. 
match disk.variant() { DiskVariant::U2 => { - disks.insert(disk_identity, ManagedDisk::Unmanaged(disk)); + disks + .values + .insert(disk_identity, ManagedDisk::Unmanaged(disk)); } DiskVariant::M2 => { let managed_disk = Disk::new( @@ -543,12 +579,13 @@ impl StorageResources { Some(&self.key_requester), ) .await?; - disks.insert( + disks.values.insert( disk_identity, ManagedDisk::ImplicitlyManaged(managed_disk), ); } } + self.disk_updates.send_replace(self.disks.clone()); Ok(()) @@ -562,7 +599,7 @@ impl StorageResources { /// are only added once. pub(crate) fn remove_disk(&mut self, id: &DiskIdentity) { info!(self.log, "Removing disk"; "identity" => ?id); - let Some(entry) = self.disks.values.get(id) else { + let Some(entry) = self.disks.inner.values.get(id) else { info!(self.log, "Disk not found by id, exiting"; "identity" => ?id); return; }; @@ -589,7 +626,9 @@ impl StorageResources { } // Safe to unwrap as we just checked the key existed above - Arc::make_mut(&mut self.disks.values).remove(id).unwrap(); + let disks = Arc::make_mut(&mut self.disks.inner); + disks.values.remove(id).unwrap(); + self.disk_updates.send_replace(self.disks.clone()); } }
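
The stale-request protection added throughout this change (in
InstanceManagerRunner::use_only_these_disks, ProbeManager::use_only_these_disks,
and StorageResources::set_config) boils down to a generation gate: each consumer
records the last disk-configuration generation it applied and ignores any request
that is not strictly newer, so concurrent omicron_physical_disks_ensure calls can
only move a sled forward. Below is a minimal, self-contained sketch of that check,
mirroring the comparison used by the instance and probe managers (set_config
differs slightly in that it re-applies a config with an equal generation).
`Generation` and `DiskConsumer` are illustrative stand-ins, not types from this
patch.

    // Illustrative sketch only: `Generation` stands in for
    // omicron_common::api::external::Generation, and `DiskConsumer` for any
    // subsystem that must never apply an older disk configuration over a
    // newer one.
    #[derive(Clone, Copy, PartialEq, PartialOrd, Debug)]
    struct Generation(u64);

    struct DiskConsumer {
        // Generation of the last "use only these disks" request applied.
        last_gen: Option<Generation>,
    }

    impl DiskConsumer {
        // Returns true if `requested` should be applied; stale or replayed
        // requests are ignored, so racing updates can only advance forward.
        fn should_apply(&mut self, requested: Generation) -> bool {
            if let Some(last) = self.last_gen {
                if last >= requested {
                    // This request looks old (or is a replay); ignore it.
                    return false;
                }
            }
            self.last_gen = Some(requested);
            true
        }
    }

    fn main() {
        let mut consumer = DiskConsumer { last_gen: None };
        assert!(consumer.should_apply(Generation(1)));
        assert!(!consumer.should_apply(Generation(1))); // replay: ignored
        assert!(consumer.should_apply(Generation(2))); // newer: applied
        assert!(!consumer.should_apply(Generation(1))); // stale: ignored
    }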