Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Driver::register() takes too many arguments #5985

Merged
merged 4 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 94 additions & 60 deletions nexus/src/app/background/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,16 @@ impl Driver {
///
/// This function panics if the `name` or `activator` has previously been
/// passed to a call to this function.
#[allow(clippy::too_many_arguments)]
pub fn register(
pub fn register<N, D>(
&mut self,
name: String,
description: String,
period: Duration,
imp: Box<dyn BackgroundTask>,
opctx: OpContext,
watchers: Vec<Box<dyn GenericWatcher>>,
activator: &Activator,
) -> TaskName {
taskdef: TaskDefinition<'_, N, D>,
) -> TaskName
where
N: ToString,
D: ToString,
{
let name = taskdef.name.to_string();

// Activation of the background task happens in a separate tokio task.
// Set up a channel so that tokio task can report status back to us.
let (status_tx, status_rx) = watch::channel(TaskStatus {
Expand All @@ -118,6 +117,7 @@ impl Driver {
// We'll use a `Notify` to wake up that tokio task when an activation is
// requested. The caller provides their own Activator, which just
// provides a specific Notify for us to use here.
let activator = taskdef.activator;
if let Err(previous) = activator.wired_up.compare_exchange(
false,
true,
Expand All @@ -135,19 +135,29 @@ impl Driver {

// Spawn the tokio task that will manage activation of the background
// task.
let opctx = opctx.child(BTreeMap::from([(
let opctx = taskdef.opctx.child(BTreeMap::from([(
"background_task".to_string(),
name.clone(),
)]));
let task_exec =
TaskExec::new(period, imp, Arc::clone(&notify), opctx, status_tx);
let tokio_task = tokio::task::spawn(task_exec.run(watchers));
let task_exec = TaskExec::new(
taskdef.period,
taskdef.task_impl,
Arc::clone(&notify),
opctx,
status_tx,
);
let tokio_task = tokio::task::spawn(task_exec.run(taskdef.watchers));

// Create an object to track our side of the background task's state.
// This just provides the handles we need to read status and wake up the
// tokio task.
let task =
Task { description, period, status: status_rx, tokio_task, notify };
let task = Task {
description: taskdef.description.to_string(),
period: taskdef.period,
status: status_rx,
tokio_task,
notify,
};
if self.tasks.insert(TaskName(name.clone()), task).is_some() {
panic!("started two background tasks called {:?}", name);
}
Expand Down Expand Up @@ -212,6 +222,26 @@ impl Drop for Driver {
}
}

/// Describes a background task to be registered with [`Driver::register()`]
///
/// See [`Driver::register()`] for more on how these fields get used.
pub struct TaskDefinition<'a, N: ToString, D: ToString> {
/// identifier for this task
pub name: N,
/// short human-readable summary of this task
pub description: D,
/// driver should activate the task if it hasn't run in this long
pub period: Duration,
/// impl of [`BackgroundTask`] that represents the work of the task
pub task_impl: Box<dyn BackgroundTask>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wondered about whether it would be nicer to also be generic over a T: BackgroundTask and make boxing it the responsibility of the driver, but that would mean that Driver::register gets monomorphized separately for every background task, which seemed unfortunate (although I don't think binary size is as big a concern for Nexus as, e.g. embedded projects...). So, I think the current design seems the best to me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OTTOH then we could spawn the future unboxed, which means we avoid an additional heap pointer dereference every time the future is polled...but I don't know if we actually care about that overhead, either...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's interesting -- I hadn't considered that. Within Nexus, I think we've usually gone the other way, seeking to reduce monomorphization to decrease compile times. I had some second thoughts on even using generics here to simplify the strings, but I figured there'd be at most 4 cases and probably more like 2, and the convenience for callers is considerable.

My general approach within the control plane is to not worry about straight-line performance at the level of pointer derefs and allocations. I'll almost always choose constructs that seem clearer, easier to use (or harder to misuse), or more debuggable even if they cost a few instructions. I've seen cases where on-CPU performance of the control plane was a problem (either for latency or resource utilization), but almost always because it was pathological, not just a few percent slower than it could be. (Scalability is another question but I think is less often in tension here.)

/// `OpContext` used for task activations
pub opctx: OpContext,
/// list of watchers that will trigger activation of this task
pub watchers: Vec<Box<dyn GenericWatcher>>,
/// an [`Activator]` that will be wired up to activate this task
pub activator: &'a Activator,
}

/// Activates a background task
///
/// See [`crate::app::background`] module-level documentation for more on what
Expand Down Expand Up @@ -390,6 +420,7 @@ impl<T: Send + Sync> GenericWatcher for watch::Receiver<T> {
mod test {
use super::BackgroundTask;
use super::Driver;
use crate::app::background::driver::TaskDefinition;
use crate::app::background::Activator;
use crate::app::sagas::SagaRequest;
use assert_matches::assert_matches;
Expand Down Expand Up @@ -481,35 +512,38 @@ mod test {
let mut driver = Driver::new();

assert_eq!(*rx1.borrow(), 0);
let h1 = driver.register(
"t1".to_string(),
"test task".to_string(),
Duration::from_millis(100),
Box::new(t1),
opctx.child(std::collections::BTreeMap::new()),
vec![Box::new(dep_rx1.clone()), Box::new(dep_rx2.clone())],
&act1,
);
let h1 = driver.register(TaskDefinition {
name: "t1",
description: "test task",
period: Duration::from_millis(100),
task_impl: Box::new(t1),
opctx: opctx.child(std::collections::BTreeMap::new()),
watchers: vec![
Box::new(dep_rx1.clone()),
Box::new(dep_rx2.clone()),
],
activator: &act1,
});

let h2 = driver.register(
"t2".to_string(),
"test task".to_string(),
Duration::from_secs(300), // should never fire in this test
Box::new(t2),
opctx.child(std::collections::BTreeMap::new()),
vec![Box::new(dep_rx1.clone())],
&act2,
);
let h2 = driver.register(TaskDefinition {
name: "t2",
description: "test task",
period: Duration::from_secs(300), // should never fire in this test
task_impl: Box::new(t2),
opctx: opctx.child(std::collections::BTreeMap::new()),
watchers: vec![Box::new(dep_rx1.clone())],
activator: &act2,
});

let h3 = driver.register(
"t3".to_string(),
"test task".to_string(),
Duration::from_secs(300), // should never fire in this test
Box::new(t3),
let h3 = driver.register(TaskDefinition {
name: "t3",
description: "test task",
period: Duration::from_secs(300), // should never fire in this test
task_impl: Box::new(t3),
opctx,
vec![Box::new(dep_rx1), Box::new(dep_rx2)],
&act3,
);
watchers: vec![Box::new(dep_rx1), Box::new(dep_rx2)],
activator: &act3,
});

// Wait for four activations of our task. (This is three periods.) That
// should take between 300ms and 400ms. Allow extra time for a busy
Expand Down Expand Up @@ -654,15 +688,15 @@ mod test {
let before_wall = Utc::now();
let before_instant = Instant::now();
let act1 = Activator::new();
let h1 = driver.register(
"t1".to_string(),
"test task".to_string(),
Duration::from_secs(300), // should not elapse during test
Box::new(t1),
opctx.child(std::collections::BTreeMap::new()),
vec![Box::new(dep_rx1.clone())],
&act1,
);
let h1 = driver.register(TaskDefinition {
name: "t1",
description: "test task",
period: Duration::from_secs(300), // should not elapse during test
task_impl: Box::new(t1),
opctx: opctx.child(std::collections::BTreeMap::new()),
watchers: vec![Box::new(dep_rx1.clone())],
activator: &act1,
});

// Wait to enter the first activation.
let which = ready_rx1.recv().await.unwrap();
Expand Down Expand Up @@ -801,15 +835,15 @@ mod test {
let (_dep_tx1, dep_rx1) = watch::channel(0);
let act1 = Activator::new();

let h1 = driver.register(
"t1".to_string(),
"test saga request flow task".to_string(),
Duration::from_secs(300), // should not fire in this test
Box::new(t1),
opctx.child(std::collections::BTreeMap::new()),
vec![Box::new(dep_rx1.clone())],
&act1,
);
let h1 = driver.register(TaskDefinition {
name: "t1",
description: "test saga request flow task",
period: Duration::from_secs(300), // should not fire in this test
task_impl: Box::new(t1),
opctx: opctx.child(std::collections::BTreeMap::new()),
watchers: vec![Box::new(dep_rx1.clone())],
activator: &act1,
});

assert!(matches!(
saga_request_recv.try_recv(),
Expand Down
Loading
Loading