Skip to content

Commit

Permalink
Fix docs
Browse files Browse the repository at this point in the history
Created using spr 1.3.4
  • Loading branch information
sunshowers committed Nov 2, 2023
1 parent 9b5bad2 commit f985e59
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 396 deletions.
93 changes: 2 additions & 91 deletions update-engine/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1599,7 +1599,7 @@ mod tests {
use tokio_stream::wrappers::ReceiverStream;

use crate::{
events::{ProgressCounter, ProgressUnits, StepProgress},
events::{ProgressUnits, StepProgress},
test_utils::TestSpec,
StepContext, StepSuccess, UpdateEngine,
};
Expand Down Expand Up @@ -1834,36 +1834,6 @@ mod tests {
}
};

// Ensure that nested step 2 produces progress events in the
// expected order and in succession.
let mut progress_check = NestedProgressCheck::new();
for event in &generated_events {
if let Event::Progress(event) = event {
let progress_counter = event.kind.progress_counter();
if progress_counter
== Some(&ProgressCounter::new(2, 3, "steps"))
{
progress_check.two_out_of_three_seen();
} else if progress_check
== NestedProgressCheck::TwoOutOfThreeSteps
{
assert_eq!(
progress_counter,
Some(&ProgressCounter::current(50, "units"))
);
progress_check.fifty_units_seen();
} else if progress_check == NestedProgressCheck::FiftyUnits
{
assert_eq!(
progress_counter,
Some(&ProgressCounter::new(3, 3, "steps"))
);
progress_check.three_out_of_three_seen();
}
}
}
progress_check.assert_done();

