Skip to content

Commit

Permalink
Use DB backed blueprints from #4899
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjstone committed Jan 27, 2024
1 parent 864bca7 commit 24b34f7
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ use uuid::Uuid;

/// Background task that takes a [`Blueprint`] and realizes the change to
/// the state of the system based on the `Blueprint`.
pub struct PlanExecutor {
pub struct BlueprintExecutor {
datastore: Arc<DataStore>,
rx_blueprint: watch::Receiver<Option<Arc<Blueprint>>>,
}

impl PlanExecutor {
impl BlueprintExecutor {
// Temporary until we wire up the background task
#[allow(unused)]
pub fn new(
datastore: Arc<DataStore>,
rx_blueprint: watch::Receiver<Option<Arc<Blueprint>>>,
) -> PlanExecutor {
PlanExecutor { datastore, rx_blueprint }
) -> BlueprintExecutor {
BlueprintExecutor { datastore, rx_blueprint }
}

// This is a modified copy of the functionality from `nexus/src/app/sled.rs`.
Expand Down Expand Up @@ -92,7 +92,7 @@ impl PlanExecutor {
let client = match self.sled_client(&opctx, &sled_id).await {
Ok(client) => client,
Err(err) => {
warn!(log, "{}", err);
warn!(log, "{err:#}");
return Some(err);
}
};
Expand All @@ -105,7 +105,7 @@ impl PlanExecutor {

match result {
Err(error) => {
warn!(log, "{}", error);
warn!(log, "{error:#}");
Some(error)
}
Ok(_) => {
Expand All @@ -130,15 +130,15 @@ impl PlanExecutor {
}
}

impl BackgroundTask for PlanExecutor {
impl BackgroundTask for BlueprintExecutor {
fn activate<'a>(
&'a mut self,
opctx: &'a OpContext,
) -> BoxFuture<'a, serde_json::Value> {
async {
// Get the latest blueprint, cloning to prevent holding a read lock
// on the watch.
let blueprint = self.rx_blueprint.borrow().clone();
let blueprint = self.rx_blueprint.borrow_and_update().clone();

let Some(blueprint) = blueprint else {
warn!(&opctx.log,
Expand Down Expand Up @@ -210,7 +210,7 @@ mod test {
);

let (blueprint_tx, blueprint_rx) = watch::channel(None);
let mut task = PlanExecutor::new(datastore.clone(), blueprint_rx);
let mut task = BlueprintExecutor::new(datastore.clone(), blueprint_rx);

// With no blueprint we should fail with an appropriate message.
let value = task.activate(&opctx).await;
Expand Down Expand Up @@ -267,8 +267,7 @@ mod test {

// Zones are updated in a particular order, but each request contains
// the full set of zones that must be running.
// See https://github.com/oxidecomputer/omicron/blob/main/sled-agent/src/rack_setup/service.rs#L976-L998
// for more details.
// See `rack_setup::service::ServiceInner::run` for more details.
let mut zones = OmicronZonesConfig {
generation,
zones: vec![OmicronZoneConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@
//! changes.
use super::common::BackgroundTask;
use crate::app::deployment;
use futures::future::BoxFuture;
use futures::FutureExt;
use nexus_db_queries::authz;
use nexus_db_queries::authz::Action;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db::DataStore;
use nexus_types::deployment::Blueprint;
Expand All @@ -21,45 +18,22 @@ use std::sync::Arc;
use tokio::sync::watch;

pub struct TargetBlueprintLoader {
blueprints: Arc<std::sync::Mutex<deployment::Blueprints>>,
#[allow(unused)]
datastore: Arc<DataStore>,
last: Option<Arc<Blueprint>>,
tx: watch::Sender<Option<Arc<Blueprint>>>,
rx: watch::Receiver<Option<Arc<Blueprint>>>,
}

impl TargetBlueprintLoader {
pub fn new(
blueprints: Arc<std::sync::Mutex<deployment::Blueprints>>,
datastore: Arc<DataStore>,
) -> TargetBlueprintLoader {
pub fn new(datastore: Arc<DataStore>) -> TargetBlueprintLoader {
let (tx, rx) = watch::channel(None);
TargetBlueprintLoader { blueprints, datastore, last: None, tx, rx }
TargetBlueprintLoader { datastore, last: None, tx, rx }
}

/// Expose the target blueprint
pub fn watcher(&self) -> watch::Receiver<Option<Arc<Blueprint>>> {
self.rx.clone()
}

// This function is a modified copy from `nexus/src/app/deployment.rs` for
// use until the types are in the datastore.
//
// This is a stand-in for a datastore function that fetches the current
// target information and the target blueprint's contents. This helper
// exists to combine the authz check with the lookup, which is what the
// datastore function will eventually do.
async fn blueprint_target(
&self,
opctx: &OpContext,
) -> Result<Option<Blueprint>, anyhow::Error> {
opctx.authorize(Action::Read, &authz::BLUEPRINT_CONFIG).await?;
let blueprints = self.blueprints.lock().unwrap();
Ok(blueprints.target.target_id.and_then(|target_id| {
blueprints.all_blueprints.get(&target_id).cloned()
}))
}
}

impl BackgroundTask for TargetBlueprintLoader {
Expand All @@ -79,7 +53,8 @@ impl BackgroundTask for TargetBlueprintLoader {
};

// Retrieve the latest target blueprint
let result = self.blueprint_target(opctx).await;
let result =
self.datastore.blueprint_target_get_current_full(opctx).await;

// Decide what to do with the result
match (&self.last, result) {
Expand All @@ -101,18 +76,17 @@ impl BackgroundTask for TargetBlueprintLoader {
json!({})
}
(Some(old), Ok(None)) => {
// We have transitioned from having a blueprint to not having one.
// This should not happen.
// We have transitioned from having a blueprint to not
// having one. This should not happen.
let message = format!(
"target blueprint with id {} was removed. There is no \
longer any target blueprint",
old.id
);
error!(&log, "{}", message);
json!({"error": message})

}
(None, Ok(Some(new_target))) => {
(None, Ok(Some((_, new_target)))) => {
// We've found a new target blueprint for the first time.
// Save it and notify any watchers.
let target_id = new_target.id.to_string();
Expand All @@ -125,9 +99,11 @@ impl BackgroundTask for TargetBlueprintLoader {
);
self.last = Some(Arc::new(new_target));
self.tx.send_replace(self.last.clone());
json!({"target_id": target_id, "time_created": time_created})
json!({
"target_id": target_id, "time_created": time_created
})
}
(Some(old), Ok(Some(new))) => {
(Some(old), Ok(Some((_, new)))) => {
let target_id = new.id.to_string();
let time_created = new.time_created.to_string();
if old.id != new.id {
Expand All @@ -140,7 +116,10 @@ impl BackgroundTask for TargetBlueprintLoader {
);
self.last = Some(Arc::new(new));
self.tx.send_replace(self.last.clone());
json!({"target_id": target_id, "time_created": time_created})
json!({
"target_id": target_id,
"time_created": time_created
})
} else {
// The new target id matches the old target id
//
Expand All @@ -157,20 +136,24 @@ impl BackgroundTask for TargetBlueprintLoader {
error!(&log, "{}", message);
json!({"error": message})
} else {
// We found a new target blueprint that exactly matches
// the old target blueprint. This is the common case
// when we're activated by a timeout.
debug!(
log,
"found latest target blueprint (unchanged)";
"target_id" => &target_id,
"time_created" => &time_created.clone()
);
json!({"target_id": target_id, "time_created": time_created})
}
// We found a new target blueprint that exactly matches
// the old target blueprint. This is the common case
// when we're activated by a timeout.
debug!(
log,
"found latest target blueprint (unchanged)";
"target_id" => &target_id,
"time_created" => &time_created.clone()
);
json!({
"target_id": target_id,
"time_created": time_created
})
}
}
}
}
}.boxed()
}
.boxed()
}
}
41 changes: 19 additions & 22 deletions nexus/src/app/background/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

//! Background task initialization
use crate::app::deployment;

use super::blueprint_execution;
use super::blueprint_load;
use super::common;
use super::dns_config;
use super::dns_propagation;
Expand All @@ -14,8 +14,6 @@ use super::external_endpoints;
use super::inventory_collection;
use super::nat_cleanup;
use super::phantom_disks;
use super::plan_blueprint_load;
use super::plan_execution;
use nexus_db_model::DnsGroup;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db::DataStore;
Expand Down Expand Up @@ -62,10 +60,10 @@ pub struct BackgroundTasks {
pub task_phantom_disks: common::TaskHandle,

/// task handle for blueprint target loader
pub task_plan_blueprint_target_loader: common::TaskHandle,
pub task_blueprint_loader: common::TaskHandle,

/// task handle for plan execution background task
pub task_plan_executor: common::TaskHandle,
/// task handle for blueprint execution background task
pub task_blueprint_executor: common::TaskHandle,
}

impl BackgroundTasks {
Expand All @@ -77,7 +75,6 @@ impl BackgroundTasks {
dpd_clients: &HashMap<SwitchLocation, Arc<dpd_client::Client>>,
nexus_id: Uuid,
resolver: internal_dns::resolver::Resolver,
blueprints: Arc<std::sync::Mutex<deployment::Blueprints>>,
) -> BackgroundTasks {
let mut driver = common::Driver::new();

Expand Down Expand Up @@ -176,13 +173,11 @@ impl BackgroundTasks {
};

// Background task: blueprint loader
let blueprint_loader = plan_blueprint_load::TargetBlueprintLoader::new(
blueprints,
datastore.clone(),
);
let blueprint_loader =
blueprint_load::TargetBlueprintLoader::new(datastore.clone());
let rx_blueprint = blueprint_loader.watcher();
let task_plan_blueprint_target_loader = driver.register(
String::from("blueprint_target_loader"),
let task_blueprint_loader = driver.register(
String::from("blueprint_loader"),
String::from("Loads the current target blueprint from the DB"),
// TODO: Add to `BackgroundTaskConfig`?
std::time::Duration::from_secs(5),
Expand All @@ -191,15 +186,17 @@ impl BackgroundTasks {
vec![],
);

// Background task: plan executor
let plan_executor =
plan_execution::PlanExecutor::new(datastore, rx_blueprint.clone());
let task_plan_executor = driver.register(
String::from("plan_executor"),
// Background task: blueprint executor
let blueprint_executor = blueprint_execution::BlueprintExecutor::new(
datastore,
rx_blueprint.clone(),
);
let task_blueprint_executor = driver.register(
String::from("blueprint_executor"),
String::from("Executes the target blueprint"),
// TODO: Add to `BackgroundTaskConfig`?
std::time::Duration::from_secs(60),
Box::new(plan_executor),
Box::new(blueprint_executor),
opctx.child(BTreeMap::new()),
vec![Box::new(rx_blueprint)],
);
Expand All @@ -215,8 +212,8 @@ impl BackgroundTasks {
nat_cleanup,
task_inventory_collection,
task_phantom_disks,
task_plan_blueprint_target_loader,
task_plan_executor,
task_blueprint_loader,
task_blueprint_executor,
}
}

Expand Down
4 changes: 2 additions & 2 deletions nexus/src/app/background/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

//! Background tasks
mod blueprint_execution;
mod blueprint_load;
mod common;
mod dns_config;
mod dns_propagation;
Expand All @@ -13,8 +15,6 @@ mod init;
mod inventory_collection;
mod nat_cleanup;
mod phantom_disks;
mod plan_blueprint_load;
mod plan_execution;
mod status;

pub use common::Driver;
Expand Down
4 changes: 0 additions & 4 deletions nexus/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,17 +357,13 @@ impl Nexus {
Arc::clone(&db_datastore),
);

let blueprints =
Arc::new(std::sync::Mutex::new(deployment::Blueprints::new()));

let background_tasks = background::BackgroundTasks::start(
&background_ctx,
Arc::clone(&db_datastore),
&config.pkg.background_tasks,
&dpd_clients,
config.deployment.id,
resolver.clone(),
Arc::clone(&blueprints),
);

let external_resolver = {
Expand Down

0 comments on commit 24b34f7

Please sign in to comment.