diff --git a/Cargo.lock b/Cargo.lock index 5b575be25a..76d43120d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -913,6 +913,11 @@ name = "cc" version = "1.0.97" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "099a5357d84c4c61eb35fc8eafa9a79a902c2f76911e5747ced4e032edd8d9b4" +dependencies = [ + "jobserver", + "libc", + "once_cell", +] [[package]] name = "cert-dev" @@ -1514,6 +1519,36 @@ dependencies = [ "serde_json", ] +[[package]] +name = "crucible-common" +version = "0.0.1" +source = "git+https://github.com/oxidecomputer/crucible?rev=e58ca3693cb9ce0438947beba10e97ee38a0966b#e58ca3693cb9ce0438947beba10e97ee38a0966b" +dependencies = [ + "anyhow", + "atty", + "crucible-workspace-hack", + "dropshot", + "nix 0.28.0", + "rusqlite", + "rustls-pemfile 1.0.4", + "schemars", + "serde", + "serde_json", + "slog", + "slog-async", + "slog-bunyan", + "slog-dtrace", + "slog-term", + "tempfile", + "thiserror", + "tokio", + "tokio-rustls 0.24.1", + "toml 0.8.19", + "twox-hash", + "uuid", + "vergen", +] + [[package]] name = "crucible-pantry-client" version = "0.0.1" @@ -2484,6 +2519,18 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fancy-regex" version = "0.13.0" @@ -3004,6 +3051,19 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "git2" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b903b73e45dc0c6c596f2d37eccece7c1c8bb6e4407b001096387c63d0d93724" +dependencies = [ + "bitflags 2.6.0", + "libc", + "libgit2-sys", + "log", + "url", +] + [[package]] name = "glob" version = "0.3.1" @@ -3150,6 +3210,15 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "headers" version = "0.3.9" @@ -4022,6 +4091,15 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "jobserver" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.69" @@ -4181,6 +4259,18 @@ dependencies = [ "zone 0.1.8", ] +[[package]] +name = "libgit2-sys" +version = "0.17.0+1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10472326a8a6477c3c20a64547b0059e4b0d086869eee31e6d7da728a8eb7224" +dependencies = [ + "cc", + "libc", + "libz-sys", + "pkg-config", +] + [[package]] name = "libloading" version = "0.8.3" @@ -4264,6 +4354,16 @@ dependencies = [ "redox_syscall 0.5.1", ] +[[package]] +name = 
"libsqlite3-sys" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c10584274047cb335c23d3e61bcef8e323adae7c5c8c760540f73610177fc3f" +dependencies = [ + "pkg-config", + "vcpkg", +] + [[package]] name = "libsw" version = "3.3.1" @@ -4296,6 +4396,18 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "libz-sys" +version = "1.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e143b5e666b2695d28f6bca6497720813f699c9602dd7f5cac91008b8ada7f9" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linear-map" version = "1.2.0" @@ -5751,6 +5863,7 @@ dependencies = [ "cockroach-admin-client", "criterion", "crucible-agent-client", + "crucible-common", "crucible-pantry-client", "diesel", "display-error-chain", @@ -6181,6 +6294,7 @@ dependencies = [ "bstr 1.9.1", "byteorder", "bytes", + "cc", "chrono", "cipher", "clap", @@ -6225,6 +6339,7 @@ dependencies = [ "managed", "memchr", "mio 0.8.11", + "nix 0.28.0", "nom", "num-bigint", "num-integer", @@ -7330,7 +7445,7 @@ dependencies = [ "base64 0.22.1", "byteorder", "bytes", - "fallible-iterator", + "fallible-iterator 0.2.0", "hmac", "md-5", "memchr", @@ -7347,7 +7462,7 @@ checksum = "02048d9e032fb3cc3413bbf7b83a15d84a5d419778e2628751896d856498eee9" dependencies = [ "bytes", "chrono", - "fallible-iterator", + "fallible-iterator 0.2.0", "postgres-protocol", "serde", "serde_json", @@ -8315,6 +8430,20 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "rusqlite" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b838eba278d213a8beaf485bd313fd580ca4505a00d5871caeb1457c55322cae" +dependencies = [ + "bitflags 2.6.0", + "fallible-iterator 0.3.0", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec 1.13.2", +] + [[package]] name = "russh" version = "0.43.0" @@ -10211,7 +10340,7 @@ dependencies = [ "async-trait", "byteorder", "bytes", - "fallible-iterator", + "fallible-iterator 0.2.0", "futures-channel", "futures-util", "log", @@ -10696,6 +10825,17 @@ dependencies = [ "utf-8", ] +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "rand 0.7.3", + "static_assertions", +] + [[package]] name = "typed-path" version = "0.7.1" @@ -11053,6 +11193,22 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "vergen" +version = "8.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2990d9ea5967266ea0ccf413a4aa5c42a93dbcfda9cb49a97de6931726b12566" +dependencies = [ + "anyhow", + "cargo_metadata", + "cfg-if", + "git2", + "regex", + "rustc_version 0.4.0", + "rustversion", + "time", +] + [[package]] name = "version_check" version = "0.9.5" diff --git a/Cargo.toml b/Cargo.toml index c8ba9490b2..8bfbb3b32d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -305,6 +305,7 @@ crossterm = { version = "0.28.1", features = ["event-stream"] } crucible-agent-client = { git = "https://github.com/oxidecomputer/crucible", rev = "e58ca3693cb9ce0438947beba10e97ee38a0966b" } crucible-pantry-client = { git = "https://github.com/oxidecomputer/crucible", rev = "e58ca3693cb9ce0438947beba10e97ee38a0966b" } crucible-smf = { git = "https://github.com/oxidecomputer/crucible", 
rev = "e58ca3693cb9ce0438947beba10e97ee38a0966b" } +crucible-common = { git = "https://github.com/oxidecomputer/crucible", rev = "e58ca3693cb9ce0438947beba10e97ee38a0966b" } csv = "1.3.0" curve25519-dalek = "4" datatest-stable = "0.2.9" diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs index ec3e519cbc..6c39490db3 100644 --- a/dev-tools/omdb/src/bin/omdb/nexus.rs +++ b/dev-tools/omdb/src/bin/omdb/nexus.rs @@ -32,6 +32,7 @@ use nexus_saga_recovery::LastPass; use nexus_types::deployment::Blueprint; use nexus_types::internal_api::background::LookupRegionPortStatus; use nexus_types::internal_api::background::RegionReplacementDriverStatus; +use nexus_types::internal_api::background::RegionSnapshotReplacementStartStatus; use nexus_types::inventory::BaseboardId; use omicron_uuid_kinds::CollectionUuid; use omicron_uuid_kinds::GenericUuid; @@ -1332,6 +1333,38 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) { } } }; + } else if name == "region_snapshot_replacement" { + match serde_json::from_value::( + details.clone(), + ) { + Err(error) => eprintln!( + "warning: failed to interpret task details: {:?}: {:?}", + error, details + ), + + Ok(status) => { + println!( + " total requests created ok: {}", + status.requests_created_ok.len(), + ); + for line in &status.requests_created_ok { + println!(" > {line}"); + } + + println!( + " total start saga invoked ok: {}", + status.start_invoked_ok.len(), + ); + for line in &status.start_invoked_ok { + println!(" > {line}"); + } + + println!(" errors: {}", status.errors.len()); + for line in &status.errors { + println!(" > {line}"); + } + } + } } else { println!( "warning: unknown background task: {:?} \ diff --git a/dev-tools/omdb/tests/env.out b/dev-tools/omdb/tests/env.out index 67f113a801..8d5e8a1d41 100644 --- a/dev-tools/omdb/tests/env.out +++ b/dev-tools/omdb/tests/env.out @@ -127,6 +127,10 @@ task: "region_replacement_driver" drive region replacements forward to completion +task: "region_snapshot_replacement_start" + detect if region snapshots need replacement and begin the process + + task: "saga_recovery" recovers sagas assigned to this Nexus @@ -276,6 +280,10 @@ task: "region_replacement_driver" drive region replacements forward to completion +task: "region_snapshot_replacement_start" + detect if region snapshots need replacement and begin the process + + task: "saga_recovery" recovers sagas assigned to this Nexus @@ -412,6 +420,10 @@ task: "region_replacement_driver" drive region replacements forward to completion +task: "region_snapshot_replacement_start" + detect if region snapshots need replacement and begin the process + + task: "saga_recovery" recovers sagas assigned to this Nexus diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out index d4c07899f4..9c7ae8df6d 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -328,6 +328,10 @@ task: "region_replacement_driver" drive region replacements forward to completion +task: "region_snapshot_replacement_start" + detect if region snapshots need replacement and begin the process + + task: "saga_recovery" recovers sagas assigned to this Nexus @@ -566,6 +570,13 @@ task: "region_replacement_driver" number of region replacement finish sagas started ok: 0 number of errors: 0 +task: "region_snapshot_replacement_start" + configured period: every s + currently executing: no + last completed activation: , triggered by a periodic timer firing + started at (s ago) and ran for 
ms +warning: unknown background task: "region_snapshot_replacement_start" (don't know how to interpret details: Object {"errors": Array [], "requests_created_ok": Array [], "start_invoked_ok": Array []}) + task: "saga_recovery" configured period: every 10m currently executing: no diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index 9d8bf1ac9b..b222ebd23b 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -391,6 +391,8 @@ pub struct BackgroundTaskConfig { pub saga_recovery: SagaRecoveryConfig, /// configuration for lookup region port task pub lookup_region_port: LookupRegionPortConfig, + /// configuration for region snapshot replacement starter task + pub region_snapshot_replacement_start: RegionSnapshotReplacementStartConfig, } #[serde_as] @@ -627,6 +629,14 @@ pub struct LookupRegionPortConfig { pub period_secs: Duration, } +#[serde_as] +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct RegionSnapshotReplacementStartConfig { + /// period (in seconds) for periodic activations of this background task + #[serde_as(as = "DurationSeconds")] + pub period_secs: Duration, +} + /// Configuration for a nexus server #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct PackageConfig { @@ -874,6 +884,7 @@ mod test { abandoned_vmm_reaper.period_secs = 60 saga_recovery.period_secs = 60 lookup_region_port.period_secs = 60 + region_snapshot_replacement_start.period_secs = 30 [default_region_allocation_strategy] type = "random" seed = 0 @@ -1036,6 +1047,10 @@ mod test { lookup_region_port: LookupRegionPortConfig { period_secs: Duration::from_secs(60), }, + region_snapshot_replacement_start: + RegionSnapshotReplacementStartConfig { + period_secs: Duration::from_secs(30), + }, }, default_region_allocation_strategy: crate::nexus_config::RegionAllocationStrategy::Random { @@ -1112,6 +1127,7 @@ mod test { abandoned_vmm_reaper.period_secs = 60 saga_recovery.period_secs = 60 lookup_region_port.period_secs = 60 + region_snapshot_replacement_start.period_secs = 30 [default_region_allocation_strategy] type = "random" "##, diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index a949b31f0d..36cee151a7 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -25,6 +25,7 @@ chrono.workspace = true cockroach-admin-client.workspace = true crucible-agent-client.workspace = true crucible-pantry-client.workspace = true +crucible-common.workspace = true dns-service-client.workspace = true dpd-client.workspace = true mg-admin-client.workspace = true diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs index 58259be7ee..13c3708e4a 100644 --- a/nexus/db-queries/src/db/datastore/mod.rs +++ b/nexus/db-queries/src/db/datastore/mod.rs @@ -127,8 +127,12 @@ pub use vmm::VmmStateUpdateResult; pub use volume::read_only_resources_associated_with_volume; pub use volume::CrucibleResources; pub use volume::CrucibleTargets; +pub use volume::ExistingTarget; +pub use volume::ReplacementTarget; pub use volume::VolumeCheckoutReason; pub use volume::VolumeReplacementParams; +pub use volume::VolumeToDelete; +pub use volume::VolumeWithTarget; // Number of unique datasets required to back a region. // TODO: This should likely turn into a configuration option. 
diff --git a/nexus/db-queries/src/db/datastore/volume.rs b/nexus/db-queries/src/db/datastore/volume.rs index f777384b7b..f5c1f121e4 100644 --- a/nexus/db-queries/src/db/datastore/volume.rs +++ b/nexus/db-queries/src/db/datastore/volume.rs @@ -1795,16 +1795,16 @@ pub struct VolumeReplacementParams { // parameters #[derive(Debug, Clone, Copy)] -pub struct VolumeWithTarget(Uuid); +pub struct VolumeWithTarget(pub Uuid); #[derive(Debug, Clone, Copy)] -pub struct ExistingTarget(SocketAddrV6); +pub struct ExistingTarget(pub SocketAddrV6); #[derive(Debug, Clone, Copy)] -pub struct ReplacementTarget(SocketAddrV6); +pub struct ReplacementTarget(pub SocketAddrV6); #[derive(Debug, Clone, Copy)] -pub struct VolumeToDelete(Uuid); +pub struct VolumeToDelete(pub Uuid); impl DataStore { /// Replace a read-write region in a Volume with a new region. diff --git a/nexus/examples/config-second.toml b/nexus/examples/config-second.toml index 754f37c064..572de807d7 100644 --- a/nexus/examples/config-second.toml +++ b/nexus/examples/config-second.toml @@ -139,6 +139,7 @@ v2p_mapping_propagation.period_secs = 30 abandoned_vmm_reaper.period_secs = 60 saga_recovery.period_secs = 600 lookup_region_port.period_secs = 60 +region_snapshot_replacement_start.period_secs = 30 [default_region_allocation_strategy] # allocate region on 3 random distinct zpools, on 3 random distinct sleds. diff --git a/nexus/examples/config.toml b/nexus/examples/config.toml index bd50e846bd..3aebe35152 100644 --- a/nexus/examples/config.toml +++ b/nexus/examples/config.toml @@ -125,6 +125,7 @@ v2p_mapping_propagation.period_secs = 30 abandoned_vmm_reaper.period_secs = 60 saga_recovery.period_secs = 600 lookup_region_port.period_secs = 60 +region_snapshot_replacement_start.period_secs = 30 [default_region_allocation_strategy] # allocate region on 3 random distinct zpools, on 3 random distinct sleds. diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index 850e63443a..cc42a8f302 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -108,6 +108,7 @@ use super::tasks::phantom_disks; use super::tasks::physical_disk_adoption; use super::tasks::region_replacement; use super::tasks::region_replacement_driver; +use super::tasks::region_snapshot_replacement_start::*; use super::tasks::saga_recovery; use super::tasks::service_firewall_rules; use super::tasks::sync_service_zone_nat::ServiceZoneNatTracker; @@ -161,6 +162,7 @@ pub struct BackgroundTasks { pub task_vpc_route_manager: Activator, pub task_saga_recovery: Activator, pub task_lookup_region_port: Activator, + pub task_region_snapshot_replacement_start: Activator, // Handles to activate background tasks that do not get used by Nexus // at-large. These background tasks are implementation details as far as @@ -242,6 +244,7 @@ impl BackgroundTasksInitializer { task_vpc_route_manager: Activator::new(), task_saga_recovery: Activator::new(), task_lookup_region_port: Activator::new(), + task_region_snapshot_replacement_start: Activator::new(), task_internal_dns_propagation: Activator::new(), task_external_dns_propagation: Activator::new(), @@ -303,6 +306,7 @@ impl BackgroundTasksInitializer { task_vpc_route_manager, task_saga_recovery, task_lookup_region_port, + task_region_snapshot_replacement_start, // Add new background tasks here. Be sure to use this binding in a // call to `Driver::register()` below. That's what actually wires // up the Activator to the corresponding background task. 
@@ -721,13 +725,28 @@ impl BackgroundTasksInitializer { description: "fill in missing ports for region records", period: config.lookup_region_port.period_secs, task_impl: Box::new(lookup_region_port::LookupRegionPort::new( - datastore, + datastore.clone(), )), opctx: opctx.child(BTreeMap::new()), watchers: vec![], activator: task_lookup_region_port, }); + driver.register(TaskDefinition { + name: "region_snapshot_replacement_start", + description: + "detect if region snapshots need replacement and begin the \ + process", + period: config.region_snapshot_replacement_start.period_secs, + task_impl: Box::new(RegionSnapshotReplacementDetector::new( + datastore, + sagas.clone(), + )), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_region_snapshot_replacement_start, + }); + driver } } diff --git a/nexus/src/app/background/tasks/mod.rs b/nexus/src/app/background/tasks/mod.rs index fe041a6daa..b0281afd9f 100644 --- a/nexus/src/app/background/tasks/mod.rs +++ b/nexus/src/app/background/tasks/mod.rs @@ -25,6 +25,7 @@ pub mod phantom_disks; pub mod physical_disk_adoption; pub mod region_replacement; pub mod region_replacement_driver; +pub mod region_snapshot_replacement_start; pub mod saga_recovery; pub mod service_firewall_rules; pub mod sync_service_zone_nat; diff --git a/nexus/src/app/background/tasks/region_snapshot_replacement_start.rs b/nexus/src/app/background/tasks/region_snapshot_replacement_start.rs new file mode 100644 index 0000000000..9bc66d48c8 --- /dev/null +++ b/nexus/src/app/background/tasks/region_snapshot_replacement_start.rs @@ -0,0 +1,512 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Background task for detecting region snapshots that need replacing and +//! beginning that process +//! +//! This task's responsibility is to create region snapshot replacement requests +//! when physical disks are expunged, and trigger the region snapshot +//! replacement start saga for any requests that are in state "Requested". See +//! the documentation in that saga's docstring for more information. 
+
+use crate::app::authn;
+use crate::app::background::BackgroundTask;
+use crate::app::saga::StartSaga;
+use crate::app::sagas;
+use crate::app::sagas::region_snapshot_replacement_start::*;
+use crate::app::sagas::NexusSaga;
+use crate::app::RegionAllocationStrategy;
+use futures::future::BoxFuture;
+use futures::FutureExt;
+use nexus_db_model::RegionSnapshotReplacement;
+use nexus_db_queries::context::OpContext;
+use nexus_db_queries::db::DataStore;
+use nexus_types::internal_api::background::RegionSnapshotReplacementStartStatus;
+use serde_json::json;
+use std::sync::Arc;
+
+pub struct RegionSnapshotReplacementDetector {
+    datastore: Arc<DataStore>,
+    sagas: Arc<dyn StartSaga>,
+}
+
+impl RegionSnapshotReplacementDetector {
+    pub fn new(datastore: Arc<DataStore>, sagas: Arc<dyn StartSaga>) -> Self {
+        RegionSnapshotReplacementDetector { datastore, sagas }
+    }
+
+    async fn send_start_request(
+        &self,
+        serialized_authn: authn::saga::Serialized,
+        request: RegionSnapshotReplacement,
+    ) -> Result<(), omicron_common::api::external::Error> {
+        let params = sagas::region_snapshot_replacement_start::Params {
+            serialized_authn,
+            request,
+            allocation_strategy:
+                RegionAllocationStrategy::RandomWithDistinctSleds { seed: None },
+        };
+
+        let saga_dag = SagaRegionSnapshotReplacementStart::prepare(&params)?;
+        self.sagas.saga_start(saga_dag).await
+    }
+
+    /// Find region snapshots on expunged physical disks and create region
+    /// snapshot replacement requests for them.
+    async fn create_requests_for_region_snapshots_on_expunged_disks(
+        &self,
+        opctx: &OpContext,
+        status: &mut RegionSnapshotReplacementStartStatus,
+    ) {
+        let log = &opctx.log;
+
+        // Find region snapshots on expunged physical disks
+        let region_snapshots_to_be_replaced = match self
+            .datastore
+            .find_region_snapshots_on_expunged_physical_disks(opctx)
+            .await
+        {
+            Ok(region_snapshots) => region_snapshots,
+
+            Err(e) => {
+                let s = format!(
+                    "find_region_snapshots_on_expunged_physical_disks \
+                    failed: {e}",
+                );
+
+                error!(&log, "{s}");
+                status.errors.push(s);
+                return;
+            }
+        };
+
+        for region_snapshot in region_snapshots_to_be_replaced {
+            // If no request exists yet, create one.
+            let existing_request = match self
+                .datastore
+                .lookup_region_snapshot_replacement_request(
+                    opctx,
+                    &region_snapshot,
+                )
+                .await
+            {
+                Ok(existing_request) => existing_request,
+
+                Err(e) => {
+                    let s =
+                        format!("error looking up replacement request: {e}");
+
+                    error!(
+                        &log,
+                        "{s}";
+                        "snapshot_id" => %region_snapshot.snapshot_id,
+                        "region_id" => %region_snapshot.region_id,
+                        "dataset_id" => %region_snapshot.dataset_id,
+                    );
+                    status.errors.push(s);
+                    continue;
+                }
+            };
+
+            if existing_request.is_none() {
+                match self
+                    .datastore
+                    .create_region_snapshot_replacement_request(
+                        opctx,
+                        &region_snapshot,
+                    )
+                    .await
+                {
+                    Ok(request_id) => {
+                        let s = format!(
+                            "created region snapshot replacement request \
+                            {request_id}"
+                        );
+
+                        info!(
+                            &log,
+                            "{s}";
+                            "snapshot_id" => %region_snapshot.snapshot_id,
+                            "region_id" => %region_snapshot.region_id,
+                            "dataset_id" => %region_snapshot.dataset_id,
+                        );
+                        status.requests_created_ok.push(s);
+                    }
+
+                    Err(e) => {
+                        let s =
+                            format!("error creating replacement request: {e}");
+
+                        error!(
+                            &log,
+                            "{s}";
+                            "snapshot_id" => %region_snapshot.snapshot_id,
+                            "region_id" => %region_snapshot.region_id,
+                            "dataset_id" => %region_snapshot.dataset_id,
+                        );
+                        status.errors.push(s);
+                    }
+                }
+            }
+        }
+    }
+
+    /// For each region snapshot replacement request in state "Requested", run
+    /// the start saga.
+ async fn start_requested_region_snapshot_replacements( + &self, + opctx: &OpContext, + status: &mut RegionSnapshotReplacementStartStatus, + ) { + let log = &opctx.log; + + let requests = match self + .datastore + .get_requested_region_snapshot_replacements(opctx) + .await + { + Ok(requests) => requests, + + Err(e) => { + let s = format!( + "query for region snapshot replacement requests failed: {e}" + ); + + error!(&log, "{s}"); + status.errors.push(s); + return; + } + }; + + for request in requests { + let request_id = request.id; + + let result = self + .send_start_request( + authn::saga::Serialized::for_opctx(opctx), + request.clone(), + ) + .await; + + match result { + Ok(()) => { + let s = format!( + "region snapshot replacement start invoked ok for \ + {request_id}" + ); + + info!( + &log, + "{s}"; + "request.snapshot_id" => %request.old_snapshot_id, + "request.region_id" => %request.old_region_id, + "request.dataset_id" => %request.old_dataset_id, + ); + status.start_invoked_ok.push(s); + } + + Err(e) => { + let s = format!( + "invoking region snapshot replacement start for \ + {request_id} failed: {e}", + ); + + error!( + &log, + "{s}"; + "request.snapshot_id" => %request.old_snapshot_id, + "request.region_id" => %request.old_region_id, + "request.dataset_id" => %request.old_dataset_id, + ); + status.errors.push(s); + } + } + } + } +} + +impl BackgroundTask for RegionSnapshotReplacementDetector { + fn activate<'a>( + &'a mut self, + opctx: &'a OpContext, + ) -> BoxFuture<'a, serde_json::Value> { + async { + let log = &opctx.log; + info!(&log, "region snapshot replacement start task started"); + + let mut status = RegionSnapshotReplacementStartStatus::default(); + + self.create_requests_for_region_snapshots_on_expunged_disks( + opctx, + &mut status, + ) + .await; + + self.start_requested_region_snapshot_replacements( + opctx, + &mut status, + ) + .await; + + info!(&log, "region snapshot replacement start task done"); + + json!(status) + } + .boxed() + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::app::background::init::test::NoopStartSaga; + use crate::app::MIN_DISK_SIZE_BYTES; + use chrono::Utc; + use nexus_db_model::BlockSize; + use nexus_db_model::Generation; + use nexus_db_model::PhysicalDiskPolicy; + use nexus_db_model::RegionSnapshot; + use nexus_db_model::RegionSnapshotReplacement; + use nexus_db_model::Snapshot; + use nexus_db_model::SnapshotIdentity; + use nexus_db_model::SnapshotState; + use nexus_db_queries::authz; + use nexus_db_queries::db::lookup::LookupPath; + use nexus_test_utils::resource_helpers::create_project; + use nexus_test_utils_macros::nexus_test; + use omicron_common::api::external; + use omicron_uuid_kinds::GenericUuid; + use std::collections::BTreeMap; + use uuid::Uuid; + + type ControlPlaneTestContext = + nexus_test_utils::ControlPlaneTestContext; + type DiskTest<'a> = + nexus_test_utils::resource_helpers::DiskTest<'a, crate::Server>; + + #[nexus_test(server = crate::Server)] + async fn test_add_region_snapshot_replacement_causes_start( + cptestctx: &ControlPlaneTestContext, + ) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.clone(), + datastore.clone(), + ); + + let starter = Arc::new(NoopStartSaga::new()); + let mut task = RegionSnapshotReplacementDetector::new( + datastore.clone(), + starter.clone(), + ); + + // Noop test + let result: RegionSnapshotReplacementStartStatus = + 
serde_json::from_value(task.activate(&opctx).await).unwrap(); + assert_eq!(result, RegionSnapshotReplacementStartStatus::default()); + assert_eq!(starter.count_reset(), 0); + + // Add a region snapshot replacement request for a fake region snapshot + + let request = RegionSnapshotReplacement::new( + Uuid::new_v4(), // dataset id + Uuid::new_v4(), // region id + Uuid::new_v4(), // snapshot id + ); + + let request_id = request.id; + + datastore + .insert_region_snapshot_replacement_request_with_volume_id( + &opctx, + request, + Uuid::new_v4(), + ) + .await + .unwrap(); + + // Activate the task - it should pick that up and try to run the + // region snapshot replacement start saga + let result: RegionSnapshotReplacementStartStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + + assert_eq!( + result, + RegionSnapshotReplacementStartStatus { + requests_created_ok: vec![], + start_invoked_ok: vec![format!( + "region snapshot replacement start invoked ok for \ + {request_id}" + )], + errors: vec![], + }, + ); + + assert_eq!(starter.count_reset(), 1); + } + + #[nexus_test(server = crate::Server)] + async fn test_expunge_disk_causes_region_snapshot_replacement_start( + cptestctx: &ControlPlaneTestContext, + ) { + let disk_test = DiskTest::new(cptestctx).await; + + let client = &cptestctx.external_client; + let project = create_project(&client, "testing").await; + let project_id = project.identity.id; + + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.clone(), + datastore.clone(), + ); + + let starter = Arc::new(NoopStartSaga::new()); + let mut task = RegionSnapshotReplacementDetector::new( + datastore.clone(), + starter.clone(), + ); + + // Noop test + let result: RegionSnapshotReplacementStartStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + assert_eq!(result, RegionSnapshotReplacementStartStatus::default()); + assert_eq!(starter.count_reset(), 0); + + // Add three region snapshots for each dataset + + let region_id = Uuid::new_v4(); + let snapshot_id = Uuid::new_v4(); + let mut dataset_to_zpool: BTreeMap = + BTreeMap::default(); + + for zpool in disk_test.zpools() { + for dataset in &zpool.datasets { + dataset_to_zpool + .insert(zpool.id.to_string(), dataset.id.to_string()); + + datastore + .region_snapshot_create(RegionSnapshot::new( + dataset.id, + region_id, + snapshot_id, + String::from("[fd00:1122:3344::101]:12345"), + )) + .await + .unwrap(); + } + } + + // Create the fake snapshot + + let (.., authz_project) = LookupPath::new(&opctx, &datastore) + .project_id(project_id) + .lookup_for(authz::Action::CreateChild) + .await + .unwrap(); + + datastore + .project_ensure_snapshot( + &opctx, + &authz_project, + Snapshot { + identity: SnapshotIdentity { + id: snapshot_id, + name: external::Name::try_from("snapshot".to_string()) + .unwrap() + .into(), + description: "snapshot".into(), + + time_created: Utc::now(), + time_modified: Utc::now(), + time_deleted: None, + }, + + project_id, + disk_id: Uuid::new_v4(), + volume_id: Uuid::new_v4(), + destination_volume_id: Uuid::new_v4(), + + gen: Generation::new(), + state: SnapshotState::Creating, + block_size: BlockSize::AdvancedFormat, + + size: external::ByteCount::try_from(MIN_DISK_SIZE_BYTES) + .unwrap() + .into(), + }, + ) + .await + .unwrap(); + + // Expunge one of the physical disks + + let first_zpool = + disk_test.zpools().next().expect("Expected at least one zpool"); + + let (_, db_zpool) = 
LookupPath::new(&opctx, datastore) + .zpool_id(first_zpool.id.into_untyped_uuid()) + .fetch() + .await + .unwrap(); + + datastore + .physical_disk_update_policy( + &opctx, + db_zpool.physical_disk_id, + PhysicalDiskPolicy::Expunged, + ) + .await + .unwrap(); + + // Activate the task - it should pick that up and try to run the region + // snapshot replacement start saga for the region snapshot on that + // expunged disk + + let result: RegionSnapshotReplacementStartStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + + eprintln!("{:?}", &result); + + assert_eq!(result.requests_created_ok.len(), 1); + assert_eq!(result.start_invoked_ok.len(), 1); + assert!(result.errors.is_empty()); + + // The last part of the message is the region snapshot replacement + // request id + let request_created_uuid: Uuid = result.requests_created_ok[0] + .split(" ") + .last() + .unwrap() + .parse() + .unwrap(); + let request_started_uuid: Uuid = result.start_invoked_ok[0] + .split(" ") + .last() + .unwrap() + .parse() + .unwrap(); + + assert_eq!(request_created_uuid, request_started_uuid); + + assert_eq!(starter.count_reset(), 1); + + let request = datastore + .get_region_snapshot_replacement_request_by_id( + &opctx, + request_created_uuid, + ) + .await + .unwrap(); + + assert_eq!(request.old_snapshot_id, snapshot_id); + assert_eq!(request.old_region_id, region_id); + + let dataset_id = + dataset_to_zpool.get(&first_zpool.id.to_string()).unwrap(); + assert_eq!(&request.old_dataset_id.to_string(), dataset_id); + } +} diff --git a/nexus/src/app/crucible.rs b/nexus/src/app/crucible.rs index b8fca26c14..86de328355 100644 --- a/nexus/src/app/crucible.rs +++ b/nexus/src/app/crucible.rs @@ -150,7 +150,7 @@ impl super::Nexus { } /// Call out to Crucible agent and perform region creation. Optionally, - /// supply a read-only source to invoke a clone. + /// supply a read-only source's repair address to invoke a clone. pub async fn ensure_region_in_dataset( &self, log: &Logger, diff --git a/nexus/src/app/sagas/common_storage.rs b/nexus/src/app/sagas/common_storage.rs index 592463f5bb..d37370506c 100644 --- a/nexus/src/app/sagas/common_storage.rs +++ b/nexus/src/app/sagas/common_storage.rs @@ -15,6 +15,7 @@ use nexus_db_queries::db; use nexus_db_queries::db::lookup::LookupPath; use omicron_common::api::external::Error; use omicron_common::retry_until_known_result; +use slog::Logger; use std::net::SocketAddrV6; // Common Pantry operations @@ -107,3 +108,33 @@ pub(crate) async fn call_pantry_detach_for_disk( Ok(()) } + +pub(crate) fn find_only_new_region( + log: &Logger, + existing_datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>, + new_datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>, +) -> Option<(db::model::Dataset, db::model::Region)> { + // Only filter on whether or not a Region is in the existing list! Datasets + // can change values (like size_used) if this saga interleaves with other + // saga runs of the same type. 
+ let mut dataset_and_region: Vec<(db::model::Dataset, db::model::Region)> = + new_datasets_and_regions + .into_iter() + .filter(|(_, r)| { + !existing_datasets_and_regions.iter().any(|(_, er)| er == r) + }) + .collect(); + + if dataset_and_region.len() != 1 { + error!( + log, + "find_only_new_region saw dataset_and_region len {}: {:?}", + dataset_and_region.len(), + dataset_and_region, + ); + + None + } else { + dataset_and_region.pop() + } +} diff --git a/nexus/src/app/sagas/mod.rs b/nexus/src/app/sagas/mod.rs index 0c57a5b2dc..5934868c20 100644 --- a/nexus/src/app/sagas/mod.rs +++ b/nexus/src/app/sagas/mod.rs @@ -38,6 +38,7 @@ pub mod project_create; pub mod region_replacement_drive; pub mod region_replacement_finish; pub mod region_replacement_start; +pub mod region_snapshot_replacement_start; pub mod snapshot_create; pub mod snapshot_delete; pub mod test_saga; @@ -188,6 +189,9 @@ fn make_action_registry() -> ActionRegistry { ::register_actions( &mut registry, ); + ::register_actions( + &mut registry, + ); #[cfg(test)] ::register_actions(&mut registry); diff --git a/nexus/src/app/sagas/region_replacement_start.rs b/nexus/src/app/sagas/region_replacement_start.rs index d4d455f927..86aab2ac22 100644 --- a/nexus/src/app/sagas/region_replacement_start.rs +++ b/nexus/src/app/sagas/region_replacement_start.rs @@ -26,12 +26,13 @@ //! ``` //! //! The first thing this saga does is set itself as the "operating saga" for the -//! request, and change the state to "Allocating". Then, it performs the following -//! steps: +//! request, and change the state to "Allocating". Then, it performs the +//! following steps: //! //! 1. Allocate a new region //! -//! 2. For the affected Volume, swap the region being replaced with the new region. +//! 2. For the affected Volume, swap the region being replaced with the new +//! region. //! //! 3. Create a fake volume that can be later deleted with the region being //! replaced. @@ -48,6 +49,7 @@ use super::{ ActionRegistry, NexusActionContext, NexusSaga, SagaInitError, ACTION_GENERATE_ID, }; +use crate::app::sagas::common_storage::find_only_new_region; use crate::app::sagas::declare_saga_actions; use crate::app::RegionAllocationStrategy; use crate::app::{authn, db}; @@ -57,7 +59,6 @@ use serde::Deserialize; use serde::Serialize; use sled_agent_client::types::CrucibleOpts; use sled_agent_client::types::VolumeConstructionRequest; -use slog::Logger; use std::net::SocketAddrV6; use steno::ActionError; use steno::Node; @@ -285,36 +286,6 @@ async fn srrs_alloc_new_region( Ok(datasets_and_regions) } -fn find_only_new_region( - log: &Logger, - existing_datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>, - new_datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>, -) -> Option<(db::model::Dataset, db::model::Region)> { - // Only filter on whether or not a Region is in the existing list! Datasets - // can change values (like size_used) if this saga interleaves with other - // saga runs of the same type. 
- let mut dataset_and_region: Vec<(db::model::Dataset, db::model::Region)> = - new_datasets_and_regions - .into_iter() - .filter(|(_, r)| { - !existing_datasets_and_regions.iter().any(|(_, er)| er == r) - }) - .collect(); - - if dataset_and_region.len() != 1 { - error!( - log, - "find_only_new_region saw dataset_and_region len {}: {:?}", - dataset_and_region.len(), - dataset_and_region, - ); - - None - } else { - dataset_and_region.pop() - } -} - async fn srrs_alloc_new_region_undo( sagactx: NexusActionContext, ) -> Result<(), anyhow::Error> { diff --git a/nexus/src/app/sagas/region_snapshot_replacement_start.rs b/nexus/src/app/sagas/region_snapshot_replacement_start.rs new file mode 100644 index 0000000000..941899d862 --- /dev/null +++ b/nexus/src/app/sagas/region_snapshot_replacement_start.rs @@ -0,0 +1,1134 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! In the same way that read/write regions need to be replaced when a physical +//! disk is expunged, read-only regions need to be replaced too: Volumes are in +//! a similarly degraded state when the read-only Downstairs have gone away, and +//! remain in this degraded state until a new Region replaces the one that is +//! gone. +//! +//! It's this saga's responsibility to start that replacement process. This saga +//! handles the following region snapshot replacement request state transitions: +//! +//! ```text +//! Requested <-- +//! | +//! | | +//! v | +//! | +//! Allocating -- +//! +//! | +//! v +//! +//! ReplacementDone +//! ``` +//! +//! The first thing this saga does is set itself as the "operating saga" for the +//! request, and change the state to "Allocating". Then, it performs the +//! following steps: +//! +//! 1. Allocate a new region +//! +//! 2. Create a blank volume that can be later deleted to stash the snapshot +//! being replaced. This is populated in the `volume_replace_snapshot` +//! transaction so that `volume_references` for the corresponding region +//! snapshot remains accurate. +//! +//! 3. For the affected Volume, swap the snapshot being replaced with the new +//! region. +//! +//! 4. Update the region snapshot replacement request by clearing the operating +//! saga id and changing the state to "ReplacementDone". +//! +//! Any unwind will place the state back into Requested. +//! +//! See the documentation for the "region snapshot replacement garbage collect" +//! saga for the next step in the process. 
+ +use super::{ + ActionRegistry, NexusActionContext, NexusSaga, SagaInitError, + ACTION_GENERATE_ID, +}; +use crate::app::db::datastore::ExistingTarget; +use crate::app::db::datastore::RegionAllocationFor; +use crate::app::db::datastore::RegionAllocationParameters; +use crate::app::db::datastore::ReplacementTarget; +use crate::app::db::datastore::VolumeToDelete; +use crate::app::db::datastore::VolumeWithTarget; +use crate::app::db::lookup::LookupPath; +use crate::app::sagas::common_storage::find_only_new_region; +use crate::app::sagas::declare_saga_actions; +use crate::app::RegionAllocationStrategy; +use crate::app::{authn, db}; +use nexus_types::identity::Asset; +use nexus_types::identity::Resource; +use omicron_common::api::external::Error; +use serde::Deserialize; +use serde::Serialize; +use sled_agent_client::types::CrucibleOpts; +use sled_agent_client::types::VolumeConstructionRequest; +use std::net::SocketAddrV6; +use steno::ActionError; +use steno::Node; +use uuid::Uuid; + +// region snapshot replacement start saga: input parameters + +#[derive(Debug, Deserialize, Serialize)] +pub(crate) struct Params { + pub serialized_authn: authn::saga::Serialized, + pub request: db::model::RegionSnapshotReplacement, + pub allocation_strategy: RegionAllocationStrategy, +} + +// region snapshot replacement start saga: actions + +declare_saga_actions! { + region_snapshot_replacement_start; + SET_SAGA_ID -> "unused_1" { + + rsrss_set_saga_id + - rsrss_set_saga_id_undo + } + GET_ALLOC_REGION_PARAMS -> "alloc_region_params" { + + rsrss_get_alloc_region_params + } + ALLOC_NEW_REGION -> "new_datasets_and_regions" { + + rsrss_alloc_new_region + - rsrss_alloc_new_region_undo + } + FIND_NEW_REGION -> "new_dataset_and_region" { + + rsrss_find_new_region + } + NEW_REGION_ENSURE -> "ensured_dataset_and_region" { + + rsrss_new_region_ensure + - rsrss_new_region_ensure_undo + } + GET_OLD_SNAPSHOT_VOLUME_ID -> "old_snapshot_volume_id" { + + rsrss_get_old_snapshot_volume_id + } + CREATE_FAKE_VOLUME -> "unused_2" { + + rsrss_create_fake_volume + - rsrss_create_fake_volume_undo + } + REPLACE_SNAPSHOT_IN_VOLUME -> "unused_3" { + + rsrss_replace_snapshot_in_volume + - rsrss_replace_snapshot_in_volume_undo + } + UPDATE_REQUEST_RECORD -> "unused_4" { + + rsrss_update_request_record + } +} + +// region snapshot replacement start saga: definition + +#[derive(Debug)] +pub(crate) struct SagaRegionSnapshotReplacementStart; +impl NexusSaga for SagaRegionSnapshotReplacementStart { + const NAME: &'static str = "region-snapshot-replacement-start"; + type Params = Params; + + fn register_actions(registry: &mut ActionRegistry) { + region_snapshot_replacement_start_register_actions(registry); + } + + fn make_saga_dag( + _params: &Self::Params, + mut builder: steno::DagBuilder, + ) -> Result { + builder.append(Node::action( + "saga_id", + "GenerateSagaId", + ACTION_GENERATE_ID.as_ref(), + )); + + builder.append(Node::action( + "new_volume_id", + "GenerateNewVolumeId", + ACTION_GENERATE_ID.as_ref(), + )); + + builder.append(set_saga_id_action()); + builder.append(get_alloc_region_params_action()); + builder.append(alloc_new_region_action()); + builder.append(find_new_region_action()); + builder.append(new_region_ensure_action()); + builder.append(get_old_snapshot_volume_id_action()); + builder.append(create_fake_volume_action()); + builder.append(replace_snapshot_in_volume_action()); + builder.append(update_request_record_action()); + + Ok(builder.build()?) 
+    }
+}
+
+// region snapshot replacement start saga: action implementations
+
+async fn rsrss_set_saga_id(
+    sagactx: NexusActionContext,
+) -> Result<(), ActionError> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+
+    let opctx = crate::context::op_context_for_saga_action(
+        &sagactx,
+        &params.serialized_authn,
+    );
+
+    let saga_id = sagactx.lookup::<Uuid>("saga_id")?;
+
+    // Change the request record here to an intermediate "allocating" state to
+    // block out other sagas that will be triggered for the same request. This
+    // avoids Nexus allocating a bunch of replacement read-only regions only to
+    // unwind all but one.
+    osagactx
+        .datastore()
+        .set_region_snapshot_replacement_allocating(
+            &opctx,
+            params.request.id,
+            saga_id,
+        )
+        .await
+        .map_err(ActionError::action_failed)?;
+
+    Ok(())
+}
+
+async fn rsrss_set_saga_id_undo(
+    sagactx: NexusActionContext,
+) -> Result<(), anyhow::Error> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+    let opctx = crate::context::op_context_for_saga_action(
+        &sagactx,
+        &params.serialized_authn,
+    );
+
+    let saga_id = sagactx.lookup::<Uuid>("saga_id")?;
+
+    osagactx
+        .datastore()
+        .undo_set_region_snapshot_replacement_allocating(
+            &opctx,
+            params.request.id,
+            saga_id,
+        )
+        .await?;
+
+    Ok(())
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+struct AllocRegionParams {
+    block_size: u64,
+    blocks_per_extent: u64,
+    extent_count: u64,
+    current_allocated_regions: Vec<(db::model::Dataset, db::model::Region)>,
+    snapshot_id: Uuid,
+    snapshot_volume_id: Uuid,
+}
+
+async fn rsrss_get_alloc_region_params(
+    sagactx: NexusActionContext,
+) -> Result<AllocRegionParams, ActionError> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+
+    let opctx = crate::context::op_context_for_saga_action(
+        &sagactx,
+        &params.serialized_authn,
+    );
+
+    // Look up the existing snapshot
+    let (.., db_snapshot) = LookupPath::new(&opctx, &osagactx.datastore())
+        .snapshot_id(params.request.old_snapshot_id)
+        .fetch()
+        .await
+        .map_err(ActionError::action_failed)?;
+
+    // Find the region to replace
+    let db_region = osagactx
+        .datastore()
+        .get_region(params.request.old_region_id)
+        .await
+        .map_err(ActionError::action_failed)?;
+
+    let current_allocated_regions = osagactx
+        .datastore()
+        .get_allocated_regions(db_snapshot.volume_id)
+        .await
+        .map_err(ActionError::action_failed)?;
+
+    Ok(AllocRegionParams {
+        block_size: db_region.block_size().to_bytes(),
+        blocks_per_extent: db_region.blocks_per_extent(),
+        extent_count: db_region.extent_count(),
+        current_allocated_regions,
+        snapshot_id: db_snapshot.id(),
+        snapshot_volume_id: db_snapshot.volume_id,
+    })
+}
+
+async fn rsrss_alloc_new_region(
+    sagactx: NexusActionContext,
+) -> Result<Vec<(db::model::Dataset, db::model::Region)>, ActionError> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+
+    let opctx = crate::context::op_context_for_saga_action(
+        &sagactx,
+        &params.serialized_authn,
+    );
+
+    let alloc_region_params =
+        sagactx.lookup::<AllocRegionParams>("alloc_region_params")?;
+
+    // Request an additional region for this snapshot volume. It's important
+    // _not_ to delete the existing snapshot first, as (if it's still there)
+    // then the Crucible agent could reuse the allocated port and cause trouble.
+    let datasets_and_regions = osagactx
+        .datastore()
+        .arbitrary_region_allocate(
+            &opctx,
+            RegionAllocationFor::SnapshotVolume {
+                volume_id: alloc_region_params.snapshot_volume_id,
+                snapshot_id: alloc_region_params.snapshot_id,
+            },
+            RegionAllocationParameters::FromRaw {
+                block_size: alloc_region_params.block_size,
+                blocks_per_extent: alloc_region_params.blocks_per_extent,
+                extent_count: alloc_region_params.extent_count,
+            },
+            &params.allocation_strategy,
+            alloc_region_params.current_allocated_regions.len() + 1,
+        )
+        .await
+        .map_err(ActionError::action_failed)?;
+
+    Ok(datasets_and_regions)
+}
+
+async fn rsrss_alloc_new_region_undo(
+    sagactx: NexusActionContext,
+) -> Result<(), anyhow::Error> {
+    let osagactx = sagactx.user_data();
+    let log = osagactx.log();
+
+    let alloc_region_params =
+        sagactx.lookup::<AllocRegionParams>("alloc_region_params")?;
+
+    let maybe_dataset_and_region = find_only_new_region(
+        log,
+        alloc_region_params.current_allocated_regions,
+        sagactx.lookup::<Vec<(db::model::Dataset, db::model::Region)>>(
+            "new_datasets_and_regions",
+        )?,
+    );
+
+    // It should be guaranteed that if rsrss_alloc_new_region succeeded then it
+    // would have bumped the region redundancy, so we should see something here.
+    // Guard against the case anyway.
+    if let Some(dataset_and_region) = maybe_dataset_and_region {
+        let (_, region) = dataset_and_region;
+        osagactx
+            .datastore()
+            .regions_hard_delete(log, vec![region.id()])
+            .await?;
+    } else {
+        warn!(&log, "maybe_dataset_and_region is None!");
+    }
+
+    Ok(())
+}
+
+async fn rsrss_find_new_region(
+    sagactx: NexusActionContext,
+) -> Result<(db::model::Dataset, db::model::Region), ActionError> {
+    let osagactx = sagactx.user_data();
+    let log = osagactx.log();
+
+    let alloc_region_params =
+        sagactx.lookup::<AllocRegionParams>("alloc_region_params")?;
+
+    let maybe_dataset_and_region = find_only_new_region(
+        log,
+        alloc_region_params.current_allocated_regions,
+        sagactx.lookup::<Vec<(db::model::Dataset, db::model::Region)>>(
+            "new_datasets_and_regions",
+        )?,
+    );
+
+    let Some(dataset_and_region) = maybe_dataset_and_region else {
+        return Err(ActionError::action_failed(Error::internal_error(
+            &format!(
+                "expected dataset and region, saw {:?}!",
+                maybe_dataset_and_region,
+            ),
+        )));
+    };
+
+    Ok(dataset_and_region)
+}
+
+async fn rsrss_new_region_ensure(
+    sagactx: NexusActionContext,
+) -> Result<
+    (nexus_db_model::Dataset, crucible_agent_client::types::Region),
+    ActionError,
+> {
+    let params = sagactx.saga_params::<Params>()?;
+    let osagactx = sagactx.user_data();
+    let log = osagactx.log();
+
+    // With a list of datasets and regions to ensure, other sagas need to have a
+    // separate no-op forward step for the undo action to ensure that the undo
+    // step occurs in the case that the ensure partially fails. Here this is not
+    // required, there's only one dataset and region.
+    let new_dataset_and_region = sagactx
+        .lookup::<(db::model::Dataset, db::model::Region)>(
+            "new_dataset_and_region",
+        )?;
+
+    let region_snapshot = osagactx
+        .datastore()
+        .region_snapshot_get(
+            params.request.old_dataset_id,
+            params.request.old_region_id,
+            params.request.old_snapshot_id,
+        )
+        .await
+        .map_err(ActionError::action_failed)?;
+
+    let Some(region_snapshot) = region_snapshot else {
+        return Err(ActionError::action_failed(format!(
+            "region snapshot {} {} {} deleted!",
+            params.request.old_dataset_id,
+            params.request.old_region_id,
+            params.request.old_snapshot_id,
+        )));
+    };
+
+    let (new_dataset, new_region) = new_dataset_and_region;
+
+    // Currently, the repair port is set using a fixed offset above the
+    // downstairs port.
Once this goes away, Nexus will require a way to query + // for the repair port! + + let mut source_repair_addr: SocketAddrV6 = + match region_snapshot.snapshot_addr.parse() { + Ok(addr) => addr, + + Err(e) => { + return Err(ActionError::action_failed(format!( + "error parsing region_snapshot.snapshot_addr: {e}" + ))); + } + }; + + source_repair_addr.set_port( + source_repair_addr.port() + crucible_common::REPAIR_PORT_OFFSET, + ); + + let ensured_region = osagactx + .nexus() + .ensure_region_in_dataset( + log, + &new_dataset, + &new_region, + Some(source_repair_addr.to_string()), + ) + .await + .map_err(ActionError::action_failed)?; + + Ok((new_dataset, ensured_region)) +} + +async fn rsrss_new_region_ensure_undo( + sagactx: NexusActionContext, +) -> Result<(), anyhow::Error> { + let log = sagactx.user_data().log(); + let osagactx = sagactx.user_data(); + + warn!(log, "rsrss_new_region_ensure_undo: Deleting crucible regions"); + + let new_dataset_and_region = sagactx + .lookup::<(db::model::Dataset, db::model::Region)>( + "new_dataset_and_region", + )?; + + osagactx + .nexus() + .delete_crucible_regions(log, vec![new_dataset_and_region]) + .await?; + + Ok(()) +} + +async fn rsrss_get_old_snapshot_volume_id( + sagactx: NexusActionContext, +) -> Result { + // Save the snapshot's original volume ID, because we'll be altering it and + // need the original + + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + + let (.., db_snapshot) = LookupPath::new(&opctx, &osagactx.datastore()) + .snapshot_id(params.request.old_snapshot_id) + .fetch() + .await + .map_err(ActionError::action_failed)?; + + Ok(db_snapshot.volume_id) +} + +async fn rsrss_create_fake_volume( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + + let new_volume_id = sagactx.lookup::("new_volume_id")?; + + // Create a fake volume record for the old snapshot target. This will be + // deleted after snapshot replacement has finished. It can be completely + // blank here, it will be replaced by `volume_replace_snapshot`. + + let volume_construction_request = VolumeConstructionRequest::Volume { + id: new_volume_id, + block_size: 0, + sub_volumes: vec![VolumeConstructionRequest::Region { + block_size: 0, + blocks_per_extent: 0, + extent_count: 0, + gen: 0, + opts: CrucibleOpts { + id: new_volume_id, + target: vec![], + lossy: false, + flush_timeout: None, + key: None, + cert_pem: None, + key_pem: None, + root_cert_pem: None, + control: None, + read_only: true, + }, + }], + read_only_parent: None, + }; + + let volume_data = serde_json::to_string(&volume_construction_request) + .map_err(|e| { + ActionError::action_failed(Error::internal_error(&e.to_string())) + })?; + + let volume = db::model::Volume::new(new_volume_id, volume_data); + + osagactx + .datastore() + .volume_create(volume) + .await + .map_err(ActionError::action_failed)?; + + Ok(()) +} + +async fn rsrss_create_fake_volume_undo( + sagactx: NexusActionContext, +) -> Result<(), anyhow::Error> { + let osagactx = sagactx.user_data(); + + // Delete the fake volume. 
+
+    let new_volume_id = sagactx.lookup::<Uuid>("new_volume_id")?;
+    osagactx.datastore().volume_hard_delete(new_volume_id).await?;
+
+    Ok(())
+}
+
+#[derive(Debug)]
+struct ReplaceParams {
+    old_volume_id: Uuid,
+    old_snapshot_address: SocketAddrV6,
+    new_region_address: SocketAddrV6,
+    new_volume_id: Uuid,
+}
+
+async fn get_replace_params(
+    sagactx: &NexusActionContext,
+) -> Result<ReplaceParams, ActionError> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+
+    let new_volume_id = sagactx.lookup::<Uuid>("new_volume_id")?;
+
+    let region_snapshot = osagactx
+        .datastore()
+        .region_snapshot_get(
+            params.request.old_dataset_id,
+            params.request.old_region_id,
+            params.request.old_snapshot_id,
+        )
+        .await
+        .map_err(ActionError::action_failed)?;
+
+    let Some(region_snapshot) = region_snapshot else {
+        return Err(ActionError::action_failed(format!(
+            "region snapshot {} {} {} deleted!",
+            params.request.old_dataset_id,
+            params.request.old_region_id,
+            params.request.old_snapshot_id,
+        )));
+    };
+
+    let old_snapshot_address: SocketAddrV6 =
+        match region_snapshot.snapshot_addr.parse() {
+            Ok(addr) => addr,
+
+            Err(e) => {
+                return Err(ActionError::action_failed(format!(
+                    "parsing {} as SocketAddrV6 failed: {e}",
+                    region_snapshot.snapshot_addr,
+                )));
+            }
+        };
+
+    let (new_dataset, ensured_region) = sagactx.lookup::<(
+        db::model::Dataset,
+        crucible_agent_client::types::Region,
+    )>(
+        "ensured_dataset_and_region",
+    )?;
+
+    let Some(new_dataset_address) = new_dataset.address() else {
+        return Err(ActionError::action_failed(format!(
+            "dataset {} does not have an address!",
+            new_dataset.id(),
+        )));
+    };
+
+    let new_region_address = SocketAddrV6::new(
+        *new_dataset_address.ip(),
+        ensured_region.port_number,
+        0,
+        0,
+    );
+
+    let old_volume_id = sagactx.lookup::<Uuid>("old_snapshot_volume_id")?;
+
+    // Return the replacement parameters for the forward action case - the undo
+    // will swap the existing and replacement target
+    Ok(ReplaceParams {
+        old_volume_id,
+        old_snapshot_address,
+        new_region_address,
+        new_volume_id,
+    })
+}
+
+async fn rsrss_replace_snapshot_in_volume(
+    sagactx: NexusActionContext,
+) -> Result<(), ActionError> {
+    let log = sagactx.user_data().log();
+    let osagactx = sagactx.user_data();
+
+    let replacement_params = get_replace_params(&sagactx).await?;
+
+    info!(
+        log,
+        "replacing {} with {} in volume {}",
+        replacement_params.old_snapshot_address,
+        replacement_params.new_region_address,
+        replacement_params.old_volume_id,
+    );
+
+    // `volume_replace_snapshot` will swap the old snapshot for the new region.
+    // No repair or reconciliation needs to occur after this.
+    osagactx
+        .datastore()
+        .volume_replace_snapshot(
+            VolumeWithTarget(replacement_params.old_volume_id),
+            ExistingTarget(replacement_params.old_snapshot_address),
+            ReplacementTarget(replacement_params.new_region_address),
+            VolumeToDelete(replacement_params.new_volume_id),
+        )
+        .await
+        .map_err(ActionError::action_failed)?;
+
+    Ok(())
+}
+
+async fn rsrss_replace_snapshot_in_volume_undo(
+    sagactx: NexusActionContext,
+) -> Result<(), anyhow::Error> {
+    // Undo the forward action's volume_replace_snapshot call by swapping the
+    // existing target and replacement target parameters.
+
+    let log = sagactx.user_data().log();
+    let osagactx = sagactx.user_data();
+
+    let replacement_params = get_replace_params(&sagactx).await?;
+
+    // Note the old and new are _not_ swapped in this log message!
The intention + // is that someone reviewing the logs could search for "replacing UUID with + // UUID in volume UUID" and get (in the case of no re-execution) two + // results. + info!( + log, + "undo: replacing {} with {} in volume {}", + replacement_params.old_snapshot_address, + replacement_params.new_region_address, + replacement_params.old_volume_id, + ); + + osagactx + .datastore() + .volume_replace_snapshot( + VolumeWithTarget(replacement_params.old_volume_id), + ExistingTarget(replacement_params.new_region_address), + ReplacementTarget(replacement_params.old_snapshot_address), + VolumeToDelete(replacement_params.new_volume_id), + ) + .await?; + + Ok(()) +} + +async fn rsrss_update_request_record( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let params = sagactx.saga_params::()?; + let osagactx = sagactx.user_data(); + let datastore = osagactx.datastore(); + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + + let saga_id = sagactx.lookup::("saga_id")?; + let new_dataset_and_region = sagactx + .lookup::<(db::model::Dataset, db::model::Region)>( + "new_dataset_and_region", + )?; + + let new_region_id = new_dataset_and_region.1.id(); + + let old_region_volume_id = sagactx.lookup::("new_volume_id")?; + + // Now that the region has been ensured and the construction request has + // been updated, update the replacement request record to 'ReplacementDone' + // and clear the operating saga id. There is no undo step for this, it + // should succeed idempotently. + datastore + .set_region_snapshot_replacement_replacement_done( + &opctx, + params.request.id, + saga_id, + new_region_id, + old_region_volume_id, + ) + .await + .map_err(ActionError::action_failed)?; + + Ok(()) +} + +#[cfg(test)] +pub(crate) mod test { + use crate::{ + app::db::lookup::LookupPath, app::db::DataStore, + app::saga::create_saga_dag, + app::sagas::region_snapshot_replacement_start::*, + app::sagas::test_helpers::test_opctx, app::RegionAllocationStrategy, + }; + use nexus_db_model::RegionSnapshotReplacement; + use nexus_db_model::RegionSnapshotReplacementState; + use nexus_db_model::Volume; + use nexus_db_queries::authn::saga::Serialized; + use nexus_db_queries::context::OpContext; + use nexus_test_utils::resource_helpers::create_disk; + use nexus_test_utils::resource_helpers::create_project; + use nexus_test_utils::resource_helpers::create_snapshot; + use nexus_test_utils::resource_helpers::DiskTest; + use nexus_test_utils_macros::nexus_test; + use nexus_types::external_api::views; + use nexus_types::identity::Asset; + use sled_agent_client::types::VolumeConstructionRequest; + + type ControlPlaneTestContext = + nexus_test_utils::ControlPlaneTestContext; + + const DISK_NAME: &str = "my-disk"; + const SNAPSHOT_NAME: &str = "my-snap"; + const PROJECT_NAME: &str = "springfield-squidport"; + + async fn prepare_for_test( + cptestctx: &ControlPlaneTestContext, + ) -> PrepareResult { + let client = &cptestctx.external_client; + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = test_opctx(cptestctx); + + assert_eq!(region_allocations(&datastore).await, 0); + + let mut disk_test = DiskTest::new(cptestctx).await; + disk_test.add_zpool_with_dataset(cptestctx.first_sled()).await; + + assert_eq!(region_allocations(&datastore).await, 0); + + let _project_id = + create_project(&client, PROJECT_NAME).await.identity.id; + + assert_eq!(region_allocations(&datastore).await, 0); + + // Create a disk + let disk = 
+        let disk = create_disk(&client, PROJECT_NAME, DISK_NAME).await;
+
+        assert_eq!(region_allocations(&datastore).await, 3);
+
+        let disk_id = disk.identity.id;
+        let (.., db_disk) = LookupPath::new(&opctx, &datastore)
+            .disk_id(disk_id)
+            .fetch()
+            .await
+            .unwrap_or_else(|_| panic!("test disk {:?} should exist", disk_id));
+
+        // Create a snapshot
+        let snapshot =
+            create_snapshot(&client, PROJECT_NAME, DISK_NAME, SNAPSHOT_NAME)
+                .await;
+
+        assert_eq!(region_allocations(&datastore).await, 6);
+
+        let snapshot_id = snapshot.identity.id;
+        let (.., db_snapshot) = LookupPath::new(&opctx, &datastore)
+            .snapshot_id(snapshot_id)
+            .fetch()
+            .await
+            .unwrap_or_else(|_| {
+                panic!("test snapshot {:?} should exist", snapshot_id)
+            });
+
+        PrepareResult { db_disk, snapshot, db_snapshot }
+    }
+
+    struct PrepareResult {
+        db_disk: nexus_db_model::Disk,
+        snapshot: views::Snapshot,
+        db_snapshot: nexus_db_model::Snapshot,
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_region_snapshot_replacement_start_saga(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        let PrepareResult { db_disk, snapshot, db_snapshot } =
+            prepare_for_test(cptestctx).await;
+
+        let nexus = &cptestctx.server.server_context().nexus;
+        let datastore = nexus.datastore();
+        let opctx = test_opctx(cptestctx);
+
+        // Assert disk has three allocated regions
+        let disk_allocated_regions =
+            datastore.get_allocated_regions(db_disk.volume_id).await.unwrap();
+        assert_eq!(disk_allocated_regions.len(), 3);
+
+        // Assert the snapshot has zero allocated regions
+        let snapshot_id = snapshot.identity.id;
+
+        let snapshot_allocated_regions = datastore
+            .get_allocated_regions(db_snapshot.volume_id)
+            .await
+            .unwrap();
+        assert_eq!(snapshot_allocated_regions.len(), 0);
+
+        // Replace one of the snapshot's targets
+        let region: &nexus_db_model::Region = &disk_allocated_regions[0].1;
+
+        let region_snapshot = datastore
+            .region_snapshot_get(region.dataset_id(), region.id(), snapshot_id)
+            .await
+            .unwrap()
+            .unwrap();
+
+        // Manually insert the region snapshot replacement request
+        let request =
+            RegionSnapshotReplacement::for_region_snapshot(&region_snapshot);
+
+        datastore
+            .insert_region_snapshot_replacement_request(&opctx, request.clone())
+            .await
+            .unwrap();
+
+        // Run the region snapshot replacement start saga
+        let dag =
+            create_saga_dag::<SagaRegionSnapshotReplacementStart>(Params {
+                serialized_authn: Serialized::for_opctx(&opctx),
+                request: request.clone(),
+                allocation_strategy: RegionAllocationStrategy::Random {
+                    seed: None,
+                },
+            })
+            .unwrap();
+
+        let runnable_saga = nexus.sagas.saga_prepare(dag).await.unwrap();
+
+        // Actually run the saga
+        runnable_saga.run_to_completion().await.unwrap();
+
+        // Validate the state transition
+        let result = datastore
+            .get_region_snapshot_replacement_request_by_id(&opctx, request.id)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            result.replacement_state,
+            RegionSnapshotReplacementState::ReplacementDone
+        );
+        assert!(result.new_region_id.is_some());
+        assert!(result.operating_saga_id.is_none());
+
+        // Validate that the number of regions for the disk didn't change
+        let disk_allocated_regions =
+            datastore.get_allocated_regions(db_disk.volume_id).await.unwrap();
+        assert_eq!(disk_allocated_regions.len(), 3);
+
+        // Validate that the snapshot now has one allocated region
+        let snapshot_allocated_datasets_and_regions = datastore
+            .get_allocated_regions(db_snapshot.volume_id)
+            .await
+            .unwrap();
+
+        assert_eq!(snapshot_allocated_datasets_and_regions.len(), 1);
+
+        let (_, snapshot_allocated_region) =
+            &snapshot_allocated_datasets_and_regions[0];
+
+        // Validate that the snapshot's volume contains this newly allocated
+        // region
+
+        let new_region_addr = datastore
+            .region_addr(snapshot_allocated_region.id())
+            .await
+            .unwrap()
+            .unwrap();
+
+        let volumes = datastore
+            .find_volumes_referencing_socket_addr(
+                &opctx,
+                new_region_addr.into(),
+            )
+            .await
+            .unwrap();
+
+        assert_eq!(volumes.len(), 1);
+        assert_eq!(volumes[0].id(), db_snapshot.volume_id);
+    }
+
+    fn new_test_params(
+        opctx: &OpContext,
+        request: &RegionSnapshotReplacement,
+    ) -> Params {
+        Params {
+            serialized_authn: Serialized::for_opctx(opctx),
+            request: request.clone(),
+            allocation_strategy: RegionAllocationStrategy::Random {
+                seed: None,
+            },
+        }
+    }
+
+    pub(crate) async fn verify_clean_slate(
+        cptestctx: &ControlPlaneTestContext,
+        request: &RegionSnapshotReplacement,
+        affected_volume_original: &Volume,
+    ) {
+        let datastore = cptestctx.server.server_context().nexus.datastore();
+
+        crate::app::sagas::test_helpers::assert_no_failed_undo_steps(
+            &cptestctx.logctx.log,
+            datastore,
+        )
+        .await;
+
+        // For these tests, six provisioned regions exist: three for the
+        // original disk, and three for the (currently unused) snapshot
+        // destination volume
+        assert_eq!(region_allocations(&datastore).await, 6);
+        assert_region_snapshot_replacement_request_untouched(
+            cptestctx, &datastore, &request,
+        )
+        .await;
+        assert_volume_untouched(&datastore, &affected_volume_original).await;
+    }
+
+    async fn region_allocations(datastore: &DataStore) -> usize {
+        use async_bb8_diesel::AsyncConnection;
+        use async_bb8_diesel::AsyncRunQueryDsl;
+        use async_bb8_diesel::AsyncSimpleConnection;
+        use diesel::QueryDsl;
+        use nexus_db_queries::db::queries::ALLOW_FULL_TABLE_SCAN_SQL;
+        use nexus_db_queries::db::schema::region::dsl;
+
+        let conn = datastore.pool_connection_for_tests().await.unwrap();
+
+        conn.transaction_async(|conn| async move {
+            // Selecting all regions requires a full table scan
+            conn.batch_execute_async(ALLOW_FULL_TABLE_SCAN_SQL).await.unwrap();
+
+            dsl::region
+                .count()
+                .get_result_async(&conn)
+                .await
+                .map(|x: i64| x as usize)
+        })
+        .await
+        .unwrap()
+    }
+
+    async fn assert_region_snapshot_replacement_request_untouched(
+        cptestctx: &ControlPlaneTestContext,
+        datastore: &DataStore,
+        request: &RegionSnapshotReplacement,
+    ) {
+        let opctx = test_opctx(cptestctx);
+        let db_request = datastore
+            .get_region_snapshot_replacement_request_by_id(&opctx, request.id)
+            .await
+            .unwrap();
+
+        assert_eq!(db_request.new_region_id, None);
+        assert_eq!(
+            db_request.replacement_state,
+            RegionSnapshotReplacementState::Requested
+        );
+        assert_eq!(db_request.operating_saga_id, None);
+    }
+
+    async fn assert_volume_untouched(
+        datastore: &DataStore,
+        affected_volume_original: &Volume,
+    ) {
+        let affected_volume = datastore
+            .volume_get(affected_volume_original.id())
+            .await
+            .unwrap()
+            .unwrap();
+
+        let actual: VolumeConstructionRequest =
+            serde_json::from_str(&affected_volume.data()).unwrap();
+
+        let expected: VolumeConstructionRequest =
+            serde_json::from_str(&affected_volume_original.data()).unwrap();
+
+        assert_eq!(actual, expected);
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_action_failure_can_unwind_idempotently(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        let PrepareResult { db_disk, snapshot, db_snapshot } =
+            prepare_for_test(cptestctx).await;
+
+        let log = &cptestctx.logctx.log;
+        let nexus = &cptestctx.server.server_context().nexus;
+        let datastore = nexus.datastore();
+        let opctx = test_opctx(cptestctx);
+
+        let disk_allocated_regions =
+            datastore.get_allocated_regions(db_disk.volume_id).await.unwrap();
+        assert_eq!(disk_allocated_regions.len(), 3);
+
+        let region: &nexus_db_model::Region = &disk_allocated_regions[0].1;
+        let snapshot_id = snapshot.identity.id;
+
+        let region_snapshot = datastore
+            .region_snapshot_get(region.dataset_id(), region.id(), snapshot_id)
+            .await
+            .unwrap()
+            .unwrap();
+
+        let request =
+            RegionSnapshotReplacement::for_region_snapshot(&region_snapshot);
+
+        datastore
+            .insert_region_snapshot_replacement_request(&opctx, request.clone())
+            .await
+            .unwrap();
+
+        let affected_volume_original =
+            datastore.volume_get(db_snapshot.volume_id).await.unwrap().unwrap();
+
+        verify_clean_slate(&cptestctx, &request, &affected_volume_original)
+            .await;
+
+        crate::app::sagas::test_helpers::action_failure_can_unwind_idempotently::<
+            SagaRegionSnapshotReplacementStart,
+            _,
+            _
+        >(
+            nexus,
+            || Box::pin(async { new_test_params(&opctx, &request) }),
+            || Box::pin(async {
+                verify_clean_slate(
+                    &cptestctx,
+                    &request,
+                    &affected_volume_original,
+                ).await;
+            }),
+            log
+        ).await;
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_actions_succeed_idempotently(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        let PrepareResult { db_disk, snapshot, db_snapshot: _ } =
+            prepare_for_test(cptestctx).await;
+
+        let nexus = &cptestctx.server.server_context().nexus;
+        let datastore = nexus.datastore();
+        let opctx = test_opctx(cptestctx);
+
+        let disk_allocated_regions =
+            datastore.get_allocated_regions(db_disk.volume_id).await.unwrap();
+        assert_eq!(disk_allocated_regions.len(), 3);
+
+        let region: &nexus_db_model::Region = &disk_allocated_regions[0].1;
+        let snapshot_id = snapshot.identity.id;
+
+        let region_snapshot = datastore
+            .region_snapshot_get(region.dataset_id(), region.id(), snapshot_id)
+            .await
+            .unwrap()
+            .unwrap();
+
+        let request =
+            RegionSnapshotReplacement::for_region_snapshot(&region_snapshot);
+
+        datastore
+            .insert_region_snapshot_replacement_request(&opctx, request.clone())
+            .await
+            .unwrap();
+
+        // Build the saga DAG with the provided test parameters
+        let params = new_test_params(&opctx, &request);
+        let dag = create_saga_dag::<SagaRegionSnapshotReplacementStart>(params)
+            .unwrap();
+        crate::app::sagas::test_helpers::actions_succeed_idempotently(
+            nexus, dag,
+        )
+        .await;
+    }
+}
diff --git a/nexus/test-utils/src/resource_helpers.rs b/nexus/test-utils/src/resource_helpers.rs
index ac7188f232..14180459ab 100644
--- a/nexus/test-utils/src/resource_helpers.rs
+++ b/nexus/test-utils/src/resource_helpers.rs
@@ -432,6 +432,28 @@ pub async fn create_disk(
     .await
 }
 
+pub async fn create_snapshot(
+    client: &ClientTestContext,
+    project_name: &str,
+    disk_name: &str,
+    snapshot_name: &str,
+) -> views::Snapshot {
+    let snapshots_url = format!("/v1/snapshots?project={}", project_name);
+
+    object_create(
+        client,
+        &snapshots_url,
+        &params::SnapshotCreate {
+            identity: IdentityMetadataCreateParams {
+                name: snapshot_name.parse().unwrap(),
+                description: format!("snapshot {:?}", snapshot_name),
+            },
+            disk: disk_name.to_string().try_into().unwrap(),
+        },
+    )
+    .await
+}
+
 pub async fn delete_disk(
     client: &ClientTestContext,
     project_name: &str,
diff --git a/nexus/tests/config.test.toml b/nexus/tests/config.test.toml
index 8f65a73204..35b55184b9 100644
--- a/nexus/tests/config.test.toml
+++ b/nexus/tests/config.test.toml
@@ -137,6 +137,7 @@ lookup_region_port.period_secs = 60
 # Therefore, disable the background task during tests.
 instance_updater.disable = true
 instance_updater.period_secs = 60
+region_snapshot_replacement_start.period_secs = 30
 
 [default_region_allocation_strategy]
 # we only have one sled in the test environment, so we need to use the
diff --git a/nexus/types/src/internal_api/background.rs b/nexus/types/src/internal_api/background.rs
index 6463aa8ab6..2f8a411cf7 100644
--- a/nexus/types/src/internal_api/background.rs
+++ b/nexus/types/src/internal_api/background.rs
@@ -19,3 +19,12 @@ pub struct LookupRegionPortStatus {
     pub found_port_ok: Vec<String>,
     pub errors: Vec<String>,
 }
+
+/// The status of a `region_snapshot_replacement_start` background task
+/// activation
+#[derive(Serialize, Deserialize, Default, Debug, PartialEq, Eq)]
+pub struct RegionSnapshotReplacementStartStatus {
+    pub requests_created_ok: Vec<String>,
+    pub start_invoked_ok: Vec<String>,
+    pub errors: Vec<String>,
+}
diff --git a/smf/nexus/multi-sled/config-partial.toml b/smf/nexus/multi-sled/config-partial.toml
index c502c20b1b..437615938f 100644
--- a/smf/nexus/multi-sled/config-partial.toml
+++ b/smf/nexus/multi-sled/config-partial.toml
@@ -65,6 +65,7 @@ abandoned_vmm_reaper.period_secs = 60
 saga_recovery.period_secs = 600
 lookup_region_port.period_secs = 60
 instance_updater.period_secs = 30
+region_snapshot_replacement_start.period_secs = 30
 
 [default_region_allocation_strategy]
 # by default, allocate across 3 distinct sleds
diff --git a/smf/nexus/single-sled/config-partial.toml b/smf/nexus/single-sled/config-partial.toml
index 30a0243122..95dcca14ae 100644
--- a/smf/nexus/single-sled/config-partial.toml
+++ b/smf/nexus/single-sled/config-partial.toml
@@ -65,6 +65,7 @@ abandoned_vmm_reaper.period_secs = 60
 saga_recovery.period_secs = 600
 lookup_region_port.period_secs = 60
 instance_updater.period_secs = 30
+region_snapshot_replacement_start.period_secs = 30
 
 [default_region_allocation_strategy]
 # by default, allocate without requirement for distinct sleds.
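As context for the hunks above, here is a minimal sketch (not part of the diff) of how the new `RegionSnapshotReplacementStartStatus` added to `nexus/types/src/internal_api/background.rs` could be populated and serialized by the background task configured in the config files. The function name `example_status_json` and the message strings are invented for illustration, and the example assumes `serde_json` is available as it is elsewhere in Nexus:

    use nexus_types::internal_api::background::RegionSnapshotReplacementStartStatus;

    fn example_status_json() -> serde_json::Value {
        // Per-activation results are collected as human-readable strings, one
        // per replacement request handled (the contents here are made up).
        let status = RegionSnapshotReplacementStartStatus {
            requests_created_ok: vec![
                "created replacement request for region snapshot ...".to_string(),
            ],
            start_invoked_ok: vec![
                "start saga invoked for request ...".to_string(),
            ],
            errors: vec![],
        };

        // `Serialize` is derived on the struct, so the status can be reported
        // as opaque JSON in the task's activation details.
        serde_json::to_value(&status).expect("status serializes to JSON")
    }

Keeping the fields as plain lists of messages stays consistent with the neighboring `LookupRegionPortStatus` and keeps the reported details human-readable.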
diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml
index 1774dd7b5c..8040e2e9e9 100644
--- a/workspace-hack/Cargo.toml
+++ b/workspace-hack/Cargo.toml
@@ -133,6 +133,7 @@ bstr-6f8ce4dd05d13bba = { package = "bstr", version = "0.2.17" }
 bstr-dff4ba8e3ae991db = { package = "bstr", version = "1.9.1" }
 byteorder = { version = "1.5.0" }
 bytes = { version = "1.7.1", features = ["serde"] }
+cc = { version = "1.0.97", default-features = false, features = ["parallel"] }
 chrono = { version = "0.4.38", features = ["serde"] }
 cipher = { version = "0.4.4", default-features = false, features = ["block-padding", "zeroize"] }
 clap = { version = "4.5.15", features = ["cargo", "derive", "env", "wrap_help"] }
@@ -229,6 +230,7 @@ zeroize = { version = "1.7.0", features = ["std", "zeroize_derive"] }
 dof = { version = "0.3.0", default-features = false, features = ["des"] }
 linux-raw-sys = { version = "0.4.13", default-features = false, features = ["elf", "errno", "general", "ioctl", "no_std", "std", "system"] }
 mio = { version = "0.8.11", features = ["net", "os-ext"] }
+nix = { version = "0.28.0", features = ["feature", "fs", "ioctl", "poll", "signal", "term", "uio"] }
 once_cell = { version = "1.19.0" }
 rustix = { version = "0.38.34", features = ["fs", "stdio", "system", "termios"] }
 signal-hook-mio = { version = "0.2.4", default-features = false, features = ["support-v0_8", "support-v1_0"] }
@@ -237,30 +239,35 @@ signal-hook-mio = { version = "0.2.4", default-features = false, features = ["su
 dof = { version = "0.3.0", default-features = false, features = ["des"] }
 linux-raw-sys = { version = "0.4.13", default-features = false, features = ["elf", "errno", "general", "ioctl", "no_std", "std", "system"] }
 mio = { version = "0.8.11", features = ["net", "os-ext"] }
+nix = { version = "0.28.0", features = ["feature", "fs", "ioctl", "poll", "signal", "term", "uio"] }
 once_cell = { version = "1.19.0" }
 rustix = { version = "0.38.34", features = ["fs", "stdio", "system", "termios"] }
 signal-hook-mio = { version = "0.2.4", default-features = false, features = ["support-v0_8", "support-v1_0"] }
 
 [target.x86_64-apple-darwin.dependencies]
 mio = { version = "0.8.11", features = ["net", "os-ext"] }
+nix = { version = "0.28.0", features = ["feature", "fs", "ioctl", "poll", "signal", "term", "uio"] }
 once_cell = { version = "1.19.0" }
 rustix = { version = "0.38.34", features = ["fs", "stdio", "system", "termios"] }
 signal-hook-mio = { version = "0.2.4", default-features = false, features = ["support-v0_8", "support-v1_0"] }
 
 [target.x86_64-apple-darwin.build-dependencies]
 mio = { version = "0.8.11", features = ["net", "os-ext"] }
+nix = { version = "0.28.0", features = ["feature", "fs", "ioctl", "poll", "signal", "term", "uio"] }
 once_cell = { version = "1.19.0" }
 rustix = { version = "0.38.34", features = ["fs", "stdio", "system", "termios"] }
 signal-hook-mio = { version = "0.2.4", default-features = false, features = ["support-v0_8", "support-v1_0"] }
 
 [target.aarch64-apple-darwin.dependencies]
 mio = { version = "0.8.11", features = ["net", "os-ext"] }
+nix = { version = "0.28.0", features = ["feature", "fs", "ioctl", "poll", "signal", "term", "uio"] }
 once_cell = { version = "1.19.0" }
 rustix = { version = "0.38.34", features = ["fs", "stdio", "system", "termios"] }
 signal-hook-mio = { version = "0.2.4", default-features = false, features = ["support-v0_8", "support-v1_0"] }
 
 [target.aarch64-apple-darwin.build-dependencies]
 mio = { version = "0.8.11", features = ["net", "os-ext"] }
+nix = { version = "0.28.0", features = ["feature", "fs", "ioctl", "poll", "signal", "term", "uio"] }
 once_cell = { version = "1.19.0" }
 rustix = { version = "0.38.34", features = ["fs", "stdio", "system", "termios"] }
 signal-hook-mio = { version = "0.2.4", default-features = false, features = ["support-v0_8", "support-v1_0"] }
@@ -268,6 +275,7 @@ signal-hook-mio = { version = "0.2.4", default-features = false, features = ["support-v0_8", "support-v1_0"] }
 [target.x86_64-unknown-illumos.dependencies]
 dof = { version = "0.3.0", default-features = false, features = ["des"] }
 mio = { version = "0.8.11", features = ["net", "os-ext"] }
+nix = { version = "0.28.0", features = ["feature", "fs", "ioctl", "poll", "signal", "term", "uio"] }
 once_cell = { version = "1.19.0" }
 rustix = { version = "0.38.34", features = ["fs", "stdio", "system", "termios"] }
 signal-hook-mio = { version = "0.2.4", default-features = false, features = ["support-v0_8", "support-v1_0"] }
@@ -277,6 +285,7 @@ toml_edit-cdcf2f9584511fe6 = { package = "toml_edit", version = "0.19.15", featu
 [target.x86_64-unknown-illumos.build-dependencies]
 dof = { version = "0.3.0", default-features = false, features = ["des"] }
 mio = { version = "0.8.11", features = ["net", "os-ext"] }
+nix = { version = "0.28.0", features = ["feature", "fs", "ioctl", "poll", "signal", "term", "uio"] }
 once_cell = { version = "1.19.0" }
 rustix = { version = "0.38.34", features = ["fs", "stdio", "system", "termios"] }
 signal-hook-mio = { version = "0.2.4", default-features = false, features = ["support-v0_8", "support-v1_0"] }
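Finally, a usage sketch (a hypothetical test, not part of the diff) for the `create_snapshot` helper added to `nexus/test-utils/src/resource_helpers.rs` in this change. It mirrors the setup already performed by `prepare_for_test` in the saga tests above and reuses the same helpers and constants; the test name `example_create_snapshot` is invented:

    // Hypothetical test exercising the new helper in isolation; setup mirrors
    // prepare_for_test() in the saga tests above.
    #[nexus_test(server = crate::Server)]
    async fn example_create_snapshot(cptestctx: &ControlPlaneTestContext) {
        let client = &cptestctx.external_client;

        // Backing zpools/datasets and a project must exist before a disk can
        // be allocated.
        let _disk_test = DiskTest::new(cptestctx).await;
        create_project(&client, PROJECT_NAME).await;

        // A snapshot is always taken of an existing disk.
        create_disk(&client, PROJECT_NAME, DISK_NAME).await;
        let snapshot =
            create_snapshot(&client, PROJECT_NAME, DISK_NAME, SNAPSHOT_NAME)
                .await;

        // The helper returns the external API view of the new snapshot.
        let _snapshot_id = snapshot.identity.id;
    }

Because the helper issues the request through `object_create` against `/v1/snapshots?project=...`, it exercises the same external API path a user-driven snapshot creation would.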