Skip to content

Commit

Permalink
replace vmm/migration CTE with transaction
Browse files Browse the repository at this point in the history
This should fix the CRDB errors on multiple UPDATEs for the same table
in a CTE.
  • Loading branch information
hawkw committed Aug 1, 2024
1 parent cf25094 commit 1d18889
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 592 deletions.
51 changes: 50 additions & 1 deletion nexus/db-queries/src/db/datastore/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@
use super::DataStore;
use crate::context::OpContext;
use crate::db;
use crate::db::error::public_error_from_diesel;
use crate::db::error::ErrorHandler;
use crate::db::model::{Migration, MigrationState};
use crate::db::model::Generation;
use crate::db::model::Migration;
use crate::db::model::MigrationState;
use crate::db::pagination::paginated;
use crate::db::schema::migration::dsl;
use crate::db::update_and_check::UpdateAndCheck;
use crate::db::update_and_check::UpdateAndQueryResult;
use crate::db::update_and_check::UpdateStatus;
use async_bb8_diesel::AsyncRunQueryDsl;
use chrono::Utc;
Expand All @@ -23,6 +27,7 @@ use omicron_common::api::external::UpdateResult;
use omicron_common::api::internal::nexus;
use omicron_uuid_kinds::GenericUuid;
use omicron_uuid_kinds::InstanceUuid;
use omicron_uuid_kinds::PropolisUuid;
use uuid::Uuid;

impl DataStore {
Expand Down Expand Up @@ -123,6 +128,50 @@ impl DataStore {
})
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}

