diff --git a/nexus/db-model/src/instance.rs b/nexus/db-model/src/instance.rs index f5dda84fa3..5fa4620fd0 100644 --- a/nexus/db-model/src/instance.rs +++ b/nexus/db-model/src/instance.rs @@ -180,15 +180,20 @@ pub struct InstanceRuntimeState { #[diesel(column_name = migration_id)] pub migration_id: Option, - /// The UUID of the saga currently holding the update lock on this instance. - /// If this is [`None`] the instance is not locked. Otherwise, if this is - /// [`Some`], this contains the UUID of the saga holding the lock. + /// A UUID identifying the saga currently holding the update lock on this + /// instance. If this is [`None`] the instance is not locked. Otherwise, if + /// this is [`Some`], the instance is locked by the saga owning this UUID. + /// Note that this is not (presently) the UUID *of* the locking saga, but + /// rather, a UUID *generated by* that saga. Therefore, it may not be + /// useable to look up which saga holds the lock. /// /// This field is guarded by the instance's `updater_gen` #[diesel(column_name = updater_id)] pub updater_id: Option, - /// The generation number for the updater lock. + /// The generation number for the updater lock. This is updated whenever the + /// lock is acquired or released, and is used in attempts to set the + /// `updater_id` field to #[diesel(column_name = updater_gen)] pub updater_gen: Generation, } diff --git a/nexus/db-queries/src/db/datastore/instance.rs b/nexus/db-queries/src/db/datastore/instance.rs index cf7077e9a7..dd0630a25e 100644 --- a/nexus/db-queries/src/db/datastore/instance.rs +++ b/nexus/db-queries/src/db/datastore/instance.rs @@ -18,7 +18,6 @@ use crate::db::error::public_error_from_diesel; use crate::db::error::ErrorHandler; use crate::db::identity::Resource; use crate::db::lookup::LookupPath; -use crate::db::model::Generation; use crate::db::model::Instance; use crate::db::model::InstanceRuntimeState; use crate::db::model::Name; @@ -115,6 +114,17 @@ impl From for omicron_common::api::external::Instance { } } +/// Errors returned by [`DataStore::instance_updater_lock`]. +#[derive(Debug, thiserror::Error, PartialEq)] +pub enum InstanceLockError { + /// The instance was already locked by another saga. + #[error("instance already locked by another saga")] + AlreadyLocked, + /// An error occurred executing the query. + #[error("error locking instance: {0}")] + Query(#[from] Error), +} + impl DataStore { /// Idempotently insert a database record for an Instance /// @@ -551,18 +561,21 @@ impl DataStore { /// # Returns /// /// - [`Ok`]`(`[Instance`]`)` if the lock was acquired. - /// - [`Err`]`([`Error::InternalError`])` if the instance was locked by - /// another saga. - /// - [`Err`]`(`[`Error::ObjectNotFound`]`)` if no instance with the - /// provided ID exists (or the instance record has been deleted). - /// - [`Err`]`(other)` if the database query failed for another reason. + /// - [`Err`]`([`InstanceLockError::AlreadyLocked`])` if the instance was + /// locked by another saga. + /// - [`Err`]`([`InstanceLockError::Query`]`(...))` if the query to fetch + /// the instance or lock it returned another error (such as if the + /// instance no longer exists, or if the database connection failed). pub async fn instance_updater_lock( &self, opctx: &OpContext, authz_instance: &authz::Instance, - saga_lock_id: &Uuid, - ) -> Result { + saga_lock_id: Uuid, + ) -> Result { + use db::schema::instance::dsl; + let mut instance = self.instance_refetch(opctx, authz_instance).await?; + let instance_id = instance.id(); // `true` if the instance was locked by *this* call to // `instance_updater_lock`, *false* in the (rare) case that it was // previously locked by *this* saga's ID. This is used only for logging, @@ -582,11 +595,11 @@ impl DataStore { match instance.runtime_state.updater_id { // If the `updater_id` field is not null and the ID equals this // saga's ID, we already have the lock. We're done here! - Some(lock_id) if lock_id == *saga_lock_id => { + Some(lock_id) if lock_id == saga_lock_id => { slog::info!( &opctx.log, "instance updater lock acquired!"; - "instance_id" => %instance.id(), + "instance_id" => %instance_id, "saga_id" => %saga_lock_id, "already_locked" => !did_lock, ); @@ -598,124 +611,68 @@ impl DataStore { slog::info!( &opctx.log, "instance is locked by another saga"; - "instance_id" => %instance.id(), + "instance_id" => %instance_id, "locked_by" => %lock_id, "saga_id" => %saga_lock_id, ); - return Err(Error::internal_error( - "instance is already locked by another saga", - )); + return Err(InstanceLockError::AlreadyLocked); } // No saga's ID is set as the instance's `updater_id`. We can // attempt to lock it. None => {} } - let gen = instance.runtime_state.updater_gen; + + // Okay, now attempt to acquire the lock + let current_gen = instance.runtime_state.updater_gen; slog::debug!( &opctx.log, "attempting to acquire instance updater lock"; - "instance_id" => %instance.id(), + "instance_id" => %instance_id, "saga_id" => %saga_lock_id, - "current_gen" => ?gen, + "current_gen" => ?current_gen, ); - (instance, did_lock) = self - .instance_updater_try_lock( - opctx, - authz_instance, - gen, - saga_lock_id, - ) - .await?; - } - } - - /// Attempts to lock an instance's record to apply state updates in an - /// instance-update saga, if the pprovided `current_gen` matches the - /// instance's current `updater_gen`. - /// - /// This function will attempt to set the `updater_id` field on the record - /// corresponding to the provided `authz_instance` to the provided - /// `saga_lock_id`. If the instance's `updater_gen` field is equal to - /// `current_gen`, and the instance's `updater_id` field is null, then the - /// generation is advanced and the `updater_id` field is set to - /// `saga_lock_id`, acquiring the lock for the calling saga. Otherwise, if - /// the generation has advanced since `current_gen` was captured, or if - /// another saga has locked the instance, the lock is not acquired. - /// - /// # Notes - /// - /// This method MUST only be called from the context of a saga! The - /// calling saga must ensure that the reverse action for the action that - /// acquires the lock must call [`DataStore::instance_updater_unlock`] to - /// ensure that the lock is always released if the saga unwinds. - /// - /// # Arguments - /// - /// - `authz_instance`: the instance to attempt to lock to lock - /// - `current_gen`: the current generation of the instance's `updater_id` - /// - `saga_lock_id`: the UUID of the saga that's attempting to lock this - /// instance. - /// - /// # Returns - /// - /// - [`Ok`]`((`[Instance`]`, true))` if the lock was acquired. - /// - [`Ok`]`((`[Instance`]`, false))` if the lock was not acquired because - /// the instance was already locked by another saga, or because the - /// generation has advanced since `current_gen` was captured. - /// - [`Err`]`(`[`Error::ObjectNotFound`]`)` if no instance with the - /// provided ID exists (or the instance record has been deleted). - /// - [`Err`]`(other)` if the database query failed for another reason. - pub async fn instance_updater_try_lock( - &self, - opctx: &OpContext, - authz_instance: &authz::Instance, - current_gen: Generation, - saga_lock_id: &Uuid, - ) -> Result<(Instance, bool), Error> { - use db::schema::instance::dsl; - - // The generation to advance to. - let new_gen = Generation(current_gen.0.next()); - - let instance_id = authz_instance.id(); - let locked = diesel::update(dsl::instance) - .filter(dsl::time_deleted.is_null()) - .filter(dsl::id.eq(instance_id)) - // If the generation is the same as the captured generation when we - // read the instance record to check if it was not locked, we can - // lock this instance. Otherwise, the update will fail. This query - // is equivalent to an atomic compare-and-swap instruction in a - // non-distributed single-process mutex. - .filter(dsl::updater_gen.eq(current_gen)) - .set(( - dsl::updater_gen.eq(new_gen), - dsl::updater_id.eq(Some(*saga_lock_id)), - )) - .check_if_exists::(instance_id) - .execute_and_check(&*self.pool_connection_authorized(opctx).await?) - .await - .map(|r| { - // If we successfully updated the instance record, we have - // acquired the lock; otherwise, we haven't --- either because - // our generation is stale, or because the instance is already locked. - let locked = match r.status { - UpdateStatus::Updated => true, - UpdateStatus::NotUpdatedButExists => false, - }; - (r.found, locked) - }) - .map_err(|e| { - public_error_from_diesel( - e, - ErrorHandler::NotFoundByLookup( - ResourceType::Instance, - LookupType::ById(instance_id), - ), + (instance, did_lock) = diesel::update(dsl::instance) + .filter(dsl::time_deleted.is_null()) + .filter(dsl::id.eq(instance_id)) + // If the generation is the same as the captured generation when we + // read the instance record to check if it was not locked, we can + // lock this instance, since changing the `updater_id` field + // always increments the generation number. Otherwise, if the + // generation has changed since we fetched the instance, this + // update will fail. This query is equivalent to an atomic + // compare-and-swap instruction in a non-distributed + // single-process mutex. + .filter(dsl::updater_gen.eq(current_gen)) + .set(( + dsl::updater_gen.eq(dsl::updater_gen + 1), + dsl::updater_id.eq(Some(saga_lock_id)), + )) + .check_if_exists::(instance_id) + .execute_and_check( + &*self.pool_connection_authorized(opctx).await?, ) - })?; - - Ok(locked) + .await + .map(|r| { + // If we successfully updated the instance record, we have + // acquired the lock; otherwise, we haven't --- either because + // our generation is stale, or because the instance is already locked. + let locked = match r.status { + UpdateStatus::Updated => true, + UpdateStatus::NotUpdatedButExists => false, + }; + (r.found, locked) + }) + .map_err(|e| { + public_error_from_diesel( + e, + ErrorHandler::NotFoundByLookup( + ResourceType::Instance, + LookupType::ById(instance_id), + ), + ) + })?; + } } /// Release the instance-updater lock acquired by @@ -724,7 +681,7 @@ impl DataStore { &self, opctx: &OpContext, authz_instance: &authz::Instance, - saga_lock_id: &Uuid, + saga_lock_id: Uuid, ) -> Result { use db::schema::instance::dsl; @@ -736,7 +693,7 @@ impl DataStore { // Only unlock the instance if: // - the provided updater ID matches that of the saga that has // currently locked this instance. - .filter(dsl::updater_id.eq(Some(*saga_lock_id))) + .filter(dsl::updater_id.eq(Some(saga_lock_id))) .set(( dsl::updater_gen.eq(dsl::updater_gen + 1), dsl::updater_id.eq(None::), @@ -774,20 +731,15 @@ mod tests { use nexus_types::external_api::params; use omicron_common::api::external::ByteCount; use omicron_common::api::external::IdentityMetadataCreateParams; - use omicron_test_utils::dev; - #[tokio::test] - async fn test_instance_updater_acquires_lock() { - // Setup - let logctx = dev::test_setup_log("test_empty_blueprint"); - let mut db = test_setup_database(&logctx.log).await; - let (opctx, datastore) = datastore_test(&logctx, &db).await; + async fn test_setup( + datastore: &DataStore, + opctx: &OpContext, + ) -> authz::Instance { let silo_id = *fixed_data::silo::DEFAULT_SILO_ID; let project_id = Uuid::new_v4(); let instance_id = Uuid::new_v4(); - let saga1 = Uuid::new_v4(); - let saga2 = Uuid::new_v4(); let (authz_project, _project) = datastore .project_create( @@ -839,12 +791,24 @@ mod tests { .lookup_for(authz::Action::Modify) .await .expect("instance must exist"); + authz_instance + } + + #[tokio::test] + async fn test_instance_updater_acquires_lock() { + // Setup + let logctx = dev::test_setup_log("test_instance_updater_acquires_lock"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + let saga1 = Uuid::new_v4(); + let saga2 = Uuid::new_v4(); + let authz_instance = test_setup(&datastore, &opctx).await; macro_rules! assert_locked { ($id:expr) => { let instance = dbg!( datastore - .instance_updater_lock(&opctx, &authz_instance, &$id) + .instance_updater_lock(&opctx, &authz_instance, $id) .await ) .expect(concat!( @@ -863,12 +827,12 @@ mod tests { macro_rules! assert_not_locked { ($id:expr) => { let err = dbg!(datastore - .instance_updater_lock(&opctx, &authz_instance, &$id) + .instance_updater_lock(&opctx, &authz_instance, $id) .await) .expect_err("attempting to lock the instance while it is already locked must fail"); assert_eq!( err, - Error::internal_error("instance is already locked by another saga") + InstanceLockError::AlreadyLocked, ); }; } @@ -881,7 +845,7 @@ mod tests { // unlock the instance from saga 1 datastore - .instance_updater_unlock(&opctx, &authz_instance, &saga1) + .instance_updater_unlock(&opctx, &authz_instance, saga1) .await .expect("instance must be unlocked by saga 1"); @@ -895,4 +859,63 @@ mod tests { db.cleanup().await.unwrap(); logctx.cleanup_successful(); } + + #[tokio::test] + async fn test_instance_updater_lock_is_idempotent() { + // Setup + let logctx = + dev::test_setup_log("test_instance_updater_lock_is_idempotent"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + let authz_instance = test_setup(&datastore, &opctx).await; + let saga1 = Uuid::new_v4(); + + // attempt to lock the instance once. + let instance1 = dbg!( + datastore + .instance_updater_lock(&opctx, &authz_instance, saga1) + .await + ) + .expect("instance should be locked"); + assert_eq!(instance1.runtime_state.updater_id, Some(saga1)); + // doing it again should be fine. + let instance2 = dbg!( + datastore + .instance_updater_lock(&opctx, &authz_instance, saga1) + .await + ) + .expect( + "instance_updater_lock should succeed again with the same saga ID", + ); + assert_eq!(instance1.runtime_state.updater_id, Some(saga1)); + // the generation should not have changed as a result of the second + // update. + assert_eq!( + instance1.runtime_state.updater_gen, + instance2.runtime_state.updater_gen + ); + + // now, unlock the instance. + let unlocked = dbg!( + datastore + .instance_updater_unlock(&opctx, &authz_instance, saga1) + .await + ) + .expect("instance should unlock"); + assert!(unlocked, "instance should have unlocked"); + + // unlocking it again should also succeed... + let unlocked = dbg!( + datastore + .instance_updater_unlock(&opctx, &authz_instance, saga1) + .await + ) + .expect("instance should unlock again"); + // ...but the `locked` bool should now be false. + assert!(!unlocked, "instance should already have been unlocked"); + + // Clean up. + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } }