diff --git a/common/src/nexus_config.rs b/common/src/nexus_config.rs index e987790a21..24f4c34797 100644 --- a/common/src/nexus_config.rs +++ b/common/src/nexus_config.rs @@ -334,6 +334,8 @@ pub struct BackgroundTaskConfig { pub inventory: InventoryConfig, /// configuration for phantom disks task pub phantom_disks: PhantomDiskConfig, + /// configuration for blueprint related tasks + pub blueprints: BlueprintTasksConfig, /// configuration for service zone nat sync task pub sync_service_zone_nat: SyncServiceZoneNatConfig, /// configuration for the bfd manager task @@ -428,6 +430,20 @@ pub struct PhantomDiskConfig { pub period_secs: Duration, } +#[serde_as] +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct BlueprintTasksConfig { + /// period (in seconds) for periodic activations of the background task that + /// reads the latest target blueprint from the database + #[serde_as(as = "DurationSeconds")] + pub period_secs_load: Duration, + + /// period (in seconds) for periodic activations of the background task that + /// executes the latest target blueprint + #[serde_as(as = "DurationSeconds")] + pub period_secs_execute: Duration, +} + /// Configuration for a nexus server #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct PackageConfig { @@ -528,12 +544,12 @@ impl std::fmt::Display for SchemeName { mod test { use super::{ default_techport_external_server_port, AuthnConfig, - BackgroundTaskConfig, Config, ConfigDropshotWithTls, ConsoleConfig, - Database, DeploymentConfig, DnsTasksConfig, DpdConfig, - ExternalEndpointsConfig, InternalDns, InventoryConfig, LoadError, - LoadErrorKind, MgdConfig, NatCleanupConfig, PackageConfig, - PhantomDiskConfig, SchemeName, TimeseriesDbConfig, Tunables, - UpdatesConfig, + BackgroundTaskConfig, BlueprintTasksConfig, Config, + ConfigDropshotWithTls, ConsoleConfig, Database, DeploymentConfig, + DnsTasksConfig, DpdConfig, ExternalEndpointsConfig, InternalDns, + InventoryConfig, LoadError, LoadErrorKind, MgdConfig, NatCleanupConfig, + PackageConfig, PhantomDiskConfig, SchemeName, TimeseriesDbConfig, + Tunables, UpdatesConfig, }; use crate::address::{Ipv6Subnet, RACK_PREFIX}; use crate::api::internal::shared::SwitchLocation; @@ -687,6 +703,8 @@ mod test { inventory.nkeep = 11 inventory.disable = false phantom_disks.period_secs = 30 + blueprints.period_secs_load = 10 + blueprints.period_secs_execute = 60 sync_service_zone_nat.period_secs = 30 [default_region_allocation_strategy] type = "random" @@ -795,6 +813,10 @@ mod test { phantom_disks: PhantomDiskConfig { period_secs: Duration::from_secs(30), }, + blueprints: BlueprintTasksConfig { + period_secs_load: Duration::from_secs(10), + period_secs_execute: Duration::from_secs(60) + }, sync_service_zone_nat: SyncServiceZoneNatConfig { period_secs: Duration::from_secs(30) } @@ -857,6 +879,8 @@ mod test { inventory.nkeep = 3 inventory.disable = false phantom_disks.period_secs = 30 + blueprints.period_secs_load = 10 + blueprints.period_secs_execute = 60 sync_service_zone_nat.period_secs = 30 [default_region_allocation_strategy] type = "random" diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs index ea89923caa..f00c05f1ec 100644 --- a/dev-tools/omdb/src/bin/omdb/nexus.rs +++ b/dev-tools/omdb/src/bin/omdb/nexus.rs @@ -256,6 +256,8 @@ async fn cmd_nexus_background_tasks_show( "dns_servers_external", "dns_propagation_external", "nat_v4_garbage_collector", + "blueprint_loader", + "blueprint_executor", ] { if let Some(bgtask) = tasks.remove(name) { print_task(&bgtask); diff --git a/dev-tools/omdb/tests/env.out b/dev-tools/omdb/tests/env.out index 878b3f04dd..72e9d2e8fc 100644 --- a/dev-tools/omdb/tests/env.out +++ b/dev-tools/omdb/tests/env.out @@ -28,6 +28,14 @@ task: "bfd_manager" switches +task: "blueprint_executor" + Executes the target blueprint + + +task: "blueprint_loader" + Loads the current target blueprint from the DB + + task: "dns_config_external" watches external DNS data stored in CockroachDB @@ -106,6 +114,14 @@ task: "bfd_manager" switches +task: "blueprint_executor" + Executes the target blueprint + + +task: "blueprint_loader" + Loads the current target blueprint from the DB + + task: "dns_config_external" watches external DNS data stored in CockroachDB @@ -171,6 +187,14 @@ task: "bfd_manager" switches +task: "blueprint_executor" + Executes the target blueprint + + +task: "blueprint_loader" + Loads the current target blueprint from the DB + + task: "dns_config_external" watches external DNS data stored in CockroachDB diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out index e5a38049f3..dc77ade735 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -222,6 +222,14 @@ task: "bfd_manager" switches +task: "blueprint_executor" + Executes the target blueprint + + +task: "blueprint_loader" + Loads the current target blueprint from the DB + + task: "dns_config_external" watches external DNS data stored in CockroachDB @@ -344,6 +352,20 @@ task: "nat_v4_garbage_collector" started at (s ago) and ran for ms warning: unknown background task: "nat_v4_garbage_collector" (don't know how to interpret details: Null) +task: "blueprint_loader" + configured period: every 1m 40s + currently executing: no + last completed activation: iter 2, triggered by an explicit signal + started at (s ago) and ran for ms +warning: unknown background task: "blueprint_loader" (don't know how to interpret details: Object {"status": String("no target blueprint")}) + +task: "blueprint_executor" + configured period: every 10m + currently executing: no + last completed activation: iter 2, triggered by an explicit signal + started at (s ago) and ran for ms + last completion reported error: no blueprint + task: "bfd_manager" configured period: every 30s currently executing: no diff --git a/nexus/examples/config.toml b/nexus/examples/config.toml index 1cfe3ae8a2..4263c34f3d 100644 --- a/nexus/examples/config.toml +++ b/nexus/examples/config.toml @@ -106,6 +106,8 @@ inventory.nkeep = 5 # Disable inventory collection altogether (for emergencies) inventory.disable = false phantom_disks.period_secs = 30 +blueprints.period_secs_load = 10 +blueprints.period_secs_execute = 60 sync_service_zone_nat.period_secs = 30 [default_region_allocation_strategy] diff --git a/nexus/src/app/background/blueprint_execution.rs b/nexus/src/app/background/blueprint_execution.rs new file mode 100644 index 0000000000..8d6ea8d8ce --- /dev/null +++ b/nexus/src/app/background/blueprint_execution.rs @@ -0,0 +1,430 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Background task for realizing a plan blueprint + +use super::common::BackgroundTask; +use anyhow::Context; +use futures::future::BoxFuture; +use futures::stream; +use futures::FutureExt; +use futures::StreamExt; +use nexus_db_queries::context::OpContext; +use nexus_db_queries::db::lookup::LookupPath; +use nexus_db_queries::db::DataStore; +use nexus_types::deployment::{Blueprint, BlueprintTarget, OmicronZonesConfig}; +use serde_json::json; +use sled_agent_client::Client as SledAgentClient; +use slog::Logger; +use std::collections::BTreeMap; +use std::sync::Arc; +use tokio::sync::watch; +use uuid::Uuid; + +/// Background task that takes a [`Blueprint`] and realizes the change to +/// the state of the system based on the `Blueprint`. +pub struct BlueprintExecutor { + datastore: Arc, + rx_blueprint: watch::Receiver>>, +} + +impl BlueprintExecutor { + pub fn new( + datastore: Arc, + rx_blueprint: watch::Receiver< + Option>, + >, + ) -> BlueprintExecutor { + BlueprintExecutor { datastore, rx_blueprint } + } + + // This is a modified copy of the functionality from `nexus/src/app/sled.rs`. + // There's no good way to access this functionality right now since it is a + // method on the `Nexus` type. We want to have a more constrained type we can + // pass into background tasks for this type of functionality, but for now we + // just copy the functionality. + async fn sled_client( + &self, + opctx: &OpContext, + sled_id: &Uuid, + ) -> Result { + let (.., sled) = LookupPath::new(opctx, &self.datastore) + .sled_id(*sled_id) + .fetch() + .await + .with_context(|| { + format!( + "Failed to create sled_agent::Client for sled_id: {}", + sled_id + ) + })?; + let dur = std::time::Duration::from_secs(60); + let client = reqwest::ClientBuilder::new() + .connect_timeout(dur) + .timeout(dur) + .build() + .unwrap(); + Ok(SledAgentClient::new_with_client( + &format!("http://{}", sled.address()), + client, + opctx.log.clone(), + )) + } + + async fn realize_blueprint( + &self, + opctx: &OpContext, + blueprint: &Blueprint, + ) -> Result<(), Vec> { + let log = opctx.log.new(o!("comment" => blueprint.comment.clone())); + self.deploy_zones(&log, opctx, &blueprint.omicron_zones).await + } + + async fn deploy_zones( + &self, + log: &Logger, + opctx: &OpContext, + zones: &BTreeMap, + ) -> Result<(), Vec> { + let errors: Vec<_> = stream::iter(zones.clone()) + .filter_map(|(sled_id, config)| async move { + let client = match self.sled_client(&opctx, &sled_id).await { + Ok(client) => client, + Err(err) => { + warn!(log, "{err:#}"); + return Some(err); + } + }; + let result = client + .omicron_zones_put(&config) + .await + .with_context(|| { + format!("Failed to put {config:#?} to sled {sled_id}") + }); + + match result { + Err(error) => { + warn!(log, "{error:#}"); + Some(error) + } + Ok(_) => { + info!( + log, + "Successfully deployed zones for sled agent"; + "sled_id" => %sled_id, + "generation" => config.generation.to_string() + ); + None + } + } + }) + .collect() + .await; + + if errors.is_empty() { + Ok(()) + } else { + Err(errors) + } + } +} + +impl BackgroundTask for BlueprintExecutor { + fn activate<'a>( + &'a mut self, + opctx: &'a OpContext, + ) -> BoxFuture<'a, serde_json::Value> { + async { + // Get the latest blueprint, cloning to prevent holding a read lock + // on the watch. + let update = self.rx_blueprint.borrow_and_update().clone(); + + let Some(update) = update else { + warn!(&opctx.log, + "Blueprint execution: skipped"; + "reason" => "no blueprint"); + return json!({"error": "no blueprint" }); + }; + + let (bp_target, blueprint) = &*update; + if !bp_target.enabled { + warn!(&opctx.log, + "Blueprint execution: skipped"; + "reason" => "blueprint disabled", + "target_id" => %blueprint.id); + return json!({ + "target_id": blueprint.id.to_string(), + "error": "blueprint disabled" + }); + } + + let result = self.realize_blueprint(opctx, blueprint).await; + + // Return the result as a `serde_json::Value` + match result { + Ok(()) => json!({}), + Err(errors) => { + let errors: Vec<_> = errors + .into_iter() + .map(|e| format!("{:#}", e)) + .collect(); + json!({ + "target_id": blueprint.id.to_string(), + "errors": errors + }) + } + } + } + .boxed() + } +} +#[cfg(test)] +mod test { + use super::*; + use crate::app::background::common::BackgroundTask; + use httptest::matchers::{all_of, json_decoded, request}; + use httptest::responders::status_code; + use httptest::Expectation; + use nexus_db_model::{ + ByteCount, SledBaseboard, SledSystemHardware, SledUpdate, + }; + use nexus_test_utils_macros::nexus_test; + use nexus_types::inventory::{ + OmicronZoneConfig, OmicronZoneDataset, OmicronZoneType, + }; + use omicron_common::api::external::Generation; + use serde::Deserialize; + use std::collections::BTreeSet; + use std::net::SocketAddr; + + type ControlPlaneTestContext = + nexus_test_utils::ControlPlaneTestContext; + + fn create_blueprint( + omicron_zones: BTreeMap, + ) -> (BlueprintTarget, Blueprint) { + let id = Uuid::new_v4(); + ( + BlueprintTarget { + target_id: id, + enabled: true, + time_made_target: chrono::Utc::now(), + }, + Blueprint { + id, + omicron_zones, + zones_in_service: BTreeSet::new(), + parent_blueprint_id: None, + time_created: chrono::Utc::now(), + creator: "test".to_string(), + comment: "test blueprint".to_string(), + }, + ) + } + + #[nexus_test(server = crate::Server)] + async fn test_deploy_omicron_zones(cptestctx: &ControlPlaneTestContext) { + let nexus = &cptestctx.server.apictx().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.clone(), + datastore.clone(), + ); + + let (blueprint_tx, blueprint_rx) = watch::channel(None); + let mut task = BlueprintExecutor::new(datastore.clone(), blueprint_rx); + + // With no blueprint we should fail with an appropriate message. + let value = task.activate(&opctx).await; + assert_eq!(value, json!({"error": "no blueprint"})); + + // Get a success (empty) result back when the blueprint has an empty set of zones + let blueprint = Arc::new(create_blueprint(BTreeMap::new())); + blueprint_tx.send(Some(blueprint)).unwrap(); + let value = task.activate(&opctx).await; + assert_eq!(value, json!({})); + + // Create some fake sled-agent servers to respond to zone puts and add + // sleds to CRDB. + let mut s1 = httptest::Server::run(); + let mut s2 = httptest::Server::run(); + let sled_id1 = Uuid::new_v4(); + let sled_id2 = Uuid::new_v4(); + let rack_id = Uuid::new_v4(); + for (i, (sled_id, server)) in + [(sled_id1, &s1), (sled_id2, &s2)].iter().enumerate() + { + let SocketAddr::V6(addr) = server.addr() else { + panic!("Expected Ipv6 address. Got {}", server.addr()); + }; + let update = SledUpdate::new( + *sled_id, + addr, + SledBaseboard { + serial_number: i.to_string(), + part_number: "test".into(), + revision: 1, + }, + SledSystemHardware { + is_scrimlet: false, + usable_hardware_threads: 4, + usable_physical_ram: ByteCount(1000.into()), + reservoir_size: ByteCount(999.into()), + }, + rack_id, + ); + datastore + .sled_upsert(update) + .await + .expect("Failed to insert sled to db"); + } + + // The particular dataset doesn't matter for this test. + // We re-use the same one to not obfuscate things + let dataset = OmicronZoneDataset { + pool_name: format!("oxp_{}", Uuid::new_v4()).parse().unwrap(), + }; + + let generation = Generation::new(); + + // Zones are updated in a particular order, but each request contains + // the full set of zones that must be running. + // See `rack_setup::service::ServiceInner::run` for more details. + let mut zones = OmicronZonesConfig { + generation, + zones: vec![OmicronZoneConfig { + id: Uuid::new_v4(), + underlay_address: "::1".parse().unwrap(), + zone_type: OmicronZoneType::InternalDns { + dataset, + dns_address: "oh-hello-internal-dns".into(), + gz_address: "::1".parse().unwrap(), + gz_address_index: 0, + http_address: "some-ipv6-address".into(), + }, + }], + }; + + // Create a blueprint with only the `InternalDns` zone for both servers + // We reuse the same `OmicronZonesConfig` because the details don't + // matter for this test. + let blueprint = Arc::new(create_blueprint(BTreeMap::from([ + (sled_id1, zones.clone()), + (sled_id2, zones.clone()), + ]))); + + // Send the blueprint with the first set of zones to the task + blueprint_tx.send(Some(blueprint)).unwrap(); + + // Check that the initial requests were sent to the fake sled-agents + for s in [&mut s1, &mut s2] { + s.expect( + Expectation::matching(all_of![ + request::method_path("PUT", "/omicron-zones",), + // Our generation number should be 1 and there should + // be only a single zone. + request::body(json_decoded(|c: &OmicronZonesConfig| { + c.generation == 1u32.into() && c.zones.len() == 1 + })) + ]) + .respond_with(status_code(204)), + ); + } + + // Activate the task to trigger zone configuration on the sled-agents + let value = task.activate(&opctx).await; + assert_eq!(value, json!({})); + s1.verify_and_clear(); + s2.verify_and_clear(); + + // Do it again. This should trigger the same request. + for s in [&mut s1, &mut s2] { + s.expect( + Expectation::matching(request::method_path( + "PUT", + "/omicron-zones", + )) + .respond_with(status_code(204)), + ); + } + let value = task.activate(&opctx).await; + assert_eq!(value, json!({})); + s1.verify_and_clear(); + s2.verify_and_clear(); + + // Take another lap, but this time, have one server fail the request and + // try again. + s1.expect( + Expectation::matching(request::method_path( + "PUT", + "/omicron-zones", + )) + .respond_with(status_code(204)), + ); + s2.expect( + Expectation::matching(request::method_path( + "PUT", + "/omicron-zones", + )) + .respond_with(status_code(500)), + ); + + // Define a type we can use to pick stuff out of error objects. + #[derive(Deserialize)] + struct ErrorResult { + errors: Vec, + } + + let value = task.activate(&opctx).await; + println!("{:?}", value); + let result: ErrorResult = serde_json::from_value(value).unwrap(); + assert_eq!(result.errors.len(), 1); + assert!( + result.errors[0].starts_with("Failed to put OmicronZonesConfig") + ); + s1.verify_and_clear(); + s2.verify_and_clear(); + + // Add an `InternalNtp` zone for our next update + zones.generation = generation.next(); + zones.zones.push(OmicronZoneConfig { + id: Uuid::new_v4(), + underlay_address: "::1".parse().unwrap(), + zone_type: OmicronZoneType::InternalNtp { + address: "::1".into(), + dns_servers: vec!["::1".parse().unwrap()], + domain: None, + ntp_servers: vec!["some-ntp-server-addr".into()], + }, + }); + + // Update our watch channel + let blueprint = Arc::new(create_blueprint(BTreeMap::from([ + (sled_id1, zones.clone()), + (sled_id2, zones.clone()), + ]))); + blueprint_tx.send(Some(blueprint)).unwrap(); + + // Set our new expectations + for s in [&mut s1, &mut s2] { + s.expect( + Expectation::matching(all_of![ + request::method_path("PUT", "/omicron-zones",), + // Our generation number should be bumped and there should + // be two zones. + request::body(json_decoded(|c: &OmicronZonesConfig| { + c.generation == 2u32.into() && c.zones.len() == 2 + })) + ]) + .respond_with(status_code(204)), + ); + } + + // Activate the task + let value = task.activate(&opctx).await; + assert_eq!(value, json!({})); + s1.verify_and_clear(); + s2.verify_and_clear(); + } +} diff --git a/nexus/src/app/background/blueprint_load.rs b/nexus/src/app/background/blueprint_load.rs new file mode 100644 index 0000000000..c34d2ab103 --- /dev/null +++ b/nexus/src/app/background/blueprint_load.rs @@ -0,0 +1,291 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Background task for loading the target blueprint from the DB +//! +//! This task triggers the `blueprint_execution` background task when the +//! blueprint changes. + +use super::common::BackgroundTask; +use futures::future::BoxFuture; +use futures::FutureExt; +use nexus_db_queries::context::OpContext; +use nexus_db_queries::db::DataStore; +use nexus_types::deployment::{Blueprint, BlueprintTarget}; +use serde_json::json; +use std::sync::Arc; +use tokio::sync::watch; + +pub struct TargetBlueprintLoader { + datastore: Arc, + last: Option>, + tx: watch::Sender>>, +} + +impl TargetBlueprintLoader { + pub fn new(datastore: Arc) -> TargetBlueprintLoader { + let (tx, _) = watch::channel(None); + TargetBlueprintLoader { datastore, last: None, tx } + } + + /// Expose the target blueprint + pub fn watcher( + &self, + ) -> watch::Receiver>> { + self.tx.subscribe() + } +} + +impl BackgroundTask for TargetBlueprintLoader { + fn activate<'a>( + &'a mut self, + opctx: &'a OpContext, + ) -> BoxFuture<'a, serde_json::Value> { + async { + // Set up a logger for this activation that includes metadata about + // the current target. + let log = match &self.last { + None => opctx.log.clone(), + Some(old) => opctx.log.new(o!( + "original_target_id" => old.1.id.to_string(), + "original_time_created" => old.1.time_created.to_string(), + )), + }; + + // Retrieve the latest target blueprint + let result = + self.datastore.blueprint_target_get_current_full(opctx).await; + + // Decide what to do with the result + match (&mut self.last, result) { + (_, Err(error)) => { + // We failed to read the blueprint. There's nothing to do + // but log an error. We'll retry when we're activated again. + let message = format!("{:#}", error); + warn!( + &log, + "failed to read target blueprint"; + "error" => &message + ); + let e = + format!("failed to read target blueprint: {message}"); + json!({"error": e}) + } + (None, Ok(None)) => { + // We haven't found a blueprint yet. Do nothing. + json!({"status": "no target blueprint"}) + } + (Some(old), Ok(None)) => { + // We have transitioned from having a blueprint to not + // having one. This should not happen. + let message = format!( + "target blueprint with id {} was removed. There is no \ + longer any target blueprint", + old.1.id + ); + let old_id = old.1.id.to_string(); + self.last = None; + self.tx.send_replace(self.last.clone()); + error!(&log, "{message:?}"); + json!({ + "removed_target_id": old_id, + "status": "no target blueprint (removed)", + "error": message + }) + } + (None, Ok(Some((new_bp_target, new_blueprint)))) => { + // We've found a target blueprint for the first time. + // Save it and notify any watchers. + let target_id = new_blueprint.id.to_string(); + let time_created = new_blueprint.time_created.to_string(); + info!( + log, + "found new target blueprint (first find)"; + "target_id" => &target_id, + "time_created" => &time_created + ); + self.last = Some(Arc::new((new_bp_target, new_blueprint))); + self.tx.send_replace(self.last.clone()); + json!({ + "target_id": target_id, + "time_created": time_created, + "time_found": chrono::Utc::now().to_string(), + "status": "first target blueprint" + }) + } + (Some(old), Ok(Some((new_bp_target, new_blueprint)))) => { + let target_id = new_blueprint.id.to_string(); + let time_created = new_blueprint.time_created.to_string(); + if old.1.id != new_blueprint.id { + // The current target blueprint has been updated + info!( + log, + "found new target blueprint"; + "target_id" => &target_id, + "time_created" => &time_created + ); + self.last = + Some(Arc::new((new_bp_target, new_blueprint))); + self.tx.send_replace(self.last.clone()); + json!({ + "target_id": target_id, + "time_created": time_created, + "time_found": chrono::Utc::now().to_string(), + "status": "target blueprint updated" + }) + } else { + // The new target id matches the old target id + // + // Let's see if the blueprints hold the same contents. + // It should not be possible for the contents of a + // blueprint to change, but we check to catch possible + // bugs further up the stack. + if old.1 != new_blueprint { + let message = format!( + "blueprint for id {} changed. \ + Blueprints are supposed to be immutable.", + target_id + ); + error!(&log, "{}", message); + json!({ + "target_id": target_id, + "status": "target blueprint unchanged (error)", + "error": message + }) + } else { + // We found a new target blueprint that exactly matches + // the old target blueprint. This is the common case + // when we're activated by a timeout. + debug!( + log, + "found latest target blueprint (unchanged)"; + "target_id" => &target_id, + "time_created" => &time_created.clone() + ); + json!({ + "target_id": target_id, + "time_created": time_created, + "status": "target blueprint unchanged" + }) + } + } + } + } + } + .boxed() + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::app::background::common::BackgroundTask; + use nexus_inventory::now_db_precision; + use nexus_test_utils_macros::nexus_test; + use nexus_types::deployment::{Blueprint, BlueprintTarget}; + use serde::Deserialize; + use std::collections::{BTreeMap, BTreeSet}; + use uuid::Uuid; + + type ControlPlaneTestContext = + nexus_test_utils::ControlPlaneTestContext; + + fn create_blueprint( + parent_blueprint_id: Option, + ) -> (BlueprintTarget, Blueprint) { + let id = Uuid::new_v4(); + ( + BlueprintTarget { + target_id: id, + enabled: true, + time_made_target: now_db_precision(), + }, + Blueprint { + id, + omicron_zones: BTreeMap::new(), + zones_in_service: BTreeSet::new(), + parent_blueprint_id, + time_created: now_db_precision(), + creator: "test".to_string(), + comment: "test blueprint".to_string(), + }, + ) + } + + #[derive(Deserialize)] + #[allow(unused)] + struct TargetUpdate { + pub target_id: Uuid, + pub time_created: chrono::DateTime, + pub time_found: Option>, + pub status: String, + } + + #[nexus_test(server = crate::Server)] + async fn test_load_blueprints(cptestctx: &ControlPlaneTestContext) { + let nexus = &cptestctx.server.apictx().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.clone(), + datastore.clone(), + ); + + let mut task = TargetBlueprintLoader::new(datastore.clone()); + let mut rx = task.watcher(); + + // We expect an appropriate status with no blueprint in the datastore + let value = task.activate(&opctx).await; + assert_eq!(json!({"status": "no target blueprint"}), value); + assert!(rx.borrow().is_none()); + + let (target, blueprint) = create_blueprint(None); + + // Inserting a blueprint, but not making it the target returns the same + // status + datastore.blueprint_insert(&opctx, &blueprint).await.unwrap(); + let value = task.activate(&opctx).await; + assert_eq!(json!({"status": "no target blueprint"}), value); + assert!(rx.borrow().is_none()); + + // Setting a target blueprint makes the loader see it and broadcast it + datastore.blueprint_target_set_current(&opctx, target).await.unwrap(); + let value = task.activate(&opctx).await; + let update = serde_json::from_value::(value).unwrap(); + assert_eq!(update.target_id, blueprint.id); + assert_eq!(update.status, "first target blueprint"); + let rx_update = rx.borrow_and_update().clone().unwrap(); + assert_eq!(rx_update.0, target); + assert_eq!(rx_update.1, blueprint); + + // Activation without changing the target blueprint results in no update + let value = task.activate(&opctx).await; + let update = serde_json::from_value::(value).unwrap(); + assert_eq!(update.target_id, blueprint.id); + assert_eq!(update.status, "target blueprint unchanged"); + assert_eq!(false, rx.has_changed().unwrap()); + + // Adding a new blueprint and updating the target triggers a change + let (new_target, new_blueprint) = create_blueprint(Some(blueprint.id)); + datastore.blueprint_insert(&opctx, &new_blueprint).await.unwrap(); + datastore + .blueprint_target_set_current(&opctx, new_target) + .await + .unwrap(); + let value = task.activate(&opctx).await; + let update = serde_json::from_value::(value).unwrap(); + assert_eq!(update.target_id, new_blueprint.id); + assert_eq!(update.status, "target blueprint updated"); + let rx_update = rx.borrow_and_update().clone().unwrap(); + assert_eq!(rx_update.0, new_target); + assert_eq!(rx_update.1, new_blueprint); + + // Activating again without changing the target blueprint results in + // no update + let value = task.activate(&opctx).await; + let update = serde_json::from_value::(value).unwrap(); + assert_eq!(update.target_id, new_blueprint.id); + assert_eq!(update.status, "target blueprint unchanged"); + assert_eq!(false, rx.has_changed().unwrap()); + } +} diff --git a/nexus/src/app/background/common.rs b/nexus/src/app/background/common.rs index 4fcce74714..f954a35639 100644 --- a/nexus/src/app/background/common.rs +++ b/nexus/src/app/background/common.rs @@ -408,7 +408,7 @@ impl TaskExec { start_time, start_instant, reason, - iteration: iteration, + iteration, }); }); diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index 6eacb07dfa..95fe5c933e 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -5,6 +5,8 @@ //! Background task initialization use super::bfd; +use super::blueprint_execution; +use super::blueprint_load; use super::common; use super::dns_config; use super::dns_propagation; @@ -62,6 +64,12 @@ pub struct BackgroundTasks { /// task handle for the task that detects phantom disks pub task_phantom_disks: common::TaskHandle, + /// task handle for blueprint target loader + pub task_blueprint_loader: common::TaskHandle, + + /// task handle for blueprint execution background task + pub task_blueprint_executor: common::TaskHandle, + /// task handle for the service zone nat tracker pub task_service_zone_nat_tracker: common::TaskHandle, } @@ -192,6 +200,33 @@ impl BackgroundTasks { task }; + // Background task: blueprint loader + let blueprint_loader = + blueprint_load::TargetBlueprintLoader::new(datastore.clone()); + let rx_blueprint = blueprint_loader.watcher(); + let task_blueprint_loader = driver.register( + String::from("blueprint_loader"), + String::from("Loads the current target blueprint from the DB"), + config.blueprints.period_secs_load, + Box::new(blueprint_loader), + opctx.child(BTreeMap::new()), + vec![], + ); + + // Background task: blueprint executor + let blueprint_executor = blueprint_execution::BlueprintExecutor::new( + datastore.clone(), + rx_blueprint.clone(), + ); + let task_blueprint_executor = driver.register( + String::from("blueprint_executor"), + String::from("Executes the target blueprint"), + config.blueprints.period_secs_execute, + Box::new(blueprint_executor), + opctx.child(BTreeMap::new()), + vec![Box::new(rx_blueprint)], + ); + let task_service_zone_nat_tracker = { driver.register( "service_zone_nat_tracker".to_string(), @@ -220,6 +255,8 @@ impl BackgroundTasks { bfd_manager, task_inventory_collection, task_phantom_disks, + task_blueprint_loader, + task_blueprint_executor, task_service_zone_nat_tracker, } } diff --git a/nexus/src/app/background/mod.rs b/nexus/src/app/background/mod.rs index dc9eff7d79..2c5fa0ab3c 100644 --- a/nexus/src/app/background/mod.rs +++ b/nexus/src/app/background/mod.rs @@ -5,6 +5,8 @@ //! Background tasks mod bfd; +mod blueprint_execution; +mod blueprint_load; mod common; mod dns_config; mod dns_propagation; diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index 65525557b3..c9ca4db73e 100644 --- a/nexus/src/app/mod.rs +++ b/nexus/src/app/mod.rs @@ -361,6 +361,7 @@ impl Nexus { authn::Context::internal_api(), Arc::clone(&db_datastore), ); + let background_tasks = background::BackgroundTasks::start( &background_ctx, Arc::clone(&db_datastore), diff --git a/nexus/tests/config.test.toml b/nexus/tests/config.test.toml index a795f57f4c..3571388747 100644 --- a/nexus/tests/config.test.toml +++ b/nexus/tests/config.test.toml @@ -100,6 +100,8 @@ inventory.nkeep = 3 # Disable inventory collection altogether (for emergencies) inventory.disable = false phantom_disks.period_secs = 30 +blueprints.period_secs_load = 100 +blueprints.period_secs_execute = 600 sync_service_zone_nat.period_secs = 30 [default_region_allocation_strategy] diff --git a/smf/nexus/multi-sled/config-partial.toml b/smf/nexus/multi-sled/config-partial.toml index 2eed205ddf..8fc2429169 100644 --- a/smf/nexus/multi-sled/config-partial.toml +++ b/smf/nexus/multi-sled/config-partial.toml @@ -48,6 +48,8 @@ inventory.nkeep = 3 # Disable inventory collection altogether (for emergencies) inventory.disable = false phantom_disks.period_secs = 30 +blueprints.period_secs_load = 10 +blueprints.period_secs_execute = 60 sync_service_zone_nat.period_secs = 30 [default_region_allocation_strategy] diff --git a/smf/nexus/single-sled/config-partial.toml b/smf/nexus/single-sled/config-partial.toml index 53bdeaadd6..15f0a4ebe1 100644 --- a/smf/nexus/single-sled/config-partial.toml +++ b/smf/nexus/single-sled/config-partial.toml @@ -48,6 +48,8 @@ inventory.nkeep = 3 # Disable inventory collection altogether (for emergencies) inventory.disable = false phantom_disks.period_secs = 30 +blueprints.period_secs_load = 10 +blueprints.period_secs_execute = 60 sync_service_zone_nat.period_secs = 30 [default_region_allocation_strategy]