diff --git a/Cargo.lock b/Cargo.lock index 69d4aa3aa1..685423f24d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9671,6 +9671,7 @@ dependencies = [ "swrite", "tokio", "tokio-stream", + "unicode-width", "uuid", ] diff --git a/update-engine/Cargo.toml b/update-engine/Cargo.toml index b12f191940..12e718e902 100644 --- a/update-engine/Cargo.toml +++ b/update-engine/Cargo.toml @@ -22,6 +22,7 @@ schemars = { workspace = true, features = ["uuid1"] } slog.workspace = true swrite.workspace = true tokio = { workspace = true, features = ["macros", "sync", "time", "rt-multi-thread"] } +unicode-width.workspace = true uuid.workspace = true omicron-workspace-hack.workspace = true diff --git a/update-engine/examples/update-engine-basic/display.rs b/update-engine/examples/update-engine-basic/display.rs index 3e22e84f14..122777211b 100644 --- a/update-engine/examples/update-engine-basic/display.rs +++ b/update-engine/examples/update-engine-basic/display.rs @@ -13,7 +13,7 @@ use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use owo_colors::OwoColorize; use tokio::{sync::mpsc, task::JoinHandle}; use update_engine::{ - display::{LineDisplay, LineDisplayStyles}, + display::{GroupDisplay, LineDisplay, LineDisplayStyles}, events::ProgressCounter, }; @@ -34,14 +34,18 @@ pub(crate) fn make_displayer( ) -> (JoinHandle>, mpsc::Sender) { let (sender, receiver) = mpsc::channel(512); let log = log.clone(); - let join_handle = match display_style { - DisplayStyle::ProgressBar => tokio::task::spawn(async move { - display_progress_bar(&log, receiver).await - }), - DisplayStyle::Line => tokio::task::spawn(async move { - display_line(&log, receiver, prefix).await - }), - }; + let join_handle = + match display_style { + DisplayStyle::ProgressBar => tokio::task::spawn(async move { + display_progress_bar(&log, receiver).await + }), + DisplayStyle::Line => tokio::task::spawn(async move { + display_line(&log, receiver, prefix).await + }), + DisplayStyle::Group => tokio::task::spawn(async move { + display_group(&log, receiver).await + }), + }; (join_handle, sender) } @@ -71,6 +75,71 @@ async fn display_line( Ok(()) } +#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)] +enum GroupDisplayKey { + Example, + Other, +} + +async fn display_group( + log: &slog::Logger, + mut receiver: mpsc::Receiver, +) -> Result<()> { + slog::info!(log, "setting up display"); + + let mut display = GroupDisplay::new( + [ + (GroupDisplayKey::Example, "example"), + (GroupDisplayKey::Other, "other"), + ], + std::io::stdout(), + ); + // For now, always colorize. TODO: figure out whether colorization should be + // done based on always/auto/never etc. + if supports_color::on(supports_color::Stream::Stdout).is_some() { + display.set_styles(LineDisplayStyles::colorized()); + } + + display.set_progress_interval(Duration::from_millis(50)); + + let mut example_buffer = EventBuffer::default(); + let mut example_buffer_last_seen = None; + let mut other_buffer = EventBuffer::default(); + let mut other_buffer_last_seen = None; + + let mut interval = tokio::time::interval(Duration::from_secs(2)); + interval.tick().await; + + loop { + tokio::select! { + _ = interval.tick() => { + // Print out status lines every 2 seconds. + display.write_stats("Status")?; + } + event = receiver.recv() => { + let Some(event) = event else { break }; + example_buffer.add_event(event.clone()); + other_buffer.add_event(event); + + display.add_event_report( + &GroupDisplayKey::Example, + example_buffer.generate_report_since(&mut example_buffer_last_seen), + )?; + display.add_event_report( + &GroupDisplayKey::Other, + other_buffer.generate_report_since(&mut other_buffer_last_seen), + )?; + display.write_events()?; + } + } + } + + // Print status at the end. + display.write_stats("Summary")?; + + Ok(()) +} + async fn display_progress_bar( log: &slog::Logger, mut receiver: mpsc::Receiver, diff --git a/update-engine/examples/update-engine-basic/main.rs b/update-engine/examples/update-engine-basic/main.rs index b43dc9719d..075e4ed253 100644 --- a/update-engine/examples/update-engine-basic/main.rs +++ b/update-engine/examples/update-engine-basic/main.rs @@ -50,6 +50,7 @@ impl App { let display_style = match self.display_style { DisplayStyleOpt::ProgressBar => DisplayStyle::ProgressBar, DisplayStyleOpt::Line => DisplayStyle::Line, + DisplayStyleOpt::Group => DisplayStyle::Group, DisplayStyleOpt::Auto => { if std::io::stdout().is_terminal() { DisplayStyle::ProgressBar @@ -123,6 +124,7 @@ impl App { enum DisplayStyleOpt { ProgressBar, Line, + Group, #[default] Auto, } @@ -131,6 +133,7 @@ enum DisplayStyleOpt { enum DisplayStyle { ProgressBar, Line, + Group, } /// Context shared across steps. This forms the lifetime "'a" defined by the diff --git a/update-engine/src/buffer.rs b/update-engine/src/buffer.rs index a6ca98cbd3..8cdef8e02e 100644 --- a/update-engine/src/buffer.rs +++ b/update-engine/src/buffer.rs @@ -139,19 +139,23 @@ impl EventBuffer { /// /// This report can be serialized and sent over the wire. pub fn generate_report(&self) -> EventReport { - self.generate_report_since(None) + self.generate_report_since(&mut None) } + /// Generates an [`EventReport`] for this buffer, updating `last_seen` to a + /// new value for incremental report generation. + /// + /// This report can be serialized and sent over the wire. pub fn generate_report_since( &self, - mut last_seen: Option, + last_seen: &mut Option, ) -> EventReport { // Gather step events across all keys. let mut step_events = Vec::new(); let mut progress_events = Vec::new(); for (_, step_data) in self.steps().as_slice() { step_events - .extend(step_data.step_events_since_impl(last_seen).cloned()); + .extend(step_data.step_events_since_impl(*last_seen).cloned()); progress_events .extend(step_data.step_status.progress_event().cloned()); } @@ -162,14 +166,14 @@ impl EventBuffer { if let Some(last) = step_events.last() { // Only update last_seen if there are new step events (otherwise it // stays the same). - last_seen = Some(last.event_index); + *last_seen = Some(last.event_index); } EventReport { step_events, progress_events, root_execution_id: self.root_execution_id(), - last_seen, + last_seen: *last_seen, } } @@ -1864,7 +1868,7 @@ mod tests { for (i, event) in self.generated_events.iter().enumerate() { for time in 0..times { (event_fn)(&mut buffer, event); - let report = buffer.generate_report_since(last_seen); + let report = buffer.generate_report_since(&mut last_seen); let is_last_event = i == self.generated_events.len() - 1; self.assert_general_properties( &buffer, @@ -1879,7 +1883,6 @@ mod tests { }) .unwrap(); reported_step_events.extend(report.step_events); - last_seen = report.last_seen; // Ensure that the last root index was updated for this // event's corresponding steps, but not for any others. @@ -1895,7 +1898,8 @@ mod tests { // Call last_seen without feeding a new event in to ensure that // a report with no step events is produced. - let report = buffer.generate_report_since(last_seen); + let mut last_seen_2 = last_seen; + let report = buffer.generate_report_since(&mut last_seen_2); ensure!( report.step_events.is_empty(), "{description}, at index {i} (time {time}),\ @@ -1974,16 +1978,12 @@ mod tests { for (i, event) in self.generated_events.iter().enumerate() { let event_added = (event_fn)(&mut buffer, event); - let report = match last_seen_opt { + let report = match &mut last_seen_opt { Some(last_seen) => buffer.generate_report_since(last_seen), None => buffer.generate_report(), }; let is_last_event = i == self.generated_events.len() - 1; - if let Some(last_seen) = &mut last_seen_opt { - *last_seen = report.last_seen; - } - self.assert_general_properties(&buffer, &report, is_last_event) .with_context(|| { format!( diff --git a/update-engine/src/context.rs b/update-engine/src/context.rs index cd85687cf9..c2c1e32119 100644 --- a/update-engine/src/context.rs +++ b/update-engine/src/context.rs @@ -242,8 +242,7 @@ impl NestedEventBuffer { report: EventReport, ) -> EventReport { self.buffer.add_event_report(report.into_generic()); - let ret = self.buffer.generate_report_since(self.last_seen); - self.last_seen = ret.last_seen; + let ret = self.buffer.generate_report_since(&mut self.last_seen); ret } } diff --git a/update-engine/src/display/group_display.rs b/update-engine/src/display/group_display.rs index 1982600656..ae9c22b09c 100644 --- a/update-engine/src/display/group_display.rs +++ b/update-engine/src/display/group_display.rs @@ -6,14 +6,18 @@ use std::{borrow::Borrow, collections::BTreeMap, fmt, time::Duration}; +use owo_colors::OwoColorize; +use swrite::{swrite, SWrite}; +use unicode_width::UnicodeWidthStr; + use crate::{ errors::UnknownReportKey, events::EventReport, EventBuffer, - ExecutionTerminalInfo, StepSpec, + ExecutionTerminalInfo, StepSpec, TerminalKind, }; use super::{ line_display_shared::LineDisplayFormatter, LineDisplayShared, - LineDisplayStyles, + LineDisplayStyles, HEADER_WIDTH, }; /// A displayer that simultaneously manages and shows line-based output for @@ -23,7 +27,10 @@ use super::{ /// is called to obtain the prefix, and `Eq + Ord` is used for keys. #[derive(Debug)] pub struct GroupDisplay { + // We don't need to add any buffering here because we already write data to + // the writer in a line-buffered fashion (see Self::write_events). writer: W, + max_width: usize, single_states: BTreeMap>, formatter: LineDisplayFormatter, stats: GroupDisplayStats, @@ -34,17 +41,34 @@ impl GroupDisplay { /// prefixes. /// /// The function passed in is expected to create a writer. - pub fn new( - keys_and_prefixes: impl IntoIterator, + pub fn new( + keys_and_prefixes: impl IntoIterator, writer: W, - ) -> Self { + ) -> Self + where + Str: Into, + { + // Right-align prefixes to their maximum width -- this helps keep the + // output organized. + let mut max_width = 0; + let keys_and_prefixes: Vec<_> = keys_and_prefixes + .into_iter() + .map(|(k, prefix)| { + let prefix = prefix.into(); + max_width = + max_width.max(UnicodeWidthStr::width(prefix.as_str())); + (k, prefix) + }) + .collect(); let single_states: BTreeMap<_, _> = keys_and_prefixes .into_iter() - .map(|(k, prefix)| (k, SingleState::new(prefix))) + .map(|(k, prefix)| (k, SingleState::new(prefix, max_width))) .collect(); + let not_started = single_states.len(); Self { writer, + max_width, single_states, formatter: LineDisplayFormatter::new(), stats: GroupDisplayStats::new(not_started), @@ -114,6 +138,17 @@ impl GroupDisplay { } } + /// Writes a "Status" or "Summary" line to the writer with statistics. + pub fn write_stats(&mut self, header: &str) -> std::io::Result<()> { + // Add a prefix which is equal to the maximum width of the prefixes. + // [prefix 00:00:00] takes up self.max_width + 9 characters inside the + // brackets. + let total_width = self.max_width + 9; + let mut line = format!("[{:total_width$}] ", ""); + self.stats.format_line(&mut line, header, &self.formatter); + writeln!(self.writer, "{line}") + } + /// Writes all pending events to the writer. pub fn write_events(&mut self) -> std::io::Result<()> { let mut lines = Vec::new(); @@ -134,59 +169,136 @@ impl GroupDisplay { #[derive(Clone, Copy, Debug)] pub struct GroupDisplayStats { + /// The total number of reports. + pub total: usize, + /// The number of reports that have not yet started. pub not_started: usize, /// The number of reports that are currently running. pub running: usize, - /// The number of reports that have successfully completed. + /// The number of reports that indicate successful completion. pub completed: usize, - /// The number of reports that have failed. + /// The number of reports that indicate failure. pub failed: usize, - /// The number of reports that have been aborted. + /// The number of reports that indicate being aborted. pub aborted: usize, /// The number of reports where we didn't receive a final state and it got /// overwritten by another report. + /// + /// Overwritten reports are considered failures since we don't know what + /// happened. pub overwritten: usize, } impl GroupDisplayStats { - fn new(not_started: usize) -> Self { + fn new(total: usize) -> Self { Self { + total, + not_started: total, completed: 0, failed: 0, aborted: 0, overwritten: 0, running: 0, - not_started, } } + /// Returns the number of terminal reports. + pub fn terminal_count(&self) -> usize { + self.completed + self.failed + self.aborted + self.overwritten + } + /// Returns true if all reports have reached a terminal state. pub fn is_terminal(&self) -> bool { self.not_started == 0 && self.running == 0 } + /// Returns true if there are any failures. + pub fn has_failures(&self) -> bool { + self.failed > 0 || self.aborted > 0 || self.overwritten > 0 + } + fn apply_result(&mut self, result: AddEventReportResult) { // Process result.after first to avoid integer underflow. match result.after { SingleStateTag::NotStarted => self.not_started += 1, SingleStateTag::Running => self.running += 1, - SingleStateTag::Terminal => self.completed += 1, + SingleStateTag::Terminal(TerminalKind::Completed) => { + self.completed += 1 + } + SingleStateTag::Terminal(TerminalKind::Failed) => self.failed += 1, + SingleStateTag::Terminal(TerminalKind::Aborted) => { + self.aborted += 1 + } SingleStateTag::Overwritten => self.overwritten += 1, } match result.before { SingleStateTag::NotStarted => self.not_started -= 1, SingleStateTag::Running => self.running -= 1, - SingleStateTag::Terminal => self.completed -= 1, + SingleStateTag::Terminal(TerminalKind::Completed) => { + self.completed -= 1 + } + SingleStateTag::Terminal(TerminalKind::Failed) => self.failed -= 1, + SingleStateTag::Terminal(TerminalKind::Aborted) => { + self.aborted -= 1 + } SingleStateTag::Overwritten => self.overwritten -= 1, } } + + fn format_line( + &self, + line: &mut String, + header: &str, + formatter: &LineDisplayFormatter, + ) { + let header_style = if self.has_failures() { + formatter.styles().error_style + } else { + formatter.styles().progress_style + }; + + swrite!(line, "{:>HEADER_WIDTH$} ", header.style(header_style)); + let terminal_count = self.terminal_count(); + swrite!( + line, + "{terminal_count}/{}: {} running, {} {}", + self.total, + self.running.style(formatter.styles().meta_style), + self.completed.style(formatter.styles().meta_style), + "completed".style(formatter.styles().progress_style), + ); + if self.failed > 0 { + swrite!( + line, + ", {} {}", + self.failed.style(formatter.styles().meta_style), + "failed".style(formatter.styles().error_style), + ); + } + if self.aborted > 0 { + swrite!( + line, + ", {} {}", + self.aborted.style(formatter.styles().meta_style), + "aborted".style(formatter.styles().error_style), + ); + } + if self.overwritten > 0 { + swrite!( + line, + ", {} {}", + self.overwritten.style(formatter.styles().meta_style), + "overwritten".style(formatter.styles().error_style), + ); + } + } } #[derive(Debug)] @@ -197,7 +309,9 @@ struct SingleState { } impl SingleState { - fn new(prefix: String) -> Self { + fn new(prefix: String, max_width: usize) -> Self { + // Right-align the prefix to the maximum width. + let prefix = format!("{:>max_width$}", prefix); Self { shared: LineDisplayShared::new(), kind: SingleStateKind::NotStarted { displayed: false }, @@ -219,11 +333,11 @@ impl SingleState { } SingleStateKind::Running { .. } => SingleStateTag::Running, - SingleStateKind::Terminal { .. } => { + SingleStateKind::Terminal { info, .. } => { // Once we've reached a terminal state, we don't record any more // events. return AddEventReportResult::unchanged( - SingleStateTag::Terminal, + SingleStateTag::Terminal(info.kind), ); } SingleStateKind::Overwritten { .. } => { @@ -257,11 +371,12 @@ impl SingleState { // Grab the event buffer to store it in the terminal state. let event_buffer = std::mem::replace(event_buffer, EventBuffer::new(0)); + let terminal_kind = info.kind; self.kind = SingleStateKind::Terminal { info, pending_event_buffer: Some(event_buffer), }; - SingleStateTag::Terminal + SingleStateTag::Terminal(terminal_kind) } else { SingleStateTag::Running }; @@ -365,6 +480,6 @@ impl AddEventReportResult { enum SingleStateTag { NotStarted, Running, - Terminal, + Terminal(TerminalKind), Overwritten, } diff --git a/update-engine/src/display/line_display.rs b/update-engine/src/display/line_display.rs index b14f496b45..631594d13c 100644 --- a/update-engine/src/display/line_display.rs +++ b/update-engine/src/display/line_display.rs @@ -120,7 +120,7 @@ impl LineDisplayStyles { let mut ret = Self::default(); ret.prefix_style = Style::new().bold(); ret.meta_style = Style::new().bold(); - ret.step_name_style = Style::new(); + ret.step_name_style = Style::new().cyan(); ret.progress_style = Style::new().bold().green(); ret.progress_message_style = Style::new().green(); ret.warning_style = Style::new().bold().yellow(); diff --git a/update-engine/src/display/line_display_shared.rs b/update-engine/src/display/line_display_shared.rs index 0822f48515..e0ea3f9c4e 100644 --- a/update-engine/src/display/line_display_shared.rs +++ b/update-engine/src/display/line_display_shared.rs @@ -17,8 +17,8 @@ use swrite::{swrite, SWrite as _}; use crate::{ events::{ - LowPriorityStepEventKind, ProgressEvent, StepEvent, StepInfo, - StepOutcome, + LowPriorityStepEventKind, ProgressCounter, ProgressEvent, StepEvent, + StepInfo, StepOutcome, }, AbortInfo, AbortReason, CompletionInfo, EventBuffer, EventBufferStepData, ExecutionTerminalInfo, FailureInfo, FailureReason, NestedSpec, @@ -27,7 +27,9 @@ use crate::{ use super::LineDisplayStyles; -const HEADER_WIDTH: usize = 9; +// This is chosen to leave enough room for all possible headers: "Completed" at +// 9 characters is the longest. +pub(super) const HEADER_WIDTH: usize = 9; #[derive(Debug)] pub(super) struct LineDisplayShared { @@ -514,20 +516,14 @@ impl LineDisplayShared { let (before, after) = match progress_event.kind.progress_counter() { Some(counter) => { - let progress_str = match counter.total { - Some(total) => { - format!("{}/{}", counter.current, total) - } - None => format!("{}", counter.current), - }; + let progress_str = format_progress_counter(counter); ( format!( "{:>HEADER_WIDTH$} ", "Progress".style(formatter.styles.progress_style) ), format!( - "{progress_str} {} after {:.2?}", - counter.units, + "{progress_str} after {:.2?}", leaf_attempt_elapsed .style(formatter.styles.meta_style), ), @@ -582,6 +578,24 @@ impl LineDisplayShared { } } +fn format_progress_counter(counter: &ProgressCounter) -> String { + match counter.total { + Some(total) => { + // Show a percentage value. Correct alignment requires converting to + // a string in the middle like this. + let percent = (counter.current as f64 / total as f64) * 100.0; + // <12.34> is 5 characters wide. + let percent_width = 5; + let counter_width = total.to_string().len(); + format!( + "{:>percent_width$.2}% ({:>counter_width$}/{} {})", + percent, counter.current, total, counter.units, + ) + } + None => format!("{} {}", counter.current, counter.units), + } +} + /// State that tracks line display formatting. /// /// Each `LineDisplay` and `GroupDisplay` has one of these. @@ -599,6 +613,11 @@ impl LineDisplayFormatter { } } + #[inline] + pub(super) fn styles(&self) -> &LineDisplayStyles { + &self.styles + } + #[inline] pub(super) fn set_styles(&mut self, styles: LineDisplayStyles) { self.styles = styles; @@ -897,3 +916,28 @@ impl fmt::Display for AsLetters { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_format_progress_counter() { + let tests = vec![ + (ProgressCounter::new(5, 20, "units"), "25.00% ( 5/20 units)"), + (ProgressCounter::new(0, 20, "bytes"), " 0.00% ( 0/20 bytes)"), + (ProgressCounter::new(20, 20, "cubes"), "100.00% (20/20 cubes)"), + // NaN is a weird case that is a buggy update engine impl in practice + (ProgressCounter::new(0, 0, "units"), " NaN% (0/0 units)"), + (ProgressCounter::current(5, "units"), "5 units"), + ]; + for (input, output) in tests { + assert_eq!( + format_progress_counter(&input), + output, + "format matches for input: {:?}", + input + ); + } + } +}