Skip to content

Commit

Permalink
Merge branch 'main' into eliza/rm-v2p-tx
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw authored Jul 1, 2024
2 parents 947557c + 0f757cd commit 5edd8db
Show file tree
Hide file tree
Showing 5 changed files with 327 additions and 286 deletions.
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ static_assertions = "1.1.0"
# Please do not change the Steno version to a Git dependency. It makes it
# harder than expected to make breaking changes (even if you specify a specific
# SHA). Cut a new Steno release instead. See omicron#2117.
steno = "0.4.0"
steno = "0.4.1"
strum = { version = "0.26", features = [ "derive" ] }
subprocess = "0.2.9"
supports-color = "3.0.0"
Expand Down
150 changes: 92 additions & 58 deletions nexus/src/app/background/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,16 @@ impl Driver {
///
/// This function panics if the `name` or `activator` has previously been
/// passed to a call to this function.
#[allow(clippy::too_many_arguments)]
pub fn register(
pub fn register<N, D>(
&mut self,
name: String,
description: String,
period: Duration,
imp: Box<dyn BackgroundTask>,
opctx: OpContext,
watchers: Vec<Box<dyn GenericWatcher>>,
activator: &Activator,
) -> TaskName {
taskdef: TaskDefinition<'_, N, D>,
) -> TaskName
where
N: ToString,
D: ToString,
{
let name = taskdef.name.to_string();

// Activation of the background task happens in a separate tokio task.
// Set up a channel so that tokio task can report status back to us.
let (status_tx, status_rx) = watch::channel(TaskStatus {
Expand All @@ -118,6 +117,7 @@ impl Driver {
// We'll use a `Notify` to wake up that tokio task when an activation is
// requested. The caller provides their own Activator, which just
// provides a specific Notify for us to use here.
let activator = taskdef.activator;
if let Err(previous) = activator.0.wired_up.compare_exchange(
false,
true,
Expand All @@ -134,13 +134,18 @@ impl Driver {

// Spawn the tokio task that will manage activation of the background
// task.
let opctx = opctx.child(BTreeMap::from([(
let opctx = taskdef.opctx.child(BTreeMap::from([(
"background_task".to_string(),
name.clone(),
)]));
let task_exec =
TaskExec::new(period, imp, activator.0.clone(), opctx, status_tx);
let tokio_task = tokio::task::spawn(task_exec.run(watchers));
let task_exec = TaskExec::new(
taskdef.period,
taskdef.task_impl,
Arc::clone(&activator.0),
opctx,
status_tx,
);
let tokio_task = tokio::task::spawn(task_exec.run(taskdef.watchers));

// Create an object to track our side of the background task's state.
// This just provides the handles we need to read status and wake up the
Expand All @@ -151,6 +156,11 @@ impl Driver {
status: status_rx,
tokio_task,
activator: activator.clone(),
description: taskdef.description.to_string(),
period: taskdef.period,
status: status_rx,
tokio_task,
activator: activator.clone(),
};
if self.tasks.insert(TaskName(name.clone()), task).is_some() {
panic!("started two background tasks called {:?}", name);
Expand Down Expand Up @@ -216,6 +226,26 @@ impl Drop for Driver {
}
}

/// Describes a background task to be registered with [`Driver::register()`]
///
/// See [`Driver::register()`] for more on how these fields get used.
pub struct TaskDefinition<'a, N: ToString, D: ToString> {
/// identifier for this task
pub name: N,
/// short human-readable summary of this task
pub description: D,
/// driver should activate the task if it hasn't run in this long
pub period: Duration,
/// impl of [`BackgroundTask`] that represents the work of the task
pub task_impl: Box<dyn BackgroundTask>,
/// `OpContext` used for task activations
pub opctx: OpContext,
/// list of watchers that will trigger activation of this task
pub watchers: Vec<Box<dyn GenericWatcher>>,
/// an [`Activator]` that will be wired up to activate this task
pub activator: &'a Activator,
}

/// Activates a background task
///
/// See [`crate::app::background`] module-level documentation for more on what
Expand Down Expand Up @@ -409,6 +439,7 @@ impl<T: Send + Sync> GenericWatcher for watch::Receiver<T> {
mod test {
use super::BackgroundTask;
use super::Driver;
use crate::app::background::driver::TaskDefinition;
use crate::app::background::Activator;
use crate::app::sagas::SagaRequest;
use assert_matches::assert_matches;
Expand Down Expand Up @@ -500,35 +531,38 @@ mod test {
let mut driver = Driver::new();

assert_eq!(*rx1.borrow(), 0);
let h1 = driver.register(
"t1".to_string(),
"test task".to_string(),
Duration::from_millis(100),
Box::new(t1),
opctx.child(std::collections::BTreeMap::new()),
vec![Box::new(dep_rx1.clone()), Box::new(dep_rx2.clone())],
&act1,
);
let h1 = driver.register(TaskDefinition {
name: "t1",
description: "test task",
period: Duration::from_millis(100),
task_impl: Box::new(t1),
opctx: opctx.child(std::collections::BTreeMap::new()),
watchers: vec![
Box::new(dep_rx1.clone()),
Box::new(dep_rx2.clone()),
],
activator: &act1,
});

let h2 = driver.register(
"t2".to_string(),
"test task".to_string(),
Duration::from_secs(300), // should never fire in this test
Box::new(t2),
opctx.child(std::collections::BTreeMap::new()),
vec![Box::new(dep_rx1.clone())],
&act2,
);
let h2 = driver.register(TaskDefinition {
name: "t2",
description: "test task",
period: Duration::from_secs(300), // should never fire in this test
task_impl: Box::new(t2),
opctx: opctx.child(std::collections::BTreeMap::new()),
watchers: vec![Box::new(dep_rx1.clone())],
activator: &act2,
});

let h3 = driver.register(
"t3".to_string(),
"test task".to_string(),
Duration::from_secs(300), // should never fire in this test
Box::new(t3),
let h3 = driver.register(TaskDefinition {
name: "t3",
description: "test task",
period: Duration::from_secs(300), // should never fire in this test
task_impl: Box::new(t3),
opctx,
vec![Box::new(dep_rx1), Box::new(dep_rx2)],
&act3,
);
watchers: vec![Box::new(dep_rx1), Box::new(dep_rx2)],
activator: &act3,
});

// Wait for four activations of our task. (This is three periods.) That
// should take between 300ms and 400ms. Allow extra time for a busy
Expand Down Expand Up @@ -673,15 +707,15 @@ mod test {
let before_wall = Utc::now();
let before_instant = Instant::now();
let act1 = Activator::new();
let h1 = driver.register(
"t1".to_string(),
"test task".to_string(),
Duration::from_secs(300), // should not elapse during test
Box::new(t1),
opctx.child(std::collections::BTreeMap::new()),
vec![Box::new(dep_rx1.clone())],
&act1,
);
let h1 = driver.register(TaskDefinition {
name: "t1",
description: "test task",
period: Duration::from_secs(300), // should not elapse during test
task_impl: Box::new(t1),
opctx: opctx.child(std::collections::BTreeMap::new()),
watchers: vec![Box::new(dep_rx1.clone())],
activator: &act1,
});

// Wait to enter the first activation.
let which = ready_rx1.recv().await.unwrap();
Expand Down Expand Up @@ -820,15 +854,15 @@ mod test {
let (_dep_tx1, dep_rx1) = watch::channel(0);
let act1 = Activator::new();

let h1 = driver.register(
"t1".to_string(),
"test saga request flow task".to_string(),
Duration::from_secs(300), // should not fire in this test
Box::new(t1),
opctx.child(std::collections::BTreeMap::new()),
vec![Box::new(dep_rx1.clone())],
&act1,
);
let h1 = driver.register(TaskDefinition {
name: "t1",
description: "test saga request flow task",
period: Duration::from_secs(300), // should not fire in this test
task_impl: Box::new(t1),
opctx: opctx.child(std::collections::BTreeMap::new()),
watchers: vec![Box::new(dep_rx1.clone())],
activator: &act1,
});

assert!(matches!(
saga_request_recv.try_recv(),
Expand Down
Loading

0 comments on commit 5edd8db

Please sign in to comment.