From 4090983f94c7a65b9ef515af85ea3078513a46f0 Mon Sep 17 00:00:00 2001
From: James MacMahon
Date: Thu, 23 May 2024 17:34:40 -0400
Subject: [PATCH] [#3886 1/4] Region replacement models and queries (#5791)

Splitting up #5683 first by separating out the DB models, queries, and
schema changes required:

1. region replacement records

This commit adds a Region Replacement record, which is a request to
replace a region in a volume. It transitions through the following
states:

        Requested   <--
                      |
            |         |
            v         |
                      |
        Allocating  --
            |
            v

        Running     <--
                      |
            |         |
            v         |
                      |
        Driving     --
            |
            v

        ReplacementDone <--
                          |
            |             |
            v             |
                          |
        Completing      --
            |
            v

        Completed

which are captured in the `RegionReplacementState` enum. Transitioning
from Requested to Running is the responsibility of the "start" saga,
iterating between Running and Driving is the responsibility of the
"drive" saga, and transitioning from ReplacementDone to Completed is
the responsibility of the "finish" saga. All of these will come in
subsequent PRs.

The state transitions themselves are performed by these sagas and all
involve a query that:

- checks that the starting state (and other values as required) make
  sense
- updates the state while setting a unique `operating_saga_id` (and any
  other fields as appropriate)

As multiple background tasks will be waking up, checking to see what
sagas need to be triggered, and requesting that these region
replacement sagas run, this is meant to block multiple sagas from
running at the same time in an effort to cut down on interference -
most will unwind at the first step instead of somewhere in the middle.

2. region replacement step records

As region replacement takes place, Nexus will be making calls to
services in order to trigger the necessary Crucible operations meant to
actually perform the replacement. These steps are recorded in the
database so that they can be consulted by subsequent steps, and
additionally act as breadcrumbs if there is an issue.

3. volume repair records

Nexus should take care to only replace one region (or snapshot!) for a
volume at a time. Technically, the Upstairs can support two at a time,
but codifying "only one at a time" is safer, and does not allow the
possibility for a Nexus bug to replace all three regions of a region
set at a time (aka total data loss!). This "one at a time" constraint
is enforced by each repair also creating a VolumeRepair record, a table
for which there is a UNIQUE CONSTRAINT on the volume ID.

4. also, the `volume_replace_region` function

The `volume_replace_region` function is also included in this PR. In a
single transaction, this will:

- set the target region's volume id to the replacement's volume id
- set the replacement region's volume id to the target's volume id
- update the target volume's construction request to replace the target
  region's SocketAddrV6 with the replacement region's

This is called from the "start" saga, after allocating the replacement
region, and is meant to transition the Volume's construction request
from "indefinitely degraded, pointing to a region that is gone" to
"currently degraded, but can be repaired".
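
To make the division of labor between the three sagas concrete, here is
a minimal, standalone sketch (not part of this patch) of that ownership
mapping. The state names mirror the `RegionReplacementState` enum added
in this PR; the `OwningSaga` enum and `owning_saga` helper are
illustrative assumptions only and do not exist in the codebase.

    // A minimal sketch, assuming the `RegionReplacementState` variants from
    // this patch; `OwningSaga` and `owning_saga` are illustrative helpers.
    #[derive(Debug, Clone, Copy, PartialEq)]
    enum RegionReplacementState {
        Requested,
        Allocating,
        Running,
        Driving,
        ReplacementDone,
        Completing,
        Complete,
    }

    #[derive(Debug, Clone, Copy, PartialEq)]
    enum OwningSaga {
        Start,  // Requested -> Allocating -> Running
        Drive,  // iterates Running <-> Driving, ending at ReplacementDone
        Finish, // ReplacementDone -> Completing -> Complete
    }

    // Which saga is expected to pick up a request in the given state?
    fn owning_saga(state: RegionReplacementState) -> Option<OwningSaga> {
        match state {
            RegionReplacementState::Requested
            | RegionReplacementState::Allocating => Some(OwningSaga::Start),

            RegionReplacementState::Running
            | RegionReplacementState::Driving => Some(OwningSaga::Drive),

            RegionReplacementState::ReplacementDone
            | RegionReplacementState::Completing => Some(OwningSaga::Finish),

            // Terminal state: nothing left for any saga to do.
            RegionReplacementState::Complete => None,
        }
    }

    fn main() {
        assert_eq!(owning_saga(RegionReplacementState::Driving), Some(OwningSaga::Drive));
        assert_eq!(owning_saga(RegionReplacementState::Complete), None);
    }

At runtime this ownership is enforced by the conditional state-update
queries described above: a saga that loses the race sees a state (or
operating saga id) it does not own and unwinds at its first step.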
--- nexus/db-model/src/lib.rs | 6 + nexus/db-model/src/region.rs | 3 + nexus/db-model/src/region_replacement.rs | 165 ++++ nexus/db-model/src/region_replacement_step.rs | 85 ++ nexus/db-model/src/schema.rs | 39 + nexus/db-model/src/schema_versions.rs | 3 +- nexus/db-model/src/upstairs_repair.rs | 1 + nexus/db-model/src/volume_repair.rs | 20 + nexus/db-queries/src/db/datastore/disk.rs | 16 + nexus/db-queries/src/db/datastore/mod.rs | 2 + nexus/db-queries/src/db/datastore/region.rs | 77 +- .../src/db/datastore/region_replacement.rs | 907 ++++++++++++++++++ nexus/db-queries/src/db/datastore/snapshot.rs | 18 + nexus/db-queries/src/db/datastore/volume.rs | 632 ++++++++++++ nexus/src/app/sagas/common_storage.rs | 1 - nexus/tests/integration_tests/disks.rs | 1 - schema/crdb/dbinit.sql | 74 +- schema/crdb/region-replacement/up01.sql | 9 + schema/crdb/region-replacement/up02.sql | 18 + schema/crdb/region-replacement/up03.sql | 1 + schema/crdb/region-replacement/up04.sql | 4 + schema/crdb/region-replacement/up05.sql | 3 + schema/crdb/region-replacement/up06.sql | 4 + schema/crdb/region-replacement/up07.sql | 16 + schema/crdb/region-replacement/up08.sql | 1 + schema/crdb/region-replacement/up09.sql | 1 + schema/crdb/region-replacement/up10.sql | 3 + schema/crdb/region-replacement/up11.sql | 1 + 28 files changed, 2106 insertions(+), 5 deletions(-) create mode 100644 nexus/db-model/src/region_replacement.rs create mode 100644 nexus/db-model/src/region_replacement_step.rs create mode 100644 nexus/db-model/src/volume_repair.rs create mode 100644 nexus/db-queries/src/db/datastore/region_replacement.rs create mode 100644 schema/crdb/region-replacement/up01.sql create mode 100644 schema/crdb/region-replacement/up02.sql create mode 100644 schema/crdb/region-replacement/up03.sql create mode 100644 schema/crdb/region-replacement/up04.sql create mode 100644 schema/crdb/region-replacement/up05.sql create mode 100644 schema/crdb/region-replacement/up06.sql create mode 100644 schema/crdb/region-replacement/up07.sql create mode 100644 schema/crdb/region-replacement/up08.sql create mode 100644 schema/crdb/region-replacement/up09.sql create mode 100644 schema/crdb/region-replacement/up10.sql create mode 100644 schema/crdb/region-replacement/up11.sql diff --git a/nexus/db-model/src/lib.rs b/nexus/db-model/src/lib.rs index 205885cfd8..bd16719633 100644 --- a/nexus/db-model/src/lib.rs +++ b/nexus/db-model/src/lib.rs @@ -66,6 +66,8 @@ pub mod queries; mod quota; mod rack; mod region; +mod region_replacement; +mod region_replacement_step; mod region_snapshot; mod role_assignment; mod role_builtin; @@ -98,6 +100,7 @@ mod virtual_provisioning_resource; mod vmm; mod vni; mod volume; +mod volume_repair; mod vpc; mod vpc_firewall_rule; mod vpc_route; @@ -162,6 +165,8 @@ pub use project::*; pub use quota::*; pub use rack::*; pub use region::*; +pub use region_replacement::*; +pub use region_replacement_step::*; pub use region_snapshot::*; pub use role_assignment::*; pub use role_builtin::*; @@ -195,6 +200,7 @@ pub use virtual_provisioning_resource::*; pub use vmm::*; pub use vni::*; pub use volume::*; +pub use volume_repair::*; pub use vpc::*; pub use vpc_firewall_rule::*; pub use vpc_route::*; diff --git a/nexus/db-model/src/region.rs b/nexus/db-model/src/region.rs index fefc4f4fce..441f928405 100644 --- a/nexus/db-model/src/region.rs +++ b/nexus/db-model/src/region.rs @@ -58,6 +58,9 @@ impl Region { } } + pub fn id(&self) -> Uuid { + self.identity.id + } pub fn volume_id(&self) -> Uuid { self.volume_id } diff --git 
a/nexus/db-model/src/region_replacement.rs b/nexus/db-model/src/region_replacement.rs new file mode 100644 index 0000000000..a04710f53d --- /dev/null +++ b/nexus/db-model/src/region_replacement.rs @@ -0,0 +1,165 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use super::impl_enum_type; +use crate::schema::region_replacement; +use crate::Region; +use chrono::DateTime; +use chrono::Utc; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +impl_enum_type!( + #[derive(SqlType, Debug, QueryId)] + #[diesel(postgres_type(name = "region_replacement_state", schema = "public"))] + pub struct RegionReplacementStateEnum; + + #[derive(Copy, Clone, Debug, AsExpression, FromSqlRow, Serialize, Deserialize, PartialEq)] + #[diesel(sql_type = RegionReplacementStateEnum)] + pub enum RegionReplacementState; + + // Enum values + Requested => b"requested" + Allocating => b"allocating" + Running => b"running" + Driving => b"driving" + ReplacementDone => b"replacement_done" + Completing => b"completing" + Complete => b"complete" +); + +impl std::str::FromStr for RegionReplacementState { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "requested" => Ok(RegionReplacementState::Requested), + "allocating" => Ok(RegionReplacementState::Allocating), + "running" => Ok(RegionReplacementState::Running), + "driving" => Ok(RegionReplacementState::Driving), + "replacement_done" => Ok(RegionReplacementState::ReplacementDone), + "complete" => Ok(RegionReplacementState::Complete), + "completing" => Ok(RegionReplacementState::Completing), + _ => Err(format!("unrecognized value {} for enum", s)), + } + } +} + +/// Database representation of a Region replacement request. +/// +/// This record stores the data related to the operations required for Nexus to +/// orchestrate replacing a region in a volume. It transitions through the +/// following states: +/// +/// ```text +/// Requested <-- --- +/// | | +/// | | | +/// v | | responsibility of region +/// | | replacement start saga +/// Allocating -- | +/// | +/// | | +/// v --- +/// --- +/// Running <-- | +/// | | +/// | | | +/// v | | responsibility of region +/// | | replacement drive saga +/// Driving -- | +/// | +/// | | +/// v --- +/// --- +/// ReplacementDone <-- | +/// | | +/// | | | +/// v | | +/// | | responsibility of region +/// Completing -- | replacement finish saga +/// | +/// | | +/// v | +/// | +/// Completed --- +/// ``` +/// +/// which are captured in the RegionReplacementState enum. Annotated on the +/// right are which sagas are responsible for which state transitions. The state +/// transitions themselves are performed by these sagas and all involve a query +/// that: +/// +/// - checks that the starting state (and other values as required) make sense +/// - updates the state while setting a unique operating_saga_id id (and any +/// other fields as appropriate) +/// +/// As multiple background tasks will be waking up, checking to see what sagas +/// need to be triggered, and requesting that these region replacement sagas +/// run, this is meant to block multiple sagas from running at the same time in +/// an effort to cut down on interference - most will unwind at the first step +/// of performing this state transition instead of somewhere in the middle. 
+/// +/// The correctness of a region replacement relies on certain operations +/// happening only when the record is in a certain state. For example: Nexus +/// should not undo a volume modification _after_ an upstairs has been sent a +/// replacement request, so volume modification happens at the Allocating state +/// (in the start saga), and replacement requests are only sent in the Driving +/// state (in the drive saga) - this ensures that replacement requests are only +/// sent if the start saga completed successfully, meaning the volume +/// modification was committed to the database and will not change or be +/// unwound. +/// +/// See also: RegionReplacementStep records +#[derive( + Queryable, + Insertable, + Debug, + Clone, + Selectable, + Serialize, + Deserialize, + PartialEq, +)] +#[diesel(table_name = region_replacement)] +pub struct RegionReplacement { + pub id: Uuid, + + pub request_time: DateTime, + + /// The region being replaced + pub old_region_id: Uuid, + + /// The volume whose region is being replaced + pub volume_id: Uuid, + + /// A synthetic volume that only is used to later delete the old region + pub old_region_volume_id: Option, + + /// The new region that will be used to replace the old one + pub new_region_id: Option, + + pub replacement_state: RegionReplacementState, + + pub operating_saga_id: Option, +} + +impl RegionReplacement { + pub fn for_region(region: &Region) -> Self { + Self::new(region.id(), region.volume_id()) + } + + pub fn new(old_region_id: Uuid, volume_id: Uuid) -> Self { + Self { + id: Uuid::new_v4(), + request_time: Utc::now(), + old_region_id, + volume_id, + old_region_volume_id: None, + new_region_id: None, + replacement_state: RegionReplacementState::Requested, + operating_saga_id: None, + } + } +} diff --git a/nexus/db-model/src/region_replacement_step.rs b/nexus/db-model/src/region_replacement_step.rs new file mode 100644 index 0000000000..c0a61b958c --- /dev/null +++ b/nexus/db-model/src/region_replacement_step.rs @@ -0,0 +1,85 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use super::impl_enum_type; +use crate::ipv6; +use crate::schema::region_replacement_step; +use crate::SqlU16; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::net::SocketAddrV6; +use uuid::Uuid; + +impl_enum_type!( + #[derive(SqlType, Debug, QueryId)] + #[diesel(postgres_type(name = "region_replacement_step_type", schema = "public"))] + pub struct RegionReplacementStepTypeEnum; + + #[derive(Copy, Clone, Debug, AsExpression, FromSqlRow, Serialize, Deserialize, PartialEq)] + #[diesel(sql_type = RegionReplacementStepTypeEnum)] + pub enum RegionReplacementStepType; + + // What is driving the repair forward? + Propolis => b"propolis" + Pantry => b"pantry" +); + +/// Database representation of a Region replacement repair step +/// +/// As region replacement takes place, Nexus will be making calls to services in +/// order to trigger the necessary Crucible operations meant to actually perform +/// the replacement. These steps are recorded in the database so that they can +/// be consulted by subsequent steps, and additionally act as breadcrumbs if +/// there is an issue. 
+/// +/// See also: RegionReplacement records +#[derive( + Queryable, + Insertable, + Debug, + Clone, + Selectable, + Serialize, + Deserialize, + PartialEq, +)] +#[diesel(table_name = region_replacement_step)] +pub struct RegionReplacementStep { + pub replacement_id: Uuid, + + pub step_time: DateTime, + + pub step_type: RegionReplacementStepType, + + pub step_associated_instance_id: Option, + pub step_associated_vmm_id: Option, + + pub step_associated_pantry_ip: Option, + pub step_associated_pantry_port: Option, + pub step_associated_pantry_job_id: Option, +} + +impl RegionReplacementStep { + pub fn instance_and_vmm_ids(&self) -> Option<(Uuid, Uuid)> { + if self.step_type != RegionReplacementStepType::Propolis { + return None; + } + + let instance_id = self.step_associated_instance_id?; + let vmm_id = self.step_associated_vmm_id?; + + Some((instance_id, vmm_id)) + } + + pub fn pantry_address(&self) -> Option { + if self.step_type != RegionReplacementStepType::Pantry { + return None; + } + + let ip = self.step_associated_pantry_ip?; + let port = self.step_associated_pantry_port?; + + Some(SocketAddrV6::new(*ip, *port, 0, 0)) + } +} diff --git a/nexus/db-model/src/schema.rs b/nexus/db-model/src/schema.rs index 423388de30..deeca970c7 100644 --- a/nexus/db-model/src/schema.rs +++ b/nexus/db-model/src/schema.rs @@ -1036,6 +1036,8 @@ table! { } } +allow_tables_to_appear_in_same_query!(zpool, dataset); + table! { region (id) { id -> Uuid, @@ -1051,6 +1053,8 @@ table! { } } +allow_tables_to_appear_in_same_query!(zpool, region); + table! { region_snapshot (dataset_id, region_id, snapshot_id) { dataset_id -> Uuid, @@ -1697,6 +1701,41 @@ table! { } } +table! { + region_replacement (id) { + id -> Uuid, + request_time -> Timestamptz, + old_region_id -> Uuid, + volume_id -> Uuid, + old_region_volume_id -> Nullable, + new_region_id -> Nullable, + replacement_state -> crate::RegionReplacementStateEnum, + operating_saga_id -> Nullable, + } +} + +table! { + volume_repair (volume_id) { + volume_id -> Uuid, + repair_id -> Uuid, + } +} + +table! { + region_replacement_step (replacement_id, step_time, step_type) { + replacement_id -> Uuid, + step_time -> Timestamptz, + step_type -> crate::RegionReplacementStepTypeEnum, + + step_associated_instance_id -> Nullable, + step_associated_vmm_id -> Nullable, + + step_associated_pantry_ip -> Nullable, + step_associated_pantry_port -> Nullable, + step_associated_pantry_job_id -> Nullable, + } +} + table! { db_metadata (singleton) { singleton -> Bool, diff --git a/nexus/db-model/src/schema_versions.rs b/nexus/db-model/src/schema_versions.rs index ed4b762e68..5ceaf3167a 100644 --- a/nexus/db-model/src/schema_versions.rs +++ b/nexus/db-model/src/schema_versions.rs @@ -17,7 +17,7 @@ use std::collections::BTreeMap; /// /// This must be updated when you change the database schema. Refer to /// schema/crdb/README.adoc in the root of this repository for details. -pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(64, 0, 0); +pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(65, 0, 0); /// List of all past database schema versions, in *reverse* order /// @@ -29,6 +29,7 @@ static KNOWN_VERSIONS: Lazy> = Lazy::new(|| { // | leaving the first copy as an example for the next person. 
// v // KnownVersion::new(next_int, "unique-dirname-with-the-sql-files"), + KnownVersion::new(65, "region-replacement"), KnownVersion::new(64, "add-view-for-v2p-mappings"), KnownVersion::new(63, "remove-producer-base-route-column"), KnownVersion::new(62, "allocate-subnet-decommissioned-sleds"), diff --git a/nexus/db-model/src/upstairs_repair.rs b/nexus/db-model/src/upstairs_repair.rs index 311592f8e4..ed281b6c64 100644 --- a/nexus/db-model/src/upstairs_repair.rs +++ b/nexus/db-model/src/upstairs_repair.rs @@ -106,6 +106,7 @@ pub struct UpstairsRepairNotification { pub upstairs_id: DbTypedUuid, pub session_id: DbTypedUuid, + // The Downstairs being repaired pub region_id: DbTypedUuid, pub target_ip: ipv6::Ipv6Addr, pub target_port: SqlU16, diff --git a/nexus/db-model/src/volume_repair.rs b/nexus/db-model/src/volume_repair.rs new file mode 100644 index 0000000000..a92fcd3425 --- /dev/null +++ b/nexus/db-model/src/volume_repair.rs @@ -0,0 +1,20 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use crate::schema::volume_repair; +use uuid::Uuid; + +/// When modifying a Volume by replacing its parts, Nexus should take care to +/// only replace one region or snapshot for a volume at a time. Technically, the +/// Upstairs can support two at a time, but codifying "only one at a time" is +/// safer, and does not allow the possiblity for a Nexus bug to replace all +/// three regions of a region set at a time (aka total data loss!). This "one at +/// a time" constraint is enforced by each repair also creating a VolumeRepair +/// record, a table for which there is a UNIQUE CONSTRAINT on the volume ID. +#[derive(Queryable, Insertable, Debug, Selectable, Clone)] +#[diesel(table_name = volume_repair)] +pub struct VolumeRepair { + pub volume_id: Uuid, + pub repair_id: Uuid, +} diff --git a/nexus/db-queries/src/db/datastore/disk.rs b/nexus/db-queries/src/db/datastore/disk.rs index 2788558a0b..e1d504761c 100644 --- a/nexus/db-queries/src/db/datastore/disk.rs +++ b/nexus/db-queries/src/db/datastore/disk.rs @@ -811,6 +811,22 @@ impl DataStore { .map(|(disk, _, _)| disk) .collect()) } + + pub async fn disk_for_volume_id( + &self, + volume_id: Uuid, + ) -> LookupResult> { + let conn = self.pool_connection_unauthorized().await?; + + use db::schema::disk::dsl; + dsl::disk + .filter(dsl::volume_id.eq(volume_id)) + .select(Disk::as_select()) + .first_async(&*conn) + .await + .optional() + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } } #[cfg(test)] diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs index 1618395800..7c47489477 100644 --- a/nexus/db-queries/src/db/datastore/mod.rs +++ b/nexus/db-queries/src/db/datastore/mod.rs @@ -77,6 +77,7 @@ pub mod pub_test_utils; mod quota; mod rack; mod region; +mod region_replacement; mod region_snapshot; mod role; mod saga; @@ -119,6 +120,7 @@ pub use volume::read_only_resources_associated_with_volume; pub use volume::CrucibleResources; pub use volume::CrucibleTargets; pub use volume::VolumeCheckoutReason; +pub use volume::VolumeReplacementParams; // Number of unique datasets required to back a region. // TODO: This should likely turn into a configuration option. 
diff --git a/nexus/db-queries/src/db/datastore/region.rs b/nexus/db-queries/src/db/datastore/region.rs index 6e152cb9f2..d7da24cce3 100644 --- a/nexus/db-queries/src/db/datastore/region.rs +++ b/nexus/db-queries/src/db/datastore/region.rs @@ -13,6 +13,7 @@ use crate::db::error::public_error_from_diesel; use crate::db::error::ErrorHandler; use crate::db::lookup::LookupPath; use crate::db::model::Dataset; +use crate::db::model::PhysicalDiskPolicy; use crate::db::model::Region; use crate::transaction_retry::OptionalError; use async_bb8_diesel::AsyncRunQueryDsl; @@ -22,6 +23,7 @@ use nexus_types::external_api::params; use omicron_common::api::external; use omicron_common::api::external::DeleteResult; use omicron_common::api::external::Error; +use omicron_common::api::external::LookupResult; use slog::Logger; use uuid::Uuid; @@ -69,6 +71,22 @@ impl DataStore { .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } + pub async fn get_region_optional( + &self, + region_id: Uuid, + ) -> Result, Error> { + use db::schema::region::dsl; + dsl::region + .filter(dsl::id.eq(region_id)) + .select(Region::as_select()) + .get_result_async::( + &*self.pool_connection_unauthorized().await?, + ) + .await + .optional() + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + async fn get_block_size_from_disk_source( &self, opctx: &OpContext, @@ -173,13 +191,36 @@ impl DataStore { let (blocks_per_extent, extent_count) = Self::get_crucible_allocation(&block_size, size); - let query = crate::db::queries::region_allocation::allocation_query( + self.arbitrary_region_allocate_direct( + opctx, volume_id, u64::from(block_size.to_bytes()), blocks_per_extent, extent_count, allocation_strategy, num_regions_required, + ) + .await + } + + #[allow(clippy::too_many_arguments)] + pub async fn arbitrary_region_allocate_direct( + &self, + opctx: &OpContext, + volume_id: Uuid, + block_size: u64, + blocks_per_extent: u64, + extent_count: u64, + allocation_strategy: &RegionAllocationStrategy, + num_regions_required: usize, + ) -> Result, Error> { + let query = crate::db::queries::region_allocation::allocation_query( + volume_id, + block_size, + blocks_per_extent, + extent_count, + allocation_strategy, + num_regions_required, ); let conn = self.pool_connection_authorized(&opctx).await?; @@ -324,6 +365,40 @@ impl DataStore { Ok(0) } } + + /// Find regions on expunged disks + pub async fn find_regions_on_expunged_physical_disks( + &self, + opctx: &OpContext, + ) -> LookupResult> { + let conn = self.pool_connection_authorized(opctx).await?; + + use db::schema::dataset::dsl as dataset_dsl; + use db::schema::physical_disk::dsl as physical_disk_dsl; + use db::schema::region::dsl as region_dsl; + use db::schema::zpool::dsl as zpool_dsl; + + region_dsl::region + .filter(region_dsl::dataset_id.eq_any( + dataset_dsl::dataset + .filter(dataset_dsl::time_deleted.is_null()) + .filter(dataset_dsl::pool_id.eq_any( + zpool_dsl::zpool + .filter(zpool_dsl::time_deleted.is_null()) + .filter(zpool_dsl::physical_disk_id.eq_any( + physical_disk_dsl::physical_disk + .filter(physical_disk_dsl::disk_policy.eq(PhysicalDiskPolicy::Expunged)) + .select(physical_disk_dsl::id) + )) + .select(zpool_dsl::id) + )) + .select(dataset_dsl::id) + )) + .select(Region::as_select()) + .load_async(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } } #[cfg(test)] diff --git a/nexus/db-queries/src/db/datastore/region_replacement.rs b/nexus/db-queries/src/db/datastore/region_replacement.rs new file mode 100644 
index 0000000000..d12d123e7e --- /dev/null +++ b/nexus/db-queries/src/db/datastore/region_replacement.rs @@ -0,0 +1,907 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! [`DataStore`] methods on [`RegionReplacement`]s. + +use super::DataStore; +use crate::context::OpContext; +use crate::db; +use crate::db::datastore::SQL_BATCH_SIZE; +use crate::db::error::public_error_from_diesel; +use crate::db::error::ErrorHandler; +use crate::db::model::Region; +use crate::db::model::RegionReplacement; +use crate::db::model::RegionReplacementState; +use crate::db::model::RegionReplacementStep; +use crate::db::model::UpstairsRepairNotification; +use crate::db::model::UpstairsRepairNotificationType; +use crate::db::model::VolumeRepair; +use crate::db::pagination::paginated; +use crate::db::pagination::Paginator; +use crate::db::update_and_check::UpdateAndCheck; +use crate::db::update_and_check::UpdateStatus; +use crate::db::TransactionError; +use async_bb8_diesel::AsyncConnection; +use async_bb8_diesel::AsyncRunQueryDsl; +use diesel::prelude::*; +use omicron_common::api::external::Error; +use omicron_uuid_kinds::DownstairsRegionKind; +use omicron_uuid_kinds::TypedUuid; +use uuid::Uuid; + +impl DataStore { + /// Create and insert a region replacement request for a Region, returning the ID of the + /// request. + pub async fn create_region_replacement_request_for_region( + &self, + opctx: &OpContext, + region: &Region, + ) -> Result { + let request = RegionReplacement::for_region(region); + let request_id = request.id; + + self.insert_region_replacement_request(opctx, request).await?; + + Ok(request_id) + } + + /// Insert a region replacement request into the DB, also creating the + /// VolumeRepair record. + pub async fn insert_region_replacement_request( + &self, + opctx: &OpContext, + request: RegionReplacement, + ) -> Result<(), Error> { + self.pool_connection_authorized(opctx) + .await? 
+ .transaction_async(|conn| async move { + use db::schema::region_replacement::dsl; + use db::schema::volume_repair::dsl as volume_repair_dsl; + + diesel::insert_into(volume_repair_dsl::volume_repair) + .values(VolumeRepair { + volume_id: request.volume_id, + repair_id: request.id, + }) + .execute_async(&conn) + .await?; + + diesel::insert_into(dsl::region_replacement) + .values(request) + .execute_async(&conn) + .await?; + + Ok(()) + }) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + + pub async fn get_region_replacement_request_by_id( + &self, + opctx: &OpContext, + id: Uuid, + ) -> Result { + use db::schema::region_replacement::dsl; + + dsl::region_replacement + .filter(dsl::id.eq(id)) + .get_result_async::( + &*self.pool_connection_authorized(opctx).await?, + ) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + + pub async fn get_requested_region_replacements( + &self, + opctx: &OpContext, + ) -> Result, Error> { + opctx.check_complex_operations_allowed()?; + + let mut replacements = Vec::new(); + let mut paginator = Paginator::new(SQL_BATCH_SIZE); + let conn = self.pool_connection_authorized(opctx).await?; + + while let Some(p) = paginator.next() { + use db::schema::region_replacement::dsl; + + let batch = paginated( + dsl::region_replacement, + dsl::id, + &p.current_pagparams(), + ) + .filter( + dsl::replacement_state.eq(RegionReplacementState::Requested), + ) + .get_results_async::(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; + + paginator = p.found_batch(&batch, &|r| r.id); + replacements.extend(batch); + } + + Ok(replacements) + } + + /// Return region replacement requests that are in state `Running` with no + /// currently operating saga. These need to be checked on or driven forward. + pub async fn get_running_region_replacements( + &self, + opctx: &OpContext, + ) -> Result, Error> { + use db::schema::region_replacement::dsl; + + dsl::region_replacement + .filter(dsl::replacement_state.eq(RegionReplacementState::Running)) + .filter(dsl::operating_saga_id.is_null()) + .get_results_async::( + &*self.pool_connection_authorized(opctx).await?, + ) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + + /// Return region replacement requests that are in state `ReplacementDone` + /// with no currently operating saga. These need to be completed. + pub async fn get_done_region_replacements( + &self, + opctx: &OpContext, + ) -> Result, Error> { + use db::schema::region_replacement::dsl; + + dsl::region_replacement + .filter( + dsl::replacement_state + .eq(RegionReplacementState::ReplacementDone), + ) + .filter(dsl::operating_saga_id.is_null()) + .get_results_async::( + &*self.pool_connection_authorized(opctx).await?, + ) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + + /// Transition a RegionReplacement record from Requested to Allocating, + /// setting a unique id at the same time. 
+ pub async fn set_region_replacement_allocating( + &self, + opctx: &OpContext, + region_replacement_id: Uuid, + operating_saga_id: Uuid, + ) -> Result<(), Error> { + use db::schema::region_replacement::dsl; + let updated = diesel::update(dsl::region_replacement) + .filter(dsl::id.eq(region_replacement_id)) + .filter( + dsl::replacement_state.eq(RegionReplacementState::Requested), + ) + .filter(dsl::operating_saga_id.is_null()) + .set(( + dsl::replacement_state.eq(RegionReplacementState::Allocating), + dsl::operating_saga_id.eq(operating_saga_id), + )) + .check_if_exists::(region_replacement_id) + .execute_and_check(&*self.pool_connection_authorized(opctx).await?) + .await; + + match updated { + Ok(result) => match result.status { + UpdateStatus::Updated => Ok(()), + UpdateStatus::NotUpdatedButExists => { + let record = result.found; + + if record.operating_saga_id == Some(operating_saga_id) + && record.replacement_state + == RegionReplacementState::Allocating + { + Ok(()) + } else { + Err(Error::conflict(format!( + "region replacement {} set to {:?} (operating saga id {:?})", + region_replacement_id, + record.replacement_state, + record.operating_saga_id, + ))) + } + } + }, + + Err(e) => Err(public_error_from_diesel(e, ErrorHandler::Server)), + } + } + + /// Transition a RegionReplacement record from Allocating to Requested, + /// clearing the operating saga id. + pub async fn undo_set_region_replacement_allocating( + &self, + opctx: &OpContext, + region_replacement_id: Uuid, + operating_saga_id: Uuid, + ) -> Result<(), Error> { + use db::schema::region_replacement::dsl; + let updated = diesel::update(dsl::region_replacement) + .filter(dsl::id.eq(region_replacement_id)) + .filter( + dsl::replacement_state.eq(RegionReplacementState::Allocating), + ) + .filter(dsl::operating_saga_id.eq(operating_saga_id)) + .set(( + dsl::replacement_state.eq(RegionReplacementState::Requested), + dsl::operating_saga_id.eq(Option::::None), + )) + .check_if_exists::(region_replacement_id) + .execute_and_check(&*self.pool_connection_authorized(opctx).await?) + .await; + + match updated { + Ok(result) => match result.status { + UpdateStatus::Updated => Ok(()), + UpdateStatus::NotUpdatedButExists => { + let record = result.found; + + if record.operating_saga_id == None + && record.replacement_state + == RegionReplacementState::Requested + { + Ok(()) + } else { + Err(Error::conflict(format!( + "region replacement {} set to {:?} (operating saga id {:?})", + region_replacement_id, + record.replacement_state, + record.operating_saga_id, + ))) + } + } + }, + + Err(e) => Err(public_error_from_diesel(e, ErrorHandler::Server)), + } + } + + /// Transition from Allocating to Running, and clear the operating saga id. 
+ pub async fn set_region_replacement_running( + &self, + opctx: &OpContext, + region_replacement_id: Uuid, + operating_saga_id: Uuid, + new_region_id: Uuid, + old_region_volume_id: Uuid, + ) -> Result<(), Error> { + use db::schema::region_replacement::dsl; + let updated = diesel::update(dsl::region_replacement) + .filter(dsl::id.eq(region_replacement_id)) + .filter(dsl::operating_saga_id.eq(operating_saga_id)) + .filter( + dsl::replacement_state.eq(RegionReplacementState::Allocating), + ) + .set(( + dsl::replacement_state.eq(RegionReplacementState::Running), + dsl::old_region_volume_id.eq(Some(old_region_volume_id)), + dsl::new_region_id.eq(Some(new_region_id)), + dsl::operating_saga_id.eq(Option::::None), + )) + .check_if_exists::(region_replacement_id) + .execute_and_check(&*self.pool_connection_authorized(opctx).await?) + .await; + + match updated { + Ok(result) => match result.status { + UpdateStatus::Updated => Ok(()), + UpdateStatus::NotUpdatedButExists => { + let record = result.found; + + if record.operating_saga_id == None + && record.replacement_state + == RegionReplacementState::Running + && record.new_region_id == Some(new_region_id) + && record.old_region_volume_id + == Some(old_region_volume_id) + { + Ok(()) + } else { + Err(Error::conflict(format!( + "region replacement {} set to {:?} (operating saga id {:?})", + region_replacement_id, + record.replacement_state, + record.operating_saga_id, + ))) + } + } + }, + + Err(e) => Err(public_error_from_diesel(e, ErrorHandler::Server)), + } + } + + /// Find an in-progress region replacement request by new region id + pub async fn lookup_in_progress_region_replacement_request_by_new_region_id( + &self, + opctx: &OpContext, + new_region_id: TypedUuid, + ) -> Result, Error> { + use db::schema::region_replacement::dsl; + + dsl::region_replacement + .filter( + dsl::new_region_id + .eq(nexus_db_model::to_db_typed_uuid(new_region_id)), + ) + .filter(dsl::replacement_state.ne(RegionReplacementState::Complete)) + .get_result_async::( + &*self.pool_connection_authorized(opctx).await?, + ) + .await + .optional() + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + + /// Find a region replacement request by old region id + pub async fn lookup_region_replacement_request_by_old_region_id( + &self, + opctx: &OpContext, + old_region_id: TypedUuid, + ) -> Result, Error> { + use db::schema::region_replacement::dsl; + + dsl::region_replacement + .filter( + dsl::old_region_id + .eq(nexus_db_model::to_db_typed_uuid(old_region_id)), + ) + .get_result_async::( + &*self.pool_connection_authorized(opctx).await?, + ) + .await + .optional() + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + + /// Transition a RegionReplacement record from Running to Driving, + /// setting a unique id at the same time. + pub async fn set_region_replacement_driving( + &self, + opctx: &OpContext, + region_replacement_id: Uuid, + operating_saga_id: Uuid, + ) -> Result<(), Error> { + use db::schema::region_replacement::dsl; + let updated = diesel::update(dsl::region_replacement) + .filter(dsl::id.eq(region_replacement_id)) + .filter(dsl::replacement_state.eq(RegionReplacementState::Running)) + .filter(dsl::operating_saga_id.is_null()) + .set(( + dsl::replacement_state.eq(RegionReplacementState::Driving), + dsl::operating_saga_id.eq(operating_saga_id), + )) + .check_if_exists::(region_replacement_id) + .execute_and_check(&*self.pool_connection_authorized(opctx).await?) 
+ .await; + + match updated { + Ok(result) => match result.status { + UpdateStatus::Updated => Ok(()), + UpdateStatus::NotUpdatedButExists => { + let record = result.found; + + if record.operating_saga_id == Some(operating_saga_id) + && record.replacement_state + == RegionReplacementState::Driving + { + Ok(()) + } else { + Err(Error::conflict(format!( + "region replacement {} set to {:?} (operating saga id {:?})", + region_replacement_id, + record.replacement_state, + record.operating_saga_id, + ))) + } + } + }, + + Err(e) => Err(public_error_from_diesel(e, ErrorHandler::Server)), + } + } + + /// Transition a RegionReplacement record from Driving to Running, + /// clearing the operating saga id. + pub async fn undo_set_region_replacement_driving( + &self, + opctx: &OpContext, + region_replacement_id: Uuid, + operating_saga_id: Uuid, + ) -> Result<(), Error> { + use db::schema::region_replacement::dsl; + let updated = diesel::update(dsl::region_replacement) + .filter(dsl::id.eq(region_replacement_id)) + .filter(dsl::replacement_state.eq(RegionReplacementState::Driving)) + .filter(dsl::operating_saga_id.eq(operating_saga_id)) + .set(( + dsl::replacement_state.eq(RegionReplacementState::Running), + dsl::operating_saga_id.eq(Option::::None), + )) + .check_if_exists::(region_replacement_id) + .execute_and_check(&*self.pool_connection_authorized(opctx).await?) + .await; + + match updated { + Ok(result) => match result.status { + UpdateStatus::Updated => Ok(()), + UpdateStatus::NotUpdatedButExists => { + let record = result.found; + + if record.operating_saga_id == None + && record.replacement_state + == RegionReplacementState::Running + { + Ok(()) + } else { + Err(Error::conflict(format!( + "region replacement {} set to {:?} (operating saga id {:?})", + region_replacement_id, + record.replacement_state, + record.operating_saga_id, + ))) + } + } + }, + + Err(e) => Err(public_error_from_diesel(e, ErrorHandler::Server)), + } + } + + /// Transition a RegionReplacement record from Driving to ReplacementDone, + /// clearing the operating saga id. + pub async fn set_region_replacement_from_driving_to_done( + &self, + opctx: &OpContext, + region_replacement_id: Uuid, + operating_saga_id: Uuid, + ) -> Result<(), Error> { + use db::schema::region_replacement::dsl; + let updated = diesel::update(dsl::region_replacement) + .filter(dsl::id.eq(region_replacement_id)) + .filter(dsl::replacement_state.eq(RegionReplacementState::Driving)) + .filter(dsl::operating_saga_id.eq(operating_saga_id)) + .set(( + dsl::replacement_state + .eq(RegionReplacementState::ReplacementDone), + dsl::operating_saga_id.eq(Option::::None), + )) + .check_if_exists::(region_replacement_id) + .execute_and_check(&*self.pool_connection_authorized(opctx).await?) 
+ .await; + + match updated { + Ok(result) => match result.status { + UpdateStatus::Updated => Ok(()), + UpdateStatus::NotUpdatedButExists => { + let record = result.found; + + if record.operating_saga_id == None + && record.replacement_state + == RegionReplacementState::ReplacementDone + { + Ok(()) + } else { + Err(Error::conflict(format!( + "region replacement {} set to {:?} (operating saga id {:?})", + region_replacement_id, + record.replacement_state, + record.operating_saga_id, + ))) + } + } + }, + + Err(e) => Err(public_error_from_diesel(e, ErrorHandler::Server)), + } + } + + /// Return the most current step for a region replacement request + pub async fn current_region_replacement_request_step( + &self, + opctx: &OpContext, + id: Uuid, + ) -> Result, Error> { + use db::schema::region_replacement_step::dsl; + + dsl::region_replacement_step + .filter(dsl::replacement_id.eq(id)) + .order_by(dsl::step_time.desc()) + .first_async::( + &*self.pool_connection_authorized(opctx).await?, + ) + .await + .optional() + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + + /// Record a step taken to drive a region replacement forward + pub async fn add_region_replacement_request_step( + &self, + opctx: &OpContext, + step: RegionReplacementStep, + ) -> Result<(), Error> { + use db::schema::region_replacement_step::dsl; + + let conn = self.pool_connection_authorized(opctx).await?; + + diesel::insert_into(dsl::region_replacement_step) + .values(step) + .on_conflict((dsl::replacement_id, dsl::step_time, dsl::step_type)) + .do_nothing() + .execute_async(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; + + Ok(()) + } + + /// Transition a RegionReplacement record from ReplacementDone to Completing, + /// setting a unique id at the same time. + pub async fn set_region_replacement_completing( + &self, + opctx: &OpContext, + region_replacement_id: Uuid, + operating_saga_id: Uuid, + ) -> Result<(), Error> { + use db::schema::region_replacement::dsl; + let updated = diesel::update(dsl::region_replacement) + .filter(dsl::id.eq(region_replacement_id)) + .filter( + dsl::replacement_state + .eq(RegionReplacementState::ReplacementDone), + ) + .filter(dsl::operating_saga_id.is_null()) + .set(( + dsl::replacement_state.eq(RegionReplacementState::Completing), + dsl::operating_saga_id.eq(operating_saga_id), + )) + .check_if_exists::(region_replacement_id) + .execute_and_check(&*self.pool_connection_authorized(opctx).await?) + .await; + + match updated { + Ok(result) => match result.status { + UpdateStatus::Updated => Ok(()), + UpdateStatus::NotUpdatedButExists => { + let record = result.found; + + if record.operating_saga_id == Some(operating_saga_id) + && record.replacement_state + == RegionReplacementState::Completing + { + Ok(()) + } else { + Err(Error::conflict(format!( + "region replacement {} set to {:?} (operating saga id {:?})", + region_replacement_id, + record.replacement_state, + record.operating_saga_id, + ))) + } + } + }, + + Err(e) => Err(public_error_from_diesel(e, ErrorHandler::Server)), + } + } + + /// Transition a RegionReplacement record from Completing to ReplacementDone, + /// clearing the operating saga id. 
+ pub async fn undo_set_region_replacement_completing( + &self, + opctx: &OpContext, + region_replacement_id: Uuid, + operating_saga_id: Uuid, + ) -> Result<(), Error> { + use db::schema::region_replacement::dsl; + let updated = diesel::update(dsl::region_replacement) + .filter(dsl::id.eq(region_replacement_id)) + .filter( + dsl::replacement_state.eq(RegionReplacementState::Completing), + ) + .filter(dsl::operating_saga_id.eq(operating_saga_id)) + .set(( + dsl::replacement_state + .eq(RegionReplacementState::ReplacementDone), + dsl::operating_saga_id.eq(Option::::None), + )) + .check_if_exists::(region_replacement_id) + .execute_and_check(&*self.pool_connection_authorized(opctx).await?) + .await; + + match updated { + Ok(result) => match result.status { + UpdateStatus::Updated => Ok(()), + UpdateStatus::NotUpdatedButExists => { + let record = result.found; + + if record.operating_saga_id == None + && record.replacement_state + == RegionReplacementState::ReplacementDone + { + Ok(()) + } else { + Err(Error::conflict(format!( + "region replacement {} set to {:?} (operating saga id {:?})", + region_replacement_id, + record.replacement_state, + record.operating_saga_id, + ))) + } + } + }, + + Err(e) => Err(public_error_from_diesel(e, ErrorHandler::Server)), + } + } + + /// Transition a RegionReplacement record from Completing to Complete, + /// clearing the operating saga id. Also removes the `volume_repair` record + /// that is taking a "lock" on the Volume. + pub async fn set_region_replacement_complete( + &self, + opctx: &OpContext, + region_replacement_id: Uuid, + operating_saga_id: Uuid, + ) -> Result<(), Error> { + type TxnError = TransactionError; + + self.pool_connection_authorized(opctx) + .await? + .transaction_async(|conn| async move { + use db::schema::volume_repair::dsl as volume_repair_dsl; + + diesel::delete( + volume_repair_dsl::volume_repair + .filter(volume_repair_dsl::repair_id.eq(region_replacement_id)) + ) + .execute_async(&conn) + .await?; + + use db::schema::region_replacement::dsl; + + let result = diesel::update(dsl::region_replacement) + .filter(dsl::id.eq(region_replacement_id)) + .filter( + dsl::replacement_state.eq(RegionReplacementState::Completing), + ) + .filter(dsl::operating_saga_id.eq(operating_saga_id)) + .set(( + dsl::replacement_state.eq(RegionReplacementState::Complete), + dsl::operating_saga_id.eq(Option::::None), + )) + .check_if_exists::(region_replacement_id) + .execute_and_check(&conn) + .await?; + + match result.status { + UpdateStatus::Updated => Ok(()), + UpdateStatus::NotUpdatedButExists => { + let record = result.found; + + if record.operating_saga_id == None + && record.replacement_state + == RegionReplacementState::Complete + { + Ok(()) + } else { + Err(TxnError::CustomError(Error::conflict(format!( + "region replacement {} set to {:?} (operating saga id {:?})", + region_replacement_id, + record.replacement_state, + record.operating_saga_id, + )))) + } + } + } + }) + .await + .map_err(|e| match e { + TxnError::CustomError(error) => error, + + TxnError::Database(error) => { + public_error_from_diesel(error, ErrorHandler::Server) + } + }) + } + + /// Nexus has been notified by an Upstairs (or has otherwised determined) + /// that a region replacement is done, so update the record. This may arrive + /// in the middle of a drive saga invocation, so do not filter on state or + /// operating saga id! 
+ pub async fn mark_region_replacement_as_done( + &self, + opctx: &OpContext, + region_replacement_id: Uuid, + ) -> Result<(), Error> { + use db::schema::region_replacement::dsl; + let updated = diesel::update(dsl::region_replacement) + .filter(dsl::id.eq(region_replacement_id)) + .set(( + dsl::replacement_state + .eq(RegionReplacementState::ReplacementDone), + dsl::operating_saga_id.eq(Option::::None), + )) + .check_if_exists::(region_replacement_id) + .execute_and_check(&*self.pool_connection_authorized(opctx).await?) + .await; + + match updated { + Ok(result) => match result.status { + UpdateStatus::Updated => Ok(()), + + UpdateStatus::NotUpdatedButExists => { + let record = result.found; + + if record.operating_saga_id == None + && record.replacement_state + == RegionReplacementState::ReplacementDone + { + Ok(()) + } else { + Err(Error::conflict(format!( + "region replacement {} set to {:?} (operating saga id {:?})", + region_replacement_id, + record.replacement_state, + record.operating_saga_id, + ))) + } + } + }, + + Err(e) => Err(public_error_from_diesel(e, ErrorHandler::Server)), + } + } + + /// Check if a region replacement request has at least one matching + /// successful "repair finished" notification. + // + // For the purposes of changing the state of a region replacement request to + // `ReplacementDone`, check if Nexus has seen at least related one + // successful "repair finished" notification. + // + // Note: after a region replacement request has transitioned to `Complete`, + // there may be many future "repair finished" notifications for the "new" + // region that are unrelated to the replacement request. + pub async fn request_has_matching_successful_finish_notification( + &self, + opctx: &OpContext, + region_replacement: &RegionReplacement, + ) -> Result { + let Some(new_region_id) = region_replacement.new_region_id else { + return Err(Error::invalid_request(format!( + "region replacement {} has no new region id!", + region_replacement.id, + ))); + }; + + use db::schema::upstairs_repair_notification::dsl; + + let maybe_notification = dsl::upstairs_repair_notification + .filter(dsl::region_id.eq(new_region_id)) + .filter( + dsl::notification_type + .eq(UpstairsRepairNotificationType::Succeeded), + ) + .first_async::( + &*self.pool_connection_authorized(opctx).await?, + ) + .await + .optional() + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; + + Ok(maybe_notification.is_some()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + use crate::db::datastore::test_utils::datastore_test; + use nexus_test_utils::db::test_setup_database; + use omicron_test_utils::dev; + + #[tokio::test] + async fn test_one_replacement_per_volume() { + let logctx = dev::test_setup_log("test_one_replacement_per_volume"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + let region_1_id = Uuid::new_v4(); + let region_2_id = Uuid::new_v4(); + let volume_id = Uuid::new_v4(); + + let request_1 = RegionReplacement::new(region_1_id, volume_id); + let request_2 = RegionReplacement::new(region_2_id, volume_id); + + datastore + .insert_region_replacement_request(&opctx, request_1) + .await + .unwrap(); + datastore + .insert_region_replacement_request(&opctx, request_2) + .await + .unwrap_err(); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_replacement_done_in_middle_of_drive_saga() { + // If Nexus receives a notification that a repair has finished in the + 
// middle of a drive saga, then make sure the replacement request state + // ends up as `ReplacementDone`. + + let logctx = dev::test_setup_log( + "test_replacement_done_in_middle_of_drive_saga", + ); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + let region_id = Uuid::new_v4(); + let volume_id = Uuid::new_v4(); + + let request = { + let mut request = RegionReplacement::new(region_id, volume_id); + request.replacement_state = RegionReplacementState::Running; + request + }; + + datastore + .insert_region_replacement_request(&opctx, request.clone()) + .await + .unwrap(); + + // Transition to Driving + + let saga_id = Uuid::new_v4(); + + datastore + .set_region_replacement_driving(&opctx, request.id, saga_id) + .await + .unwrap(); + + // Now, Nexus receives a notification that the repair has finished + // successfully + + datastore + .mark_region_replacement_as_done(&opctx, request.id) + .await + .unwrap(); + + // Ensure that the state is ReplacementDone, and the operating saga id + // is cleared. + + let actual_request = datastore + .get_region_replacement_request_by_id(&opctx, request.id) + .await + .unwrap(); + + assert_eq!( + actual_request.replacement_state, + RegionReplacementState::ReplacementDone + ); + assert_eq!(actual_request.operating_saga_id, None); + + // The Drive saga will unwind when it tries to set the state back to + // Running. + + datastore + .undo_set_region_replacement_driving(&opctx, request.id, saga_id) + .await + .unwrap_err(); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } +} diff --git a/nexus/db-queries/src/db/datastore/snapshot.rs b/nexus/db-queries/src/db/datastore/snapshot.rs index 7a3f84bbb2..9d4900e2a4 100644 --- a/nexus/db-queries/src/db/datastore/snapshot.rs +++ b/nexus/db-queries/src/db/datastore/snapshot.rs @@ -31,6 +31,7 @@ use omicron_common::api::external::http_pagination::PaginatedBy; use omicron_common::api::external::CreateResult; use omicron_common::api::external::Error; use omicron_common::api::external::ListResultVec; +use omicron_common::api::external::LookupResult; use omicron_common::api::external::LookupType; use omicron_common::api::external::ResourceType; use omicron_common::api::external::UpdateResult; @@ -304,4 +305,21 @@ impl DataStore { } } } + + pub async fn find_snapshot_by_destination_volume_id( + &self, + opctx: &OpContext, + volume_id: Uuid, + ) -> LookupResult> { + let conn = self.pool_connection_authorized(opctx).await?; + + use db::schema::snapshot::dsl; + dsl::snapshot + .filter(dsl::destination_volume_id.eq(volume_id)) + .select(Snapshot::as_select()) + .first_async(&*conn) + .await + .optional() + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } } diff --git a/nexus/db-queries/src/db/datastore/volume.rs b/nexus/db-queries/src/db/datastore/volume.rs index 0e80ee3e3c..a7b9273aa8 100644 --- a/nexus/db-queries/src/db/datastore/volume.rs +++ b/nexus/db-queries/src/db/datastore/volume.rs @@ -45,6 +45,7 @@ use serde::Deserialize; use serde::Deserializer; use serde::Serialize; use sled_agent_client::types::VolumeConstructionRequest; +use std::net::SocketAddrV6; use uuid::Uuid; #[derive(Debug, Clone, Copy)] @@ -1150,6 +1151,48 @@ impl DataStore { }) } + /// Return all the read-write regions in a volume whose target address + /// matches the argument dataset's. 
+ pub async fn get_dataset_rw_regions_in_volume( + &self, + opctx: &OpContext, + dataset_id: Uuid, + volume_id: Uuid, + ) -> LookupResult> { + let conn = self.pool_connection_authorized(opctx).await?; + + let dataset = { + use db::schema::dataset::dsl; + + dsl::dataset + .filter(dsl::id.eq(dataset_id)) + .select(Dataset::as_select()) + .first_async(&*conn) + .await + .map_err(|e| { + public_error_from_diesel(e, ErrorHandler::Server) + })? + }; + + let Some(volume) = self.volume_get(volume_id).await? else { + return Err(Error::internal_error("volume is gone!?")); + }; + + let vcr: VolumeConstructionRequest = + serde_json::from_str(&volume.data())?; + + let mut targets: Vec = vec![]; + + find_matching_rw_regions_in_volume( + &vcr, + dataset.address().ip(), + &mut targets, + ) + .map_err(|e| Error::internal_error(&e.to_string()))?; + + Ok(targets) + } + // An Upstairs is created as part of a Volume hierarchy if the Volume // Construction Request includes a "Region" variant. This may be at any // layer of the Volume, and some notifications will come from an Upstairs @@ -1583,6 +1626,274 @@ impl DataStore { } } +pub struct VolumeReplacementParams { + pub volume_id: Uuid, + pub region_id: Uuid, + pub region_addr: SocketAddrV6, +} + +impl DataStore { + /// Replace a read-write region in a Volume with a new region. + pub async fn volume_replace_region( + &self, + existing: VolumeReplacementParams, + replacement: VolumeReplacementParams, + ) -> Result<(), Error> { + // In a single transaction: + // + // - set the existing region's volume id to the replacement's volume id + // - set the replacement region's volume id to the existing's volume id + // - update the existing volume's construction request to replace the + // existing region's SocketAddrV6 with the replacement region's + // + // This function's effects can be undone by calling it with swapped + // parameters. + // + // # Example # + // + // Imagine `volume_replace_region` is called with the following, + // pretending that UUIDs are just eight uppercase letters: + // + // let existing = VolumeReplacementParams { + // volume_id: TARGET_VOL, + // region_id: TARGET_REG, + // region_addr: "[fd00:1122:3344:145::10]:40001", + // } + // + // let replace = VolumeReplacementParams { + // volume_id: NEW_VOL, + // region_id: NEW_REG, + // region_addr: "[fd00:1122:3344:322::4]:3956", + // } + // + // In the database, the relevant records (and columns) of the region + // table look like this prior to the transaction: + // + // id | volume_id + // -------------| --------- + // TARGET_REG | TARGET_VOL + // NEW_REG | NEW_VOL + // + // TARGET_VOL has a volume construction request where one of the targets + // list will contain TARGET_REG's address: + // + // { + // "type": "volume", + // "block_size": 512, + // "id": "TARGET_VOL", + // "read_only_parent": { + // ... + // }, + // "sub_volumes": [ + // { + // ... + // "opts": { + // ... + // "target": [ + // "[fd00:1122:3344:103::3]:19004", + // "[fd00:1122:3344:79::12]:27015", + // "[fd00:1122:3344:145::10]:40001" <----- + // ] + // } + // } + // ] + // } + // + // Note it is not required for the replacement volume to exist as a + // database record for this transaction. 
+ // + // The first part of the transaction will swap the volume IDs of the + // existing and replacement region records: + // + // id | volume_id + // ------------| --------- + // TARGET_REG | NEW_VOL + // NEW_REG | TARGET_VOL + // + // The second part of the transaction will update the volume + // construction request of TARGET_VOL by finding and replacing + // TARGET_REG's address (in the appropriate targets array) with + // NEW_REG's address: + // + // { + // ... + // "target": [ + // "[fd00:1122:3344:103::3]:19004", + // "[fd00:1122:3344:79::12]:27015", + // "[fd00:1122:3344:322::4]:3956" <----- + // ] + // ... + // } + // + // After the transaction, the caller should ensure that TARGET_REG is + // referenced (via its socket address) in NEW_VOL. For an example, this + // is done as part of the region replacement start saga. + + #[derive(Debug, thiserror::Error)] + enum VolumeReplaceRegionError { + #[error("Error from Volume region replacement: {0}")] + Public(Error), + + #[error("Serde error during Volume region replacement: {0}")] + SerdeError(#[from] serde_json::Error), + + #[error("Target Volume deleted")] + TargetVolumeDeleted, + + #[error("Region replacement error: {0}")] + RegionReplacementError(#[from] anyhow::Error), + } + let err = OptionalError::new(); + + let conn = self.pool_connection_unauthorized().await?; + self.transaction_retry_wrapper("volume_replace_region") + .transaction(&conn, |conn| { + let err = err.clone(); + async move { + use db::schema::region::dsl as region_dsl; + use db::schema::volume::dsl as volume_dsl; + + // Set the existing region's volume id to the replacement's + // volume id + diesel::update(region_dsl::region) + .filter(region_dsl::id.eq(existing.region_id)) + .set(region_dsl::volume_id.eq(replacement.volume_id)) + .execute_async(&conn) + .await + .map_err(|e| { + err.bail_retryable_or_else(e, |e| { + VolumeReplaceRegionError::Public( + public_error_from_diesel( + e, + ErrorHandler::Server, + ) + ) + }) + })?; + + // Set the replacement region's volume id to the existing's + // volume id + diesel::update(region_dsl::region) + .filter(region_dsl::id.eq(replacement.region_id)) + .set(region_dsl::volume_id.eq(existing.volume_id)) + .execute_async(&conn) + .await + .map_err(|e| { + err.bail_retryable_or_else(e, |e| { + VolumeReplaceRegionError::Public( + public_error_from_diesel( + e, + ErrorHandler::Server, + ) + ) + }) + })?; + + // Update the existing volume's construction request to + // replace the existing region's SocketAddrV6 with the + // replacement region's + let maybe_old_volume = { + volume_dsl::volume + .filter(volume_dsl::id.eq(existing.volume_id)) + .select(Volume::as_select()) + .first_async::(&conn) + .await + .optional() + .map_err(|e| { + err.bail_retryable_or_else(e, |e| { + VolumeReplaceRegionError::Public( + public_error_from_diesel( + e, + ErrorHandler::Server, + ) + ) + }) + })? + }; + + let old_volume = if let Some(old_volume) = maybe_old_volume { + old_volume + } else { + // existing volume was deleted, so return an error, we + // can't perform the region replacement now! + return Err(err.bail(VolumeReplaceRegionError::TargetVolumeDeleted)); + }; + + let old_vcr: VolumeConstructionRequest = + match serde_json::from_str(&old_volume.data()) { + Ok(vcr) => vcr, + Err(e) => { + return Err(err.bail(VolumeReplaceRegionError::SerdeError(e))); + }, + }; + + // Copy the old volume's VCR, changing out the old region + // for the new. 
+                    let new_vcr = match replace_region_in_vcr(
+                        &old_vcr,
+                        existing.region_addr,
+                        replacement.region_addr,
+                    ) {
+                        Ok(new_vcr) => new_vcr,
+                        Err(e) => {
+                            return Err(err.bail(
+                                VolumeReplaceRegionError::RegionReplacementError(e)
+                            ));
+                        }
+                    };
+
+                    let new_volume_data = serde_json::to_string(
+                        &new_vcr,
+                    )
+                    .map_err(|e| {
+                        err.bail(VolumeReplaceRegionError::SerdeError(e))
+                    })?;
+
+                    // Update the existing volume's data
+                    diesel::update(volume_dsl::volume)
+                        .filter(volume_dsl::id.eq(existing.volume_id))
+                        .set(volume_dsl::data.eq(new_volume_data))
+                        .execute_async(&conn)
+                        .await
+                        .map_err(|e| {
+                            err.bail_retryable_or_else(e, |e| {
+                                VolumeReplaceRegionError::Public(
+                                    public_error_from_diesel(
+                                        e,
+                                        ErrorHandler::Server,
+                                    )
+                                )
+                            })
+                        })?;
+
+                    Ok(())
+                }
+            })
+            .await
+            .map_err(|e| {
+                if let Some(err) = err.take() {
+                    match err {
+                        VolumeReplaceRegionError::Public(e) => e,
+
+                        VolumeReplaceRegionError::SerdeError(_) => {
+                            Error::internal_error(&err.to_string())
+                        }
+
+                        VolumeReplaceRegionError::TargetVolumeDeleted => {
+                            Error::internal_error(&err.to_string())
+                        }
+
+                        VolumeReplaceRegionError::RegionReplacementError(_) => {
+                            Error::internal_error(&err.to_string())
+                        }
+                    }
+                } else {
+                    public_error_from_diesel(e, ErrorHandler::Server)
+                }
+            })
+    }
+}
+
 /// Return the targets from a VolumeConstructionRequest.
 ///
 /// The targets of a volume construction request map to resources.
@@ -1681,6 +1992,119 @@ pub fn volume_is_read_only(
     }
 }
+/// Replace a Region in a VolumeConstructionRequest
+///
+/// Note that UUIDs are not randomized by this step: Crucible will reject a
+/// `target_replace` call if the replacement VolumeConstructionRequest does not
+/// exactly match the original, except for a single Region difference.
+///
+/// Note that the generation number _is_ bumped in this step, otherwise
+/// `compare_vcr_for_update` will reject the update.
+fn replace_region_in_vcr(
+    vcr: &VolumeConstructionRequest,
+    old_region: SocketAddrV6,
+    new_region: SocketAddrV6,
+) -> anyhow::Result<VolumeConstructionRequest> {
+    match vcr {
+        VolumeConstructionRequest::Volume {
+            id,
+            block_size,
+            sub_volumes,
+            read_only_parent,
+        } => Ok(VolumeConstructionRequest::Volume {
+            id: *id,
+            block_size: *block_size,
+            sub_volumes: sub_volumes
+                .iter()
+                .map(|subvol| -> anyhow::Result<VolumeConstructionRequest> {
+                    replace_region_in_vcr(&subvol, old_region, new_region)
+                })
+                .collect::<anyhow::Result<Vec<VolumeConstructionRequest>>>()?,
+
+            // Only replacing R/W regions
+            read_only_parent: read_only_parent.clone(),
+        }),
+
+        VolumeConstructionRequest::Url { id, block_size, url } => {
+            Ok(VolumeConstructionRequest::Url {
+                id: *id,
+                block_size: *block_size,
+                url: url.clone(),
+            })
+        }
+
+        VolumeConstructionRequest::Region {
+            block_size,
+            blocks_per_extent,
+            extent_count,
+            opts,
+            gen,
+        } => {
+            let mut opts = opts.clone();
+
+            for target in &mut opts.target {
+                let parsed_target: SocketAddrV6 = target.parse()?;
+                if parsed_target == old_region {
+                    *target = new_region.to_string();
+                }
+            }
+
+            Ok(VolumeConstructionRequest::Region {
+                block_size: *block_size,
+                blocks_per_extent: *blocks_per_extent,
+                extent_count: *extent_count,
+                opts,
+                gen: *gen + 1,
+            })
+        }
+
+        VolumeConstructionRequest::File { id, block_size, path } => {
+            Ok(VolumeConstructionRequest::File {
+                id: *id,
+                block_size: *block_size,
+                path: path.clone(),
+            })
+        }
+    }
+}
+
+/// Find Regions in a Volume's subvolumes list whose targets match the argument
+/// IP, and add them to the supplied Vec.
+fn find_matching_rw_regions_in_volume(
+    vcr: &VolumeConstructionRequest,
+    ip: &std::net::Ipv6Addr,
+    matched_targets: &mut Vec<SocketAddrV6>,
+) -> anyhow::Result<()> {
+    match vcr {
+        VolumeConstructionRequest::Volume { sub_volumes, .. } => {
+            for sub_volume in sub_volumes {
+                find_matching_rw_regions_in_volume(
+                    sub_volume,
+                    ip,
+                    matched_targets,
+                )?;
+            }
+        }
+
+        VolumeConstructionRequest::Url { .. } => {}
+
+        VolumeConstructionRequest::Region { opts, .. } => {
+            if !opts.read_only {
+                for target in &opts.target {
+                    let parsed_target: SocketAddrV6 = target.parse()?;
+                    if parsed_target.ip() == ip {
+                        matched_targets.push(parsed_target);
+                    }
+                }
+            }
+        }
+
+        VolumeConstructionRequest::File { .. } => {}
+    }
+
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -1688,6 +2112,7 @@ mod tests {
     use crate::db::datastore::test_utils::datastore_test;
     use nexus_test_utils::db::test_setup_database;
     use omicron_test_utils::dev;
+    use sled_agent_client::types::CrucibleOpts;

     // Assert that Nexus will not fail to deserialize an old version of
     // CrucibleResources that was serialized before schema update 6.0.0.
@@ -1794,4 +2219,211 @@ mod tests {
         db.cleanup().await.unwrap();
         logctx.cleanup_successful();
     }
+
+    #[tokio::test]
+    async fn test_volume_replace_region() {
+        let logctx = dev::test_setup_log("test_volume_replace_region");
+        let log = logctx.log.new(o!());
+        let mut db = test_setup_database(&log).await;
+        let (_opctx, db_datastore) = datastore_test(&logctx, &db).await;
+
+        // Insert four Region records (three, plus one additionally allocated)
+
+        let volume_id = Uuid::new_v4();
+        let new_volume_id = Uuid::new_v4();
+
+        let mut region_and_volume_ids = [
+            (Uuid::new_v4(), volume_id),
+            (Uuid::new_v4(), volume_id),
+            (Uuid::new_v4(), volume_id),
+            (Uuid::new_v4(), new_volume_id),
+        ];
+
+        {
+            let conn = db_datastore.pool_connection_for_tests().await.unwrap();
+
+            for i in 0..4 {
+                let (_, volume_id) = region_and_volume_ids[i];
+
+                let region = Region::new(
+                    Uuid::new_v4(), // dataset id
+                    volume_id,
+                    512_i64.try_into().unwrap(),
+                    10,
+                    10,
+                );
+
+                region_and_volume_ids[i].0 = region.id();
+
+                use nexus_db_model::schema::region::dsl;
+                diesel::insert_into(dsl::region)
+                    .values(region.clone())
+                    .execute_async(&*conn)
+                    .await
+                    .unwrap();
+            }
+        }
+
+        let _volume = db_datastore
+            .volume_create(nexus_db_model::Volume::new(
+                volume_id,
+                serde_json::to_string(&VolumeConstructionRequest::Volume {
+                    id: volume_id,
+                    block_size: 512,
+                    sub_volumes: vec![VolumeConstructionRequest::Region {
+                        block_size: 512,
+                        blocks_per_extent: 10,
+                        extent_count: 10,
+                        gen: 1,
+                        opts: CrucibleOpts {
+                            id: volume_id,
+                            target: vec![
+                                String::from("[fd00:1122:3344:101::1]:11111"), // target to replace
+                                String::from("[fd00:1122:3344:102::1]:22222"),
+                                String::from("[fd00:1122:3344:103::1]:33333"),
+                            ],
+                            lossy: false,
+                            flush_timeout: None,
+                            key: None,
+                            cert_pem: None,
+                            key_pem: None,
+                            root_cert_pem: None,
+                            control: None,
+                            read_only: false,
+                        },
+                    }],
+                    read_only_parent: None,
+                })
+                .unwrap(),
+            ))
+            .await
+            .unwrap();
+
+        // Replace one
+
+        let target = region_and_volume_ids[0];
+        let replacement = region_and_volume_ids[3];
+
+        db_datastore
+            .volume_replace_region(
+                /* target */
+                db::datastore::VolumeReplacementParams {
+                    volume_id: target.1,
+                    region_id: target.0,
+                    region_addr: "[fd00:1122:3344:101::1]:11111"
+                        .parse()
+                        .unwrap(),
+                },
+                /* replacement */
+                db::datastore::VolumeReplacementParams {
+                    volume_id: replacement.1,
+                    region_id: replacement.0,
+                    region_addr: "[fd55:1122:3344:101::1]:11111"
+
.parse() + .unwrap(), + }, + ) + .await + .unwrap(); + + let vcr: VolumeConstructionRequest = serde_json::from_str( + db_datastore.volume_get(volume_id).await.unwrap().unwrap().data(), + ) + .unwrap(); + + // Ensure the shape of the resulting VCR + assert_eq!( + &vcr, + &VolumeConstructionRequest::Volume { + id: volume_id, + block_size: 512, + sub_volumes: vec![VolumeConstructionRequest::Region { + block_size: 512, + blocks_per_extent: 10, + extent_count: 10, + gen: 2, // generation number bumped + opts: CrucibleOpts { + id: volume_id, + target: vec![ + String::from("[fd55:1122:3344:101::1]:11111"), // replaced + String::from("[fd00:1122:3344:102::1]:22222"), + String::from("[fd00:1122:3344:103::1]:33333"), + ], + lossy: false, + flush_timeout: None, + key: None, + cert_pem: None, + key_pem: None, + root_cert_pem: None, + control: None, + read_only: false, + }, + }], + read_only_parent: None, + }, + ); + + // Now undo the replacement. Note volume ID is not swapped. + db_datastore + .volume_replace_region( + /* target */ + db::datastore::VolumeReplacementParams { + volume_id: target.1, + region_id: replacement.0, + region_addr: "[fd55:1122:3344:101::1]:11111" + .parse() + .unwrap(), + }, + /* replacement */ + db::datastore::VolumeReplacementParams { + volume_id: replacement.1, + region_id: target.0, + region_addr: "[fd00:1122:3344:101::1]:11111" + .parse() + .unwrap(), + }, + ) + .await + .unwrap(); + + let vcr: VolumeConstructionRequest = serde_json::from_str( + db_datastore.volume_get(volume_id).await.unwrap().unwrap().data(), + ) + .unwrap(); + + // Ensure the shape of the resulting VCR + assert_eq!( + &vcr, + &VolumeConstructionRequest::Volume { + id: volume_id, + block_size: 512, + sub_volumes: vec![VolumeConstructionRequest::Region { + block_size: 512, + blocks_per_extent: 10, + extent_count: 10, + gen: 3, // generation number bumped + opts: CrucibleOpts { + id: volume_id, + target: vec![ + String::from("[fd00:1122:3344:101::1]:11111"), // back to what it was + String::from("[fd00:1122:3344:102::1]:22222"), + String::from("[fd00:1122:3344:103::1]:33333"), + ], + lossy: false, + flush_timeout: None, + key: None, + cert_pem: None, + key_pem: None, + root_cert_pem: None, + control: None, + read_only: false, + }, + }], + read_only_parent: None, + }, + ); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } } diff --git a/nexus/src/app/sagas/common_storage.rs b/nexus/src/app/sagas/common_storage.rs index 611fcc3258..51e9648592 100644 --- a/nexus/src/app/sagas/common_storage.rs +++ b/nexus/src/app/sagas/common_storage.rs @@ -17,7 +17,6 @@ use internal_dns::ServiceName; use nexus_db_queries::authz; use nexus_db_queries::context::OpContext; use nexus_db_queries::db; -use nexus_db_queries::db::identity::Asset; use nexus_db_queries::db::lookup::LookupPath; use omicron_common::api::external::Error; use omicron_common::backoff::{self, BackoffError}; diff --git a/nexus/tests/integration_tests/disks.rs b/nexus/tests/integration_tests/disks.rs index 886504a83b..ed4fd59277 100644 --- a/nexus/tests/integration_tests/disks.rs +++ b/nexus/tests/integration_tests/disks.rs @@ -29,7 +29,6 @@ use nexus_test_utils::resource_helpers::objects_list_page_authz; use nexus_test_utils::resource_helpers::DiskTest; use nexus_test_utils_macros::nexus_test; use nexus_types::external_api::params; -use nexus_types::identity::Asset; use omicron_common::api::external::ByteCount; use omicron_common::api::external::Disk; use omicron_common::api::external::DiskState; diff --git a/schema/crdb/dbinit.sql 
b/schema/crdb/dbinit.sql index 2cf9e1100f..17ea6d5510 100644 --- a/schema/crdb/dbinit.sql +++ b/schema/crdb/dbinit.sql @@ -3866,6 +3866,78 @@ ON omicron.public.sled (sled_policy) STORING (ip, sled_state); CREATE INDEX IF NOT EXISTS vmm_by_instance_id ON omicron.public.vmm (instance_id) STORING (sled_id); +CREATE TYPE IF NOT EXISTS omicron.public.region_replacement_state AS ENUM ( + 'requested', + 'allocating', + 'running', + 'driving', + 'replacement_done', + 'completing', + 'complete' +); + +CREATE TABLE IF NOT EXISTS omicron.public.region_replacement ( + /* unique ID for this region replacement */ + id UUID PRIMARY KEY, + + request_time TIMESTAMPTZ NOT NULL, + + old_region_id UUID NOT NULL, + + volume_id UUID NOT NULL, + + old_region_volume_id UUID, + + new_region_id UUID, + + replacement_state omicron.public.region_replacement_state NOT NULL, + + operating_saga_id UUID +); + +CREATE INDEX IF NOT EXISTS lookup_region_replacement_by_state on omicron.public.region_replacement (replacement_state); + +CREATE TABLE IF NOT EXISTS omicron.public.volume_repair ( + volume_id UUID PRIMARY KEY, + repair_id UUID NOT NULL +); + +CREATE INDEX IF NOT EXISTS lookup_volume_repair_by_repair_id on omicron.public.volume_repair ( + repair_id +); + +CREATE TYPE IF NOT EXISTS omicron.public.region_replacement_step_type AS ENUM ( + 'propolis', + 'pantry' +); + +CREATE TABLE IF NOT EXISTS omicron.public.region_replacement_step ( + replacement_id UUID NOT NULL, + + step_time TIMESTAMPTZ NOT NULL, + + step_type omicron.public.region_replacement_step_type NOT NULL, + + step_associated_instance_id UUID, + step_associated_vmm_id UUID, + + step_associated_pantry_ip INET, + step_associated_pantry_port INT4 CHECK (step_associated_pantry_port BETWEEN 0 AND 65535), + step_associated_pantry_job_id UUID, + + PRIMARY KEY (replacement_id, step_time, step_type) +); + +CREATE INDEX IF NOT EXISTS step_time_order on omicron.public.region_replacement_step (step_time); + +CREATE INDEX IF NOT EXISTS search_for_repair_notifications ON omicron.public.upstairs_repair_notification (region_id, notification_type); + +CREATE INDEX IF NOT EXISTS lookup_any_disk_by_volume_id ON omicron.public.disk ( + volume_id +); + +CREATE INDEX IF NOT EXISTS lookup_snapshot_by_destination_volume_id ON omicron.public.snapshot ( destination_volume_id ); + /* * Metadata for the schema itself. This version number isn't great, as there's * nothing to ensure it gets bumped when it should be, but it's a start. 
@@ -3926,7 +3998,7 @@ INSERT INTO omicron.public.db_metadata ( version, target_version ) VALUES - (TRUE, NOW(), NOW(), '64.0.0', NULL) + (TRUE, NOW(), NOW(), '65.0.0', NULL) ON CONFLICT DO NOTHING; COMMIT; diff --git a/schema/crdb/region-replacement/up01.sql b/schema/crdb/region-replacement/up01.sql new file mode 100644 index 0000000000..e13ec3c983 --- /dev/null +++ b/schema/crdb/region-replacement/up01.sql @@ -0,0 +1,9 @@ +CREATE TYPE IF NOT EXISTS omicron.public.region_replacement_state AS ENUM ( + 'requested', + 'allocating', + 'running', + 'driving', + 'replacement_done', + 'completing', + 'complete' +); diff --git a/schema/crdb/region-replacement/up02.sql b/schema/crdb/region-replacement/up02.sql new file mode 100644 index 0000000000..46e5de96ba --- /dev/null +++ b/schema/crdb/region-replacement/up02.sql @@ -0,0 +1,18 @@ +CREATE TABLE IF NOT EXISTS omicron.public.region_replacement ( + /* unique ID for this region replacement */ + id UUID PRIMARY KEY, + + request_time TIMESTAMPTZ NOT NULL, + + old_region_id UUID NOT NULL, + + volume_id UUID NOT NULL, + + old_region_volume_id UUID, + + new_region_id UUID, + + replacement_state omicron.public.region_replacement_state NOT NULL, + + operating_saga_id UUID +); diff --git a/schema/crdb/region-replacement/up03.sql b/schema/crdb/region-replacement/up03.sql new file mode 100644 index 0000000000..51a9db9379 --- /dev/null +++ b/schema/crdb/region-replacement/up03.sql @@ -0,0 +1 @@ +CREATE INDEX IF NOT EXISTS lookup_region_replacement_by_state on omicron.public.region_replacement (replacement_state); diff --git a/schema/crdb/region-replacement/up04.sql b/schema/crdb/region-replacement/up04.sql new file mode 100644 index 0000000000..7a95f48983 --- /dev/null +++ b/schema/crdb/region-replacement/up04.sql @@ -0,0 +1,4 @@ +CREATE TABLE IF NOT EXISTS omicron.public.volume_repair ( + volume_id UUID PRIMARY KEY, + repair_id UUID NOT NULL +); diff --git a/schema/crdb/region-replacement/up05.sql b/schema/crdb/region-replacement/up05.sql new file mode 100644 index 0000000000..b436dd865d --- /dev/null +++ b/schema/crdb/region-replacement/up05.sql @@ -0,0 +1,3 @@ +CREATE INDEX IF NOT EXISTS lookup_volume_repair_by_repair_id on omicron.public.volume_repair ( + repair_id +); diff --git a/schema/crdb/region-replacement/up06.sql b/schema/crdb/region-replacement/up06.sql new file mode 100644 index 0000000000..b02377cc59 --- /dev/null +++ b/schema/crdb/region-replacement/up06.sql @@ -0,0 +1,4 @@ +CREATE TYPE IF NOT EXISTS omicron.public.region_replacement_step_type AS ENUM ( + 'propolis', + 'pantry' +); diff --git a/schema/crdb/region-replacement/up07.sql b/schema/crdb/region-replacement/up07.sql new file mode 100644 index 0000000000..675b637bf3 --- /dev/null +++ b/schema/crdb/region-replacement/up07.sql @@ -0,0 +1,16 @@ +CREATE TABLE IF NOT EXISTS omicron.public.region_replacement_step ( + replacement_id UUID NOT NULL, + + step_time TIMESTAMPTZ NOT NULL, + + step_type omicron.public.region_replacement_step_type NOT NULL, + + step_associated_instance_id UUID, + step_associated_vmm_id UUID, + + step_associated_pantry_ip INET, + step_associated_pantry_port INT4 CHECK (step_associated_pantry_port BETWEEN 0 AND 65535), + step_associated_pantry_job_id UUID, + + PRIMARY KEY (replacement_id, step_time, step_type) +); diff --git a/schema/crdb/region-replacement/up08.sql b/schema/crdb/region-replacement/up08.sql new file mode 100644 index 0000000000..a5ecac8216 --- /dev/null +++ b/schema/crdb/region-replacement/up08.sql @@ -0,0 +1 @@ +CREATE INDEX IF NOT EXISTS 
step_time_order on omicron.public.region_replacement_step (step_time); diff --git a/schema/crdb/region-replacement/up09.sql b/schema/crdb/region-replacement/up09.sql new file mode 100644 index 0000000000..f5cc7bb682 --- /dev/null +++ b/schema/crdb/region-replacement/up09.sql @@ -0,0 +1 @@ +CREATE INDEX IF NOT EXISTS search_for_repair_notifications ON omicron.public.upstairs_repair_notification (region_id, notification_type); diff --git a/schema/crdb/region-replacement/up10.sql b/schema/crdb/region-replacement/up10.sql new file mode 100644 index 0000000000..eccfad8a25 --- /dev/null +++ b/schema/crdb/region-replacement/up10.sql @@ -0,0 +1,3 @@ +CREATE INDEX IF NOT EXISTS lookup_any_disk_by_volume_id ON omicron.public.disk ( + volume_id +); diff --git a/schema/crdb/region-replacement/up11.sql b/schema/crdb/region-replacement/up11.sql new file mode 100644 index 0000000000..5984bba752 --- /dev/null +++ b/schema/crdb/region-replacement/up11.sql @@ -0,0 +1 @@ +CREATE INDEX IF NOT EXISTS lookup_snapshot_by_destination_volume_id ON omicron.public.snapshot ( destination_volume_id );
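
A brief sketch of how the new schema behaves, for reference only (not part of the patch; the UUID literals below are made up, and the exact error text depends on CockroachDB). Because `volume_id` is the primary key of `omicron.public.volume_repair`, the table holds at most one row per volume, and requested region replacements can be listed via the `lookup_region_replacement_by_state` index:

    -- A repair record for a volume (illustrative UUIDs):
    INSERT INTO omicron.public.volume_repair (volume_id, repair_id)
    VALUES ('11111111-1111-1111-1111-111111111111',
            '22222222-2222-2222-2222-222222222222');

    -- A second record for the same volume_id is rejected with a duplicate
    -- primary key error, since volume_id is the table's primary key:
    INSERT INTO omicron.public.volume_repair (volume_id, repair_id)
    VALUES ('11111111-1111-1111-1111-111111111111',
            '33333333-3333-3333-3333-333333333333');

    -- Region replacement requests that have not yet been picked up can be
    -- found via the replacement_state index:
    SELECT id, volume_id, old_region_id
    FROM omicron.public.region_replacement
    WHERE replacement_state = 'requested';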