Skip to content

Commit

Permalink
[π˜€π—½π—Ώ] initial version
Browse files Browse the repository at this point in the history
Created using spr 1.3.5
  • Loading branch information
sunshowers committed Dec 7, 2023
1 parent 9aec9ba commit 8d8cb1d
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 22 deletions.
61 changes: 44 additions & 17 deletions wicketd/src/update_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,19 @@ impl UpdateTracker {
}

/// Starts a fake update that doesn't perform any steps, but simply waits
/// for a watch receiver to resolve.
/// for a receiver to resolve.
///
/// The inner sender will resolve once the update is completed.
#[doc(hidden)]
pub async fn start_fake_update(
&self,
sps: BTreeSet<SpIdentifier>,
watch_receiver: watch::Receiver<()>,
fake_step_receiver: oneshot::Receiver<oneshot::Sender<()>>,
) -> Result<(), Vec<StartUpdateError>> {
let imp = FakeUpdateDriver { watch_receiver, log: self.log.clone() };
let imp = FakeUpdateDriver {
fake_step_receiver: Some(fake_step_receiver),
log: self.log.clone(),
};
self.start_impl(sps, Some(imp)).await
}

Expand Down Expand Up @@ -515,7 +520,7 @@ impl<'tr> SpawnUpdateDriver for RealSpawnUpdateDriver<'tr> {
/// waits for a [`watch::Receiver`] to resolve.
#[derive(Debug)]
struct FakeUpdateDriver {
watch_receiver: watch::Receiver<()>,
fake_step_receiver: Option<oneshot::Receiver<oneshot::Sender<()>>>,
log: Logger,
}

Expand All @@ -539,22 +544,24 @@ impl SpawnUpdateDriver for FakeUpdateDriver {
let engine = UpdateEngine::new(&log, sender);
let abort_handle = engine.abort_handle();

let mut watch_receiver = self.watch_receiver.clone();
let fake_step_receiver = self
.fake_step_receiver
.take()
.expect("fake step receiver is only taken once");

let task = tokio::spawn(async move {
// The step component and ID have been chosen arbitrarily here --
// they aren't important.
engine
let final_sender_handle = engine
.new_step(
UpdateComponent::Host,
UpdateStepId::RunningInstallinator,
"Fake step that waits for receiver to resolve",
move |_cx| async move {
// This will resolve as soon as the watch sender
// (typically a test) sends a value over the watch
// channel.
_ = watch_receiver.changed().await;
StepSuccess::new(()).into()
// This will resolve as soon as the sender (typically a
// test) sends a value over the channel.
let ret = fake_step_receiver.await;
StepSuccess::new(ret).into()
},
)
.register();
Expand All @@ -566,16 +573,36 @@ impl SpawnUpdateDriver for FakeUpdateDriver {
}
});

match engine.execute().await {
Ok(_cx) => (),
Err(err) => {
error!(log, "update failed"; "err" => %err);
}
}
let engine_res = engine.execute().await;

// Wait for all events to be received and written to the event
// buffer.
event_receiving_task.await.expect("event receiving task panicked");

// Finally, notify the receiving end of the inner sender: this
// indicates that the update is done.
match engine_res {
Ok(cx) => {
info!(log, "fake update completed successfully");
let final_sender =
final_sender_handle.into_value(cx.token()).await;
match final_sender {
Ok(sender) => {
if let Err(_) = sender.send(()) {
warn!(log, "failed to send final value");
}
}
Err(error) => {
// This occurs if the fake_step_receiver's sender
// side was closed. Nothing to do here but warn.
warn!(log, "failed to get final sender: {}", error);
}
}
}
Err(error) => {
error!(log, "fake update failed: {}", error);
}
}
});

SpUpdateData { task, abort_handle, event_buffer }
Expand Down
11 changes: 6 additions & 5 deletions wicketd/tests/integration_tests/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use omicron_common::{
api::internal::nexus::KnownArtifactKind,
update::{ArtifactHashId, ArtifactKind},
};
use tokio::sync::watch;
use tokio::sync::oneshot;
use update_engine::NestedError;
use uuid::Uuid;
use wicket::OutputKind;
Expand Down Expand Up @@ -436,7 +436,7 @@ async fn test_update_races() {
};
let sps: BTreeSet<_> = vec![sp].into_iter().collect();

let (sender, receiver) = watch::channel(());
let (sender, receiver) = oneshot::channel();
wicketd_testctx
.server
.update_tracker
Expand All @@ -455,7 +455,7 @@ async fn test_update_races() {
// Also try starting another fake update, which should fail -- we don't let updates be started
// if there's current update state.
{
let (_, receiver) = watch::channel(());
let (_, receiver) = oneshot::channel();
let err = wicketd_testctx
.server
.update_tracker
Expand All @@ -470,9 +470,10 @@ async fn test_update_races() {
}

// Unblock the update, letting it run to completion.
sender.send(()).expect("receiver kept open by update engine");
let (final_sender, final_receiver) = oneshot::channel();
sender.send(final_sender).expect("receiver kept open by update engine");
final_receiver.await.expect("update engine completed successfully");

// Ensure that the event buffer indicates completion.
let event_buffer = wicketd_testctx
.wicketd_client
.get_update_sp(&SpType::Sled, 0)
Expand Down

0 comments on commit 8d8cb1d

Please sign in to comment.