diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs index 5d9c05331a..b64757b690 100644 --- a/nexus/src/app/instance.rs +++ b/nexus/src/app/instance.rs @@ -28,6 +28,7 @@ use nexus_db_queries::db::datastore::InstanceAndActiveVmm; use nexus_db_queries::db::identity::Resource; use nexus_db_queries::db::lookup; use nexus_db_queries::db::lookup::LookupPath; +use nexus_db_queries::db::DataStore; use nexus_types::external_api::views; use omicron_common::api::external::http_pagination::PaginatedBy; use omicron_common::api::external::ByteCount; @@ -1513,176 +1514,16 @@ impl super::Nexus { instance_id: &Uuid, new_runtime_state: &nexus::SledInstanceState, ) -> Result<(), Error> { - let log = &self.log; - let propolis_id = new_runtime_state.propolis_id; - - info!(log, "received new runtime state from sled agent"; - "instance_id" => %instance_id, - "instance_state" => ?new_runtime_state.instance_state, - "propolis_id" => %propolis_id, - "vmm_state" => ?new_runtime_state.vmm_state); - - // Grab the current state of the instance in the DB to reason about - // whether this update is stale or not. - let (.., authz_instance, db_instance) = - LookupPath::new(&opctx, &self.db_datastore) - .instance_id(*instance_id) - .fetch() - .await?; - - // Update OPTE and Dendrite if the instance's active sled assignment - // changed or a migration was retired. If these actions fail, sled agent - // is expected to retry this update. - // - // This configuration must be updated before updating any state in CRDB - // so that, if the instance was migrating or has shut down, it will not - // appear to be able to migrate or start again until the appropriate - // networking state has been written. Without this interlock, another - // thread or another Nexus can race with this routine to write - // conflicting configuration. - // - // In the future, this should be replaced by a call to trigger a - // networking state update RPW. - self.ensure_updated_instance_network_config( + notify_instance_updated( + &self.db_datastore, + &self.resolver().await, + &self.opctx_alloc, opctx, - &authz_instance, - db_instance.runtime(), - &new_runtime_state.instance_state, + &self.log, + instance_id, + new_runtime_state, ) - .await?; - - // If the supplied instance state indicates that the instance no longer - // has an active VMM, attempt to delete the virtual provisioning record, - // and the assignment of the Propolis metric producer to an oximeter - // collector. - // - // As with updating networking state, this must be done before - // committing the new runtime state to the database: once the DB is - // written, a new start saga can arrive and start the instance, which - // will try to create its own virtual provisioning charges, which will - // race with this operation. - if new_runtime_state.instance_state.propolis_id.is_none() { - self.db_datastore - .virtual_provisioning_collection_delete_instance( - opctx, - *instance_id, - db_instance.project_id, - i64::from(db_instance.ncpus.0 .0), - db_instance.memory, - (&new_runtime_state.instance_state.gen).into(), - ) - .await?; - - // TODO-correctness: The `notify_instance_updated` method can run - // concurrently with itself in some situations, such as where a - // sled-agent attempts to update Nexus about a stopped instance; - // that times out; and it makes another request to a different - // Nexus. The call to `unassign_producer` is racy in those - // situations, and we may end with instances with no metrics. 
- // - // This unfortunate case should be handled as part of - // instance-lifecycle improvements, notably using a reliable - // persistent workflow to correctly update the oximete assignment as - // an instance's state changes. - // - // Tracked in https://github.com/oxidecomputer/omicron/issues/3742. - self.unassign_producer(opctx, instance_id).await?; - } - - // Write the new instance and VMM states back to CRDB. This needs to be - // done before trying to clean up the VMM, since the datastore will only - // allow a VMM to be marked as deleted if it is already in a terminal - // state. - let result = self - .db_datastore - .instance_and_vmm_update_runtime( - instance_id, - &db::model::InstanceRuntimeState::from( - new_runtime_state.instance_state.clone(), - ), - &propolis_id, - &db::model::VmmRuntimeState::from( - new_runtime_state.vmm_state.clone(), - ), - ) - .await; - - // If the VMM is now in a terminal state, make sure its resources get - // cleaned up. - // - // For idempotency, only check to see if the update was successfully - // processed and ignore whether the VMM record was actually updated. - // This is required to handle the case where this routine is called - // once, writes the terminal VMM state, fails before all per-VMM - // resources are released, returns a retriable error, and is retried: - // the per-VMM resources still need to be cleaned up, but the DB update - // will return Ok(_, false) because the database was already updated. - // - // Unlike the pre-update cases, it is legal to do this cleanup *after* - // committing state to the database, because a terminated VMM cannot be - // reused (restarting or migrating its former instance will use new VMM - // IDs). - if result.is_ok() { - let propolis_terminated = matches!( - new_runtime_state.vmm_state.state, - InstanceState::Destroyed | InstanceState::Failed - ); - - if propolis_terminated { - info!(log, "vmm is terminated, cleaning up resources"; - "instance_id" => %instance_id, - "propolis_id" => %propolis_id); - - self.db_datastore - .sled_reservation_delete(opctx, propolis_id) - .await?; - - if !self - .db_datastore - .vmm_mark_deleted(opctx, &propolis_id) - .await? - { - warn!(log, "failed to mark vmm record as deleted"; - "instance_id" => %instance_id, - "propolis_id" => %propolis_id, - "vmm_state" => ?new_runtime_state.vmm_state); - } - } - } - - match result { - Ok((instance_updated, vmm_updated)) => { - info!(log, "instance and vmm updated by sled agent"; - "instance_id" => %instance_id, - "propolis_id" => %propolis_id, - "instance_updated" => instance_updated, - "vmm_updated" => vmm_updated); - Ok(()) - } - - // The update command should swallow object-not-found errors and - // return them back as failures to update, so this error case is - // unexpected. There's no work to do if this occurs, however. - Err(Error::ObjectNotFound { .. }) => { - error!(log, "instance/vmm update unexpectedly returned \ - an object not found error"; - "instance_id" => %instance_id, - "propolis_id" => %propolis_id); - Ok(()) - } - - // If the datastore is unavailable, propagate that to the caller. - // TODO-robustness Really this should be any _transient_ error. How - // can we distinguish? Maybe datastore should emit something - // different from Error with an Into. 
- Err(error) => { - warn!(log, "failed to update instance from sled agent"; - "instance_id" => %instance_id, - "propolis_id" => %propolis_id, - "error" => ?error); - Err(error) - } - } + .await } /// Returns the requested range of serial console output bytes, @@ -2111,6 +1952,185 @@ impl super::Nexus { } } +/// Invoked by a sled agent to publish an updated runtime state for an +/// Instance. +pub(crate) async fn notify_instance_updated( + datastore: &DataStore, + resolver: &internal_dns::resolver::Resolver, + opctx_alloc: &OpContext, + opctx: &OpContext, + log: &slog::Logger, + instance_id: &Uuid, + new_runtime_state: &nexus::SledInstanceState, +) -> Result<(), Error> { + let propolis_id = new_runtime_state.propolis_id; + + info!(log, "received new runtime state from sled agent"; + "instance_id" => %instance_id, + "instance_state" => ?new_runtime_state.instance_state, + "propolis_id" => %propolis_id, + "vmm_state" => ?new_runtime_state.vmm_state); + + // Grab the current state of the instance in the DB to reason about + // whether this update is stale or not. + let (.., authz_instance, db_instance) = LookupPath::new(&opctx, &datastore) + .instance_id(*instance_id) + .fetch() + .await?; + + // Update OPTE and Dendrite if the instance's active sled assignment + // changed or a migration was retired. If these actions fail, sled agent + // is expected to retry this update. + // + // This configuration must be updated before updating any state in CRDB + // so that, if the instance was migrating or has shut down, it will not + // appear to be able to migrate or start again until the appropriate + // networking state has been written. Without this interlock, another + // thread or another Nexus can race with this routine to write + // conflicting configuration. + // + // In the future, this should be replaced by a call to trigger a + // networking state update RPW. + super::instance_network::ensure_updated_instance_network_config( + datastore, + log, + resolver, + opctx, + opctx_alloc, + &authz_instance, + db_instance.runtime(), + &new_runtime_state.instance_state, + ) + .await?; + + // If the supplied instance state indicates that the instance no longer + // has an active VMM, attempt to delete the virtual provisioning record, + // and the assignment of the Propolis metric producer to an oximeter + // collector. + // + // As with updating networking state, this must be done before + // committing the new runtime state to the database: once the DB is + // written, a new start saga can arrive and start the instance, which + // will try to create its own virtual provisioning charges, which will + // race with this operation. + if new_runtime_state.instance_state.propolis_id.is_none() { + datastore + .virtual_provisioning_collection_delete_instance( + opctx, + *instance_id, + db_instance.project_id, + i64::from(db_instance.ncpus.0 .0), + db_instance.memory, + (&new_runtime_state.instance_state.gen).into(), + ) + .await?; + + // TODO-correctness: The `notify_instance_updated` method can run + // concurrently with itself in some situations, such as where a + // sled-agent attempts to update Nexus about a stopped instance; + // that times out; and it makes another request to a different + // Nexus. The call to `unassign_producer` is racy in those + // situations, and we may end with instances with no metrics. 
+ // + // This unfortunate case should be handled as part of + // instance-lifecycle improvements, notably using a reliable + // persistent workflow to correctly update the oximete assignment as + // an instance's state changes. + // + // Tracked in https://github.com/oxidecomputer/omicron/issues/3742. + super::oximeter::unassign_producer(datastore, log, opctx, instance_id) + .await?; + } + + // Write the new instance and VMM states back to CRDB. This needs to be + // done before trying to clean up the VMM, since the datastore will only + // allow a VMM to be marked as deleted if it is already in a terminal + // state. + let result = datastore + .instance_and_vmm_update_runtime( + instance_id, + &db::model::InstanceRuntimeState::from( + new_runtime_state.instance_state.clone(), + ), + &propolis_id, + &db::model::VmmRuntimeState::from( + new_runtime_state.vmm_state.clone(), + ), + ) + .await; + + // If the VMM is now in a terminal state, make sure its resources get + // cleaned up. + // + // For idempotency, only check to see if the update was successfully + // processed and ignore whether the VMM record was actually updated. + // This is required to handle the case where this routine is called + // once, writes the terminal VMM state, fails before all per-VMM + // resources are released, returns a retriable error, and is retried: + // the per-VMM resources still need to be cleaned up, but the DB update + // will return Ok(_, false) because the database was already updated. + // + // Unlike the pre-update cases, it is legal to do this cleanup *after* + // committing state to the database, because a terminated VMM cannot be + // reused (restarting or migrating its former instance will use new VMM + // IDs). + if result.is_ok() { + let propolis_terminated = matches!( + new_runtime_state.vmm_state.state, + InstanceState::Destroyed | InstanceState::Failed + ); + + if propolis_terminated { + info!(log, "vmm is terminated, cleaning up resources"; + "instance_id" => %instance_id, + "propolis_id" => %propolis_id); + + datastore.sled_reservation_delete(opctx, propolis_id).await?; + + if !datastore.vmm_mark_deleted(opctx, &propolis_id).await? { + warn!(log, "failed to mark vmm record as deleted"; + "instance_id" => %instance_id, + "propolis_id" => %propolis_id, + "vmm_state" => ?new_runtime_state.vmm_state); + } + } + } + + match result { + Ok((instance_updated, vmm_updated)) => { + info!(log, "instance and vmm updated by sled agent"; + "instance_id" => %instance_id, + "propolis_id" => %propolis_id, + "instance_updated" => instance_updated, + "vmm_updated" => vmm_updated); + Ok(()) + } + + // The update command should swallow object-not-found errors and + // return them back as failures to update, so this error case is + // unexpected. There's no work to do if this occurs, however. + Err(Error::ObjectNotFound { .. }) => { + error!(log, "instance/vmm update unexpectedly returned \ + an object not found error"; + "instance_id" => %instance_id, + "propolis_id" => %propolis_id); + Ok(()) + } + + // If the datastore is unavailable, propagate that to the caller. + // TODO-robustness Really this should be any _transient_ error. How + // can we distinguish? Maybe datastore should emit something + // different from Error with an Into. 
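The retry behavior described in the comments above is easier to see with a toy generation-gated write: a retried update call still succeeds, but reports that nothing changed, and the caller must still run its cleanup. A minimal, self-contained sketch follows; `VmmRow` and `apply_update` are hypothetical stand-ins (standard library only), not the omicron datastore API, and this is illustrative rather than part of the change.

#[derive(Debug)]
struct VmmRow {
    gen: u64,
    state: &'static str,
}

/// Mimics the shape of `instance_and_vmm_update_runtime`: the call succeeds,
/// but the returned bool is false when the row already carries this (or a
/// newer) generation -- which is exactly what a retried caller observes.
fn apply_update(
    row: &mut VmmRow,
    new_gen: u64,
    new_state: &'static str,
) -> Result<bool, String> {
    if new_gen <= row.gen {
        return Ok(false); // duplicate or stale write: a no-op, not an error
    }
    row.gen = new_gen;
    row.state = new_state;
    Ok(true)
}

fn main() {
    let mut row = VmmRow { gen: 1, state: "running" };
    // The first attempt records the terminal state...
    assert_eq!(apply_update(&mut row, 2, "destroyed"), Ok(true));
    // ...and a retry (say, after a failure partway through cleanup) gets
    // Ok(false) but must still release the per-VMM resources.
    assert_eq!(apply_update(&mut row, 2, "destroyed"), Ok(false));
    println!("final row: {row:?}");
}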
+ Err(error) => { + warn!(log, "failed to update instance from sled agent"; + "instance_id" => %instance_id, + "propolis_id" => %propolis_id, + "error" => ?error); + Err(error) + } + } +} + #[cfg(test)] mod tests { use super::super::Nexus; diff --git a/nexus/src/app/instance_network.rs b/nexus/src/app/instance_network.rs index c345809f4d..de8a205a0d 100644 --- a/nexus/src/app/instance_network.rs +++ b/nexus/src/app/instance_network.rs @@ -4,6 +4,7 @@ //! Routines that manage instance-related networking state. +use crate::app::switch_port; use ipnetwork::IpNetwork; use ipnetwork::Ipv6Network; use nexus_db_model::ExternalIp; @@ -16,6 +17,7 @@ use nexus_db_queries::context::OpContext; use nexus_db_queries::db; use nexus_db_queries::db::identity::Asset; use nexus_db_queries::db::lookup::LookupPath; +use nexus_db_queries::db::DataStore; use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Error; use omicron_common::api::external::Ipv4Net; @@ -37,19 +39,7 @@ impl super::Nexus { &self, opctx: &OpContext, ) -> Result, Error> { - let mut boundary_switches: HashSet = HashSet::new(); - let uplinks = self.list_switch_ports_with_uplinks(opctx).await?; - for uplink in &uplinks { - let location: SwitchLocation = - uplink.switch_location.parse().map_err(|_| { - Error::internal_error(&format!( - "invalid switch location in uplink config: {}", - uplink.switch_location - )) - })?; - boundary_switches.insert(location); - } - Ok(boundary_switches) + boundary_switches(&self.db_datastore, opctx).await } /// Ensures that V2P mappings exist that indicate that the instance with ID @@ -60,133 +50,15 @@ impl super::Nexus { instance_id: Uuid, sled_id: Uuid, ) -> Result<(), Error> { - info!(&self.log, "creating V2P mappings for instance"; - "instance_id" => %instance_id, - "sled_id" => %sled_id); - - // For every sled that isn't the sled this instance was allocated to, create - // a virtual to physical mapping for each of this instance's NICs. - // - // For the mappings to be correct, a few invariants must hold: - // - // - mappings must be set whenever an instance's sled changes (eg. - // during instance creation, migration, stop + start) - // - // - an instances' sled must not change while its corresponding mappings - // are being created - // - // - the same mapping creation must be broadcast to all sleds - // - // A more targeted approach would be to see what other instances share - // the VPC this instance is in (or more generally, what instances should - // have connectivity to this one), see what sleds those are allocated - // to, and only create V2P mappings for those sleds. - // - // There's additional work with this approach: - // - // - it means that delete calls are required as well as set calls, - // meaning that now the ordering of those matters (this may also - // necessitate a generation number for V2P mappings) - // - // - V2P mappings have to be bidirectional in order for both instances's - // packets to make a round trip. This isn't a problem with the - // broadcast approach because one of the sides will exist already, but - // it is something to orchestrate with a more targeted approach. - // - // TODO-correctness Default firewall rules currently will block - // instances in different VPCs from connecting to each other. If it ever - // stops doing this, the broadcast approach will create V2P mappings - // that shouldn't exist. 
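The broadcast strategy described in the comment above (in the method removed here and re-added as a free function later in this patch) walks every sled in pages of ten, tracking the last sled ID seen and skipping the sled that hosts the instance. Below is a minimal sketch of that paging pattern; `SledId` and `list_sleds_after` are hypothetical stand-ins for the datastore's paginated `sled_list`, and the sketch is illustrative only.

const PAGE_SIZE: usize = 10;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct SledId(u32);

/// Stand-in for the paginated sled listing: up to PAGE_SIZE sleds with IDs
/// strictly greater than the marker, in ascending order.
fn list_sleds_after(all: &[SledId], marker: Option<SledId>) -> Vec<SledId> {
    all.iter()
        .copied()
        .filter(|id| marker.map_or(true, |m| *id > m))
        .take(PAGE_SIZE)
        .collect()
}

fn main() {
    let all_sleds: Vec<SledId> = (0..23).map(SledId).collect();
    let instance_sled = SledId(4);
    let mut targets = Vec::new();
    let mut last_seen: Option<SledId> = None;

    loop {
        let page = list_sleds_after(&all_sleds, last_seen);
        for sled in &page {
            // Skip the sled hosting the instance; OPTE installs the local
            // mapping itself (the TODO(#3107) case noted above).
            if *sled == instance_sled {
                continue;
            }
            // In the real code this is where a set_v2p call is spawned.
            targets.push(*sled);
        }
        if page.len() < PAGE_SIZE {
            break; // a short page means there are no more sleds
        }
        last_seen = page.last().copied();
    }

    // Every sled except the instance's own should have been targeted.
    assert_eq!(targets.len(), all_sleds.len() - 1);
}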
- let (.., authz_instance) = LookupPath::new(&opctx, &self.db_datastore) - .instance_id(instance_id) - .lookup_for(authz::Action::Read) - .await?; - - let instance_nics = self - .db_datastore - .derive_guest_network_interface_info(&opctx, &authz_instance) - .await?; - - // Look up the supplied sled's physical host IP. - let physical_host_ip = - *self.sled_lookup(&self.opctx_alloc, &sled_id)?.fetch().await?.1.ip; - - let mut last_sled_id: Option = None; - loop { - let pagparams = DataPageParams { - marker: last_sled_id.as_ref(), - direction: dropshot::PaginationOrder::Ascending, - limit: std::num::NonZeroU32::new(10).unwrap(), - }; - - let sleds_page = - self.sled_list(&self.opctx_alloc, &pagparams).await?; - let mut join_handles = - Vec::with_capacity(sleds_page.len() * instance_nics.len()); - - for sled in &sleds_page { - // set_v2p not required for sled instance was allocated to, OPTE - // currently does that automatically - // - // TODO(#3107): Remove this when XDE stops creating mappings - // implicitly. - if sled.id() == sled_id { - continue; - } - - for nic in &instance_nics { - let client = self.sled_client(&sled.id()).await?; - let nic_id = nic.id; - let mapping = SetVirtualNetworkInterfaceHost { - virtual_ip: nic.ip, - virtual_mac: nic.mac, - physical_host_ip, - vni: nic.vni, - }; - - let log = self.log.clone(); - - // This function is idempotent: calling the set_v2p ioctl with - // the same information is a no-op. - join_handles.push(tokio::spawn(futures::future::lazy( - move |_ctx| async move { - retry_until_known_result(&log, || async { - client.set_v2p(&nic_id, &mapping).await - }) - .await - }, - ))); - } - } - - // Concurrently run each future to completion, but return the last - // error seen. - let mut error = None; - for join_handle in join_handles { - let result = join_handle - .await - .map_err(|e| Error::internal_error(&e.to_string()))? - .await; - - if result.is_err() { - error!(self.log, "{:?}", result); - error = Some(result); - } - } - if let Some(e) = error { - return e.map(|_| ()).map_err(|e| e.into()); - } - - if sleds_page.len() < 10 { - break; - } - - if let Some(last) = sleds_page.last() { - last_sled_id = Some(last.id()); - } - } - - Ok(()) + create_instance_v2p_mappings( + &self.db_datastore, + &self.log, + opctx, + &self.opctx_alloc, + instance_id, + sled_id, + ) + .await } /// Ensure that the necessary v2p mappings for an instance are deleted @@ -195,85 +67,14 @@ impl super::Nexus { opctx: &OpContext, instance_id: Uuid, ) -> Result<(), Error> { - // For every sled that isn't the sled this instance was allocated to, delete - // the virtual to physical mapping for each of this instance's NICs. If - // there isn't a V2P mapping, del_v2p should be a no-op. 
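Both the create and delete V2P paths fan the per-sled calls out as tasks and keep only the last error seen, so every sled is attempted even when one fails. A stripped-down sketch of that pattern follows, assuming the tokio runtime this code base already uses; `push_to_sled` is a hypothetical stand-in, and the `retry_until_known_result` and pagination layers are omitted.

use tokio::task::JoinHandle;

/// Stand-in for one idempotent set_v2p/del_v2p call against a single sled.
async fn push_to_sled(sled: u32) -> Result<(), String> {
    if sled == 3 {
        Err(format!("sled {sled} unreachable"))
    } else {
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let mut join_handles: Vec<JoinHandle<Result<(), String>>> = Vec::new();
    for sled in 0..5u32 {
        join_handles.push(tokio::spawn(push_to_sled(sled)));
    }

    // Drive every call to completion, remember only the last error, and
    // report it after the whole fan-out has been attempted.
    let mut last_error = None;
    for handle in join_handles {
        match handle.await.expect("task panicked") {
            Ok(()) => {}
            Err(e) => {
                eprintln!("push failed: {e}");
                last_error = Some(e);
            }
        }
    }
    assert_eq!(last_error.as_deref(), Some("sled 3 unreachable"));
}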
- let (.., authz_instance) = LookupPath::new(&opctx, &self.db_datastore) - .instance_id(instance_id) - .lookup_for(authz::Action::Read) - .await?; - - let instance_nics = self - .db_datastore - .derive_guest_network_interface_info(&opctx, &authz_instance) - .await?; - - let mut last_sled_id: Option = None; - - loop { - let pagparams = DataPageParams { - marker: last_sled_id.as_ref(), - direction: dropshot::PaginationOrder::Ascending, - limit: std::num::NonZeroU32::new(10).unwrap(), - }; - - let sleds_page = - self.sled_list(&self.opctx_alloc, &pagparams).await?; - let mut join_handles = - Vec::with_capacity(sleds_page.len() * instance_nics.len()); - - for sled in &sleds_page { - for nic in &instance_nics { - let client = self.sled_client(&sled.id()).await?; - let nic_id = nic.id; - let mapping = DeleteVirtualNetworkInterfaceHost { - virtual_ip: nic.ip, - vni: nic.vni, - }; - - let log = self.log.clone(); - - // This function is idempotent: calling the set_v2p ioctl with - // the same information is a no-op. - join_handles.push(tokio::spawn(futures::future::lazy( - move |_ctx| async move { - retry_until_known_result(&log, || async { - client.del_v2p(&nic_id, &mapping).await - }) - .await - }, - ))); - } - } - - // Concurrently run each future to completion, but return the last - // error seen. - let mut error = None; - for join_handle in join_handles { - let result = join_handle - .await - .map_err(|e| Error::internal_error(&e.to_string()))? - .await; - - if result.is_err() { - error!(self.log, "{:?}", result); - error = Some(result); - } - } - if let Some(e) = error { - return e.map(|_| ()).map_err(|e| e.into()); - } - - if sleds_page.len() < 10 { - break; - } - - if let Some(last) = sleds_page.last() { - last_sled_id = Some(last.id()); - } - } - - Ok(()) + delete_instance_v2p_mappings( + &self.db_datastore, + &self.log, + opctx, + &self.opctx_alloc, + instance_id, + ) + .await } /// Ensures that the Dendrite configuration for the supplied instance is @@ -305,152 +106,17 @@ impl super::Nexus { sled_ip_address: &std::net::SocketAddrV6, ip_filter: Option, ) -> Result, Error> { - let log = &self.log; - - info!(log, "looking up instance's primary network interface"; - "instance_id" => %instance_id); - - let (.., authz_instance) = LookupPath::new(opctx, &self.db_datastore) - .instance_id(instance_id) - .lookup_for(authz::Action::ListChildren) - .await?; - - // XXX: Need to abstract over v6 and v4 entries here. - let mut nat_entries = vec![]; - - // All external IPs map to the primary network interface, so find that - // interface. If there is no such interface, there's no way to route - // traffic destined to those IPs, so there's nothing to configure and - // it's safe to return early. - let network_interface = match self - .db_datastore - .derive_guest_network_interface_info(&opctx, &authz_instance) - .await? 
- .into_iter() - .find(|interface| interface.primary) - { - Some(interface) => interface, - None => { - info!(log, "Instance has no primary network interface"; - "instance_id" => %instance_id); - return Ok(nat_entries); - } - }; - - let mac_address = - macaddr::MacAddr6::from_str(&network_interface.mac.to_string()) - .map_err(|e| { - Error::internal_error(&format!( - "failed to convert mac address: {e}" - )) - })?; - - info!(log, "looking up instance's external IPs"; - "instance_id" => %instance_id); - - let ips = self - .db_datastore - .instance_lookup_external_ips(&opctx, instance_id) - .await?; - - let (ips_of_interest, must_all_be_attached) = if let Some(wanted_id) = - ip_filter - { - if let Some(ip) = ips.iter().find(|v| v.id == wanted_id) { - (std::slice::from_ref(ip), false) - } else { - return Err(Error::internal_error(&format!( - "failed to find external ip address with id: {wanted_id}, saw {ips:?}", - ))); - } - } else { - (&ips[..], true) - }; - - // This is performed so that an IP attach/detach will block the - // instance_start saga. Return service unavailable to indicate - // the request is retryable. - if must_all_be_attached - && ips_of_interest - .iter() - .any(|ip| ip.state != IpAttachState::Attached) - { - return Err(Error::unavail( - "cannot push all DPD state: IP attach/detach in progress", - )); - } - - let sled_address = - Ipv6Net(Ipv6Network::new(*sled_ip_address.ip(), 128).unwrap()); - - // If all of our IPs are attached or are guaranteed to be owned - // by the saga calling this fn, then we need to disregard and - // remove conflicting rows. No other instance/service should be - // using these as its own, and we are dealing with detritus, e.g., - // the case where we have a concurrent stop -> detach followed - // by an attach to another instance, or other ongoing attach saga - // cleanup. - let mut err_and_limit = None; - for (i, external_ip) in ips_of_interest.iter().enumerate() { - // For each external ip, add a nat entry to the database - if let Ok(id) = self - .ensure_nat_entry( - external_ip, - sled_address, - &network_interface, - mac_address, - opctx, - ) - .await - { - nat_entries.push(id); - continue; - } - - // We seem to be blocked by a bad row -- take it out and retry. - // This will return Ok() for a non-existent row. - if let Err(e) = self - .external_ip_delete_dpd_config_inner(opctx, external_ip) - .await - { - err_and_limit = Some((e, i)); - break; - }; - - match self - .ensure_nat_entry( - external_ip, - sled_address, - &network_interface, - mac_address, - opctx, - ) - .await - { - Ok(id) => nat_entries.push(id), - Err(e) => { - err_and_limit = Some((e, i)); - break; - } - } - } - - // In the event of an unresolvable failure, we need to remove - // the entries we just added because the undo won't call into - // `instance_delete_dpd_config`. These entries won't stop a - // future caller, but it's better not to pollute switch state. 
- if let Some((e, max)) = err_and_limit { - for external_ip in &ips_of_interest[..max] { - let _ = self - .external_ip_delete_dpd_config_inner(opctx, external_ip) - .await; - } - return Err(e); - } - - self.notify_dendrite_nat_state(Some(instance_id), true).await?; - - Ok(nat_entries) + instance_ensure_dpd_config( + &self.db_datastore, + &self.log, + &self.resolver().await, + opctx, + &self.opctx_alloc, + instance_id, + sled_ip_address, + ip_filter, + ) + .await } // The logic of this function should follow very closely what @@ -465,120 +131,16 @@ impl super::Nexus { ip_index_filter: Option, dpd_client: &dpd_client::Client, ) -> Result<(), Error> { - let log = &self.log; - - // All external IPs map to the primary network interface, so find that - // interface. If there is no such interface, there's no way to route - // traffic destined to those IPs, so there's nothing to configure and - // it's safe to return early. - let network_interface = match self - .db_datastore - .derive_probe_network_interface_info(&opctx, probe_id) - .await? - .into_iter() - .find(|interface| interface.primary) - { - Some(interface) => interface, - None => { - info!(log, "probe has no primary network interface"; - "probe_id" => %probe_id); - return Ok(()); - } - }; - - let mac_address = - macaddr::MacAddr6::from_str(&network_interface.mac.to_string()) - .map_err(|e| { - Error::internal_error(&format!( - "failed to convert mac address: {e}" - )) - })?; - - info!(log, "looking up probe's external IPs"; - "probe_id" => %probe_id); - - let ips = self - .db_datastore - .probe_lookup_external_ips(&opctx, probe_id) - .await?; - - if let Some(wanted_index) = ip_index_filter { - if let None = ips.get(wanted_index) { - return Err(Error::internal_error(&format!( - "failed to find external ip address at index: {}", - wanted_index - ))); - } - } - - let sled_address = - Ipv6Net(Ipv6Network::new(sled_ip_address, 128).unwrap()); - - for target_ip in ips - .iter() - .enumerate() - .filter(|(index, _)| { - if let Some(wanted_index) = ip_index_filter { - *index == wanted_index - } else { - true - } - }) - .map(|(_, ip)| ip) - { - // For each external ip, add a nat entry to the database - self.ensure_nat_entry( - target_ip, - sled_address, - &network_interface, - mac_address, - opctx, - ) - .await?; - } - - // Notify dendrite that there are changes for it to reconcile. - // In the event of a failure to notify dendrite, we'll log an error - // and rely on dendrite's RPW timer to catch it up. - if let Err(e) = dpd_client.ipv4_nat_trigger_update().await { - error!(self.log, "failed to notify dendrite of nat updates"; "error" => ?e); - }; - - Ok(()) - } - - async fn ensure_nat_entry( - &self, - target_ip: &nexus_db_model::ExternalIp, - sled_address: Ipv6Net, - network_interface: &NetworkInterface, - mac_address: macaddr::MacAddr6, - opctx: &OpContext, - ) -> Result { - match target_ip.ip { - IpNetwork::V4(v4net) => { - let nat_entry = Ipv4NatValues { - external_address: Ipv4Net(v4net).into(), - first_port: target_ip.first_port, - last_port: target_ip.last_port, - sled_address: sled_address.into(), - vni: DbVni(network_interface.vni), - mac: nexus_db_model::MacAddr( - omicron_common::api::external::MacAddr(mac_address), - ), - }; - Ok(self - .db_datastore - .ensure_ipv4_nat_entry(opctx, nat_entry) - .await?) - } - IpNetwork::V6(_v6net) => { - // TODO: implement handling of v6 nat. 
- return Err(Error::InternalError { - internal_message: "ipv6 nat is not yet implemented".into(), - }); - } - } + probe_ensure_dpd_config( + &self.db_datastore, + &self.log, + opctx, + probe_id, + sled_ip_address, + ip_index_filter, + dpd_client, + ) + .await } /// Attempts to delete all of the Dendrite NAT configuration for the @@ -608,22 +170,16 @@ impl super::Nexus { opctx: &OpContext, authz_instance: &authz::Instance, ) -> Result<(), Error> { - let log = &self.log; - let instance_id = authz_instance.id(); - - info!(log, "deleting instance dpd configuration"; - "instance_id" => %instance_id); - - let external_ips = self - .db_datastore - .instance_lookup_external_ips(opctx, instance_id) - .await?; - - for entry in external_ips { - self.external_ip_delete_dpd_config_inner(opctx, &entry).await?; - } - - self.notify_dendrite_nat_state(Some(instance_id), false).await + let resolver = self.resolver().await; + instance_delete_dpd_config( + &self.db_datastore, + &self.log, + &resolver, + opctx, + &self.opctx_alloc, + authz_instance, + ) + .await } /// Attempts to delete Dendrite NAT configuration for a single external IP. @@ -635,16 +191,13 @@ impl super::Nexus { opctx: &OpContext, external_ip: &ExternalIp, ) -> Result<(), Error> { - let log = &self.log; - let instance_id = external_ip.parent_id; - - info!(log, "deleting individual NAT entry from dpd configuration"; - "instance_id" => ?instance_id, - "external_ip" => %external_ip.ip); - - self.external_ip_delete_dpd_config_inner(opctx, external_ip).await?; - - self.notify_dendrite_nat_state(instance_id, false).await + external_ip_delete_dpd_config_inner( + &self.db_datastore, + &self.log, + opctx, + external_ip, + ) + .await } /// Attempts to soft-delete Dendrite NAT configuration for a specific entry @@ -658,371 +211,1099 @@ impl super::Nexus { opctx: &OpContext, nat_entry: &Ipv4NatEntry, ) -> Result<(), Error> { - let log = &self.log; - - info!(log, "deleting individual NAT entry from dpd configuration"; - "id" => ?nat_entry.id, - "version_added" => %nat_entry.external_address.0); - - match self.db_datastore.ipv4_nat_delete(&opctx, nat_entry).await { - Ok(_) => {} - Err(err) => match err { - Error::ObjectNotFound { .. } => { - warn!(log, "no matching nat entries to soft delete"); - } - _ => { - let message = format!( - "failed to delete nat entry due to error: {err:?}" - ); - error!(log, "{}", message); - return Err(Error::internal_error(&message)); - } - }, - } - - self.notify_dendrite_nat_state(None, false).await + delete_dpd_config_by_entry( + &self.db_datastore, + &self.resolver().await, + &self.log, + opctx, + &self.opctx_alloc, + nat_entry, + ) + .await } - /// Soft-delete an individual external IP from the NAT RPW, without - /// triggering a Dendrite notification. - async fn external_ip_delete_dpd_config_inner( + // The logic of this function should follow very closely what + // `instance_delete_dpd_config` does. However, there are enough differences + // in the mechanics of how the logic is being carried out to justify having + // this separate function, it seems. + pub(crate) async fn probe_delete_dpd_config( &self, opctx: &OpContext, - external_ip: &ExternalIp, + probe_id: Uuid, ) -> Result<(), Error> { - let log = &self.log; - - // Soft delete the NAT entry - match self - .db_datastore - .ipv4_nat_delete_by_external_ip(&opctx, external_ip) - .await - { - Ok(_) => Ok(()), - Err(err) => match err { - Error::ObjectNotFound { .. 
} => { - warn!(log, "no matching nat entries to soft delete"); - Ok(()) - } - _ => { - let message = format!( - "failed to delete nat entry due to error: {err:?}" - ); - error!(log, "{}", message); - Err(Error::internal_error(&message)) - } - }, - } + probe_delete_dpd_config( + &self.db_datastore, + &self.log, + &self.resolver().await, + opctx, + &self.opctx_alloc, + probe_id, + ) + .await } +} - /// Informs all available boundary switches that the set of NAT entries - /// has changed. - /// - /// When `fail_fast` is set, this function will return on any error when - /// acquiring a handle to a DPD client. Otherwise, it will attempt to notify - /// all clients and then finally return the first error. - async fn notify_dendrite_nat_state( - &self, - instance_id: Option, - fail_fast: bool, - ) -> Result<(), Error> { - // Querying boundary switches also requires fleet access and the use of the - // instance allocator context. - let boundary_switches = - self.boundary_switches(&self.opctx_alloc).await?; - - let mut errors = vec![]; - for switch in &boundary_switches { - debug!(&self.log, "notifying dendrite of updates"; - "instance_id" => ?instance_id, - "switch" => switch.to_string()); - - let clients = self.dpd_clients().await.map_err(|e| { +/// Returns the set of switches with uplinks configured and boundary +/// services enabled. +pub(crate) async fn boundary_switches( + datastore: &DataStore, + opctx: &OpContext, +) -> Result, Error> { + let mut boundary_switches: HashSet = HashSet::new(); + let uplinks = + switch_port::list_switch_ports_with_uplinks(datastore, opctx).await?; + for uplink in &uplinks { + let location: SwitchLocation = + uplink.switch_location.parse().map_err(|_| { Error::internal_error(&format!( - "failed to get dpd clients: {e}" + "invalid switch location in uplink config: {}", + uplink.switch_location )) })?; - let client_result = clients.get(switch).ok_or_else(|| { - Error::internal_error(&format!( - "unable to find dendrite client for {switch}" - )) - }); + boundary_switches.insert(location); + } + Ok(boundary_switches) +} - let dpd_client = match client_result { - Ok(client) => client, - Err(new_error) => { - errors.push(new_error); - if fail_fast { - break; - } else { - continue; - } - } - }; +/// Given old and new instance runtime states, determines the desired +/// networking configuration for a given instance and ensures it has been +/// propagated to all relevant sleds. +/// +/// # Arguments +/// +/// - `datastore`: the datastore to use for lookups and updates. +/// - `log`: the [`slog::Logger`] to log to. +/// - `resolver`: an internal DNS resolver to look up DPD service addresses. +/// - `opctx`: An operation context for this operation. +/// - `opctx_alloc`: An operational context list permissions for all sleds. When +/// called by methods on the [`Nexus`] type, this is the `OpContext` used for +/// instance allocation. In a background task, this may be the background +/// task's operational context; nothing stops you from passing the same +/// `OpContext` as both `opctx` and `opctx_alloc`. +/// - `authz_instance``: A resolved authorization context for the instance of +/// interest. +/// - `prev_instance_state``: The most-recently-recorded instance runtime +/// state for this instance. +/// - `new_instance_state`: The instance state that the caller of this routine +/// has observed and that should be used to set up this instance's +/// networking state. 
+/// +/// # Return value +/// +/// `Ok(())` if this routine completed all the operations it wanted to +/// complete, or an appropriate `Err` otherwise. +#[allow(clippy::too_many_arguments)] // Yeah, I know, I know, Clippy... +pub(crate) async fn ensure_updated_instance_network_config( + datastore: &DataStore, + log: &slog::Logger, + resolver: &internal_dns::resolver::Resolver, + opctx: &OpContext, + opctx_alloc: &OpContext, + authz_instance: &authz::Instance, + prev_instance_state: &db::model::InstanceRuntimeState, + new_instance_state: &nexus::InstanceRuntimeState, +) -> Result<(), Error> { + let instance_id = authz_instance.id(); + + // If this instance update is stale, do nothing, since the superseding + // update may have allowed the instance's location to change further. + if prev_instance_state.gen >= new_instance_state.gen.into() { + debug!(log, + "instance state generation already advanced, \ + won't touch network config"; + "instance_id" => %instance_id); + + return Ok(()); + } - // Notify dendrite that there are changes for it to reconcile. - // In the event of a failure to notify dendrite, we'll log an error - // and rely on dendrite's RPW timer to catch it up. - if let Err(e) = dpd_client.ipv4_nat_trigger_update().await { - error!(self.log, "failed to notify dendrite of nat updates"; "error" => ?e); - }; + // If this update will retire the instance's active VMM, delete its + // networking state. It will be re-established the next time the + // instance starts. + if new_instance_state.propolis_id.is_none() { + info!(log, + "instance cleared its Propolis ID, cleaning network config"; + "instance_id" => %instance_id, + "propolis_id" => ?prev_instance_state.propolis_id); + + clear_instance_networking_state( + datastore, + log, + resolver, + opctx, + opctx_alloc, + authz_instance, + ) + .await?; + return Ok(()); + } + + // If the instance still has a migration in progress, don't change + // any networking state until an update arrives that retires that + // migration. + // + // This is needed to avoid the following race: + // + // 1. Migration from S to T completes. + // 2. Migration source sends an update that changes the instance's + // active VMM but leaves the migration ID in place. + // 3. Meanwhile, migration target sends an update that changes the + // instance's active VMM and clears the migration ID. + // 4. The migration target's call updates networking state and commits + // the new instance record. + // 5. The instance migrates from T to T' and Nexus applies networking + // configuration reflecting that the instance is on T'. + // 6. The update in step 2 applies configuration saying the instance + // is on sled T. + if new_instance_state.migration_id.is_some() { + debug!(log, + "instance still has a migration in progress, won't touch \ + network config"; + "instance_id" => %instance_id, + "migration_id" => ?new_instance_state.migration_id); + + return Ok(()); + } + + let new_propolis_id = new_instance_state.propolis_id.unwrap(); + + // Updates that end live migration need to push OPTE V2P state even if + // the instance's active sled did not change (see below). 
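The branching in this function, including the `migration_retired` check computed just below, condenses to a small decision table. The sketch that follows restates that logic with plain `u64` stand-ins for UUIDs and generation numbers; it is an editorial illustration of the code above and below, not part of this change.

#[derive(Debug, PartialEq)]
enum NetConfigAction {
    Skip,        // stale update, or a migration is still in flight
    ClearState,  // the active VMM was retired: tear down V2P and NAT state
    Reconfigure, // the instance moved, or a migration just retired
}

/// Condensed restatement of the branching in
/// `ensure_updated_instance_network_config`; IDs are plain u64s here.
fn plan(
    prev_gen: u64,
    new_gen: u64,
    prev_propolis: Option<u64>,
    new_propolis: Option<u64>,
    prev_migration: Option<u64>,
    new_migration: Option<u64>,
) -> NetConfigAction {
    if prev_gen >= new_gen {
        return NetConfigAction::Skip; // superseded by a newer update
    }
    if new_propolis.is_none() {
        return NetConfigAction::ClearState;
    }
    if new_migration.is_some() {
        return NetConfigAction::Skip; // wait for the migration to retire
    }
    let migration_retired =
        prev_migration.is_some() && new_migration.is_none();
    if prev_propolis == new_propolis && !migration_retired {
        return NetConfigAction::Skip; // the instance did not move
    }
    NetConfigAction::Reconfigure
}

fn main() {
    // A completed migration retires the migration ID and swaps the VMM:
    assert_eq!(
        plan(1, 2, Some(10), Some(11), Some(7), None),
        NetConfigAction::Reconfigure
    );
    // The late update from the migration source (step 2 of the race above)
    // still carries the migration ID, so it is ignored:
    assert_eq!(
        plan(1, 3, Some(10), Some(11), Some(7), Some(7)),
        NetConfigAction::Skip
    );
}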
+ let migration_retired = prev_instance_state.migration_id.is_some() + && new_instance_state.migration_id.is_none(); + + if (prev_instance_state.propolis_id == new_instance_state.propolis_id) + && !migration_retired + { + debug!(log, "instance didn't move, won't touch network config"; + "instance_id" => %instance_id); + + return Ok(()); + } + + // Either the instance moved from one sled to another, or it attempted + // to migrate and failed. Ensure the correct networking configuration + // exists for its current home. + // + // TODO(#3107) This is necessary even if the instance didn't move, + // because registering a migration target on a sled creates OPTE ports + // for its VNICs, and that creates new V2P mappings on that sled that + // place the relevant virtual IPs on the local sled. Once OPTE stops + // creating these mappings, this path only needs to be taken if an + // instance has changed sleds. + let new_sled_id = match datastore + .vmm_fetch(&opctx, authz_instance, &new_propolis_id) + .await + { + Ok(vmm) => vmm.sled_id, + + // A VMM in the active position should never be destroyed. If the + // sled sending this message is the owner of the instance's last + // active VMM and is destroying it, it should also have retired that + // VMM. + Err(Error::ObjectNotFound { .. }) => { + error!(log, "instance's active vmm unexpectedly not found"; + "instance_id" => %instance_id, + "propolis_id" => %new_propolis_id); + + return Ok(()); } - if let Some(e) = errors.into_iter().next() { - return Err(e); + Err(e) => return Err(e), + }; + + create_instance_v2p_mappings( + datastore, + log, + opctx, + opctx_alloc, + instance_id, + new_sled_id, + ) + .await?; + + let (.., sled) = + LookupPath::new(opctx, datastore).sled_id(new_sled_id).fetch().await?; + + instance_ensure_dpd_config( + datastore, + log, + resolver, + opctx, + opctx_alloc, + instance_id, + &sled.address(), + None, + ) + .await?; + + Ok(()) +} + +/// Ensures that the Dendrite configuration for the supplied instance is +/// up-to-date. +/// +/// Returns a list of live NAT RPW table entries from this call. Generally +/// these should only be needed for specific unwind operations, like in +/// the IP attach saga. +/// +/// # Parameters +/// +/// - `datastore`: the datastore to use for lookups and updates. +/// - `log`: the [`slog::Logger`] to log to. +/// - `resolver`: an internal DNS resolver to look up DPD service addresses. +/// - `opctx`: An operation context that grants read and list-children +/// permissions on the identified instance. +/// - `opctx_alloc`: An operational context list permissions for all sleds. When +/// called by methods on the [`Nexus`] type, this is the `OpContext` used for +/// instance allocation. In a background task, this may be the background +/// task's operational context; nothing stops you from passing the same +/// `OpContext` as both `opctx` and `opctx_alloc`. +/// - `instance_id`: The ID of the instance to act on. +/// - `sled_ip_address`: The internal IP address assigned to the sled's +/// sled agent. +/// - `ip_filter`: An optional filter on the index into the instance's +/// external IP array. +/// - If this is `Some(id)`, this routine configures DPD state for only the +/// external IP with `id` in the collection returned from CRDB. This will +/// proceed even when the target IP is 'attaching'. +/// - If this is `None`, this routine configures DPD for all external +/// IPs and *will back out* if any IPs are not yet fully attached to +/// the instance. 
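The `ip_filter` behavior described in the doc comment above amounts to a small selection step: a specific ID picks exactly one external IP and tolerates an in-progress attach, while `None` picks every IP but refuses to proceed unless all of them are attached. The self-contained sketch below uses hypothetical `ExtIp` and `select` names to illustrate that rule; it is not omicron code.

#[derive(Debug, Clone, Copy, PartialEq)]
enum IpState {
    Attaching,
    Attached,
}

#[derive(Debug, Clone, Copy, PartialEq)]
struct ExtIp {
    id: u32,
    state: IpState,
}

/// Condensed version of the `ip_filter` handling described above.
fn select(ips: &[ExtIp], ip_filter: Option<u32>) -> Result<Vec<ExtIp>, String> {
    let (of_interest, must_all_be_attached) = match ip_filter {
        Some(wanted) => {
            let ip = ips
                .iter()
                .copied()
                .find(|ip| ip.id == wanted)
                .ok_or_else(|| format!("no external IP with id {wanted}"))?;
            (vec![ip], false)
        }
        None => (ips.to_vec(), true),
    };

    if must_all_be_attached
        && of_interest.iter().any(|ip| ip.state != IpState::Attached)
    {
        // The real code returns `Error::unavail` so the caller retries later.
        return Err("IP attach/detach in progress".to_string());
    }
    Ok(of_interest)
}

fn main() {
    let ips = [
        ExtIp { id: 1, state: IpState::Attached },
        ExtIp { id: 2, state: IpState::Attaching },
    ];
    // An attach saga targeting IP 2 may proceed while it is still attaching...
    assert!(select(&ips, Some(2)).is_ok());
    // ...but a full reconfiguration must wait until every IP is attached.
    assert!(select(&ips, None).is_err());
}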
+#[allow(clippy::too_many_arguments)] // I don't like it either, clippy... +pub(crate) async fn instance_ensure_dpd_config( + datastore: &DataStore, + log: &slog::Logger, + resolver: &internal_dns::resolver::Resolver, + opctx: &OpContext, + opctx_alloc: &OpContext, + instance_id: Uuid, + sled_ip_address: &std::net::SocketAddrV6, + ip_filter: Option, +) -> Result, Error> { + info!(log, "looking up instance's primary network interface"; + "instance_id" => %instance_id); + + let (.., authz_instance) = LookupPath::new(opctx, datastore) + .instance_id(instance_id) + .lookup_for(authz::Action::ListChildren) + .await?; + + // XXX: Need to abstract over v6 and v4 entries here. + let mut nat_entries = vec![]; + + // All external IPs map to the primary network interface, so find that + // interface. If there is no such interface, there's no way to route + // traffic destined to those IPs, so there's nothing to configure and + // it's safe to return early. + let network_interface = match datastore + .derive_guest_network_interface_info(&opctx, &authz_instance) + .await? + .into_iter() + .find(|interface| interface.primary) + { + Some(interface) => interface, + None => { + info!(log, "Instance has no primary network interface"; + "instance_id" => %instance_id); + return Ok(nat_entries); } + }; - Ok(()) + let mac_address = + macaddr::MacAddr6::from_str(&network_interface.mac.to_string()) + .map_err(|e| { + Error::internal_error(&format!( + "failed to convert mac address: {e}" + )) + })?; + + info!(log, "looking up instance's external IPs"; + "instance_id" => %instance_id); + + let ips = + datastore.instance_lookup_external_ips(&opctx, instance_id).await?; + + let (ips_of_interest, must_all_be_attached) = if let Some(wanted_id) = + ip_filter + { + if let Some(ip) = ips.iter().find(|v| v.id == wanted_id) { + (std::slice::from_ref(ip), false) + } else { + return Err(Error::internal_error(&format!( + "failed to find external ip address with id: {wanted_id}, saw {ips:?}", + ))); + } + } else { + (&ips[..], true) + }; + + // This is performed so that an IP attach/detach will block the + // instance_start saga. Return service unavailable to indicate + // the request is retryable. + if must_all_be_attached + && ips_of_interest.iter().any(|ip| ip.state != IpAttachState::Attached) + { + return Err(Error::unavail( + "cannot push all DPD state: IP attach/detach in progress", + )); } - // The logic of this function should follow very closely what - // `instance_delete_dpd_config` does. However, there are enough differences - // in the mechanics of how the logic is being carried out to justify having - // this separate function, it seems. - pub(crate) async fn probe_delete_dpd_config( - &self, - opctx: &OpContext, - probe_id: Uuid, - ) -> Result<(), Error> { - let log = &self.log; + let sled_address = + Ipv6Net(Ipv6Network::new(*sled_ip_address.ip(), 128).unwrap()); + + // If all of our IPs are attached or are guaranteed to be owned + // by the saga calling this fn, then we need to disregard and + // remove conflicting rows. No other instance/service should be + // using these as its own, and we are dealing with detritus, e.g., + // the case where we have a concurrent stop -> detach followed + // by an attach to another instance, or other ongoing attach saga + // cleanup. 
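The failure handling just below tracks `err_and_limit` so that a partial failure rolls back only the NAT entries this call added, leaving nothing half-applied on the switch. A minimal stand-alone sketch of that rollback pattern follows, with hypothetical `add_all` and `poison` inputs and standard library only; it illustrates the pattern, not the real DPD calls.

/// Adds entries one at a time; if entry `i` cannot be added, entries `0..i`
/// that this call already added are removed again before returning the
/// error, so no half-applied state is left behind.
fn add_all(
    entries: &[&str],
    table: &mut Vec<String>,
    poison: &str,
) -> Result<(), String> {
    let mut err_and_limit: Option<(String, usize)> = None;

    for (i, entry) in entries.iter().enumerate() {
        if *entry == poison {
            err_and_limit = Some((format!("failed to add {entry}"), i));
            break;
        }
        table.push(entry.to_string());
    }

    if let Some((err, limit)) = err_and_limit {
        // Undo only what this call managed to add before the failure.
        for entry in &entries[..limit] {
            table.retain(|existing| existing.as_str() != *entry);
        }
        return Err(err);
    }
    Ok(())
}

fn main() {
    let mut table = Vec::new();
    assert!(add_all(&["a", "b", "bad", "c"], &mut table, "bad").is_err());
    assert!(table.is_empty()); // "a" and "b" were rolled back
}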
+ let mut err_and_limit = None; + for (i, external_ip) in ips_of_interest.iter().enumerate() { + // For each external ip, add a nat entry to the database + if let Ok(id) = ensure_nat_entry( + datastore, + external_ip, + sled_address, + &network_interface, + mac_address, + opctx, + ) + .await + { + nat_entries.push(id); + continue; + } - info!(log, "deleting probe dpd configuration"; - "probe_id" => %probe_id); + // We seem to be blocked by a bad row -- take it out and retry. + // This will return Ok() for a non-existent row. + if let Err(e) = external_ip_delete_dpd_config_inner( + datastore, + log, + opctx, + external_ip, + ) + .await + { + err_and_limit = Some((e, i)); + break; + }; - let external_ips = self - .db_datastore - .probe_lookup_external_ips(opctx, probe_id) - .await?; + match ensure_nat_entry( + datastore, + external_ip, + sled_address, + &network_interface, + mac_address, + opctx, + ) + .await + { + Ok(id) => nat_entries.push(id), + Err(e) => { + err_and_limit = Some((e, i)); + break; + } + } + } - let mut errors = vec![]; - for entry in external_ips { - // Soft delete the NAT entry - match self - .db_datastore - .ipv4_nat_delete_by_external_ip(&opctx, &entry) - .await - { - Ok(_) => Ok(()), - Err(err) => match err { - Error::ObjectNotFound { .. } => { - warn!(log, "no matching nat entries to soft delete"); - Ok(()) - } - _ => { - let message = format!( - "failed to delete nat entry due to error: {err:?}" - ); - error!(log, "{}", message); - Err(Error::internal_error(&message)) - } - }, - }?; + // In the event of an unresolvable failure, we need to remove + // the entries we just added because the undo won't call into + // `instance_delete_dpd_config`. These entries won't stop a + // future caller, but it's better not to pollute switch state. + if let Some((e, max)) = err_and_limit { + for external_ip in &ips_of_interest[..max] { + let _ = external_ip_delete_dpd_config_inner( + datastore, + log, + opctx, + external_ip, + ) + .await; } + return Err(e); + } - let boundary_switches = - self.boundary_switches(&self.opctx_alloc).await?; + notify_dendrite_nat_state( + datastore, + log, + resolver, + opctx_alloc, + Some(instance_id), + true, + ) + .await?; + + Ok(nat_entries) +} - for switch in &boundary_switches { - debug!(&self.log, "notifying dendrite of updates"; - "probe_id" => %probe_id, - "switch" => switch.to_string()); +// The logic of this function should follow very closely what +// `instance_ensure_dpd_config` does. However, there are enough differences +// in the mechanics of how the logic is being carried out to justify having +// this separate function, it seems. +pub(crate) async fn probe_ensure_dpd_config( + datastore: &DataStore, + log: &slog::Logger, + opctx: &OpContext, + probe_id: Uuid, + sled_ip_address: std::net::Ipv6Addr, + ip_index_filter: Option, + dpd_client: &dpd_client::Client, +) -> Result<(), Error> { + // All external IPs map to the primary network interface, so find that + // interface. If there is no such interface, there's no way to route + // traffic destined to those IPs, so there's nothing to configure and + // it's safe to return early. + let network_interface = match datastore + .derive_probe_network_interface_info(&opctx, probe_id) + .await? 
+ .into_iter() + .find(|interface| interface.primary) + { + Some(interface) => interface, + None => { + info!(log, "probe has no primary network interface"; + "probe_id" => %probe_id); + return Ok(()); + } + }; - let dpd_clients = self.dpd_clients().await.map_err(|e| { + let mac_address = + macaddr::MacAddr6::from_str(&network_interface.mac.to_string()) + .map_err(|e| { Error::internal_error(&format!( - "unable to get dpd_clients: {e}" + "failed to convert mac address: {e}" )) })?; - let client_result = dpd_clients.get(switch).ok_or_else(|| { - Error::internal_error(&format!( - "unable to find dendrite client for {switch}" - )) - }); + info!(log, "looking up probe's external IPs"; + "probe_id" => %probe_id); - let dpd_client = match client_result { - Ok(client) => client, - Err(new_error) => { - errors.push(new_error); - continue; - } - }; + let ips = datastore.probe_lookup_external_ips(&opctx, probe_id).await?; - // Notify dendrite that there are changes for it to reconcile. - // In the event of a failure to notify dendrite, we'll log an error - // and rely on dendrite's RPW timer to catch it up. - if let Err(e) = dpd_client.ipv4_nat_trigger_update().await { - error!(self.log, "failed to notify dendrite of nat updates"; "error" => ?e); - }; + if let Some(wanted_index) = ip_index_filter { + if let None = ips.get(wanted_index) { + return Err(Error::internal_error(&format!( + "failed to find external ip address at index: {}", + wanted_index + ))); } + } - if let Some(e) = errors.into_iter().nth(0) { - return Err(e); - } + let sled_address = Ipv6Net(Ipv6Network::new(sled_ip_address, 128).unwrap()); - Ok(()) + for target_ip in ips + .iter() + .enumerate() + .filter(|(index, _)| { + if let Some(wanted_index) = ip_index_filter { + *index == wanted_index + } else { + true + } + }) + .map(|(_, ip)| ip) + { + // For each external ip, add a nat entry to the database + ensure_nat_entry( + datastore, + target_ip, + sled_address, + &network_interface, + mac_address, + opctx, + ) + .await?; } - /// Deletes an instance's OPTE V2P mappings and the boundary switch NAT - /// entries for its external IPs. - /// - /// This routine returns immediately upon encountering any errors (and will - /// not try to destroy any more objects after the point of failure). - async fn clear_instance_networking_state( - &self, - opctx: &OpContext, - authz_instance: &authz::Instance, - ) -> Result<(), Error> { - self.delete_instance_v2p_mappings(opctx, authz_instance.id()).await?; + // Notify dendrite that there are changes for it to reconcile. + // In the event of a failure to notify dendrite, we'll log an error + // and rely on dendrite's RPW timer to catch it up. + if let Err(e) = dpd_client.ipv4_nat_trigger_update().await { + error!(log, "failed to notify dendrite of nat updates"; "error" => ?e); + }; - self.instance_delete_dpd_config(opctx, authz_instance).await?; + Ok(()) +} - self.notify_dendrite_nat_state(Some(authz_instance.id()), true).await - } +/// Deletes an instance's OPTE V2P mappings and the boundary switch NAT +/// entries for its external IPs. +/// +/// This routine returns immediately upon encountering any errors (and will +/// not try to destroy any more objects after the point of failure). 
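Two error-handling policies appear in this file: `clear_instance_networking_state`, documented just above, stops at the first failure, while the non-fail-fast dendrite notification path described earlier keeps going and reports the first error only after trying every target. The sketch below contrasts the two with hypothetical `delete_*` helpers so the observable difference is visible; it is illustrative only and not part of this change.

/// Stand-in for deleting one piece of per-instance networking state.
fn delete_one(
    name: &str,
    deleted: &mut Vec<String>,
    broken: &str,
) -> Result<(), String> {
    if name == broken {
        return Err(format!("failed to delete {name}"));
    }
    deleted.push(name.to_string());
    Ok(())
}

/// Fail-fast policy (like `clear_instance_networking_state`): stop at the
/// first error and leave later objects untouched.
fn delete_fail_fast(
    names: &[&str],
    deleted: &mut Vec<String>,
    broken: &str,
) -> Result<(), String> {
    for name in names {
        delete_one(name, deleted, broken)?;
    }
    Ok(())
}

/// Best-effort policy (like the non-fail-fast dendrite notification): keep
/// going, then report the first error seen.
fn delete_best_effort(
    names: &[&str],
    deleted: &mut Vec<String>,
    broken: &str,
) -> Result<(), String> {
    let mut first_error = None;
    for name in names {
        if let Err(e) = delete_one(name, deleted, broken) {
            if first_error.is_none() {
                first_error = Some(e);
            }
        }
    }
    match first_error {
        Some(e) => Err(e),
        None => Ok(()),
    }
}

fn main() {
    let names = ["nat-0", "nat-1", "nat-2"];

    let mut fast = Vec::new();
    assert!(delete_fail_fast(&names, &mut fast, "nat-1").is_err());
    assert_eq!(fast, vec!["nat-0"]); // stopped at the failure

    let mut best = Vec::new();
    assert!(delete_best_effort(&names, &mut best, "nat-1").is_err());
    assert_eq!(best, vec!["nat-0", "nat-2"]); // kept going past it
}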
+async fn clear_instance_networking_state( + datastore: &DataStore, + log: &slog::Logger, + + resolver: &internal_dns::resolver::Resolver, + opctx: &OpContext, + opctx_alloc: &OpContext, + authz_instance: &authz::Instance, +) -> Result<(), Error> { + delete_instance_v2p_mappings( + datastore, + log, + opctx, + opctx_alloc, + authz_instance.id(), + ) + .await?; + + instance_delete_dpd_config( + datastore, + log, + resolver, + opctx, + opctx_alloc, + authz_instance, + ) + .await?; + + notify_dendrite_nat_state( + datastore, + log, + resolver, + opctx_alloc, + Some(authz_instance.id()), + true, + ) + .await +} - /// Given old and new instance runtime states, determines the desired - /// networking configuration for a given instance and ensures it has been - /// propagated to all relevant sleds. - /// - /// # Arguments - /// - /// - opctx: An operation context for this operation. - /// - authz_instance: A resolved authorization context for the instance of - /// interest. - /// - prev_instance_state: The most-recently-recorded instance runtime - /// state for this instance. - /// - new_instance_state: The instance state that the caller of this routine - /// has observed and that should be used to set up this instance's - /// networking state. - /// - /// # Return value - /// - /// `Ok(())` if this routine completed all the operations it wanted to - /// complete, or an appropriate `Err` otherwise. - pub(crate) async fn ensure_updated_instance_network_config( - &self, - opctx: &OpContext, - authz_instance: &authz::Instance, - prev_instance_state: &db::model::InstanceRuntimeState, - new_instance_state: &nexus::InstanceRuntimeState, - ) -> Result<(), Error> { - let log = &self.log; - let instance_id = authz_instance.id(); +/// Ensures that V2P mappings exist that indicate that the instance with ID +/// `instance_id` is resident on the sled with ID `sled_id`. +pub(crate) async fn create_instance_v2p_mappings( + datastore: &DataStore, + log: &slog::Logger, + opctx: &OpContext, + opctx_alloc: &OpContext, + instance_id: Uuid, + sled_id: Uuid, +) -> Result<(), Error> { + info!(log, "creating V2P mappings for instance"; + "instance_id" => %instance_id, + "sled_id" => %sled_id); + + // For every sled that isn't the sled this instance was allocated to, create + // a virtual to physical mapping for each of this instance's NICs. + // + // For the mappings to be correct, a few invariants must hold: + // + // - mappings must be set whenever an instance's sled changes (eg. + // during instance creation, migration, stop + start) + // + // - an instances' sled must not change while its corresponding mappings + // are being created + // + // - the same mapping creation must be broadcast to all sleds + // + // A more targeted approach would be to see what other instances share + // the VPC this instance is in (or more generally, what instances should + // have connectivity to this one), see what sleds those are allocated + // to, and only create V2P mappings for those sleds. + // + // There's additional work with this approach: + // + // - it means that delete calls are required as well as set calls, + // meaning that now the ordering of those matters (this may also + // necessitate a generation number for V2P mappings) + // + // - V2P mappings have to be bidirectional in order for both instances's + // packets to make a round trip. This isn't a problem with the + // broadcast approach because one of the sides will exist already, but + // it is something to orchestrate with a more targeted approach. 
+ // + // TODO-correctness Default firewall rules currently will block + // instances in different VPCs from connecting to each other. If it ever + // stops doing this, the broadcast approach will create V2P mappings + // that shouldn't exist. + let (.., authz_instance) = LookupPath::new(&opctx, &datastore) + .instance_id(instance_id) + .lookup_for(authz::Action::Read) + .await?; - // If this instance update is stale, do nothing, since the superseding - // update may have allowed the instance's location to change further. - if prev_instance_state.gen >= new_instance_state.gen.into() { - debug!(log, - "instance state generation already advanced, \ - won't touch network config"; - "instance_id" => %instance_id); + let instance_nics = datastore + .derive_guest_network_interface_info(&opctx, &authz_instance) + .await?; - return Ok(()); + // Look up the supplied sled's physical host IP. + let physical_host_ip = + nexus_networking::sled_lookup(&datastore, &opctx_alloc, sled_id)? + .fetch() + .await? + .1 + .ip + .into(); + + let mut last_sled_id: Option = None; + loop { + let pagparams = DataPageParams { + marker: last_sled_id.as_ref(), + direction: dropshot::PaginationOrder::Ascending, + limit: std::num::NonZeroU32::new(10).unwrap(), + }; + + let sleds_page = datastore.sled_list(&opctx_alloc, &pagparams).await?; + let mut join_handles = + Vec::with_capacity(sleds_page.len() * instance_nics.len()); + + for sled in &sleds_page { + // set_v2p not required for sled instance was allocated to, OPTE + // currently does that automatically + // + // TODO(#3107): Remove this when XDE stops creating mappings + // implicitly. + if sled.id() == sled_id { + continue; + } + + for nic in &instance_nics { + let client = nexus_networking::sled_client( + datastore, + opctx_alloc, + sled.id(), + log, + ) + .await?; + let nic_id = nic.id; + let mapping = SetVirtualNetworkInterfaceHost { + virtual_ip: nic.ip, + virtual_mac: nic.mac, + physical_host_ip, + vni: nic.vni, + }; + + let log = log.clone(); + + // This function is idempotent: calling the set_v2p ioctl with + // the same information is a no-op. + join_handles.push(tokio::spawn(futures::future::lazy( + move |_ctx| async move { + retry_until_known_result(&log, || async { + client.set_v2p(&nic_id, &mapping).await + }) + .await + }, + ))); + } } - // If this update will retire the instance's active VMM, delete its - // networking state. It will be re-established the next time the - // instance starts. - if new_instance_state.propolis_id.is_none() { - info!(log, - "instance cleared its Propolis ID, cleaning network config"; - "instance_id" => %instance_id, - "propolis_id" => ?prev_instance_state.propolis_id); + // Concurrently run each future to completion, but return the last + // error seen. + let mut error = None; + for join_handle in join_handles { + let result = join_handle + .await + .map_err(|e| Error::internal_error(&e.to_string()))? + .await; - self.clear_instance_networking_state(opctx, authz_instance).await?; - return Ok(()); + if result.is_err() { + error!(log, "{:?}", result); + error = Some(result); + } + } + if let Some(e) = error { + return e.map(|_| ()).map_err(|e| e.into()); } - // If the instance still has a migration in progress, don't change - // any networking state until an update arrives that retires that - // migration. - // - // This is needed to avoid the following race: - // - // 1. Migration from S to T completes. - // 2. Migration source sends an update that changes the instance's - // active VMM but leaves the migration ID in place. 
-        // 3. Meanwhile, migration target sends an update that changes the
-        //    instance's active VMM and clears the migration ID.
-        // 4. The migration target's call updates networking state and commits
-        //    the new instance record.
-        // 5. The instance migrates from T to T' and Nexus applies networking
-        //    configuration reflecting that the instance is on T'.
-        // 6. The update in step 2 applies configuration saying the instance
-        //    is on sled T.
-        if new_instance_state.migration_id.is_some() {
-            debug!(log,
-                   "instance still has a migration in progress, won't touch \
-                    network config";
-                   "instance_id" => %instance_id,
-                   "migration_id" => ?new_instance_state.migration_id);
+        if sleds_page.len() < 10 {
+            break;
+        }
-            return Ok(());
+        if let Some(last) = sleds_page.last() {
+            last_sled_id = Some(last.id());
         }
+    }
-        let new_propolis_id = new_instance_state.propolis_id.unwrap();
+    Ok(())
+}
-        // Updates that end live migration need to push OPTE V2P state even if
-        // the instance's active sled did not change (see below).
-        let migration_retired = prev_instance_state.migration_id.is_some()
-            && new_instance_state.migration_id.is_none();
+/// Ensure that the necessary v2p mappings for an instance are deleted
+pub(crate) async fn delete_instance_v2p_mappings(
+    datastore: &DataStore,
+    log: &slog::Logger,
+    opctx: &OpContext,
+    opctx_alloc: &OpContext,
+    instance_id: Uuid,
+) -> Result<(), Error> {
+    // For every sled that isn't the sled this instance was allocated to, delete
+    // the virtual to physical mapping for each of this instance's NICs. If
+    // there isn't a V2P mapping, del_v2p should be a no-op.
+    let (.., authz_instance) = LookupPath::new(&opctx, datastore)
+        .instance_id(instance_id)
+        .lookup_for(authz::Action::Read)
+        .await?;
-        if (prev_instance_state.propolis_id == new_instance_state.propolis_id)
-            && !migration_retired
-        {
-            debug!(log, "instance didn't move, won't touch network config";
-                   "instance_id" => %instance_id);
+    let instance_nics = datastore
+        .derive_guest_network_interface_info(&opctx, &authz_instance)
+        .await?;
-            return Ok(());
+    let mut last_sled_id: Option<Uuid> = None;
+
+    loop {
+        let pagparams = DataPageParams {
+            marker: last_sled_id.as_ref(),
+            direction: dropshot::PaginationOrder::Ascending,
+            limit: std::num::NonZeroU32::new(10).unwrap(),
+        };
+
+        let sleds_page = datastore.sled_list(&opctx_alloc, &pagparams).await?;
+        let mut join_handles =
+            Vec::with_capacity(sleds_page.len() * instance_nics.len());
+
+        for sled in &sleds_page {
+            for nic in &instance_nics {
+                let client = nexus_networking::sled_client(
+                    &datastore,
+                    &opctx_alloc,
+                    sled.id(),
+                    &log,
+                )
+                .await?;
+                let nic_id = nic.id;
+                let mapping = DeleteVirtualNetworkInterfaceHost {
+                    virtual_ip: nic.ip,
+                    vni: nic.vni,
+                };
+
+                let log = log.clone();
+
+                // This function is idempotent: calling the del_v2p ioctl with
+                // the same information is a no-op.
+                join_handles.push(tokio::spawn(futures::future::lazy(
+                    move |_ctx| async move {
+                        retry_until_known_result(&log, || async {
+                            client.del_v2p(&nic_id, &mapping).await
+                        })
+                        .await
+                    },
+                )));
+            }
         }
-        // Either the instance moved from one sled to another, or it attempted
-        // to migrate and failed. Ensure the correct networking configuration
-        // exists for its current home.
-        //
-        // TODO(#3107) This is necessary even if the instance didn't move,
-        // because registering a migration target on a sled creates OPTE ports
-        // for its VNICs, and that creates new V2P mappings on that sled that
-        // place the relevant virtual IPs on the local sled. Once OPTE stops
-        // creating these mappings, this path only needs to be taken if an
-        // instance has changed sleds.
-        let new_sled_id = match self
-            .db_datastore
-            .vmm_fetch(&opctx, authz_instance, &new_propolis_id)
-            .await
-        {
-            Ok(vmm) => vmm.sled_id,
-
-            // A VMM in the active position should never be destroyed. If the
-            // sled sending this message is the owner of the instance's last
-            // active VMM and is destroying it, it should also have retired that
-            // VMM.
-            Err(Error::ObjectNotFound { .. }) => {
-                error!(log, "instance's active vmm unexpectedly not found";
-                       "instance_id" => %instance_id,
-                       "propolis_id" => %new_propolis_id);
-
-                return Ok(());
+        // Concurrently run each future to completion, but return the last
+        // error seen.
+        let mut error = None;
+        for join_handle in join_handles {
+            let result = join_handle
+                .await
+                .map_err(|e| Error::internal_error(&e.to_string()))?
+                .await;
+
+            if result.is_err() {
+                error!(log, "{:?}", result);
+                error = Some(result);
             }
+        }
+        if let Some(e) = error {
+            return e.map(|_| ()).map_err(|e| e.into());
+        }
-            Err(e) => return Err(e),
-        };
+        if sleds_page.len() < 10 {
+            break;
+        }
-        self.create_instance_v2p_mappings(opctx, instance_id, new_sled_id)
-            .await?;
+        if let Some(last) = sleds_page.last() {
+            last_sled_id = Some(last.id());
+        }
+    }
-        let (.., sled) = LookupPath::new(opctx, &self.db_datastore)
-            .sled_id(new_sled_id)
-            .fetch()
+    Ok(())
+}
+
+/// Attempts to delete all of the Dendrite NAT configuration for the
+/// instance identified by `authz_instance`.
+///
+/// Unlike `instance_ensure_dpd_config`, this function will disregard the
+/// attachment states of any external IPs because likely callers (instance
+/// delete) cannot be piecewise undone.
+///
+/// # Return value
+///
+/// - `Ok(())` if all NAT entries were successfully deleted.
+/// - If an operation fails before this routine begins to walk and delete
+///   individual NAT entries, this routine returns `Err` and reports that
+///   error.
+/// - If an operation fails while this routine is walking NAT entries, it
+///   will continue trying to delete subsequent entries but will return the
+///   first error it encountered.
+pub(crate) async fn instance_delete_dpd_config(
+    datastore: &DataStore,
+    log: &slog::Logger,
+    resolver: &internal_dns::resolver::Resolver,
+    opctx: &OpContext,
+    opctx_alloc: &OpContext,
+    authz_instance: &authz::Instance,
+) -> Result<(), Error> {
+    let instance_id = authz_instance.id();
+
+    info!(log, "deleting instance dpd configuration";
+          "instance_id" => %instance_id);
+
+    let external_ips =
+        datastore.instance_lookup_external_ips(opctx, instance_id).await?;
+
+    for entry in external_ips {
+        external_ip_delete_dpd_config_inner(&datastore, &log, opctx, &entry)
             .await?;
+    }
-        self.instance_ensure_dpd_config(
-            opctx,
-            instance_id,
-            &sled.address(),
-            None,
-        )
-        .await?;
+    notify_dendrite_nat_state(
+        datastore,
+        log,
+        resolver,
+        opctx_alloc,
+        Some(instance_id),
+        false,
+    )
+    .await
+}
+
+// The logic of this function should follow very closely what
However, there are enough differences +// in the mechanics of how the logic is being carried out to justify having +// this separate function, it seems. +pub(crate) async fn probe_delete_dpd_config( + datastore: &DataStore, + log: &slog::Logger, + resolver: &internal_dns::resolver::Resolver, + opctx: &OpContext, + opctx_alloc: &OpContext, + probe_id: Uuid, +) -> Result<(), Error> { + info!(log, "deleting probe dpd configuration"; + "probe_id" => %probe_id); + + let external_ips = + datastore.probe_lookup_external_ips(opctx, probe_id).await?; + + let mut errors = vec![]; + for entry in external_ips { + // Soft delete the NAT entry + match datastore.ipv4_nat_delete_by_external_ip(&opctx, &entry).await { + Ok(_) => Ok(()), + Err(err) => match err { + Error::ObjectNotFound { .. } => { + warn!(log, "no matching nat entries to soft delete"); + Ok(()) + } + _ => { + let message = format!( + "failed to delete nat entry due to error: {err:?}" + ); + error!(log, "{}", message); + Err(Error::internal_error(&message)) + } + }, + }?; + } + + let boundary_switches = boundary_switches(datastore, opctx_alloc).await?; + + for switch in &boundary_switches { + debug!(log, "notifying dendrite of updates"; + "probe_id" => %probe_id, + "switch" => switch.to_string()); + + let dpd_clients = + super::dpd_clients(resolver, log).await.map_err(|e| { + Error::internal_error(&format!( + "unable to get dpd_clients: {e}" + )) + })?; + + let client_result = dpd_clients.get(switch).ok_or_else(|| { + Error::internal_error(&format!( + "unable to find dendrite client for {switch}" + )) + }); + + let dpd_client = match client_result { + Ok(client) => client, + Err(new_error) => { + errors.push(new_error); + continue; + } + }; - Ok(()) + // Notify dendrite that there are changes for it to reconcile. + // In the event of a failure to notify dendrite, we'll log an error + // and rely on dendrite's RPW timer to catch it up. + if let Err(e) = dpd_client.ipv4_nat_trigger_update().await { + error!(log, "failed to notify dendrite of nat updates"; "error" => ?e); + }; + } + + if let Some(e) = errors.into_iter().nth(0) { + return Err(e); + } + + Ok(()) +} + +/// Attempts to soft-delete Dendrite NAT configuration for a specific entry +/// via ID. +/// +/// This function is needed to safely cleanup in at least one unwind scenario +/// where a potential second user could need to use the same (IP, portset) pair, +/// e.g. a rapid reattach or a reallocated ephemeral IP. +pub(crate) async fn delete_dpd_config_by_entry( + datastore: &DataStore, + resolver: &internal_dns::resolver::Resolver, + log: &slog::Logger, + opctx: &OpContext, + opctx_alloc: &OpContext, + nat_entry: &Ipv4NatEntry, +) -> Result<(), Error> { + info!(log, "deleting individual NAT entry from dpd configuration"; + "id" => ?nat_entry.id, + "version_added" => %nat_entry.external_address.0); + + match datastore.ipv4_nat_delete(&opctx, nat_entry).await { + Ok(_) => {} + Err(err) => match err { + Error::ObjectNotFound { .. } => { + warn!(log, "no matching nat entries to soft delete"); + } + _ => { + let message = + format!("failed to delete nat entry due to error: {err:?}"); + error!(log, "{}", message); + return Err(Error::internal_error(&message)); + } + }, + } + + notify_dendrite_nat_state( + datastore, + log, + resolver, + opctx_alloc, + None, + false, + ) + .await +} + +/// Soft-delete an individual external IP from the NAT RPW, without +/// triggering a Dendrite notification. 
+async fn external_ip_delete_dpd_config_inner(
+    datastore: &DataStore,
+    log: &slog::Logger,
+    opctx: &OpContext,
+    external_ip: &ExternalIp,
+) -> Result<(), Error> {
+    // Soft delete the NAT entry
+    match datastore.ipv4_nat_delete_by_external_ip(&opctx, external_ip).await {
+        Ok(_) => Ok(()),
+        Err(err) => match err {
+            Error::ObjectNotFound { .. } => {
+                warn!(log, "no matching nat entries to soft delete");
+                Ok(())
+            }
+            _ => {
+                let message =
+                    format!("failed to delete nat entry due to error: {err:?}");
+                error!(log, "{}", message);
+                Err(Error::internal_error(&message))
+            }
+        },
+    }
+}
+
+/// Informs all available boundary switches that the set of NAT entries
+/// has changed.
+///
+/// When `fail_fast` is set, this function will return on any error when
+/// acquiring a handle to a DPD client. Otherwise, it will attempt to notify
+/// all clients and then return the first error encountered.
+async fn notify_dendrite_nat_state(
+    datastore: &DataStore,
+    log: &slog::Logger,
+    resolver: &internal_dns::resolver::Resolver,
+    opctx_alloc: &OpContext,
+    instance_id: Option<Uuid>,
+    fail_fast: bool,
+) -> Result<(), Error> {
+    // Querying boundary switches also requires fleet access and the use of the
+    // instance allocator context.
+    let boundary_switches = boundary_switches(datastore, opctx_alloc).await?;
+
+    let mut errors = vec![];
+    for switch in &boundary_switches {
+        debug!(log, "notifying dendrite of updates";
+               "instance_id" => ?instance_id,
+               "switch" => switch.to_string());
+
+        let clients = super::dpd_clients(resolver, log).await.map_err(|e| {
+            Error::internal_error(&format!("failed to get dpd clients: {e}"))
+        })?;
+        let client_result = clients.get(switch).ok_or_else(|| {
+            Error::internal_error(&format!(
+                "unable to find dendrite client for {switch}"
+            ))
+        });
+
+        let dpd_client = match client_result {
+            Ok(client) => client,
+            Err(new_error) => {
+                errors.push(new_error);
+                if fail_fast {
+                    break;
+                } else {
+                    continue;
+                }
+            }
+        };
+
+        // Notify dendrite that there are changes for it to reconcile.
+        // In the event of a failure to notify dendrite, we'll log an error
+        // and rely on dendrite's RPW timer to catch it up.
+        if let Err(e) = dpd_client.ipv4_nat_trigger_update().await {
+            error!(log, "failed to notify dendrite of nat updates"; "error" => ?e);
+        };
+    }
+
+    if let Some(e) = errors.into_iter().next() {
+        return Err(e);
+    }
+
+    Ok(())
+}
+
+async fn ensure_nat_entry(
+    datastore: &DataStore,
+    target_ip: &nexus_db_model::ExternalIp,
+    sled_address: Ipv6Net,
+    network_interface: &NetworkInterface,
+    mac_address: macaddr::MacAddr6,
+    opctx: &OpContext,
+) -> Result<Ipv4NatEntry, Error> {
+    match target_ip.ip {
+        IpNetwork::V4(v4net) => {
+            let nat_entry = Ipv4NatValues {
+                external_address: Ipv4Net(v4net).into(),
+                first_port: target_ip.first_port,
+                last_port: target_ip.last_port,
+                sled_address: sled_address.into(),
+                vni: DbVni(network_interface.vni),
+                mac: nexus_db_model::MacAddr(
+                    omicron_common::api::external::MacAddr(mac_address),
+                ),
+            };
+            Ok(datastore.ensure_ipv4_nat_entry(opctx, nat_entry).await?)
+        }
+        IpNetwork::V6(_v6net) => {
+            // TODO: implement handling of v6 nat.
+            return Err(Error::InternalError {
+                internal_message: "ipv6 nat is not yet implemented".into(),
+            });
+        }
     }
 }
diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs
index 1c7fadea05..5a7d6393c5 100644
--- a/nexus/src/app/mod.rs
+++ b/nexus/src/app/mod.rs
@@ -882,33 +882,16 @@ impl Nexus {
     pub(crate) async fn dpd_clients(
         &self,
     ) -> Result<HashMap<SwitchLocation, dpd_client::Client>, String> {
-        let mappings = self.switch_zone_address_mappings().await?;
-        let clients: HashMap<SwitchLocation, dpd_client::Client> = mappings
-            .iter()
-            .map(|(location, addr)| {
-                let port = DENDRITE_PORT;
-
-                let client_state = dpd_client::ClientState {
-                    tag: String::from("nexus"),
-                    log: self.log.new(o!(
-                        "component" => "DpdClient"
-                    )),
-                };
-
-                let dpd_client = dpd_client::Client::new(
-                    &format!("http://[{addr}]:{port}"),
-                    client_state,
-                );
-                (*location, dpd_client)
-            })
-            .collect();
-        Ok(clients)
+        let resolver = self.resolver().await;
+        dpd_clients(&resolver, &self.log).await
     }

     pub(crate) async fn mg_clients(
         &self,
     ) -> Result<HashMap<SwitchLocation, mg_admin_client::Client>, String> {
-        let mappings = self.switch_zone_address_mappings().await?;
+        let resolver = self.resolver().await;
+        let mappings =
+            switch_zone_address_mappings(&resolver, &self.log).await?;
         let mut clients: Vec<(SwitchLocation, mg_admin_client::Client)> =
             vec![];
         for (location, addr) in &mappings {
@@ -923,24 +906,6 @@ impl Nexus {
         }
         Ok(clients.into_iter().collect::<HashMap<_, _>>())
     }
-
-    async fn switch_zone_address_mappings(
-        &self,
-    ) -> Result<HashMap<SwitchLocation, Ipv6Addr>, String> {
-        let switch_zone_addresses = match self
-            .resolver()
-            .await
-            .lookup_all_ipv6(ServiceName::Dendrite)
-            .await
-        {
-            Ok(addrs) => addrs,
-            Err(e) => {
-                error!(self.log, "failed to resolve addresses for Dendrite services"; "error" => %e);
-                return Err(e.to_string());
-            }
-        };
-        Ok(map_switch_zone_addrs(&self.log, switch_zone_addresses).await)
-    }
 }

 /// For unimplemented endpoints, indicates whether the resource identified
@@ -959,6 +924,50 @@ pub enum Unimpl {
     ProtectedLookup(Error),
 }

+pub(crate) async fn dpd_clients(
+    resolver: &internal_dns::resolver::Resolver,
+    log: &slog::Logger,
+) -> Result<HashMap<SwitchLocation, dpd_client::Client>, String> {
+    let mappings = switch_zone_address_mappings(resolver, log).await?;
+    let clients: HashMap<SwitchLocation, dpd_client::Client> = mappings
+        .iter()
+        .map(|(location, addr)| {
+            let port = DENDRITE_PORT;
+
+            let client_state = dpd_client::ClientState {
+                tag: String::from("nexus"),
+                log: log.new(o!(
+                    "component" => "DpdClient"
+                )),
+            };
+
+            let dpd_client = dpd_client::Client::new(
+                &format!("http://[{addr}]:{port}"),
+                client_state,
+            );
+            (*location, dpd_client)
+        })
+        .collect();
+    Ok(clients)
+}
+
+async fn switch_zone_address_mappings(
+    resolver: &internal_dns::resolver::Resolver,
+    log: &slog::Logger,
+) -> Result<HashMap<SwitchLocation, Ipv6Addr>, String> {
+    let switch_zone_addresses = match resolver
+        .lookup_all_ipv6(ServiceName::Dendrite)
+        .await
+    {
+        Ok(addrs) => addrs,
+        Err(e) => {
+            error!(log, "failed to resolve addresses for Dendrite services"; "error" => %e);
+            return Err(e.to_string());
+        }
+    };
+    Ok(map_switch_zone_addrs(&log, switch_zone_addresses).await)
+}
+
 // TODO: #3596 Allow updating of Nexus from `handoff_to_nexus()`
 // This logic is duplicated from RSS
 // RSS needs to know which addresses are managing which slots, and so does Nexus,
diff --git a/nexus/src/app/oximeter.rs b/nexus/src/app/oximeter.rs
index 8b204392eb..92ea3c0a0d 100644
--- a/nexus/src/app/oximeter.rs
+++ b/nexus/src/app/oximeter.rs
@@ -11,6 +11,7 @@ use internal_dns::resolver::{ResolveError, Resolver};
 use internal_dns::ServiceName;
 use nexus_db_queries::context::OpContext;
 use nexus_db_queries::db;
+use nexus_db_queries::db::DataStore;
 use omicron_common::address::CLICKHOUSE_PORT;
 use omicron_common::api::external::Error;
 use omicron_common::api::external::{DataPageParams, ListResultVec};
@@ -162,59 +163,6 @@ impl super::Nexus {
         Ok(())
     }

-    /// Idempotently un-assign a producer from an oximeter collector.
-    pub(crate) async fn unassign_producer(
-        &self,
-        opctx: &OpContext,
-        id: &Uuid,
-    ) -> Result<(), Error> {
-        if let Some(collector_id) =
-            self.db_datastore.producer_endpoint_delete(opctx, id).await?
-        {
-            debug!(
-                self.log,
-                "deleted metric producer assignment";
-                "producer_id" => %id,
-                "collector_id" => %collector_id,
-            );
-            let oximeter_info =
-                self.db_datastore.oximeter_lookup(opctx, &collector_id).await?;
-            let address =
-                SocketAddr::new(oximeter_info.ip.ip(), *oximeter_info.port);
-            let client = self.build_oximeter_client(&id, address);
-            if let Err(e) = client.producer_delete(&id).await {
-                error!(
-                    self.log,
-                    "failed to delete producer from collector";
-                    "producer_id" => %id,
-                    "collector_id" => %collector_id,
-                    "address" => %address,
-                    "error" => ?e,
-                );
-                return Err(Error::internal_error(
-                    format!("failed to delete producer from collector: {e:?}")
-                        .as_str(),
-                ));
-            } else {
-                debug!(
-                    self.log,
-                    "successfully deleted producer from collector";
-                    "producer_id" => %id,
-                    "collector_id" => %collector_id,
-                    "address" => %address,
-                );
-                Ok(())
-            }
-        } else {
-            trace!(
-                self.log,
-                "un-assigned non-existent metric producer";
-                "producer_id" => %id,
-            );
-            Ok(())
-        }
-    }
-
     /// Returns a results from the timeseries DB based on the provided query
     /// parameters.
     ///
@@ -324,25 +272,6 @@ impl super::Nexus {
             .unwrap())
     }

-    // Internal helper to build an Oximeter client from its ID and address (common data between
-    // model type and the API type).
-    fn build_oximeter_client(
-        &self,
-        id: &Uuid,
-        address: SocketAddr,
-    ) -> OximeterClient {
-        let client_log =
-            self.log.new(o!("oximeter-collector" => id.to_string()));
-        let client =
-            OximeterClient::new(&format!("http://{}", address), client_log);
-        info!(
-            self.log,
-            "registered oximeter collector client";
-            "id" => id.to_string(),
-        );
-        client
-    }
-
     // Return an oximeter collector to assign a newly-registered producer
     async fn next_collector(
         &self,
@@ -361,7 +290,61 @@ impl super::Nexus {
         let address =
             SocketAddr::from((info.ip.ip(), info.port.try_into().unwrap()));
         let id = info.id;
-        Ok((self.build_oximeter_client(&id, address), id))
+        Ok((build_oximeter_client(&self.log, &id, address), id))
+    }
+}
+
+/// Idempotently un-assign a producer from an oximeter collector.
+pub(crate) async fn unassign_producer(
+    datastore: &DataStore,
+    log: &slog::Logger,
+    opctx: &OpContext,
+    id: &Uuid,
+) -> Result<(), Error> {
+    if let Some(collector_id) =
+        datastore.producer_endpoint_delete(opctx, id).await?
+    {
+        debug!(
+            log,
+            "deleted metric producer assignment";
+            "producer_id" => %id,
+            "collector_id" => %collector_id,
+        );
+        let oximeter_info =
+            datastore.oximeter_lookup(opctx, &collector_id).await?;
+        let address =
+            SocketAddr::new(oximeter_info.ip.ip(), *oximeter_info.port);
+        let client = build_oximeter_client(&log, &id, address);
+        if let Err(e) = client.producer_delete(&id).await {
+            error!(
+                log,
+                "failed to delete producer from collector";
+                "producer_id" => %id,
+                "collector_id" => %collector_id,
+                "address" => %address,
+                "error" => ?e,
+            );
+            return Err(Error::internal_error(
+                format!("failed to delete producer from collector: {e:?}")
+                    .as_str(),
+            ));
+        } else {
+            debug!(
+                log,
+                "successfully deleted producer from collector";
+                "producer_id" => %id,
+                "collector_id" => %collector_id,
+                "address" => %address,
+            );
+            Ok(())
+        }
+    } else {
+        trace!(
+            log,
+            "un-assigned non-existent metric producer";
+            "producer_id" => %id,
+        );
+        Ok(())
     }
 }
@@ -373,3 +356,21 @@ fn map_oximeter_err(error: oximeter_db::Error) -> Error {
         _ => Error::InternalError { internal_message: error.to_string() },
     }
 }
+
+// Internal helper to build an Oximeter client from its ID and address (common data between
+// model type and the API type).
+fn build_oximeter_client(
+    log: &slog::Logger,
+    id: &Uuid,
+    address: SocketAddr,
+) -> OximeterClient {
+    let client_log = log.new(o!("oximeter-collector" => id.to_string()));
+    let client =
+        OximeterClient::new(&format!("http://{}", address), client_log);
+    info!(
+        log,
+        "registered oximeter collector client";
+        "id" => id.to_string(),
+    );
+    client
+}
diff --git a/nexus/src/app/switch_port.rs b/nexus/src/app/switch_port.rs
index c7d5272ae1..e0c303f277 100644
--- a/nexus/src/app/switch_port.rs
+++ b/nexus/src/app/switch_port.rs
@@ -18,6 +18,7 @@ use nexus_db_queries::context::OpContext;
 use nexus_db_queries::db;
 use nexus_db_queries::db::datastore::UpdatePrecondition;
 use nexus_db_queries::db::model::{SwitchPort, SwitchPortSettings};
+use nexus_db_queries::db::DataStore;
 use nexus_types::identity::Resource;
 use omicron_common::api::external::http_pagination::PaginatedBy;
 use omicron_common::api::external::{
@@ -168,14 +169,6 @@ impl super::Nexus {
         self.db_datastore.switch_port_list(opctx, pagparams).await
     }

-    pub(crate) async fn list_switch_ports_with_uplinks(
-        &self,
-        opctx: &OpContext,
-    ) -> ListResultVec<SwitchPort> {
-        opctx.authorize(authz::Action::Read, &authz::FLEET).await?;
-        self.db_datastore.switch_ports_with_uplinks(opctx).await
-    }
-
     pub(crate) async fn set_switch_port_settings_id(
         &self,
         opctx: &OpContext,
@@ -297,3 +290,11 @@ impl super::Nexus {
         Ok(())
     }
 }
+
+pub(crate) async fn list_switch_ports_with_uplinks(
+    datastore: &DataStore,
+    opctx: &OpContext,
+) -> ListResultVec<SwitchPort> {
+    opctx.authorize(authz::Action::Read, &authz::FLEET).await?;
+    datastore.switch_ports_with_uplinks(opctx).await
+}
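
Note (not part of the patch): the change above turns several instance-networking and oximeter helpers into free functions that take their dependencies (datastore, internal DNS resolver, logger, op-contexts) explicitly rather than borrowing them from a Nexus instance. The sketch below shows how a crate-internal caller, such as a saga node or background task, might drive instance network teardown with two of these helpers. The surrounding function, the `instance_network` module path, and the way the handles are obtained are assumptions for illustration only.

use nexus_db_queries::authz;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db::DataStore;
use omicron_common::api::external::Error;

// Hypothetical caller inside the nexus crate (the helpers are pub(crate)).
// Assumes the helpers above are importable as `instance_network::*` and that
// the caller already holds a resolver, logger, and both op-contexts.
async fn teardown_instance_networking(
    datastore: &DataStore,
    resolver: &internal_dns::resolver::Resolver,
    log: &slog::Logger,
    opctx: &OpContext,
    opctx_alloc: &OpContext,
    authz_instance: &authz::Instance,
) -> Result<(), Error> {
    // Remove V2P mappings for the instance's NICs on every sled; del_v2p is
    // a no-op where no mapping exists, so retrying is safe.
    instance_network::delete_instance_v2p_mappings(
        datastore,
        log,
        opctx,
        opctx_alloc,
        authz_instance.id(),
    )
    .await?;

    // Tear down the instance's NAT entries and nudge the boundary switches
    // to reconcile their state.
    instance_network::instance_delete_dpd_config(
        datastore,
        log,
        resolver,
        opctx,
        opctx_alloc,
        authz_instance,
    )
    .await
}

Because both helpers are idempotent, a caller like this can be retried wholesale after a transient failure without leaving stale V2P or NAT state behind.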