Skip to content

Commit

Permalink
[nexus] Remove v2p_notification_tx (#5983)
Browse files Browse the repository at this point in the history
Currently, a `tokio::sync::watch` channel is explicitly passed around to
allow both Nexus and other background tasks to notify the `v2p_manager`
background task. This must be passed explicitly through most of Nexus,
which is a bit awkward.

The new `background::Activator` API introduced in PR #5962 makes it a
bit easier to nicely activate background tasks. Now, we have an
`Activator` type which represents the `tokio::sync::Notify` used to wake
the background task, and this can be passed into the various functions
which must activate it. Because `Activator`s are constructed _before_
background tasks are started, the `Activator` can be passed to the
background tasks that must activate the `v2p_manager` task (in this
case, the `instance_watcher` task).

This branch removes the `tokio::sync::watch` channel used to activate
the `v2p_manager`, and replaces it with a use of the `Activator` API.
Because the `Activator` type is not currently `Clone`, I refactored it
slightly so that both the wired-up flag _and_ the `Notify` live within
an `Arc`, allowing a clone of the `Activator` for the `v2p_manager` task
to be passed into the `instance_watcher` task when it's constructed. I
don't think this really introduces any new opportunities for accidental
`Activator` misuse, as the assertion that an activator is not wired up
twice still stands.
  • Loading branch information
hawkw authored Jul 1, 2024
1 parent 0f757cd commit bfcd6df
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 52 deletions.
48 changes: 31 additions & 17 deletions nexus/src/app/background/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ struct Task {
status: watch::Receiver<TaskStatus>,
/// join handle for the tokio task that's executing this background task
tokio_task: tokio::task::JoinHandle<()>,
/// `Notify` used to wake up the tokio task when a caller explicit wants to
/// `Activator` used to wake up the tokio task when a caller explicit wants to
/// activate the background task
notify: Arc<Notify>,
activator: Activator,
}

impl Driver {
Expand Down Expand Up @@ -118,7 +118,7 @@ impl Driver {
// requested. The caller provides their own Activator, which just
// provides a specific Notify for us to use here.
let activator = taskdef.activator;
if let Err(previous) = activator.wired_up.compare_exchange(
if let Err(previous) = activator.0.wired_up.compare_exchange(
false,
true,
Ordering::SeqCst,
Expand All @@ -131,7 +131,6 @@ impl Driver {
previous, name
);
}
let notify = Arc::clone(&activator.notify);

// Spawn the tokio task that will manage activation of the background
// task.
Expand All @@ -142,7 +141,7 @@ impl Driver {
let task_exec = TaskExec::new(
taskdef.period,
taskdef.task_impl,
Arc::clone(&notify),
Arc::clone(&activator.0),
opctx,
status_tx,
);
Expand All @@ -156,7 +155,7 @@ impl Driver {
period: taskdef.period,
status: status_rx,
tokio_task,
notify,
activator: activator.clone(),
};
if self.tasks.insert(TaskName(name.clone()), task).is_some() {
panic!("started two background tasks called {:?}", name);
Expand Down Expand Up @@ -200,7 +199,7 @@ impl Driver {
/// If the task is currently running, it will be activated again when it
/// finishes.
pub(super) fn activate(&self, task: &TaskName) {
self.task_required(task).notify.notify_one();
self.task_required(task).activator.activate();
}

/// Returns the runtime status of the background task
Expand Down Expand Up @@ -257,26 +256,41 @@ pub struct TaskDefinition<'a, N: ToString, D: ToString> {
/// corresponding task has been created and then wired up with just an
/// `&Activator` (not a `&mut Activator`). See the [`super::init`] module-level
/// documentation for more on why.
pub struct Activator {
pub(super) notify: Arc<Notify>,
#[derive(Clone)]
pub struct Activator(Arc<ActivatorInner>);

/// Shared state for an `Activator`.
struct ActivatorInner {
pub(super) notify: Notify,
pub(super) wired_up: AtomicBool,
}

impl Activator {
/// Create an activator that is not yet wired up to any background task
pub fn new() -> Activator {
Activator {
notify: Arc::new(Notify::new()),
Self(Arc::new(ActivatorInner {
notify: Notify::new(),
wired_up: AtomicBool::new(false),
}
}))
}

/// Activate the background task that this Activator has been wired up to
///
/// If this Activator has not yet been wired up with [`Driver::register()`],
/// then whenever it _is_ wired up, that task will be immediately activated.
pub fn activate(&self) {
self.notify.notify_one();
self.0.notify.notify_one();
}
}

impl ActivatorInner {
async fn activated(&self) {
debug_assert!(
self.wired_up.load(Ordering::SeqCst),
"nothing should await activation from an activator that hasn't \
been wired up"
);
self.notify.notified().await
}
}

Expand All @@ -289,7 +303,7 @@ struct TaskExec {
imp: Box<dyn BackgroundTask>,
/// used to receive notifications from the Driver that someone has requested
/// explicit activation
notify: Arc<Notify>,
activation: Arc<ActivatorInner>,
/// passed through to the background task impl when activated
opctx: OpContext,
/// used to send current status back to the Driver
Expand All @@ -302,11 +316,11 @@ impl TaskExec {
fn new(
period: Duration,
imp: Box<dyn BackgroundTask>,
notify: Arc<Notify>,
activation: Arc<ActivatorInner>,
opctx: OpContext,
status_tx: watch::Sender<TaskStatus>,
) -> TaskExec {
TaskExec { period, imp, notify, opctx, status_tx, iteration: 0 }
TaskExec { period, imp, activation, opctx, status_tx, iteration: 0 }
}

/// Body of the tokio task that manages activation of this background task
Expand All @@ -326,7 +340,7 @@ impl TaskExec {
self.activate(ActivationReason::Timeout).await;
},

_ = self.notify.notified() => {
_ = self.activation.activated() => {
self.activate(ActivationReason::Signaled).await;
}

Expand Down
5 changes: 2 additions & 3 deletions nexus/src/app/background/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ impl BackgroundTasksInitializer {
nexus_id: Uuid,
resolver: internal_dns::resolver::Resolver,
saga_request: Sender<SagaRequest>,
v2p_watcher: (watch::Sender<()>, watch::Receiver<()>),
producer_registry: ProducerRegistry,
) -> Driver {
let mut driver = self.driver;
Expand Down Expand Up @@ -539,7 +538,7 @@ impl BackgroundTasksInitializer {
period: config.v2p_mapping_propagation.period_secs,
task_impl: Box::new(V2PManager::new(datastore.clone())),
opctx: opctx.child(BTreeMap::new()),
watchers: vec![Box::new(v2p_watcher.1)],
watchers: vec![],
activator: task_v2p_manager,
});

Expand Down Expand Up @@ -589,7 +588,7 @@ impl BackgroundTasksInitializer {
resolver.clone(),
producer_registry,
instance_watcher::WatcherIdentity { nexus_id, rack_id },
v2p_watcher.0,
task_v2p_manager.clone(),
);
driver.register(TaskDefinition {
name: "instance_watcher",
Expand Down
11 changes: 6 additions & 5 deletions nexus/src/app/background/tasks/instance_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

//! Background task for pulling instance state from sled-agents.
use crate::app::background::Activator;
use crate::app::background::BackgroundTask;
use futures::{future::BoxFuture, FutureExt};
use http::StatusCode;
Expand Down Expand Up @@ -37,7 +38,7 @@ pub(crate) struct InstanceWatcher {
resolver: internal_dns::resolver::Resolver,
metrics: Arc<Mutex<metrics::Metrics>>,
id: WatcherIdentity,
v2p_notification_tx: tokio::sync::watch::Sender<()>,
v2p_manager: Activator,
}

const MAX_SLED_AGENTS: NonZeroU32 = unsafe {
Expand All @@ -51,13 +52,13 @@ impl InstanceWatcher {
resolver: internal_dns::resolver::Resolver,
producer_registry: &ProducerRegistry,
id: WatcherIdentity,
v2p_notification_tx: tokio::sync::watch::Sender<()>,
v2p_manager: Activator,
) -> Self {
let metrics = Arc::new(Mutex::new(metrics::Metrics::default()));
producer_registry
.register_producer(metrics::Producer(metrics.clone()))
.unwrap();
Self { datastore, resolver, metrics, id, v2p_notification_tx }
Self { datastore, resolver, metrics, id, v2p_manager }
}

fn check_instance(
Expand All @@ -77,7 +78,7 @@ impl InstanceWatcher {
.collect(),
);
let client = client.clone();
let v2p_notification_tx = self.v2p_notification_tx.clone();
let v2p_manager = self.v2p_manager.clone();

async move {
slog::trace!(opctx.log, "checking on instance...");
Expand Down Expand Up @@ -162,7 +163,7 @@ impl InstanceWatcher {
&opctx.log,
&InstanceUuid::from_untyped_uuid(target.instance_id),
&new_runtime_state,
v2p_notification_tx,
&v2p_manager,
)
.await
.map_err(|e| {
Expand Down
6 changes: 3 additions & 3 deletions nexus/src/app/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1552,7 +1552,7 @@ impl super::Nexus {
&self.log,
instance_id,
new_runtime_state,
self.v2p_notification_tx.clone(),
&self.background_tasks.task_v2p_manager,
)
.await?;
self.vpc_needed_notify_sleds();
Expand Down Expand Up @@ -2005,7 +2005,7 @@ pub(crate) async fn notify_instance_updated(
log: &slog::Logger,
instance_id: &InstanceUuid,
new_runtime_state: &nexus::SledInstanceState,
v2p_notification_tx: tokio::sync::watch::Sender<()>,
v2p_manager: &crate::app::background::Activator,
) -> Result<Option<InstanceUpdateResult>, Error> {
let propolis_id = new_runtime_state.propolis_id;

Expand Down Expand Up @@ -2045,7 +2045,7 @@ pub(crate) async fn notify_instance_updated(
&authz_instance,
db_instance.runtime(),
&new_runtime_state.instance_state,
v2p_notification_tx.clone(),
v2p_manager,
)
.await?;

Expand Down
23 changes: 6 additions & 17 deletions nexus/src/app/instance_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

//! Routines that manage instance-related networking state.
use crate::app::background;
use crate::app::switch_port;
use ipnetwork::IpNetwork;
use nexus_db_model::ExternalIp;
Expand Down Expand Up @@ -267,7 +268,7 @@ pub(crate) async fn ensure_updated_instance_network_config(
authz_instance: &authz::Instance,
prev_instance_state: &db::model::InstanceRuntimeState,
new_instance_state: &nexus::InstanceRuntimeState,
v2p_notification_tx: tokio::sync::watch::Sender<()>,
v2p_manager: &background::Activator,
) -> Result<(), Error> {
let instance_id = InstanceUuid::from_untyped_uuid(authz_instance.id());

Expand Down Expand Up @@ -298,7 +299,7 @@ pub(crate) async fn ensure_updated_instance_network_config(
opctx,
opctx_alloc,
authz_instance,
v2p_notification_tx,
v2p_manager,
)
.await?;
return Ok(());
Expand Down Expand Up @@ -379,13 +380,7 @@ pub(crate) async fn ensure_updated_instance_network_config(
Err(e) => return Err(e),
};

if let Err(e) = v2p_notification_tx.send(()) {
error!(
log,
"error notifying background task of v2p change";
"error" => ?e
)
};
v2p_manager.activate();

let (.., sled) =
LookupPath::new(opctx, datastore).sled_id(new_sled_id).fetch().await?;
Expand Down Expand Up @@ -703,15 +698,9 @@ async fn clear_instance_networking_state(
opctx: &OpContext,
opctx_alloc: &OpContext,
authz_instance: &authz::Instance,
v2p_notification_tx: tokio::sync::watch::Sender<()>,
v2p_manager: &background::Activator,
) -> Result<(), Error> {
if let Err(e) = v2p_notification_tx.send(()) {
error!(
log,
"error notifying background task of v2p change";
"error" => ?e
)
};
v2p_manager.activate();

instance_delete_dpd_config(
datastore,
Expand Down
7 changes: 0 additions & 7 deletions nexus/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,6 @@ pub struct Nexus {

/// Default Crucible region allocation strategy
default_region_allocation_strategy: RegionAllocationStrategy,

/// Channel for notifying background task of change to opte v2p state
v2p_notification_tx: tokio::sync::watch::Sender<()>,
}

impl Nexus {
Expand Down Expand Up @@ -405,8 +402,6 @@ impl Nexus {
Arc::clone(&db_datastore) as Arc<dyn nexus_auth::storage::Storage>,
);

let v2p_watcher_channel = tokio::sync::watch::channel(());

let (saga_request, mut saga_request_recv) = SagaRequest::channel();

let (background_tasks_initializer, background_tasks) =
Expand Down Expand Up @@ -465,7 +460,6 @@ impl Nexus {
.pkg
.default_region_allocation_strategy
.clone(),
v2p_notification_tx: v2p_watcher_channel.0.clone(),
};

// TODO-cleanup all the extra Arcs here seems wrong
Expand Down Expand Up @@ -524,7 +518,6 @@ impl Nexus {
task_config.deployment.id,
resolver,
saga_request,
v2p_watcher_channel.clone(),
task_registry,
);

Expand Down

0 comments on commit bfcd6df

Please sign in to comment.