Skip to content

Commit

Permalink
Synchronize events from nested senders
Browse files Browse the repository at this point in the history
Created using spr 1.3.4
  • Loading branch information
sunshowers committed Nov 2, 2023
1 parent d3d56f4 commit b898efa
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 45 deletions.
19 changes: 15 additions & 4 deletions update-engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

// Copyright 2023 Oxide Computer Company

use std::convert::Infallible;
use std::marker::PhantomData;
use std::sync::Mutex;
use std::{collections::HashMap, fmt};
Expand Down Expand Up @@ -59,7 +58,7 @@ impl<S: StepSpec> StepContext<S> {
#[inline]
pub async fn send_progress(&self, progress: StepProgress<S>) {
let now = Instant::now();
let (done, done_rx) = oneshot::channel::<Infallible>();
let (done, done_rx) = oneshot::channel();
self.payload_sender
.send(StepContextPayload::Progress { now, progress, done })
.await
Expand Down Expand Up @@ -286,19 +285,31 @@ impl NestedEventBuffer {
}
}

/// An uninhabited type for oneshot channels, since we only care about them
/// being dropped.
#[derive(Debug)]
pub(crate) enum Never {}

#[derive_where(Debug)]
pub(crate) enum StepContextPayload<S: StepSpec> {
Progress {
now: Instant,
progress: StepProgress<S>,
done: oneshot::Sender<Infallible>,
done: oneshot::Sender<Never>,
},
/// A single nested event with synchronization.
NestedSingle {
now: Instant,
event: Event<NestedSpec>,
done: oneshot::Sender<Never>,
},
/// One out of a series of nested events sent in succession.
Nested {
now: Instant,
event: Event<NestedSpec>,
},
Sync {
done: oneshot::Sender<Infallible>,
done: oneshot::Sender<Never>,
},
}

Expand Down
92 changes: 51 additions & 41 deletions update-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
StepEvent, StepEventKind, StepInfo, StepInfoWithMetadata, StepOutcome,
StepProgress,
},
AsError, CompletionContext, MetadataContext, StepContext,
AsError, CompletionContext, MetadataContext, NestedSpec, StepContext,
StepContextPayload, StepHandle, StepSpec,
};

Expand Down Expand Up @@ -389,18 +389,15 @@ impl<S: StepSpec, S2: StepSpec> SenderImpl<S2> for NestedSender<S> {
) -> BoxFuture<'_, Result<(), ExecutionError<S2>>> {
let now = Instant::now();
async move {
let (done, done_rx) = oneshot::channel();
self.sender
.send(StepContextPayload::Nested {
.send(StepContextPayload::NestedSingle {
now,
event: event.into_generic(),
done,
})
.await
.expect("our code always keeps payload_receiver open");
let (done, done_rx) = oneshot::channel();
self.sender
.send(StepContextPayload::Sync { done })
.await
.expect("our code always keeps payload_receiver open");
_ = done_rx.await;
Ok(())
}
Expand Down Expand Up @@ -1084,41 +1081,12 @@ impl<S: StepSpec, F: FnMut() -> usize> StepProgressReporter<S, F> {
self.handle_progress(now, progress).await?;
std::mem::drop(done);
}
StepContextPayload::Nested { now, event: Event::Step(event) } => {
self.sender
.send(Event::Step(StepEvent {
spec: S::schema_name(),
execution_id: self.execution_id,
event_index: (self.next_event_index)(),
total_elapsed: now - self.total_start,
kind: StepEventKind::Nested {
step: self.step_info.clone(),
attempt: self.attempt,
event: Box::new(event),
step_elapsed: now - self.step_start,
attempt_elapsed: now - self.attempt_start,
},
}))
.await?;
StepContextPayload::NestedSingle { now, event, done } => {
self.handle_nested(now, event).await?;
std::mem::drop(done);
}
StepContextPayload::Nested {
now,
event: Event::Progress(event),
} => {
self.sender
.send(Event::Progress(ProgressEvent {
spec: S::schema_name(),
execution_id: self.execution_id,
total_elapsed: now - self.total_start,
kind: ProgressEventKind::Nested {
step: self.step_info.clone(),
attempt: self.attempt,
event: Box::new(event),
step_elapsed: now - self.step_start,
attempt_elapsed: now - self.attempt_start,
},
}))
.await?;
StepContextPayload::Nested { now, event } => {
self.handle_nested(now, event).await?;
}
StepContextPayload::Sync { done } => {
std::mem::drop(done);
Expand Down Expand Up @@ -1197,6 +1165,48 @@ impl<S: StepSpec, F: FnMut() -> usize> StepProgressReporter<S, F> {
}
}

async fn handle_nested(
&mut self,
now: Instant,
event: Event<NestedSpec>,
) -> Result<(), ExecutionError<S>> {
match event {
Event::Step(event) => {
self.sender
.send(Event::Step(StepEvent {
spec: S::schema_name(),
execution_id: self.execution_id,
event_index: (self.next_event_index)(),
total_elapsed: now - self.total_start,
kind: StepEventKind::Nested {
step: self.step_info.clone(),
attempt: self.attempt,
event: Box::new(event),
step_elapsed: now - self.step_start,
attempt_elapsed: now - self.attempt_start,
},
}))
.await
}
Event::Progress(event) => {
self.sender
.send(Event::Progress(ProgressEvent {
spec: S::schema_name(),
execution_id: self.execution_id,
total_elapsed: now - self.total_start,
kind: ProgressEventKind::Nested {
step: self.step_info.clone(),
attempt: self.attempt,
event: Box::new(event),
step_elapsed: now - self.step_start,
attempt_elapsed: now - self.attempt_start,
},
}))
.await
}
}
}

async fn handle_abort(mut self, message: String) -> ExecutionError<S> {
// Send the abort message over the channel.
//
Expand Down

0 comments on commit b898efa

Please sign in to comment.