From 030adce411fe37c9e2d3c70ee5a6cdbdfd49f3f9 Mon Sep 17 00:00:00 2001 From: Rain Date: Mon, 22 Jan 2024 14:57:01 -0800 Subject: [PATCH] [update-engine] reuse parent_key_and_child_index from existing steps (#4858) During a dogfood mupdate on 2024-01-18, I saw some really weird output with wicket's `rack-update attach` that looked like: ``` [sled 8 00:20:48] Running .... 12a 5s 1/3) Writing host phase 2 to slot B ``` The "5s" is all wrong -- the letter there is supposed to indicate, for an engine that has one or more nested engines, the index of that nested engine. So for example, if a step 12 has two nested engines, they would be marked "12a" and "12b". "5s" indicates that that's the 19th nested engine for that step, and we definitely have nowhere near 19 nested engines for a step anywhere in wicketd. This turned out to be because we weren't reusing child indexes from earlier steps in the sequence. Fix that, and also add: * tests which catch this issue * some dev-only code to wicket which made it easy to debug this locally --- update-engine/src/buffer.rs | 185 +++++++++++++++++++++++++------- update-engine/src/test_utils.rs | 31 ++++-- wicket/src/cli/rack_update.rs | 179 +++++++++++++++++++++++++++++- 3 files changed, 350 insertions(+), 45 deletions(-) diff --git a/update-engine/src/buffer.rs b/update-engine/src/buffer.rs index 36a0626963..04363ffc26 100644 --- a/update-engine/src/buffer.rs +++ b/update-engine/src/buffer.rs @@ -262,45 +262,59 @@ impl EventStore { root_event_index, event.total_elapsed, ); + if let Some(new_execution) = actions.new_execution { if new_execution.nest_level == 0 { self.root_execution_id = Some(new_execution.execution_id); } - // If there's a parent key, then what's the child index? - let parent_key_and_child_index = - if let Some(parent_key) = new_execution.parent_key { - match self.map.get_mut(&parent_key) { - Some(parent_data) => { - let child_index = parent_data.child_executions_seen; - parent_data.child_executions_seen += 1; - Some((parent_key, child_index)) - } - None => { - // This should never happen -- it indicates that the - // parent key was unknown. This can happen if we - // didn't receive an event regarding a parent - // execution being started. + + if let Some((first_step_key, ..)) = + new_execution.steps_to_add.first() + { + // Do we already know about this execution? If so, grab the parent + // key and child index from the first step. + let parent_key_and_child_index = + if let Some(data) = self.map.get(first_step_key) { + data.parent_key_and_child_index + } else { + if let Some(parent_key) = new_execution.parent_key { + match self.map.get_mut(&parent_key) { + Some(parent_data) => { + let child_index = + parent_data.child_executions_seen; + parent_data.child_executions_seen += 1; + Some((parent_key, child_index)) + } + None => { + // This should never happen -- it indicates that the + // parent key was unknown. This can happen if we + // didn't receive an event regarding a parent + // execution being started. + None + } + } + } else { None } - } - } else { - None - }; - let total_steps = new_execution.steps_to_add.len(); - for (new_step_key, new_step, sort_key) in new_execution.steps_to_add - { - // These are brand new steps so their keys shouldn't exist in the - // map. But if they do, don't overwrite them. - self.map.entry(new_step_key).or_insert_with(|| { - EventBufferStepData::new( - new_step, - parent_key_and_child_index, - sort_key, - new_execution.nest_level, - total_steps, - root_event_index, - ) - }); + }; + + let total_steps = new_execution.steps_to_add.len(); + for (new_step_key, new_step, sort_key) in + new_execution.steps_to_add + { + // These are brand new steps so their keys shouldn't exist in the + // map. But if they do, don't overwrite them. + self.map.entry(new_step_key).or_insert_with(|| { + EventBufferStepData::new( + new_step, + parent_key_and_child_index, + sort_key, + new_execution.nest_level, + total_steps, + root_event_index, + ) + }); + } } } @@ -1808,6 +1822,7 @@ mod tests { struct BufferTestContext { root_execution_id: ExecutionId, generated_events: Vec>, + // Data derived from generated_events. generated_step_events: Vec>, } @@ -1885,9 +1900,95 @@ mod tests { Event::Progress(_) => None, }) .collect(); + + // Create two buffer and feed events. + // * The incremental buffer has each event fed into it one-by-one. + // * The "idempotent" buffer has events 0, 0..1, 0..2, 0..3, etc + // fed into it one by one. The name is because this is really + // testing the idempotency of the event buffer. + + println!("** generating incremental and idempotent buffers **"); + let mut incremental_buffer = EventBuffer::default(); + let mut idempotent_buffer = EventBuffer::default(); + for event in &generated_events { + incremental_buffer.add_event(event.clone()); + let report = incremental_buffer.generate_report(); + idempotent_buffer.add_event_report(report); + } + + // Check that the two buffers above are similar. + Self::ensure_buffers_similar( + &incremental_buffer, + &idempotent_buffer, + ) + .expect("idempotent buffer is similar to incremental buffer"); + + // Also generate a buffer with a single event report. + println!("** generating oneshot buffer **"); + let mut oneshot_buffer = EventBuffer::default(); + oneshot_buffer + .add_event_report(incremental_buffer.generate_report()); + + Self::ensure_buffers_similar(&incremental_buffer, &oneshot_buffer) + .expect("oneshot buffer is similar to incremental buffer"); + Self { root_execution_id, generated_events, generated_step_events } } + fn ensure_buffers_similar( + buf1: &EventBuffer, + buf2: &EventBuffer, + ) -> anyhow::Result<()> { + // The two should have the same step keys. + let buf1_steps = buf1.steps(); + let buf2_steps = buf2.steps(); + + ensure!( + buf1_steps.as_slice().len() == buf2_steps.as_slice().len(), + "buffers have same number of steps ({} vs {})", + buf1_steps.as_slice().len(), + buf2_steps.as_slice().len() + ); + + for (ix, ((k1, data1), (k2, data2))) in buf1_steps + .as_slice() + .iter() + .zip(buf2_steps.as_slice().iter()) + .enumerate() + { + ensure!( + k1 == k2, + "buffers have same step keys at index {} ({:?} vs {:?})", + ix, + k1, + k2 + ); + ensure!( + data1.sort_key() == data2.sort_key(), + "buffers have same sort key at index {} ({:?} vs {:?})", + ix, + data1.sort_key(), + data2.sort_key() + ); + ensure!( + data1.parent_key_and_child_index() == data2.parent_key_and_child_index(), + "buffers have same parent key and child index at index {} ({:?} vs {:?})", + ix, + data1.parent_key_and_child_index(), + data2.parent_key_and_child_index(), + ); + ensure!( + data1.nest_level() == data2.nest_level(), + "buffers have same nest level at index {} ({:?} vs {:?})", + ix, + data1.nest_level(), + data2.nest_level(), + ); + } + + Ok(()) + } + /// Runs a test in a scenario where all elements should be seen. /// /// Each event is added `times` times. @@ -2165,10 +2266,10 @@ mod tests { ), "this is the last event so ExecutionStatus must be completed" ); - // There are two nested engines. + // There are three nested engines. ensure!( - summary.len() == 3, - "two nested engines must be defined" + summary.len() == 4, + "three nested engines (plus one root engine) must be defined" ); let (_, nested_summary) = summary @@ -2186,6 +2287,18 @@ mod tests { let (_, nested_summary) = summary .get_index(2) .expect("this is the second nested engine"); + ensure!( + matches!( + &nested_summary.execution_status, + ExecutionStatus::Terminal(info) + if info.kind == TerminalKind::Failed + ), + "for this engine, the ExecutionStatus must be failed" + ); + + let (_, nested_summary) = summary + .get_index(3) + .expect("this is the third nested engine"); ensure!( matches!( &nested_summary.execution_status, diff --git a/update-engine/src/test_utils.rs b/update-engine/src/test_utils.rs index b943d1ddfe..539ef28864 100644 --- a/update-engine/src/test_utils.rs +++ b/update-engine/src/test_utils.rs @@ -141,7 +141,24 @@ fn define_test_steps( move |parent_cx| async move { parent_cx .with_nested_engine(|engine| { - define_nested_engine(&parent_cx, engine); + define_nested_engine(&parent_cx, engine, 3, "steps"); + Ok(()) + }) + .await + .expect_err("this is expected to fail"); + + // Define a second nested engine -- this verifies that internal + // buffer indexes match up. + parent_cx + .with_nested_engine(|engine| { + define_nested_engine( + &parent_cx, + engine, + 10, + // The tests in buffer.rs expect the units to be + // "steps" exactly once, so use a different name here. + "steps (again)", + ); Ok(()) }) .await @@ -214,18 +231,20 @@ fn define_test_steps( fn define_nested_engine<'a>( parent_cx: &'a StepContext, engine: &mut UpdateEngine<'a, TestSpec>, + start_id: usize, + step_units: &'static str, ) { engine .new_step( "nested-foo".to_owned(), - 4, + start_id + 1, "Nested step 1", move |cx| async move { parent_cx .send_progress(StepProgress::with_current_and_total( 1, 3, - "steps", + step_units, Default::default(), )) .await; @@ -239,7 +258,7 @@ fn define_nested_engine<'a>( engine .new_step::<_, _, ()>( "nested-bar".to_owned(), - 5, + start_id + 2, "Nested step 2 (fails)", move |cx| async move { // This is used by NestedProgressCheck below. @@ -247,7 +266,7 @@ fn define_nested_engine<'a>( .send_progress(StepProgress::with_current_and_total( 2, 3, - "steps", + step_units, Default::default(), )) .await; @@ -263,7 +282,7 @@ fn define_nested_engine<'a>( .send_progress(StepProgress::with_current_and_total( 3, 3, - "steps", + step_units, Default::default(), )) .await; diff --git a/wicket/src/cli/rack_update.rs b/wicket/src/cli/rack_update.rs index cac0f09ee5..ccacea0e38 100644 --- a/wicket/src/cli/rack_update.rs +++ b/wicket/src/cli/rack_update.rs @@ -8,23 +8,29 @@ use std::{ collections::{BTreeMap, BTreeSet}, + io::{BufReader, Write}, net::SocketAddrV6, time::Duration, }; use anyhow::{anyhow, bail, Context, Result}; +use camino::Utf8PathBuf; use clap::{Args, Subcommand, ValueEnum}; use slog::Logger; use tokio::{sync::watch, task::JoinHandle}; use update_engine::{ display::{GroupDisplay, LineDisplayStyles}, - NestedError, + EventBuffer, NestedError, }; use wicket_common::{ - rack_update::ClearUpdateStateResponse, update_events::EventReport, + rack_update::ClearUpdateStateResponse, + update_events::{EventReport, WicketdEngineSpec}, WICKETD_TIMEOUT, }; -use wicketd_client::types::{ClearUpdateStateParams, StartUpdateParams}; +use wicketd_client::types::{ + ClearUpdateStateParams, GetArtifactsAndEventReportsResponse, + StartUpdateParams, +}; use crate::{ cli::GlobalOpts, @@ -41,10 +47,22 @@ use super::command::CommandOutput; pub(crate) enum RackUpdateArgs { /// Start one or more updates. Start(StartRackUpdateArgs), + /// Attach to one or more running updates. Attach(AttachArgs), + /// Clear updates. Clear(ClearArgs), + + /// Dump artifacts and event reports from wicketd. + /// + /// Debug-only, intended for development. + DebugDump(DumpArgs), + + /// Replay update logs from a dump file. + /// + /// Debug-only, intended for development. + DebugReplay(ReplayArgs), } impl RackUpdateArgs { @@ -65,6 +83,12 @@ impl RackUpdateArgs { RackUpdateArgs::Clear(args) => { args.exec(log, wicketd_addr, global_opts, output).await } + RackUpdateArgs::DebugDump(args) => { + args.exec(log, wicketd_addr).await + } + RackUpdateArgs::DebugReplay(args) => { + args.exec(log, global_opts, output) + } } } } @@ -380,6 +404,155 @@ async fn do_clear_update_state( Ok(response) } +#[derive(Debug, Args)] +pub(crate) struct DumpArgs { + /// Pretty-print JSON output. + #[clap(long)] + pretty: bool, +} + +impl DumpArgs { + async fn exec(self, log: Logger, wicketd_addr: SocketAddrV6) -> Result<()> { + let client = create_wicketd_client(&log, wicketd_addr, WICKETD_TIMEOUT); + + let response = client + .get_artifacts_and_event_reports() + .await + .context("error calling get_artifacts_and_event_reports")?; + let response = response.into_inner(); + + // Return the response as a JSON object. + if self.pretty { + serde_json::to_writer_pretty(std::io::stdout(), &response) + .context("error writing to stdout")?; + } else { + serde_json::to_writer(std::io::stdout(), &response) + .context("error writing to stdout")?; + } + Ok(()) + } +} + +#[derive(Debug, Args)] +pub(crate) struct ReplayArgs { + /// The dump file to replay. + /// + /// This should be the output of `rack-update debug-dump`, or something + /// like . + file: Utf8PathBuf, + + /// How to feed events into the display. + #[clap(long, value_enum, default_value_t)] + strategy: ReplayStrategy, + + #[clap(flatten)] + component_ids: ComponentIdSelector, +} + +impl ReplayArgs { + fn exec( + self, + log: Logger, + global_opts: GlobalOpts, + output: CommandOutput<'_>, + ) -> Result<()> { + let update_ids = self.component_ids.to_component_ids()?; + let mut display = GroupDisplay::new_with_display( + &log, + update_ids.iter().copied(), + output.stderr, + ); + if global_opts.use_color() { + display.set_styles(LineDisplayStyles::colorized()); + } + + let file = BufReader::new( + std::fs::File::open(&self.file) + .with_context(|| format!("error opening {}", self.file))?, + ); + let response: GetArtifactsAndEventReportsResponse = + serde_json::from_reader(file)?; + let event_reports = + parse_event_report_map(&log, response.event_reports); + + self.strategy.execute(display, event_reports)?; + + Ok(()) + } +} + +#[derive(Clone, Copy, Default, Eq, PartialEq, Hash, Debug, ValueEnum)] +enum ReplayStrategy { + /// Feed all events into the buffer immediately. + #[default] + Oneshot, + + /// Feed events into the buffer one at a time. + Incremental, + + /// Feed events into the buffer as 0, 0..1, 0..2, 0..3 etc. + Idempotent, +} + +impl ReplayStrategy { + fn execute( + self, + mut display: GroupDisplay< + ComponentId, + &mut dyn Write, + WicketdEngineSpec, + >, + event_reports: BTreeMap, + ) -> Result<()> { + match self { + ReplayStrategy::Oneshot => { + // TODO: parallelize this computation? + for (id, event_report) in event_reports { + // If display.add_event_report errors out, it's for a report for a + // component we weren't interested in. Ignore it. + _ = display.add_event_report(&id, event_report); + } + + display.write_events()?; + } + ReplayStrategy::Incremental => { + for (id, event_report) in &event_reports { + let mut buffer = EventBuffer::default(); + let mut last_seen = None; + for event in &event_report.step_events { + buffer.add_step_event(event.clone()); + let report = + buffer.generate_report_since(&mut last_seen); + + // If display.add_event_report errors out, it's for a report for a + // component we weren't interested in. Ignore it. + _ = display.add_event_report(&id, report); + + display.write_events()?; + } + } + } + ReplayStrategy::Idempotent => { + for (id, event_report) in &event_reports { + let mut buffer = EventBuffer::default(); + for event in &event_report.step_events { + buffer.add_step_event(event.clone()); + let report = buffer.generate_report(); + + // If display.add_event_report errors out, it's for a report for a + // component we weren't interested in. Ignore it. + _ = display.add_event_report(&id, report); + + display.write_events()?; + } + } + } + } + + Ok(()) + } +} + #[derive(Clone, Copy, Eq, PartialEq, Hash, Debug, ValueEnum)] enum MessageFormat { Human,