diff --git a/nexus/src/app/background/driver.rs b/nexus/src/app/background/driver.rs index ad547a443c..9e856cdc4f 100644 --- a/nexus/src/app/background/driver.rs +++ b/nexus/src/app/background/driver.rs @@ -97,17 +97,16 @@ impl Driver { /// /// This function panics if the `name` or `activator` has previously been /// passed to a call to this function. - #[allow(clippy::too_many_arguments)] - pub fn register( + pub fn register<N, D>( &mut self, - name: String, - description: String, - period: Duration, - imp: Box<dyn BackgroundTask>, - opctx: OpContext, - watchers: Vec<Box<dyn GenericWatcher>>, - activator: &Activator, - ) -> TaskName { + taskdef: TaskDefinition<'_, N, D>, + ) -> TaskName + where + N: ToString, + D: ToString, + { + let name = taskdef.name.to_string(); + // Activation of the background task happens in a separate tokio task. // Set up a channel so that tokio task can report status back to us. let (status_tx, status_rx) = watch::channel(TaskStatus { @@ -118,6 +117,7 @@ impl Driver { // We'll use a `Notify` to wake up that tokio task when an activation is // requested. The caller provides their own Activator, which just // provides a specific Notify for us to use here. + let activator = taskdef.activator; if let Err(previous) = activator.wired_up.compare_exchange( false, true, @@ -135,19 +135,29 @@ impl Driver { // Spawn the tokio task that will manage activation of the background // task. - let opctx = opctx.child(BTreeMap::from([( + let opctx = taskdef.opctx.child(BTreeMap::from([( "background_task".to_string(), name.clone(), )])); - let task_exec = - TaskExec::new(period, imp, Arc::clone(&notify), opctx, status_tx); - let tokio_task = tokio::task::spawn(task_exec.run(watchers)); + let task_exec = TaskExec::new( + taskdef.period, + taskdef.task_impl, + Arc::clone(&notify), + opctx, + status_tx, + ); + let tokio_task = tokio::task::spawn(task_exec.run(taskdef.watchers)); // Create an object to track our side of the background task's state. 
// This just provides the handles we need to read status and wake up the // tokio task. - let task = - Task { description, period, status: status_rx, tokio_task, notify }; + let task = Task { + description: taskdef.description.to_string(), + period: taskdef.period, + status: status_rx, + tokio_task, + notify, + }; if self.tasks.insert(TaskName(name.clone()), task).is_some() { panic!("started two background tasks called {:?}", name); } @@ -212,6 +222,26 @@ impl Drop for Driver { } } +/// Describes a background task to be registered with [`Driver::register()`] +/// +/// See [`Driver::register()`] for more on how these fields get used. +pub struct TaskDefinition<'a, N: ToString, D: ToString> { + /// identifier for this task + pub name: N, + /// short human-readable summary of this task + pub description: D, + /// driver should activate the task if it hasn't run in this long + pub period: Duration, + /// impl of [`BackgroundTask`] that represents the work of the task + pub task_impl: Box<dyn BackgroundTask>, + /// `OpContext` used for task activations + pub opctx: OpContext, + /// list of watchers that will trigger activation of this task + pub watchers: Vec<Box<dyn GenericWatcher>>, + /// an [`Activator`] that will be wired up to activate this task + pub activator: &'a Activator, +} + /// Activates a background task /// /// See [`crate::app::background`] module-level documentation for more on what @@ -390,6 +420,7 @@ impl GenericWatcher for watch::Receiver { mod test { use super::BackgroundTask; use super::Driver; + use crate::app::background::driver::TaskDefinition; use crate::app::background::Activator; use crate::app::sagas::SagaRequest; use assert_matches::assert_matches; @@ -481,35 +512,38 @@ mod test { let mut driver = Driver::new(); assert_eq!(*rx1.borrow(), 0); - let h1 = driver.register( - "t1".to_string(), - "test task".to_string(), - Duration::from_millis(100), - Box::new(t1), - opctx.child(std::collections::BTreeMap::new()), - vec![Box::new(dep_rx1.clone()), Box::new(dep_rx2.clone())], - &act1, - ); 
+ let h1 = driver.register(TaskDefinition { + name: "t1", + description: "test task", + period: Duration::from_millis(100), + task_impl: Box::new(t1), + opctx: opctx.child(std::collections::BTreeMap::new()), + watchers: vec![ + Box::new(dep_rx1.clone()), + Box::new(dep_rx2.clone()), + ], + activator: &act1, + }); - let h2 = driver.register( - "t2".to_string(), - "test task".to_string(), - Duration::from_secs(300), // should never fire in this test - Box::new(t2), - opctx.child(std::collections::BTreeMap::new()), - vec![Box::new(dep_rx1.clone())], - &act2, - ); + let h2 = driver.register(TaskDefinition { + name: "t2", + description: "test task", + period: Duration::from_secs(300), // should never fire in this test + task_impl: Box::new(t2), + opctx: opctx.child(std::collections::BTreeMap::new()), + watchers: vec![Box::new(dep_rx1.clone())], + activator: &act2, + }); - let h3 = driver.register( - "t3".to_string(), - "test task".to_string(), - Duration::from_secs(300), // should never fire in this test - Box::new(t3), + let h3 = driver.register(TaskDefinition { + name: "t3", + description: "test task", + period: Duration::from_secs(300), // should never fire in this test + task_impl: Box::new(t3), opctx, - vec![Box::new(dep_rx1), Box::new(dep_rx2)], - &act3, - ); + watchers: vec![Box::new(dep_rx1), Box::new(dep_rx2)], + activator: &act3, + }); // Wait for four activations of our task. (This is three periods.) That // should take between 300ms and 400ms. 
Allow extra time for a busy @@ -654,15 +688,15 @@ mod test { let before_wall = Utc::now(); let before_instant = Instant::now(); let act1 = Activator::new(); - let h1 = driver.register( - "t1".to_string(), - "test task".to_string(), - Duration::from_secs(300), // should not elapse during test - Box::new(t1), - opctx.child(std::collections::BTreeMap::new()), - vec![Box::new(dep_rx1.clone())], - &act1, - ); + let h1 = driver.register(TaskDefinition { + name: "t1", + description: "test task", + period: Duration::from_secs(300), // should not elapse during test + task_impl: Box::new(t1), + opctx: opctx.child(std::collections::BTreeMap::new()), + watchers: vec![Box::new(dep_rx1.clone())], + activator: &act1, + }); // Wait to enter the first activation. let which = ready_rx1.recv().await.unwrap(); @@ -801,15 +835,15 @@ mod test { let (_dep_tx1, dep_rx1) = watch::channel(0); let act1 = Activator::new(); - let h1 = driver.register( - "t1".to_string(), - "test saga request flow task".to_string(), - Duration::from_secs(300), // should not fire in this test - Box::new(t1), - opctx.child(std::collections::BTreeMap::new()), - vec![Box::new(dep_rx1.clone())], - &act1, - ); + let h1 = driver.register(TaskDefinition { + name: "t1", + description: "test saga request flow task", + period: Duration::from_secs(300), // should not fire in this test + task_impl: Box::new(t1), + opctx: opctx.child(std::collections::BTreeMap::new()), + watchers: vec![Box::new(dep_rx1.clone())], + activator: &act1, + }); assert!(matches!( saga_request_recv.try_recv(), diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index fd37de344d..e668c6034b 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -87,6 +87,7 @@ //! It's not foolproof but hopefully these mechanisms will catch the easy //! mistakes. 
+use super::driver::TaskDefinition; use super::tasks::abandoned_vmm_reaper; use super::tasks::bfd; use super::tasks::blueprint_execution; @@ -328,18 +329,18 @@ impl BackgroundTasksInitializer { datastore.clone(), PRODUCER_LEASE_DURATION, ); - driver.register( - String::from("metrics_producer_gc"), - String::from( + + driver.register(TaskDefinition { + name: "metrics_producer_gc", + description: "unregisters Oximeter metrics producers that have not \ renewed their lease", - ), - config.metrics_producer_gc.period_secs, - Box::new(gc), - opctx.child(BTreeMap::new()), - vec![], - task_metrics_producer_gc, - ) + period: config.metrics_producer_gc.period_secs, + task_impl: Box::new(gc), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_metrics_producer_gc, + }) }; // Background task: External endpoints list watcher @@ -348,78 +349,77 @@ impl BackgroundTasksInitializer { datastore.clone(), self.external_endpoints_tx, ); - driver.register( - String::from("external_endpoints"), - String::from( + driver.register(TaskDefinition { + name: "external_endpoints", + description: "reads config for silos and TLS certificates to determine \ the right set of HTTP endpoints, their HTTP server \ names, and which TLS certificates to use on each one", - ), - config.external_endpoints.period_secs, - Box::new(watcher), - opctx.child(BTreeMap::new()), - vec![], - task_external_endpoints, - ); + period: config.external_endpoints.period_secs, + task_impl: Box::new(watcher), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_external_endpoints, + }); } - driver.register( - "nat_v4_garbage_collector".to_string(), - String::from( + driver.register(TaskDefinition { + name: "nat_v4_garbage_collector", + description: "prunes soft-deleted IPV4 NAT entries from ipv4_nat_entry \ table based on a predetermined retention policy", - ), - config.nat_cleanup.period_secs, - Box::new(nat_cleanup::Ipv4NatGarbageCollector::new( + period: 
config.nat_cleanup.period_secs, + task_impl: Box::new(nat_cleanup::Ipv4NatGarbageCollector::new( datastore.clone(), resolver.clone(), )), - opctx.child(BTreeMap::new()), - vec![], - task_nat_cleanup, - ); - - driver.register( - "bfd_manager".to_string(), - String::from( - "Manages bidirectional fowarding detection (BFD) \ + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_nat_cleanup, + }); + + driver.register(TaskDefinition { + name: "bfd_manager", + description: "Manages bidirectional fowarding detection (BFD) \ configuration on rack switches", - ), - config.bfd_manager.period_secs, - Box::new(bfd::BfdManager::new(datastore.clone(), resolver.clone())), - opctx.child(BTreeMap::new()), - vec![], - task_bfd_manager, - ); + period: config.bfd_manager.period_secs, + task_impl: Box::new(bfd::BfdManager::new( + datastore.clone(), + resolver.clone(), + )), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_bfd_manager, + }); // Background task: phantom disk detection { let detector = phantom_disks::PhantomDiskDetector::new(datastore.clone()); - driver.register( - String::from("phantom_disks"), - String::from("detects and un-deletes phantom disks"), - config.phantom_disks.period_secs, - Box::new(detector), - opctx.child(BTreeMap::new()), - vec![], - task_phantom_disks, - ); + driver.register(TaskDefinition { + name: "phantom_disks", + description: "detects and un-deletes phantom disks", + period: config.phantom_disks.period_secs, + task_impl: Box::new(detector), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_phantom_disks, + }); }; // Background task: blueprint loader let blueprint_loader = blueprint_load::TargetBlueprintLoader::new(datastore.clone()); let rx_blueprint = blueprint_loader.watcher(); - driver.register( - String::from("blueprint_loader"), - String::from("Loads the current target blueprint from the DB"), - config.blueprints.period_secs_load, - Box::new(blueprint_loader), - 
opctx.child(BTreeMap::new()), - vec![], - task_blueprint_loader, - ); + driver.register(TaskDefinition { + name: "blueprint_loader", + description: "Loads the current target blueprint from the DB", + period: config.blueprints.period_secs_load, + task_impl: Box::new(blueprint_loader), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_blueprint_loader, + }); // Background task: blueprint executor let blueprint_executor = blueprint_execution::BlueprintExecutor::new( @@ -428,15 +428,15 @@ impl BackgroundTasksInitializer { nexus_id.to_string(), ); let rx_blueprint_exec = blueprint_executor.watcher(); - driver.register( - String::from("blueprint_executor"), - String::from("Executes the target blueprint"), - config.blueprints.period_secs_execute, - Box::new(blueprint_executor), - opctx.child(BTreeMap::new()), - vec![Box::new(rx_blueprint.clone())], - task_blueprint_executor, - ); + driver.register(TaskDefinition { + name: "blueprint_executor", + description: "Executes the target blueprint", + period: config.blueprints.period_secs_execute, + task_impl: Box::new(blueprint_executor), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![Box::new(rx_blueprint.clone())], + activator: task_blueprint_executor, + }); // Background task: CockroachDB node ID collector let crdb_node_id_collector = @@ -444,15 +444,15 @@ impl BackgroundTasksInitializer { datastore.clone(), rx_blueprint.clone(), ); - driver.register( - String::from("crdb_node_id_collector"), - String::from("Collects node IDs of running CockroachDB zones"), - config.blueprints.period_secs_collect_crdb_node_ids, - Box::new(crdb_node_id_collector), - opctx.child(BTreeMap::new()), - vec![Box::new(rx_blueprint)], - task_crdb_node_id_collector, - ); + driver.register(TaskDefinition { + name: "crdb_node_id_collector", + description: "Collects node IDs of running CockroachDB zones", + period: config.blueprints.period_secs_collect_crdb_node_ids, + task_impl: Box::new(crdb_node_id_collector), + 
opctx: opctx.child(BTreeMap::new()), + watchers: vec![Box::new(rx_blueprint)], + activator: task_crdb_node_id_collector, + }); // Background task: inventory collector // @@ -472,76 +472,76 @@ impl BackgroundTasksInitializer { config.inventory.disable, ); let inventory_watcher = collector.watcher(); - driver.register( - String::from("inventory_collection"), - String::from( + driver.register(TaskDefinition { + name: "inventory_collection", + description: "collects hardware and software inventory data from the \ whole system", - ), - config.inventory.period_secs, - Box::new(collector), - opctx.child(BTreeMap::new()), - vec![Box::new(rx_blueprint_exec)], - task_inventory_collection, - ); + period: config.inventory.period_secs, + task_impl: Box::new(collector), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![Box::new(rx_blueprint_exec)], + activator: task_inventory_collection, + }); inventory_watcher }; - driver.register( - "physical_disk_adoption".to_string(), - "ensure new physical disks are automatically marked in-service" - .to_string(), - config.physical_disk_adoption.period_secs, - Box::new(physical_disk_adoption::PhysicalDiskAdoption::new( - datastore.clone(), - inventory_watcher.clone(), - config.physical_disk_adoption.disable, - rack_id, - )), - opctx.child(BTreeMap::new()), - vec![Box::new(inventory_watcher)], - task_physical_disk_adoption, - ); - - driver.register( - "service_zone_nat_tracker".to_string(), - String::from( + driver.register(TaskDefinition { + name: "physical_disk_adoption", + description: + "ensure new physical disks are automatically marked in-service", + period: config.physical_disk_adoption.period_secs, + task_impl: Box::new( + physical_disk_adoption::PhysicalDiskAdoption::new( + datastore.clone(), + inventory_watcher.clone(), + config.physical_disk_adoption.disable, + rack_id, + ), + ), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![Box::new(inventory_watcher)], + activator: task_physical_disk_adoption, + }); + + 
driver.register(TaskDefinition { + name: "service_zone_nat_tracker", + description: "ensures service zone nat records are recorded in NAT RPW \ table", - ), - config.sync_service_zone_nat.period_secs, - Box::new(ServiceZoneNatTracker::new( + period: config.sync_service_zone_nat.period_secs, + task_impl: Box::new(ServiceZoneNatTracker::new( datastore.clone(), resolver.clone(), )), - opctx.child(BTreeMap::new()), - vec![], - task_service_zone_nat_tracker, - ); - - driver.register( - "switch_port_config_manager".to_string(), - String::from("manages switch port settings for rack switches"), - config.switch_port_settings_manager.period_secs, - Box::new(SwitchPortSettingsManager::new( + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_service_zone_nat_tracker, + }); + + driver.register(TaskDefinition { + name: "switch_port_config_manager", + description: "manages switch port settings for rack switches", + period: config.switch_port_settings_manager.period_secs, + task_impl: Box::new(SwitchPortSettingsManager::new( datastore.clone(), resolver.clone(), )), - opctx.child(BTreeMap::new()), - vec![], - task_switch_port_settings_manager, - ); - - driver.register( - "v2p_manager".to_string(), - String::from("manages opte v2p mappings for vpc networking"), - config.v2p_mapping_propagation.period_secs, - Box::new(V2PManager::new(datastore.clone())), - opctx.child(BTreeMap::new()), - vec![Box::new(v2p_watcher.1)], - task_v2p_manager, - ); + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_switch_port_settings_manager, + }); + + driver.register(TaskDefinition { + name: "v2p_manager", + description: "manages opte v2p mappings for vpc networking", + period: config.v2p_mapping_propagation.period_secs, + task_impl: Box::new(V2PManager::new(datastore.clone())), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![Box::new(v2p_watcher.1)], + activator: task_v2p_manager, + }); // Background task: detect if a region needs replacement 
and begin the // process @@ -551,18 +551,17 @@ impl BackgroundTasksInitializer { saga_request.clone(), ); - driver.register( - String::from("region_replacement"), - String::from( + driver.register(TaskDefinition { + name: "region_replacement", + description: "detects if a region requires replacing and begins the \ process", - ), - config.region_replacement.period_secs, - Box::new(detector), - opctx.child(BTreeMap::new()), - vec![], - task_region_replacement, - ); + period: config.region_replacement.period_secs, + task_impl: Box::new(detector), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_region_replacement, + }); }; // Background task: drive region replacements forward to completion @@ -573,15 +572,15 @@ impl BackgroundTasksInitializer { saga_request.clone(), ); - driver.register( - String::from("region_replacement_driver"), - String::from("drive region replacements forward to completion"), - config.region_replacement_driver.period_secs, - Box::new(detector), - opctx.child(BTreeMap::new()), - vec![], - task_region_replacement_driver, - ); + driver.register(TaskDefinition { + name: "region_replacement_driver", + description: "drive region replacements forward to completion", + period: config.region_replacement_driver.period_secs, + task_impl: Box::new(detector), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_region_replacement_driver, + }); }; { @@ -592,60 +591,62 @@ impl BackgroundTasksInitializer { instance_watcher::WatcherIdentity { nexus_id, rack_id }, v2p_watcher.0, ); - driver.register( - "instance_watcher".to_string(), - "periodically checks instance states".to_string(), - config.instance_watcher.period_secs, - Box::new(watcher), - opctx.child(BTreeMap::new()), - vec![], - task_instance_watcher, - ) + driver.register(TaskDefinition { + name: "instance_watcher", + description: "periodically checks instance states", + period: config.instance_watcher.period_secs, + task_impl: Box::new(watcher), + 
opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_instance_watcher, + }) }; // Background task: service firewall rule propagation - driver.register( - String::from("service_firewall_rule_propagation"), - String::from( + driver.register(TaskDefinition { + name: "service_firewall_rule_propagation", + description: "propagates VPC firewall rules for Omicron services with \ external network connectivity", + period: config.service_firewall_propagation.period_secs, + task_impl: Box::new( + service_firewall_rules::ServiceRulePropagator::new( + datastore.clone(), + ), ), - config.service_firewall_propagation.period_secs, - Box::new(service_firewall_rules::ServiceRulePropagator::new( - datastore.clone(), - )), - opctx.child(BTreeMap::new()), - vec![], - task_service_firewall_propagation, - ); + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_service_firewall_propagation, + }); // Background task: OPTE port route propagation { let watcher = vpc_routes::VpcRouteManager::new(datastore.clone()); - driver.register( - "vpc_route_manager".to_string(), - "propagates updated VPC routes to all OPTE ports".into(), - config.switch_port_settings_manager.period_secs, - Box::new(watcher), - opctx.child(BTreeMap::new()), - vec![], - task_vpc_route_manager, - ) + driver.register(TaskDefinition { + name: "vpc_route_manager", + description: "propagates updated VPC routes to all OPTE ports", + period: config.switch_port_settings_manager.period_secs, + task_impl: Box::new(watcher), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_vpc_route_manager, + }) }; // Background task: abandoned VMM reaping - driver.register( - String::from("abandoned_vmm_reaper"), - String::from( + driver.register(TaskDefinition { + name: "abandoned_vmm_reaper", + description: "deletes sled reservations for VMMs that have been abandoned \ by their instances", - ), - config.abandoned_vmm_reaper.period_secs, - 
Box::new(abandoned_vmm_reaper::AbandonedVmmReaper::new(datastore)), - opctx.child(BTreeMap::new()), - vec![], - task_abandoned_vmm_reaper, - ); + period: config.abandoned_vmm_reaper.period_secs, + task_impl: Box::new(abandoned_vmm_reaper::AbandonedVmmReaper::new( + datastore, + )), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_abandoned_vmm_reaper, + }); driver } @@ -673,32 +674,35 @@ fn init_dns( dns_config::DnsConfigWatcher::new(Arc::clone(&datastore), dns_group); let dns_config_watcher = dns_config.watcher(); let task_name_config = format!("dns_config_{}", dns_group); - driver.register( - task_name_config.clone(), - format!("watches {} DNS data stored in CockroachDB", dns_group), - config.period_secs_config, - Box::new(dns_config), - opctx.child(metadata.clone()), - vec![], - task_config, - ); + driver.register(TaskDefinition { + name: task_name_config.clone(), + description: format!( + "watches {} DNS data stored in CockroachDB", + dns_group + ), + period: config.period_secs_config, + task_impl: Box::new(dns_config), + opctx: opctx.child(metadata.clone()), + watchers: vec![], + activator: task_config, + }); // Background task: DNS server list watcher let dns_servers = dns_servers::DnsServersWatcher::new(dns_group, resolver); let dns_servers_watcher = dns_servers.watcher(); let task_name_servers = format!("dns_servers_{}", dns_group); - driver.register( - task_name_servers.clone(), - format!( + driver.register(TaskDefinition { + name: task_name_servers.clone(), + description: format!( "watches list of {} DNS servers stored in internal DNS", dns_group, ), - config.period_secs_servers, - Box::new(dns_servers), - opctx.child(metadata.clone()), - vec![], - task_servers, - ); + period: config.period_secs_servers, + task_impl: Box::new(dns_servers), + opctx: opctx.child(metadata.clone()), + watchers: vec![], + activator: task_servers, + }); // Background task: DNS propagation let dns_propagate = dns_propagation::DnsPropagator::new( @@ 
-706,20 +710,23 @@ fn init_dns( dns_servers_watcher.clone(), config.max_concurrent_server_updates, ); - driver.register( - format!("dns_propagation_{}", dns_group), - format!( + driver.register(TaskDefinition { + name: format!("dns_propagation_{}", dns_group), + description: format!( "propagates latest {} DNS configuration (from {:?} background \ task) to the latest list of DNS servers (from {:?} background \ task)", dns_group, task_name_config, task_name_servers, ), - config.period_secs_propagation, - Box::new(dns_propagate), - opctx.child(metadata), - vec![Box::new(dns_config_watcher), Box::new(dns_servers_watcher)], - task_propagation, - ); + period: config.period_secs_propagation, + task_impl: Box::new(dns_propagate), + opctx: opctx.child(metadata), + watchers: vec![ + Box::new(dns_config_watcher), + Box::new(dns_servers_watcher), + ], + activator: task_propagation, + }); } #[cfg(test)]