diff --git a/Cargo.lock b/Cargo.lock index 58866b90be..ec6d64e41f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3974,6 +3974,12 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfa799dd5ed20a7e349f3b4639aa80d74549c81716d9ec4f994c9b5815598306" +[[package]] +name = "indoc" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b248f5224d1d606005e02c97f5aa4e88eeb230488bcc03bc9ca4d7991399f2b5" + [[package]] name = "inout" version = "0.1.3" @@ -5373,6 +5379,7 @@ dependencies = [ "slog", "slog-error-chain", "tokio", + "update-engine", "uuid", ] @@ -5584,6 +5591,7 @@ dependencies = [ "strum", "test-strategy", "thiserror", + "update-engine", "uuid", ] @@ -6272,6 +6280,7 @@ dependencies = [ "tufaceous", "tufaceous-lib", "update-common", + "update-engine", "uuid", ] @@ -6334,6 +6343,7 @@ dependencies = [ "textwrap", "tokio", "unicode-width", + "update-engine", "url", "uuid", ] @@ -7753,7 +7763,7 @@ version = "0.27.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3aa6f61d235de56ccffbca8627377ebe6ff0052a419f67b098f319a5f32e06d" dependencies = [ - "indoc", + "indoc 1.0.9", "js-sys", "lalrpop", "lalrpop-util", @@ -11508,13 +11518,16 @@ dependencies = [ "camino", "camino-tempfile", "cancel-safe-futures", + "chrono", "clap", "debug-ignore", "derive-where", "either", "futures", + "indent_write", "indexmap 2.4.0", "indicatif", + "indoc 2.0.5", "libsw", "linear-map", "omicron-test-utils", diff --git a/Cargo.toml b/Cargo.toml index 16c06340f9..8acf5c9a35 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -393,6 +393,7 @@ illumos-utils = { path = "illumos-utils" } indent_write = "2.2.0" indexmap = "2.4.0" indicatif = { version = "0.17.8", features = ["rayon"] } +indoc = "2.0.5" installinator = { path = "installinator" } installinator-api = { path = "installinator-api" } installinator-client = { path = "clients/installinator-client" } diff --git a/dev-tools/omdb/Cargo.toml b/dev-tools/omdb/Cargo.toml index 4cc484b9a9..eebb05ba36 100644 --- a/dev-tools/omdb/Cargo.toml +++ b/dev-tools/omdb/Cargo.toml @@ -54,6 +54,7 @@ tabled.workspace = true textwrap.workspace = true tokio = { workspace = true, features = [ "full" ] } unicode-width.workspace = true +update-engine.workspace = true url.workspace = true uuid.workspace = true ipnetwork.workspace = true diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs index 5af75fac8f..58b32cb1f3 100644 --- a/dev-tools/omdb/src/bin/omdb/nexus.rs +++ b/dev-tools/omdb/src/bin/omdb/nexus.rs @@ -51,8 +51,19 @@ use slog_error_chain::InlineErrorChain; use std::collections::BTreeMap; use std::collections::BTreeSet; use std::str::FromStr; +use tabled::settings::object::Columns; +use tabled::settings::Padding; use tabled::Tabled; use tokio::sync::OnceCell; +use update_engine::display::ProgressRatioDisplay; +use update_engine::events::EventReport; +use update_engine::events::StepOutcome; +use update_engine::EventBuffer; +use update_engine::ExecutionStatus; +use update_engine::ExecutionTerminalInfo; +use update_engine::NestedError; +use update_engine::NestedSpec; +use update_engine::TerminalKind; use uuid::Uuid; /// Arguments to the "omdb nexus" subcommand @@ -1586,30 +1597,68 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) { } } } else if name == "blueprint_executor" { + let mut value = details.clone(); + // Extract and remove the event report. (If we don't do this, the + // `Debug` output can be quite large.) + // + // TODO: show more of the event buffer. + let event_buffer = extract_event_buffer(&mut value); + #[derive(Deserialize)] struct BlueprintExecutorStatus { target_id: Uuid, enabled: bool, - errors: Option>, + execution_error: Option, } - match serde_json::from_value::(details.clone()) - { + match serde_json::from_value::(value) { Err(error) => eprintln!( "warning: failed to interpret task details: {:?}: {:?}", error, details ), Ok(status) => { - println!(" target blueprint: {}", status.target_id); - println!( - " execution: {}", - if status.enabled { "enabled" } else { "disabled" } - ); - let errors = status.errors.as_deref().unwrap_or(&[]); - println!(" errors: {}", errors.len()); - for (i, e) in errors.iter().enumerate() { - println!(" error {}: {}", i, e); + // TODO: switch the other outputs to tabled as well. + let mut builder = tabled::builder::Builder::default(); + builder.push_record([ + "target blueprint:".to_string(), + status.target_id.to_string(), + ]); + builder.push_record([ + "execution:".to_string(), + if status.enabled { + "enabled".to_string() + } else { + "disabled".to_string() + }, + ]); + + push_event_buffer_summary(event_buffer, &mut builder); + + match status.execution_error { + Some(error) => { + builder.push_record([ + "error:".to_string(), + error.to_string(), + ]); + + for source in error.sources() { + builder.push_record([ + " caused by:".to_string(), + source.to_string(), + ]); + } + } + None => { + builder.push_record([ + "error:".to_string(), + "(none)".to_string(), + ]); + } } + + let mut table = builder.build(); + bgtask_apply_kv_style(&mut table); + println!("{}", table); } } } else { @@ -1632,6 +1681,177 @@ fn reason_str(reason: &ActivationReason) -> &'static str { } } +fn bgtask_apply_kv_style(table: &mut tabled::Table) { + let style = tabled::settings::Style::empty(); + table.with(style).with( + tabled::settings::Modify::new(Columns::first()) + // Background task tables are offset by 4 characters. + .with(Padding::new(4, 0, 0, 0)), + ); +} + +// Extract and remove the event report. (If we don't do this, the `Debug` +// output can be quite large.) +fn extract_event_buffer( + value: &mut serde_json::Value, +) -> anyhow::Result> { + let Some(obj) = value.as_object_mut() else { + bail!("expected value to be an object") + }; + let Some(event_report) = obj.remove("event_report") else { + bail!("expected 'event_report' field in value") + }; + + // Try deserializing the event report generically. We could deserialize to + // a more explicit spec, e.g. `ReconfiguratorExecutionSpec`, but that's + // unnecessary for omdb's purposes. + let value: Result, NestedError> = + serde_json::from_value(event_report) + .context("failed to deserialize event report")?; + let event_report = value.context( + "event report stored as Err rather than Ok (did receiver task panic?)", + )?; + + let mut event_buffer = EventBuffer::default(); + event_buffer.add_event_report(event_report); + Ok(event_buffer) +} + +// Make a short summary of the current state of an execution based on an event +// buffer, and add it to the table. +fn push_event_buffer_summary( + event_buffer: anyhow::Result>, + builder: &mut tabled::builder::Builder, +) { + match event_buffer { + Ok(buffer) => { + event_buffer_summary_impl(buffer, builder); + } + Err(error) => { + builder.push_record([ + "event report error:".to_string(), + error.to_string(), + ]); + for source in error.chain() { + builder.push_record([ + " caused by:".to_string(), + source.to_string(), + ]); + } + } + } +} + +fn event_buffer_summary_impl( + buffer: EventBuffer, + builder: &mut tabled::builder::Builder, +) { + let Some(summary) = buffer.root_execution_summary() else { + builder.push_record(["status:", "(no information found)"]); + return; + }; + + match summary.execution_status { + ExecutionStatus::NotStarted => { + builder.push_record(["status:", "not started"]); + } + ExecutionStatus::Running { step_key, .. } => { + let step_data = buffer.get(&step_key).expect("step exists"); + builder.push_record([ + "status:".to_string(), + format!( + "running: {} (step {})", + step_data.step_info().description, + ProgressRatioDisplay::index_and_total( + step_key.index, + summary.total_steps, + ), + ), + ]); + } + ExecutionStatus::Terminal(info) => { + push_event_buffer_terminal_info( + &info, + summary.total_steps, + &buffer, + builder, + ); + } + } + + // Also look for warnings. + for (_, step_data) in buffer.iter_steps_recursive() { + if let Some(reason) = step_data.step_status().completion_reason() { + if let Some(info) = reason.step_completed_info() { + if let StepOutcome::Warning { message, .. } = &info.outcome { + builder.push_record([ + "warning:".to_string(), + // This can be a nested step, so don't print out the + // index. + format!( + "at: {}: {}", + step_data.step_info().description, + message + ), + ]); + } + } + } + } +} + +fn push_event_buffer_terminal_info( + info: &ExecutionTerminalInfo, + total_steps: usize, + buffer: &EventBuffer, + builder: &mut tabled::builder::Builder, +) { + let step_data = buffer.get(&info.step_key).expect("step exists"); + + match info.kind { + TerminalKind::Completed => { + let v = format!("completed ({} steps)", total_steps); + builder.push_record(["status:".to_string(), v]); + } + TerminalKind::Failed => { + let v = format!( + "failed at: {} (step {})", + step_data.step_info().description, + ProgressRatioDisplay::index_and_total( + info.step_key.index, + total_steps, + ) + ); + builder.push_record(["status:".to_string(), v]); + + // Don't show the error here, because it's duplicated in another + // field that's already shown. + } + TerminalKind::Aborted => { + let v = format!( + "aborted at: {} (step {})", + step_data.step_info().description, + ProgressRatioDisplay::index_and_total( + info.step_key.index, + total_steps, + ) + ); + builder.push_record(["status:".to_string(), v]); + + let Some(reason) = step_data.step_status().abort_reason() else { + builder.push_record([" reason:", "(no reason found)"]); + return; + }; + + builder.push_record([ + " reason:".to_string(), + reason.message_display(&buffer).to_string(), + ]); + // TODO: show last progress event + } + } +} + /// Used for printing background task status as a table #[derive(Tabled)] struct BackgroundTaskStatusRow { diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index d6f97adc39..cdad883ca7 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -111,6 +111,7 @@ oximeter-producer.workspace = true rustls = { workspace = true } rustls-pemfile = { workspace = true } update-common.workspace = true +update-engine.workspace = true omicron-workspace-hack.workspace = true omicron-uuid-kinds.workspace = true diff --git a/nexus/reconfigurator/execution/Cargo.toml b/nexus/reconfigurator/execution/Cargo.toml index 1c62e553a8..bb3c7ad2b9 100644 --- a/nexus/reconfigurator/execution/Cargo.toml +++ b/nexus/reconfigurator/execution/Cargo.toml @@ -30,6 +30,8 @@ reqwest.workspace = true sled-agent-client.workspace = true slog.workspace = true slog-error-chain.workspace = true +tokio.workspace = true +update-engine.workspace = true uuid.workspace = true # See omicron-rpaths for more about the "pq-sys" dependency. This is needed @@ -52,4 +54,3 @@ nexus-test-utils.workspace = true nexus-test-utils-macros.workspace = true omicron-nexus.workspace = true omicron-test-utils.workspace = true -tokio.workspace = true diff --git a/nexus/reconfigurator/execution/src/lib.rs b/nexus/reconfigurator/execution/src/lib.rs index fc2d9a8ae5..607c929a19 100644 --- a/nexus/reconfigurator/execution/src/lib.rs +++ b/nexus/reconfigurator/execution/src/lib.rs @@ -10,6 +10,7 @@ use anyhow::{anyhow, Context}; use internal_dns::resolver::Resolver; use nexus_db_queries::context::OpContext; use nexus_db_queries::db::DataStore; +use nexus_types::deployment::execution::*; use nexus_types::deployment::Blueprint; use nexus_types::deployment::BlueprintZoneFilter; use nexus_types::deployment::SledFilter; @@ -17,6 +18,7 @@ use nexus_types::external_api::views::SledState; use nexus_types::identity::Asset; use omicron_common::address::Ipv6Subnet; use omicron_common::address::SLED_PREFIX; +use omicron_physical_disks::DeployDisksDone; use omicron_uuid_kinds::GenericUuid; use omicron_uuid_kinds::SledUuid; use overridables::Overridables; @@ -24,6 +26,9 @@ use slog::info; use slog_error_chain::InlineErrorChain; use std::collections::BTreeMap; use std::net::SocketAddrV6; +use std::sync::Arc; +use tokio::sync::mpsc; +use update_engine::merge_anyhow_list; use uuid::Uuid; mod cockroachdb; @@ -92,7 +97,8 @@ pub async fn realize_blueprint( resolver: &Resolver, blueprint: &Blueprint, nexus_id: Uuid, -) -> Result> { + sender: mpsc::Sender, +) -> Result { realize_blueprint_with_overrides( opctx, datastore, @@ -100,6 +106,7 @@ pub async fn realize_blueprint( blueprint, nexus_id, &Default::default(), + sender, ) .await } @@ -111,7 +118,8 @@ pub async fn realize_blueprint_with_overrides( blueprint: &Blueprint, nexus_id: Uuid, overrides: &Overridables, -) -> Result> { + sender: mpsc::Sender, +) -> Result { let opctx = opctx.child(BTreeMap::from([( "comment".to_string(), blueprint.comment.clone(), @@ -123,143 +131,512 @@ pub async fn realize_blueprint_with_overrides( "blueprint_id" => %blueprint.id ); - datastore - .blueprint_ensure_external_networking_resources(&opctx, blueprint) - .await - .map_err(|err| { - vec![anyhow!(err).context( - "failed to ensure external networking resources in database", - )] - })?; - - let sleds_by_id: BTreeMap = datastore - .sled_list_all_batched(&opctx, SledFilter::InService) - .await - .context("listing all sleds") - .map_err(|e| vec![e])? - .into_iter() - .map(|db_sled| { - (SledUuid::from_untyped_uuid(db_sled.id()), Sled::from(db_sled)) - }) - .collect(); - - let deploy_disks_done = omicron_physical_disks::deploy_disks( + // Large enough to handle all the messages. + let engine = UpdateEngine::new(&opctx.log, sender); + + register_zone_external_networking_step( + &engine.for_component(ExecutionComponent::ExternalNetworking), &opctx, - &sleds_by_id, - &blueprint.blueprint_disks, - ) - .await?; + datastore, + blueprint, + ); - omicron_zones::deploy_zones( + let sled_list = register_sled_list_step( + &engine.for_component(ExecutionComponent::SledList), &opctx, - &sleds_by_id, - &blueprint.blueprint_zones, + datastore, ) - .await?; + .into_shared(); - // After deploying omicron zones, we may need to refresh OPTE service - // firewall rules. This is an idempotent operation, so we don't attempt - // to optimize out calling it in unnecessary cases, although it is only - // needed in cases where we've changed the set of services on one or more - // sleds, or the sleds have lost their firewall rules for some reason. - // Fixing the latter case is a side effect and should really be handled by a - // firewall-rule-specific RPW; once that RPW exists, we could trigger it - // here instead of pluming firewall rules ourselves. - nexus_networking::plumb_service_firewall_rules( - datastore, + let deploy_disks_done = register_deploy_disks_step( + &engine.for_component(ExecutionComponent::PhysicalDisks), &opctx, - &[], + blueprint, + sled_list.clone(), + ); + + register_deploy_zones_step( + &engine.for_component(ExecutionComponent::OmicronZones), &opctx, - &opctx.log, - ) - .await - .context("failed to plumb service firewall rules to sleds") - .map_err(|err| vec![err])?; + blueprint, + sled_list.clone(), + ); - datasets::ensure_dataset_records_exist( + register_plumb_firewall_rules_step( + &engine.for_component(ExecutionComponent::FirewallRules), &opctx, datastore, - blueprint - .all_omicron_zones(BlueprintZoneFilter::ShouldBeRunning) - .map(|(_sled_id, zone)| zone), - ) - .await - .map_err(|err| vec![err])?; + ); - dns::deploy_dns( + register_dataset_records_step( + &engine.for_component(ExecutionComponent::DatasetRecords), &opctx, datastore, - nexus_id.to_string(), blueprint, - &sleds_by_id, + ); + + register_dns_records_step( + &engine.for_component(ExecutionComponent::Dns), + &opctx, + datastore, + blueprint, + nexus_id, overrides, - ) - .await - .map_err(|e| vec![anyhow!("{}", InlineErrorChain::new(&e))])?; + sled_list.clone(), + ); - omicron_zones::clean_up_expunged_zones( + register_cleanup_expunged_zones_step( + &engine.for_component(ExecutionComponent::OmicronZones), &opctx, datastore, resolver, - blueprint.all_omicron_zones(BlueprintZoneFilter::Expunged), - ) - .await?; + blueprint, + ); - sled_state::decommission_sleds( + register_decommission_sleds_step( + &engine.for_component(ExecutionComponent::OmicronZones), &opctx, datastore, - blueprint - .sled_state - .iter() - .filter(|&(_, &state)| state == SledState::Decommissioned) - .map(|(&sled_id, _)| sled_id), - ) - .await?; + blueprint, + ); - omicron_physical_disks::decommission_expunged_disks( + register_decommission_expunged_disks_step( + &engine.for_component(ExecutionComponent::PhysicalDisks), &opctx, datastore, deploy_disks_done, - ) - .await?; + ); + + let reassign_saga_output = register_reassign_sagas_step( + &engine.for_component(ExecutionComponent::OmicronZones), + &opctx, + datastore, + blueprint, + nexus_id, + ); + + let register_cockroach_output = register_cockroachdb_settings_step( + &engine.for_component(ExecutionComponent::Cockroach), + &opctx, + datastore, + blueprint, + ); + + let output = register_finalize_step( + &engine.for_component(ExecutionComponent::Cockroach), + reassign_saga_output, + register_cockroach_output, + ); + + // All steps are registered, so execute the engine. + let result = engine.execute().await?; + + Ok(output.into_value(result.token()).await) +} + +fn register_zone_external_networking_step<'a>( + registrar: &ComponentRegistrar<'_, 'a>, + opctx: &'a OpContext, + datastore: &'a DataStore, + blueprint: &'a Blueprint, +) { + // Deallocate external networking resources for non-externally-reachable + // zones first. This will allow external networking resource allocation to + // succeed if we are swapping an external IP between two zones (e.g., moving + // a specific external IP from an old external DNS zone to a new one). + registrar + .new_step( + ExecutionStepId::Ensure, + "Ensure external networking resources", + move |_cx| async move { + datastore + .blueprint_ensure_external_networking_resources( + &opctx, blueprint, + ) + .await + .map_err(|err| anyhow!(err))?; + + StepSuccess::new(()).into() + }, + ) + .register(); +} + +fn register_sled_list_step<'a>( + registrar: &ComponentRegistrar<'_, 'a>, + opctx: &'a OpContext, + datastore: &'a DataStore, +) -> StepHandle>> { + registrar + .new_step( + ExecutionStepId::Fetch, + "Fetch sled list", + move |_cx| async move { + let sleds_by_id: BTreeMap = datastore + .sled_list_all_batched(&opctx, SledFilter::InService) + .await + .context("listing all sleds")? + .into_iter() + .map(|db_sled| { + ( + SledUuid::from_untyped_uuid(db_sled.id()), + Sled::from(db_sled), + ) + }) + .collect(); + + StepSuccess::new(Arc::new(sleds_by_id)).into() + }, + ) + .register() +} + +fn register_deploy_disks_step<'a>( + registrar: &ComponentRegistrar<'_, 'a>, + opctx: &'a OpContext, + blueprint: &'a Blueprint, + sleds: SharedStepHandle>>, +) -> StepHandle { + registrar + .new_step( + ExecutionStepId::Ensure, + "Deploy physical disks", + move |cx| async move { + let sleds_by_id = sleds.into_value(cx.token()).await; + let done = omicron_physical_disks::deploy_disks( + &opctx, + &sleds_by_id, + &blueprint.blueprint_disks, + ) + .await + .map_err(merge_anyhow_list)?; + + StepSuccess::new(done).into() + }, + ) + .register() +} - // From this point on, we'll assume that any errors that we encounter do - // *not* require stopping execution. We'll just accumulate them and return - // them all at the end. +fn register_deploy_zones_step<'a>( + registrar: &ComponentRegistrar<'_, 'a>, + opctx: &'a OpContext, + blueprint: &'a Blueprint, + sleds: SharedStepHandle>>, +) { + registrar + .new_step( + ExecutionStepId::Ensure, + "Deploy Omicron zones", + move |cx| async move { + let sleds_by_id = sleds.into_value(cx.token()).await; + omicron_zones::deploy_zones( + &opctx, + &sleds_by_id, + &blueprint.blueprint_zones, + ) + .await + .map_err(merge_anyhow_list)?; + + StepSuccess::new(()).into() + }, + ) + .register(); +} + +fn register_plumb_firewall_rules_step<'a>( + registrar: &ComponentRegistrar<'_, 'a>, + opctx: &'a OpContext, + datastore: &'a DataStore, +) { + // After deploying omicron zones, we may need to refresh OPTE service + // firewall rules. This is an idempotent operation, so we don't attempt + // to optimize out calling it in unnecessary cases, although it is only + // needed in cases where we've changed the set of services on one or more + // sleds, or the sleds have lost their firewall rules for some reason. + // Fixing the latter case is a side effect and should really be handled by a + // firewall-rule-specific RPW; once that RPW exists, we could trigger it + // here instead of pluming firewall rules ourselves. + registrar + .new_step( + ExecutionStepId::Ensure, + "Plumb service firewall rules", + move |_cx| async move { + nexus_networking::plumb_service_firewall_rules( + datastore, + &opctx, + &[], + &opctx, + &opctx.log, + ) + .await + .context("failed to plumb service firewall rules to sleds")?; + + StepSuccess::new(()).into() + }, + ) + .register(); +} + +fn register_dataset_records_step<'a>( + registrar: &ComponentRegistrar<'_, 'a>, + opctx: &'a OpContext, + datastore: &'a DataStore, + blueprint: &'a Blueprint, +) { + registrar + .new_step( + ExecutionStepId::Ensure, + "Ensure dataset records", + move |_cx| async move { + datasets::ensure_dataset_records_exist( + &opctx, + datastore, + blueprint + .all_omicron_zones(BlueprintZoneFilter::ShouldBeRunning) + .map(|(_sled_id, zone)| zone), + ) + .await?; + + StepSuccess::new(()).into() + }, + ) + .register(); +} + +fn register_dns_records_step<'a>( + registrar: &ComponentRegistrar<'_, 'a>, + opctx: &'a OpContext, + datastore: &'a DataStore, + blueprint: &'a Blueprint, + nexus_id: Uuid, + overrides: &'a Overridables, + sleds: SharedStepHandle>>, +) { + registrar + .new_step( + ExecutionStepId::Ensure, + "Deploy DNS records", + move |cx| async move { + let sleds_by_id = sleds.into_value(cx.token()).await; + + dns::deploy_dns( + &opctx, + datastore, + nexus_id.to_string(), + blueprint, + &sleds_by_id, + overrides, + ) + .await + .map_err(|e| anyhow!("{}", InlineErrorChain::new(&e)))?; + + StepSuccess::new(()).into() + }, + ) + .register(); +} + +fn register_cleanup_expunged_zones_step<'a>( + registrar: &ComponentRegistrar<'_, 'a>, + opctx: &'a OpContext, + datastore: &'a DataStore, + resolver: &'a Resolver, + blueprint: &'a Blueprint, +) { + registrar + .new_step( + ExecutionStepId::Remove, + "Cleanup expunged zones", + move |_cx| async move { + omicron_zones::clean_up_expunged_zones( + &opctx, + datastore, + resolver, + blueprint.all_omicron_zones(BlueprintZoneFilter::Expunged), + ) + .await + .map_err(merge_anyhow_list)?; + + StepSuccess::new(()).into() + }, + ) + .register(); +} + +fn register_decommission_sleds_step<'a>( + registrar: &ComponentRegistrar<'_, 'a>, + opctx: &'a OpContext, + datastore: &'a DataStore, + blueprint: &'a Blueprint, +) { + registrar + .new_step( + ExecutionStepId::Remove, + "Decommission sleds", + move |_cx| async move { + sled_state::decommission_sleds( + &opctx, + datastore, + blueprint + .sled_state + .iter() + .filter(|&(_, &state)| { + state == SledState::Decommissioned + }) + .map(|(&sled_id, _)| sled_id), + ) + .await + .map_err(merge_anyhow_list)?; + + StepSuccess::new(()).into() + }, + ) + .register(); +} + +fn register_decommission_expunged_disks_step<'a>( + registrar: &ComponentRegistrar<'_, 'a>, + opctx: &'a OpContext, + datastore: &'a DataStore, + deploy_disks_done: StepHandle, +) { + // This depends on the "deploy_disks" call earlier -- disk expungement is a + // statement of policy, but we need to be assured that the Sled Agent has + // stopped using that disk before we can mark its state as decommissioned. + registrar + .new_step( + ExecutionStepId::Remove, + "Decommission expunged disks", + move |cx| async move { + let done = deploy_disks_done.into_value(cx.token()).await; + omicron_physical_disks::decommission_expunged_disks( + &opctx, datastore, done, + ) + .await + .map_err(merge_anyhow_list)?; + + StepSuccess::new(()).into() + }, + ) + .register(); +} + +#[derive(Debug)] +struct ReassignSagaOutput { + needs_saga_recovery: bool, + error: Option, +} + +fn register_reassign_sagas_step<'a>( + registrar: &ComponentRegistrar<'_, 'a>, + opctx: &'a OpContext, + datastore: &'a DataStore, + blueprint: &'a Blueprint, + nexus_id: Uuid, +) -> StepHandle { + // For this and subsequent steps, we'll assume that any errors that we + // encounter do *not* require stopping execution. We'll just accumulate + // them and return them all at the end. // // TODO We should probably do this with more of the errors above, too. - let mut errors = Vec::new(); + registrar + .new_step( + ExecutionStepId::Ensure, + "Reassign sagas", + move |_cx| async move { + // For any expunged Nexus zones, re-assign in-progress sagas to + // some other Nexus. If this fails for some reason, it doesn't + // affect anything else. + let sec_id = nexus_db_model::SecId(nexus_id); + let reassigned = sagas::reassign_sagas_from_expunged( + &opctx, datastore, blueprint, sec_id, + ) + .await + .context("failed to re-assign sagas"); + match reassigned { + Ok(needs_saga_recovery) => { + let output = ReassignSagaOutput { + needs_saga_recovery, + error: None, + }; + StepSuccess::new(output).into() + } + Err(error) => { + // We treat errors as non-fatal here, but we still want + // to log them. + let message = error.to_string(); + let output = ReassignSagaOutput { + needs_saga_recovery: false, + error: Some(error), + }; + StepWarning::new(output, message).into() + } + } + }, + ) + .register() +} - // For any expunged Nexus zones, re-assign in-progress sagas to some other - // Nexus. If this fails for some reason, it doesn't affect anything else. - let sec_id = nexus_db_model::SecId(nexus_id); - let reassigned = sagas::reassign_sagas_from_expunged( - &opctx, datastore, blueprint, sec_id, - ) - .await - .context("failed to re-assign sagas"); - let needs_saga_recovery = match reassigned { - Ok(needs_recovery) => needs_recovery, - Err(error) => { - errors.push(error); - false - } - }; - - // This is likely to error if any cluster upgrades are in progress (which - // can take some time), so it should remain at the end so that other parts - // of the blueprint can progress normally. - if let Err(error) = - cockroachdb::ensure_settings(&opctx, datastore, blueprint).await - { - errors.push(error); - } +fn register_cockroachdb_settings_step<'a>( + registrar: &ComponentRegistrar<'_, 'a>, + opctx: &'a OpContext, + datastore: &'a DataStore, + blueprint: &'a Blueprint, +) -> StepHandle> { + registrar + .new_step( + ExecutionStepId::Ensure, + "Ensure CockroachDB settings", + move |_cx| async move { + if let Err(error) = + cockroachdb::ensure_settings(&opctx, datastore, blueprint) + .await + { + let message = error.to_string(); + StepWarning::new(Some(error), message).into() + } else { + StepSuccess::new(None).into() + } + }, + ) + .register() +} - if errors.is_empty() { - Ok(RealizeBlueprintOutput { needs_saga_recovery }) - } else { - Err(errors) - } +fn register_finalize_step( + registrar: &ComponentRegistrar<'_, '_>, + reassign_saga_output: StepHandle, + register_cockroach_output: StepHandle>, +) -> StepHandle { + registrar + .new_step( + ExecutionStepId::Finalize, + "Finalize and check for errors", + move |cx| async move { + let reassign_saga_output = + reassign_saga_output.into_value(cx.token()).await; + let register_cockroach_output = + register_cockroach_output.into_value(cx.token()).await; + + let mut errors = Vec::new(); + if let Some(error) = register_cockroach_output { + errors.push(error); + } + if let Some(error) = reassign_saga_output.error { + errors.push(error); + } + + if errors.is_empty() { + StepSuccess::new(RealizeBlueprintOutput { + needs_saga_recovery: reassign_saga_output + .needs_saga_recovery, + }) + .into() + } else { + Err(merge_anyhow_list(errors)) + } + }, + ) + .register() } #[cfg(test)] diff --git a/nexus/reconfigurator/execution/src/test_utils.rs b/nexus/reconfigurator/execution/src/test_utils.rs index 0d6675c7dd..6af82ef9dd 100644 --- a/nexus/reconfigurator/execution/src/test_utils.rs +++ b/nexus/reconfigurator/execution/src/test_utils.rs @@ -6,7 +6,8 @@ use internal_dns::resolver::Resolver; use nexus_db_queries::{context::OpContext, db::DataStore}; -use nexus_types::deployment::Blueprint; +use nexus_types::deployment::{execution::EventBuffer, Blueprint}; +use update_engine::TerminalKind; use uuid::Uuid; use crate::{overridables::Overridables, RealizeBlueprintOutput}; @@ -17,7 +18,16 @@ pub(crate) async fn realize_blueprint_and_expect( resolver: &Resolver, blueprint: &Blueprint, overrides: &Overridables, -) -> RealizeBlueprintOutput { +) -> (RealizeBlueprintOutput, EventBuffer) { + let (sender, mut receiver) = tokio::sync::mpsc::channel(128); + let receiver_task = tokio::spawn(async move { + let mut buffer = EventBuffer::default(); + while let Some(msg) = receiver.recv().await { + buffer.add_event(msg); + } + buffer + }); + let output = crate::realize_blueprint_with_overrides( opctx, datastore, @@ -25,6 +35,7 @@ pub(crate) async fn realize_blueprint_and_expect( blueprint, Uuid::new_v4(), overrides, + sender, ) .await // We expect here rather than in the caller because we want to assert that @@ -32,6 +43,22 @@ pub(crate) async fn realize_blueprint_and_expect( // `must_use`, the caller may assign it to `_` and miss the `expect` call. .expect("failed to execute blueprint"); + let buffer = receiver_task.await.expect("failed to receive events"); eprintln!("realize_blueprint output: {:#?}", output); - output + + let status = buffer + .root_execution_summary() + .expect("buffer has a root execution") + .execution_status; + let terminal_info = status.terminal_info().unwrap_or_else(|| { + panic!("expected status to be terminal: {:#?}", status) + }); + + assert_eq!( + terminal_info.kind, + TerminalKind::Completed, + "expected completed" + ); + + (output, buffer) } diff --git a/nexus/src/app/background/tasks/blueprint_execution.rs b/nexus/src/app/background/tasks/blueprint_execution.rs index 2b1e3eedca..20d4105d50 100644 --- a/nexus/src/app/background/tasks/blueprint_execution.rs +++ b/nexus/src/app/background/tasks/blueprint_execution.rs @@ -11,10 +11,13 @@ use internal_dns::resolver::Resolver; use nexus_db_queries::context::OpContext; use nexus_db_queries::db::DataStore; use nexus_reconfigurator_execution::RealizeBlueprintOutput; -use nexus_types::deployment::{Blueprint, BlueprintTarget}; +use nexus_types::deployment::{ + execution::EventBuffer, Blueprint, BlueprintTarget, +}; use serde_json::json; use std::sync::Arc; -use tokio::sync::watch; +use tokio::sync::{mpsc, watch}; +use update_engine::NestedError; use uuid::Uuid; /// Background task that takes a [`Blueprint`] and realizes the change to @@ -87,15 +90,34 @@ impl BlueprintExecutor { }); } + // Pick a large-ish buffer for reconfigurator execution to avoid + // blocking it. + let (sender, mut receiver) = mpsc::channel(256); + + let receiver_task = tokio::spawn(async move { + // TODO: report progress + let mut event_buffer = EventBuffer::default(); + while let Some(event) = receiver.recv().await { + event_buffer.add_event(event); + } + + event_buffer.generate_report() + }); + let result = nexus_reconfigurator_execution::realize_blueprint( opctx, &self.datastore, &self.resolver, blueprint, self.nexus_id, + sender, ) .await; + // Get the report for the receiver task. + let event_report = + receiver_task.await.map_err(|error| NestedError::new(&error)); + // Trigger anybody waiting for this to finish. self.tx.send_modify(|count| *count = *count + 1); @@ -112,16 +134,23 @@ impl BlueprintExecutor { json!({ "target_id": blueprint.id.to_string(), "enabled": true, + // Note: The field "error" is treated as special by omdb, + // and if that field is present then nothing else is + // displayed. + "execution_error": null, "needs_saga_recovery": needs_saga_recovery, + "event_report": event_report, }) } - Err(errors) => { - let errors: Vec<_> = - errors.into_iter().map(|e| format!("{:#}", e)).collect(); + Err(error) => { json!({ "target_id": blueprint.id.to_string(), "enabled": true, - "errors": errors + // Note: The field "error" is treated as special by omdb, + // and if that field is present then nothing else is + // displayed. + "execution_error": NestedError::new(error.as_ref()), + "event_report": event_report, }) } } @@ -152,6 +181,10 @@ mod test { use nexus_db_queries::db::DataStore; use nexus_sled_agent_shared::inventory::OmicronZoneDataset; use nexus_test_utils_macros::nexus_test; + use nexus_types::deployment::execution::{ + EventBuffer, EventReport, ExecutionComponent, ExecutionStepId, + ReconfiguratorExecutionSpec, StepInfo, + }; use nexus_types::deployment::BlueprintZoneFilter; use nexus_types::deployment::{ blueprint_zone_type, Blueprint, BlueprintPhysicalDisksConfig, @@ -171,6 +204,7 @@ mod test { use std::net::SocketAddr; use std::sync::Arc; use tokio::sync::watch; + use update_engine::{NestedError, TerminalKind}; use uuid::Uuid; type ControlPlaneTestContext = @@ -312,7 +346,10 @@ mod test { ); let blueprint_id = blueprint.1.id; blueprint_tx.send(Some(blueprint)).unwrap(); - let value = task.activate(&opctx).await; + let mut value = task.activate(&opctx).await; + + let event_buffer = extract_event_buffer(&mut value); + println!("activating with no zones: {:?}", value); assert_eq!( value, @@ -323,6 +360,8 @@ mod test { }) ); + assert_event_buffer_completed(&event_buffer); + // Create a non-empty blueprint describing two servers and verify that // the task correctly winds up making requests to both of them and // reporting success. @@ -407,7 +446,9 @@ mod test { } // Activate the task to trigger zone configuration on the sled-agents - let value = task.activate(&opctx).await; + let mut value = task.activate(&opctx).await; + let event_buffer = extract_event_buffer(&mut value); + println!("activating two sled agents: {:?}", value); assert_eq!( value, @@ -417,6 +458,8 @@ mod test { "needs_saga_recovery": false, }) ); + assert_event_buffer_completed(&event_buffer); + s1.verify_and_clear(); s2.verify_and_clear(); @@ -459,17 +502,92 @@ mod test { #[derive(Deserialize)] struct ErrorResult { - errors: Vec, + error: NestedError, } - let value = task.activate(&opctx).await; + let mut value = task.activate(&opctx).await; + let event_buffer = extract_event_buffer(&mut value); + println!("after failure: {:?}", value); let result: ErrorResult = serde_json::from_value(value).unwrap(); - assert_eq!(result.errors.len(), 1); - assert!( - result.errors[0].starts_with("Failed to put OmicronZonesConfig") + assert_eq!(result.error.message(), "step failed: Deploy Omicron zones"); + + assert_event_buffer_failed_at( + &event_buffer, + ExecutionComponent::OmicronZones, + ExecutionStepId::Ensure, ); + s1.verify_and_clear(); s2.verify_and_clear(); } + + fn extract_event_buffer(value: &mut serde_json::Value) -> EventBuffer { + let event_report = value + .as_object_mut() + .expect("value is an object") + .remove("event_report") + .expect("event_report exists"); + let event_report: Result = + serde_json::from_value(event_report) + .expect("event_report is valid"); + let event_report = event_report.expect("event_report is Ok"); + + let mut event_buffer = EventBuffer::default(); + event_buffer.add_event_report(event_report); + event_buffer + } + + fn assert_event_buffer_completed(event_buffer: &EventBuffer) { + let execution_status = event_buffer + .root_execution_summary() + .expect("event buffer has root execution summary") + .execution_status; + let terminal_info = + execution_status.terminal_info().unwrap_or_else(|| { + panic!( + "execution status has terminal info: {:?}", + execution_status + ); + }); + assert_eq!( + terminal_info.kind, + TerminalKind::Completed, + "execution should have completed successfully" + ); + } + + fn assert_event_buffer_failed_at( + event_buffer: &EventBuffer, + component: ExecutionComponent, + step_id: ExecutionStepId, + ) { + let execution_status = event_buffer + .root_execution_summary() + .expect("event buffer has root execution summary") + .execution_status; + let terminal_info = + execution_status.terminal_info().unwrap_or_else(|| { + panic!( + "execution status has terminal info: {:?}", + execution_status + ); + }); + assert_eq!( + terminal_info.kind, + TerminalKind::Failed, + "execution should have failed" + ); + let step = + event_buffer.get(&terminal_info.step_key).expect("step exists"); + let step_info = StepInfo::::from_generic( + step.step_info().clone(), + ) + .expect("step info follows ReconfiguratorExecutionSpec"); + assert_eq!( + (step_info.component, step_info.id), + (component, step_id), + "component and step id matches expected" + ); + } } diff --git a/nexus/types/Cargo.toml b/nexus/types/Cargo.toml index 6b31013d49..8af94fd25e 100644 --- a/nexus/types/Cargo.toml +++ b/nexus/types/Cargo.toml @@ -37,6 +37,7 @@ steno.workspace = true strum.workspace = true thiserror.workspace = true newtype-uuid.workspace = true +update-engine.workspace = true uuid.workspace = true api_identity.workspace = true diff --git a/nexus/types/src/deployment.rs b/nexus/types/src/deployment.rs index 58bc35528f..7d4d9f72c5 100644 --- a/nexus/types/src/deployment.rs +++ b/nexus/types/src/deployment.rs @@ -42,6 +42,7 @@ use uuid::Uuid; mod blueprint_diff; mod blueprint_display; +pub mod execution; mod network_resources; mod planning_input; mod tri_map; diff --git a/nexus/types/src/deployment/execution.rs b/nexus/types/src/deployment/execution.rs new file mode 100644 index 0000000000..16bf73873a --- /dev/null +++ b/nexus/types/src/deployment/execution.rs @@ -0,0 +1,54 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use update_engine::StepSpec; + +/// The specification for reconfigurator execution events. +#[derive(JsonSchema)] +pub enum ReconfiguratorExecutionSpec {} + +update_engine::define_update_engine!(pub ReconfiguratorExecutionSpec); + +impl StepSpec for ReconfiguratorExecutionSpec { + type Component = ExecutionComponent; + type StepId = ExecutionStepId; + type StepMetadata = serde_json::Value; + type ProgressMetadata = serde_json::Value; + type CompletionMetadata = serde_json::Value; + type SkippedMetadata = serde_json::Value; + type Error = anyhow::Error; +} + +/// Components for reconfigurator execution. +#[derive( + Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, JsonSchema, +)] +pub enum ExecutionComponent { + ExternalNetworking, + SledList, + PhysicalDisks, + OmicronZones, + FirewallRules, + DatasetRecords, + Dns, + Cockroach, +} + +/// Steps for reconfigurator execution. +#[derive( + Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, JsonSchema, +)] +pub enum ExecutionStepId { + /// Fetch information that will be used in subsequent steps. + Fetch, + Add, + Remove, + /// Idempotent "ensure" or "deploy" step that delegates removes and adds to + /// other parts of the system. + Ensure, + /// Finalize the blueprint and check for errors at the end of execution. + Finalize, +} diff --git a/update-engine/Cargo.toml b/update-engine/Cargo.toml index 5c8343a432..5a22ac66f5 100644 --- a/update-engine/Cargo.toml +++ b/update-engine/Cargo.toml @@ -10,10 +10,12 @@ workspace = true [dependencies] anyhow.workspace = true cancel-safe-futures.workspace = true +chrono.workspace = true debug-ignore.workspace = true derive-where.workspace = true either.workspace = true futures.workspace = true +indent_write.workspace = true indexmap.workspace = true libsw.workspace = true linear-map.workspace = true @@ -37,6 +39,7 @@ camino.workspace = true camino-tempfile.workspace = true clap.workspace = true indicatif.workspace = true +indoc.workspace = true omicron-test-utils.workspace = true owo-colors.workspace = true supports-color.workspace = true diff --git a/update-engine/src/buffer.rs b/update-engine/src/buffer.rs index 2359ecc03f..e3ac02458a 100644 --- a/update-engine/src/buffer.rs +++ b/update-engine/src/buffer.rs @@ -17,6 +17,7 @@ use indexmap::IndexMap; use petgraph::{prelude::*, visit::Walker}; use crate::{ + display::AbortMessageDisplay, events::{ Event, EventReport, ProgressEvent, ProgressEventKind, StepEvent, StepEventKind, StepEventPriority, StepInfo, StepOutcome, @@ -122,6 +123,16 @@ impl EventBuffer { EventBufferSteps::new(&self.event_store) } + /// Iterates over all known steps in the buffer in a recursive fashion. + /// + /// The iterator is depth-first and pre-order (i.e. for nested steps, the + /// parent step is visited before the child steps). + pub fn iter_steps_recursive( + &self, + ) -> impl Iterator)> { + self.event_store.event_map_value_dfs() + } + /// Returns information about the given step, as currently tracked by the /// buffer. pub fn get(&self, step_key: &StepKey) -> Option<&EventBufferStepData> { @@ -1271,6 +1282,40 @@ impl StepStatus { matches!(self, Self::Running { .. }) } + /// For completed steps, return the completion reason, otherwise None. + pub fn completion_reason(&self) -> Option<&CompletionReason> { + match self { + Self::Completed { reason, .. } => Some(reason), + _ => None, + } + } + + /// For failed steps, return the failure reason, otherwise None. + pub fn failure_reason(&self) -> Option<&FailureReason> { + match self { + Self::Failed { reason, .. } => Some(reason), + _ => None, + } + } + + /// For aborted steps, return the abort reason, otherwise None. + pub fn abort_reason(&self) -> Option<&AbortReason> { + // TODO: probably want to move last_progress into the `AbortReason` + // enum so that we can return it in a reasonable manner here. + match self { + Self::Aborted { reason, .. } => Some(reason), + _ => None, + } + } + + /// For will-not-be-run steps, return the reason, otherwise None. + pub fn will_not_be_run_reason(&self) -> Option<&WillNotBeRunReason> { + match self { + Self::WillNotBeRun { reason } => Some(reason), + _ => None, + } + } + /// Returns low-priority events for this step, if any. /// /// Events are sorted by event index. @@ -1406,6 +1451,16 @@ impl AbortReason { Self::ParentAborted { .. } => None, } } + + /// Returns a displayer for the message. + /// + /// The buffer is used to resolve step keys to step names. + pub fn message_display<'a, S: StepSpec>( + &'a self, + buffer: &'a EventBuffer, + ) -> AbortMessageDisplay<'a, S> { + AbortMessageDisplay::new(self, buffer) + } } #[derive(Clone, Debug)] diff --git a/update-engine/src/display/group_display.rs b/update-engine/src/display/group_display.rs index 9e75b64757..7d99150a9f 100644 --- a/update-engine/src/display/group_display.rs +++ b/update-engine/src/display/group_display.rs @@ -184,8 +184,13 @@ impl GroupDisplay { pub fn write_stats(&mut self, header: &str) -> std::io::Result<()> { // Add a blank prefix which is equal to the maximum width of known prefixes. let prefix = " ".repeat(self.max_width); - let mut line = - self.formatter.start_line(&prefix, Some(self.start_sw.elapsed())); + let mut line = self.formatter.start_line( + &prefix, + // TODO: we don't currently support setting a start time for group + // displays. We should do that at some point. + None, + Some(self.start_sw.elapsed()), + ); self.stats.format_line(&mut line, header, &self.formatter); writeln!(self.writer, "{line}") } diff --git a/update-engine/src/display/line_display.rs b/update-engine/src/display/line_display.rs index 5321ec017c..f6005a9f9e 100644 --- a/update-engine/src/display/line_display.rs +++ b/update-engine/src/display/line_display.rs @@ -4,6 +4,7 @@ // Copyright 2023 Oxide Computer Company +use chrono::{DateTime, Utc}; use debug_ignore::DebugIgnore; use derive_where::derive_where; use owo_colors::Style; @@ -50,6 +51,16 @@ impl LineDisplay { self.formatter.set_styles(styles); } + /// Sets the start time for all future lines. + /// + /// If the start time is set, then the progress display will be relative to + /// that time. Otherwise, only the offset from the start of the job will be + /// displayed. + #[inline] + pub fn set_start_time(&mut self, start_time: DateTime) { + self.shared.set_start_time(start_time); + } + /// Sets the amount of time before the next progress event is shown. #[inline] pub fn set_progress_interval(&mut self, interval: Duration) { diff --git a/update-engine/src/display/line_display_shared.rs b/update-engine/src/display/line_display_shared.rs index e31d36dcd7..73a0e44e19 100644 --- a/update-engine/src/display/line_display_shared.rs +++ b/update-engine/src/display/line_display_shared.rs @@ -9,9 +9,11 @@ use std::{ collections::HashMap, fmt::{self, Write as _}, + sync::LazyLock, time::Duration, }; +use chrono::{DateTime, Utc}; use owo_colors::OwoColorize; use swrite::{swrite, SWrite as _}; @@ -33,6 +35,8 @@ pub(super) const HEADER_WIDTH: usize = 9; #[derive(Debug, Default)] pub(super) struct LineDisplayShared { + // The start time, if provided. + start_time: Option>, // This is a map from root execution ID to data about it. execution_data: HashMap, } @@ -45,6 +49,10 @@ impl LineDisplayShared { ) -> LineDisplaySharedContext<'a> { LineDisplaySharedContext { shared: self, prefix, formatter } } + + pub(super) fn set_start_time(&mut self, start_time: DateTime) { + self.start_time = Some(start_time); + } } #[derive(Debug)] @@ -60,7 +68,11 @@ impl<'a> LineDisplaySharedContext<'a> { /// This line does not have a trailing newline; adding one is the caller's /// responsibility. pub(super) fn format_generic(&self, message: &str) -> String { - let mut line = self.formatter.start_line(self.prefix, None); + let mut line = self.formatter.start_line( + self.prefix, + self.shared.start_time, + None, + ); line.push_str(message); line } @@ -134,9 +146,11 @@ impl<'a> LineDisplaySharedContext<'a> { ) { match &step_event.kind { StepEventKind::NoStepsDefined => { - let mut line = self - .formatter - .start_line(self.prefix, Some(step_event.total_elapsed)); + let mut line = self.formatter.start_line( + self.prefix, + self.shared.start_time, + Some(step_event.total_elapsed), + ); swrite!( line, "{}", @@ -152,9 +166,11 @@ impl<'a> LineDisplaySharedContext<'a> { &first_step.info, &nest_data, ); - let mut line = self - .formatter - .start_line(self.prefix, Some(root_total_elapsed)); + let mut line = self.formatter.start_line( + self.prefix, + self.shared.start_time, + Some(root_total_elapsed), + ); swrite!( line, @@ -178,9 +194,11 @@ impl<'a> LineDisplaySharedContext<'a> { &nest_data, ); - let mut line = self - .formatter - .start_line(self.prefix, Some(root_total_elapsed)); + let mut line = self.formatter.start_line( + self.prefix, + self.shared.start_time, + Some(root_total_elapsed), + ); swrite!( line, @@ -224,9 +242,11 @@ impl<'a> LineDisplaySharedContext<'a> { &nest_data, ); - let mut line = self - .formatter - .start_line(self.prefix, Some(root_total_elapsed)); + let mut line = self.formatter.start_line( + self.prefix, + self.shared.start_time, + Some(root_total_elapsed), + ); swrite!( line, @@ -270,9 +290,11 @@ impl<'a> LineDisplaySharedContext<'a> { &step.info, &nest_data, ); - let mut line = self - .formatter - .start_line(self.prefix, Some(root_total_elapsed)); + let mut line = self.formatter.start_line( + self.prefix, + self.shared.start_time, + Some(root_total_elapsed), + ); self.formatter.add_completion_and_step_info( &mut line, @@ -293,9 +315,11 @@ impl<'a> LineDisplaySharedContext<'a> { &nest_data, ); - let mut line = self - .formatter - .start_line(self.prefix, Some(root_total_elapsed)); + let mut line = self.formatter.start_line( + self.prefix, + self.shared.start_time, + Some(root_total_elapsed), + ); self.format_step_running(&mut line, ld_step_info); @@ -315,9 +339,11 @@ impl<'a> LineDisplaySharedContext<'a> { &nest_data, ); - let mut line = self - .formatter - .start_line(self.prefix, Some(root_total_elapsed)); + let mut line = self.formatter.start_line( + self.prefix, + self.shared.start_time, + Some(root_total_elapsed), + ); self.formatter.add_completion_and_step_info( &mut line, @@ -344,9 +370,11 @@ impl<'a> LineDisplaySharedContext<'a> { &nest_data, ); - let mut line = self - .formatter - .start_line(self.prefix, Some(root_total_elapsed)); + let mut line = self.formatter.start_line( + self.prefix, + self.shared.start_time, + Some(root_total_elapsed), + ); // The prefix is used for "Caused by" lines below. Add // the requisite amount of spacing here. let mut caused_by_prefix = line.clone(); @@ -387,9 +415,11 @@ impl<'a> LineDisplaySharedContext<'a> { &nest_data, ); - let mut line = self - .formatter - .start_line(self.prefix, Some(root_total_elapsed)); + let mut line = self.formatter.start_line( + self.prefix, + self.shared.start_time, + Some(root_total_elapsed), + ); swrite!( line, @@ -463,8 +493,11 @@ impl<'a> LineDisplaySharedContext<'a> { &self, info: &ExecutionTerminalInfo, ) -> String { - let mut line = - self.formatter.start_line(self.prefix, info.leaf_total_elapsed); + let mut line = self.formatter.start_line( + self.prefix, + self.shared.start_time, + info.leaf_total_elapsed, + ); match info.kind { TerminalKind::Completed => { swrite!( @@ -540,9 +573,11 @@ impl<'a> LineDisplaySharedContext<'a> { nest_data: &nest_data, }; - let mut line = self - .formatter - .start_line(self.prefix, Some(root_total_elapsed)); + let mut line = self.formatter.start_line( + self.prefix, + self.shared.start_time, + Some(root_total_elapsed), + ); let (before, after) = match progress { Some(counter) => { @@ -685,6 +720,7 @@ impl LineDisplayFormatter { pub(super) fn start_line( &self, prefix: &str, + start_time: Option>, total_elapsed: Option, ) -> String { let mut line = format!("[{}", prefix.style(self.styles.prefix_style)); @@ -694,17 +730,31 @@ impl LineDisplayFormatter { } // Show total elapsed time in an hh:mm:ss format. - if let Some(total_elapsed) = total_elapsed { - let total_elapsed_secs = total_elapsed.as_secs(); - let hours = total_elapsed_secs / 3600; - let minutes = (total_elapsed_secs % 3600) / 60; - let seconds = total_elapsed_secs % 60; - swrite!(line, "{:02}:{:02}:{:02}", hours, minutes, seconds); - // To show total_elapsed more accurately, use: - // swrite!(line, "{:.2?}", total_elapsed); - } else { - // Add 8 spaces to align with hh:mm:ss. - line.push_str(" "); + match (start_time, total_elapsed) { + (Some(start_time), Some(total_elapsed)) => { + // Add the offset from the start time. + let current_time = start_time + total_elapsed; + swrite!( + line, + "{}", + current_time.format_with_items(DATETIME_FORMAT.iter()) + ); + } + (None, Some(total_elapsed)) => { + let total_elapsed_secs = total_elapsed.as_secs(); + let hours = total_elapsed_secs / 3600; + let minutes = (total_elapsed_secs % 3600) / 60; + let seconds = total_elapsed_secs % 60; + swrite!(line, "{:02}:{:02}:{:02}", hours, minutes, seconds); + // To show total_elapsed more accurately, use: + // swrite!(line, "{:.2?}", total_elapsed); + } + (Some(_), None) => { + line.push_str(DATETIME_FORMAT_INDENT); + } + (None, None) => { + line.push_str(ELAPSED_FORMAT_INDENT); + } } line.push_str("] "); @@ -874,6 +924,23 @@ impl LineDisplayFormatter { } } +static DATETIME_FORMAT: LazyLock>> = + LazyLock::new(|| { + // The format is "Jan 01 00:00:00". + // + // We can add customization in the future, but we want to restrict + // formats to fixed-width so we know how to align them. + chrono::format::StrftimeItems::new("%b %d %H:%M:%S") + .parse() + .expect("datetime format is valid") + }); + +// "Jan 01 00:00:00" is 15 characters wide. +const DATETIME_FORMAT_INDENT: &str = " "; + +// "00:00:00" is 8 characters wide. +const ELAPSED_FORMAT_INDENT: &str = " "; + #[derive(Clone, Debug)] pub(super) struct LineDisplayOutput { lines: Vec, @@ -989,6 +1056,8 @@ impl fmt::Display for AsLetters { #[cfg(test)] mod tests { + use chrono::TimeZone; + use super::*; #[test] @@ -1010,4 +1079,32 @@ mod tests { ); } } + + #[test] + fn test_start_line() { + let formatter = LineDisplayFormatter::new(); + let prefix = "prefix"; + let start_time = Utc.with_ymd_and_hms(2023, 2, 8, 3, 40, 56).unwrap(); + + assert_eq!( + formatter.start_line(prefix, None, None), + "[prefix ] ", + ); + assert_eq!( + formatter.start_line(prefix, None, Some(Duration::from_secs(5))), + "[prefix 00:00:05] ", + ); + assert_eq!( + formatter.start_line(prefix, Some(start_time), None), + "[prefix ] ", + ); + assert_eq!( + formatter.start_line( + prefix, + Some(start_time), + Some(Duration::from_secs(3600)), + ), + "[prefix Feb 08 04:40:56] ", + ); + } } diff --git a/update-engine/src/display/utils.rs b/update-engine/src/display/utils.rs index 08790f352b..f026fb6a1c 100644 --- a/update-engine/src/display/utils.rs +++ b/update-engine/src/display/utils.rs @@ -6,6 +6,8 @@ use std::fmt; +use crate::{AbortReason, EventBuffer, StepSpec}; + /// Given current and total, displays `{current}/{total}`. /// /// * If the `index_and_total` constructor is called, then `current` is `index @@ -106,3 +108,46 @@ impl ToU64 for u64 { self } } + +/// Displays the message for an execution abort. +/// +/// Returned by [`AbortReason::message_display`]. +pub struct AbortMessageDisplay<'a, S: StepSpec> { + reason: &'a AbortReason, + buffer: &'a EventBuffer, + // TODO: color +} + +impl<'a, S: StepSpec> AbortMessageDisplay<'a, S> { + /// Create a new `AbortMessageDisplay`. + pub(crate) fn new( + reason: &'a AbortReason, + buffer: &'a EventBuffer, + ) -> Self { + Self { reason, buffer } + } +} + +impl<'a, S: StepSpec> fmt::Display for AbortMessageDisplay<'a, S> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.reason { + AbortReason::StepAborted(info) => { + write!(f, "{}", info.message) + } + AbortReason::ParentAborted { parent_step, parent_info } => { + let parent_description = + if let Some(step) = self.buffer.get(parent_step) { + &step.step_info().description + } else { + "unknown step" + }; + + write!( + f, + "parent step \"{}\" aborted with: {}", + parent_description, parent_info.message + ) + } + } + } +} diff --git a/update-engine/src/spec.rs b/update-engine/src/spec.rs index 0dfe632181..0ec346a423 100644 --- a/update-engine/src/spec.rs +++ b/update-engine/src/spec.rs @@ -4,8 +4,10 @@ // Copyright 2023 Oxide Computer Company -use std::{fmt, marker::PhantomData}; +use std::{fmt, fmt::Write, marker::PhantomData}; +use anyhow::anyhow; +use indent_write::fmt::IndentWriter; use schemars::JsonSchema; use serde::{de::DeserializeOwned, Serialize}; @@ -144,6 +146,8 @@ pub type NestedSpec = GenericSpec; /// set of nested errors. #[derive(Clone, Debug)] pub struct NestedError { + // TODO: in reality is this used more as a "serializable error" -- we + // should rename this. message: String, source: Option>, } @@ -171,6 +175,16 @@ impl NestedError { } Self { message, source: next.map(Box::new) } } + + /// Returns the message associated with this error. + pub fn message(&self) -> &str { + &self.message + } + + /// Returns the causes of this error as an iterator. + pub fn sources(&self) -> NestedErrorSources<'_> { + NestedErrorSources { current: self.source.as_deref() } + } } impl fmt::Display for NestedError { @@ -185,6 +199,22 @@ impl std::error::Error for NestedError { } } +/// The sources of a nested error as an iterator. +#[derive(Debug)] +pub struct NestedErrorSources<'a> { + current: Option<&'a NestedError>, +} + +impl<'a> Iterator for NestedErrorSources<'a> { + type Item = &'a NestedError; + + fn next(&mut self) -> Option { + let current = self.current?; + self.current = current.source.as_deref(); + Some(current) + } +} + mod nested_error_serde { use super::*; use serde::Deserialize; @@ -244,3 +274,126 @@ impl AsError for anyhow::Error { self.as_ref() } } + +/// A temporary hack to convert a list of anyhow errors into a single +/// `anyhow::Error`. If no errors are provided, panic (this should be handled +/// at a higher level). +/// +/// Eventually we should gain first-class support for representing errors as +/// trees, but this will do for now. +pub fn merge_anyhow_list(errors: I) -> anyhow::Error +where + I: IntoIterator, +{ + let mut iter = errors.into_iter().peekable(); + // How many errors are there? + let Some(first_error) = iter.next() else { + // No errors: panic. + panic!("error_list_to_anyhow called with no errors"); + }; + + if iter.peek().is_none() { + // One error. + return first_error; + } + + // Multiple errors. + let mut out = String::new(); + let mut nerrors = 0; + for error in std::iter::once(first_error).chain(iter) { + if nerrors > 0 { + // Separate errors with a newline (we want there to not be a + // trailing newline to match anyhow generally). + writeln!(&mut out).unwrap(); + } + nerrors += 1; + let mut current = error.as_error(); + + let mut writer = IndentWriter::new_skip_initial(" ", &mut out); + write!(writer, "Error: {current}").unwrap(); + + while let Some(cause) = current.source() { + // This newline is not part of the `IndentWriter`'s output so that + // it is unaffected by the indent logic. + writeln!(&mut out).unwrap(); + + // The spaces align the causes with the "Error: " above. + let mut writer = + IndentWriter::new_skip_initial(" ", &mut out); + write!(writer, " - {cause}").unwrap(); + current = cause; + } + } + anyhow!(out).context(format!("{nerrors} errors encountered")) +} + +#[cfg(test)] +mod tests { + use indoc::indoc; + + use super::*; + + #[test] + fn test_merge_anyhow_list() { + // A single error stays as-is. + let error = anyhow!("base").context("parent").context("root"); + + let merged = merge_anyhow_list(vec![error]); + assert_eq!( + format!("{:?}", merged), + indoc! {" + root + + Caused by: + 0: parent + 1: base" + }, + ); + + // Multiple errors are merged. + let error1 = + anyhow!("base1").context("parent1\nparent1 line2").context("root1"); + let error2 = anyhow!("base2").context("parent2").context("root2"); + + let merged = merge_anyhow_list(vec![error1, error2]); + let merged_debug = format!("{:?}", merged); + println!("merged debug: {}", merged_debug); + + assert_eq!( + merged_debug, + indoc! {" + 2 errors encountered + + Caused by: + Error: root1 + - parent1 + parent1 line2 + - base1 + Error: root2 + - parent2 + - base2" + }, + ); + + // Ensure that this still looks fine if there's even more context. + let error3 = merged.context("overall root"); + let error3_debug = format!("{:?}", error3); + println!("error3 debug: {}", error3_debug); + assert_eq!( + error3_debug, + indoc! {" + overall root + + Caused by: + 0: 2 errors encountered + 1: Error: root1 + - parent1 + parent1 line2 + - base1 + Error: root2 + - parent2 + - base2" + }, + ); + } +}