diff --git a/Cargo.lock b/Cargo.lock index cc10947083..8c6c3f21be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3722,6 +3722,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "is_ci" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "616cde7c720bb2bb5824a224687d8f77bfd38922027f01d825cd7453be5099fb" + [[package]] name = "itertools" version = "0.10.5" @@ -8714,6 +8720,16 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" +[[package]] +name = "supports-color" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6398cde53adc3c4557306a96ce67b302968513830a77a95b2b17305d9719a89" +dependencies = [ + "is-terminal", + "is_ci", +] + [[package]] name = "swrite" version = "0.1.0" @@ -9733,6 +9749,7 @@ dependencies = [ "camino", "camino-tempfile", "cancel-safe-futures", + "clap 4.4.3", "debug-ignore", "derive-where", "either", @@ -9749,8 +9766,11 @@ dependencies = [ "serde_json", "serde_with", "slog", + "supports-color", + "swrite", "tokio", "tokio-stream", + "unicode-width", "uuid", ] @@ -10110,6 +10130,7 @@ dependencies = [ "ciborium", "clap 4.4.3", "crossterm 0.27.0", + "debug-ignore", "futures", "hex", "humantime", @@ -10133,6 +10154,7 @@ dependencies = [ "slog-async", "slog-envlogger", "slog-term", + "supports-color", "tempfile", "textwrap 0.16.0", "tokio", @@ -10142,6 +10164,7 @@ dependencies = [ "tui-tree-widget", "unicode-width", "update-engine", + "uuid", "wicket-common", "wicketd-client", "zeroize", diff --git a/Cargo.toml b/Cargo.toml index 63b18a9f9a..184620787c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -345,6 +345,7 @@ static_assertions = "1.1.0" steno = "0.4.0" strum = { version = "0.25", features = [ "derive" ] } subprocess = "0.2.9" +supports-color = "2.1.0" swrite = "0.1.0" libsw = { version = "3.3.0", features = ["tokio"] } syn = { version = "2.0" } diff --git a/update-engine/Cargo.toml b/update-engine/Cargo.toml index af988bf091..12e718e902 100644 --- a/update-engine/Cargo.toml +++ b/update-engine/Cargo.toml @@ -13,13 +13,16 @@ either.workspace = true futures.workspace = true indexmap.workspace = true linear-map.workspace = true +owo-colors.workspace = true petgraph.workspace = true serde.workspace = true serde_json.workspace = true serde_with.workspace = true schemars = { workspace = true, features = ["uuid1"] } slog.workspace = true +swrite.workspace = true tokio = { workspace = true, features = ["macros", "sync", "time", "rt-multi-thread"] } +unicode-width.workspace = true uuid.workspace = true omicron-workspace-hack.workspace = true @@ -28,8 +31,10 @@ buf-list.workspace = true bytes.workspace = true camino.workspace = true camino-tempfile.workspace = true +clap.workspace = true indicatif.workspace = true omicron-test-utils.workspace = true owo-colors.workspace = true +supports-color.workspace = true tokio = { workspace = true, features = ["io-util"] } tokio-stream.workspace = true diff --git a/update-engine/examples/update-engine-basic/display.rs b/update-engine/examples/update-engine-basic/display.rs index e6b80e3637..122777211b 100644 --- a/update-engine/examples/update-engine-basic/display.rs +++ b/update-engine/examples/update-engine-basic/display.rs @@ -12,28 +12,135 @@ use indexmap::{map::Entry, IndexMap}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use owo_colors::OwoColorize; use tokio::{sync::mpsc, task::JoinHandle}; -use update_engine::events::ProgressCounter; +use update_engine::{ + display::{GroupDisplay, LineDisplay, LineDisplayStyles}, + events::ProgressCounter, +}; -use crate::spec::{ - Event, ExampleComponent, ExampleStepId, ExampleStepMetadata, ProgressEvent, - ProgressEventKind, StepEventKind, StepInfoWithMetadata, StepOutcome, +use crate::{ + spec::{ + Event, EventBuffer, ExampleComponent, ExampleStepId, + ExampleStepMetadata, ProgressEvent, ProgressEventKind, StepEventKind, + StepInfoWithMetadata, StepOutcome, + }, + DisplayStyle, }; /// An example that displays an event stream on the command line. pub(crate) fn make_displayer( log: &slog::Logger, + display_style: DisplayStyle, + prefix: Option, ) -> (JoinHandle>, mpsc::Sender) { let (sender, receiver) = mpsc::channel(512); let log = log.clone(); let join_handle = - tokio::task::spawn( - async move { display_messages(&log, receiver).await }, - ); + match display_style { + DisplayStyle::ProgressBar => tokio::task::spawn(async move { + display_progress_bar(&log, receiver).await + }), + DisplayStyle::Line => tokio::task::spawn(async move { + display_line(&log, receiver, prefix).await + }), + DisplayStyle::Group => tokio::task::spawn(async move { + display_group(&log, receiver).await + }), + }; (join_handle, sender) } -async fn display_messages( +async fn display_line( + log: &slog::Logger, + mut receiver: mpsc::Receiver, + prefix: Option, +) -> Result<()> { + slog::info!(log, "setting up display"); + let mut buffer = EventBuffer::new(8); + let mut display = LineDisplay::new(std::io::stdout()); + // For now, always colorize. TODO: figure out whether colorization should be + // done based on always/auto/never etc. + if supports_color::on(supports_color::Stream::Stdout).is_some() { + display.set_styles(LineDisplayStyles::colorized()); + } + if let Some(prefix) = prefix { + display.set_prefix(prefix); + } + display.set_progress_interval(Duration::from_millis(50)); + while let Some(event) = receiver.recv().await { + buffer.add_event(event); + display.write_event_buffer(&buffer)?; + } + + Ok(()) +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)] +enum GroupDisplayKey { + Example, + Other, +} + +async fn display_group( + log: &slog::Logger, + mut receiver: mpsc::Receiver, +) -> Result<()> { + slog::info!(log, "setting up display"); + + let mut display = GroupDisplay::new( + [ + (GroupDisplayKey::Example, "example"), + (GroupDisplayKey::Other, "other"), + ], + std::io::stdout(), + ); + // For now, always colorize. TODO: figure out whether colorization should be + // done based on always/auto/never etc. + if supports_color::on(supports_color::Stream::Stdout).is_some() { + display.set_styles(LineDisplayStyles::colorized()); + } + + display.set_progress_interval(Duration::from_millis(50)); + + let mut example_buffer = EventBuffer::default(); + let mut example_buffer_last_seen = None; + let mut other_buffer = EventBuffer::default(); + let mut other_buffer_last_seen = None; + + let mut interval = tokio::time::interval(Duration::from_secs(2)); + interval.tick().await; + + loop { + tokio::select! { + _ = interval.tick() => { + // Print out status lines every 2 seconds. + display.write_stats("Status")?; + } + event = receiver.recv() => { + let Some(event) = event else { break }; + example_buffer.add_event(event.clone()); + other_buffer.add_event(event); + + display.add_event_report( + &GroupDisplayKey::Example, + example_buffer.generate_report_since(&mut example_buffer_last_seen), + )?; + display.add_event_report( + &GroupDisplayKey::Other, + other_buffer.generate_report_since(&mut other_buffer_last_seen), + )?; + display.write_events()?; + } + } + } + + // Print status at the end. + display.write_stats("Summary")?; + + Ok(()) +} + +async fn display_progress_bar( log: &slog::Logger, mut receiver: mpsc::Receiver, ) -> Result<()> { diff --git a/update-engine/examples/update-engine-basic/main.rs b/update-engine/examples/update-engine-basic/main.rs index 260473edde..339db1a450 100644 --- a/update-engine/examples/update-engine-basic/main.rs +++ b/update-engine/examples/update-engine-basic/main.rs @@ -4,85 +4,139 @@ // Copyright 2023 Oxide Computer Company -use std::time::Duration; +use std::{io::IsTerminal, time::Duration}; -use anyhow::{bail, Context}; +use anyhow::{bail, Context, Result}; use buf_list::BufList; use bytes::Buf; use camino::Utf8PathBuf; use camino_tempfile::Utf8TempDir; +use clap::{Parser, ValueEnum}; use display::make_displayer; use omicron_test_utils::dev::test_setup_log; use spec::{ - ComponentRegistrar, ExampleCompletionMetadata, ExampleComponent, - ExampleSpec, ExampleStepId, ExampleStepMetadata, ExampleWriteSpec, - ExampleWriteStepId, StepHandle, StepProgress, StepSkipped, StepWarning, - UpdateEngine, + ComponentRegistrar, EventBuffer, ExampleCompletionMetadata, + ExampleComponent, ExampleSpec, ExampleStepId, ExampleStepMetadata, + ExampleWriteSpec, ExampleWriteStepId, StepHandle, StepProgress, + StepSkipped, StepWarning, UpdateEngine, +}; +use tokio::{io::AsyncWriteExt, sync::mpsc}; +use update_engine::{ + events::{Event, ProgressUnits}, + StepContext, StepSuccess, }; -use tokio::io::AsyncWriteExt; -use update_engine::{events::ProgressUnits, StepContext, StepSuccess}; mod display; mod spec; #[tokio::main(worker_threads = 2)] -async fn main() { - let logctx = test_setup_log("update_engine_basic_example"); - - let context = ExampleContext::new(&logctx.log); - let (display_handle, sender) = make_displayer(&logctx.log); - - let engine = UpdateEngine::new(&logctx.log, sender); - - // Download component 1. - let component_1 = engine.for_component(ExampleComponent::Component1); - let download_handle_1 = context.register_download_step( - &component_1, - "https://www.example.org".to_owned(), - 1_048_576, - ); - - // An example of a skipped step for component 1. - context.register_skipped_step(&component_1); - - // Create temporary directories for component 1. - let temp_dirs_handle_1 = - context.register_create_temp_dirs_step(&component_1, 2); - - // Write component 1 out to disk. - context.register_write_step( - &component_1, - download_handle_1, - temp_dirs_handle_1, - None, - ); - - // Download component 2. - let component_2 = engine.for_component(ExampleComponent::Component2); - let download_handle_2 = context.register_download_step( - &component_2, - "https://www.example.com".to_owned(), - 1_048_576 * 8, - ); - - // Create temporary directories for component 2. - let temp_dirs_handle_2 = - context.register_create_temp_dirs_step(&component_2, 3); - - // Now write component 2 out to disk. - context.register_write_step( - &component_2, - download_handle_2, - temp_dirs_handle_2, - Some(1), - ); - - _ = engine.execute().await; - - // Wait until all messages have been received by the displayer. - _ = display_handle.await; - - // Do not clean up the log file so people can inspect it. +async fn main() -> Result<()> { + let app = App::parse(); + app.exec().await +} + +#[derive(Debug, Parser)] +struct App { + /// Display style to use. + #[clap(long, short = 's', default_value_t, value_enum)] + display_style: DisplayStyleOpt, + + /// Prefix to set on all log messages with display-style=line. + #[clap(long, short = 'p')] + prefix: Option, +} + +impl App { + async fn exec(self) -> Result<()> { + let logctx = test_setup_log("update_engine_basic_example"); + + let display_style = match self.display_style { + DisplayStyleOpt::ProgressBar => DisplayStyle::ProgressBar, + DisplayStyleOpt::Line => DisplayStyle::Line, + DisplayStyleOpt::Group => DisplayStyle::Group, + DisplayStyleOpt::Auto => { + if std::io::stdout().is_terminal() { + DisplayStyle::ProgressBar + } else { + DisplayStyle::Line + } + } + }; + + let context = ExampleContext::new(&logctx.log); + let (display_handle, sender) = + make_displayer(&logctx.log, display_style, self.prefix); + + let engine = UpdateEngine::new(&logctx.log, sender); + + // Download component 1. + let component_1 = engine.for_component(ExampleComponent::Component1); + let download_handle_1 = context.register_download_step( + &component_1, + "https://www.example.org".to_owned(), + 1_048_576, + ); + + // An example of a skipped step for component 1. + context.register_skipped_step(&component_1); + + // Create temporary directories for component 1. + let temp_dirs_handle_1 = + context.register_create_temp_dirs_step(&component_1, 2); + + // Write component 1 out to disk. + context.register_write_step( + &component_1, + download_handle_1, + temp_dirs_handle_1, + None, + ); + + // Download component 2. + let component_2 = engine.for_component(ExampleComponent::Component2); + let download_handle_2 = context.register_download_step( + &component_2, + "https://www.example.com".to_owned(), + 1_048_576 * 8, + ); + + // Create temporary directories for component 2. + let temp_dirs_handle_2 = + context.register_create_temp_dirs_step(&component_2, 3); + + // Now write component 2 out to disk. + context.register_write_step( + &component_2, + download_handle_2, + temp_dirs_handle_2, + Some(1), + ); + + _ = engine.execute().await; + + // Wait until all messages have been received by the displayer. + _ = display_handle.await; + + // Do not clean up the log file so people can inspect it. + + Ok(()) + } +} + +#[derive(Copy, Clone, Debug, Default, ValueEnum)] +enum DisplayStyleOpt { + ProgressBar, + Line, + Group, + #[default] + Auto, +} + +#[derive(Copy, Clone, Debug)] +enum DisplayStyle { + ProgressBar, + Line, + Group, } /// Context shared across steps. This forms the lifetime "'a" defined by the @@ -146,9 +200,30 @@ impl ExampleContext { ({num_bytes} bytes)", ); - // Try a second time, and this time go all the way to 100%. + // Try a second time, and this time go to 80%. let mut buf_list = BufList::new(); - for i in 0..10 { + for i in 0..8 { + tokio::time::sleep(Duration::from_millis(100)).await; + cx.send_progress(StepProgress::with_current_and_total( + num_bytes * i / 10, + num_bytes, + ProgressUnits::BYTES, + serde_json::Value::Null, + )) + .await; + buf_list.push_chunk(&b"downloaded-data"[..]); + } + + // Now indicate a progress reset. + cx.send_progress(StepProgress::reset( + serde_json::Value::Null, + "Progress reset", + )) + .await; + + // Try again. + let mut buf_list = BufList::new(); + for i in 0..8 { tokio::time::sleep(Duration::from_millis(100)).await; cx.send_progress(StepProgress::with_current_and_total( num_bytes * i / 10, @@ -243,6 +318,7 @@ impl ExampleContext { cx.with_nested_engine(|engine| { register_nested_write_steps( + &self.log, engine, component, &destinations, @@ -282,6 +358,7 @@ impl ExampleContext { } fn register_nested_write_steps<'a>( + log: &'a slog::Logger, engine: &mut UpdateEngine<'a, ExampleWriteSpec>, component: ExampleComponent, destinations: &'a [Utf8PathBuf], @@ -307,6 +384,38 @@ fn register_nested_write_steps<'a>( Default::default(), )) .await; + + let mut remote_engine_receiver = create_remote_engine( + log, + component, + buf_list.clone(), + destination.clone(), + ); + let mut buffer = EventBuffer::default(); + let mut last_seen = None; + while let Some(event) = remote_engine_receiver.recv().await + { + // Only send progress up to 50% to demonstrate + // not receiving full progress. + if let Event::Progress(event) = &event { + if let Some(counter) = event.kind.progress_counter() + { + if let Some(total) = counter.total { + if counter.current > total / 2 { + break; + } + } + } + } + + buffer.add_event(event); + let report = + buffer.generate_report_since(&mut last_seen); + cx.send_nested_report(report) + .await + .expect("this engine should never fail"); + } + let mut file = tokio::fs::File::create(destination) .await @@ -345,3 +454,50 @@ fn register_nested_write_steps<'a>( .register(); } } + +/// Sets up a remote engine that can be used to execute steps. +fn create_remote_engine( + log: &slog::Logger, + component: ExampleComponent, + mut buf_list: BufList, + destination: Utf8PathBuf, +) -> mpsc::Receiver> { + let (sender, receiver) = tokio::sync::mpsc::channel(128); + let engine = UpdateEngine::new(log, sender); + engine + .for_component(component) + .new_step( + ExampleWriteStepId::Write { destination: destination.clone() }, + format!("Writing to {destination} (remote, fake)"), + move |cx| async move { + let num_bytes = buf_list.num_bytes(); + let mut total_written = 0; + + while buf_list.has_remaining() { + tokio::time::sleep(Duration::from_millis(20)).await; + // Don't actually write these bytes -- this engine is just + // for demoing. + let written_bytes = + (num_bytes / 10).min(buf_list.num_bytes()); + total_written += written_bytes; + buf_list.advance(written_bytes); + cx.send_progress(StepProgress::with_current_and_total( + total_written as u64, + num_bytes as u64, + ProgressUnits::new_const("fake bytes"), + (), + )) + .await; + } + + StepSuccess::new(()).into() + }, + ) + .register(); + + tokio::spawn(async move { + engine.execute().await.expect("remote engine succeeded") + }); + + receiver +} diff --git a/update-engine/src/buffer.rs b/update-engine/src/buffer.rs index 3cb2e0849b..dc816acd02 100644 --- a/update-engine/src/buffer.rs +++ b/update-engine/src/buffer.rs @@ -107,6 +107,23 @@ impl EventBuffer { self.event_store.root_execution_id } + /// Returns information about terminal status for this buffer's root + /// execution ID, or None if the execution has not started or is currently + /// running. + pub fn root_terminal_info(&self) -> Option { + let Some(root_execution_id) = self.root_execution_id() else { + return None; + }; + + let summary = self.steps().summarize(); + summary + .get(&root_execution_id) + .expect("root execution ID must always be present in summary") + .execution_status + .terminal_info() + .cloned() + } + /// Returns information about each step, as currently tracked by the buffer, /// in order of when the events were first defined. pub fn steps(&self) -> EventBufferSteps<'_, S> { @@ -249,6 +266,7 @@ impl EventStore { &event, 0, None, + None, root_event_index, event.total_elapsed, ); @@ -256,6 +274,29 @@ impl EventStore { if new_execution.nest_level == 0 { self.root_execution_id = Some(new_execution.execution_id); } + // If there's a parent key, then what's the child index? + let parent_key_and_child_index = + if let Some(parent_key) = new_execution.parent_key { + match self.map.get_mut(&parent_key) { + Some(parent_data) => { + let child_index = parent_data.child_executions_seen; + parent_data.child_executions_seen += 1; + Some((parent_key, child_index)) + } + None => { + // This should never happen -- it indicates that the + // parent key was unknown. This can happen if we + // didn't receive an event regarding a parent + // execution being started. + // + // TODO: This should probably be an error that gets + // bubbled up to callers. + None + } + } + } else { + None + }; let total_steps = new_execution.steps_to_add.len(); for (new_step_key, new_step, sort_key) in new_execution.steps_to_add { @@ -264,6 +305,7 @@ impl EventStore { self.map.entry(new_step_key).or_insert_with(|| { EventBufferStepData::new( new_step, + parent_key_and_child_index, sort_key, new_execution.nest_level, total_steps, @@ -320,6 +362,7 @@ impl EventStore { &mut self, event: &StepEvent, nest_level: usize, + parent_key: Option, parent_sort_key: Option<&StepSortKey>, root_event_index: RootEventIndex, root_total_elapsed: Duration, @@ -348,6 +391,7 @@ impl EventStore { } new_execution = Some(NewExecutionAction { execution_id: event.execution_id, + parent_key, nest_level, steps_to_add, }); @@ -498,6 +542,7 @@ impl EventStore { let actions = self.recurse_for_step_event( nested_event, nest_level + 1, + Some(parent_key), parent_sort_key.as_ref(), root_event_index, root_total_elapsed, @@ -796,6 +841,9 @@ struct NewExecutionAction { // An execution ID corresponding to a new run, if seen. execution_id: ExecutionId, + // The parent key for this execution, if this is a nested step. + parent_key: Option, + // The nest level for this execution. nest_level: usize, @@ -857,12 +905,16 @@ impl<'buf, S: StepSpec> EventBufferSteps<'buf, S> { #[derive_where(Clone, Debug)] pub struct EventBufferStepData { step_info: StepInfo, + sort_key: StepSortKey, - // XXX: nest_level and total_steps are common to each execution, but are - // stored separately here. Should we store them in a separate map - // indexed by execution ID? + + // TODO: These steps are common to each execution, but are stored separately + // here. These should likely move into EventBufferExecutionData. + parent_key_and_child_index: Option<(StepKey, usize)>, nest_level: usize, total_steps: usize, + child_executions_seen: usize, + // Invariant: stored in order sorted by leaf event index. high_priority: Vec>, step_status: StepStatus, @@ -874,6 +926,7 @@ pub struct EventBufferStepData { impl EventBufferStepData { fn new( step_info: StepInfo, + parent_key_and_child_index: Option<(StepKey, usize)>, sort_key: StepSortKey, nest_level: usize, total_steps: usize, @@ -881,9 +934,11 @@ impl EventBufferStepData { ) -> Self { Self { step_info, + parent_key_and_child_index, sort_key, nest_level, total_steps, + child_executions_seen: 0, high_priority: Vec::new(), step_status: StepStatus::NotStarted, last_root_event_index: root_event_index, @@ -895,6 +950,11 @@ impl EventBufferStepData { &self.step_info } + #[inline] + pub fn parent_key_and_child_index(&self) -> Option<(StepKey, usize)> { + self.parent_key_and_child_index + } + #[inline] pub fn nest_level(&self) -> usize { self.nest_level @@ -905,6 +965,11 @@ impl EventBufferStepData { self.total_steps } + #[inline] + pub fn child_executions_seen(&self) -> usize { + self.child_executions_seen + } + #[inline] pub fn step_status(&self) -> &StepStatus { &self.step_status @@ -1561,6 +1626,20 @@ pub enum TerminalKind { Aborted, } +impl ExecutionStatus { + /// Returns the terminal status and the total amount of time elapsed, or + /// None if the execution has not reached a terminal state. + /// + /// The time elapsed might be None if the execution was interrupted and + /// completion information wasn't available. + pub fn terminal_info(&self) -> Option<&ExecutionTerminalInfo> { + match self { + Self::NotStarted | Self::Running { .. } => None, + Self::Terminal(info) => Some(info), + } + } +} + /// Keys for the event tree. #[derive(Copy, Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd)] enum EventTreeNode { diff --git a/update-engine/src/display/group_display.rs b/update-engine/src/display/group_display.rs new file mode 100644 index 0000000000..9a541c3cab --- /dev/null +++ b/update-engine/src/display/group_display.rs @@ -0,0 +1,478 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +// Copyright 2023 Oxide Computer Company + +use std::{borrow::Borrow, collections::BTreeMap, fmt, time::Duration}; + +use owo_colors::OwoColorize; +use swrite::{swrite, SWrite}; +use tokio::time::Instant; +use unicode_width::UnicodeWidthStr; + +use crate::{ + errors::UnknownReportKey, events::EventReport, EventBuffer, + ExecutionTerminalInfo, StepSpec, TerminalKind, +}; + +use super::{ + line_display_shared::{LineDisplayFormatter, LineDisplayOutput}, + LineDisplayShared, LineDisplayStyles, HEADER_WIDTH, +}; + +/// A displayer that simultaneously manages and shows line-based output for +/// several event buffers. +/// +/// `K` is the key type for each element in the group. Its [`fmt::Display`] impl +/// is called to obtain the prefix, and `Eq + Ord` is used for keys. +#[derive(Debug)] +pub struct GroupDisplay { + // We don't need to add any buffering here because we already write data to + // the writer in a line-buffered fashion (see Self::write_events). + writer: W, + max_width: usize, + // This is set to Instant::now as soon as the first event report is + // received. + start_instant: Option, + single_states: BTreeMap>, + formatter: LineDisplayFormatter, + stats: GroupDisplayStats, +} + +impl GroupDisplay { + /// Creates a new `GroupDisplay` with the provided report keys and + /// prefixes. + /// + /// The function passed in is expected to create a writer. + pub fn new( + keys_and_prefixes: impl IntoIterator, + writer: W, + ) -> Self + where + Str: Into, + { + // Right-align prefixes to their maximum width -- this helps keep the + // output organized. + let mut max_width = 0; + let keys_and_prefixes: Vec<_> = keys_and_prefixes + .into_iter() + .map(|(k, prefix)| { + let prefix = prefix.into(); + max_width = + max_width.max(UnicodeWidthStr::width(prefix.as_str())); + (k, prefix) + }) + .collect(); + let single_states: BTreeMap<_, _> = keys_and_prefixes + .into_iter() + .map(|(k, prefix)| (k, SingleState::new(prefix, max_width))) + .collect(); + + let not_started = single_states.len(); + Self { + writer, + max_width, + start_instant: None, + single_states, + formatter: LineDisplayFormatter::new(), + stats: GroupDisplayStats::new(not_started), + } + } + + /// Creates a new `GroupDisplay` with the provided report keys, using the + /// `Display` impl to obtain the respective prefixes. + pub fn new_with_display( + keys: impl IntoIterator, + writer: W, + ) -> Self + where + K: fmt::Display, + { + Self::new( + keys.into_iter().map(|k| { + let prefix = k.to_string(); + (k, prefix) + }), + writer, + ) + } + + /// Sets the styles for all future lines. + #[inline] + pub fn set_styles(&mut self, styles: LineDisplayStyles) { + self.formatter.set_styles(styles); + } + + /// Sets the amount of time before new progress events are shown. + #[inline] + pub fn set_progress_interval(&mut self, interval: Duration) { + self.formatter.set_progress_interval(interval); + } + + /// Returns true if this `GroupDisplay` is producing reports corresponding + /// to the given key. + pub fn contains_key(&self, key: &Q) -> bool + where + K: Borrow, + Q: Ord, + { + self.single_states.contains_key(key) + } + + /// Adds an event report to the display, keyed by the index, and updates + /// internal state. + /// + /// Returns `Ok(())` if the report was accepted because the key was + /// known to this `GroupDisplay`, and an error if it was not. + pub fn add_event_report( + &mut self, + key: &Q, + event_report: EventReport, + ) -> Result<(), UnknownReportKey> + where + K: Borrow, + Q: Ord, + { + if let Some(state) = self.single_states.get_mut(key) { + let result = state.add_event_report(event_report); + if self.start_instant.is_none() { + self.start_instant = Some(Instant::now()); + } + self.stats.apply_result(result); + Ok(()) + } else { + Err(UnknownReportKey {}) + } + } + + /// Writes a "Status" or "Summary" line to the writer with statistics. + pub fn write_stats(&mut self, header: &str) -> std::io::Result<()> { + // Add a prefix which is equal to the maximum width of the prefixes. + // [prefix 00:00:00] takes up self.max_width + 9 characters inside the + // brackets. + let prefix = " ".repeat(self.max_width); + let mut line = self.formatter.start_line( + &prefix, + self.start_instant.as_ref().map(Instant::elapsed), + ); + self.stats.format_line(&mut line, header, &self.formatter); + writeln!(self.writer, "{line}") + } + + /// Writes all pending events to the writer. + pub fn write_events(&mut self) -> std::io::Result<()> { + let mut out = LineDisplayOutput::new(); + for state in self.single_states.values_mut() { + state.format_events(&self.formatter, &mut out); + } + for line in out.iter() { + writeln!(self.writer, "{line}")?; + } + Ok(()) + } + + /// Returns the current statistics for this `GroupDisplay`. + pub fn stats(&self) -> &GroupDisplayStats { + &self.stats + } +} + +#[derive(Clone, Copy, Debug)] +pub struct GroupDisplayStats { + /// The total number of reports. + pub total: usize, + + /// The number of reports that have not yet started. + pub not_started: usize, + + /// The number of reports that are currently running. + pub running: usize, + + /// The number of reports that indicate successful completion. + pub completed: usize, + + /// The number of reports that indicate failure. + pub failed: usize, + + /// The number of reports that indicate being aborted. + pub aborted: usize, + + /// The number of reports where we didn't receive a final state and it got + /// overwritten by another report. + /// + /// Overwritten reports are considered failures since we don't know what + /// happened. + pub overwritten: usize, +} + +impl GroupDisplayStats { + fn new(total: usize) -> Self { + Self { + total, + not_started: total, + completed: 0, + failed: 0, + aborted: 0, + overwritten: 0, + running: 0, + } + } + + /// Returns the number of terminal reports. + pub fn terminal_count(&self) -> usize { + self.completed + self.failed + self.aborted + self.overwritten + } + + /// Returns true if all reports have reached a terminal state. + pub fn is_terminal(&self) -> bool { + self.not_started == 0 && self.running == 0 + } + + /// Returns true if there are any failures. + pub fn has_failures(&self) -> bool { + self.failed > 0 || self.aborted > 0 || self.overwritten > 0 + } + + fn apply_result(&mut self, result: AddEventReportResult) { + // Process result.after first to avoid integer underflow. + match result.after { + SingleStateTag::NotStarted => self.not_started += 1, + SingleStateTag::Running => self.running += 1, + SingleStateTag::Terminal(TerminalKind::Completed) => { + self.completed += 1 + } + SingleStateTag::Terminal(TerminalKind::Failed) => self.failed += 1, + SingleStateTag::Terminal(TerminalKind::Aborted) => { + self.aborted += 1 + } + SingleStateTag::Overwritten => self.overwritten += 1, + } + + match result.before { + SingleStateTag::NotStarted => self.not_started -= 1, + SingleStateTag::Running => self.running -= 1, + SingleStateTag::Terminal(TerminalKind::Completed) => { + self.completed -= 1 + } + SingleStateTag::Terminal(TerminalKind::Failed) => self.failed -= 1, + SingleStateTag::Terminal(TerminalKind::Aborted) => { + self.aborted -= 1 + } + SingleStateTag::Overwritten => self.overwritten -= 1, + } + } + + fn format_line( + &self, + line: &mut String, + header: &str, + formatter: &LineDisplayFormatter, + ) { + let header_style = if self.has_failures() { + formatter.styles().error_style + } else { + formatter.styles().progress_style + }; + + swrite!(line, "{:>HEADER_WIDTH$} ", header.style(header_style)); + let terminal_count = self.terminal_count(); + swrite!( + line, + "{terminal_count}/{}: {} running, {} {}", + self.total, + self.running.style(formatter.styles().meta_style), + self.completed.style(formatter.styles().meta_style), + "completed".style(formatter.styles().progress_style), + ); + if self.failed > 0 { + swrite!( + line, + ", {} {}", + self.failed.style(formatter.styles().meta_style), + "failed".style(formatter.styles().error_style), + ); + } + if self.aborted > 0 { + swrite!( + line, + ", {} {}", + self.aborted.style(formatter.styles().meta_style), + "aborted".style(formatter.styles().error_style), + ); + } + if self.overwritten > 0 { + swrite!( + line, + ", {} {}", + self.overwritten.style(formatter.styles().meta_style), + "overwritten".style(formatter.styles().error_style), + ); + } + } +} + +#[derive(Debug)] +struct SingleState { + shared: LineDisplayShared, + kind: SingleStateKind, + prefix: String, +} + +impl SingleState { + fn new(prefix: String, max_width: usize) -> Self { + // Right-align the prefix to the maximum width. + let prefix = format!("{:>max_width$}", prefix); + Self { + shared: LineDisplayShared::default(), + kind: SingleStateKind::NotStarted { displayed: false }, + prefix, + } + } + + /// Adds an event report and updates the internal state. + fn add_event_report( + &mut self, + event_report: EventReport, + ) -> AddEventReportResult { + let before = match &self.kind { + SingleStateKind::NotStarted { .. } => { + self.kind = SingleStateKind::Running { + event_buffer: EventBuffer::new(8), + }; + SingleStateTag::NotStarted + } + SingleStateKind::Running { .. } => SingleStateTag::Running, + + SingleStateKind::Terminal { info, .. } => { + // Once we've reached a terminal state, we don't record any more + // events. + return AddEventReportResult::unchanged( + SingleStateTag::Terminal(info.kind), + ); + } + SingleStateKind::Overwritten { .. } => { + // This update has already completed -- assume that the event + // buffer is for a new update, which we don't show. + return AddEventReportResult::unchanged( + SingleStateTag::Overwritten, + ); + } + }; + + let SingleStateKind::Running { event_buffer } = &mut self.kind else { + unreachable!("other branches were handled above"); + }; + + if let Some(root_execution_id) = event_buffer.root_execution_id() { + if event_report.root_execution_id != Some(root_execution_id) { + // The report is for a different execution ID -- assume that + // this event is completed and mark our current execution as + // completed. + self.kind = SingleStateKind::Overwritten { displayed: false }; + return AddEventReportResult { + before, + after: SingleStateTag::Overwritten, + }; + } + } + + event_buffer.add_event_report(event_report); + let after = if let Some(info) = event_buffer.root_terminal_info() { + // Grab the event buffer to store it in the terminal state. + let event_buffer = + std::mem::replace(event_buffer, EventBuffer::new(0)); + let terminal_kind = info.kind; + self.kind = SingleStateKind::Terminal { + info, + pending_event_buffer: Some(event_buffer), + }; + SingleStateTag::Terminal(terminal_kind) + } else { + SingleStateTag::Running + }; + + AddEventReportResult { before, after } + } + + pub(super) fn format_events( + &mut self, + formatter: &LineDisplayFormatter, + out: &mut LineDisplayOutput, + ) { + let mut cx = self.shared.with_context(&self.prefix, formatter); + match &mut self.kind { + SingleStateKind::NotStarted { displayed } => { + if !*displayed { + let line = + cx.format_generic("Update not started, waiting..."); + out.add_line(line); + *displayed = true; + } + } + SingleStateKind::Running { event_buffer } => { + cx.format_event_buffer(event_buffer, out); + } + SingleStateKind::Terminal { info, pending_event_buffer } => { + // Are any remaining events left? This also sets pending_event_buffer + // to None after displaying remaining events. + if let Some(event_buffer) = pending_event_buffer.take() { + cx.format_event_buffer(&event_buffer, out); + // Also show a line to wrap up the terminal status. + let line = cx.format_terminal_info(info); + out.add_line(line); + } + + // Nothing to do, the terminal status was already printed above. + } + SingleStateKind::Overwritten { displayed } => { + if !*displayed { + let line = cx.format_generic( + "Update overwritten (a different update was started)\ + assuming failure", + ); + out.add_line(line); + *displayed = true; + } + } + } + } +} + +#[derive(Debug)] +enum SingleStateKind { + NotStarted { + displayed: bool, + }, + Running { + event_buffer: EventBuffer, + }, + Terminal { + info: ExecutionTerminalInfo, + // The event buffer is kept around so that we can display any remaining + // lines. + pending_event_buffer: Option>, + }, + Overwritten { + displayed: bool, + }, +} + +struct AddEventReportResult { + before: SingleStateTag, + after: SingleStateTag, +} + +impl AddEventReportResult { + fn unchanged(tag: SingleStateTag) -> Self { + Self { before: tag, after: tag } + } +} + +#[derive(Copy, Clone, Debug)] +enum SingleStateTag { + NotStarted, + Running, + Terminal(TerminalKind), + Overwritten, +} diff --git a/update-engine/src/display/line_display.rs b/update-engine/src/display/line_display.rs new file mode 100644 index 0000000000..5321ec017c --- /dev/null +++ b/update-engine/src/display/line_display.rs @@ -0,0 +1,137 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +// Copyright 2023 Oxide Computer Company + +use debug_ignore::DebugIgnore; +use derive_where::derive_where; +use owo_colors::Style; +use std::time::Duration; + +use crate::{EventBuffer, ExecutionTerminalInfo, StepSpec}; + +use super::{ + line_display_shared::LineDisplayOutput, LineDisplayFormatter, + LineDisplayShared, +}; + +/// A line-oriented display. +/// +/// This display produces output to the provided writer. +#[derive_where(Debug)] +pub struct LineDisplay { + writer: DebugIgnore, + shared: LineDisplayShared, + formatter: LineDisplayFormatter, + prefix: String, +} + +impl LineDisplay { + /// Creates a new LineDisplay. + pub fn new(writer: W) -> Self { + Self { + writer: DebugIgnore(writer), + shared: LineDisplayShared::default(), + formatter: LineDisplayFormatter::new(), + prefix: String::new(), + } + } + + /// Sets the prefix for all future lines. + #[inline] + pub fn set_prefix(&mut self, prefix: impl Into) { + self.prefix = prefix.into(); + } + + /// Sets the styles for all future lines. + #[inline] + pub fn set_styles(&mut self, styles: LineDisplayStyles) { + self.formatter.set_styles(styles); + } + + /// Sets the amount of time before the next progress event is shown. + #[inline] + pub fn set_progress_interval(&mut self, interval: Duration) { + self.formatter.set_progress_interval(interval); + } + + /// Writes an event buffer to the writer, incrementally. + /// + /// This is a stateful method that will only display events that have not + /// been displayed before. + pub fn write_event_buffer( + &mut self, + buffer: &EventBuffer, + ) -> std::io::Result<()> { + let mut out = LineDisplayOutput::new(); + self.shared + .with_context(&self.prefix, &self.formatter) + .format_event_buffer(buffer, &mut out); + for line in out.iter() { + writeln!(self.writer, "{line}")?; + } + + Ok(()) + } + + /// Writes terminal information to the writer. + pub fn write_terminal_info( + &mut self, + info: &ExecutionTerminalInfo, + ) -> std::io::Result<()> { + let line = self + .shared + .with_context(&self.prefix, &self.formatter) + .format_terminal_info(info); + writeln!(self.writer, "{line}") + } + + /// Writes a generic line to the writer, with prefix attached if provided. + pub fn write_generic(&mut self, message: &str) -> std::io::Result<()> { + let line = self + .shared + .with_context(&self.prefix, &self.formatter) + .format_generic(message); + writeln!(self.writer, "{line}") + } +} + +/// Styles for [`LineDisplay`]. +/// +/// By default this isn't colorized, but it can be if so chosen. +#[derive(Debug, Default)] +#[non_exhaustive] +pub struct LineDisplayStyles { + pub prefix_style: Style, + pub meta_style: Style, + pub step_name_style: Style, + pub progress_style: Style, + pub progress_message_style: Style, + pub warning_style: Style, + pub warning_message_style: Style, + pub error_style: Style, + pub error_message_style: Style, + pub skipped_style: Style, + pub retry_style: Style, +} + +impl LineDisplayStyles { + /// Returns a default set of colorized styles with ANSI colors. + pub fn colorized() -> Self { + let mut ret = Self::default(); + ret.prefix_style = Style::new().bold(); + ret.meta_style = Style::new().bold(); + ret.step_name_style = Style::new().cyan(); + ret.progress_style = Style::new().bold().green(); + ret.progress_message_style = Style::new().green(); + ret.warning_style = Style::new().bold().yellow(); + ret.warning_message_style = Style::new().yellow(); + ret.error_style = Style::new().bold().red(); + ret.error_message_style = Style::new().red(); + ret.skipped_style = Style::new().bold().yellow(); + ret.retry_style = Style::new().bold().yellow(); + + ret + } +} diff --git a/update-engine/src/display/line_display_shared.rs b/update-engine/src/display/line_display_shared.rs new file mode 100644 index 0000000000..b0c2a3695b --- /dev/null +++ b/update-engine/src/display/line_display_shared.rs @@ -0,0 +1,1017 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +// Copyright 2023 Oxide Computer Company + +//! Types and code shared between `LineDisplay` and `GroupDisplay`. + +use std::{ + collections::HashMap, + fmt::{self, Write as _}, + time::Duration, +}; + +use owo_colors::OwoColorize; +use swrite::{swrite, SWrite as _}; + +use crate::{ + events::{ + ProgressCounter, ProgressEvent, ProgressEventKind, StepEvent, + StepEventKind, StepInfo, StepOutcome, + }, + EventBuffer, ExecutionId, ExecutionTerminalInfo, StepKey, StepSpec, + TerminalKind, +}; + +use super::LineDisplayStyles; + +// This is chosen to leave enough room for all possible headers: "Completed" at +// 9 characters is the longest. +pub(super) const HEADER_WIDTH: usize = 9; + +#[derive(Debug, Default)] +pub(super) struct LineDisplayShared { + // This is a map from root execution ID to data about it. + execution_data: HashMap, +} + +impl LineDisplayShared { + pub(super) fn with_context<'a>( + &'a mut self, + prefix: &'a str, + formatter: &'a LineDisplayFormatter, + ) -> LineDisplaySharedContext<'a> { + LineDisplaySharedContext { shared: self, prefix, formatter } + } +} + +#[derive(Debug)] +pub(super) struct LineDisplaySharedContext<'a> { + shared: &'a mut LineDisplayShared, + prefix: &'a str, + formatter: &'a LineDisplayFormatter, +} + +impl<'a> LineDisplaySharedContext<'a> { + /// Produces a generic line from the prefix and message. + /// + /// This line does not have a trailing newline; adding one is the caller's + /// responsibility. + pub(super) fn format_generic(&self, message: &str) -> String { + let mut line = self.formatter.start_println(self.prefix); + line.push_str(message); + line + } + + /// Produces lines for this event buffer, and advances internal state. + /// + /// Returned lines do not have a trailing newline; adding them is the + /// caller's responsibility. + pub(super) fn format_event_buffer( + &mut self, + buffer: &EventBuffer, + out: &mut LineDisplayOutput, + ) { + let Some(execution_id) = buffer.root_execution_id() else { + // No known events, so nothing to display. + return; + }; + let execution_data = + self.shared.execution_data.entry(execution_id).or_default(); + let prev_progress_event_at = execution_data.last_progress_event_at; + let mut current_progress_event_at = prev_progress_event_at; + + let report = + buffer.generate_report_since(&mut execution_data.last_seen); + + for event in &report.step_events { + self.format_step_event(buffer, event, out); + } + + // Update progress events. + for event in &report.progress_events { + if Some(event.total_elapsed) > prev_progress_event_at { + self.format_progress_event(buffer, event, out); + current_progress_event_at = + current_progress_event_at.max(Some(event.total_elapsed)); + } + } + + // Finally, write to last_progress_event_at. (Need to re-fetch execution data.) + let execution_data = self + .shared + .execution_data + .get_mut(&execution_id) + .expect("we created this execution data above"); + execution_data.last_progress_event_at = current_progress_event_at; + } + + /// Format this step event. + fn format_step_event( + &self, + buffer: &EventBuffer, + step_event: &StepEvent, + out: &mut LineDisplayOutput, + ) { + self.format_step_event_impl( + buffer, + step_event, + Default::default(), + step_event.total_elapsed, + out, + ); + } + + fn format_step_event_impl( + &self, + buffer: &EventBuffer, + step_event: &StepEvent, + mut nest_data: NestData, + root_total_elapsed: Duration, + out: &mut LineDisplayOutput, + ) { + match &step_event.kind { + StepEventKind::NoStepsDefined => { + let mut line = self + .formatter + .start_line(self.prefix, Some(step_event.total_elapsed)); + swrite!( + line, + "{:>HEADER_WIDTH$} ", + "No steps defined" + .style(self.formatter.styles.progress_style), + ); + out.add_line(line); + } + StepEventKind::ExecutionStarted { first_step, .. } => { + let ld_step_info = LineDisplayStepInfo::new( + buffer, + step_event.execution_id, + &first_step.info, + &nest_data, + ); + let mut line = self + .formatter + .start_line(self.prefix, Some(root_total_elapsed)); + + swrite!( + line, + "{:>HEADER_WIDTH$} ", + "Running".style(self.formatter.styles.progress_style), + ); + self.formatter.add_step_info(&mut line, ld_step_info); + out.add_line(line); + } + StepEventKind::AttemptRetry { + step, + next_attempt, + attempt_elapsed, + message, + .. + } => { + let ld_step_info = LineDisplayStepInfo::new( + buffer, + step_event.execution_id, + &step.info, + &nest_data, + ); + + let mut line = self + .formatter + .start_line(self.prefix, Some(root_total_elapsed)); + + swrite!( + line, + "{:>HEADER_WIDTH$} ", + "Retry".style(self.formatter.styles.warning_style) + ); + self.formatter.add_step_info(&mut line, ld_step_info); + swrite!( + line, + ": after {:.2?}", + attempt_elapsed.style(self.formatter.styles.meta_style), + ); + if *next_attempt > 1 { + swrite!( + line, + " (at attempt {})", + next_attempt + .saturating_sub(1) + .style(self.formatter.styles.meta_style), + ); + } + swrite!( + line, + " with message: {}", + message.style(self.formatter.styles.warning_message_style) + ); + + out.add_line(line); + } + StepEventKind::ProgressReset { + step, + attempt, + attempt_elapsed, + message, + .. + } => { + let ld_step_info = LineDisplayStepInfo::new( + buffer, + step_event.execution_id, + &step.info, + &nest_data, + ); + + let mut line = self + .formatter + .start_line(self.prefix, Some(root_total_elapsed)); + + swrite!( + line, + "{:>HEADER_WIDTH$} ", + "Reset".style(self.formatter.styles.warning_style) + ); + self.formatter.add_step_info(&mut line, ld_step_info); + swrite!( + line, + ": after {:.2?}", + attempt_elapsed.style(self.formatter.styles.meta_style), + ); + if *attempt > 1 { + swrite!( + line, + " (at attempt {})", + attempt.style(self.formatter.styles.meta_style), + ); + } + swrite!( + line, + " with message: {}", + message.style(self.formatter.styles.warning_message_style) + ); + + out.add_line(line); + } + StepEventKind::StepCompleted { + step, + attempt, + outcome, + next_step, + attempt_elapsed, + .. + } => { + // --- Add completion info about this step. + + let ld_step_info = LineDisplayStepInfo::new( + buffer, + step_event.execution_id, + &step.info, + &nest_data, + ); + let mut line = self + .formatter + .start_line(self.prefix, Some(root_total_elapsed)); + + self.formatter.add_completion_and_step_info( + &mut line, + ld_step_info, + *attempt_elapsed, + *attempt, + outcome, + ); + + out.add_line(line); + + // --- Add information about the next step. + + let ld_step_info = LineDisplayStepInfo::new( + buffer, + step_event.execution_id, + &next_step.info, + &nest_data, + ); + + let mut line = self + .formatter + .start_line(self.prefix, Some(root_total_elapsed)); + + self.format_step_running(&mut line, ld_step_info); + + out.add_line(line); + } + StepEventKind::ExecutionCompleted { + last_step, + last_attempt, + last_outcome, + attempt_elapsed, + .. + } => { + let ld_step_info = LineDisplayStepInfo::new( + buffer, + step_event.execution_id, + &last_step.info, + &nest_data, + ); + + let mut line = self + .formatter + .start_line(self.prefix, Some(root_total_elapsed)); + + self.formatter.add_completion_and_step_info( + &mut line, + ld_step_info, + *attempt_elapsed, + *last_attempt, + last_outcome, + ); + + out.add_line(line); + } + StepEventKind::ExecutionFailed { + failed_step, + total_attempts, + attempt_elapsed, + message, + causes, + .. + } => { + let ld_step_info = LineDisplayStepInfo::new( + buffer, + step_event.execution_id, + &failed_step.info, + &nest_data, + ); + + let mut line = self + .formatter + .start_line(self.prefix, Some(root_total_elapsed)); + // The prefix is used for "Caused by" lines below. Add + // the requisite amount of spacing here. + let mut caused_by_prefix = line.clone(); + swrite!(caused_by_prefix, "{:>HEADER_WIDTH$} ", ""); + nest_data.add_prefix(&mut caused_by_prefix); + + swrite!( + line, + "{:>HEADER_WIDTH$} ", + "Failed".style(self.formatter.styles.error_style) + ); + + self.formatter.add_step_info(&mut line, ld_step_info); + line.push_str(": "); + + self.formatter.add_failure_info( + &mut line, + &caused_by_prefix, + *attempt_elapsed, + *total_attempts, + message, + causes, + ); + + out.add_line(line); + } + StepEventKind::ExecutionAborted { + aborted_step, + attempt, + attempt_elapsed, + message, + .. + } => { + let ld_step_info = LineDisplayStepInfo::new( + buffer, + step_event.execution_id, + &aborted_step.info, + &nest_data, + ); + + let mut line = self + .formatter + .start_line(self.prefix, Some(root_total_elapsed)); + + swrite!( + line, + "{:>HEADER_WIDTH$} ", + "Aborted".style(self.formatter.styles.error_style) + ); + self.formatter.add_step_info(&mut line, ld_step_info); + line.push_str(": "); + + self.formatter.add_abort_info( + &mut line, + *attempt_elapsed, + *attempt, + message, + ); + + out.add_line(line); + } + StepEventKind::Nested { step, event, .. } => { + // Look up the child event's ID to add to the nest data. + let child_step_key = StepKey { + execution_id: event.execution_id, + // TODO: we currently look up index 0 because that should + // always exist (unless no steps are defined, in which case + // we skip this). The child index is actually shared by all + // steps within an execution. Fix this by changing + // EventBuffer to also track general per-execution data. + index: 0, + }; + let Some(child_step_data) = buffer.get(&child_step_key) else { + // This should only happen if no steps are defined. See TODO + // above. + return; + }; + let (_, child_index) = child_step_data + .parent_key_and_child_index() + .expect("child steps should have a child index"); + + nest_data.add_nest_level(step.info.index, child_index); + + self.format_step_event_impl( + buffer, + &**event, + nest_data, + root_total_elapsed, + out, + ); + } + StepEventKind::Unknown => {} + } + } + + fn format_step_running( + &self, + line: &mut String, + ld_step_info: LineDisplayStepInfo<'_, S>, + ) { + swrite!( + line, + "{:>HEADER_WIDTH$} ", + "Running".style(self.formatter.styles.progress_style), + ); + self.formatter.add_step_info(line, ld_step_info); + } + + /// Formats this terminal information. + /// + /// This line does not have a trailing newline; adding one is the caller's + /// responsibility. + pub(super) fn format_terminal_info( + &self, + info: &ExecutionTerminalInfo, + ) -> String { + let mut line = + self.formatter.start_line(self.prefix, info.leaf_total_elapsed); + match info.kind { + TerminalKind::Completed => { + swrite!( + line, + "{:>HEADER_WIDTH$} Execution {}", + "Terminal".style(self.formatter.styles.progress_style), + "completed".style(self.formatter.styles.progress_style), + ); + } + TerminalKind::Failed => { + swrite!( + line, + "{:>HEADER_WIDTH$} Execution {}", + "Terminal".style(self.formatter.styles.error_style), + "failed".style(self.formatter.styles.error_style), + ); + } + TerminalKind::Aborted => { + swrite!( + line, + "{:>HEADER_WIDTH$} Execution {}", + "Terminal".style(self.formatter.styles.error_style), + "aborted".style(self.formatter.styles.error_style), + ); + } + } + line + } + + fn format_progress_event( + &self, + buffer: &EventBuffer, + progress_event: &ProgressEvent, + out: &mut LineDisplayOutput, + ) { + self.format_progress_event_impl( + buffer, + progress_event, + NestData::default(), + progress_event.total_elapsed, + out, + ) + } + + fn format_progress_event_impl( + &self, + buffer: &EventBuffer, + progress_event: &ProgressEvent, + mut nest_data: NestData, + root_total_elapsed: Duration, + out: &mut LineDisplayOutput, + ) { + match &progress_event.kind { + ProgressEventKind::WaitingForProgress { .. } => { + // Don't need to show this because "Running" is shown within + // step events. + } + ProgressEventKind::Progress { + step, + progress, + attempt_elapsed, + .. + } => { + let step_key = StepKey { + execution_id: progress_event.execution_id, + index: step.info.index, + }; + let step_data = + buffer.get(&step_key).expect("step key must exist"); + let ld_step_info = LineDisplayStepInfo { + step_info: &step.info, + total_steps: step_data.total_steps(), + nest_data: &nest_data, + }; + + let mut line = self + .formatter + .start_line(self.prefix, Some(root_total_elapsed)); + + let (before, after) = match progress { + Some(counter) => { + let progress_str = format_progress_counter(counter); + ( + format!( + "{:>HEADER_WIDTH$} ", + "Progress".style( + self.formatter.styles.progress_style + ) + ), + format!( + "{progress_str} after {:.2?}", + attempt_elapsed + .style(self.formatter.styles.meta_style), + ), + ) + } + None => { + let before = format!( + "{:>HEADER_WIDTH$} ", + "Running" + .style(self.formatter.styles.progress_style), + ); + + // If the attempt elapsed is non-zero, show it. + let after = if *attempt_elapsed > Duration::ZERO { + format!( + "after {:.2?}", + attempt_elapsed + .style(self.formatter.styles.meta_style), + ) + } else { + String::new() + }; + + (before, after) + } + }; + + swrite!(line, "{}", before); + self.formatter.add_step_info(&mut line, ld_step_info); + if !after.is_empty() { + swrite!(line, ": {}", after); + } + + out.add_line(line); + } + ProgressEventKind::Nested { step, event, .. } => { + // Look up the child event's ID to add to the nest data. + let child_step_key = StepKey { + execution_id: event.execution_id, + // TODO: we currently look up index 0 because that should + // always exist (unless no steps are defined, in which case + // we skip this). The child index is actually shared by all + // steps within an execution. Fix this by changing + // EventBuffer to also track general per-execution data. + index: 0, + }; + let Some(child_step_data) = buffer.get(&child_step_key) else { + // This should only happen if no steps are defined. See TODO + // above. + return; + }; + let (_, child_index) = child_step_data + .parent_key_and_child_index() + .expect("child steps should have a child index"); + + nest_data.add_nest_level(step.info.index, child_index); + + self.format_progress_event_impl( + buffer, + &**event, + nest_data, + root_total_elapsed, + out, + ); + } + ProgressEventKind::Unknown => {} + } + } +} + +fn format_progress_counter(counter: &ProgressCounter) -> String { + match counter.total { + Some(total) => { + // Show a percentage value. Correct alignment requires converting to + // a string in the middle like this. + let percent = (counter.current as f64 / total as f64) * 100.0; + // <12.34> is 5 characters wide. + let percent_width = 5; + let counter_width = total.to_string().len(); + format!( + "{:>percent_width$.2}% ({:>counter_width$}/{} {})", + percent, counter.current, total, counter.units, + ) + } + None => format!("{} {}", counter.current, counter.units), + } +} + +/// State that tracks line display formatting. +/// +/// Each `LineDisplay` and `GroupDisplay` has one of these. +#[derive(Debug)] +pub(super) struct LineDisplayFormatter { + styles: LineDisplayStyles, + progress_interval: Duration, +} + +impl LineDisplayFormatter { + pub(super) fn new() -> Self { + Self { + styles: LineDisplayStyles::default(), + progress_interval: Duration::from_secs(1), + } + } + + #[inline] + pub(super) fn styles(&self) -> &LineDisplayStyles { + &self.styles + } + + #[inline] + pub(super) fn set_styles(&mut self, styles: LineDisplayStyles) { + self.styles = styles; + } + + #[inline] + pub(super) fn set_progress_interval(&mut self, interval: Duration) { + self.progress_interval = interval; + } + + // --- + // Internal helpers + // --- + + fn start_println(&self, prefix: &str) -> String { + if !prefix.is_empty() { + format!("[{}] ", prefix.style(self.styles.prefix_style)) + } else { + String::new() + } + } + + pub(super) fn start_line( + &self, + prefix: &str, + total_elapsed: Option, + ) -> String { + let mut line = format!("[{}", prefix.style(self.styles.prefix_style)); + + if !prefix.is_empty() { + line.push(' '); + } + + // Show total elapsed time in an hh:mm:ss format. + if let Some(total_elapsed) = total_elapsed { + let total_elapsed_secs = total_elapsed.as_secs(); + let hours = total_elapsed_secs / 3600; + let minutes = (total_elapsed_secs % 3600) / 60; + let seconds = total_elapsed_secs % 60; + swrite!(&mut line, "{:02}:{:02}:{:02}", hours, minutes, seconds); + } else { + // Add 8 spaces to align with hh:mm:ss. + line.push_str(" "); + } + + line.push_str("] "); + + line + } + + fn add_step_info( + &self, + line: &mut String, + ld_step_info: LineDisplayStepInfo<'_, S>, + ) { + ld_step_info.nest_data.add_prefix(line); + + // Print out "/)". Leave space such that we + // print out e.g. "1/8)" and " 3/14)". + // Add 1 to the index to make it 1-based. + let step_index = ld_step_info.step_info.index + 1; + let step_index_width = ld_step_info.total_steps.to_string().len(); + swrite!( + line, + "{:width$}/{:width$}) ", + step_index, + ld_step_info.total_steps, + width = step_index_width + ); + + swrite!( + line, + "{}", + ld_step_info + .step_info + .description + .style(self.styles.step_name_style) + ); + } + + pub(super) fn add_completion_and_step_info( + &self, + line: &mut String, + ld_step_info: LineDisplayStepInfo<'_, S>, + attempt_elapsed: Duration, + attempt: usize, + outcome: &StepOutcome, + ) { + let mut meta = format!( + "after {:.2?}", + attempt_elapsed.style(self.styles.meta_style) + ); + if attempt > 1 { + swrite!( + meta, + " (at attempt {})", + attempt.style(self.styles.meta_style) + ); + } + + match &outcome { + StepOutcome::Success { message, .. } => { + swrite!( + line, + "{:>HEADER_WIDTH$} ", + "Completed".style(self.styles.progress_style), + ); + self.add_step_info(line, ld_step_info); + match message { + Some(message) => { + swrite!( + line, + ": {meta} with message: {}", + message.style(self.styles.progress_message_style) + ); + } + None => { + swrite!(line, ": {meta}"); + } + } + } + StepOutcome::Warning { message, .. } => { + swrite!( + line, + "{:>HEADER_WIDTH$} ", + "Completed".style(self.styles.warning_style), + ); + self.add_step_info(line, ld_step_info); + swrite!( + line, + ": {meta} with warning: {}", + message.style(self.styles.warning_message_style) + ); + } + StepOutcome::Skipped { message, .. } => { + swrite!( + line, + "{:>HEADER_WIDTH$} ", + "Skipped".style(self.styles.skipped_style), + ); + self.add_step_info(line, ld_step_info); + swrite!( + line, + ": {}", + message.style(self.styles.warning_message_style) + ); + } + }; + } + + pub(super) fn add_failure_info( + &self, + line: &mut String, + line_prefix: &str, + attempt_elapsed: Duration, + total_attempts: usize, + message: &str, + causes: &[String], + ) { + let mut meta = format!( + "after {:.2?}", + attempt_elapsed.style(self.styles.meta_style) + ); + if total_attempts > 1 { + swrite!( + meta, + " (after {} attempts)", + total_attempts.style(self.styles.meta_style) + ); + } + + swrite!( + line, + "{meta}: {}", + message.style(self.styles.error_message_style) + ); + if !causes.is_empty() { + swrite!( + line, + "\n{line_prefix}{}", + " Caused by:".style(self.styles.meta_style) + ); + for cause in causes { + swrite!(line, "\n{line_prefix} - {}", cause); + } + } + + // The last newline is added by the caller. + } + + pub(super) fn add_abort_info( + &self, + line: &mut String, + attempt_elapsed: Duration, + attempt: usize, + message: &str, + ) { + let mut meta = format!( + "after {:.2?}", + attempt_elapsed.style(self.styles.meta_style) + ); + if attempt > 1 { + swrite!( + meta, + " (at attempt {})", + attempt.style(self.styles.meta_style) + ); + } + + swrite!(line, "{meta} with message \"{}\"", message); + } +} + +#[derive(Clone, Debug)] +pub(super) struct LineDisplayOutput { + lines: Vec, +} + +impl LineDisplayOutput { + pub(super) fn new() -> Self { + Self { lines: Vec::new() } + } + + pub(super) fn add_line(&mut self, line: String) { + self.lines.push(line); + } + + pub(super) fn iter(&self) -> impl Iterator { + self.lines.iter().map(|line| line.as_str()) + } +} + +#[derive(Clone, Copy, Debug)] +pub(super) struct LineDisplayStepInfo<'a, S: StepSpec> { + pub(super) step_info: &'a StepInfo, + pub(super) total_steps: usize, + pub(super) nest_data: &'a NestData, +} + +impl<'a, S: StepSpec> LineDisplayStepInfo<'a, S> { + fn new( + buffer: &'a EventBuffer, + execution_id: ExecutionId, + step_info: &'a StepInfo, + nest_data: &'a NestData, + ) -> Self { + let step_key = StepKey { execution_id, index: step_info.index }; + let step_data = buffer.get(&step_key).expect("step key must exist"); + LineDisplayStepInfo { + step_info, + total_steps: step_data.total_steps(), + nest_data, + } + } +} + +/// Per-step stateful data tracked by the line displayer. +#[derive(Debug, Default)] +struct ExecutionData { + /// The last seen root event index. + /// + /// This is used to avoid displaying the same event twice. + last_seen: Option, + + /// The last `root_total_elapsed` at which a progress event was displayed for + /// this execution. + last_progress_event_at: Option, +} + +#[derive(Clone, Debug, Default)] +pub(super) struct NestData { + nest_indexes: Vec, +} + +impl NestData { + fn add_nest_level(&mut self, parent_step_index: usize, child_index: usize) { + self.nest_indexes.push(NestIndex { parent_step_index, child_index }); + } + + fn add_prefix(&self, line: &mut String) { + if !self.nest_indexes.is_empty() { + line.push_str(&"..".repeat(self.nest_indexes.len())); + line.push_str(" "); + } + + for nest_index in &self.nest_indexes { + swrite!( + line, + "{}{} ", + // Add 1 to the index to make it 1-based. + nest_index.parent_step_index + 1, + AsLetters(nest_index.child_index) + ); + } + } +} + +#[derive(Clone, Debug)] +struct NestIndex { + parent_step_index: usize, + // If a parent has multiple nested executions, this counts which execution + // this is, up from 0. + child_index: usize, +} + +/// A display impl that converts a 0-based index into a letter or a series of +/// letters. +/// +/// This is effectively a conversion to base 26. +struct AsLetters(usize); + +impl fmt::Display for AsLetters { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut index = self.0; + loop { + let letter = (b'a' + (index % 26) as u8) as char; + f.write_char(letter)?; + index /= 26; + if index == 0 { + break; + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_format_progress_counter() { + let tests = vec![ + (ProgressCounter::new(5, 20, "units"), "25.00% ( 5/20 units)"), + (ProgressCounter::new(0, 20, "bytes"), " 0.00% ( 0/20 bytes)"), + (ProgressCounter::new(20, 20, "cubes"), "100.00% (20/20 cubes)"), + // NaN is a weird case that is a buggy update engine impl in practice + (ProgressCounter::new(0, 0, "units"), " NaN% (0/0 units)"), + (ProgressCounter::current(5, "units"), "5 units"), + ]; + for (input, output) in tests { + assert_eq!( + format_progress_counter(&input), + output, + "format matches for input: {:?}", + input + ); + } + } +} diff --git a/update-engine/src/display/mod.rs b/update-engine/src/display/mod.rs new file mode 100644 index 0000000000..c58a4535a0 --- /dev/null +++ b/update-engine/src/display/mod.rs @@ -0,0 +1,21 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +// Copyright 2023 Oxide Computer Company + +//! Displayers for the update engine. +//! +//! Currently implemented are: +//! +//! * [`LineDisplay`]: a line-oriented display suitable for the command line. +//! * [`GroupDisplay`]: manages state and shows the results of several +//! [`LineDisplay`]s at once. + +mod group_display; +mod line_display; +mod line_display_shared; + +pub use group_display::GroupDisplay; +pub use line_display::{LineDisplay, LineDisplayStyles}; +use line_display_shared::*; diff --git a/update-engine/src/errors.rs b/update-engine/src/errors.rs index abb0d4cd22..0607ad6e27 100644 --- a/update-engine/src/errors.rs +++ b/update-engine/src/errors.rs @@ -185,3 +185,17 @@ pub enum ConvertGenericPathElement { Path(&'static str), ArrayIndex(&'static str, usize), } + +/// The +/// [`GroupDisplay::add_event_report`](crate::display::GroupDisplay::add_event_report) +/// method was called with an unknown key. +#[derive(Clone, Debug)] +pub struct UnknownReportKey {} + +impl fmt::Display for UnknownReportKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("unknown report key") + } +} + +impl error::Error for UnknownReportKey {} diff --git a/update-engine/src/events.rs b/update-engine/src/events.rs index 3816157d0d..900a9776f5 100644 --- a/update-engine/src/events.rs +++ b/update-engine/src/events.rs @@ -1143,6 +1143,8 @@ impl ProgressEventKind { /// Returns `step_elapsed` for the leaf event, recursing into nested events /// as necessary. + /// + /// Returns None for unknown events. pub fn leaf_step_elapsed(&self) -> Option { match self { ProgressEventKind::WaitingForProgress { step_elapsed, .. } @@ -1156,6 +1158,25 @@ impl ProgressEventKind { } } + /// Returns `attempt_elapsed` for the leaf event, recursing into nested + /// events as necessary. + /// + /// Returns None for unknown events. + pub fn leaf_attempt_elapsed(&self) -> Option { + match self { + ProgressEventKind::WaitingForProgress { + attempt_elapsed, .. + } + | ProgressEventKind::Progress { attempt_elapsed, .. } => { + Some(*attempt_elapsed) + } + ProgressEventKind::Nested { event, .. } => { + event.kind.leaf_attempt_elapsed() + } + ProgressEventKind::Unknown => None, + } + } + /// Converts a generic version into self. /// /// This version can be used to convert a generic type into a more concrete diff --git a/update-engine/src/lib.rs b/update-engine/src/lib.rs index f753fa738a..fea92d3b73 100644 --- a/update-engine/src/lib.rs +++ b/update-engine/src/lib.rs @@ -57,6 +57,7 @@ mod buffer; mod context; +pub mod display; mod engine; pub mod errors; pub mod events; diff --git a/wicket/Cargo.toml b/wicket/Cargo.toml index 5392e72e9f..11f476d98c 100644 --- a/wicket/Cargo.toml +++ b/wicket/Cargo.toml @@ -13,6 +13,7 @@ camino.workspace = true ciborium.workspace = true clap.workspace = true crossterm.workspace = true +debug-ignore.workspace = true futures.workspace = true hex = { workspace = true, features = ["serde"] } humantime.workspace = true @@ -33,6 +34,7 @@ slog.workspace = true slog-async.workspace = true slog-envlogger.workspace = true slog-term.workspace = true +supports-color.workspace = true textwrap.workspace = true tokio = { workspace = true, features = ["full"] } tokio-util.workspace = true @@ -40,6 +42,7 @@ toml.workspace = true toml_edit.workspace = true tui-tree-widget = "0.13.0" unicode-width.workspace = true +uuid.workspace = true zeroize.workspace = true omicron-passwords.workspace = true diff --git a/wicket/src/cli/command.rs b/wicket/src/cli/command.rs index 34f041b203..c6c9100bf7 100644 --- a/wicket/src/cli/command.rs +++ b/wicket/src/cli/command.rs @@ -6,32 +6,77 @@ use std::net::SocketAddrV6; -use anyhow::{Context, Result}; -use clap::Parser; +use anyhow::Result; +use clap::{Args, ColorChoice, Parser, Subcommand}; +use tokio::runtime::Handle; use super::{ - preflight::PreflightArgs, rack_setup::SetupArgs, upload::UploadArgs, + preflight::PreflightArgs, rack_setup::SetupArgs, + rack_update::RackUpdateArgs, upload::UploadArgs, }; -pub fn exec( - log: slog::Logger, - args: &str, - wicketd_addr: SocketAddrV6, -) -> Result<()> { - // The argument is in a quoted form, so split it using Unix shell semantics. - let args = shell_words::split(&args).with_context(|| { - format!("could not parse shell arguments from input {args}") - })?; - - // parse_from uses the the first argument as the command name. Insert "wicket" as - // the command name. - let args = ShellCommand::parse_from( - std::iter::once("wicket".to_owned()).chain(args), - ); - match args { - ShellCommand::UploadRepo(args) => args.exec(log, wicketd_addr), - ShellCommand::Setup(args) => args.exec(log, wicketd_addr), - ShellCommand::Preflight(args) => args.exec(log, wicketd_addr), +pub(crate) struct CommandOutput<'a> { + #[allow(dead_code)] + pub(crate) stdout: &'a mut dyn std::io::Write, + pub(crate) stderr: &'a mut dyn std::io::Write, +} + +/// An app that represents wicket started with arguments over ssh. +#[derive(Debug, Parser)] +pub(crate) struct ShellApp { + /// Global options. + #[clap(flatten)] + pub(crate) global_opts: GlobalOpts, + + /// The command to run. + #[clap(subcommand)] + command: ShellCommand, +} + +impl ShellApp { + pub(crate) fn exec( + self, + log: slog::Logger, + handle: &Handle, + wicketd_addr: SocketAddrV6, + output: CommandOutput<'_>, + ) -> Result<()> { + match self.command { + ShellCommand::UploadRepo(args) => { + args.exec(log, handle, wicketd_addr) + } + ShellCommand::RackUpdate(args) => { + args.exec(log, handle, wicketd_addr, self.global_opts, output) + } + ShellCommand::Setup(args) => args.exec(log, handle, wicketd_addr), + ShellCommand::Preflight(args) => { + args.exec(log, handle, wicketd_addr) + } + } + } +} + +#[derive(Debug, Args)] +#[clap(next_help_heading = "Global options")] +pub(crate) struct GlobalOpts { + /// Color output + /// + /// This may not be obeyed everywhere at the moment. + #[clap(long, value_enum, global = true, default_value_t)] + pub(crate) color: ColorChoice, +} + +impl GlobalOpts { + /// Returns true if color should be used on standard error. + pub(crate) fn use_color(&self) -> bool { + match self.color { + ColorChoice::Auto => { + supports_color::on_cached(supports_color::Stream::Stderr) + .is_some() + } + ColorChoice::Always => true, + ColorChoice::Never => false, + } } } @@ -41,14 +86,20 @@ pub fn exec( /// ForceCommand. If no arguments are specified, wicket behaves like a TUI. /// However, if arguments are specified via SSH_ORIGINAL_COMMAND, wicketd /// accepts an upload command. -#[derive(Debug, Parser)] +#[derive(Debug, Subcommand)] enum ShellCommand { /// Upload a TUF repository to wicketd. #[command(visible_alias = "upload")] UploadRepo(UploadArgs), + + /// Perform a rack update. + #[command(subcommand)] + RackUpdate(RackUpdateArgs), + /// Interact with rack setup configuration. #[command(subcommand)] Setup(SetupArgs), + /// Run checks prior to setting up the rack. #[command(subcommand)] Preflight(PreflightArgs), diff --git a/wicket/src/cli/mod.rs b/wicket/src/cli/mod.rs index 7e8f6540ea..e63ef467e7 100644 --- a/wicket/src/cli/mod.rs +++ b/wicket/src/cli/mod.rs @@ -13,6 +13,7 @@ mod command; mod preflight; mod rack_setup; +mod rack_update; mod upload; -pub use command::exec; +pub(super) use command::{CommandOutput, GlobalOpts, ShellApp}; diff --git a/wicket/src/cli/preflight.rs b/wicket/src/cli/preflight.rs index ddbbf95c1a..a607d03f66 100644 --- a/wicket/src/cli/preflight.rs +++ b/wicket/src/cli/preflight.rs @@ -17,6 +17,7 @@ use std::borrow::Cow; use std::fmt::Display; use std::net::SocketAddrV6; use std::time::Duration; +use tokio::runtime::Handle; use update_engine::events::StepEvent; use update_engine::events::StepEventKind; use update_engine::events::StepInfo; @@ -50,12 +51,10 @@ impl PreflightArgs { pub(crate) fn exec( self, log: Logger, + handle: &Handle, wicketd_addr: SocketAddrV6, ) -> Result<()> { - let runtime = - tokio::runtime::Runtime::new().context("creating tokio runtime")?; - - runtime.block_on(self.exec_impl(log, wicketd_addr)) + handle.block_on(self.exec_impl(log, wicketd_addr)) } async fn exec_impl( diff --git a/wicket/src/cli/rack_setup.rs b/wicket/src/cli/rack_setup.rs index c678d14e11..9a3850a632 100644 --- a/wicket/src/cli/rack_setup.rs +++ b/wicket/src/cli/rack_setup.rs @@ -17,6 +17,7 @@ use std::io::Read; use std::mem; use std::net::SocketAddrV6; use std::time::Duration; +use tokio::runtime::Handle; use wicket_common::rack_setup::PutRssUserConfigInsensitive; use wicketd_client::types::CertificateUploadResponse; use wicketd_client::types::NewPasswordHash; @@ -64,12 +65,10 @@ impl SetupArgs { pub(crate) fn exec( self, log: Logger, + handle: &Handle, wicketd_addr: SocketAddrV6, ) -> Result<()> { - let runtime = - tokio::runtime::Runtime::new().context("creating tokio runtime")?; - - runtime.block_on(self.exec_impl(log, wicketd_addr)) + handle.block_on(self.exec_impl(log, wicketd_addr)) } async fn exec_impl( diff --git a/wicket/src/cli/rack_update.rs b/wicket/src/cli/rack_update.rs new file mode 100644 index 0000000000..0ddd5f0ae8 --- /dev/null +++ b/wicket/src/cli/rack_update.rs @@ -0,0 +1,320 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Command-line driven rack update. +//! +//! This is an alternative to using the Wicket UI to perform a rack update. + +use std::{ + collections::{BTreeMap, BTreeSet}, + net::SocketAddrV6, + time::Duration, +}; + +use anyhow::{anyhow, bail, Context, Result}; +use clap::{Args, Subcommand}; +use slog::Logger; +use tokio::{runtime::Handle, sync::watch, task::JoinHandle}; +use update_engine::display::{GroupDisplay, LineDisplayStyles}; +use wicket_common::update_events::EventReport; +use wicketd_client::types::StartUpdateParams; + +use crate::{ + cli::GlobalOpts, + state::{parse_event_report_map, ComponentId, CreateStartUpdateOptions}, + wicketd::{create_wicketd_client, WICKETD_TIMEOUT}, +}; + +use super::command::CommandOutput; + +#[derive(Debug, Subcommand)] +pub(crate) enum RackUpdateArgs { + /// Start one or more updates. + Start(StartRackUpdateArgs), + /// Attach to one or more running updates. + Attach(AttachArgs), +} + +impl RackUpdateArgs { + pub(crate) fn exec( + self, + log: Logger, + handle: &Handle, + wicketd_addr: SocketAddrV6, + global_opts: GlobalOpts, + output: CommandOutput<'_>, + ) -> Result<()> { + handle.block_on(self.exec_impl(log, wicketd_addr, global_opts, output)) + } + + async fn exec_impl( + self, + log: Logger, + wicketd_addr: SocketAddrV6, + global_opts: GlobalOpts, + output: CommandOutput<'_>, + ) -> Result<()> { + match self { + RackUpdateArgs::Start(args) => { + args.exec(log, wicketd_addr, global_opts, output).await + } + RackUpdateArgs::Attach(args) => { + args.exec(log, wicketd_addr, global_opts, output).await + } + } + } +} + +#[derive(Debug, Args)] +pub(crate) struct StartRackUpdateArgs { + #[clap(flatten)] + component_ids: ComponentIdSelector, + + /// Force update the RoT even if the version is the same. + #[clap(long, help_heading = "Update options")] + force_update_rot: bool, + + /// Force update the SP even if the version is the same. + #[clap(long, help_heading = "Update options")] + force_update_sp: bool, + + /// Detach after starting the update. + /// + /// The `attach` command can be used to reattach to the running update. + #[clap(short, long, help_heading = "Update options")] + detach: bool, +} + +impl StartRackUpdateArgs { + async fn exec( + self, + log: Logger, + wicketd_addr: SocketAddrV6, + global_opts: GlobalOpts, + output: CommandOutput<'_>, + ) -> Result<()> { + let client = create_wicketd_client(&log, wicketd_addr, WICKETD_TIMEOUT); + + let update_ids = self.component_ids.to_component_ids()?; + let options = CreateStartUpdateOptions { + force_update_rot: self.force_update_rot, + force_update_sp: self.force_update_sp, + } + .to_start_update_options()?; + + let num_update_ids = update_ids.len(); + + let params = StartUpdateParams { + targets: update_ids.iter().copied().map(Into::into).collect(), + options, + }; + + slog::debug!(log, "Sending post_start_update"; "num_update_ids" => num_update_ids); + match client.post_start_update(¶ms).await { + Ok(_) => { + slog::info!(log, "Update started for {num_update_ids} targets"); + } + Err(error) => { + // Error responses can be printed out more clearly. + if let wicketd_client::Error::ErrorResponse(rv) = &error { + slog::error!( + log, + "Error response from wicketd: {}", + rv.message + ); + bail!("Received error from wicketd while starting update"); + } else { + bail!(error); + } + } + } + + if self.detach { + return Ok(()); + } + + // Now, attach to the update by printing out update logs. + do_attach_to_updates(log, client, update_ids, global_opts, output) + .await?; + + Ok(()) + } +} + +#[derive(Debug, Args)] +pub(crate) struct AttachArgs { + #[clap(flatten)] + component_ids: ComponentIdSelector, +} + +impl AttachArgs { + async fn exec( + self, + log: Logger, + wicketd_addr: SocketAddrV6, + global_opts: GlobalOpts, + output: CommandOutput<'_>, + ) -> Result<()> { + let client = create_wicketd_client(&log, wicketd_addr, WICKETD_TIMEOUT); + + let update_ids = self.component_ids.to_component_ids()?; + do_attach_to_updates(log, client, update_ids, global_opts, output).await + } +} + +async fn do_attach_to_updates( + log: Logger, + client: wicketd_client::Client, + update_ids: BTreeSet, + global_opts: GlobalOpts, + output: CommandOutput<'_>, +) -> Result<()> { + let mut display = GroupDisplay::new_with_display( + update_ids.iter().copied(), + output.stderr, + ); + if global_opts.use_color() { + display.set_styles(LineDisplayStyles::colorized()); + } + + let (mut rx, handle) = start_fetch_reports_task(&log, client.clone()).await; + let mut status_timer = tokio::time::interval(Duration::from_secs(5)); + status_timer.tick().await; + + while !display.stats().is_terminal() { + tokio::select! { + res = rx.changed() => { + if res.is_err() { + // The sending end is closed, which means that the task + // created by start_fetch_reports_task died... this can + // happen either due to a panic or due to an error. + match handle.await { + Ok(Ok(())) => { + // The task exited normally, which means that the + // sending end was closed normally. This cannot + // happen. + bail!("fetch_reports task exited with Ok(()) \ + -- this should never happen here"); + } + Ok(Err(error)) => { + // The task exited with an error. + return Err(error).context("fetch_reports task errored out"); + } + Err(error) => { + // The task panicked. + return Err(anyhow!(error)).context("fetch_reports task panicked"); + } + } + } + + let event_reports = rx.borrow_and_update(); + // TODO: parallelize this computation? + for (id, event_report) in &*event_reports { + // If display.add_event_report errors out, it's for a report for a + // component we weren't interested in. Ignore it. + _ = display.add_event_report(&id, event_report.clone()); + } + + // Print out status for each component ID at the end -- do it here so + // that we also consider components for which we haven't seen status + // yet. + display.write_events()?; + } + _ = status_timer.tick() => { + display.write_stats("Status")?; + } + } + } + + // Show any remaining events. + display.write_events()?; + // And also show a summary. + display.write_stats("Summary")?; + + std::mem::drop(rx); + handle + .await + .context("fetch_reports task panicked after rx dropped")? + .context("fetch_reports task errored out after rx dropped")?; + + if display.stats().has_failures() { + bail!("one or more failures occurred"); + } + + Ok(()) +} + +async fn start_fetch_reports_task( + log: &Logger, + client: wicketd_client::Client, +) -> (watch::Receiver>, JoinHandle>) +{ + // Since reports are always cumulative, we can use a watch receiver here + // rather than an mpsc receiver. If we start using incremental reports at + // some point this would need to be changed to be an mpsc receiver. + let (tx, rx) = watch::channel(BTreeMap::new()); + let log = log.new(slog::o!("task" => "fetch_reports")); + + let handle = tokio::spawn(async move { + loop { + let response = client.get_artifacts_and_event_reports().await?; + let reports = response.into_inner().event_reports; + let reports = parse_event_report_map(&log, reports); + if tx.send(reports).is_err() { + // The receiving end is closed, exit. + break; + } + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(1)) => {}, + _ = tx.closed() => { + // The receiving end is closed, exit. + break; + } + } + } + + Ok(()) + }); + (rx, handle) +} + +/// Command-line arguments for selecting component IDs. +#[derive(Debug, Args)] +#[clap(next_help_heading = "Component selectors")] +struct ComponentIdSelector { + /// The sleds to operate on. + #[clap(long, value_delimiter = ',')] + sled: Vec, + + /// The switches to operate on. + #[clap(long, value_delimiter = ',')] + switch: Vec, + + /// The PSCs to operate on. + #[clap(long, value_delimiter = ',')] + psc: Vec, +} + +impl ComponentIdSelector { + /// Validates that all the sleds, switches, and PSCs are reasonable (though + /// they might not exist on the actual hardware), then return the set of + /// selected component IDs. + fn to_component_ids(&self) -> Result> { + let mut component_ids = BTreeSet::new(); + for sled in &self.sled { + component_ids.insert(ComponentId::new_sled(*sled)?); + } + for switch in &self.switch { + component_ids.insert(ComponentId::new_switch(*switch)?); + } + for psc in &self.psc { + component_ids.insert(ComponentId::new_psc(*psc)?); + } + if component_ids.is_empty() { + bail!("at least one component ID must be selected via --sled, --switch or --psc"); + } + + Ok(component_ids) + } +} diff --git a/wicket/src/cli/upload.rs b/wicket/src/cli/upload.rs index 6e154fd4d9..8bf02b9d0f 100644 --- a/wicket/src/cli/upload.rs +++ b/wicket/src/cli/upload.rs @@ -15,6 +15,7 @@ use buf_list::BufList; use clap::Args; use futures::StreamExt; use reqwest::Body; +use tokio::runtime::Handle; use tokio_util::io::ReaderStream; use crate::wicketd::create_wicketd_client; @@ -34,11 +35,10 @@ impl UploadArgs { pub(crate) fn exec( self, log: slog::Logger, + handle: &Handle, wicketd_addr: SocketAddrV6, ) -> Result<()> { - let runtime = - tokio::runtime::Runtime::new().context("creating tokio runtime")?; - runtime.block_on(self.do_upload(log, wicketd_addr)) + handle.block_on(self.do_upload(log, wicketd_addr)) } async fn do_upload( diff --git a/wicket/src/dispatch.rs b/wicket/src/dispatch.rs index 3ef04ee302..1524052728 100644 --- a/wicket/src/dispatch.rs +++ b/wicket/src/dispatch.rs @@ -8,24 +8,92 @@ use std::net::{Ipv6Addr, SocketAddrV6}; use anyhow::{bail, Context, Result}; use camino::{Utf8Path, Utf8PathBuf}; +use clap::Parser; use omicron_common::{address::WICKETD_PORT, FileKv}; use slog::Drain; +use tokio::runtime::Handle; -use crate::Runner; +use crate::{ + cli::{CommandOutput, ShellApp}, + Runner, +}; pub fn exec() -> Result<()> { let wicketd_addr = SocketAddrV6::new(Ipv6Addr::LOCALHOST, WICKETD_PORT, 0, 0); // SSH_ORIGINAL_COMMAND contains additional arguments, if any. - if let Ok(ssh_args) = std::env::var("SSH_ORIGINAL_COMMAND") { - let log = setup_log(&log_path()?, WithStderr::Yes)?; - crate::cli::exec(log, &ssh_args, wicketd_addr) - } else { - // Do not expose log messages via standard error since they'll show up - // on top of the TUI. - let log = setup_log(&log_path()?, WithStderr::No)?; - Runner::new(log, wicketd_addr).run() + match std::env::var("SSH_ORIGINAL_COMMAND") { + Ok(ssh_args) => { + let args = shell_words::split(&ssh_args).with_context(|| { + format!("could not parse shell arguments from input {ssh_args}") + })?; + + let runtime = tokio::runtime::Runtime::new() + .context("creating tokio runtime")?; + exec_with_args( + runtime.handle(), + wicketd_addr, + args, + OutputKind::Terminal, + ) + } + Err(_) => { + // Do not expose log messages via standard error since they'll show up + // on top of the TUI. + let log = setup_log(&log_path()?, WithStderr::No)?; + Runner::new(log, wicketd_addr).run() + } + } +} + +/// Enables capturing of wicket's output. +pub enum OutputKind<'a> { + /// Captures output to the provided log, as well as a buffer. + Captured { + log: slog::Logger, + stdout: &'a mut Vec, + stderr: &'a mut Vec, + }, + + /// Writes output to a terminal. + Terminal, +} + +pub fn exec_with_args( + // While it is possible to obtain an ambient handle, it's more type-safe to + // require that users explicitly pass in a handle (this means that they're + // more likely to have created a runtime). + handle: &Handle, + wicketd_addr: SocketAddrV6, + args: Vec, + output: OutputKind<'_>, +) -> Result<()> +where + S: AsRef, +{ + // parse_from uses the the first argument as the command name. Insert "wicket" as + // the command name. + let app = ShellApp::parse_from( + std::iter::once("wicket").chain(args.iter().map(|s| s.as_ref())), + ); + + match output { + OutputKind::Captured { log, stdout, stderr } => { + let output = CommandOutput { stdout, stderr }; + app.exec(log, handle, wicketd_addr, output) + } + OutputKind::Terminal => { + let log = setup_log( + &log_path()?, + WithStderr::Yes { use_color: app.global_opts.use_color() }, + )?; + let mut stdout = std::io::stdout(); + let mut stderr = std::io::stderr(); + let output = + CommandOutput { stdout: &mut stdout, stderr: &mut stderr }; + app.exec(log, handle, wicketd_addr, output) + } } } @@ -44,8 +112,8 @@ fn setup_log( let drain = slog_term::FullFormat::new(decorator).build().fuse(); let drain = match with_stderr { - WithStderr::Yes => { - let stderr_drain = stderr_env_drain("RUST_LOG"); + WithStderr::Yes { use_color } => { + let stderr_drain = stderr_env_drain("RUST_LOG", use_color); let drain = slog::Duplicate::new(drain, stderr_drain).fuse(); slog_async::Async::new(drain).build().fuse() } @@ -57,7 +125,7 @@ fn setup_log( #[derive(Copy, Clone, Debug)] enum WithStderr { - Yes, + Yes { use_color: bool }, No, } @@ -71,8 +139,17 @@ fn log_path() -> Result { } } -fn stderr_env_drain(env_var: &str) -> impl Drain { - let stderr_decorator = slog_term::TermDecorator::new().build(); +fn stderr_env_drain( + env_var: &str, + use_color: bool, +) -> impl Drain { + let mut builder = slog_term::TermDecorator::new(); + if use_color { + builder = builder.force_color(); + } else { + builder = builder.force_plain(); + } + let stderr_decorator = builder.build(); let stderr_drain = slog_term::FullFormat::new(stderr_decorator).build().fuse(); let mut builder = slog_envlogger::LogBuilder::new(stderr_drain); diff --git a/wicket/src/helpers.rs b/wicket/src/helpers.rs new file mode 100644 index 0000000000..564b7e9348 --- /dev/null +++ b/wicket/src/helpers.rs @@ -0,0 +1,70 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Utility functions. + +use std::env::VarError; + +use anyhow::{bail, Context}; +use wicketd_client::types::{UpdateSimulatedResult, UpdateTestError}; + +pub(crate) fn get_update_test_error( + env_var: &str, +) -> Result, anyhow::Error> { + // 30 seconds should always be enough to cause a timeout. (The default + // timeout for progenitor is 15 seconds, and in wicket we set an even + // shorter timeout.) + const DEFAULT_TEST_TIMEOUT_SECS: u64 = 30; + + let test_error = match std::env::var(env_var) { + Ok(v) if v == "fail" => Some(UpdateTestError::Fail), + Ok(v) if v == "timeout" => { + Some(UpdateTestError::Timeout { secs: DEFAULT_TEST_TIMEOUT_SECS }) + } + Ok(v) if v.starts_with("timeout:") => { + // Extended start_timeout syntax with a custom + // number of seconds. + let suffix = v.strip_prefix("timeout:").unwrap(); + match suffix.parse::() { + Ok(secs) => Some(UpdateTestError::Timeout { secs }), + Err(error) => { + return Err(error).with_context(|| { + format!( + "could not parse {env_var} \ + in the form `timeout:`: {v}" + ) + }); + } + } + } + Ok(value) => { + bail!("unrecognized value for {env_var}: {value}"); + } + Err(VarError::NotPresent) => None, + Err(VarError::NotUnicode(value)) => { + bail!("invalid Unicode for {env_var}: {}", value.to_string_lossy()); + } + }; + Ok(test_error) +} + +pub(crate) fn get_update_simulated_result( + env_var: &str, +) -> Result, anyhow::Error> { + let result = match std::env::var(env_var) { + Ok(v) if v == "success" => Some(UpdateSimulatedResult::Success), + Ok(v) if v == "warning" => Some(UpdateSimulatedResult::Warning), + Ok(v) if v == "skipped" => Some(UpdateSimulatedResult::Skipped), + Ok(v) if v == "failure" => Some(UpdateSimulatedResult::Failure), + Ok(value) => { + bail!("unrecognized value for {env_var}: {value}"); + } + Err(VarError::NotPresent) => None, + Err(VarError::NotUnicode(value)) => { + bail!("invalid Unicode for {env_var}: {}", value.to_string_lossy()); + } + }; + + Ok(result) +} diff --git a/wicket/src/lib.rs b/wicket/src/lib.rs index 6e760968f8..5e09cb91f4 100644 --- a/wicket/src/lib.rs +++ b/wicket/src/lib.rs @@ -10,6 +10,7 @@ use std::time::Duration; mod cli; mod dispatch; mod events; +mod helpers; mod keymap; mod runner; mod state; diff --git a/wicket/src/runner.rs b/wicket/src/runner.rs index c37b16d5d9..2d988814dd 100644 --- a/wicket/src/runner.rs +++ b/wicket/src/runner.rs @@ -2,8 +2,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -use anyhow::bail; -use anyhow::Context; use crossterm::event::Event as TermEvent; use crossterm::event::EventStream; use crossterm::event::{KeyCode, KeyEvent, KeyModifiers}; @@ -17,7 +15,6 @@ use ratatui::backend::CrosstermBackend; use ratatui::Terminal; use slog::Logger; use slog::{debug, error, info}; -use std::env::VarError; use std::io::{stdout, Stdout}; use std::net::SocketAddrV6; use std::time::Instant; @@ -27,11 +24,10 @@ use tokio::sync::mpsc::{ use tokio::time::{interval, Duration}; use wicketd_client::types::AbortUpdateOptions; use wicketd_client::types::ClearUpdateStateOptions; -use wicketd_client::types::StartUpdateOptions; -use wicketd_client::types::UpdateSimulatedResult; -use wicketd_client::types::UpdateTestError; use crate::events::EventReportMap; +use crate::helpers::get_update_test_error; +use crate::state::CreateStartUpdateOptions; use crate::ui::Screen; use crate::wicketd::{self, WicketdHandle, WicketdManager}; use crate::{Action, Cmd, Event, KeyHandler, Recorder, State, TICK_INTERVAL}; @@ -180,43 +176,18 @@ impl RunnerCore { } Action::StartUpdate(component_id) => { if let Some(wicketd) = wicketd { - let test_error = get_update_test_error( - "WICKET_TEST_START_UPDATE_ERROR", - )?; - - // This is a debug environment variable used to - // add a test step. - let test_step_seconds = - std::env::var("WICKET_UPDATE_TEST_STEP_SECONDS") - .ok() - .map(|v| { - v.parse().expect( - "parsed WICKET_UPDATE_TEST_STEP_SECONDS \ - as a u64", - ) - }); - - let test_simulate_rot_result = get_update_simulated_result( - "WICKET_UPDATE_TEST_SIMULATE_ROT_RESULT", - )?; - let test_simulate_sp_result = get_update_simulated_result( - "WICKET_UPDATE_TEST_SIMULATE_SP_RESULT", - )?; - - let options = StartUpdateOptions { - test_error, - test_step_seconds, - test_simulate_rot_result, - test_simulate_sp_result, - skip_rot_version_check: self + let options = CreateStartUpdateOptions { + force_update_rot: self .state .force_update_state .force_update_rot, - skip_sp_version_check: self + force_update_sp: self .state .force_update_state .force_update_sp, - }; + } + .to_start_update_options()?; + wicketd.tx.blocking_send( wicketd::Request::StartUpdate { component_id, options }, )?; @@ -281,66 +252,6 @@ impl RunnerCore { } } -fn get_update_test_error( - env_var: &str, -) -> Result, anyhow::Error> { - // 30 seconds should always be enough to cause a timeout. (The default - // timeout for progenitor is 15 seconds, and in wicket we set an even - // shorter timeout.) - const DEFAULT_TEST_TIMEOUT_SECS: u64 = 30; - - let test_error = match std::env::var(env_var) { - Ok(v) if v == "fail" => Some(UpdateTestError::Fail), - Ok(v) if v == "timeout" => { - Some(UpdateTestError::Timeout { secs: DEFAULT_TEST_TIMEOUT_SECS }) - } - Ok(v) if v.starts_with("timeout:") => { - // Extended start_timeout syntax with a custom - // number of seconds. - let suffix = v.strip_prefix("timeout:").unwrap(); - match suffix.parse::() { - Ok(secs) => Some(UpdateTestError::Timeout { secs }), - Err(error) => { - return Err(error).with_context(|| { - format!( - "could not parse {env_var} \ - in the form `timeout:`: {v}" - ) - }); - } - } - } - Ok(value) => { - bail!("unrecognized value for {env_var}: {value}"); - } - Err(VarError::NotPresent) => None, - Err(VarError::NotUnicode(value)) => { - bail!("invalid Unicode for {env_var}: {}", value.to_string_lossy()); - } - }; - Ok(test_error) -} - -fn get_update_simulated_result( - env_var: &str, -) -> Result, anyhow::Error> { - let result = match std::env::var(env_var) { - Ok(v) if v == "success" => Some(UpdateSimulatedResult::Success), - Ok(v) if v == "warning" => Some(UpdateSimulatedResult::Warning), - Ok(v) if v == "skipped" => Some(UpdateSimulatedResult::Skipped), - Ok(v) if v == "failure" => Some(UpdateSimulatedResult::Failure), - Ok(value) => { - bail!("unrecognized value for {env_var}: {value}"); - } - Err(VarError::NotPresent) => None, - Err(VarError::NotUnicode(value)) => { - bail!("invalid Unicode for {env_var}: {}", value.to_string_lossy()); - } - }; - - Ok(result) -} - /// The `Runner` owns the main UI thread, and starts a tokio runtime /// for interaction with downstream services. pub struct Runner { diff --git a/wicket/src/state/inventory.rs b/wicket/src/state/inventory.rs index 23a0e244cf..1c232e653b 100644 --- a/wicket/src/state/inventory.rs +++ b/wicket/src/state/inventory.rs @@ -4,7 +4,8 @@ //! Information about all top-level Oxide components (sleds, switches, PSCs) -use anyhow::anyhow; +use anyhow::{bail, Result}; +use omicron_common::api::internal::nexus::KnownArtifactKind; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; @@ -64,26 +65,13 @@ impl Inventory { }; // Validate and get a ComponentId - let (id, component) = match type_ { - SpType::Sled => { - if i > 31 { - return Err(anyhow!("Invalid sled slot: {}", i)); - } - (ComponentId::Sled(i as u8), Component::Sled(sp)) - } - SpType::Switch => { - if i > 1 { - return Err(anyhow!("Invalid switch slot: {}", i)); - } - (ComponentId::Switch(i as u8), Component::Switch(sp)) - } - SpType::Power => { - if i > 1 { - return Err(anyhow!("Invalid power shelf slot: {}", i)); - } - (ComponentId::Psc(i as u8), Component::Psc(sp)) - } + let id = ComponentId::from_sp_type_and_slot(type_, i as u8)?; + let component = match type_ { + SpType::Sled => Component::Sled(sp), + SpType::Switch => Component::Switch(sp), + SpType::Power => Component::Psc(sp), }; + new_inventory.inventory.insert(id, component); // TODO: Plumb through real power state @@ -204,6 +192,66 @@ pub enum ComponentId { } impl ComponentId { + /// The maximum possible sled ID. + pub const MAX_SLED_ID: u8 = 31; + + /// The maximum possible switch ID. + pub const MAX_SWITCH_ID: u8 = 1; + + /// The maximum possible power shelf ID. + /// + /// Currently shipping racks don't have PSC 1. + pub const MAX_PSC_ID: u8 = 0; + + pub fn new_sled(slot: u8) -> Result { + if slot > Self::MAX_SLED_ID { + bail!("Invalid sled slot: {}", slot); + } + Ok(Self::Sled(slot)) + } + + pub fn new_switch(slot: u8) -> Result { + if slot > Self::MAX_SWITCH_ID { + bail!("Invalid switch slot: {}", slot); + } + Ok(Self::Switch(slot)) + } + + pub fn new_psc(slot: u8) -> Result { + if slot > Self::MAX_PSC_ID { + bail!("Invalid power shelf slot: {}", slot); + } + Ok(Self::Psc(slot)) + } + + pub fn from_sp_type_and_slot(sp_type: SpType, slot: u8) -> Result { + match sp_type { + SpType::Sled => Self::new_sled(slot), + SpType::Switch => Self::new_switch(slot), + SpType::Power => Self::new_psc(slot), + } + } + + pub fn name(&self) -> String { + self.to_string() + } + + pub fn sp_known_artifact_kind(&self) -> KnownArtifactKind { + match self { + ComponentId::Sled(_) => KnownArtifactKind::GimletSp, + ComponentId::Switch(_) => KnownArtifactKind::SwitchSp, + ComponentId::Psc(_) => KnownArtifactKind::PscSp, + } + } + + pub fn rot_known_artifact_kind(&self) -> KnownArtifactKind { + match self { + ComponentId::Sled(_) => KnownArtifactKind::GimletRot, + ComponentId::Switch(_) => KnownArtifactKind::SwitchRot, + ComponentId::Psc(_) => KnownArtifactKind::PscRot, + } + } + pub fn to_string_uppercase(&self) -> String { let mut s = self.to_string(); s.make_ascii_uppercase(); diff --git a/wicket/src/state/mod.rs b/wicket/src/state/mod.rs index ac9cbcae2f..246c4c31ae 100644 --- a/wicket/src/state/mod.rs +++ b/wicket/src/state/mod.rs @@ -18,8 +18,8 @@ pub use inventory::{ pub use rack::{KnightRiderMode, RackState}; pub use status::{Liveness, ServiceStatus}; pub use update::{ - update_component_title, RackUpdateState, UpdateItemState, - UpdateRunningState, + parse_event_report_map, update_component_title, CreateStartUpdateOptions, + RackUpdateState, UpdateItemState, UpdateRunningState, }; use serde::{Deserialize, Serialize}; diff --git a/wicket/src/state/update.rs b/wicket/src/state/update.rs index 1a0aafb9cf..2fde662b2f 100644 --- a/wicket/src/state/update.rs +++ b/wicket/src/state/update.rs @@ -2,21 +2,23 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. +use anyhow::Result; use ratatui::style::Style; use wicket_common::update_events::{ EventReport, ProgressEventKind, StepEventKind, UpdateComponent, UpdateStepId, }; +use crate::helpers::{get_update_simulated_result, get_update_test_error}; use crate::{events::EventReportMap, ui::defaults::style}; use super::{ComponentId, ParsableComponentId, ALL_COMPONENT_IDS}; use omicron_common::api::internal::nexus::KnownArtifactKind; use serde::{Deserialize, Serialize}; -use slog::{warn, Logger}; -use std::collections::{BTreeMap, HashSet}; +use slog::Logger; +use std::collections::BTreeMap; use std::fmt::Display; -use wicketd_client::types::{ArtifactId, SemverVersion}; +use wicketd_client::types::{ArtifactId, SemverVersion, StartUpdateOptions}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RackUpdateState { @@ -102,34 +104,18 @@ impl RackUpdateState { } } - let mut updated_component_ids = HashSet::new(); - - for (sp_type, logs) in reports { - for (i, log) in logs { - let Ok(id) = ComponentId::try_from(ParsableComponentId { - sp_type: &sp_type, - i: &i, - }) else { - warn!( - logger, - "Invalid ComponentId in EventReport: {} {}", - &sp_type, - &i - ); - continue; - }; - let item_state = self.items.get_mut(&id).unwrap(); - item_state.update(log); - updated_component_ids.insert(id); - } - } - - // Reset all component IDs that weren't updated. + let reports = parse_event_report_map(logger, reports); + // Reset all component IDs that aren't in the event report map. for (id, item) in &mut self.items { - if !updated_component_ids.contains(id) { + if !reports.contains_key(id) { item.reset(); } } + + for (id, report) in reports { + let item_state = self.items.get_mut(&id).unwrap(); + item_state.update(report); + } } } @@ -445,3 +431,68 @@ pub fn update_component_title(component: UpdateComponent) -> &'static str { UpdateComponent::Host => "HOST", } } + +pub struct CreateStartUpdateOptions { + pub(crate) force_update_rot: bool, + pub(crate) force_update_sp: bool, +} + +impl CreateStartUpdateOptions { + pub fn to_start_update_options(&self) -> Result { + let test_error = + get_update_test_error("WICKET_TEST_START_UPDATE_ERROR")?; + + // This is a debug environment variable used to + // add a test step. + let test_step_seconds = + std::env::var("WICKET_UPDATE_TEST_STEP_SECONDS").ok().map(|v| { + v.parse().expect( + "parsed WICKET_UPDATE_TEST_STEP_SECONDS \ + as a u64", + ) + }); + + let test_simulate_rot_result = get_update_simulated_result( + "WICKET_UPDATE_TEST_SIMULATE_ROT_RESULT", + )?; + let test_simulate_sp_result = get_update_simulated_result( + "WICKET_UPDATE_TEST_SIMULATE_SP_RESULT", + )?; + + Ok(StartUpdateOptions { + test_error, + test_step_seconds, + test_simulate_rot_result, + test_simulate_sp_result, + skip_rot_version_check: self.force_update_rot, + skip_sp_version_check: self.force_update_sp, + }) + } +} + +/// Converts an `EventReportMap` to a map by component ID. +pub fn parse_event_report_map( + log: &Logger, + reports: EventReportMap, +) -> BTreeMap { + let mut component_id_map = BTreeMap::new(); + for (sp_type, logs) in reports { + for (i, event_report) in logs { + let Ok(id) = ComponentId::try_from(ParsableComponentId { + sp_type: &sp_type, + i: &i, + }) else { + slog::warn!( + log, + "Invalid ComponentId in EventReportMap: {} {}", + &sp_type, + &i + ); + continue; + }; + component_id_map.insert(id, event_report); + } + } + + component_id_map +} diff --git a/wicket/src/wicketd.rs b/wicket/src/wicketd.rs index 33c80410d8..98f4d0f72b 100644 --- a/wicket/src/wicketd.rs +++ b/wicket/src/wicketd.rs @@ -41,7 +41,7 @@ const WICKETD_POLL_INTERVAL: Duration = Duration::from_millis(500); // WICKETD_TIMEOUT used to be 1 second, but that might be too short (and in // particular might be responsible for // https://github.com/oxidecomputer/omicron/issues/3103). -const WICKETD_TIMEOUT: Duration = Duration::from_secs(5); +pub(crate) const WICKETD_TIMEOUT: Duration = Duration::from_secs(5); // Assume that these requests are periodic on the order of seconds or the // result of human interaction. In either case, this buffer should be plenty