Skip to content

Commit

Permalink
[nexus] Avoid racing with sled expungement when inserting new physica…
Browse files Browse the repository at this point in the history
…l disks (#5597)

# Summary

- Prevents control-plane physical disks from being creates on sleds that
are expunged
- Additionally cleans up retryable transaction errors from Diesel in
`physical_disk_and_zpool_insert`.

# Background

As of #5506 , Nexus is
responsible for adding new "control plane physical disks" to the system.
It performs this action currently via a background task (see:
https://github.com/oxidecomputer/omicron/blob/main/nexus/src/app/background/physical_disk_adoption.rs),
though this may become a more explicit operation in the future,
requiring explicit operator intervention. Either way, this
implementation calls `physical_disk_and_zpool_insert` to create the
control-plane object.

As we have been discussing solutions to
#5594 , we have discussed
"verifying that for expunged sleds, all zones are expunged, and all
physical disks are expunged". This is a property we'd like to uphold.
Prior to this PR, it was technically possible to insert a physical disk
at the moment sled expungement was also occurring. In this situation, we
could have an expunged sled, with a non-expunged physical disk, which
would be an invalid state.

# Implementation

This PR adds a check to the `physical_disk_and_zpool_insert` transaction
that the sled-under-modification is not currently expunged.
Additionally, it adds a test for this behavior.

There are some semi-related changes to Diesel error propagation in this
PR -- this needed cleanup anyway, but the gist of it is that we ensure
this transaction can correctly push retryable diesel errors all the way
up to the transaction handler that knows how to parse them.
  • Loading branch information
smklein authored Apr 22, 2024
1 parent 49b76b1 commit b3de513
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 20 deletions.
85 changes: 74 additions & 11 deletions nexus/db-queries/src/db/datastore/physical_disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::db::model::PhysicalDiskState;
use crate::db::model::Sled;
use crate::db::model::Zpool;
use crate::db::pagination::paginated;
use crate::db::TransactionError;
use crate::transaction_retry::OptionalError;
use async_bb8_diesel::AsyncRunQueryDsl;
use chrono::Utc;
Expand Down Expand Up @@ -54,26 +55,40 @@ impl DataStore {
let disk = disk.clone();
let zpool = zpool.clone();
async move {
// TODO: These functions need to retry diesel errors
// in order to actually propagate retry errors
// Verify that the sled into which we are inserting the disk
// and zpool pair is still in-service.
//
// Although the "physical_disk_insert" and "zpool_insert"
// functions below check that the Sled hasn't been deleted,
// they do not currently check that the Sled has not been
// expunged.
Self::check_sled_in_service_on_connection(
&conn,
disk.sled_id,
)
.await
.map_err(|txn_error| txn_error.into_diesel(&err))?;

Self::physical_disk_insert_on_connection(
&conn, opctx, disk,
)
.await
.map_err(|e| err.bail(e))?;
.map_err(|txn_error| txn_error.into_diesel(&err))?;

Self::zpool_insert_on_connection(&conn, opctx, zpool)
.await
.map_err(|e| err.bail(e))?;
.map_err(|txn_error| txn_error.into_diesel(&err))?;
Ok(())
}
})
.await
.map_err(|e| {
if let Some(err) = err.take() {
return err;
match err.take() {
// A called function performed its own error propagation.
Some(txn_error) => txn_error.into(),
// The transaction setup/teardown itself encountered a diesel error.
None => public_error_from_diesel(e, ErrorHandler::Server),
}
public_error_from_diesel(e, ErrorHandler::Server)
})?;
Ok(())
}
Expand All @@ -90,14 +105,16 @@ impl DataStore {
disk: PhysicalDisk,
) -> CreateResult<PhysicalDisk> {
let conn = &*self.pool_connection_authorized(&opctx).await?;
Self::physical_disk_insert_on_connection(&conn, opctx, disk).await
let disk = Self::physical_disk_insert_on_connection(&conn, opctx, disk)
.await?;
Ok(disk)
}

pub async fn physical_disk_insert_on_connection(
conn: &async_bb8_diesel::Connection<db::DbConnection>,
opctx: &OpContext,
disk: PhysicalDisk,
) -> CreateResult<PhysicalDisk> {
) -> Result<PhysicalDisk, TransactionError<Error>> {
opctx.authorize(authz::Action::Read, &authz::FLEET).await?;
use db::schema::physical_disk::dsl;

Expand Down Expand Up @@ -291,6 +308,7 @@ mod test {
sled_baseboard_for_test, sled_system_hardware_for_test,
};
use crate::db::datastore::test_utils::datastore_test;
use crate::db::lookup::LookupPath;
use crate::db::model::{PhysicalDiskKind, Sled, SledUpdate};
use dropshot::PaginationOrder;
use nexus_db_model::Generation;
Expand Down Expand Up @@ -716,9 +734,54 @@ mod test {
}

#[tokio::test]
async fn test_physical_disk_uninitialized_list() {
async fn physical_disk_cannot_insert_to_expunged_sled() {
let logctx =
dev::test_setup_log("test_physical_disk_uninitialized_list");
dev::test_setup_log("physical_disk_cannot_insert_to_expunged_sled");
let mut db = test_setup_database(&logctx.log).await;
let (opctx, datastore) = datastore_test(&logctx, &db).await;

let sled = create_test_sled(&datastore).await;

// We can insert a disk into a sled that is not yet expunged
let inv_disk = create_inv_disk("serial-001".to_string(), 1);
let (disk, zpool) = create_disk_zpool_combo(sled.id(), &inv_disk);
datastore
.physical_disk_and_zpool_insert(&opctx, disk, zpool)
.await
.unwrap();

// Mark the sled as expunged
let sled_lookup =
LookupPath::new(&opctx, &datastore).sled_id(sled.id());
let (authz_sled,) =
sled_lookup.lookup_for(authz::Action::Modify).await.unwrap();
datastore
.sled_set_policy_to_expunged(&opctx, &authz_sled)
.await
.unwrap();

// Now that the sled is expunged, inserting the disk should fail
let inv_disk = create_inv_disk("serial-002".to_string(), 2);
let (disk, zpool) = create_disk_zpool_combo(sled.id(), &inv_disk);
let err = datastore
.physical_disk_and_zpool_insert(&opctx, disk, zpool)
.await
.unwrap_err();

let expected = format!("Sled {} is not in service", sled.id());
let actual = err.to_string();
assert!(
actual.contains(&expected),
"Expected string: {expected} within actual error: {actual}",
);

db.cleanup().await.unwrap();
logctx.cleanup_successful();
}

#[tokio::test]
async fn physical_disk_uninitialized_list() {
let logctx = dev::test_setup_log("physical_disk_uninitialized_list");
let mut db = test_setup_database(&logctx.log).await;
let (opctx, datastore) = datastore_test(&logctx, &db).await;

Expand Down
9 changes: 5 additions & 4 deletions nexus/db-queries/src/db/datastore/rack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::db::model::Rack;
use crate::db::model::Zpool;
use crate::db::pagination::paginated;
use crate::db::pool::DbConnection;
use crate::db::TransactionError;
use async_bb8_diesel::AsyncConnection;
use async_bb8_diesel::AsyncRunQueryDsl;
use chrono::Utc;
Expand Down Expand Up @@ -771,9 +772,9 @@ impl DataStore {
info!(log, "physical disk upsert in handoff: {physical_disk:#?}");
if let Err(e) = Self::physical_disk_insert_on_connection(&conn, &opctx, physical_disk)
.await {
if !matches!(e, Error::ObjectAlreadyExists { .. }) {
if !matches!(e, TransactionError::CustomError(Error::ObjectAlreadyExists { .. })) {
error!(log, "Failed to upsert physical disk"; "err" => #%e);
err.set(RackInitError::PhysicalDiskInsert(e))
err.set(RackInitError::PhysicalDiskInsert(e.into()))
.unwrap();
return Err(DieselError::RollbackTransaction);
}
Expand All @@ -784,9 +785,9 @@ impl DataStore {

for zpool in zpools {
if let Err(e) = Self::zpool_insert_on_connection(&conn, &opctx, zpool).await {
if !matches!(e, Error::ObjectAlreadyExists { .. }) {
if !matches!(e, TransactionError::CustomError(Error::ObjectAlreadyExists { .. })) {
error!(log, "Failed to upsert zpool"; "err" => #%e);
err.set(RackInitError::ZpoolInsert(e)).unwrap();
err.set(RackInitError::ZpoolInsert(e.into())).unwrap();
return Err(DieselError::RollbackTransaction);
}
}
Expand Down
30 changes: 30 additions & 0 deletions nexus/db-queries/src/db/datastore/sled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use crate::db::model::SledState;
use crate::db::model::SledUpdate;
use crate::db::pagination::paginated;
use crate::db::pagination::Paginator;
use crate::db::pool::DbConnection;
use crate::db::update_and_check::{UpdateAndCheck, UpdateStatus};
use crate::db::TransactionError;
use crate::transaction_retry::OptionalError;
use async_bb8_diesel::AsyncRunQueryDsl;
use chrono::Utc;
Expand All @@ -33,8 +35,10 @@ use omicron_common::api::external;
use omicron_common::api::external::CreateResult;
use omicron_common::api::external::DataPageParams;
use omicron_common::api::external::DeleteResult;
use omicron_common::api::external::Error;
use omicron_common::api::external::ListResultVec;
use omicron_common::api::external::ResourceType;
use omicron_common::bail_unless;
use std::fmt;
use strum::IntoEnumIterator;
use thiserror::Error;
Expand Down Expand Up @@ -95,6 +99,32 @@ impl DataStore {
Ok((sled, was_modified))
}

/// Confirms that a sled exists and is in-service.
///
/// This function may be called from a transaction context.
pub async fn check_sled_in_service_on_connection(
conn: &async_bb8_diesel::Connection<DbConnection>,
sled_id: Uuid,
) -> Result<(), TransactionError<Error>> {
use db::schema::sled::dsl;
let sled_exists_and_in_service = diesel::select(diesel::dsl::exists(
dsl::sled
.filter(dsl::time_deleted.is_null())
.filter(dsl::id.eq(sled_id))
.sled_filter(SledFilter::InService),
))
.get_result_async::<bool>(conn)
.await?;

bail_unless!(
sled_exists_and_in_service,
"Sled {} is not in service",
sled_id,
);

Ok(())
}

pub async fn sled_list(
&self,
opctx: &OpContext,
Expand Down
13 changes: 9 additions & 4 deletions nexus/db-queries/src/db/datastore/zpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::db::model::Sled;
use crate::db::model::Zpool;
use crate::db::pagination::paginated;
use crate::db::pagination::Paginator;
use crate::db::TransactionError;
use async_bb8_diesel::AsyncRunQueryDsl;
use chrono::Utc;
use diesel::prelude::*;
Expand All @@ -39,21 +40,23 @@ impl DataStore {
zpool: Zpool,
) -> CreateResult<Zpool> {
let conn = &*self.pool_connection_authorized(&opctx).await?;
Self::zpool_insert_on_connection(&conn, opctx, zpool).await
let zpool =
Self::zpool_insert_on_connection(&conn, opctx, zpool).await?;
Ok(zpool)
}

/// Stores a new zpool in the database.
pub async fn zpool_insert_on_connection(
conn: &async_bb8_diesel::Connection<db::DbConnection>,
opctx: &OpContext,
zpool: Zpool,
) -> CreateResult<Zpool> {
) -> Result<Zpool, TransactionError<Error>> {
opctx.authorize(authz::Action::Modify, &authz::FLEET).await?;

use db::schema::zpool::dsl;

let sled_id = zpool.sled_id;
Sled::insert_resource(
let pool = Sled::insert_resource(
sled_id,
diesel::insert_into(dsl::zpool)
.values(zpool.clone())
Expand All @@ -78,7 +81,9 @@ impl DataStore {
&zpool.id().to_string(),
),
),
})
})?;

Ok(pool)
}

/// Fetches a page of the list of all zpools on U.2 disks in all sleds
Expand Down
25 changes: 25 additions & 0 deletions nexus/db-queries/src/db/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

//! Error handling and conversions.
use crate::transaction_retry::OptionalError;
use diesel::result::DatabaseErrorInformation;
use diesel::result::DatabaseErrorKind as DieselErrorKind;
use diesel::result::Error as DieselError;
Expand Down Expand Up @@ -70,6 +71,30 @@ impl<T> TransactionError<T> {
}
}

impl<T: std::fmt::Debug> TransactionError<T> {
/// Converts a TransactionError into a diesel error.
///
/// The following pattern is used frequently in retryable transactions:
///
/// - Create an `OptionalError<TransactionError<T>>`
/// - Execute a series of database operations, which return the
/// `TransactionError<T>` type
/// - If the underlying operations return a retryable error from diesel,
/// propagate that out.
/// - Otherwise, set the OptionalError to a value of T, and rollback the transaction.
///
/// This function assists with that conversion.
pub fn into_diesel(
self,
err: &OptionalError<TransactionError<T>>,
) -> DieselError {
match self.retryable() {
MaybeRetryable::NotRetryable(txn_error) => err.bail(txn_error),
MaybeRetryable::Retryable(diesel_error) => diesel_error,
}
}
}

impl From<PublicError> for TransactionError<PublicError> {
fn from(err: PublicError) -> Self {
TransactionError::CustomError(err)
Expand Down
2 changes: 1 addition & 1 deletion nexus/src/app/background/physical_disk_adoption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! Background task for autmatically adopting physical disks.
//! Background task for automatically adopting physical disks.
//!
//! Removable disks may be arbitrarily attached and detached from
//! Oxide racks. When this happens, if they had not previously
Expand Down

0 comments on commit b3de513

Please sign in to comment.