// Ensure that events are never seen twice.
let mut event_indexes_seen = HashSet::new();
let mut leaf_event_indexes_seen = HashSet::new();
Expand Down Expand Up @@ -2370,7 +2340,6 @@ mod tests {
5,
"Nested step 2 (fails)",
move |cx| async move {
// This is used by NestedProgressCheck below.
parent_cx
.send_progress(StepProgress::with_current_and_total(
2,
Expand All @@ -2381,76 +2350,18 @@ mod tests {
.await;

cx.send_progress(StepProgress::with_current(
50,
20,
"units",
Default::default(),
))
.await;

parent_cx
.send_progress(StepProgress::with_current_and_total(
3,
3,
"steps",
Default::default(),
))
.await;

bail!("failing step")
},
)
.register();
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum NestedProgressCheck {
Initial,
TwoOutOfThreeSteps,
FiftyUnits,
ThreeOutOfThreeSteps,
}

impl NestedProgressCheck {
fn new() -> Self {
Self::Initial
}

fn two_out_of_three_seen(&mut self) {
assert_eq!(
*self,
Self::Initial,
"two_out_of_three_seen: expected Initial",
);
*self = Self::TwoOutOfThreeSteps;
}

fn fifty_units_seen(&mut self) {
assert_eq!(
*self,
Self::TwoOutOfThreeSteps,
"twenty_units_seen: expected TwoOutOfThreeSteps",
);
*self = Self::FiftyUnits;
}

fn three_out_of_three_seen(&mut self) {
assert_eq!(
*self,
Self::FiftyUnits,
"three_out_of_three_seen: expected TwentyUnits",
);
*self = Self::ThreeOutOfThreeSteps;
}

fn assert_done(&self) {
assert_eq!(
*self,
Self::ThreeOutOfThreeSteps,
"assert_done: expected ThreeOutOfThreeSteps",
);
}
}

fn define_remote_nested_engine(
engine: &mut UpdateEngine<'_, TestSpec>,
start_id: usize,
Expand Down
164 changes: 52 additions & 112 deletions update-engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use std::{collections::HashMap, fmt};
use derive_where::derive_where;
use futures::FutureExt;
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;

use crate::errors::NestedEngineError;
use crate::{
Expand Down Expand Up @@ -57,13 +56,10 @@ impl<S: StepSpec> StepContext<S> {
/// Sends a progress update to the update engine.
#[inline]
pub async fn send_progress(&self, progress: StepProgress<S>) {
let now = Instant::now();
let (done, done_rx) = oneshot::channel();
self.payload_sender
.send(StepContextPayload::Progress { now, progress, done })
.send(StepContextPayload::Progress(progress))
.await
.expect("our code always keeps payload_receiver open");
_ = done_rx.await;
.expect("our code always keeps the receiver open")
}

/// Sends a report from a nested engine, typically one running on a remote
Expand All @@ -75,8 +71,6 @@ impl<S: StepSpec> StepContext<S> {
&self,
report: EventReport<S2>,
) -> Result<(), NestedEngineError<NestedSpec>> {
let now = Instant::now();

let mut res = Ok(());
let delta_report = if let Some(id) = report.root_execution_id {
let mut nested_buffers = self.nested_buffers.lock().unwrap();
Expand Down Expand Up @@ -140,32 +134,17 @@ impl<S: StepSpec> StepContext<S> {
}

self.payload_sender
.send(StepContextPayload::Nested {
now,
event: Event::Step(event),
})
.send(StepContextPayload::Nested(Event::Step(event)))
.await
.expect("our code always keeps payload_receiver open");
.expect("our code always keeps the receiver open");
}

for event in delta_report.progress_events {
self.payload_sender
.send(StepContextPayload::Nested {
now,
event: Event::Progress(event),
})
.send(StepContextPayload::Nested(Event::Progress(event)))
.await
.expect("our code always keeps payload_receiver open");
.expect("our code always keeps the receiver open");
}

// Ensure that all reports have been received by the engine before
// returning.
let (done, done_rx) = oneshot::channel();
self.payload_sender
.send(StepContextPayload::Sync { done })
.await
.expect("our code always keeps payload_receiver open");
_ = done_rx.await;
}

res
Expand All @@ -184,75 +163,58 @@ impl<S: StepSpec> StepContext<S> {
F: FnOnce(&mut UpdateEngine<'a, S2>) -> Result<(), S2::Error> + Send,
S2: StepSpec + 'a,
{
// Previously, this code was of the form:
//
// let (sender, mut receiver) = mpsc::channel(128);
// let mut engine = UpdateEngine::new(&self.log, sender);
//
// And there was a loop below that selected over `engine` and
// `receiver`.
//
// That approach was abandoned because it had ordering issues, because
// it wasn't guaranteed that events were received in the order they were
// processed. For example, consider what happens if:
//
// 1. User code sent an event E1 through a child (nested) StepContext.
// 2. Then in quick succession, the same code sent an event E2 through
// self.
//
// What users would expect to happen is that E1 is received before E2.
// However, what actually happened was that:
//
// 1. `engine` was driven until the next suspend point. This caused E2
// to be sent.
// 2. Then, `receiver` was polled. This caused E1 to be received.
//
// So the order of events was reversed.
//
// To fix this, we now use a single channel, and send events through it
// both from the nested engine and from self.
//
// An alternative would be to use a oneshot channel as a synchronization
// tool. However, just sharing a channel is easier.
let mut engine = UpdateEngine::<S2>::new_nested(
&self.log,
self.payload_sender.clone(),
);

let (sender, mut receiver) = mpsc::channel(128);
let mut engine = UpdateEngine::new(&self.log, sender);
// Create the engine's steps.
(engine_fn)(&mut engine)
.map_err(|error| NestedEngineError::Creation { error })?;

// Now run the engine.
let engine = engine.execute();
match engine.await {
Ok(cx) => Ok(cx),
Err(ExecutionError::EventSendError(_)) => {
unreachable!("our code always keeps payload_receiver open")
tokio::pin!(engine);

let mut result = None;
let mut events_done = false;

loop {
tokio::select! {
ret = &mut engine, if result.is_none() => {
match ret {
Ok(cx) => {
result = Some(Ok(cx));
}
Err(ExecutionError::EventSendError(_)) => {
unreachable!("we always keep the receiver open")
}
Err(ExecutionError::StepFailed { component, id, description, error }) => {
result = Some(Err(NestedEngineError::StepFailed { component, id, description, error }));
}
Err(ExecutionError::Aborted { component, id, description, message }) => {
result = Some(Err(NestedEngineError::Aborted { component, id, description, message }));
}
}
}
event = receiver.recv(), if !events_done => {
match event {
Some(event) => {
self.payload_sender.send(
StepContextPayload::Nested(event.into_generic())
)
.await
.expect("we always keep the receiver open");
}
None => {
events_done = true;
}
}
}
else => {
break;
}
}
Err(ExecutionError::StepFailed {
component,
id,
description,
error,
}) => Err(NestedEngineError::StepFailed {
component,
id,
description,
error,
}),
Err(ExecutionError::Aborted {
component,
id,
description,
message,
}) => Err(NestedEngineError::Aborted {
component,
id,
description,
message,
}),
}

result.expect("the loop only exits if result is set")
}

/// Retrieves a token used to fetch the value out of a [`StepHandle`].
Expand Down Expand Up @@ -285,32 +247,10 @@ impl NestedEventBuffer {
}
}

/// An uninhabited type for oneshot channels, since we only care about them
/// being dropped.
#[derive(Debug)]
pub(crate) enum Never {}

#[derive_where(Debug)]
pub(crate) enum StepContextPayload<S: StepSpec> {
Progress {
now: Instant,
progress: StepProgress<S>,
done: oneshot::Sender<Never>,
},
/// A single nested event with synchronization.
NestedSingle {
now: Instant,
event: Event<NestedSpec>,
done: oneshot::Sender<Never>,
},
/// One out of a series of nested events sent in succession.
Nested {
now: Instant,
event: Event<NestedSpec>,
},
Sync {
done: oneshot::Sender<Never>,
},
Progress(StepProgress<S>),
Nested(Event<NestedSpec>),
}

/// Context for a step's metadata-generation function.
Expand Down
Loading

0 comments on commit f985e59

Please sign in to comment.