diff --git a/.github/workflows/hakari.yml b/.github/workflows/hakari.yml
index 10c1c04003..0f942e7d76 100644
--- a/.github/workflows/hakari.yml
+++ b/.github/workflows/hakari.yml
@@ -24,7 +24,7 @@ jobs:
         with:
           toolchain: stable
       - name: Install cargo-hakari
-        uses: taiki-e/install-action@4ce8785db2a8a56c9ede16f705c2c49c5c61669c # v2
+        uses: taiki-e/install-action@1d776b18af134fca43744080130085d256938196 # v2
         with:
           tool: cargo-hakari
       - name: Check workspace-hack Cargo.toml is up-to-date
diff --git a/Cargo.lock b/Cargo.lock
index 628478abfe..2c1e4acd50 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2042,9 +2042,9 @@ dependencies = [
 
 [[package]]
 name = "dyn-clone"
-version = "1.0.16"
+version = "1.0.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "545b22097d44f8a9581187cdf93de7a71e4722bf51200cfaba810865b49a495d"
+checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125"
 
 [[package]]
 name = "ecdsa"
@@ -4271,6 +4271,7 @@ dependencies = [
  "dns-service-client",
  "futures",
  "httptest",
+ "illumos-utils",
  "internal-dns",
  "ipnet",
  "nexus-db-model",
diff --git a/Cargo.toml b/Cargo.toml
index ecbcc54c92..ad3d801876 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -200,7 +200,7 @@ dns-server = { path = "dns-server" }
 dns-service-client = { path = "clients/dns-service-client" }
 dpd-client = { path = "clients/dpd-client" }
 dropshot = { git = "https://github.com/oxidecomputer/dropshot", branch = "main", features = [ "usdt-probes" ] }
-dyn-clone = "1.0.16"
+dyn-clone = "1.0.17"
 either = "1.10.0"
 expectorate = "1.1.0"
 fatfs = "0.3.6"
diff --git a/nexus/blueprint-execution/Cargo.toml b/nexus/blueprint-execution/Cargo.toml
index 3284bda27e..164559b468 100644
--- a/nexus/blueprint-execution/Cargo.toml
+++ b/nexus/blueprint-execution/Cargo.toml
@@ -10,6 +10,7 @@ omicron-rpaths.workspace = true
 anyhow.workspace = true
 dns-service-client.workspace = true
 futures.workspace = true
+illumos-utils.workspace = true
 internal-dns.workspace = true
 nexus-db-model.workspace = true
 nexus-db-queries.workspace = true
diff --git a/nexus/blueprint-execution/src/datasets.rs b/nexus/blueprint-execution/src/datasets.rs
new file mode 100644
index 0000000000..97d8324fdb
--- /dev/null
+++ b/nexus/blueprint-execution/src/datasets.rs
@@ -0,0 +1,315 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! Ensures dataset records required by a given blueprint
+
+use anyhow::Context;
+use illumos_utils::zpool::ZpoolName;
+use nexus_db_model::Dataset;
+use nexus_db_model::DatasetKind;
+use nexus_db_queries::context::OpContext;
+use nexus_db_queries::db::DataStore;
+use nexus_types::deployment::OmicronZoneConfig;
+use nexus_types::deployment::OmicronZoneType;
+use nexus_types::identity::Asset;
+use slog::info;
+use slog::warn;
+use slog_error_chain::InlineErrorChain;
+use std::collections::BTreeSet;
+use std::net::SocketAddrV6;
+
+/// For each crucible zone in `all_omicron_zones`, ensure that a corresponding
+/// dataset record exists in `datastore`
+///
+/// Does not modify any existing dataset records. Returns the number of datasets
+/// inserted.
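+///
+/// Minimal call sketch (illustrative, not a doctest), mirroring how blueprint
+/// execution invokes this from `lib.rs` in this change:
+///
+/// ```ignore
+/// let num_inserted = ensure_crucible_dataset_records_exist(
+///     &opctx,
+///     datastore,
+///     blueprint.all_omicron_zones().map(|(_sled_id, zone)| zone),
+/// )
+/// .await?;
+/// ```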
+pub(crate) async fn ensure_crucible_dataset_records_exist(
+    opctx: &OpContext,
+    datastore: &DataStore,
+    all_omicron_zones: impl Iterator<Item = &OmicronZoneConfig>,
+) -> anyhow::Result<usize> {
+    // Before attempting to insert any datasets, first query for any existing
+    // dataset records so we can filter them out. This looks like a typical
+    // TOCTOU issue, but it is purely a performance optimization. We expect
+    // almost all executions of this function to do nothing: new crucible
+    // datasets are created very rarely relative to how frequently blueprint
+    // realization happens. We could drop this check-and-filter step and
+    // instead run the "insert if not exists" query below on every crucible
+    // zone, and the behavior would still be correct. However, that would
+    // issue far more queries than necessary in the very common case of "we
+    // don't need to do anything at all".
+    let mut crucible_datasets = datastore
+        .dataset_list_all_batched(opctx, Some(DatasetKind::Crucible))
+        .await
+        .context("failed to list all datasets")?
+        .into_iter()
+        .map(|dataset| dataset.id())
+        .collect::<BTreeSet<_>>();
+
+    let mut num_inserted = 0;
+    let mut num_already_exist = 0;
+
+    for zone in all_omicron_zones {
+        let OmicronZoneType::Crucible { address, dataset } = &zone.zone_type
+        else {
+            continue;
+        };
+
+        let id = zone.id;
+
+        // If already present in the datastore, move on.
+        if crucible_datasets.remove(&id) {
+            num_already_exist += 1;
+            continue;
+        }
+
+        // Map progenitor client strings into the types we need. We never
+        // expect these conversions to fail, but if one does, warn and skip the
+        // zone rather than failing the whole pass.
+        let addr: SocketAddrV6 = match address.parse() {
+            Ok(addr) => addr,
+            Err(err) => {
+                warn!(
+                    opctx.log, "failed to parse crucible zone address";
+                    "address" => address,
+                    "err" => InlineErrorChain::new(&err),
+                );
+                continue;
+            }
+        };
+        let zpool_name: ZpoolName = match dataset.pool_name.parse() {
+            Ok(name) => name,
+            Err(err) => {
+                warn!(
+                    opctx.log, "failed to parse crucible zone pool name";
+                    "pool_name" => &*dataset.pool_name,
+                    "err" => err,
+                );
+                continue;
+            }
+        };
+
+        let pool_id = zpool_name.id();
+        let dataset = Dataset::new(id, pool_id, addr, DatasetKind::Crucible);
+        let maybe_inserted = datastore
+            .dataset_insert_if_not_exists(dataset)
+            .await
+            .with_context(|| {
+                format!("failed to insert dataset record for dataset {id}")
+            })?;
+
+        // If we succeeded in inserting, log it; if `maybe_inserted` is `None`,
+        // we must have lost the TOCTOU race described above, and another Nexus
+        // must have inserted this dataset before we could.
+        if maybe_inserted.is_some() {
+            info!(
+                opctx.log,
+                "inserted new dataset for crucible zone";
+                "id" => %id,
+            );
+            num_inserted += 1;
+        } else {
+            num_already_exist += 1;
+        }
+    }
+
+    // We don't currently support removing datasets, so this would be
+    // surprising: the database contains dataset records that are no longer in
+    // our blueprint. We can't do anything about this, so just warn.
+    if !crucible_datasets.is_empty() {
+        warn!(
+            opctx.log,
+            "database contains {} unexpected crucible datasets",
+            crucible_datasets.len();
+            "dataset_ids" => ?crucible_datasets,
+        );
+    }
+
+    info!(
+        opctx.log,
+        "ensured all crucible zones have dataset records";
+        "num_inserted" => num_inserted,
+        "num_already_existed" => num_already_exist,
+    );
+
+    Ok(num_inserted)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use nexus_db_model::SledBaseboard;
+    use nexus_db_model::SledSystemHardware;
+    use nexus_db_model::SledUpdate;
+    use nexus_db_model::Zpool;
+    use nexus_test_utils_macros::nexus_test;
+    use sled_agent_client::types::OmicronZoneDataset;
+    use uuid::Uuid;
+
+    type ControlPlaneTestContext =
+        nexus_test_utils::ControlPlaneTestContext<omicron_nexus::Server>;
+
+    #[nexus_test]
+    async fn test_ensure_crucible_dataset_records_exist(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        // Set up.
+        let nexus = &cptestctx.server.apictx().nexus;
+        let datastore = nexus.datastore();
+        let opctx = OpContext::for_tests(
+            cptestctx.logctx.log.clone(),
+            datastore.clone(),
+        );
+        let opctx = &opctx;
+
+        // Use the standard representative inventory collection.
+        let representative = nexus_inventory::examples::representative();
+        let collection = representative.builder.build();
+
+        // Record the sleds and zpools contained in this collection.
+        let rack_id = Uuid::new_v4();
+        for (&sled_id, config) in &collection.omicron_zones {
+            let sled = SledUpdate::new(
+                sled_id,
+                "[::1]:0".parse().unwrap(),
+                SledBaseboard {
+                    serial_number: format!("test-{sled_id}"),
+                    part_number: "test-sled".to_string(),
+                    revision: 0,
+                },
+                SledSystemHardware {
+                    is_scrimlet: false,
+                    usable_hardware_threads: 128,
+                    usable_physical_ram: (64 << 30).try_into().unwrap(),
+                    reservoir_size: (16 << 30).try_into().unwrap(),
+                },
+                rack_id,
+            );
+            datastore
+                .sled_upsert(sled)
+                .await
+                .expect("failed to upsert sled")
+                .unwrap();
+
+            for zone in &config.zones.zones {
+                let OmicronZoneType::Crucible { dataset, .. } = &zone.zone_type
+                else {
+                    continue;
+                };
+                let zpool_name: ZpoolName =
+                    dataset.pool_name.parse().expect("invalid zpool name");
+                let zpool = Zpool::new(
+                    zpool_name.id(),
+                    sled_id,
+                    Uuid::new_v4(), // physical_disk_id
+                    (1 << 30).try_into().unwrap(), // total_size
+                );
+                datastore
+                    .zpool_upsert(zpool)
+                    .await
+                    .expect("failed to upsert zpool");
+            }
+        }
+
+        // How many crucible zones are there?
+        let ncrucible_zones = collection
+            .all_omicron_zones()
+            .filter(|z| matches!(z.zone_type, OmicronZoneType::Crucible { .. }))
+            .count();
+
+        // Prior to ensuring datasets exist, there should be none.
+        assert_eq!(
+            datastore
+                .dataset_list_all_batched(opctx, Some(DatasetKind::Crucible))
+                .await
+                .unwrap()
+                .len(),
+            0
+        );
+        let ndatasets_inserted = ensure_crucible_dataset_records_exist(
+            opctx,
+            datastore,
+            collection.all_omicron_zones(),
+        )
+        .await
+        .expect("failed to ensure crucible datasets");
+
+        // We should have inserted a dataset for each crucible zone.
+        assert_eq!(ncrucible_zones, ndatasets_inserted);
+        assert_eq!(
+            datastore
+                .dataset_list_all_batched(opctx, Some(DatasetKind::Crucible))
+                .await
+                .unwrap()
+                .len(),
+            ncrucible_zones,
+        );
+
+        // Ensuring the same crucible datasets again should insert no new
+        // records.
+        let ndatasets_inserted = ensure_crucible_dataset_records_exist(
+            opctx,
+            datastore,
+            collection.all_omicron_zones(),
+        )
+        .await
+        .expect("failed to ensure crucible datasets");
+        assert_eq!(0, ndatasets_inserted);
+        assert_eq!(
+            datastore
+                .dataset_list_all_batched(opctx, Some(DatasetKind::Crucible))
+                .await
+                .unwrap()
+                .len(),
+            ncrucible_zones,
+        );
+
+        // Create another zpool on one of the sleds, so we can add a new
+        // crucible zone that uses it.
+        let new_zpool_id = Uuid::new_v4();
+        for &sled_id in collection.omicron_zones.keys().take(1) {
+            let zpool = Zpool::new(
+                new_zpool_id,
+                sled_id,
+                Uuid::new_v4(),                // physical_disk_id
+                (1 << 30).try_into().unwrap(), // total_size
+            );
+            datastore
+                .zpool_upsert(zpool)
+                .await
+                .expect("failed to upsert zpool");
+        }
+
+        // Call `ensure_crucible_dataset_records_exist` again, adding a new
+        // crucible zone. It should insert only this new zone.
+        let new_zone = OmicronZoneConfig {
+            id: Uuid::new_v4(),
+            underlay_address: "::1".parse().unwrap(),
+            zone_type: OmicronZoneType::Crucible {
+                address: "[::1]:0".to_string(),
+                dataset: OmicronZoneDataset {
+                    pool_name: ZpoolName::new_external(new_zpool_id)
+                        .to_string()
+                        .parse()
+                        .unwrap(),
+                },
+            },
+        };
+        let ndatasets_inserted = ensure_crucible_dataset_records_exist(
+            opctx,
+            datastore,
+            collection.all_omicron_zones().chain(std::iter::once(&new_zone)),
+        )
+        .await
+        .expect("failed to ensure crucible datasets");
+        assert_eq!(ndatasets_inserted, 1);
+        assert_eq!(
+            datastore
+                .dataset_list_all_batched(opctx, Some(DatasetKind::Crucible))
+                .await
+                .unwrap()
+                .len(),
+            ncrucible_zones + 1,
+        );
+    }
+}
diff --git a/nexus/blueprint-execution/src/lib.rs b/nexus/blueprint-execution/src/lib.rs
index d6f5f8fc31..531c4f57a8 100644
--- a/nexus/blueprint-execution/src/lib.rs
+++ b/nexus/blueprint-execution/src/lib.rs
@@ -19,6 +19,7 @@ use std::collections::BTreeMap;
 use std::net::SocketAddrV6;
 use uuid::Uuid;
 
+mod datasets;
 mod dns;
 mod omicron_zones;
 mod resource_allocation;
@@ -89,6 +90,14 @@ where
     omicron_zones::deploy_zones(&opctx, &sleds_by_id, &blueprint.omicron_zones)
         .await?;
 
+    datasets::ensure_crucible_dataset_records_exist(
+        &opctx,
+        datastore,
+        blueprint.all_omicron_zones().map(|(_sled_id, zone)| zone),
+    )
+    .await
+    .map_err(|err| vec![err])?;
+
     dns::deploy_dns(
         &opctx,
         datastore,
@@ -97,5 +106,7 @@ where
         &sleds_by_id,
     )
     .await
-    .map_err(|e| vec![anyhow!("{}", InlineErrorChain::new(&e))])
+    .map_err(|e| vec![anyhow!("{}", InlineErrorChain::new(&e))])?;
+
+    Ok(())
 }
diff --git a/nexus/db-model/src/dataset_kind.rs b/nexus/db-model/src/dataset_kind.rs
index 00317592e8..86495b9d61 100644
--- a/nexus/db-model/src/dataset_kind.rs
+++ b/nexus/db-model/src/dataset_kind.rs
@@ -11,7 +11,7 @@ impl_enum_type!(
     #[diesel(postgres_type(name = "dataset_kind", schema = "public"))]
     pub struct DatasetKindEnum;
 
-    #[derive(Clone, Debug, AsExpression, FromSqlRow, Serialize, Deserialize, PartialEq)]
+    #[derive(Clone, Copy, Debug, AsExpression, FromSqlRow, Serialize, Deserialize, PartialEq)]
     #[diesel(sql_type = DatasetKindEnum)]
     pub enum DatasetKind;
 
diff --git a/nexus/db-queries/src/db/collection_insert.rs b/nexus/db-queries/src/db/collection_insert.rs
index b295f0574d..ef2a4a4d48 100644
--- a/nexus/db-queries/src/db/collection_insert.rs
+++ b/nexus/db-queries/src/db/collection_insert.rs
@@ -202,6 +202,27 @@ where
             .map_err(|e| Self::translate_async_error(e))
     }
 
+    /// Issues the CTE asynchronously and parses the result.
+    ///
+    /// The four outcomes are:
+    /// - Ok(Some(new row))
+    /// - Ok(None)
+    /// - Error(collection not found)
+    /// - Error(other diesel error)
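+    ///
+    /// Illustrative sketch (not a doctest) of how a caller might interpret
+    /// these outcomes, modeled on `DataStore::dataset_insert_if_not_exists`
+    /// in this change; `insert_stmt` stands in for an
+    /// `insert_into(..).on_conflict(..).do_nothing()` statement:
+    ///
+    /// ```ignore
+    /// match Zpool::insert_resource(zpool_id, insert_stmt)
+    ///     .insert_and_get_optional_result_async(&conn)
+    ///     .await
+    /// {
+    ///     Ok(Some(row)) => { /* row was newly inserted */ }
+    ///     Ok(None) => { /* row already existed; nothing was changed */ }
+    ///     Err(AsyncInsertError::CollectionNotFound) => { /* collection not found */ }
+    ///     Err(AsyncInsertError::DatabaseError(err)) => { /* other diesel error */ }
+    /// }
+    /// ```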
+    pub async fn insert_and_get_optional_result_async(
+        self,
+        conn: &async_bb8_diesel::Connection<DbConnection>,
+    ) -> AsyncInsertIntoCollectionResult<Option<ResourceType>>
+    where
+        // We require this bound to ensure that "Self" is runnable as a query.
+        Self: query_methods::LoadQuery<'static, DbConnection, ResourceType>,
+    {
+        self.get_result_async::<ResourceType>(conn)
+            .await
+            .optional()
+            .map_err(|e| Self::translate_async_error(e))
+    }
+
     /// Issues the CTE asynchronously and parses the result.
     ///
     /// The three outcomes are:
diff --git a/nexus/db-queries/src/db/datastore/dataset.rs b/nexus/db-queries/src/db/datastore/dataset.rs
index 0b26789e8f..3c1fd0afb1 100644
--- a/nexus/db-queries/src/db/datastore/dataset.rs
+++ b/nexus/db-queries/src/db/datastore/dataset.rs
@@ -5,6 +5,9 @@
 //! [`DataStore`] methods on [`Dataset`]s.
 
 use super::DataStore;
+use super::SQL_BATCH_SIZE;
+use crate::authz;
+use crate::context::OpContext;
 use crate::db;
 use crate::db::collection_insert::AsyncInsertError;
 use crate::db::collection_insert::DatastoreCollection;
@@ -13,12 +16,17 @@ use crate::db::error::ErrorHandler;
 use crate::db::identity::Asset;
 use crate::db::model::Dataset;
 use crate::db::model::Zpool;
+use crate::db::pagination::paginated;
+use crate::db::pagination::Paginator;
 use async_bb8_diesel::AsyncRunQueryDsl;
 use chrono::Utc;
 use diesel::prelude::*;
 use diesel::upsert::excluded;
+use nexus_db_model::DatasetKind;
 use omicron_common::api::external::CreateResult;
+use omicron_common::api::external::DataPageParams;
 use omicron_common::api::external::Error;
+use omicron_common::api::external::ListResultVec;
 use omicron_common::api::external::LookupResult;
 use omicron_common::api::external::LookupType;
 use omicron_common::api::external::ResourceType;
@@ -45,11 +53,12 @@ impl DataStore {
     ) -> CreateResult<Dataset> {
         use db::schema::dataset::dsl;
 
+        let dataset_id = dataset.id();
         let zpool_id = dataset.pool_id;
         Zpool::insert_resource(
             zpool_id,
             diesel::insert_into(dsl::dataset)
-                .values(dataset.clone())
+                .values(dataset)
                 .on_conflict(dsl::id)
                 .do_update()
                 .set((
@@ -73,9 +82,261 @@ impl DataStore {
                 e,
                 ErrorHandler::Conflict(
                     ResourceType::Dataset,
-                    &dataset.id().to_string(),
+                    &dataset_id.to_string(),
                 ),
             ),
         })
     }
+
+    /// Stores a new dataset in the database, but only if a dataset with the
+    /// given `id` does not already exist
+    ///
+    /// Does not update existing rows. If a dataset with the given ID already
+    /// exists, returns `Ok(None)`.
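+    ///
+    /// Minimal usage sketch (not a doctest); assumes `dataset.pool_id` refers
+    /// to an existing zpool record:
+    ///
+    /// ```ignore
+    /// match datastore.dataset_insert_if_not_exists(dataset).await? {
+    ///     Some(inserted) => { /* a new record was created */ }
+    ///     None => { /* a dataset with this ID already existed; left untouched */ }
+    /// }
+    /// ```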
+    pub async fn dataset_insert_if_not_exists(
+        &self,
+        dataset: Dataset,
+    ) -> CreateResult<Option<Dataset>> {
+        use db::schema::dataset::dsl;
+
+        let zpool_id = dataset.pool_id;
+        Zpool::insert_resource(
+            zpool_id,
+            diesel::insert_into(dsl::dataset)
+                .values(dataset)
+                .on_conflict(dsl::id)
+                .do_nothing(),
+        )
+        .insert_and_get_optional_result_async(
+            &*self.pool_connection_unauthorized().await?,
+        )
+        .await
+        .map_err(|e| match e {
+            AsyncInsertError::CollectionNotFound => Error::ObjectNotFound {
+                type_name: ResourceType::Zpool,
+                lookup_type: LookupType::ById(zpool_id),
+            },
+            AsyncInsertError::DatabaseError(e) => {
+                public_error_from_diesel(e, ErrorHandler::Server)
+            }
+        })
+    }
+
+    /// List one page of datasets
+    ///
+    /// If `filter_kind` is `Some(value)`, only datasets with a `kind` matching
+    /// `value` will be returned. If `filter_kind` is `None`, all datasets will
+    /// be returned.
+    async fn dataset_list(
+        &self,
+        opctx: &OpContext,
+        filter_kind: Option<DatasetKind>,
+        pagparams: &DataPageParams<'_, Uuid>,
+    ) -> ListResultVec<Dataset> {
+        opctx.authorize(authz::Action::ListChildren, &authz::FLEET).await?;
+        use db::schema::dataset::dsl;
+
+        let mut query = paginated(dsl::dataset, dsl::id, pagparams)
+            .filter(dsl::time_deleted.is_null());
+
+        if let Some(kind) = filter_kind {
+            query = query.filter(dsl::kind.eq(kind));
+        }
+
+        query
+            .select(Dataset::as_select())
+            .load_async(&*self.pool_connection_authorized(opctx).await?)
+            .await
+            .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
+    }
+
+    /// List all datasets, making as many queries as needed to get them all
+    ///
+    /// If `filter_kind` is `Some(value)`, only datasets with a `kind` matching
+    /// `value` will be returned. If `filter_kind` is `None`, all datasets will
+    /// be returned.
+    ///
+    /// This should generally not be used in API handlers or other
+    /// latency-sensitive contexts, but it can make sense in saga actions or
+    /// background tasks.
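+    ///
+    /// Illustrative sketch (not a doctest), e.g. collecting the IDs of all
+    /// crucible datasets from a background task:
+    ///
+    /// ```ignore
+    /// let crucible_dataset_ids = datastore
+    ///     .dataset_list_all_batched(opctx, Some(DatasetKind::Crucible))
+    ///     .await?
+    ///     .into_iter()
+    ///     .map(|d| d.id())
+    ///     .collect::<BTreeSet<_>>();
+    /// ```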
+    pub async fn dataset_list_all_batched(
+        &self,
+        opctx: &OpContext,
+        filter_kind: Option<DatasetKind>,
+    ) -> ListResultVec<Dataset> {
+        opctx.authorize(authz::Action::ListChildren, &authz::FLEET).await?;
+        opctx.check_complex_operations_allowed()?;
+
+        let mut all_datasets = Vec::new();
+        let mut paginator = Paginator::new(SQL_BATCH_SIZE);
+        while let Some(p) = paginator.next() {
+            let batch = self
+                .dataset_list(opctx, filter_kind, &p.current_pagparams())
+                .await?;
+            paginator =
+                p.found_batch(&batch, &|d: &nexus_db_model::Dataset| d.id());
+            all_datasets.extend(batch);
+        }
+
+        Ok(all_datasets)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::db::datastore::test_utils::datastore_test;
+    use nexus_db_model::SledBaseboard;
+    use nexus_db_model::SledSystemHardware;
+    use nexus_db_model::SledUpdate;
+    use nexus_test_utils::db::test_setup_database;
+    use omicron_test_utils::dev;
+
+    #[tokio::test]
+    async fn test_insert_if_not_exists() {
+        let logctx = dev::test_setup_log("test_insert_if_not_exists");
+        let mut db = test_setup_database(&logctx.log).await;
+        let (opctx, datastore) = datastore_test(&logctx, &db).await;
+        let opctx = &opctx;
+
+        // There should be no datasets initially.
+        assert_eq!(
+            datastore.dataset_list_all_batched(opctx, None).await.unwrap(),
+            []
+        );
+
+        // Create a fake sled that holds our fake zpool.
+        let sled_id = Uuid::new_v4();
+        let sled = SledUpdate::new(
+            sled_id,
+            "[::1]:0".parse().unwrap(),
+            SledBaseboard {
+                serial_number: "test-sn".to_string(),
+                part_number: "test-pn".to_string(),
+                revision: 0,
+            },
+            SledSystemHardware {
+                is_scrimlet: false,
+                usable_hardware_threads: 128,
+                usable_physical_ram: (64 << 30).try_into().unwrap(),
+                reservoir_size: (16 << 30).try_into().unwrap(),
+            },
+            Uuid::new_v4(),
+        );
+        datastore
+            .sled_upsert(sled)
+            .await
+            .expect("failed to upsert sled")
+            .unwrap();
+
+        // Create a fake zpool that backs our fake datasets.
+        let zpool_id = Uuid::new_v4();
+        let zpool = Zpool::new(
+            zpool_id,
+            sled_id,
+            Uuid::new_v4(),
+            (1 << 30).try_into().unwrap(),
+        );
+        datastore.zpool_upsert(zpool).await.expect("failed to upsert zpool");
+
+        // Inserting a new dataset should succeed.
+        let dataset1 = datastore
+            .dataset_insert_if_not_exists(Dataset::new(
+                Uuid::new_v4(),
+                zpool_id,
+                "[::1]:0".parse().unwrap(),
+                DatasetKind::Crucible,
+            ))
+            .await
+            .expect("failed to insert dataset")
+            .expect("insert found unexpected existing dataset");
+        let mut expected_datasets = vec![dataset1.clone()];
+        assert_eq!(
+            datastore.dataset_list_all_batched(opctx, None).await.unwrap(),
+            expected_datasets,
+        );
+        assert_eq!(
+            datastore
+                .dataset_list_all_batched(opctx, Some(DatasetKind::Crucible))
+                .await
+                .unwrap(),
+            expected_datasets,
+        );
+        assert_eq!(
+            datastore
+                .dataset_list_all_batched(opctx, Some(DatasetKind::Cockroach))
+                .await
+                .unwrap(),
+            [],
+        );
+
+        // Attempting to insert another dataset with the same ID should succeed
+        // without updating the existing record. We'll check this by passing a
+        // different socket address and kind.
+        let insert_again_result = datastore
+            .dataset_insert_if_not_exists(Dataset::new(
+                dataset1.id(),
+                zpool_id,
+                "[::1]:12345".parse().unwrap(),
+                DatasetKind::Cockroach,
+            ))
+            .await
+            .expect("failed to do-nothing insert dataset");
+        assert_eq!(insert_again_result, None);
+        assert_eq!(
+            datastore.dataset_list_all_batched(opctx, None).await.unwrap(),
+            expected_datasets,
+        );
+
+        // We can also upsert a different dataset...
+        let dataset2 = datastore
+            .dataset_upsert(Dataset::new(
+                Uuid::new_v4(),
+                zpool_id,
+                "[::1]:0".parse().unwrap(),
+                DatasetKind::Cockroach,
+            ))
+            .await
+            .expect("failed to upsert dataset");
+        expected_datasets.push(dataset2.clone());
+        expected_datasets.sort_by_key(|d| d.id());
+        assert_eq!(
+            datastore.dataset_list_all_batched(opctx, None).await.unwrap(),
+            expected_datasets,
+        );
+        assert_eq!(
+            datastore
+                .dataset_list_all_batched(opctx, Some(DatasetKind::Crucible))
+                .await
+                .unwrap(),
+            [dataset1.clone()],
+        );
+        assert_eq!(
+            datastore
+                .dataset_list_all_batched(opctx, Some(DatasetKind::Cockroach))
+                .await
+                .unwrap(),
+            [dataset2.clone()],
+        );
+
+        // ... and calling `dataset_insert_if_not_exists` with an existing ID
+        // should similarly return `None`.
+        let insert_again_result = datastore
+            .dataset_insert_if_not_exists(Dataset::new(
+                dataset1.id(),
+                zpool_id,
+                "[::1]:12345".parse().unwrap(),
+                DatasetKind::Cockroach,
+            ))
+            .await
+            .expect("failed to do-nothing insert dataset");
+        assert_eq!(insert_again_result, None);
+        assert_eq!(
+            datastore.dataset_list_all_batched(opctx, None).await.unwrap(),
+            expected_datasets,
+        );
+
+        db.cleanup().await.unwrap();
+        logctx.cleanup_successful();
+    }
 }