diff --git a/update-engine/src/context.rs b/update-engine/src/context.rs index a02fe07386..6b22578c2e 100644 --- a/update-engine/src/context.rs +++ b/update-engine/src/context.rs @@ -4,7 +4,6 @@ // Copyright 2023 Oxide Computer Company -use std::convert::Infallible; use std::marker::PhantomData; use std::sync::Mutex; use std::{collections::HashMap, fmt}; @@ -59,7 +58,7 @@ impl StepContext { #[inline] pub async fn send_progress(&self, progress: StepProgress) { let now = Instant::now(); - let (done, done_rx) = oneshot::channel::(); + let (done, done_rx) = oneshot::channel(); self.payload_sender .send(StepContextPayload::Progress { now, progress, done }) .await @@ -286,19 +285,31 @@ impl NestedEventBuffer { } } +/// An uninhabited type for oneshot channels, since we only care about them +/// being dropped. +#[derive(Debug)] +pub(crate) enum Never {} + #[derive_where(Debug)] pub(crate) enum StepContextPayload { Progress { now: Instant, progress: StepProgress, - done: oneshot::Sender, + done: oneshot::Sender, + }, + /// A single nested event with synchronization. + NestedSingle { + now: Instant, + event: Event, + done: oneshot::Sender, }, + /// One out of a series of nested events sent in succession. Nested { now: Instant, event: Event, }, Sync { - done: oneshot::Sender, + done: oneshot::Sender, }, } diff --git a/update-engine/src/engine.rs b/update-engine/src/engine.rs index fef7f29468..56d7739a40 100644 --- a/update-engine/src/engine.rs +++ b/update-engine/src/engine.rs @@ -33,7 +33,7 @@ use crate::{ StepEvent, StepEventKind, StepInfo, StepInfoWithMetadata, StepOutcome, StepProgress, }, - AsError, CompletionContext, MetadataContext, StepContext, + AsError, CompletionContext, MetadataContext, NestedSpec, StepContext, StepContextPayload, StepHandle, StepSpec, }; @@ -389,18 +389,15 @@ impl SenderImpl for NestedSender { ) -> BoxFuture<'_, Result<(), ExecutionError>> { let now = Instant::now(); async move { + let (done, done_rx) = oneshot::channel(); self.sender - .send(StepContextPayload::Nested { + .send(StepContextPayload::NestedSingle { now, event: event.into_generic(), + done, }) .await .expect("our code always keeps payload_receiver open"); - let (done, done_rx) = oneshot::channel(); - self.sender - .send(StepContextPayload::Sync { done }) - .await - .expect("our code always keeps payload_receiver open"); _ = done_rx.await; Ok(()) } @@ -1084,41 +1081,12 @@ impl usize> StepProgressReporter { self.handle_progress(now, progress).await?; std::mem::drop(done); } - StepContextPayload::Nested { now, event: Event::Step(event) } => { - self.sender - .send(Event::Step(StepEvent { - spec: S::schema_name(), - execution_id: self.execution_id, - event_index: (self.next_event_index)(), - total_elapsed: now - self.total_start, - kind: StepEventKind::Nested { - step: self.step_info.clone(), - attempt: self.attempt, - event: Box::new(event), - step_elapsed: now - self.step_start, - attempt_elapsed: now - self.attempt_start, - }, - })) - .await?; + StepContextPayload::NestedSingle { now, event, done } => { + self.handle_nested(now, event).await?; + std::mem::drop(done); } - StepContextPayload::Nested { - now, - event: Event::Progress(event), - } => { - self.sender - .send(Event::Progress(ProgressEvent { - spec: S::schema_name(), - execution_id: self.execution_id, - total_elapsed: now - self.total_start, - kind: ProgressEventKind::Nested { - step: self.step_info.clone(), - attempt: self.attempt, - event: Box::new(event), - step_elapsed: now - self.step_start, - attempt_elapsed: now - self.attempt_start, - }, - })) - .await?; + StepContextPayload::Nested { now, event } => { + self.handle_nested(now, event).await?; } StepContextPayload::Sync { done } => { std::mem::drop(done); @@ -1197,6 +1165,48 @@ impl usize> StepProgressReporter { } } + async fn handle_nested( + &mut self, + now: Instant, + event: Event, + ) -> Result<(), ExecutionError> { + match event { + Event::Step(event) => { + self.sender + .send(Event::Step(StepEvent { + spec: S::schema_name(), + execution_id: self.execution_id, + event_index: (self.next_event_index)(), + total_elapsed: now - self.total_start, + kind: StepEventKind::Nested { + step: self.step_info.clone(), + attempt: self.attempt, + event: Box::new(event), + step_elapsed: now - self.step_start, + attempt_elapsed: now - self.attempt_start, + }, + })) + .await + } + Event::Progress(event) => { + self.sender + .send(Event::Progress(ProgressEvent { + spec: S::schema_name(), + execution_id: self.execution_id, + total_elapsed: now - self.total_start, + kind: ProgressEventKind::Nested { + step: self.step_info.clone(), + attempt: self.attempt, + event: Box::new(event), + step_elapsed: now - self.step_start, + attempt_elapsed: now - self.attempt_start, + }, + })) + .await + } + } + } + async fn handle_abort(mut self, message: String) -> ExecutionError { // Send the abort message over the channel. //