From d16fda0a6203306ee9d22ea42b9b9366057370da Mon Sep 17 00:00:00 2001
From: Sean Klein <seanmarionklein@gmail.com>
Date: Tue, 17 Dec 2024 13:48:22 -0800
Subject: [PATCH 1/2] Use a shared support bundle implementation for the
 simulated sled agent

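Previously, the simulated sled agent answered the support bundle HTTP
endpoints with hard-coded placeholder responses. This patch routes those
endpoints through the shared support bundle storage implementation
(sled-agent/src/support_bundle/storage.rs), backed by the simulated
Storage, so the simulator exercises the same creation, range-request,
and deletion logic as the real sled agent. Bundle creation now consumes
the streamed request body instead of ignoring it, and the GET/HEAD
handlers honor range requests via SupportBundleQueryType::{Whole,
Index, Path}.

Along the way, Storage becomes a cloneable handle with interior locking
(replacing Arc<Mutex<Storage>>), its locks move from
futures::lock::Mutex to std::sync::Mutex, and accessors that only touch
in-memory state drop their `async` qualifiers; test callers change from
`.lock().await` to `.lock().unwrap()` accordingly.

As a rough sketch of the new call path (using names introduced by this
patch), a download handler now delegates to the sled agent:

    let range = rqctx.range();
    sa.support_bundle_get(
        zpool_id,
        dataset_id,
        support_bundle_id,
        range,
        SupportBundleQueryType::Whole,
    )
    .await

...which in turn wraps the simulated storage in the shared
implementation:

    self.storage
        .as_support_bundle_storage(&self.log)
        .get(zpool_id, dataset_id, support_bundle_id, range, query)
        .await
        .map_err(|err| err.into())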
---
 nexus/src/app/sagas/disk_create.rs            |  11 +-
 nexus/src/app/sagas/instance_create.rs        |   2 +-
 nexus/src/app/sagas/instance_ip_attach.rs     |  20 +-
 nexus/src/app/sagas/instance_ip_detach.rs     |  10 +-
 nexus/test-utils/src/resource_helpers.rs      |  65 +-
 .../crucible_replacements.rs                  |   1 -
 nexus/tests/integration_tests/disks.rs        |  10 +-
 nexus/tests/integration_tests/instances.rs    |   8 +-
 nexus/tests/integration_tests/snapshots.rs    |   4 +-
 .../integration_tests/volume_management.rs    |  12 +-
 sled-agent/src/sim/artifact_store.rs          |   9 +-
 sled-agent/src/sim/http_entrypoints.rs        | 149 ++--
 sled-agent/src/sim/http_entrypoints_pantry.rs |  27 +-
 .../src/sim/http_entrypoints_storage.rs       |  34 +-
 sled-agent/src/sim/mod.rs                     |   1 +
 sled-agent/src/sim/server.rs                  |  45 +-
 sled-agent/src/sim/sled_agent.rs              | 230 +++----
 sled-agent/src/sim/storage.rs                 | 638 ++++++++++--------
 sled-agent/src/support_bundle/storage.rs      | 159 ++++-
 19 files changed, 787 insertions(+), 648 deletions(-)

diff --git a/nexus/src/app/sagas/disk_create.rs b/nexus/src/app/sagas/disk_create.rs
index 51f6d29de1..7e04dbe857 100644
--- a/nexus/src/app/sagas/disk_create.rs
+++ b/nexus/src/app/sagas/disk_create.rs
@@ -1034,15 +1034,12 @@ pub(crate) mod test {
         true
     }
 
-    async fn no_regions_ensured(
-        sled_agent: &SledAgent,
-        test: &DiskTest<'_>,
-    ) -> bool {
+    fn no_regions_ensured(sled_agent: &SledAgent, test: &DiskTest<'_>) -> bool {
         for zpool in test.zpools() {
             for dataset in &zpool.datasets {
                 let crucible_dataset =
-                    sled_agent.get_crucible_dataset(zpool.id, dataset.id).await;
-                if !crucible_dataset.is_empty().await {
+                    sled_agent.get_crucible_dataset(zpool.id, dataset.id);
+                if !crucible_dataset.is_empty() {
                     return false;
                 }
             }
@@ -1073,7 +1070,7 @@ pub(crate) mod test {
                 .await
         );
         assert!(no_region_allocations_exist(datastore, &test).await);
-        assert!(no_regions_ensured(&sled_agent, &test).await);
+        assert!(no_regions_ensured(&sled_agent, &test));
 
         assert!(test.crucible_resources_deleted().await);
     }
diff --git a/nexus/src/app/sagas/instance_create.rs b/nexus/src/app/sagas/instance_create.rs
index 07f7911ef5..aa181d7b79 100644
--- a/nexus/src/app/sagas/instance_create.rs
+++ b/nexus/src/app/sagas/instance_create.rs
@@ -1346,7 +1346,7 @@ pub mod test {
         assert!(disk_is_detached(datastore).await);
         assert!(no_instances_or_disks_on_sled(&sled_agent).await);
 
-        let v2p_mappings = &*sled_agent.v2p_mappings.lock().await;
+        let v2p_mappings = &*sled_agent.v2p_mappings.lock().unwrap();
         assert!(v2p_mappings.is_empty());
     }
 
diff --git a/nexus/src/app/sagas/instance_ip_attach.rs b/nexus/src/app/sagas/instance_ip_attach.rs
index e6fb8654ea..b0d51f7201 100644
--- a/nexus/src/app/sagas/instance_ip_attach.rs
+++ b/nexus/src/app/sagas/instance_ip_attach.rs
@@ -435,14 +435,16 @@ pub(crate) mod test {
                 &instance_id,
             )
             .await;
-        let mut eips = sled_agent.external_ips.lock().await;
-        let my_eips = eips.entry(vmm_id).or_default();
-        assert!(my_eips
-            .iter()
-            .any(|v| matches!(v, InstanceExternalIpBody::Floating(_))));
-        assert!(my_eips
-            .iter()
-            .any(|v| matches!(v, InstanceExternalIpBody::Ephemeral(_))));
+        {
+            let mut eips = sled_agent.external_ips.lock().unwrap();
+            let my_eips = eips.entry(vmm_id).or_default();
+            assert!(my_eips
+                .iter()
+                .any(|v| matches!(v, InstanceExternalIpBody::Floating(_))));
+            assert!(my_eips
+                .iter()
+                .any(|v| matches!(v, InstanceExternalIpBody::Ephemeral(_))));
+        }
 
         // DB has records for SNAT plus the new IPs.
         let db_eips = datastore
@@ -497,7 +499,7 @@ pub(crate) mod test {
                 &instance_id,
             )
             .await;
-        let mut eips = sled_agent.external_ips.lock().await;
+        let mut eips = sled_agent.external_ips.lock().unwrap();
         let my_eips = eips.entry(vmm_id).or_default();
         assert!(my_eips.is_empty());
     }
diff --git a/nexus/src/app/sagas/instance_ip_detach.rs b/nexus/src/app/sagas/instance_ip_detach.rs
index d9da9fc05c..bec46f0269 100644
--- a/nexus/src/app/sagas/instance_ip_detach.rs
+++ b/nexus/src/app/sagas/instance_ip_detach.rs
@@ -405,9 +405,11 @@ pub(crate) mod test {
                 &instance_id,
             )
             .await;
-        let mut eips = sled_agent.external_ips.lock().await;
-        let my_eips = eips.entry(vmm_id).or_default();
-        assert!(my_eips.is_empty());
+        {
+            let mut eips = sled_agent.external_ips.lock().unwrap();
+            let my_eips = eips.entry(vmm_id).or_default();
+            assert!(my_eips.is_empty());
+        }
 
         // DB only has record for SNAT.
         let db_eips = datastore
@@ -467,7 +469,7 @@ pub(crate) mod test {
         assert!(db_eips.iter().any(|v| v.kind == IpKind::SNat));
 
         // No IP bindings remain on sled-agent.
-        let eips = &*sled_agent.external_ips.lock().await;
+        let eips = &*sled_agent.external_ips.lock().unwrap();
         for (_nic_id, eip_set) in eips {
             assert_eq!(eip_set.len(), 2);
         }
diff --git a/nexus/test-utils/src/resource_helpers.rs b/nexus/test-utils/src/resource_helpers.rs
index c5cb0231d1..89f453c873 100644
--- a/nexus/test-utils/src/resource_helpers.rs
+++ b/nexus/test-utils/src/resource_helpers.rs
@@ -1282,26 +1282,24 @@ impl<'a, N: NexusServer> DiskTest<'a, N> {
         // Tell the simulated sled agent to create the disk and zpool containing
         // these datasets.
 
-        sled_agent
-            .create_external_physical_disk(
-                physical_disk_id,
-                disk_identity.clone(),
-            )
-            .await;
-        sled_agent
-            .create_zpool(zpool.id, physical_disk_id, zpool.size.to_bytes())
-            .await;
+        sled_agent.create_external_physical_disk(
+            physical_disk_id,
+            disk_identity.clone(),
+        );
+        sled_agent.create_zpool(
+            zpool.id,
+            physical_disk_id,
+            zpool.size.to_bytes(),
+        );
 
         for dataset in &zpool.datasets {
             // Sled Agent side: Create the Dataset, make sure regions can be
             // created immediately if Nexus requests anything.
             let address =
-                sled_agent.create_crucible_dataset(zpool.id, dataset.id).await;
+                sled_agent.create_crucible_dataset(zpool.id, dataset.id);
             let crucible =
-                sled_agent.get_crucible_dataset(zpool.id, dataset.id).await;
-            crucible
-                .set_create_callback(Box::new(|_| RegionState::Created))
-                .await;
+                sled_agent.get_crucible_dataset(zpool.id, dataset.id);
+            crucible.set_create_callback(Box::new(|_| RegionState::Created));
 
             // Nexus side: Notify Nexus of the physical disk/zpool/dataset
             // combination that exists.
@@ -1381,23 +1379,19 @@ impl<'a, N: NexusServer> DiskTest<'a, N> {
                 for dataset in &zpool.datasets {
                     let crucible = self
                         .get_sled(*sled_id)
-                        .get_crucible_dataset(zpool.id, dataset.id)
-                        .await;
+                        .get_crucible_dataset(zpool.id, dataset.id);
                     let called = std::sync::atomic::AtomicBool::new(false);
-                    crucible
-                        .set_create_callback(Box::new(move |_| {
-                            if !called.load(std::sync::atomic::Ordering::SeqCst)
-                            {
-                                called.store(
-                                    true,
-                                    std::sync::atomic::Ordering::SeqCst,
-                                );
-                                RegionState::Requested
-                            } else {
-                                RegionState::Created
-                            }
-                        }))
-                        .await;
+                    crucible.set_create_callback(Box::new(move |_| {
+                        if !called.load(std::sync::atomic::Ordering::SeqCst) {
+                            called.store(
+                                true,
+                                std::sync::atomic::Ordering::SeqCst,
+                            );
+                            RegionState::Requested
+                        } else {
+                            RegionState::Created
+                        }
+                    }));
                 }
             }
         }
@@ -1409,11 +1403,9 @@ impl<'a, N: NexusServer> DiskTest<'a, N> {
                 for dataset in &zpool.datasets {
                     let crucible = self
                         .get_sled(*sled_id)
-                        .get_crucible_dataset(zpool.id, dataset.id)
-                        .await;
+                        .get_crucible_dataset(zpool.id, dataset.id);
                     crucible
-                        .set_create_callback(Box::new(|_| RegionState::Failed))
-                        .await;
+                        .set_create_callback(Box::new(|_| RegionState::Failed));
                 }
             }
         }
@@ -1430,9 +1422,8 @@ impl<'a, N: NexusServer> DiskTest<'a, N> {
                 for dataset in &zpool.datasets {
                     let crucible = self
                         .get_sled(*sled_id)
-                        .get_crucible_dataset(zpool.id, dataset.id)
-                        .await;
-                    if !crucible.is_empty().await {
+                        .get_crucible_dataset(zpool.id, dataset.id);
+                    if !crucible.is_empty() {
                         return false;
                     }
                 }
diff --git a/nexus/tests/integration_tests/crucible_replacements.rs b/nexus/tests/integration_tests/crucible_replacements.rs
index 5f7e92c393..621df47448 100644
--- a/nexus/tests/integration_tests/crucible_replacements.rs
+++ b/nexus/tests/integration_tests/crucible_replacements.rs
@@ -373,7 +373,6 @@ impl<'a> RegionReplacementDeletedVolumeTest<'a> {
             .activate_background_attachment(
                 region_replacement.volume_id.to_string(),
             )
-            .await
             .unwrap();
     }
 
diff --git a/nexus/tests/integration_tests/disks.rs b/nexus/tests/integration_tests/disks.rs
index d9888f9ccd..5da4e49a3a 100644
--- a/nexus/tests/integration_tests/disks.rs
+++ b/nexus/tests/integration_tests/disks.rs
@@ -1341,9 +1341,7 @@ async fn test_disk_virtual_provisioning_collection_failed_delete(
         .sled_agent
         .sled_agent
         .get_crucible_dataset(zpool.id, dataset.id)
-        .await
-        .set_region_deletion_error(true)
-        .await;
+        .set_region_deletion_error(true);
 
     // Delete the disk - expect this to fail
     NexusRequest::new(
@@ -1379,9 +1377,7 @@ async fn test_disk_virtual_provisioning_collection_failed_delete(
         .sled_agent
         .sled_agent
         .get_crucible_dataset(zpool.id, dataset.id)
-        .await
-        .set_region_deletion_error(false)
-        .await;
+        .set_region_deletion_error(false);
 
     // Request disk delete again
     NexusRequest::new(
@@ -2479,7 +2475,7 @@ async fn test_no_halt_disk_delete_one_region_on_expunged_agent(
     let zpool = disk_test.zpools().next().expect("Expected at least one zpool");
     let dataset = &zpool.datasets[0];
 
-    cptestctx.sled_agent.sled_agent.drop_dataset(zpool.id, dataset.id).await;
+    cptestctx.sled_agent.sled_agent.drop_dataset(zpool.id, dataset.id);
 
     // Spawn a task that tries to delete the disk
     let disk_url = get_disk_url(DISK_NAME);
diff --git a/nexus/tests/integration_tests/instances.rs b/nexus/tests/integration_tests/instances.rs
index 163869896f..a1ffd73020 100644
--- a/nexus/tests/integration_tests/instances.rs
+++ b/nexus/tests/integration_tests/instances.rs
@@ -646,7 +646,7 @@ async fn test_instance_start_creates_networking_state(
 
     sled_agents.push(&cptestctx.sled_agent.sled_agent);
     for agent in &sled_agents {
-        agent.v2p_mappings.lock().await.clear();
+        agent.v2p_mappings.lock().unwrap().clear();
     }
 
     // Start the instance and make sure that it gets to Running.
@@ -6244,7 +6244,7 @@ async fn test_instance_v2p_mappings(cptestctx: &ControlPlaneTestContext) {
     // Validate that every sled no longer has the V2P mapping for this instance
     for sled_agent in &sled_agents {
         let condition = || async {
-            let v2p_mappings = sled_agent.v2p_mappings.lock().await;
+            let v2p_mappings = sled_agent.v2p_mappings.lock().unwrap();
             if v2p_mappings.is_empty() {
                 Ok(())
             } else {
@@ -6501,7 +6501,7 @@ async fn assert_sled_v2p_mappings(
     vni: Vni,
 ) {
     let condition = || async {
-        let v2p_mappings = sled_agent.v2p_mappings.lock().await;
+        let v2p_mappings = sled_agent.v2p_mappings.lock().unwrap();
         let mapping = v2p_mappings.iter().find(|mapping| {
             mapping.virtual_ip == nic.ip
                 && mapping.virtual_mac == nic.mac
@@ -6573,7 +6573,7 @@ pub async fn assert_sled_vpc_routes(
             kind: RouterKind::Custom(db_subnet.ipv4_block.0.into()),
         };
 
-        let vpc_routes = sled_agent.vpc_routes.lock().await;
+        let vpc_routes = sled_agent.vpc_routes.lock().unwrap();
         let sys_routes_found = vpc_routes
             .iter()
             .any(|(id, set)| *id == sys_key && set.routes == system_routes);
diff --git a/nexus/tests/integration_tests/snapshots.rs b/nexus/tests/integration_tests/snapshots.rs
index accb4470fb..dff07732c7 100644
--- a/nexus/tests/integration_tests/snapshots.rs
+++ b/nexus/tests/integration_tests/snapshots.rs
@@ -980,9 +980,7 @@ async fn test_snapshot_unwind(cptestctx: &ControlPlaneTestContext) {
         .sled_agent
         .sled_agent
         .get_crucible_dataset(zpool.id, dataset.id)
-        .await
-        .set_creating_a_running_snapshot_should_fail()
-        .await;
+        .set_creating_a_running_snapshot_should_fail();
 
     // Issue snapshot request, expecting it to fail
     let snapshots_url = format!("/v1/snapshots?project={}", PROJECT_NAME);
diff --git a/nexus/tests/integration_tests/volume_management.rs b/nexus/tests/integration_tests/volume_management.rs
index 6a9ce28389..543e66ee30 100644
--- a/nexus/tests/integration_tests/volume_management.rs
+++ b/nexus/tests/integration_tests/volume_management.rs
@@ -2528,9 +2528,7 @@ async fn test_disk_create_saga_unwinds_correctly(
         .sled_agent
         .sled_agent
         .get_crucible_dataset(zpool.id, dataset.id)
-        .await
-        .set_region_creation_error(true)
-        .await;
+        .set_region_creation_error(true);
 
     let disk_size = ByteCount::from_gibibytes_u32(2);
     let base_disk = params::DiskCreate {
@@ -2598,9 +2596,7 @@ async fn test_snapshot_create_saga_unwinds_correctly(
         .sled_agent
         .sled_agent
         .get_crucible_dataset(zpool.id, dataset.id)
-        .await
-        .set_region_creation_error(true)
-        .await;
+        .set_region_creation_error(true);
 
     // Create a snapshot
     let snapshot_create = params::SnapshotCreate {
@@ -4225,11 +4221,9 @@ async fn test_read_only_region_reference_counting(
                 TypedUuid::from_untyped_uuid(db_read_only_dataset.pool_id),
                 db_read_only_dataset.id(),
             )
-            .await
             .get(crucible_agent_client::types::RegionId(
                 read_only_region.id().to_string()
             ))
-            .await
             .unwrap()
             .state,
         crucible_agent_client::types::State::Created
@@ -4297,11 +4291,9 @@ async fn test_read_only_region_reference_counting(
                 TypedUuid::from_untyped_uuid(db_read_only_dataset.pool_id),
                 db_read_only_dataset.id(),
             )
-            .await
             .get(crucible_agent_client::types::RegionId(
                 read_only_region.id().to_string()
             ))
-            .await
             .unwrap()
             .state,
         crucible_agent_client::types::State::Destroyed
diff --git a/sled-agent/src/sim/artifact_store.rs b/sled-agent/src/sim/artifact_store.rs
index d73f5a2880..efb4fa9f36 100644
--- a/sled-agent/src/sim/artifact_store.rs
+++ b/sled-agent/src/sim/artifact_store.rs
@@ -5,10 +5,7 @@
 //! Implementation of `crate::artifact_store::StorageBackend` for our simulated
 //! storage.
 
-use std::sync::Arc;
-
 use camino_tempfile::Utf8TempDir;
-use futures::lock::Mutex;
 use sled_storage::error::Error as StorageError;
 
 use super::storage::Storage;
@@ -16,11 +13,11 @@ use crate::artifact_store::DatasetsManager;
 
 pub(super) struct SimArtifactStorage {
     root: Utf8TempDir,
-    backend: Arc<Mutex<Storage>>,
+    backend: Storage,
 }
 
 impl SimArtifactStorage {
-    pub(super) fn new(backend: Arc<Mutex<Storage>>) -> SimArtifactStorage {
+    pub(super) fn new(backend: Storage) -> SimArtifactStorage {
         SimArtifactStorage {
             root: camino_tempfile::tempdir().unwrap(),
             backend,
@@ -36,9 +33,7 @@ impl DatasetsManager for SimArtifactStorage {
         let config = self
             .backend
             .lock()
-            .await
             .datasets_config_list()
-            .await
             .map_err(|_| StorageError::LedgerNotFound)?;
         Ok(crate::artifact_store::filter_dataset_mountpoints(
             config,
diff --git a/sled-agent/src/sim/http_entrypoints.rs b/sled-agent/src/sim/http_entrypoints.rs
index 60dcb1be31..227270200d 100644
--- a/sled-agent/src/sim/http_entrypoints.rs
+++ b/sled-agent/src/sim/http_entrypoints.rs
@@ -5,6 +5,7 @@
 //! HTTP entrypoint functions for the sled agent's exposed API
 
 use super::collection::PokeMode;
+use crate::support_bundle::storage::SupportBundleQueryType;
 use camino::Utf8PathBuf;
 use dropshot::endpoint;
 use dropshot::ApiDescription;
@@ -37,6 +38,7 @@ use omicron_common::disk::DatasetsManagementResult;
 use omicron_common::disk::DisksManagementResult;
 use omicron_common::disk::OmicronPhysicalDisksConfig;
 use omicron_common::update::ArtifactHash;
+use range_requests::RequestContextEx;
 use sled_agent_api::*;
 use sled_agent_types::boot_disk::BootDiskOsWriteStatus;
 use sled_agent_types::boot_disk::BootDiskPathParams;
@@ -240,7 +242,6 @@ impl SledAgentApi for SledAgentSimImpl {
             path_params.disk_id,
             body.snapshot_id,
         )
-        .await
         .map_err(|e| HttpError::for_internal_error(e.to_string()))?;
 
         Ok(HttpResponseOk(VmmIssueDiskSnapshotRequestResponse {
@@ -268,7 +269,6 @@ impl SledAgentApi for SledAgentSimImpl {
         let body_args = body.into_inner();
 
         sa.set_virtual_nic_host(&body_args)
-            .await
             .map_err(|e| HttpError::for_internal_error(e.to_string()))?;
 
         Ok(HttpResponseUpdatedNoContent())
@@ -282,7 +282,6 @@ impl SledAgentApi for SledAgentSimImpl {
         let body_args = body.into_inner();
 
         sa.unset_virtual_nic_host(&body_args)
-            .await
             .map_err(|e| HttpError::for_internal_error(e.to_string()))?;
 
         Ok(HttpResponseUpdatedNoContent())
@@ -294,7 +293,7 @@ impl SledAgentApi for SledAgentSimImpl {
     {
         let sa = rqctx.context();
 
-        let vnics = sa.list_virtual_nics().await.map_err(HttpError::from)?;
+        let vnics = sa.list_virtual_nics().map_err(HttpError::from)?;
 
         Ok(HttpResponseOk(vnics))
     }
@@ -310,7 +309,7 @@ impl SledAgentApi for SledAgentSimImpl {
         rqctx: RequestContext<Self::Context>,
     ) -> Result<HttpResponseOk<EarlyNetworkConfig>, HttpError> {
         let config =
-            rqctx.context().bootstore_network_config.lock().await.clone();
+            rqctx.context().bootstore_network_config.lock().unwrap().clone();
         Ok(HttpResponseOk(config))
     }
 
@@ -318,7 +317,8 @@ impl SledAgentApi for SledAgentSimImpl {
         rqctx: RequestContext<Self::Context>,
         body: TypedBody<EarlyNetworkConfig>,
     ) -> Result<HttpResponseUpdatedNoContent, HttpError> {
-        let mut config = rqctx.context().bootstore_network_config.lock().await;
+        let mut config =
+            rqctx.context().bootstore_network_config.lock().unwrap();
         *config = body.into_inner();
         Ok(HttpResponseUpdatedNoContent())
     }
@@ -329,7 +329,7 @@ impl SledAgentApi for SledAgentSimImpl {
     ) -> Result<HttpResponseOk<Inventory>, HttpError> {
         let sa = rqctx.context();
         Ok(HttpResponseOk(
-            sa.inventory(rqctx.server.local_addr).await.map_err(|e| {
+            sa.inventory(rqctx.server.local_addr).map_err(|e| {
                 HttpError::for_internal_error(format!("{:#}", e))
             })?,
         ))
@@ -341,7 +341,7 @@ impl SledAgentApi for SledAgentSimImpl {
     ) -> Result<HttpResponseOk<DatasetsManagementResult>, HttpError> {
         let sa = rqctx.context();
         let body_args = body.into_inner();
-        let result = sa.datasets_ensure(body_args).await?;
+        let result = sa.datasets_ensure(body_args)?;
         Ok(HttpResponseOk(result))
     }
 
@@ -349,7 +349,7 @@ impl SledAgentApi for SledAgentSimImpl {
         rqctx: RequestContext<Self::Context>,
     ) -> Result<HttpResponseOk<DatasetsConfig>, HttpError> {
         let sa = rqctx.context();
-        Ok(HttpResponseOk(sa.datasets_config_list().await?))
+        Ok(HttpResponseOk(sa.datasets_config_list()?))
     }
 
     async fn omicron_physical_disks_put(
@@ -358,7 +358,7 @@ impl SledAgentApi for SledAgentSimImpl {
     ) -> Result<HttpResponseOk<DisksManagementResult>, HttpError> {
         let sa = rqctx.context();
         let body_args = body.into_inner();
-        let result = sa.omicron_physical_disks_ensure(body_args).await?;
+        let result = sa.omicron_physical_disks_ensure(body_args)?;
         Ok(HttpResponseOk(result))
     }
 
@@ -366,7 +366,7 @@ impl SledAgentApi for SledAgentSimImpl {
         rqctx: RequestContext<Self::Context>,
     ) -> Result<HttpResponseOk<OmicronPhysicalDisksConfig>, HttpError> {
         let sa = rqctx.context();
-        Ok(HttpResponseOk(sa.omicron_physical_disks_list().await?))
+        Ok(HttpResponseOk(sa.omicron_physical_disks_list()?))
     }
 
     async fn omicron_zones_put(
@@ -375,7 +375,7 @@ impl SledAgentApi for SledAgentSimImpl {
     ) -> Result<HttpResponseUpdatedNoContent, HttpError> {
         let sa = rqctx.context();
         let body_args = body.into_inner();
-        sa.omicron_zones_ensure(body_args).await;
+        sa.omicron_zones_ensure(body_args);
         Ok(HttpResponseUpdatedNoContent())
     }
 
@@ -390,7 +390,7 @@ impl SledAgentApi for SledAgentSimImpl {
         rqctx: RequestContext<Self::Context>,
     ) -> Result<HttpResponseOk<Vec<ResolvedVpcRouteState>>, HttpError> {
         let sa = rqctx.context();
-        Ok(HttpResponseOk(sa.list_vpc_routes().await))
+        Ok(HttpResponseOk(sa.list_vpc_routes()))
     }
 
     async fn set_vpc_routes(
@@ -398,7 +398,7 @@ impl SledAgentApi for SledAgentSimImpl {
         body: TypedBody<Vec<ResolvedVpcRouteSet>>,
     ) -> Result<HttpResponseUpdatedNoContent, HttpError> {
         let sa = rqctx.context();
-        sa.set_vpc_routes(body.into_inner()).await;
+        sa.set_vpc_routes(body.into_inner());
         Ok(HttpResponseUpdatedNoContent())
     }
 
@@ -419,7 +419,7 @@ impl SledAgentApi for SledAgentSimImpl {
         rqctx: RequestContext<Self::Context>,
         path_params: Path<SupportBundlePathParam>,
         query_params: Query<SupportBundleCreateQueryParams>,
-        _body: StreamingBody,
+        body: StreamingBody,
     ) -> Result<HttpResponseCreated<SupportBundleMetadata>, HttpError> {
         let sa = rqctx.context();
 
@@ -433,6 +433,7 @@ impl SledAgentApi for SledAgentSimImpl {
                 dataset_id,
                 support_bundle_id,
                 hash,
+                body.into_stream(),
             )
             .await?,
         ))
@@ -446,15 +447,15 @@ impl SledAgentApi for SledAgentSimImpl {
         let SupportBundlePathParam { zpool_id, dataset_id, support_bundle_id } =
             path_params.into_inner();
 
-        sa.support_bundle_get(zpool_id, dataset_id, support_bundle_id).await?;
-
-        Ok(http::Response::builder()
-            .status(http::StatusCode::OK)
-            .header(http::header::CONTENT_TYPE, "text/html")
-            .body(dropshot::Body::with_content(
-                "simulated support bundle; do not eat",
-            ))
-            .unwrap())
+        let range = rqctx.range();
+        sa.support_bundle_get(
+            zpool_id,
+            dataset_id,
+            support_bundle_id,
+            range,
+            SupportBundleQueryType::Whole,
+        )
+        .await
     }
 
     async fn support_bundle_download_file(
@@ -465,18 +466,18 @@ impl SledAgentApi for SledAgentSimImpl {
         let SupportBundleFilePathParam {
             parent:
                 SupportBundlePathParam { zpool_id, dataset_id, support_bundle_id },
-            file: _,
+            file,
         } = path_params.into_inner();
 
-        sa.support_bundle_get(zpool_id, dataset_id, support_bundle_id).await?;
-
-        Ok(http::Response::builder()
-            .status(http::StatusCode::OK)
-            .header(http::header::CONTENT_TYPE, "text/html")
-            .body(dropshot::Body::with_content(
-                "simulated support bundle file; do not eat",
-            ))
-            .unwrap())
+        let range = rqctx.range();
+        sa.support_bundle_get(
+            zpool_id,
+            dataset_id,
+            support_bundle_id,
+            range,
+            SupportBundleQueryType::Path { file_path: file },
+        )
+        .await
     }
 
     async fn support_bundle_index(
@@ -487,15 +488,15 @@ impl SledAgentApi for SledAgentSimImpl {
         let SupportBundlePathParam { zpool_id, dataset_id, support_bundle_id } =
             path_params.into_inner();
 
-        sa.support_bundle_get(zpool_id, dataset_id, support_bundle_id).await?;
-
-        Ok(http::Response::builder()
-            .status(http::StatusCode::OK)
-            .header(http::header::CONTENT_TYPE, "text/html")
-            .body(dropshot::Body::with_content(
-                "simulated support bundle index; do not eat",
-            ))
-            .unwrap())
+        let range = rqctx.range();
+        sa.support_bundle_get(
+            zpool_id,
+            dataset_id,
+            support_bundle_id,
+            range,
+            SupportBundleQueryType::Index,
+        )
+        .await
     }
 
     async fn support_bundle_head(
@@ -506,17 +507,15 @@ impl SledAgentApi for SledAgentSimImpl {
         let SupportBundlePathParam { zpool_id, dataset_id, support_bundle_id } =
             path_params.into_inner();
 
-        sa.support_bundle_get(zpool_id, dataset_id, support_bundle_id).await?;
-
-        let fictional_length = 10000;
-
-        Ok(http::Response::builder()
-            .status(http::StatusCode::OK)
-            .header(http::header::CONTENT_TYPE, "text/html")
-            .header(hyper::header::ACCEPT_RANGES, "bytes")
-            .header(hyper::header::CONTENT_LENGTH, fictional_length)
-            .body(dropshot::Body::empty())
-            .unwrap())
+        let range = rqctx.range();
+        sa.support_bundle_head(
+            zpool_id,
+            dataset_id,
+            support_bundle_id,
+            range,
+            SupportBundleQueryType::Whole,
+        )
+        .await
     }
 
     async fn support_bundle_head_file(
@@ -527,20 +526,18 @@ impl SledAgentApi for SledAgentSimImpl {
         let SupportBundleFilePathParam {
             parent:
                 SupportBundlePathParam { zpool_id, dataset_id, support_bundle_id },
-            file: _,
+            file,
         } = path_params.into_inner();
 
-        sa.support_bundle_get(zpool_id, dataset_id, support_bundle_id).await?;
-
-        let fictional_length = 10000;
-
-        Ok(http::Response::builder()
-            .status(http::StatusCode::OK)
-            .header(http::header::CONTENT_TYPE, "text/html")
-            .header(hyper::header::ACCEPT_RANGES, "bytes")
-            .header(hyper::header::CONTENT_LENGTH, fictional_length)
-            .body(dropshot::Body::empty())
-            .unwrap())
+        let range = rqctx.range();
+        sa.support_bundle_head(
+            zpool_id,
+            dataset_id,
+            support_bundle_id,
+            range,
+            SupportBundleQueryType::Path { file_path: file },
+        )
+        .await
     }
 
     async fn support_bundle_head_index(
@@ -551,17 +548,15 @@ impl SledAgentApi for SledAgentSimImpl {
         let SupportBundlePathParam { zpool_id, dataset_id, support_bundle_id } =
             path_params.into_inner();
 
-        sa.support_bundle_get(zpool_id, dataset_id, support_bundle_id).await?;
-
-        let fictional_length = 10000;
-
-        Ok(http::Response::builder()
-            .status(http::StatusCode::OK)
-            .header(http::header::CONTENT_TYPE, "text/html")
-            .header(hyper::header::ACCEPT_RANGES, "bytes")
-            .header(hyper::header::CONTENT_LENGTH, fictional_length)
-            .body(dropshot::Body::empty())
-            .unwrap())
+        let range = rqctx.range();
+        sa.support_bundle_head(
+            zpool_id,
+            dataset_id,
+            support_bundle_id,
+            range,
+            SupportBundleQueryType::Index,
+        )
+        .await
     }
 
     async fn support_bundle_delete(
diff --git a/sled-agent/src/sim/http_entrypoints_pantry.rs b/sled-agent/src/sim/http_entrypoints_pantry.rs
index c98c7db665..cbdc9329d4 100644
--- a/sled-agent/src/sim/http_entrypoints_pantry.rs
+++ b/sled-agent/src/sim/http_entrypoints_pantry.rs
@@ -69,7 +69,7 @@ async fn pantry_status(
 ) -> Result<HttpResponseOk<PantryStatus>, HttpError> {
     let pantry = rc.context();
 
-    let status = pantry.status().await?;
+    let status = pantry.status()?;
 
     Ok(HttpResponseOk(status))
 }
@@ -103,7 +103,7 @@ async fn volume_status(
     let path = path.into_inner();
     let pantry = rc.context();
 
-    let status = pantry.volume_status(path.id.clone()).await?;
+    let status = pantry.volume_status(path.id.clone())?;
     Ok(HttpResponseOk(status))
 }
 
@@ -134,7 +134,6 @@ async fn attach(
 
     pantry
         .attach(path.id.clone(), body.volume_construction_request)
-        .await
         .map_err(|e| HttpError::for_internal_error(e.to_string()))?;
 
     Ok(HttpResponseOk(AttachResult { id: path.id }))
@@ -161,13 +160,11 @@ async fn attach_activate_background(
     let body = body.into_inner();
     let pantry = rc.context();
 
-    pantry
-        .attach_activate_background(
-            path.id.clone(),
-            body.job_id,
-            body.volume_construction_request,
-        )
-        .await?;
+    pantry.attach_activate_background(
+        path.id.clone(),
+        body.job_id,
+        body.volume_construction_request,
+    )?;
 
     Ok(HttpResponseUpdatedNoContent())
 }
@@ -194,7 +191,7 @@ async fn is_job_finished(
     let path = path.into_inner();
     let pantry = rc.context();
 
-    let job_is_finished = pantry.is_job_finished(path.id).await?;
+    let job_is_finished = pantry.is_job_finished(path.id)?;
 
     Ok(HttpResponseOk(JobPollResponse { job_is_finished }))
 }
@@ -217,7 +214,7 @@ async fn job_result_ok(
     let path = path.into_inner();
     let pantry = rc.context();
 
-    let job_result = pantry.get_job_result(path.id).await?;
+    let job_result = pantry.get_job_result(path.id)?;
 
     match job_result {
         Ok(job_result_ok) => {
@@ -260,7 +257,6 @@ async fn import_from_url(
 
     let job_id = pantry
         .import_from_url(path.id.clone(), body.url, body.expected_digest)
-        .await
         .map_err(|e| HttpError::for_internal_error(e.to_string()))?;
 
     Ok(HttpResponseOk(ImportFromUrlResponse { job_id }))
@@ -287,7 +283,6 @@ async fn snapshot(
 
     pantry
         .snapshot(path.id.clone(), body.snapshot_id)
-        .await
         .map_err(|e| HttpError::for_internal_error(e.to_string()))?;
 
     Ok(HttpResponseUpdatedNoContent())
@@ -320,7 +315,7 @@ async fn bulk_write(
     )
     .map_err(|e| HttpError::for_bad_request(None, e.to_string()))?;
 
-    pantry.bulk_write(path.id.clone(), body.offset, data).await?;
+    pantry.bulk_write(path.id.clone(), body.offset, data)?;
 
     Ok(HttpResponseUpdatedNoContent())
 }
@@ -344,7 +339,6 @@ async fn scrub(
 
     let job_id = pantry
         .scrub(path.id.clone())
-        .await
         .map_err(|e| HttpError::for_internal_error(e.to_string()))?;
 
     Ok(HttpResponseOk(ScrubResponse { job_id }))
@@ -364,7 +358,6 @@ async fn detach(
 
     pantry
         .detach(path.id)
-        .await
         .map_err(|e| HttpError::for_internal_error(e.to_string()))?;
 
     Ok(HttpResponseDeleted())
diff --git a/sled-agent/src/sim/http_entrypoints_storage.rs b/sled-agent/src/sim/http_entrypoints_storage.rs
index 34e26f5191..cb53d96fd1 100644
--- a/sled-agent/src/sim/http_entrypoints_storage.rs
+++ b/sled-agent/src/sim/http_entrypoints_storage.rs
@@ -63,7 +63,7 @@ async fn region_list(
     rc: RequestContext<Arc<CrucibleData>>,
 ) -> Result<HttpResponseOk<Vec<Region>>, HttpError> {
     let crucible = rc.context();
-    Ok(HttpResponseOk(crucible.list().await))
+    Ok(HttpResponseOk(crucible.list()))
 }
 
 #[endpoint {
@@ -77,7 +77,7 @@ async fn region_create(
     let params = body.into_inner();
     let crucible = rc.context();
 
-    let region = crucible.create(params).await.map_err(|e| {
+    let region = crucible.create(params).map_err(|e| {
         HttpError::for_internal_error(
             format!("region create failure: {:?}", e,),
         )
@@ -97,7 +97,7 @@ async fn region_get(
     let id = path.into_inner().id;
     let crucible = rc.context();
 
-    match crucible.get(id).await {
+    match crucible.get(id) {
         Some(region) => Ok(HttpResponseOk(region)),
         None => {
             Err(HttpError::for_not_found(None, "Region not found".to_string()))
@@ -118,7 +118,6 @@ async fn region_delete(
 
     crucible
         .delete(id)
-        .await
         .map_err(|e| HttpError::for_bad_request(None, e.to_string()))?;
 
     Ok(HttpResponseDeleted())
@@ -136,16 +135,16 @@ async fn region_get_snapshots(
 
     let crucible = rc.context();
 
-    if crucible.get(id.clone()).await.is_none() {
+    if crucible.get(id.clone()).is_none() {
         return Err(HttpError::for_not_found(
             None,
             "Region not found".to_string(),
         ));
     }
 
-    let snapshots = crucible.snapshots_for_region(&id).await;
+    let snapshots = crucible.snapshots_for_region(&id);
 
-    let running_snapshots = crucible.running_snapshots_for_id(&id).await;
+    let running_snapshots = crucible.running_snapshots_for_id(&id);
 
     Ok(HttpResponseOk(GetSnapshotResponse { snapshots, running_snapshots }))
 }
@@ -167,14 +166,14 @@ async fn region_get_snapshot(
     let p = path.into_inner();
     let crucible = rc.context();
 
-    if crucible.get(p.id.clone()).await.is_none() {
+    if crucible.get(p.id.clone()).is_none() {
         return Err(HttpError::for_not_found(
             None,
             "Region not found".to_string(),
         ));
     }
 
-    for snapshot in &crucible.snapshots_for_region(&p.id).await {
+    for snapshot in &crucible.snapshots_for_region(&p.id) {
         if snapshot.name == p.name {
             return Ok(HttpResponseOk(snapshot.clone()));
         }
@@ -203,7 +202,7 @@ async fn region_delete_snapshot(
     let p = path.into_inner();
     let crucible = rc.context();
 
-    if crucible.get(p.id.clone()).await.is_none() {
+    if crucible.get(p.id.clone()).is_none() {
         return Err(HttpError::for_not_found(
             None,
             "Region not found".to_string(),
@@ -212,7 +211,6 @@ async fn region_delete_snapshot(
 
     crucible
         .delete_snapshot(&p.id, &p.name)
-        .await
         .map_err(|e| HttpError::for_bad_request(None, e.to_string()))?;
 
     Ok(HttpResponseDeleted())
@@ -235,14 +233,14 @@ async fn region_run_snapshot(
     let p = path.into_inner();
     let crucible = rc.context();
 
-    if crucible.get(p.id.clone()).await.is_none() {
+    if crucible.get(p.id.clone()).is_none() {
         return Err(HttpError::for_not_found(
             None,
             "Region not found".to_string(),
         ));
     }
 
-    let snapshots = crucible.snapshots_for_region(&p.id).await;
+    let snapshots = crucible.snapshots_for_region(&p.id);
 
     if !snapshots.iter().any(|x| x.name == p.name) {
         return Err(HttpError::for_not_found(
@@ -251,10 +249,8 @@ async fn region_run_snapshot(
         ));
     }
 
-    let running_snapshot = crucible
-        .create_running_snapshot(&p.id, &p.name)
-        .await
-        .map_err(|e| {
+    let running_snapshot =
+        crucible.create_running_snapshot(&p.id, &p.name).map_err(|e| {
             HttpError::for_internal_error(format!(
                 "running snapshot create failure: {:?}",
                 e,
@@ -275,14 +271,14 @@ async fn region_delete_running_snapshot(
     let p = path.into_inner();
     let crucible = rc.context();
 
-    if crucible.get(p.id.clone()).await.is_none() {
+    if crucible.get(p.id.clone()).is_none() {
         return Err(HttpError::for_not_found(
             None,
             "Region not found".to_string(),
         ));
     }
 
-    crucible.delete_running_snapshot(&p.id, &p.name).await.map_err(|e| {
+    crucible.delete_running_snapshot(&p.id, &p.name).map_err(|e| {
         HttpError::for_internal_error(format!(
             "running snapshot create failure: {:?}",
             e,
diff --git a/sled-agent/src/sim/mod.rs b/sled-agent/src/sim/mod.rs
index ab3b155b36..c59af4ccce 100644
--- a/sled-agent/src/sim/mod.rs
+++ b/sled-agent/src/sim/mod.rs
@@ -24,3 +24,4 @@ pub use config::{
 };
 pub use server::{run_standalone_server, RssArgs, Server};
 pub use sled_agent::SledAgent;
+pub(crate) use storage::Storage;
diff --git a/sled-agent/src/sim/server.rs b/sled-agent/src/sim/server.rs
index 3833d5ca7c..ef13cdfcdd 100644
--- a/sled-agent/src/sim/server.rs
+++ b/sled-agent/src/sim/server.rs
@@ -188,23 +188,19 @@ impl Server {
             let vendor = "synthetic-vendor".to_string();
             let serial = format!("synthetic-serial-{zpool_id}");
             let model = "synthetic-model".to_string();
-            sled_agent
-                .create_external_physical_disk(
-                    physical_disk_id,
-                    DiskIdentity {
-                        vendor: vendor.clone(),
-                        serial: serial.clone(),
-                        model: model.clone(),
-                    },
-                )
-                .await;
+            sled_agent.create_external_physical_disk(
+                physical_disk_id,
+                DiskIdentity {
+                    vendor: vendor.clone(),
+                    serial: serial.clone(),
+                    model: model.clone(),
+                },
+            );
 
-            sled_agent
-                .create_zpool(zpool_id, physical_disk_id, zpool.size)
-                .await;
+            sled_agent.create_zpool(zpool_id, physical_disk_id, zpool.size);
             let dataset_id = DatasetUuid::new_v4();
             let address =
-                sled_agent.create_crucible_dataset(zpool_id, dataset_id).await;
+                sled_agent.create_crucible_dataset(zpool_id, dataset_id);
 
             datasets.push(NexusTypes::DatasetCreateRequest {
                 zpool_id: zpool_id.into_untyped_uuid(),
@@ -218,10 +214,8 @@ impl Server {
             // Whenever Nexus tries to allocate a region, it should complete
             // immediately. What efficiency!
             let crucible =
-                sled_agent.get_crucible_dataset(zpool_id, dataset_id).await;
-            crucible
-                .set_create_callback(Box::new(|_| RegionState::Created))
-                .await;
+                sled_agent.get_crucible_dataset(zpool_id, dataset_id);
+            crucible.set_create_callback(Box::new(|_| RegionState::Created));
         }
 
         Ok(Server {
@@ -240,8 +234,7 @@ impl Server {
             self.log.new(o!("kind" => "pantry")),
             self.config.storage.ip,
             self.sled_agent.clone(),
-        )
-        .await;
+        );
         self.pantry_server = Some(pantry_server);
         self.pantry_server.as_ref().unwrap()
     }
@@ -370,7 +363,7 @@ pub async fn run_standalone_server(
     dns.initialize_with_config(&log, &dns_config).await?;
     let internal_dns_version = dns_config.generation;
 
-    let all_u2_zpools = server.sled_agent.get_zpools().await;
+    let all_u2_zpools = server.sled_agent.get_zpools();
     let get_random_zpool = || {
         use rand::seq::SliceRandom;
         let pool = all_u2_zpools
@@ -516,12 +509,12 @@ pub async fn run_standalone_server(
     };
 
     let mut datasets = vec![];
-    let physical_disks = server.sled_agent.get_all_physical_disks().await;
-    let zpools = server.sled_agent.get_zpools().await;
+    let physical_disks = server.sled_agent.get_all_physical_disks();
+    let zpools = server.sled_agent.get_zpools();
     for zpool in &zpools {
         let zpool_id = ZpoolUuid::from_untyped_uuid(zpool.id);
         for (dataset_id, address) in
-            server.sled_agent.get_crucible_datasets(zpool_id).await
+            server.sled_agent.get_crucible_datasets(zpool_id)
         {
             datasets.push(NexusTypes::DatasetCreateRequest {
                 zpool_id: zpool.id,
@@ -540,7 +533,7 @@ pub async fn run_standalone_server(
     };
 
     let omicron_physical_disks_config =
-        server.sled_agent.omicron_physical_disks_list().await?;
+        server.sled_agent.omicron_physical_disks_list()?;
     let mut sled_configs = BTreeMap::new();
     sled_configs.insert(
         config.id,
@@ -559,7 +552,7 @@ pub async fn run_standalone_server(
                     })
                     .collect(),
             },
-            datasets: server.sled_agent.datasets_config_list().await?,
+            datasets: server.sled_agent.datasets_config_list()?,
             zones,
         },
     );
diff --git a/sled-agent/src/sim/sled_agent.rs b/sled-agent/src/sim/sled_agent.rs
index 1f099fc036..b0846c7216 100644
--- a/sled-agent/src/sim/sled_agent.rs
+++ b/sled-agent/src/sim/sled_agent.rs
@@ -14,11 +14,14 @@ use super::storage::Storage;
 use crate::artifact_store::ArtifactStore;
 use crate::nexus::NexusClient;
 use crate::sim::simulatable::Simulatable;
+use crate::support_bundle::storage::SupportBundleQueryType;
 use crate::updates::UpdateManager;
 use anyhow::bail;
 use anyhow::Context;
+use bytes::Bytes;
+use dropshot::Body;
 use dropshot::HttpError;
-use futures::lock::Mutex;
+use futures::Stream;
 use nexus_sled_agent_shared::inventory::{
     Inventory, InventoryDataset, InventoryDisk, InventoryZpool,
     OmicronZonesConfig, SledRole,
@@ -47,8 +50,8 @@ use oxnet::Ipv6Net;
 use propolis_client::{
     types::VolumeConstructionRequest, Client as PropolisClient,
 };
+use range_requests::PotentialRange;
 use sled_agent_api::SupportBundleMetadata;
-use sled_agent_api::SupportBundleState;
 use sled_agent_types::disk::DiskStateRequested;
 use sled_agent_types::early_networking::{
     EarlyNetworkConfig, EarlyNetworkConfigBody,
@@ -62,6 +65,7 @@ use std::collections::{HashMap, HashSet, VecDeque};
 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
 use std::str::FromStr;
 use std::sync::Arc;
+use std::sync::Mutex;
 use std::time::Duration;
 use uuid::Uuid;
 
@@ -79,14 +83,15 @@ pub struct SledAgent {
     vmms: Arc<SimCollection<SimInstance>>,
     /// collection of simulated disks, indexed by disk uuid
     disks: Arc<SimCollection<SimDisk>>,
-    storage: Arc<Mutex<Storage>>,
+    storage: Storage,
     updates: UpdateManager,
     nexus_address: SocketAddr,
     pub nexus_client: Arc<NexusClient>,
     disk_id_to_region_ids: Mutex<HashMap<String, Vec<Uuid>>>,
     pub v2p_mappings: Mutex<HashSet<VirtualNetworkInterfaceHost>>,
-    mock_propolis:
-        Mutex<Option<(propolis_mock_server::Server, PropolisClient)>>,
+    mock_propolis: futures::lock::Mutex<
+        Option<(propolis_mock_server::Server, PropolisClient)>,
+    >,
     /// lists of external IPs assigned to instances
     pub external_ips:
         Mutex<HashMap<PropolisUuid, HashSet<InstanceExternalIpBody>>>,
@@ -173,11 +178,11 @@ impl SledAgent {
             },
         });
 
-        let storage = Arc::new(Mutex::new(Storage::new(
+        let storage = Storage::new(
             id.into_untyped_uuid(),
             config.storage.ip,
             storage_log,
-        )));
+        );
         let artifacts =
             ArtifactStore::new(&log, SimArtifactStorage::new(storage.clone()));
 
@@ -202,7 +207,7 @@ impl SledAgent {
             v2p_mappings: Mutex::new(HashSet::new()),
             external_ips: Mutex::new(HashMap::new()),
             vpc_routes: Mutex::new(HashMap::new()),
-            mock_propolis: Mutex::new(None),
+            mock_propolis: futures::lock::Mutex::new(None),
             config: config.clone(),
             fake_zones: Mutex::new(OmicronZonesConfig {
                 generation: Generation::new(),
@@ -222,7 +227,7 @@ impl SledAgent {
     /// three crucible regions). Extract the region addresses, lookup the region
     /// from the port and pair disk id with region ids. This map is referred to
     /// later when making snapshots.
-    pub async fn map_disk_ids_to_region_ids(
+    pub fn map_disk_ids_to_region_ids(
         &self,
         volume_construction_request: &VolumeConstructionRequest,
     ) -> Result<(), Error> {
@@ -243,11 +248,10 @@ impl SledAgent {
 
         let mut region_ids = Vec::new();
 
-        let storage = self.storage.lock().await;
+        let storage = self.storage.lock();
         for target in targets {
             let region = storage
                 .get_region_for_port(target.port())
-                .await
                 .ok_or_else(|| {
                     Error::internal_error(&format!(
                         "no region for port {}",
@@ -259,7 +263,8 @@ impl SledAgent {
             region_ids.push(region_id);
         }
 
-        let mut disk_id_to_region_ids = self.disk_id_to_region_ids.lock().await;
+        let mut disk_id_to_region_ids =
+            self.disk_id_to_region_ids.lock().unwrap();
         disk_id_to_region_ids.insert(disk_id.to_string(), region_ids.clone());
 
         Ok(())
@@ -398,10 +403,10 @@ impl SledAgent {
 
         for disk_request in &hardware.disks {
             let vcr = &disk_request.volume_construction_request;
-            self.map_disk_ids_to_region_ids(&vcr).await?;
+            self.map_disk_ids_to_region_ids(&vcr)?;
         }
 
-        let mut routes = self.vpc_routes.lock().await;
+        let mut routes = self.vpc_routes.lock().unwrap();
         for nic in &hardware.nics {
             let my_routers = [
                 RouterId { vni: nic.vni, kind: RouterKind::System },
@@ -449,7 +454,8 @@ impl SledAgent {
         propolis_id: PropolisUuid,
         state: VmmStateRequested,
     ) -> Result<VmmPutStateResponse, HttpError> {
-        if let Some(e) = self.instance_ensure_state_error.lock().await.as_ref()
+        if let Some(e) =
+            self.instance_ensure_state_error.lock().unwrap().as_ref()
         {
             return Err(e.clone().into());
         }
@@ -552,7 +558,7 @@ impl SledAgent {
     }
 
     pub async fn set_instance_ensure_state_error(&self, error: Option<Error>) {
-        *self.instance_ensure_state_error.lock().await = error;
+        *self.instance_ensure_state_error.lock().unwrap() = error;
     }
 
     /// Idempotently ensures that the given API Disk (described by `api_disk`)
@@ -592,89 +598,58 @@ impl SledAgent {
     }
 
     /// Adds a Physical Disk to the simulated sled agent.
-    pub async fn create_external_physical_disk(
+    pub fn create_external_physical_disk(
         &self,
         id: PhysicalDiskUuid,
         identity: DiskIdentity,
     ) {
         let variant = DiskVariant::U2;
-        self.storage
-            .lock()
-            .await
-            .insert_physical_disk(id, identity, variant)
-            .await;
+        self.storage.lock().insert_physical_disk(id, identity, variant);
     }
 
-    pub async fn get_all_physical_disks(
+    pub fn get_all_physical_disks(
         &self,
     ) -> Vec<nexus_client::types::PhysicalDiskPutRequest> {
-        self.storage.lock().await.get_all_physical_disks()
+        self.storage.lock().get_all_physical_disks()
     }
 
-    pub async fn get_zpools(
-        &self,
-    ) -> Vec<nexus_client::types::ZpoolPutRequest> {
-        self.storage.lock().await.get_all_zpools()
+    pub fn get_zpools(&self) -> Vec<nexus_client::types::ZpoolPutRequest> {
+        self.storage.lock().get_all_zpools()
     }
 
-    pub async fn get_crucible_datasets(
+    pub fn get_crucible_datasets(
         &self,
         zpool_id: ZpoolUuid,
     ) -> Vec<(DatasetUuid, SocketAddr)> {
-        self.storage.lock().await.get_all_crucible_datasets(zpool_id)
+        self.storage.lock().get_all_crucible_datasets(zpool_id)
     }
 
     /// Adds a Zpool to the simulated sled agent.
-    pub async fn create_zpool(
+    pub fn create_zpool(
         &self,
         id: ZpoolUuid,
         physical_disk_id: PhysicalDiskUuid,
         size: u64,
     ) {
-        self.storage
-            .lock()
-            .await
-            .insert_zpool(id, physical_disk_id, size)
-            .await;
-    }
-
-    /// Adds a debug dataset within a zpool
-    pub async fn create_debug_dataset(
-        &self,
-        zpool_id: ZpoolUuid,
-        dataset_id: DatasetUuid,
-    ) {
-        self.storage
-            .lock()
-            .await
-            .insert_debug_dataset(zpool_id, dataset_id)
-            .await
+        self.storage.lock().insert_zpool(id, physical_disk_id, size);
     }
 
     /// Adds a Crucible Dataset within a zpool.
-    pub async fn create_crucible_dataset(
+    pub fn create_crucible_dataset(
         &self,
         zpool_id: ZpoolUuid,
         dataset_id: DatasetUuid,
     ) -> SocketAddr {
-        self.storage
-            .lock()
-            .await
-            .insert_crucible_dataset(zpool_id, dataset_id)
-            .await
+        self.storage.lock().insert_crucible_dataset(zpool_id, dataset_id)
     }
 
     /// Returns a crucible dataset within a particular zpool.
-    pub async fn get_crucible_dataset(
+    pub fn get_crucible_dataset(
         &self,
         zpool_id: ZpoolUuid,
         dataset_id: DatasetUuid,
     ) -> Arc<CrucibleData> {
-        self.storage
-            .lock()
-            .await
-            .get_crucible_dataset(zpool_id, dataset_id)
-            .await
+        self.storage.lock().get_crucible_dataset(zpool_id, dataset_id)
     }
 
     /// Issue a snapshot request for a Crucible disk attached to an instance.
@@ -686,7 +661,7 @@ impl SledAgent {
     ///
     /// We're not simulating the propolis server, so directly create a
     /// snapshot here.
-    pub async fn instance_issue_disk_snapshot_request(
+    pub fn instance_issue_disk_snapshot_request(
         &self,
         _propolis_id: PropolisUuid,
         disk_id: Uuid,
@@ -696,7 +671,7 @@ impl SledAgent {
         // for each region that makes up the disk. Use the disk_id_to_region_ids
         // map to perform lookup based on this function's disk id argument.
 
-        let disk_id_to_region_ids = self.disk_id_to_region_ids.lock().await;
+        let disk_id_to_region_ids = self.disk_id_to_region_ids.lock().unwrap();
         let region_ids = disk_id_to_region_ids.get(&disk_id.to_string());
 
         let region_ids = region_ids.ok_or_else(|| {
@@ -705,16 +680,14 @@ impl SledAgent {
 
         info!(self.log, "disk id {} region ids are {:?}", disk_id, region_ids);
 
-        let storage = self.storage.lock().await;
+        let storage = self.storage.lock();
 
         for region_id in region_ids {
-            let crucible_data =
-                storage.get_dataset_for_region(*region_id).await;
+            let crucible_data = storage.get_dataset_for_region(*region_id);
 
             if let Some(crucible_data) = crucible_data {
                 crucible_data
                     .create_snapshot(*region_id, snapshot_id)
-                    .await
                     .map_err(|e| Error::internal_error(&e.to_string()))?;
             } else {
                 return Err(Error::not_found_by_id(
@@ -727,28 +700,28 @@ impl SledAgent {
         Ok(())
     }
 
-    pub async fn set_virtual_nic_host(
+    pub fn set_virtual_nic_host(
         &self,
         mapping: &VirtualNetworkInterfaceHost,
     ) -> Result<(), Error> {
-        let mut v2p_mappings = self.v2p_mappings.lock().await;
+        let mut v2p_mappings = self.v2p_mappings.lock().unwrap();
         v2p_mappings.insert(mapping.clone());
         Ok(())
     }
 
-    pub async fn unset_virtual_nic_host(
+    pub fn unset_virtual_nic_host(
         &self,
         mapping: &VirtualNetworkInterfaceHost,
     ) -> Result<(), Error> {
-        let mut v2p_mappings = self.v2p_mappings.lock().await;
+        let mut v2p_mappings = self.v2p_mappings.lock().unwrap();
         v2p_mappings.remove(mapping);
         Ok(())
     }
 
-    pub async fn list_virtual_nics(
+    pub fn list_virtual_nics(
         &self,
     ) -> Result<Vec<VirtualNetworkInterfaceHost>, Error> {
-        let v2p_mappings = self.v2p_mappings.lock().await;
+        let v2p_mappings = self.v2p_mappings.lock().unwrap();
         Ok(Vec::from_iter(v2p_mappings.clone()))
     }
 
@@ -763,7 +736,7 @@ impl SledAgent {
             ));
         }
 
-        let mut eips = self.external_ips.lock().await;
+        let mut eips = self.external_ips.lock().unwrap();
         let my_eips = eips.entry(propolis_id).or_default();
 
         // High-level behaviour: this should always succeed UNLESS
@@ -796,7 +769,7 @@ impl SledAgent {
             ));
         }
 
-        let mut eips = self.external_ips.lock().await;
+        let mut eips = self.external_ips.lock().unwrap();
         let my_eips = eips.entry(propolis_id).or_default();
 
         my_eips.remove(&body_args);
@@ -841,10 +814,7 @@ impl SledAgent {
         Ok(addr)
     }
 
-    pub async fn inventory(
-        &self,
-        addr: SocketAddr,
-    ) -> anyhow::Result<Inventory> {
+    pub fn inventory(&self, addr: SocketAddr) -> anyhow::Result<Inventory> {
         let sled_agent_address = match addr {
             SocketAddr::V4(_) => {
                 bail!("sled_agent_ip must be v6 for inventory")
@@ -852,7 +822,7 @@ impl SledAgent {
             SocketAddr::V6(v6) => v6,
         };
 
-        let storage = self.storage.lock().await;
+        let storage = self.storage.lock();
         Ok(Inventory {
             sled_id: self.id,
             sled_agent_address,
@@ -867,7 +837,7 @@ impl SledAgent {
                 self.config.hardware.reservoir_ram,
             )
             .context("reservoir_size")?,
-            omicron_zones: self.fake_zones.lock().await.clone(),
+            omicron_zones: self.fake_zones.lock().unwrap().clone(),
             disks: storage
                 .physical_disks()
                 .values()
@@ -899,7 +869,6 @@ impl SledAgent {
             // represent the "real" datasets the sled agent can observe.
             datasets: storage
                 .datasets_config_list()
-                .await
                 .map(|config| {
                     config
                         .datasets
@@ -926,10 +895,10 @@ impl SledAgent {
         dataset_id: DatasetUuid,
     ) -> Result<Vec<SupportBundleMetadata>, HttpError> {
         self.storage
-            .lock()
-            .await
-            .support_bundle_list(zpool_id, dataset_id)
+            .as_support_bundle_storage(&self.log)
+            .list(zpool_id, dataset_id)
             .await
+            .map_err(|err| err.into())
     }
 
     pub async fn support_bundle_create(
@@ -938,35 +907,49 @@ impl SledAgent {
         dataset_id: DatasetUuid,
         support_bundle_id: SupportBundleUuid,
         expected_hash: ArtifactHash,
+        stream: impl Stream<Item = Result<Bytes, HttpError>>,
     ) -> Result<SupportBundleMetadata, HttpError> {
         self.storage
-            .lock()
-            .await
-            .support_bundle_create(
+            .as_support_bundle_storage(&self.log)
+            .create(
                 zpool_id,
                 dataset_id,
                 support_bundle_id,
                 expected_hash,
+                stream,
             )
-            .await?;
-
-        Ok(SupportBundleMetadata {
-            support_bundle_id,
-            state: SupportBundleState::Complete,
-        })
+            .await
+            .map_err(|err| err.into())
     }
 
-    pub async fn support_bundle_get(
+    pub(crate) async fn support_bundle_get(
         &self,
         zpool_id: ZpoolUuid,
         dataset_id: DatasetUuid,
         support_bundle_id: SupportBundleUuid,
-    ) -> Result<(), HttpError> {
+        range: Option<PotentialRange>,
+        query: SupportBundleQueryType,
+    ) -> Result<http::Response<Body>, HttpError> {
         self.storage
-            .lock()
+            .as_support_bundle_storage(&self.log)
+            .get(zpool_id, dataset_id, support_bundle_id, range, query)
             .await
-            .support_bundle_exists(zpool_id, dataset_id, support_bundle_id)
+            .map_err(|err| err.into())
+    }
+
+    pub(crate) async fn support_bundle_head(
+        &self,
+        zpool_id: ZpoolUuid,
+        dataset_id: DatasetUuid,
+        support_bundle_id: SupportBundleUuid,
+        range: Option<PotentialRange>,
+        query: SupportBundleQueryType,
+    ) -> Result<http::Response<Body>, HttpError> {
+        self.storage
+            .as_support_bundle_storage(&self.log)
+            .head(zpool_id, dataset_id, support_bundle_id, range, query)
             .await
+            .map_err(|err| err.into())
     }
 
     pub async fn support_bundle_delete(
@@ -976,67 +959,58 @@ impl SledAgent {
         support_bundle_id: SupportBundleUuid,
     ) -> Result<(), HttpError> {
         self.storage
-            .lock()
-            .await
-            .support_bundle_delete(zpool_id, dataset_id, support_bundle_id)
+            .as_support_bundle_storage(&self.log)
+            .delete(zpool_id, dataset_id, support_bundle_id)
             .await
+            .map_err(|err| err.into())
     }
 
-    pub async fn datasets_ensure(
+    pub fn datasets_ensure(
         &self,
         config: DatasetsConfig,
     ) -> Result<DatasetsManagementResult, HttpError> {
-        self.storage.lock().await.datasets_ensure(config).await
+        self.storage.lock().datasets_ensure(config)
     }
 
-    pub async fn datasets_config_list(
-        &self,
-    ) -> Result<DatasetsConfig, HttpError> {
-        self.storage.lock().await.datasets_config_list().await
+    pub fn datasets_config_list(&self) -> Result<DatasetsConfig, HttpError> {
+        self.storage.lock().datasets_config_list()
     }
 
-    pub async fn omicron_physical_disks_list(
+    pub fn omicron_physical_disks_list(
         &self,
     ) -> Result<OmicronPhysicalDisksConfig, HttpError> {
-        self.storage.lock().await.omicron_physical_disks_list().await
+        self.storage.lock().omicron_physical_disks_list()
     }
 
-    pub async fn omicron_physical_disks_ensure(
+    pub fn omicron_physical_disks_ensure(
         &self,
         config: OmicronPhysicalDisksConfig,
     ) -> Result<DisksManagementResult, HttpError> {
-        self.storage.lock().await.omicron_physical_disks_ensure(config).await
+        self.storage.lock().omicron_physical_disks_ensure(config)
     }
 
-    pub async fn omicron_zones_list(&self) -> OmicronZonesConfig {
-        self.fake_zones.lock().await.clone()
+    pub fn omicron_zones_list(&self) -> OmicronZonesConfig {
+        self.fake_zones.lock().unwrap().clone()
     }
 
-    pub async fn omicron_zones_ensure(
-        &self,
-        requested_zones: OmicronZonesConfig,
-    ) {
-        *self.fake_zones.lock().await = requested_zones;
+    pub fn omicron_zones_ensure(&self, requested_zones: OmicronZonesConfig) {
+        *self.fake_zones.lock().unwrap() = requested_zones;
     }
 
-    pub async fn drop_dataset(
-        &self,
-        zpool_id: ZpoolUuid,
-        dataset_id: DatasetUuid,
-    ) {
-        self.storage.lock().await.drop_dataset(zpool_id, dataset_id)
+    pub fn drop_dataset(&self, zpool_id: ZpoolUuid, dataset_id: DatasetUuid) {
+        self.storage.lock().drop_dataset(zpool_id, dataset_id)
     }
 
-    pub async fn list_vpc_routes(&self) -> Vec<ResolvedVpcRouteState> {
-        let routes = self.vpc_routes.lock().await;
+    pub fn list_vpc_routes(&self) -> Vec<ResolvedVpcRouteState> {
+        let routes = self.vpc_routes.lock().unwrap();
         routes
             .iter()
             .map(|(k, v)| ResolvedVpcRouteState { id: *k, version: v.version })
             .collect()
     }
 
-    pub async fn set_vpc_routes(&self, new_routes: Vec<ResolvedVpcRouteSet>) {
-        let mut routes = self.vpc_routes.lock().await;
+    pub fn set_vpc_routes(&self, new_routes: Vec<ResolvedVpcRouteSet>) {
+        let mut routes = self.vpc_routes.lock().unwrap();
         for new in new_routes {
             // Disregard any route information for a subnet we don't have.
             let Some(old) = routes.get(&new.id) else {
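
Taken together, the changes in this file follow one pattern: methods that only touch in-memory state drop their `async` qualifier and trade `futures::lock::Mutex` for `std::sync::Mutex`. The swap is sound because none of these methods hold a guard across an `.await` point. A minimal sketch of the resulting shape, using a hypothetical `MappingTable` rather than anything from this patch:

    use std::collections::HashSet;
    use std::sync::{Arc, Mutex};

    // Hypothetical stand-in for state like `v2p_mappings`.
    #[derive(Clone, Default)]
    struct MappingTable {
        inner: Arc<Mutex<HashSet<String>>>,
    }

    impl MappingTable {
        // The guard is acquired and dropped with no `.await` in between, so
        // `std::sync::Mutex` is safe here and callers need no executor.
        fn insert(&self, mapping: &str) -> bool {
            self.inner.lock().unwrap().insert(mapping.to_string())
        }

        fn list(&self) -> Vec<String> {
            self.inner.lock().unwrap().iter().cloned().collect()
        }
    }

The payoff shows up in the test diffs above: callers in the test suite no longer need to be async just to read simulated state.
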
diff --git a/sled-agent/src/sim/storage.rs b/sled-agent/src/sim/storage.rs
index dc8cf63fe4..e6d999e7e8 100644
--- a/sled-agent/src/sim/storage.rs
+++ b/sled-agent/src/sim/storage.rs
@@ -12,15 +12,18 @@ use crate::sim::http_entrypoints_pantry::ExpectedDigest;
 use crate::sim::http_entrypoints_pantry::PantryStatus;
 use crate::sim::http_entrypoints_pantry::VolumeStatus;
 use crate::sim::SledAgent;
+use crate::support_bundle::storage::SupportBundleManager;
 use anyhow::{self, bail, Result};
+use camino::Utf8Path;
+use camino_tempfile::Utf8TempDir;
 use chrono::prelude::*;
 use crucible_agent_client::types::{
     CreateRegion, Region, RegionId, RunningSnapshot, Snapshot, State,
 };
 use dropshot::HandlerTaskMode;
 use dropshot::HttpError;
-use futures::lock::Mutex;
 use omicron_common::disk::DatasetManagementStatus;
+use omicron_common::disk::DatasetName;
 use omicron_common::disk::DatasetsConfig;
 use omicron_common::disk::DatasetsManagementResult;
 use omicron_common::disk::DiskIdentity;
@@ -28,24 +31,26 @@ use omicron_common::disk::DiskManagementStatus;
 use omicron_common::disk::DiskVariant;
 use omicron_common::disk::DisksManagementResult;
 use omicron_common::disk::OmicronPhysicalDisksConfig;
-use omicron_common::update::ArtifactHash;
+use omicron_common::disk::SharedDatasetConfig;
 use omicron_uuid_kinds::DatasetUuid;
 use omicron_uuid_kinds::GenericUuid;
 use omicron_uuid_kinds::OmicronZoneUuid;
 use omicron_uuid_kinds::PhysicalDiskUuid;
 use omicron_uuid_kinds::PropolisUuid;
-use omicron_uuid_kinds::SupportBundleUuid;
 use omicron_uuid_kinds::ZpoolUuid;
 use propolis_client::types::VolumeConstructionRequest;
 use serde::Serialize;
-use sled_agent_api::SupportBundleMetadata;
-use sled_agent_api::SupportBundleState;
+use sled_storage::manager::NestedDatasetConfig;
+use sled_storage::manager::NestedDatasetListOptions;
+use sled_storage::manager::NestedDatasetLocation;
 use slog::Logger;
+use std::collections::BTreeMap;
 use std::collections::HashMap;
 use std::collections::HashSet;
 use std::net::{IpAddr, SocketAddr};
 use std::str::FromStr;
 use std::sync::Arc;
+use std::sync::Mutex;
 use uuid::Uuid;
 
 type CreateCallback = Box<dyn Fn(&CreateRegion) -> State + Send + 'static>;
@@ -676,91 +681,90 @@ impl CrucibleData {
         }
     }
 
-    pub async fn set_create_callback(&self, callback: CreateCallback) {
-        self.inner.lock().await.set_create_callback(callback);
+    pub fn set_create_callback(&self, callback: CreateCallback) {
+        self.inner.lock().unwrap().set_create_callback(callback);
     }
 
-    pub async fn list(&self) -> Vec<Region> {
-        self.inner.lock().await.list()
+    pub fn list(&self) -> Vec<Region> {
+        self.inner.lock().unwrap().list()
     }
 
-    pub async fn create(&self, params: CreateRegion) -> Result<Region> {
-        self.inner.lock().await.create(params)
+    pub fn create(&self, params: CreateRegion) -> Result<Region> {
+        self.inner.lock().unwrap().create(params)
     }
 
-    pub async fn get(&self, id: RegionId) -> Option<Region> {
-        self.inner.lock().await.get(id)
+    pub fn get(&self, id: RegionId) -> Option<Region> {
+        self.inner.lock().unwrap().get(id)
     }
 
-    pub async fn delete(&self, id: RegionId) -> Result<Option<Region>> {
-        self.inner.lock().await.delete(id)
+    pub fn delete(&self, id: RegionId) -> Result<Option<Region>> {
+        self.inner.lock().unwrap().delete(id)
     }
 
-    pub async fn create_snapshot(
+    pub fn create_snapshot(
         &self,
         id: Uuid,
         snapshot_id: Uuid,
     ) -> Result<Snapshot> {
-        self.inner.lock().await.create_snapshot(id, snapshot_id)
+        self.inner.lock().unwrap().create_snapshot(id, snapshot_id)
     }
 
-    pub async fn snapshots_for_region(&self, id: &RegionId) -> Vec<Snapshot> {
-        self.inner.lock().await.snapshots_for_region(id)
+    pub fn snapshots_for_region(&self, id: &RegionId) -> Vec<Snapshot> {
+        self.inner.lock().unwrap().snapshots_for_region(id)
     }
 
-    pub async fn get_snapshot_for_region(
+    pub fn get_snapshot_for_region(
         &self,
         id: &RegionId,
         snapshot_id: &str,
     ) -> Option<Snapshot> {
-        self.inner.lock().await.get_snapshot_for_region(id, snapshot_id)
+        self.inner.lock().unwrap().get_snapshot_for_region(id, snapshot_id)
     }
 
-    pub async fn running_snapshots_for_id(
+    pub fn running_snapshots_for_id(
         &self,
         id: &RegionId,
     ) -> HashMap<String, RunningSnapshot> {
-        self.inner.lock().await.running_snapshots_for_id(id)
+        self.inner.lock().unwrap().running_snapshots_for_id(id)
     }
 
-    pub async fn delete_snapshot(
-        &self,
-        id: &RegionId,
-        name: &str,
-    ) -> Result<()> {
-        self.inner.lock().await.delete_snapshot(id, name)
+    pub fn delete_snapshot(&self, id: &RegionId, name: &str) -> Result<()> {
+        self.inner.lock().unwrap().delete_snapshot(id, name)
     }
 
-    pub async fn set_creating_a_running_snapshot_should_fail(&self) {
-        self.inner.lock().await.set_creating_a_running_snapshot_should_fail();
+    pub fn set_creating_a_running_snapshot_should_fail(&self) {
+        self.inner
+            .lock()
+            .unwrap()
+            .set_creating_a_running_snapshot_should_fail();
     }
 
-    pub async fn set_region_creation_error(&self, value: bool) {
-        self.inner.lock().await.set_region_creation_error(value);
+    pub fn set_region_creation_error(&self, value: bool) {
+        self.inner.lock().unwrap().set_region_creation_error(value);
     }
 
-    pub async fn set_region_deletion_error(&self, value: bool) {
-        self.inner.lock().await.set_region_deletion_error(value);
+    pub fn set_region_deletion_error(&self, value: bool) {
+        self.inner.lock().unwrap().set_region_deletion_error(value);
     }
 
-    pub async fn create_running_snapshot(
+    pub fn create_running_snapshot(
         &self,
         id: &RegionId,
         name: &str,
     ) -> Result<RunningSnapshot> {
-        self.inner.lock().await.create_running_snapshot(id, name)
+        self.inner.lock().unwrap().create_running_snapshot(id, name)
     }
 
-    pub async fn delete_running_snapshot(
+    pub fn delete_running_snapshot(
         &self,
         id: &RegionId,
         name: &str,
     ) -> Result<()> {
-        self.inner.lock().await.delete_running_snapshot(id, name)
+        self.inner.lock().unwrap().delete_running_snapshot(id, name)
     }
 
-    pub async fn is_empty(&self) -> bool {
-        self.inner.lock().await.is_empty()
+    pub fn is_empty(&self) -> bool {
+        self.inner.lock().unwrap().is_empty()
     }
 }
 
@@ -814,11 +818,6 @@ impl CrucibleServer {
     }
 }
 
-#[derive(Default)]
-pub(crate) struct DebugData {
-    bundles: HashMap<SupportBundleUuid, ArtifactHash>,
-}
-
 pub(crate) struct PhysicalDisk {
     pub(crate) identity: DiskIdentity,
     pub(crate) variant: DiskVariant,
@@ -828,7 +827,6 @@ pub(crate) struct PhysicalDisk {
 /// Describes data being simulated within a dataset.
 pub(crate) enum DatasetContents {
     Crucible(CrucibleServer),
-    Debug(DebugData),
 }
 
 pub(crate) struct Zpool {
@@ -847,10 +845,6 @@ impl Zpool {
         Zpool { id, physical_disk_id, total_size, datasets: HashMap::new() }
     }
 
-    fn insert_debug_dataset(&mut self, id: DatasetUuid) {
-        self.datasets.insert(id, DatasetContents::Debug(DebugData::default()));
-    }
-
     fn insert_crucible_dataset(
         &mut self,
         log: &Logger,
@@ -871,10 +865,7 @@ impl Zpool {
         let DatasetContents::Crucible(crucible) = self
             .datasets
             .get(&id)
-            .expect("Failed to get the dataset we just inserted")
-        else {
-            panic!("Should have just inserted Crucible dataset");
-        };
+            .expect("Failed to get the dataset we just inserted");
         crucible
     }
 
@@ -882,17 +873,16 @@ impl Zpool {
         self.total_size
     }
 
-    pub async fn get_dataset_for_region(
+    pub fn get_dataset_for_region(
         &self,
         region_id: Uuid,
     ) -> Option<Arc<CrucibleData>> {
         for dataset in self.datasets.values() {
-            if let DatasetContents::Crucible(dataset) = dataset {
-                for region in &dataset.data().list().await {
-                    let id = Uuid::from_str(&region.id.0).unwrap();
-                    if id == region_id {
-                        return Some(dataset.data());
-                    }
+            let DatasetContents::Crucible(dataset) = dataset;
+            for region in &dataset.data().list() {
+                let id = Uuid::from_str(&region.id.0).unwrap();
+                if id == region_id {
+                    return Some(dataset.data());
                 }
             }
         }
@@ -900,19 +890,18 @@ impl Zpool {
         None
     }
 
-    pub async fn get_region_for_port(&self, port: u16) -> Option<Region> {
+    pub fn get_region_for_port(&self, port: u16) -> Option<Region> {
         let mut regions = vec![];
 
         for dataset in self.datasets.values() {
-            if let DatasetContents::Crucible(dataset) = dataset {
-                for region in &dataset.data().list().await {
-                    if region.state == State::Destroyed {
-                        continue;
-                    }
+            let DatasetContents::Crucible(dataset) = dataset;
+            for region in &dataset.data().list() {
+                if region.state == State::Destroyed {
+                    continue;
+                }
 
-                    if port == region.port_number {
-                        regions.push(region.clone());
-                    }
+                if port == region.port_number {
+                    regions.push(region.clone());
                 }
             }
         }
@@ -928,12 +917,90 @@ impl Zpool {
     }
 }
 
+/// Represents a nested dataset within the simulated storage, together with
+/// its children and the temporary directory backing its mountpoint.
+pub struct NestedDatasetStorage {
+    config: NestedDatasetConfig,
+    // We intentionally store the children before the mountpoint,
+    // so they are deleted first.
+    children: BTreeMap<String, NestedDatasetStorage>,
+    // We store this directory as a temporary directory so it gets
+    // removed when this struct is dropped.
+    #[allow(dead_code)]
+    mountpoint: Utf8TempDir,
+}
+
+impl NestedDatasetStorage {
+    fn new(
+        zpool_root: &Utf8Path,
+        dataset_root: DatasetName,
+        path: String,
+        shared_config: SharedDatasetConfig,
+    ) -> Self {
+        let name = NestedDatasetLocation { path, root: dataset_root };
+
+        // Create a mountpoint for the nested dataset storage that lasts
+        // as long as the nested dataset does.
+        let mountpoint = name.mountpoint(zpool_root);
+        println!("NestedDatasetStorage: Mountpoint {mountpoint}");
+        let parent = mountpoint.as_path().parent().unwrap();
+        println!("NestedDatasetStorage: Creating parent dir: {parent}");
+        std::fs::create_dir_all(&parent).unwrap();
+
+        let new_dir_name = mountpoint.as_path().file_name().unwrap();
+        println!("NestedDatasetStorage: New dir name: {new_dir_name}");
+        let mountpoint = camino_tempfile::Builder::new()
+            .rand_bytes(0)
+            .prefix(new_dir_name)
+            .tempdir_in(parent)
+            .unwrap();
+
+        Self {
+            config: NestedDatasetConfig { name, inner: shared_config },
+            children: BTreeMap::new(),
+            mountpoint,
+        }
+    }
+}
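
Two details of `NestedDatasetStorage` are load-bearing: struct fields drop in declaration order, and `Utf8TempDir` removes its directory when dropped, so declaring `children` before `mountpoint` guarantees child directories are deleted before their parent's. A self-contained sketch of the same idea, with hypothetical names (`Node`, `demo`) and assuming `camino_tempfile::tempdir_in` mirrors `tempfile::tempdir_in`:

    use camino_tempfile::Utf8TempDir;
    use std::collections::BTreeMap;

    // Children are declared before the directory, so dropping a `Node`
    // removes every child's directory before the parent's.
    struct Node {
        children: BTreeMap<String, Node>,
        dir: Utf8TempDir,
    }

    fn demo() -> std::io::Result<()> {
        let parent = Utf8TempDir::new()?;
        let child = camino_tempfile::tempdir_in(parent.path())?;
        let node = Node {
            children: BTreeMap::from([(
                "child".to_string(),
                Node { children: BTreeMap::new(), dir: child },
            )]),
            dir: parent,
        };
        drop(node); // child directory first, then the parent
        Ok(())
    }
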
+
 /// Simulated representation of all storage on a sled.
+#[derive(Clone)]
 pub struct Storage {
-    sled_id: Uuid,
+    inner: Arc<Mutex<StorageInner>>,
+}
+
+impl Storage {
+    pub fn new(sled_id: Uuid, crucible_ip: IpAddr, log: Logger) -> Self {
+        Self {
+            inner: Arc::new(Mutex::new(StorageInner::new(
+                sled_id,
+                crucible_ip,
+                log,
+            ))),
+        }
+    }
+
+    pub fn lock(&self) -> std::sync::MutexGuard<StorageInner> {
+        self.inner.lock().unwrap()
+    }
+
+    pub fn as_support_bundle_storage<'a>(
+        &'a self,
+        log: &'a Logger,
+    ) -> SupportBundleManager<'a> {
+        SupportBundleManager::new(log, self)
+    }
+}
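
Splitting the cloneable `Storage` handle from the mutex-guarded `StorageInner` lets several owners (the simulated sled agent, the HTTP entrypoints, and the shared support bundle code) observe one set of state. A usage sketch; the `inspect` function is hypothetical:

    fn inspect(storage: &Storage, log: &Logger) {
        // Handles are cheap to clone and share the same inner state.
        let alias = storage.clone();

        // Locks are synchronous and short-lived.
        let n_zpools = alias.lock().zpools().len();
        slog::info!(log, "simulated zpools"; "count" => n_zpools);

        // The same handle can back the shared support bundle implementation.
        let _manager = storage.as_support_bundle_storage(log);
    }
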
+
+/// The inner state backing all simulated storage on a sled.
+///
+/// Guarded by the mutex within [Storage], which hands out access via
+/// [Storage::lock].
+pub struct StorageInner {
     log: Logger,
+    sled_id: Uuid,
+    root: Utf8TempDir,
     config: Option<OmicronPhysicalDisksConfig>,
     dataset_config: Option<DatasetsConfig>,
+    nested_datasets: HashMap<DatasetName, NestedDatasetStorage>,
     physical_disks: HashMap<PhysicalDiskUuid, PhysicalDisk>,
     next_disk_slot: i64,
     zpools: HashMap<ZpoolUuid, Zpool>,
@@ -941,13 +1008,15 @@ pub struct Storage {
     next_crucible_port: u16,
 }
 
-impl Storage {
+impl StorageInner {
     pub fn new(sled_id: Uuid, crucible_ip: IpAddr, log: Logger) -> Self {
         Self {
             sled_id,
             log,
+            root: camino_tempfile::tempdir().unwrap(),
             config: None,
             dataset_config: None,
+            nested_datasets: HashMap::new(),
             physical_disks: HashMap::new(),
             next_disk_slot: 0,
             zpools: HashMap::new(),
@@ -956,14 +1025,17 @@ impl Storage {
         }
     }
 
+    /// Returns a path to the "zpool root" for storage.
+    pub fn root(&self) -> &Utf8Path {
+        self.root.path()
+    }
+
     /// Returns an immutable reference to all (currently known) physical disks
     pub fn physical_disks(&self) -> &HashMap<PhysicalDiskUuid, PhysicalDisk> {
         &self.physical_disks
     }
 
-    pub async fn datasets_config_list(
-        &self,
-    ) -> Result<DatasetsConfig, HttpError> {
+    pub fn datasets_config_list(&self) -> Result<DatasetsConfig, HttpError> {
         let Some(config) = self.dataset_config.as_ref() else {
             return Err(HttpError::for_not_found(
                 None,
@@ -973,7 +1045,7 @@ impl Storage {
         Ok(config.clone())
     }
 
-    pub async fn datasets_ensure(
+    pub fn datasets_ensure(
         &mut self,
         config: DatasetsConfig,
     ) -> Result<DatasetsManagementResult, HttpError> {
@@ -997,6 +1069,27 @@ impl Storage {
         }
         self.dataset_config.replace(config.clone());
 
+        // Add a "nested dataset" entry for every dataset that should exist,
+        // and prune entries for datasets that have been removed.
+        let dataset_names: HashSet<_> = config
+            .datasets
+            .values()
+            .map(|config| config.name.clone())
+            .collect();
+        for dataset in &dataset_names {
+            let root = self.root().to_path_buf();
+            self.nested_datasets.entry(dataset.clone()).or_insert_with(|| {
+                NestedDatasetStorage::new(
+                    &root,
+                    dataset.clone(),
+                    String::new(),
+                    SharedDatasetConfig::default(),
+                )
+            });
+        }
+        self.nested_datasets
+            .retain(|dataset, _| dataset_names.contains(dataset));
+
         Ok(DatasetsManagementResult {
             status: config
                 .datasets
@@ -1009,7 +1102,149 @@ impl Storage {
         })
     }
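
The block just added to `datasets_ensure` reconciles the nested dataset map against the requested config: missing entries are created with `entry().or_insert_with`, then stale entries are pruned with `retain`. A tiny self-contained sketch of the same idiom (names hypothetical):

    use std::collections::{HashMap, HashSet};

    fn reconcile(map: &mut HashMap<String, u32>, desired: &HashSet<String>) {
        // Insert a default entry for every desired key that is missing...
        for key in desired {
            map.entry(key.clone()).or_default();
        }
        // ...then drop entries whose key is no longer desired.
        map.retain(|key, _| desired.contains(key));
    }
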
 
-    pub async fn omicron_physical_disks_list(
+    pub fn nested_dataset_list(
+        &self,
+        name: NestedDatasetLocation,
+        options: NestedDatasetListOptions,
+    ) -> Result<Vec<NestedDatasetConfig>, HttpError> {
+        let Some(mut nested_dataset) = self.nested_datasets.get(&name.root)
+        else {
+            return Err(HttpError::for_not_found(
+                None,
+                "Dataset not found".to_string(),
+            ));
+        };
+
+        for path_component in name.path.split('/') {
+            if path_component.is_empty() {
+                continue;
+            }
+            match nested_dataset.children.get(path_component) {
+                Some(dataset) => nested_dataset = dataset,
+                None => {
+                    return Err(HttpError::for_not_found(
+                        None,
+                        "Dataset not found".to_string(),
+                    ))
+                }
+            };
+        }
+
+        let mut children: Vec<_> = nested_dataset
+            .children
+            .values()
+            .map(|storage| storage.config.clone())
+            .collect();
+
+        match options {
+            NestedDatasetListOptions::ChildrenOnly => Ok(children),
+            NestedDatasetListOptions::SelfAndChildren => {
+                children.insert(0, nested_dataset.config.clone());
+                Ok(children)
+            }
+        }
+    }
+
+    pub fn nested_dataset_ensure(
+        &mut self,
+        config: NestedDatasetConfig,
+    ) -> Result<(), HttpError> {
+        let name = &config.name;
+        let nested_path = name.path.to_string();
+        let zpool_root = self.root().to_path_buf();
+        let Some(mut nested_dataset) = self.nested_datasets.get_mut(&name.root)
+        else {
+            return Err(HttpError::for_not_found(
+                None,
+                "Dataset not found".to_string(),
+            ));
+        };
+
+        // Note: components yielded by `split('/')` can never themselves
+        // contain the separator, so the final component is detected by
+        // peeking ahead rather than by inspecting the component. Cloning
+        // `nested_path` into the closure also avoids moving it while the
+        // component iterator still borrows it.
+        let mut components =
+            nested_path.split('/').filter(|c| !c.is_empty()).peekable();
+        while let Some(path_component) = components.next() {
+            // Final component of path -- insert it here if it doesn't exist
+            // already.
+            if components.peek().is_none() {
+                let entry =
+                    nested_dataset.children.entry(path_component.to_string());
+                entry
+                    .and_modify(|storage| {
+                        storage.config = config.clone();
+                    })
+                    .or_insert_with(|| {
+                        NestedDatasetStorage::new(
+                            &zpool_root,
+                            config.name.root,
+                            nested_path.clone(),
+                            config.inner,
+                        )
+                    });
+                return Ok(());
+            }
+
+            match nested_dataset.children.get_mut(path_component) {
+                Some(dataset) => nested_dataset = dataset,
+                None => {
+                    return Err(HttpError::for_not_found(
+                        None,
+                        "Dataset not found".to_string(),
+                    ))
+                }
+            };
+        }
+        Err(HttpError::for_not_found(
+            None,
+            "Nested Dataset not found".to_string(),
+        ))
+    }
+
+    pub fn nested_dataset_destroy(
+        &mut self,
+        name: NestedDatasetLocation,
+    ) -> Result<(), HttpError> {
+        let Some(mut nested_dataset) = self.nested_datasets.get_mut(&name.root)
+        else {
+            return Err(HttpError::for_not_found(
+                None,
+                "Dataset not found".to_string(),
+            ));
+        };
+
+        // As in `nested_dataset_ensure`, peek ahead to find the final
+        // component of the path.
+        let mut components =
+            name.path.split('/').filter(|c| !c.is_empty()).peekable();
+        while let Some(path_component) = components.next() {
+            // Final component of path -- remove it if it exists.
+            if components.peek().is_none() {
+                if nested_dataset.children.remove(path_component).is_none() {
+                    return Err(HttpError::for_not_found(
+                        None,
+                        "Nested Dataset not found".to_string(),
+                    ));
+                }
+                return Ok(());
+            }
+            match nested_dataset.children.get_mut(path_component) {
+                Some(dataset) => nested_dataset = dataset,
+                None => {
+                    return Err(HttpError::for_not_found(
+                        None,
+                        "Dataset not found".to_string(),
+                    ))
+                }
+            };
+        }
+        Err(HttpError::for_not_found(
+            None,
+            "Nested Dataset not found".to_string(),
+        ))
+    }
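
Both traversals above lean on the fact that components yielded by `split('/')` can never contain the separator, so the final component is found by peeking ahead rather than inspecting the component itself. A standalone illustration with a hypothetical helper:

    fn last_component(path: &str) -> Option<&str> {
        let mut components =
            path.split('/').filter(|c| !c.is_empty()).peekable();
        while let Some(component) = components.next() {
            if components.peek().is_none() {
                return Some(component);
            }
        }
        None
    }

    #[test]
    fn finds_final_component() {
        assert_eq!(last_component("bundle-uuid"), Some("bundle-uuid"));
        assert_eq!(last_component("a/b/c"), Some("c"));
        assert_eq!(last_component(""), None);
    }
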
+
+    pub fn omicron_physical_disks_list(
         &mut self,
     ) -> Result<OmicronPhysicalDisksConfig, HttpError> {
         let Some(config) = self.config.as_ref() else {
@@ -1021,7 +1256,7 @@ impl Storage {
         Ok(config.clone())
     }
 
-    pub async fn omicron_physical_disks_ensure(
+    pub fn omicron_physical_disks_ensure(
         &mut self,
         config: OmicronPhysicalDisksConfig,
     ) -> Result<DisksManagementResult, HttpError> {
@@ -1057,7 +1292,7 @@ impl Storage {
         })
     }
 
-    pub async fn insert_physical_disk(
+    pub fn insert_physical_disk(
         &mut self,
         id: PhysicalDiskUuid,
         identity: DiskIdentity,
@@ -1070,7 +1305,7 @@ impl Storage {
     }
 
     /// Adds a Zpool to the sled's simulated storage.
-    pub async fn insert_zpool(
+    pub fn insert_zpool(
         &mut self,
         zpool_id: ZpoolUuid,
         disk_id: PhysicalDiskUuid,
@@ -1085,143 +1320,8 @@ impl Storage {
         &self.zpools
     }
 
-    fn get_debug_dataset(
-        &self,
-        zpool_id: ZpoolUuid,
-        dataset_id: DatasetUuid,
-    ) -> Result<&DebugData, HttpError> {
-        let Some(zpool) = self.zpools.get(&zpool_id) else {
-            return Err(HttpError::for_not_found(
-                None,
-                format!("zpool does not exist {zpool_id}"),
-            ));
-        };
-        let Some(dataset) = zpool.datasets.get(&dataset_id) else {
-            return Err(HttpError::for_not_found(
-                None,
-                format!("dataset does not exist {dataset_id}"),
-            ));
-        };
-
-        let DatasetContents::Debug(debug) = dataset else {
-            return Err(HttpError::for_bad_request(
-                None,
-                format!("Not a debug dataset: {zpool_id} / {dataset_id}"),
-            ));
-        };
-
-        Ok(debug)
-    }
-
-    fn get_debug_dataset_mut(
-        &mut self,
-        zpool_id: ZpoolUuid,
-        dataset_id: DatasetUuid,
-    ) -> Result<&mut DebugData, HttpError> {
-        let Some(zpool) = self.zpools.get_mut(&zpool_id) else {
-            return Err(HttpError::for_not_found(
-                None,
-                format!("zpool does not exist {zpool_id}"),
-            ));
-        };
-        let Some(dataset) = zpool.datasets.get_mut(&dataset_id) else {
-            return Err(HttpError::for_not_found(
-                None,
-                format!("dataset does not exist {dataset_id}"),
-            ));
-        };
-
-        let DatasetContents::Debug(debug) = dataset else {
-            return Err(HttpError::for_bad_request(
-                None,
-                format!("Not a debug dataset: {zpool_id} / {dataset_id}"),
-            ));
-        };
-
-        Ok(debug)
-    }
-
-    pub async fn support_bundle_list(
-        &self,
-        zpool_id: ZpoolUuid,
-        dataset_id: DatasetUuid,
-    ) -> Result<Vec<SupportBundleMetadata>, HttpError> {
-        let debug = self.get_debug_dataset(zpool_id, dataset_id)?;
-
-        Ok(debug
-            .bundles
-            .keys()
-            .map(|id| SupportBundleMetadata {
-                support_bundle_id: *id,
-                state: SupportBundleState::Complete,
-            })
-            .collect())
-    }
-
-    pub async fn support_bundle_create(
-        &mut self,
-        zpool_id: ZpoolUuid,
-        dataset_id: DatasetUuid,
-        support_bundle_id: SupportBundleUuid,
-        hash: ArtifactHash,
-    ) -> Result<(), HttpError> {
-        let debug = self.get_debug_dataset_mut(zpool_id, dataset_id)?;
-
-        // This is for the simulated server, so we totally ignore the "contents"
-        // of the bundle and just accept that it should exist.
-        debug.bundles.insert(support_bundle_id, hash);
-
-        Ok(())
-    }
-
-    pub async fn support_bundle_exists(
-        &self,
-        zpool_id: ZpoolUuid,
-        dataset_id: DatasetUuid,
-        support_bundle_id: SupportBundleUuid,
-    ) -> Result<(), HttpError> {
-        let debug = self.get_debug_dataset(zpool_id, dataset_id)?;
-
-        if !debug.bundles.contains_key(&support_bundle_id) {
-            return Err(HttpError::for_not_found(
-                None,
-                format!("Support bundle not found {support_bundle_id}"),
-            ));
-        }
-        Ok(())
-    }
-
-    pub async fn support_bundle_delete(
-        &mut self,
-        zpool_id: ZpoolUuid,
-        dataset_id: DatasetUuid,
-        support_bundle_id: SupportBundleUuid,
-    ) -> Result<(), HttpError> {
-        let debug = self.get_debug_dataset_mut(zpool_id, dataset_id)?;
-
-        if debug.bundles.remove(&support_bundle_id).is_none() {
-            return Err(HttpError::for_not_found(
-                None,
-                format!("Support bundle not found {support_bundle_id}"),
-            ));
-        }
-        Ok(())
-    }
-
-    /// Adds a debug dataset to the sled's simulated storage
-    pub async fn insert_debug_dataset(
-        &mut self,
-        zpool_id: ZpoolUuid,
-        dataset_id: DatasetUuid,
-    ) {
-        self.zpools
-            .get_mut(&zpool_id)
-            .expect("Zpool does not exist")
-            .insert_debug_dataset(dataset_id);
-    }
-
     /// Adds a Crucible dataset to the sled's simulated storage.
-    pub async fn insert_crucible_dataset(
+    pub fn insert_crucible_dataset(
         &mut self,
         zpool_id: ZpoolUuid,
         dataset_id: DatasetUuid,
@@ -1291,16 +1391,13 @@ impl Storage {
         zpool
             .datasets
             .iter()
-            .filter_map(|(id, dataset)| match dataset {
-                DatasetContents::Crucible(server) => {
-                    Some((*id, server.address()))
-                }
-                _ => None,
+            .map(|(id, dataset)| match dataset {
+                DatasetContents::Crucible(server) => (*id, server.address()),
             })
             .collect()
     }
 
-    pub async fn get_dataset(
+    pub fn get_dataset(
         &self,
         zpool_id: ZpoolUuid,
         dataset_id: DatasetUuid,
@@ -1313,24 +1410,22 @@ impl Storage {
             .expect("Dataset does not exist")
     }
 
-    pub async fn get_crucible_dataset(
+    pub fn get_crucible_dataset(
         &self,
         zpool_id: ZpoolUuid,
         dataset_id: DatasetUuid,
     ) -> Arc<CrucibleData> {
-        match self.get_dataset(zpool_id, dataset_id).await {
+        match self.get_dataset(zpool_id, dataset_id) {
             DatasetContents::Crucible(crucible) => crucible.data.clone(),
-            _ => panic!("{zpool_id} / {dataset_id} is not a crucible dataset"),
         }
     }
 
-    pub async fn get_dataset_for_region(
+    pub fn get_dataset_for_region(
         &self,
         region_id: Uuid,
     ) -> Option<Arc<CrucibleData>> {
         for zpool in self.zpools.values() {
-            if let Some(dataset) = zpool.get_dataset_for_region(region_id).await
-            {
+            if let Some(dataset) = zpool.get_dataset_for_region(region_id) {
                 return Some(dataset);
             }
         }
@@ -1338,10 +1433,10 @@ impl Storage {
         None
     }
 
-    pub async fn get_region_for_port(&self, port: u16) -> Option<Region> {
+    pub fn get_region_for_port(&self, port: u16) -> Option<Region> {
         let mut regions = vec![];
         for zpool in self.zpools.values() {
-            if let Some(region) = zpool.get_region_for_port(port).await {
+            if let Some(region) = zpool.get_region_for_port(port) {
                 regions.push(region);
             }
         }
@@ -1389,18 +1484,18 @@ impl Pantry {
         }
     }
 
-    pub async fn status(&self) -> Result<PantryStatus, HttpError> {
+    pub fn status(&self) -> Result<PantryStatus, HttpError> {
         Ok(PantryStatus {
-            volumes: self.volumes.lock().await.keys().cloned().collect(),
-            num_job_handles: self.jobs.lock().await.len(),
+            volumes: self.volumes.lock().unwrap().keys().cloned().collect(),
+            num_job_handles: self.jobs.lock().unwrap().len(),
         })
     }
 
-    pub async fn entry(
+    pub fn entry(
         &self,
         volume_id: String,
     ) -> Result<VolumeConstructionRequest, HttpError> {
-        let volumes = self.volumes.lock().await;
+        let volumes = self.volumes.lock().unwrap();
         match volumes.get(&volume_id) {
             Some(entry) => Ok(entry.vcr.clone()),
 
@@ -1408,12 +1503,12 @@ impl Pantry {
         }
     }
 
-    pub async fn attach(
+    pub fn attach(
         &self,
         volume_id: String,
         volume_construction_request: VolumeConstructionRequest,
     ) -> Result<()> {
-        let mut volumes = self.volumes.lock().await;
+        let mut volumes = self.volumes.lock().unwrap();
 
         volumes.insert(
             volume_id,
@@ -1431,14 +1526,14 @@ impl Pantry {
         Ok(())
     }
 
-    pub async fn attach_activate_background(
+    pub fn attach_activate_background(
         &self,
         volume_id: String,
         activate_job_id: String,
         volume_construction_request: VolumeConstructionRequest,
     ) -> Result<(), HttpError> {
-        let mut volumes = self.volumes.lock().await;
-        let mut jobs = self.jobs.lock().await;
+        let mut volumes = self.volumes.lock().unwrap();
+        let mut jobs = self.jobs.lock().unwrap();
 
         volumes.insert(
             volume_id,
@@ -1458,30 +1553,30 @@ impl Pantry {
         Ok(())
     }
 
-    pub async fn activate_background_attachment(
+    pub fn activate_background_attachment(
         &self,
         volume_id: String,
     ) -> Result<String, HttpError> {
         let activate_job = {
-            let volumes = self.volumes.lock().await;
+            let volumes = self.volumes.lock().unwrap();
             volumes.get(&volume_id).unwrap().activate_job.clone().unwrap()
         };
 
-        let mut status = self.volume_status(volume_id.clone()).await?;
+        let mut status = self.volume_status(volume_id.clone())?;
 
         status.active = true;
         status.seen_active = true;
 
-        self.update_volume_status(volume_id, status).await?;
+        self.update_volume_status(volume_id, status)?;
 
         Ok(activate_job)
     }
 
-    pub async fn volume_status(
+    pub fn volume_status(
         &self,
         volume_id: String,
     ) -> Result<VolumeStatus, HttpError> {
-        let volumes = self.volumes.lock().await;
+        let volumes = self.volumes.lock().unwrap();
 
         match volumes.get(&volume_id) {
             Some(pantry_volume) => Ok(pantry_volume.status.clone()),
@@ -1490,12 +1585,12 @@ impl Pantry {
         }
     }
 
-    pub async fn update_volume_status(
+    pub fn update_volume_status(
         &self,
         volume_id: String,
         status: VolumeStatus,
     ) -> Result<(), HttpError> {
-        let mut volumes = self.volumes.lock().await;
+        let mut volumes = self.volumes.lock().unwrap();
 
         match volumes.get_mut(&volume_id) {
             Some(pantry_volume) => {
@@ -1507,22 +1602,19 @@ impl Pantry {
         }
     }
 
-    pub async fn is_job_finished(
-        &self,
-        job_id: String,
-    ) -> Result<bool, HttpError> {
-        let jobs = self.jobs.lock().await;
+    pub fn is_job_finished(&self, job_id: String) -> Result<bool, HttpError> {
+        let jobs = self.jobs.lock().unwrap();
         if !jobs.contains(&job_id) {
             return Err(HttpError::for_not_found(None, job_id));
         }
         Ok(true)
     }
 
-    pub async fn get_job_result(
+    pub fn get_job_result(
         &self,
         job_id: String,
     ) -> Result<Result<bool>, HttpError> {
-        let mut jobs = self.jobs.lock().await;
+        let mut jobs = self.jobs.lock().unwrap();
         if !jobs.contains(&job_id) {
             return Err(HttpError::for_not_found(None, job_id));
         }
@@ -1530,23 +1622,23 @@ impl Pantry {
         Ok(Ok(true))
     }
 
-    pub async fn import_from_url(
+    pub fn import_from_url(
         &self,
         volume_id: String,
         _url: String,
         _expected_digest: Option<ExpectedDigest>,
     ) -> Result<String, HttpError> {
-        self.entry(volume_id).await?;
+        self.entry(volume_id)?;
 
         // Make up job
-        let mut jobs = self.jobs.lock().await;
+        let mut jobs = self.jobs.lock().unwrap();
         let job_id = Uuid::new_v4().to_string();
         jobs.insert(job_id.clone());
 
         Ok(job_id)
     }
 
-    pub async fn snapshot(
+    pub fn snapshot(
         &self,
         volume_id: String,
         snapshot_id: String,
@@ -1555,12 +1647,11 @@ impl Pantry {
         // the simulated instance ensure, then call
         // [`instance_issue_disk_snapshot_request`] as the snapshot logic is the
         // same.
-        let volumes = self.volumes.lock().await;
+        let volumes = self.volumes.lock().unwrap();
         let volume_construction_request = &volumes.get(&volume_id).unwrap().vcr;
 
         self.sled_agent
-            .map_disk_ids_to_region_ids(volume_construction_request)
-            .await?;
+            .map_disk_ids_to_region_ids(volume_construction_request)?;
 
         self.sled_agent
             .instance_issue_disk_snapshot_request(
@@ -1568,17 +1659,16 @@ impl Pantry {
                 volume_id.parse().unwrap(),
                 snapshot_id.parse().unwrap(),
             )
-            .await
             .map_err(|e| HttpError::for_internal_error(e.to_string()))
     }
 
-    pub async fn bulk_write(
+    pub fn bulk_write(
         &self,
         volume_id: String,
         offset: u64,
         data: Vec<u8>,
     ) -> Result<(), HttpError> {
-        let vcr = self.entry(volume_id).await?;
+        let vcr = self.entry(volume_id)?;
 
         // Currently, Nexus will only make volumes where the first subvolume is
         // a Region. This will change in the future!
@@ -1632,19 +1722,19 @@ impl Pantry {
         Ok(())
     }
 
-    pub async fn scrub(&self, volume_id: String) -> Result<String, HttpError> {
-        self.entry(volume_id).await?;
+    pub fn scrub(&self, volume_id: String) -> Result<String, HttpError> {
+        self.entry(volume_id)?;
 
         // Make up job
-        let mut jobs = self.jobs.lock().await;
+        let mut jobs = self.jobs.lock().unwrap();
         let job_id = Uuid::new_v4().to_string();
         jobs.insert(job_id.clone());
 
         Ok(job_id)
     }
 
-    pub async fn detach(&self, volume_id: String) -> Result<()> {
-        let mut volumes = self.volumes.lock().await;
+    pub fn detach(&self, volume_id: String) -> Result<()> {
+        let mut volumes = self.volumes.lock().unwrap();
         volumes.remove(&volume_id);
         Ok(())
     }
@@ -1656,11 +1746,7 @@ pub struct PantryServer {
 }
 
 impl PantryServer {
-    pub async fn new(
-        log: Logger,
-        ip: IpAddr,
-        sled_agent: Arc<SledAgent>,
-    ) -> Self {
+    pub fn new(log: Logger, ip: IpAddr, sled_agent: Arc<SledAgent>) -> Self {
         let pantry = Arc::new(Pantry::new(sled_agent));
 
         let server = dropshot::ServerBuilder::new(
diff --git a/sled-agent/src/support_bundle/storage.rs b/sled-agent/src/support_bundle/storage.rs
index 97d345a8d2..c1b0cfe42f 100644
--- a/sled-agent/src/support_bundle/storage.rs
+++ b/sled-agent/src/support_bundle/storage.rs
@@ -4,6 +4,7 @@
 
 //! Management of and access to Support Bundles
 
+use async_trait::async_trait;
 use bytes::Bytes;
 use camino::Utf8Path;
 use dropshot::Body;
@@ -13,6 +14,7 @@ use futures::StreamExt;
 use omicron_common::api::external::Error as ExternalError;
 use omicron_common::disk::CompressionAlgorithm;
 use omicron_common::disk::DatasetConfig;
+use omicron_common::disk::DatasetsConfig;
 use omicron_common::disk::SharedDatasetConfig;
 use omicron_common::update::ArtifactHash;
 use omicron_uuid_kinds::DatasetUuid;
@@ -30,6 +32,7 @@ use sled_storage::manager::NestedDatasetLocation;
 use sled_storage::manager::StorageHandle;
 use slog::Logger;
 use slog_error_chain::InlineErrorChain;
+use std::borrow::Cow;
 use std::io::Write;
 use tokio::io::AsyncReadExt;
 use tokio::io::AsyncSeekExt;
@@ -37,6 +40,16 @@ use tokio::io::AsyncWriteExt;
 use tokio_util::io::ReaderStream;
 use zip::result::ZipError;
 
+// The final name of the bundle, as it is stored within the dedicated
+// datasets.
+//
+// The full path is of the form:
+//
+// /pool/ext/$(POOL_UUID)/crypt/$(DATASET_TYPE)/$(BUNDLE_UUID)/bundle.zip
+//                              |               | This is a per-bundle nested dataset
+//                              | This is a Debug dataset
+const BUNDLE_FILE_NAME: &str = "bundle.zip";
+
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
     #[error(transparent)]
@@ -100,6 +113,116 @@ impl From<Error> for HttpError {
     }
 }
 
+/// Abstracts the storage APIs for accessing datasets.
+///
+/// Allows support bundle storage to work on both simulated and non-simulated
+/// sled agents.
+#[async_trait]
+pub trait LocalStorage: Sync {
+    // These methods are all prefixed with "dyn_" to avoid sharing a name
+    // with the inherent implementations they wrap.
+    //
+    // The distinct names matter for dispatch: if a trait method used the same
+    // name as the inherent method, a *missing* inherent method would make the
+    // pass-through impl below resolve the call to the trait method itself,
+    // recursing infinitely instead of failing to compile.
+
+    /// Returns all configured datasets
+    async fn dyn_datasets_config_list(&self) -> Result<DatasetsConfig, Error>;
+
+    /// Returns all nested datasets within an existing dataset
+    async fn dyn_nested_dataset_list(
+        &self,
+        name: NestedDatasetLocation,
+        options: NestedDatasetListOptions,
+    ) -> Result<Vec<NestedDatasetConfig>, Error>;
+
+    /// Ensures a nested dataset exists
+    async fn dyn_nested_dataset_ensure(
+        &self,
+        config: NestedDatasetConfig,
+    ) -> Result<(), Error>;
+
+    /// Destroys a nested dataset
+    async fn dyn_nested_dataset_destroy(
+        &self,
+        name: NestedDatasetLocation,
+    ) -> Result<(), Error>;
+
+    /// Returns the root filesystem path where datasets are mounted.
+    ///
+    /// This is typically "/" in production, but tests may substitute a
+    /// temporary directory to isolate storage that would otherwise be
+    /// globally visible.
+    fn zpool_mountpoint_root(&self) -> Cow<Utf8Path>;
+}
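
The recursion hazard that motivates the `dyn_` prefix is easy to reproduce. In the hypothetical sketch below, `Handle` has no inherent `list` method, so the call inside the impl resolves to the trait method itself and recurses until the stack overflows:

    use async_trait::async_trait;

    struct Handle;

    #[async_trait]
    trait Listable {
        async fn list(&self) -> Vec<String>;
    }

    #[async_trait]
    impl Listable for Handle {
        async fn list(&self) -> Vec<String> {
            // Intended to forward to an inherent `Handle::list`; with no such
            // method, this resolves to `<Handle as Listable>::list` -- i.e.
            // itself.
            self.list().await
        }
    }

With the `dyn_` prefix, a missing inherent method is a compile error rather than silent recursion.
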
+
+/// This implementation is effectively a pass-through to the real methods
+#[async_trait]
+impl LocalStorage for StorageHandle {
+    async fn dyn_datasets_config_list(&self) -> Result<DatasetsConfig, Error> {
+        self.datasets_config_list().await.map_err(|err| err.into())
+    }
+
+    async fn dyn_nested_dataset_list(
+        &self,
+        name: NestedDatasetLocation,
+        options: NestedDatasetListOptions,
+    ) -> Result<Vec<NestedDatasetConfig>, Error> {
+        self.nested_dataset_list(name, options).await.map_err(|err| err.into())
+    }
+
+    async fn dyn_nested_dataset_ensure(
+        &self,
+        config: NestedDatasetConfig,
+    ) -> Result<(), Error> {
+        self.nested_dataset_ensure(config).await.map_err(|err| err.into())
+    }
+
+    async fn dyn_nested_dataset_destroy(
+        &self,
+        name: NestedDatasetLocation,
+    ) -> Result<(), Error> {
+        self.nested_dataset_destroy(name).await.map_err(|err| err.into())
+    }
+
+    fn zpool_mountpoint_root(&self) -> Cow<Utf8Path> {
+        Cow::Borrowed(illumos_utils::zpool::ZPOOL_MOUNTPOINT_ROOT.into())
+    }
+}
+
+/// This implementation allows storage bundles to be stored on simulated storage
+#[async_trait]
+impl LocalStorage for crate::sim::Storage {
+    async fn dyn_datasets_config_list(&self) -> Result<DatasetsConfig, Error> {
+        self.lock().datasets_config_list().map_err(|err| err.into())
+    }
+
+    async fn dyn_nested_dataset_list(
+        &self,
+        name: NestedDatasetLocation,
+        options: NestedDatasetListOptions,
+    ) -> Result<Vec<NestedDatasetConfig>, Error> {
+        self.lock().nested_dataset_list(name, options).map_err(|err| err.into())
+    }
+
+    async fn dyn_nested_dataset_ensure(
+        &self,
+        config: NestedDatasetConfig,
+    ) -> Result<(), Error> {
+        self.lock().nested_dataset_ensure(config).map_err(|err| err.into())
+    }
+
+    async fn dyn_nested_dataset_destroy(
+        &self,
+        name: NestedDatasetLocation,
+    ) -> Result<(), Error> {
+        self.lock().nested_dataset_destroy(name).map_err(|err| err.into())
+    }
+
+    fn zpool_mountpoint_root(&self) -> Cow<Utf8Path> {
+        Cow::Owned(self.lock().root().to_path_buf())
+    }
+}
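
With both implementations in place, the same bundle logic can be driven by real or simulated storage. A sketch of a caller that is generic over the backend; the function itself is hypothetical and assumes this module's imports:

    async fn list_bundles_via(
        log: &Logger,
        storage: &dyn LocalStorage,
        zpool_id: ZpoolUuid,
        dataset_id: DatasetUuid,
    ) -> Result<Vec<SupportBundleMetadata>, Error> {
        let manager = SupportBundleManager::new(log, storage);
        manager.list(zpool_id, dataset_id).await
    }
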
+
 /// Describes the type of access to the support bundle
 #[derive(Clone, Debug)]
 pub(crate) enum SupportBundleQueryType {
@@ -237,7 +360,7 @@ fn stream_zip_entry(
 /// APIs to manage support bundle storage.
 pub struct SupportBundleManager<'a> {
     log: &'a Logger,
-    storage: &'a StorageHandle,
+    storage: &'a dyn LocalStorage,
 }
 
 impl<'a> SupportBundleManager<'a> {
@@ -245,7 +368,7 @@ impl<'a> SupportBundleManager<'a> {
     /// to support bundle CRUD APIs.
     pub fn new(
         log: &'a Logger,
-        storage: &'a StorageHandle,
+        storage: &'a dyn LocalStorage,
     ) -> SupportBundleManager<'a> {
         Self { log, storage }
     }
@@ -256,7 +379,7 @@ impl<'a> SupportBundleManager<'a> {
         zpool_id: ZpoolUuid,
         dataset_id: DatasetUuid,
     ) -> Result<DatasetConfig, Error> {
-        let datasets_config = self.storage.datasets_config_list().await?;
+        let datasets_config = self.storage.dyn_datasets_config_list().await?;
         let dataset = datasets_config
             .datasets
             .get(&dataset_id)
@@ -290,7 +413,7 @@ impl<'a> SupportBundleManager<'a> {
             NestedDatasetLocation { path: String::from(""), root };
         let datasets = self
             .storage
-            .nested_dataset_list(
+            .dyn_nested_dataset_list(
                 dataset_location,
                 NestedDatasetListOptions::ChildrenOnly,
             )
@@ -309,8 +432,8 @@ impl<'a> SupportBundleManager<'a> {
             // The dataset for a support bundle exists.
             let support_bundle_path = dataset
                 .name
-                .mountpoint(illumos_utils::zpool::ZPOOL_MOUNTPOINT_ROOT.into())
-                .join("bundle");
+                .mountpoint(&self.storage.zpool_mountpoint_root())
+                .join(BUNDLE_FILE_NAME);
 
             // Identify whether or not the final "bundle" file exists.
             //
@@ -399,9 +522,9 @@ impl<'a> SupportBundleManager<'a> {
         let dataset =
             NestedDatasetLocation { path: support_bundle_id.to_string(), root };
         // The mounted root of the support bundle dataset
-        let support_bundle_dir = dataset
-            .mountpoint(illumos_utils::zpool::ZPOOL_MOUNTPOINT_ROOT.into());
-        let support_bundle_path = support_bundle_dir.join("bundle");
+        let support_bundle_dir =
+            dataset.mountpoint(&self.storage.zpool_mountpoint_root());
+        let support_bundle_path = support_bundle_dir.join(BUNDLE_FILE_NAME);
         let support_bundle_path_tmp = support_bundle_dir.join(format!(
             "bundle-{}.tmp",
             thread_rng()
@@ -414,7 +537,7 @@ impl<'a> SupportBundleManager<'a> {
         // Ensure that the dataset exists.
         info!(log, "Ensuring dataset exists for bundle");
         self.storage
-            .nested_dataset_ensure(NestedDatasetConfig {
+            .dyn_nested_dataset_ensure(NestedDatasetConfig {
                 name: dataset,
                 inner: SharedDatasetConfig {
                     compression: CompressionAlgorithm::On,
@@ -423,6 +546,7 @@ impl<'a> SupportBundleManager<'a> {
                 },
             })
             .await?;
+        info!(log, "Dataset does exist for bundle");
 
         // Exit early if the support bundle already exists
         if tokio::fs::try_exists(&support_bundle_path).await? {
@@ -446,9 +570,14 @@ impl<'a> SupportBundleManager<'a> {
 
         // Stream the file into the dataset, first as a temporary file,
         // and then renaming to the final location.
-        info!(log, "Streaming bundle to storage");
+        info!(
+            log,
+            "Streaming bundle to storage";
+            "path" => ?support_bundle_path_tmp,
+        );
         let tmp_file =
             tokio::fs::File::create(&support_bundle_path_tmp).await?;
+
         if let Err(err) = Self::write_and_finalize_bundle(
             tmp_file,
             &support_bundle_path_tmp,
@@ -492,7 +621,7 @@ impl<'a> SupportBundleManager<'a> {
         let root =
             self.get_configured_dataset(zpool_id, dataset_id).await?.name;
         self.storage
-            .nested_dataset_destroy(NestedDatasetLocation {
+            .dyn_nested_dataset_destroy(NestedDatasetLocation {
                 path: support_bundle_id.to_string(),
                 root,
             })
@@ -512,9 +641,9 @@ impl<'a> SupportBundleManager<'a> {
         let dataset =
             NestedDatasetLocation { path: support_bundle_id.to_string(), root };
         // The mounted root of the support bundle dataset
-        let support_bundle_dir = dataset
-            .mountpoint(illumos_utils::zpool::ZPOOL_MOUNTPOINT_ROOT.into());
-        let path = support_bundle_dir.join("bundle");
+        let support_bundle_dir =
+            dataset.mountpoint(&self.storage.zpool_mountpoint_root());
+        let path = support_bundle_dir.join(BUNDLE_FILE_NAME);
 
         let f = tokio::fs::File::open(&path).await?;
         Ok(f)

From 60c27d32e5daf1e454ecee6d1998aad565435471 Mon Sep 17 00:00:00 2001
From: Sean Klein <seanmarionklein@gmail.com>
Date: Tue, 17 Dec 2024 13:56:05 -0800
Subject: [PATCH 2/2] Clean up debugging printlns

---
 sled-agent/src/sim/storage.rs | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/sled-agent/src/sim/storage.rs b/sled-agent/src/sim/storage.rs
index e6d999e7e8..07cb4e1ccf 100644
--- a/sled-agent/src/sim/storage.rs
+++ b/sled-agent/src/sim/storage.rs
@@ -941,13 +941,10 @@ impl NestedDatasetStorage {
         // Create a mountpoint for the nested dataset storage that lasts
         // as long as the nested dataset does.
         let mountpoint = name.mountpoint(zpool_root);
-        println!("NestedDatasetStorage: Mountpoint {mountpoint}");
         let parent = mountpoint.as_path().parent().unwrap();
-        println!("NestedDatasetStorage: Creating parent dir: {parent}");
         std::fs::create_dir_all(&parent).unwrap();
 
         let new_dir_name = mountpoint.as_path().file_name().unwrap();
-        println!("NestedDatasetStorage: New dir name: {new_dir_name}");
         let mountpoint = camino_tempfile::Builder::new()
             .rand_bytes(0)
             .prefix(new_dir_name)