/// Applies a source-side state update to a migration record, running the
/// query on the caller-supplied connection.
///
/// The row is written only when all of the following hold for the record
/// identified by `migration.migration_id`:
///
/// - its `time_deleted` is unset,
/// - its `source_propolis_id` equals `vmm_id`, and
/// - its stored `source_gen` is less than the generation in `migration`.
///
/// On a write, `source_state`, `source_gen` and `time_source_updated` are
/// replaced with the values carried by `migration`.
///
/// The Diesel error is returned unconverted so that callers can decide how
/// to translate it.
pub(crate) async fn migration_update_source_on_connection(
    &self,
    conn: &async_bb8_diesel::Connection<db::DbConnection>,
    vmm_id: &PropolisUuid,
    migration: &nexus::MigrationRuntimeState,
) -> Result<UpdateAndQueryResult<Migration>, diesel::result::Error> {
    let migration_id = migration.migration_id;
    let source_vmm = vmm_id.into_untyped_uuid();
    let new_gen = Generation(migration.r#gen);

    // The columns that get overwritten when the filters below all match.
    let new_values = (
        dsl::source_state.eq(MigrationState(migration.state)),
        dsl::source_gen.eq(new_gen),
        dsl::time_source_updated.eq(migration.time_updated),
    );

    diesel::update(dsl::migration)
        .filter(dsl::id.eq(migration_id))
        .filter(dsl::time_deleted.is_null())
        .filter(dsl::source_gen.lt(new_gen))
        .filter(dsl::source_propolis_id.eq(source_vmm))
        .set(new_values)
        .check_if_exists::<Migration>(migration_id)
        .execute_and_check(conn)
        .await
}

/// Applies a target-side state update to a migration record, running the
/// query on the caller-supplied connection.
///
/// The row is written only when all of the following hold for the record
/// identified by `migration.migration_id`:
///
/// - its `time_deleted` is unset,
/// - its `target_propolis_id` equals `vmm_id`, and
/// - its stored `target_gen` is less than the generation in `migration`.
///
/// On a write, `target_state`, `target_gen` and `time_target_updated` are
/// replaced with the values carried by `migration`.
///
/// The Diesel error is returned unconverted so that callers can decide how
/// to translate it.
pub(crate) async fn migration_update_target_on_connection(
    &self,
    conn: &async_bb8_diesel::Connection<db::DbConnection>,
    vmm_id: &PropolisUuid,
    migration: &nexus::MigrationRuntimeState,
) -> Result<UpdateAndQueryResult<Migration>, diesel::result::Error> {
    let migration_id = migration.migration_id;
    let target_vmm = vmm_id.into_untyped_uuid();
    let new_gen = Generation(migration.r#gen);

    // The columns that get overwritten when the filters below all match.
    let new_values = (
        dsl::target_state.eq(MigrationState(migration.state)),
        dsl::target_gen.eq(new_gen),
        dsl::time_target_updated.eq(migration.time_updated),
    );

    diesel::update(dsl::migration)
        .filter(dsl::id.eq(migration_id))
        .filter(dsl::time_deleted.is_null())
        .filter(dsl::target_gen.lt(new_gen))
        .filter(dsl::target_propolis_id.eq(target_vmm))
        .set(new_values)
        .check_if_exists::<Migration>(migration_id)
        .execute_and_check(conn)
        .await
}
}

#[cfg(test)]
Expand Down
164 changes: 123 additions & 41 deletions nexus/db-queries/src/db/datastore/vmm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use super::DataStore;
use crate::authz;
use crate::context::OpContext;
use crate::db;
use crate::db::error::public_error_from_diesel;
use crate::db::error::ErrorHandler;
use crate::db::model::Vmm;
Expand All @@ -15,18 +16,22 @@ use crate::db::model::VmmState as DbVmmState;
use crate::db::pagination::paginated;
use crate::db::schema::vmm::dsl;
use crate::db::update_and_check::UpdateAndCheck;
use crate::db::update_and_check::UpdateAndQueryResult;
use crate::db::update_and_check::UpdateStatus;
use crate::transaction_retry::OptionalError;
use async_bb8_diesel::AsyncRunQueryDsl;
use chrono::Utc;
use diesel::prelude::*;
use omicron_common::api::external::CreateResult;
use omicron_common::api::external::DataPageParams;
use omicron_common::api::external::Error;
use omicron_common::api::external::InternalContext;
use omicron_common::api::external::ListResultVec;
use omicron_common::api::external::LookupResult;
use omicron_common::api::external::LookupType;
use omicron_common::api::external::ResourceType;
use omicron_common::api::external::UpdateResult;
use omicron_common::api::internal::nexus;
use omicron_common::api::internal::nexus::Migrations;
use omicron_uuid_kinds::GenericUuid;
use omicron_uuid_kinds::PropolisUuid;
Expand Down Expand Up @@ -133,29 +138,41 @@ impl DataStore {
vmm_id: &PropolisUuid,
new_runtime: &VmmRuntimeState,
) -> Result<bool, Error> {
let updated = diesel::update(dsl::vmm)
self.vmm_update_runtime_on_connection(
&*self.pool_connection_unauthorized().await?,
vmm_id,
new_runtime,
)
.await
.map(|r| match r.status {
UpdateStatus::Updated => true,
UpdateStatus::NotUpdatedButExists => false,
})
.map_err(|e| {
public_error_from_diesel(
e,
ErrorHandler::NotFoundByLookup(
ResourceType::Vmm,
LookupType::ById(vmm_id.into_untyped_uuid()),
),
)
})
}

/// Updates a VMM's runtime state, running the query on the caller-supplied
/// connection.
///
/// The row is written only when the VMM identified by `vmm_id` has an unset
/// `time_deleted` and a stored `state_generation` less than
/// `new_runtime.gen`.
///
/// The Diesel error is returned unconverted; translating it into a public
/// error (and collapsing the update status into a `bool`) is left to the
/// caller.
async fn vmm_update_runtime_on_connection(
    &self,
    conn: &async_bb8_diesel::Connection<db::DbConnection>,
    vmm_id: &PropolisUuid,
    new_runtime: &VmmRuntimeState,
) -> Result<UpdateAndQueryResult<Vmm>, diesel::result::Error> {
    let vmm_uuid = vmm_id.into_untyped_uuid();

    diesel::update(dsl::vmm)
        .filter(dsl::time_deleted.is_null())
        .filter(dsl::id.eq(vmm_uuid))
        .filter(dsl::state_generation.lt(new_runtime.gen))
        .set(new_runtime.clone())
        .check_if_exists::<Vmm>(vmm_uuid)
        // Must run on `conn` rather than a freshly acquired pool connection,
        // so that the update can take part in a caller's transaction.
        .execute_and_check(conn)
        .await
}

/// Updates a VMM record and associated migration record(s) with a single
Expand Down Expand Up @@ -185,33 +202,94 @@ impl DataStore {
/// - `Err` if another error occurred while accessing the database.
pub async fn vmm_and_migration_update_runtime(
&self,
opctx: &OpContext,
vmm_id: PropolisUuid,
new_runtime: &VmmRuntimeState,
migrations: Migrations<'_>,
Migrations { migration_in, migration_out }: Migrations<'_>,
) -> Result<VmmStateUpdateResult, Error> {
let query = crate::db::queries::vmm::VmmAndMigrationUpdate::new(
vmm_id,
new_runtime.clone(),
migrations,
);

// The VmmAndMigrationUpdate query handles and indicates failure to find
// either the VMM or the migration, so a query failure here indicates
// some kind of internal error and not a failed lookup.
let result = query
.execute_and_check(&*self.pool_connection_unauthorized().await?)
fn migration_id(
m: Option<&nexus::MigrationRuntimeState>,
) -> Option<Uuid> {
m.as_ref().map(|m| m.migration_id)
}

if migration_id(migration_in) == migration_id(migration_out) {
return Err(Error::conflict(
"migrating from a VMM to itself is nonsensical",
))
.internal_context(format!("migration_in: {migration_in:?}; migration_out: {migration_out:?}"));
}

let err = OptionalError::new();
let conn = self.pool_connection_authorized(opctx).await?;

self.transaction_retry_wrapper("vmm_and_migration_update_runtime")
.transaction(&conn, |conn| {
let err = err.clone();
async move {
let vmm_updated = self
.vmm_update_runtime_on_connection(
&conn,
&vmm_id,
new_runtime,
)
.await.map(|r| match r.status { UpdateStatus::Updated => true, UpdateStatus::NotUpdatedButExists => false })?;
let migration_out_updated = match migration_out {
Some(migration) => {
let r = self.migration_update_source_on_connection(
&conn, &vmm_id, migration,
)
.await?;
match r.status {
UpdateStatus::Updated => true,
UpdateStatus::NotUpdatedButExists => match r.found {
m if m.time_deleted.is_some() => return Err(err.bail(Error::Gone)),
m if m.source_propolis_id != vmm_id.into_untyped_uuid() => {
return Err(err.bail(Error::invalid_value(
"source propolis UUID",
format!("{vmm_id} is not the source VMM of this migration"),
)));
}
// Not updated, generation has advanced.
_ => false
},
}
},
None => false,
};
let migration_in_updated = match migration_in {
Some(migration) => {
let r = self.migration_update_target_on_connection(
&conn, &vmm_id, migration,
)
.await?;
match r.status {
UpdateStatus::Updated => true,
UpdateStatus::NotUpdatedButExists => match r.found {
m if m.time_deleted.is_some() => return Err(err.bail(Error::Gone)),
m if m.target_propolis_id != vmm_id.into_untyped_uuid() => {
return Err(err.bail(Error::invalid_value(
"target propolis UUID",
format!("{vmm_id} is not the target VMM of this migration"),
)));
}
// Not updated, generation has advanced.
_ => false
},
}
},
None => false,
};
Ok(VmmStateUpdateResult {
vmm_updated,
migration_in_updated,
migration_out_updated,
})
}})
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?;

Ok(VmmStateUpdateResult {
vmm_updated: match result.vmm_status {
Some(UpdateStatus::Updated) => true,
Some(UpdateStatus::NotUpdatedButExists) => false,
None => false,
},
migration_in_updated: result.migration_in_status.was_updated(),
migration_out_updated: result.migration_out_status.was_updated(),
})
.map_err(|e| {
err.take().unwrap_or_else(|| public_error_from_diesel(e, ErrorHandler::Server))
})
}

/// Forcibly overwrites the Propolis IP/Port in the supplied VMM's record with
Expand Down Expand Up @@ -392,6 +470,7 @@ mod tests {
};
datastore
.vmm_and_migration_update_runtime(
&opctx,
PropolisUuid::from_untyped_uuid(vmm1.id),
&VmmRuntimeState {
time_state_updated: Utc::now(),
Expand All @@ -413,6 +492,7 @@ mod tests {
};
datastore
.vmm_and_migration_update_runtime(
&opctx,
PropolisUuid::from_untyped_uuid(vmm2.id),
&VmmRuntimeState {
time_state_updated: Utc::now(),
Expand Down Expand Up @@ -499,6 +579,7 @@ mod tests {
};
datastore
.vmm_and_migration_update_runtime(
&opctx,
PropolisUuid::from_untyped_uuid(vmm2.id),
&VmmRuntimeState {
time_state_updated: Utc::now(),
Expand All @@ -522,6 +603,7 @@ mod tests {
};
datastore
.vmm_and_migration_update_runtime(
&opctx,
PropolisUuid::from_untyped_uuid(vmm3.id),
&VmmRuntimeState {
time_state_updated: Utc::now(),
Expand Down Expand Up @@ -569,7 +651,7 @@ mod tests {
.find(|m| m.id == migration2.id)
.expect("query must include migration2");
assert_eq!(
new_db_migration2.source_state,
db_migration2.source_state,
db::model::MigrationState::COMPLETED
);
assert_eq!(
Expand Down
1 change: 0 additions & 1 deletion nexus/db-queries/src/db/queries/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
pub mod disk;
pub mod external_ip;
pub mod ip_pool;
pub mod vmm;
#[macro_use]
mod next_item;
pub mod network_interface;
Expand Down
Loading

0 comments on commit 1d18889

Please sign in to comment.