Skip to content

Commit

Permalink
Driver::register() takes too many arguments (#5985)
Browse files Browse the repository at this point in the history
  • Loading branch information
davepacheco authored Jul 1, 2024
1 parent 94ce70e commit 0f757cd
Show file tree
Hide file tree
Showing 2 changed files with 320 additions and 279 deletions.
154 changes: 94 additions & 60 deletions nexus/src/app/background/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,16 @@ impl Driver {
///
/// This function panics if the `name` or `activator` has previously been
/// passed to a call to this function.
#[allow(clippy::too_many_arguments)]
pub fn register(
pub fn register<N, D>(
&mut self,
name: String,
description: String,
period: Duration,
imp: Box<dyn BackgroundTask>,
opctx: OpContext,
watchers: Vec<Box<dyn GenericWatcher>>,
activator: &Activator,
) -> TaskName {
taskdef: TaskDefinition<'_, N, D>,
) -> TaskName
where
N: ToString,
D: ToString,
{
let name = taskdef.name.to_string();

// Activation of the background task happens in a separate tokio task.
// Set up a channel so that tokio task can report status back to us.
let (status_tx, status_rx) = watch::channel(TaskStatus {
Expand All @@ -118,6 +117,7 @@ impl Driver {
// We'll use a `Notify` to wake up that tokio task when an activation is
// requested. The caller provides their own Activator, which just
// provides a specific Notify for us to use here.
let activator = taskdef.activator;
if let Err(previous) = activator.wired_up.compare_exchange(
false,
true,
Expand All @@ -135,19 +135,29 @@ impl Driver {

// Spawn the tokio task that will manage activation of the background
// task.
let opctx = opctx.child(BTreeMap::from([(
let opctx = taskdef.opctx.child(BTreeMap::from([(
"background_task".to_string(),
name.clone(),
)]));
let task_exec =
TaskExec::new(period, imp, Arc::clone(&notify), opctx, status_tx);
let tokio_task = tokio::task::spawn(task_exec.run(watchers));
let task_exec = TaskExec::new(
taskdef.period,
taskdef.task_impl,
Arc::clone(&notify),
opctx,
status_tx,
);
let tokio_task = tokio::task::spawn(task_exec.run(taskdef.watchers));

// Create an object to track our side of the background task's state.
// This just provides the handles we need to read status and wake up the
// tokio task.
let task =
Task { description, period, status: status_rx, tokio_task, notify };
let task = Task {
description: taskdef.description.to_string(),
period: taskdef.period,
status: status_rx,
tokio_task,
notify,
};
if self.tasks.insert(TaskName(name.clone()), task).is_some() {
panic!("started two background tasks called {:?}", name);
}
Expand Down Expand Up @@ -212,6 +222,26 @@ impl Drop for Driver {
}
}

/// Describes a background task to be registered with [`Driver::register()`]
///
/// See [`Driver::register()`] for more on how these fields get used.
pub struct TaskDefinition<'a, N: ToString, D: ToString> {
/// identifier for this task
pub name: N,
/// short human-readable summary of this task
pub description: D,
/// driver should activate the task if it hasn't run in this long
pub period: Duration,
/// impl of [`BackgroundTask`] that represents the work of the task
pub task_impl: Box<dyn BackgroundTask>,
/// `OpContext` used for task activations
pub opctx: OpContext,
/// list of watchers that will trigger activation of this task
pub watchers: Vec<Box<dyn GenericWatcher>>,
/// an [`Activator]` that will be wired up to activate this task
pub activator: &'a Activator,
}

/// Activates a background task
///
/// See [`crate::app::background`] module-level documentation for more on what
Expand Down Expand Up @@ -390,6 +420,7 @@ impl<T: Send + Sync> GenericWatcher for watch::Receiver<T> {
mod test {
use super::BackgroundTask;
use super::Driver;
use crate::app::background::driver::TaskDefinition;
use crate::app::background::Activator;
use crate::app::sagas::SagaRequest;
use assert_matches::assert_matches;
Expand Down Expand Up @@ -481,35 +512,38 @@ mod test {
let mut driver = Driver::new();

assert_eq!(*rx1.borrow(), 0);
let h1 = driver.register(
"t1".to_string(),
"test task".to_string(),
Duration::from_millis(100),
Box::new(t1),
opctx.child(std::collections::BTreeMap::new()),
vec![Box::new(dep_rx1.clone()), Box::new(dep_rx2.clone())],
&act1,
);
let h1 = driver.register(TaskDefinition {
name: "t1",
description: "test task",
period: Duration::from_millis(100),
task_impl: Box::new(t1),
opctx: opctx.child(std::collections::BTreeMap::new()),
watchers: vec![
Box::new(dep_rx1.clone()),
Box::new(dep_rx2.clone()),
],
activator: &act1,
});

let h2 = driver.register(
"t2".to_string(),
"test task".to_string(),
Duration::from_secs(300), // should never fire in this test
Box::new(t2),
opctx.child(std::collections::BTreeMap::new()),
vec![Box::new(dep_rx1.clone())],
&act2,
);
let h2 = driver.register(TaskDefinition {
name: "t2",
description: "test task",
period: Duration::from_secs(300), // should never fire in this test
task_impl: Box::new(t2),
opctx: opctx.child(std::collections::BTreeMap::new()),
watchers: vec![Box::new(dep_rx1.clone())],
activator: &act2,
});

let h3 = driver.register(
"t3".to_string(),
"test task".to_string(),
Duration::from_secs(300), // should never fire in this test
Box::new(t3),
let h3 = driver.register(TaskDefinition {
name: "t3",
description: "test task",
period: Duration::from_secs(300), // should never fire in this test
task_impl: Box::new(t3),
opctx,
vec![Box::new(dep_rx1), Box::new(dep_rx2)],
&act3,
);
watchers: vec![Box::new(dep_rx1), Box::new(dep_rx2)],
activator: &act3,
});

// Wait for four activations of our task. (This is three periods.) That
// should take between 300ms and 400ms. Allow extra time for a busy
Expand Down Expand Up @@ -654,15 +688,15 @@ mod test {
let before_wall = Utc::now();
let before_instant = Instant::now();
let act1 = Activator::new();
let h1 = driver.register(
"t1".to_string(),
"test task".to_string(),
Duration::from_secs(300), // should not elapse during test
Box::new(t1),
opctx.child(std::collections::BTreeMap::new()),
vec![Box::new(dep_rx1.clone())],
&act1,
);
let h1 = driver.register(TaskDefinition {
name: "t1",
description: "test task",
period: Duration::from_secs(300), // should not elapse during test
task_impl: Box::new(t1),
opctx: opctx.child(std::collections::BTreeMap::new()),
watchers: vec![Box::new(dep_rx1.clone())],
activator: &act1,
});

// Wait to enter the first activation.
let which = ready_rx1.recv().await.unwrap();
Expand Down Expand Up @@ -801,15 +835,15 @@ mod test {
let (_dep_tx1, dep_rx1) = watch::channel(0);
let act1 = Activator::new();

let h1 = driver.register(
"t1".to_string(),
"test saga request flow task".to_string(),
Duration::from_secs(300), // should not fire in this test
Box::new(t1),
opctx.child(std::collections::BTreeMap::new()),
vec![Box::new(dep_rx1.clone())],
&act1,
);
let h1 = driver.register(TaskDefinition {
name: "t1",
description: "test saga request flow task",
period: Duration::from_secs(300), // should not fire in this test
task_impl: Box::new(t1),
opctx: opctx.child(std::collections::BTreeMap::new()),
watchers: vec![Box::new(dep_rx1.clone())],
activator: &act1,
});

assert!(matches!(
saga_request_recv.try_recv(),
Expand Down
Loading

0 comments on commit 0f757cd

Please sign in to comment.