From fdbbbe54cc706ff92c1a327d20ab40febfa623a6 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Tue, 12 Mar 2024 17:00:01 -0400 Subject: [PATCH] Blueprint execution: Send OPTE firewall rules for services (#5233) This is the reworked and caught-up-with-main #5157 (which I'll close momentarily). I believe I addressed all the comments on the PR except those related to its extension trait style and the `nexus-capabilities` crate. This introduces a more focused `nexus-networking` crate that exposes free functions instead of extension traits. I'll run this through madrid before merging, but I believe it should work (absent any mistakes I made merging changes across branches) now that #5202 has landed. Closes #4886 --- Cargo.lock | 17 + Cargo.toml | 3 + nexus/Cargo.toml | 1 + nexus/networking/Cargo.toml | 16 + nexus/networking/src/firewall_rules.rs | 441 ++++++++++++++++++ nexus/networking/src/lib.rs | 12 + nexus/networking/src/sled_client.rs | 53 +++ nexus/reconfigurator/execution/Cargo.toml | 1 + nexus/reconfigurator/execution/src/lib.rs | 21 +- .../execution/src/omicron_zones.rs | 26 +- .../src/app/background/blueprint_execution.rs | 5 +- nexus/src/app/sled.rs | 41 +- nexus/src/app/vpc.rs | 412 +--------------- 13 files changed, 611 insertions(+), 438 deletions(-) create mode 100644 nexus/networking/Cargo.toml create mode 100644 nexus/networking/src/firewall_rules.rs create mode 100644 nexus/networking/src/lib.rs create mode 100644 nexus/networking/src/sled_client.rs diff --git a/Cargo.lock b/Cargo.lock index 5900cf3dec..e50918bba5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4562,6 +4562,21 @@ dependencies = [ "syn 2.0.51", ] +[[package]] +name = "nexus-networking" +version = "0.1.0" +dependencies = [ + "futures", + "ipnetwork", + "nexus-db-queries", + "omicron-common", + "omicron-workspace-hack", + "reqwest", + "sled-agent-client", + "slog", + "uuid", +] + [[package]] name = "nexus-reconfigurator-execution" version = "0.1.0" @@ -4578,6 +4593,7 @@ dependencies = [ "nexus-db-model", "nexus-db-queries", "nexus-inventory", + "nexus-networking", "nexus-reconfigurator-planning", "nexus-test-utils", "nexus-test-utils-macros", @@ -5176,6 +5192,7 @@ dependencies = [ "nexus-db-queries", "nexus-defaults", "nexus-inventory", + "nexus-networking", "nexus-reconfigurator-execution", "nexus-reconfigurator-planning", "nexus-reconfigurator-preparation", diff --git a/Cargo.toml b/Cargo.toml index f291d9642a..45c45c9d56 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ members = [ "nexus/defaults", "nexus/inventory", "nexus/macros-common", + "nexus/networking", "nexus/reconfigurator/execution", "nexus/reconfigurator/planning", "nexus/reconfigurator/preparation", @@ -120,6 +121,7 @@ default-members = [ "nexus-config", "nexus/authz-macros", "nexus/macros-common", + "nexus/networking", "nexus/db-macros", "nexus/db-model", "nexus/db-queries", @@ -272,6 +274,7 @@ nexus-db-queries = { path = "nexus/db-queries" } nexus-defaults = { path = "nexus/defaults" } nexus-inventory = { path = "nexus/inventory" } nexus-macros-common = { path = "nexus/macros-common" } +nexus-networking = { path = "nexus/networking" } nexus-reconfigurator-execution = { path = "nexus/reconfigurator/execution" } nexus-reconfigurator-planning = { path = "nexus/reconfigurator/planning" } nexus-reconfigurator-preparation = { path = "nexus/reconfigurator/preparation" } diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index 0712ba6743..2b8845f6f5 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -41,6 +41,7 @@ mime_guess.workspace = true # Not under "dev-dependencies"; these also need to be implemented for # integration tests. nexus-config.workspace = true +nexus-networking.workspace = true nexus-test-interface.workspace = true num-integer.workspace = true once_cell.workspace = true diff --git a/nexus/networking/Cargo.toml b/nexus/networking/Cargo.toml new file mode 100644 index 0000000000..11f6d83993 --- /dev/null +++ b/nexus/networking/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "nexus-networking" +version = "0.1.0" +edition = "2021" +license = "MPL-2.0" + +[dependencies] +futures.workspace = true +ipnetwork.workspace = true +nexus-db-queries.workspace = true +omicron-common.workspace = true +reqwest.workspace = true +sled-agent-client.workspace = true +slog.workspace = true +uuid.workspace = true +omicron-workspace-hack.workspace = true diff --git a/nexus/networking/src/firewall_rules.rs b/nexus/networking/src/firewall_rules.rs new file mode 100644 index 0000000000..85c457d522 --- /dev/null +++ b/nexus/networking/src/firewall_rules.rs @@ -0,0 +1,441 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Functionality related to firewall rules. + +use futures::future::join_all; +use ipnetwork::IpNetwork; +use nexus_db_queries::authz; +use nexus_db_queries::context::OpContext; +use nexus_db_queries::db; +use nexus_db_queries::db::identity::Asset; +use nexus_db_queries::db::identity::Resource; +use nexus_db_queries::db::lookup; +use nexus_db_queries::db::lookup::LookupPath; +use nexus_db_queries::db::model::Name; +use nexus_db_queries::db::DataStore; +use omicron_common::api::external; +use omicron_common::api::external::Error; +use omicron_common::api::external::IpNet; +use omicron_common::api::external::ListResultVec; +use omicron_common::api::internal::nexus::HostIdentifier; +use omicron_common::api::internal::shared::NetworkInterface; +use slog::debug; +use slog::info; +use slog::warn; +use slog::Logger; +use std::collections::{HashMap, HashSet}; +use std::net::IpAddr; +use uuid::Uuid; + +pub async fn vpc_list_firewall_rules( + datastore: &DataStore, + opctx: &OpContext, + vpc_lookup: &lookup::Vpc<'_>, +) -> ListResultVec { + let (.., authz_vpc) = vpc_lookup.lookup_for(authz::Action::Read).await?; + let rules = datastore.vpc_list_firewall_rules(&opctx, &authz_vpc).await?; + Ok(rules) +} + +pub async fn resolve_firewall_rules_for_sled_agent( + datastore: &DataStore, + opctx: &OpContext, + vpc: &db::model::Vpc, + rules: &[db::model::VpcFirewallRule], + log: &Logger, +) -> Result, Error> { + // Collect the names of instances, subnets, and VPCs that are either + // targets or host filters. We have to find the sleds for all the + // targets, and we'll need information about the IP addresses or + // subnets for things that are specified as host filters as well. + let mut instances: HashSet = HashSet::new(); + let mut subnets: HashSet = HashSet::new(); + let mut vpcs: HashSet = HashSet::new(); + for rule in rules { + for target in &rule.targets { + match &target.0 { + external::VpcFirewallRuleTarget::Instance(name) => { + instances.insert(name.clone().into()); + } + external::VpcFirewallRuleTarget::Subnet(name) => { + subnets.insert(name.clone().into()); + } + external::VpcFirewallRuleTarget::Vpc(name) => { + if name != vpc.name() { + return Err(Error::invalid_request( + "cross-VPC firewall target unsupported", + )); + } + vpcs.insert(name.clone().into()); + } + external::VpcFirewallRuleTarget::Ip(_) + | external::VpcFirewallRuleTarget::IpNet(_) => { + vpcs.insert(vpc.name().clone().into()); + } + } + } + + for host in rule.filter_hosts.iter().flatten() { + match &host.0 { + external::VpcFirewallRuleHostFilter::Instance(name) => { + instances.insert(name.clone().into()); + } + external::VpcFirewallRuleHostFilter::Subnet(name) => { + subnets.insert(name.clone().into()); + } + external::VpcFirewallRuleHostFilter::Vpc(name) => { + if name != vpc.name() { + return Err(Error::invalid_request( + "cross-VPC firewall host filter unsupported", + )); + } + vpcs.insert(name.clone().into()); + } + // We don't need to resolve anything for Ip(Net)s. + external::VpcFirewallRuleHostFilter::Ip(_) => (), + external::VpcFirewallRuleHostFilter::IpNet(_) => (), + } + } + } + + // Resolve named instances, VPCs, and subnets. + // TODO-correctness: It's possible the resolving queries produce + // inconsistent results due to concurrent changes. They should be + // transactional. + type NetMap = HashMap>; + type NicMap = HashMap>; + let no_networks: Vec = Vec::new(); + let no_interfaces: Vec = Vec::new(); + + let mut instance_interfaces: NicMap = HashMap::new(); + for instance_name in &instances { + if let Ok((.., authz_instance)) = LookupPath::new(opctx, datastore) + .project_id(vpc.project_id) + .instance_name(instance_name) + .lookup_for(authz::Action::ListChildren) + .await + { + for iface in datastore + .derive_guest_network_interface_info(opctx, &authz_instance) + .await? + { + instance_interfaces + .entry(instance_name.0.clone()) + .or_insert_with(Vec::new) + .push(iface); + } + } + } + + let mut vpc_interfaces: NicMap = HashMap::new(); + for vpc_name in &vpcs { + if let Ok((.., authz_vpc)) = LookupPath::new(opctx, datastore) + .project_id(vpc.project_id) + .vpc_name(vpc_name) + .lookup_for(authz::Action::ListChildren) + .await + { + for iface in datastore + .derive_vpc_network_interface_info(opctx, &authz_vpc) + .await? + { + vpc_interfaces + .entry(vpc_name.0.clone()) + .or_insert_with(Vec::new) + .push(iface); + } + } + } + + let mut subnet_interfaces: NicMap = HashMap::new(); + for subnet_name in &subnets { + if let Ok((.., authz_subnet)) = LookupPath::new(opctx, datastore) + .project_id(vpc.project_id) + .vpc_name(&Name::from(vpc.name().clone())) + .vpc_subnet_name(subnet_name) + .lookup_for(authz::Action::ListChildren) + .await + { + for iface in datastore + .derive_subnet_network_interface_info(opctx, &authz_subnet) + .await? + { + subnet_interfaces + .entry(subnet_name.0.clone()) + .or_insert_with(Vec::new) + .push(iface); + } + } + } + + let subnet_networks: NetMap = datastore + .resolve_vpc_subnets_to_ip_networks(vpc, subnets) + .await? + .into_iter() + .map(|(name, v)| (name.0, v)) + .collect(); + + debug!( + log, + "resolved names for firewall rules"; + "instance_interfaces" => ?instance_interfaces, + "vpc_interfaces" => ?vpc_interfaces, + "subnet_interfaces" => ?subnet_interfaces, + "subnet_networks" => ?subnet_networks, + ); + + // Compile resolved rules for the sled agents. + let mut sled_agent_rules = Vec::with_capacity(rules.len()); + for rule in rules { + // TODO: what is the correct behavior when a name is not found? + // Options: + // (1) Fail update request (though note this can still arise + // from things like instance deletion) + // (2) Allow update request, ignore this rule (but store it + // in case it becomes valid later). This is consistent + // with the semantics of the rules. Rules with bad + // references should likely at least be flagged to users. + // We currently adopt option (2), as this allows users to add + // firewall rules (including default rules) before instances + // and their interfaces are instantiated. + + // Collect unique network interface targets. + // This would be easier if `NetworkInterface` were `Hash`, + // but that's not easy because it's a generated type. We + // use the pair (VNI, MAC) as a unique interface identifier. + let mut nics = HashSet::new(); + let mut targets = Vec::with_capacity(rule.targets.len()); + let mut push_target_nic = |nic: &NetworkInterface| { + if nics.insert((nic.vni, *nic.mac)) { + targets.push(nic.clone()); + } + }; + for target in &rule.targets { + match &target.0 { + external::VpcFirewallRuleTarget::Vpc(name) => { + vpc_interfaces + .get(&name) + .unwrap_or(&no_interfaces) + .iter() + .for_each(&mut push_target_nic); + } + external::VpcFirewallRuleTarget::Subnet(name) => { + subnet_interfaces + .get(&name) + .unwrap_or(&no_interfaces) + .iter() + .for_each(&mut push_target_nic); + } + external::VpcFirewallRuleTarget::Instance(name) => { + instance_interfaces + .get(&name) + .unwrap_or(&no_interfaces) + .iter() + .for_each(&mut push_target_nic); + } + external::VpcFirewallRuleTarget::Ip(addr) => { + vpc_interfaces + .get(vpc.name()) + .unwrap_or(&no_interfaces) + .iter() + .filter(|nic| nic.ip == *addr) + .for_each(&mut push_target_nic); + } + external::VpcFirewallRuleTarget::IpNet(net) => { + vpc_interfaces + .get(vpc.name()) + .unwrap_or(&no_interfaces) + .iter() + .filter(|nic| match (net, nic.ip) { + (IpNet::V4(net), IpAddr::V4(ip)) => { + net.contains(ip) + } + (IpNet::V6(net), IpAddr::V6(ip)) => { + net.contains(ip) + } + (_, _) => false, + }) + .for_each(&mut push_target_nic); + } + } + } + if !rule.targets.is_empty() && targets.is_empty() { + // Target not found; skip this rule. + continue; + } + + let filter_hosts = match &rule.filter_hosts { + None => None, + Some(hosts) => { + let mut host_addrs = Vec::with_capacity(hosts.len()); + for host in hosts { + match &host.0 { + external::VpcFirewallRuleHostFilter::Instance(name) => { + for interface in instance_interfaces + .get(&name) + .unwrap_or(&no_interfaces) + { + host_addrs.push( + HostIdentifier::Ip(IpNet::from( + interface.ip, + )) + .into(), + ) + } + } + external::VpcFirewallRuleHostFilter::Subnet(name) => { + for subnet in subnet_networks + .get(&name) + .unwrap_or(&no_networks) + { + host_addrs.push( + HostIdentifier::Ip(IpNet::from(*subnet)) + .into(), + ); + } + } + external::VpcFirewallRuleHostFilter::Ip(addr) => { + host_addrs.push( + HostIdentifier::Ip(IpNet::from(*addr)).into(), + ) + } + external::VpcFirewallRuleHostFilter::IpNet(net) => { + host_addrs.push(HostIdentifier::Ip(*net).into()) + } + external::VpcFirewallRuleHostFilter::Vpc(name) => { + for interface in vpc_interfaces + .get(&name) + .unwrap_or(&no_interfaces) + { + host_addrs.push( + HostIdentifier::Vpc(interface.vni).into(), + ) + } + } + } + } + if !hosts.is_empty() && host_addrs.is_empty() { + // Filter host not found; skip this rule. + continue; + } + Some(host_addrs) + } + }; + + let filter_ports = rule + .filter_ports + .as_ref() + .map(|ports| ports.iter().map(|v| v.0.into()).collect()); + + let filter_protocols = rule + .filter_protocols + .as_ref() + .map(|protocols| protocols.iter().map(|v| v.0.into()).collect()); + + sled_agent_rules.push(sled_agent_client::types::VpcFirewallRule { + status: rule.status.0.into(), + direction: rule.direction.0.into(), + targets, + filter_hosts, + filter_ports, + filter_protocols, + action: rule.action.0.into(), + priority: rule.priority.0 .0, + }); + } + debug!( + log, + "resolved firewall rules for sled agents"; + "sled_agent_rules" => ?sled_agent_rules, + ); + + Ok(sled_agent_rules) +} + +pub async fn send_sled_agents_firewall_rules( + datastore: &DataStore, + opctx: &OpContext, + vpc: &db::model::Vpc, + rules: &[db::model::VpcFirewallRule], + sleds_filter: &[Uuid], + sled_lookup_opctx: &OpContext, + log: &Logger, +) -> Result<(), Error> { + let rules_for_sled = resolve_firewall_rules_for_sled_agent( + datastore, opctx, &vpc, rules, log, + ) + .await?; + debug!(log, "resolved {} rules for sleds", rules_for_sled.len()); + let sled_rules_request = + sled_agent_client::types::VpcFirewallRulesEnsureBody { + vni: vpc.vni.0, + rules: rules_for_sled, + }; + + let vpc_to_sleds = + datastore.vpc_resolve_to_sleds(vpc.id(), sleds_filter).await?; + debug!( + log, "resolved sleds for vpc {}", vpc.name(); + "vpc_to_sled" => ?vpc_to_sleds, + ); + + let mut sled_requests = Vec::with_capacity(vpc_to_sleds.len()); + for sled in &vpc_to_sleds { + let sled_id = sled.id(); + let vpc_id = vpc.id(); + let sled_rules_request = sled_rules_request.clone(); + sled_requests.push(async move { + crate::sled_client(datastore, sled_lookup_opctx, sled_id, log) + .await? + .vpc_firewall_rules_put(&vpc_id, &sled_rules_request) + .await + .map_err(|e| Error::internal_error(&e.to_string())) + }); + } + + debug!(log, "sending firewall rules to sled agents"); + let results = join_all(sled_requests).await; + // TODO-correctness: handle more than one failure in the sled-agent requests + // https://github.com/oxidecomputer/omicron/issues/1791 + for (sled, result) in vpc_to_sleds.iter().zip(results) { + if let Err(e) = result { + warn!(log, "failed to update firewall rules on sled agent"; + "sled_id" => %sled.id(), + "vpc_id" => %vpc.id(), + "error" => %e); + return Err(e); + } + } + info!(log, "updated firewall rules on {} sleds", vpc_to_sleds.len()); + + Ok(()) +} + +/// Ensure firewall rules for internal services get reflected on all the +/// relevant sleds. +pub async fn plumb_service_firewall_rules( + datastore: &DataStore, + opctx: &OpContext, + sleds_filter: &[Uuid], + sled_lookup_opctx: &OpContext, + log: &Logger, +) -> Result<(), Error> { + let svcs_vpc = LookupPath::new(opctx, datastore) + .vpc_id(*db::fixed_data::vpc::SERVICES_VPC_ID); + let svcs_fw_rules = + vpc_list_firewall_rules(datastore, opctx, &svcs_vpc).await?; + let (_, _, _, svcs_vpc) = svcs_vpc.fetch().await?; + send_sled_agents_firewall_rules( + datastore, + opctx, + &svcs_vpc, + &svcs_fw_rules, + sleds_filter, + sled_lookup_opctx, + log, + ) + .await?; + Ok(()) +} diff --git a/nexus/networking/src/lib.rs b/nexus/networking/src/lib.rs new file mode 100644 index 0000000000..98278a5c9c --- /dev/null +++ b/nexus/networking/src/lib.rs @@ -0,0 +1,12 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Networking functionality shared between Nexus proper and its background +//! tasks or sagas. + +mod firewall_rules; +mod sled_client; + +pub use firewall_rules::*; +pub use sled_client::*; diff --git a/nexus/networking/src/sled_client.rs b/nexus/networking/src/sled_client.rs new file mode 100644 index 0000000000..073f073356 --- /dev/null +++ b/nexus/networking/src/sled_client.rs @@ -0,0 +1,53 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Functionality for constructing sled-agent clients. + +use nexus_db_queries::context::OpContext; +use nexus_db_queries::db::lookup; +use nexus_db_queries::db::lookup::LookupPath; +use nexus_db_queries::db::DataStore; +use omicron_common::api::external::Error; +use omicron_common::api::external::LookupResult; +use sled_agent_client::Client as SledAgentClient; +use slog::o; +use slog::Logger; +use std::net::SocketAddrV6; +use uuid::Uuid; + +pub fn sled_lookup<'a>( + datastore: &'a DataStore, + opctx: &'a OpContext, + sled_id: Uuid, +) -> LookupResult> { + let sled = LookupPath::new(opctx, datastore).sled_id(sled_id); + Ok(sled) +} + +pub async fn sled_client( + datastore: &DataStore, + lookup_opctx: &OpContext, + sled_id: Uuid, + log: &Logger, +) -> Result { + let (.., sled) = + sled_lookup(datastore, lookup_opctx, sled_id)?.fetch().await?; + + Ok(sled_client_from_address(sled_id, sled.address(), log)) +} + +pub fn sled_client_from_address( + sled_id: Uuid, + address: SocketAddrV6, + log: &Logger, +) -> SledAgentClient { + let log = log.new(o!("SledAgent" => sled_id.to_string())); + let dur = std::time::Duration::from_secs(60); + let client = reqwest::ClientBuilder::new() + .connect_timeout(dur) + .timeout(dur) + .build() + .unwrap(); + SledAgentClient::new_with_client(&format!("http://{address}"), client, log) +} diff --git a/nexus/reconfigurator/execution/Cargo.toml b/nexus/reconfigurator/execution/Cargo.toml index 62155d9783..d48f4c6b5b 100644 --- a/nexus/reconfigurator/execution/Cargo.toml +++ b/nexus/reconfigurator/execution/Cargo.toml @@ -15,6 +15,7 @@ internal-dns.workspace = true nexus-config.workspace = true nexus-db-model.workspace = true nexus-db-queries.workspace = true +nexus-networking.workspace = true nexus-types.workspace = true omicron-common.workspace = true reqwest.workspace = true diff --git a/nexus/reconfigurator/execution/src/lib.rs b/nexus/reconfigurator/execution/src/lib.rs index 74b4764d7a..386e9dc1c8 100644 --- a/nexus/reconfigurator/execution/src/lib.rs +++ b/nexus/reconfigurator/execution/src/lib.rs @@ -68,7 +68,7 @@ where info!( opctx.log, "attempting to realize blueprint"; - "blueprint_id" => ?blueprint.id + "blueprint_id" => %blueprint.id ); resource_allocation::ensure_zone_resources_allocated( @@ -90,6 +90,25 @@ where omicron_zones::deploy_zones(&opctx, &sleds_by_id, &blueprint.omicron_zones) .await?; + // After deploying omicron zones, we may need to refresh OPTE service + // firewall rules. This is an idempotent operation, so we don't attempt + // to optimize out calling it in unnecessary cases, although it is only + // needed in cases where we've changed the set of services on one or more + // sleds, or the sleds have lost their firewall rules for some reason. + // Fixing the latter case is a side effect and should really be handled by a + // firewall-rule-specific RPW; once that RPW exists, we could trigger it + // here instead of pluming firewall rules ourselves. + nexus_networking::plumb_service_firewall_rules( + datastore, + &opctx, + &[], + &opctx, + &opctx.log, + ) + .await + .context("failed to plumb service firewall rules to sleds") + .map_err(|err| vec![err])?; + datasets::ensure_crucible_dataset_records_exist( &opctx, datastore, diff --git a/nexus/reconfigurator/execution/src/omicron_zones.rs b/nexus/reconfigurator/execution/src/omicron_zones.rs index 1d5c4444b1..336e40df16 100644 --- a/nexus/reconfigurator/execution/src/omicron_zones.rs +++ b/nexus/reconfigurator/execution/src/omicron_zones.rs @@ -11,7 +11,6 @@ use futures::stream; use futures::StreamExt; use nexus_db_queries::context::OpContext; use nexus_types::deployment::OmicronZonesConfig; -use sled_agent_client::Client as SledAgentClient; use slog::info; use slog::warn; use std::collections::BTreeMap; @@ -34,7 +33,11 @@ pub(crate) async fn deploy_zones( return Some(err); } }; - let client = sled_client(opctx, &db_sled); + let client = nexus_networking::sled_client_from_address( + *sled_id, + db_sled.sled_agent_address, + &opctx.log, + ); let result = client.omicron_zones_put(&config).await.with_context(|| { format!("Failed to put {config:#?} to sled {sled_id}") @@ -65,25 +68,6 @@ pub(crate) async fn deploy_zones( } } -// This is a modified copy of the functionality from `nexus/src/app/sled.rs`. -// There's no good way to access this functionality right now since it is a -// method on the `Nexus` type. We want to have a more constrained type we can -// pass into background tasks for this type of functionality, but for now we -// just copy the functionality. -fn sled_client(opctx: &OpContext, sled: &Sled) -> SledAgentClient { - let dur = std::time::Duration::from_secs(60); - let client = reqwest::ClientBuilder::new() - .connect_timeout(dur) - .timeout(dur) - .build() - .unwrap(); - SledAgentClient::new_with_client( - &format!("http://{}", sled.sled_agent_address), - client, - opctx.log.clone(), - ) -} - #[cfg(test)] mod test { use super::deploy_zones; diff --git a/nexus/src/app/background/blueprint_execution.rs b/nexus/src/app/background/blueprint_execution.rs index ceb3cd7a05..7024f8be75 100644 --- a/nexus/src/app/background/blueprint_execution.rs +++ b/nexus/src/app/background/blueprint_execution.rs @@ -109,6 +109,7 @@ mod test { use nexus_db_model::{ ByteCount, SledBaseboard, SledSystemHardware, SledUpdate, }; + use nexus_db_queries::authn; use nexus_db_queries::context::OpContext; use nexus_test_utils_macros::nexus_test; use nexus_types::deployment::OmicronZonesConfig; @@ -158,8 +159,10 @@ mod test { // Set up the test. let nexus = &cptestctx.server.apictx().nexus; let datastore = nexus.datastore(); - let opctx = OpContext::for_tests( + let opctx = OpContext::for_background( cptestctx.logctx.log.clone(), + nexus.authz.clone(), + authn::Context::internal_api(), datastore.clone(), ); diff --git a/nexus/src/app/sled.rs b/nexus/src/app/sled.rs index 88f70c7d0d..aad6e1122f 100644 --- a/nexus/src/app/sled.rs +++ b/nexus/src/app/sled.rs @@ -31,8 +31,7 @@ impl super::Nexus { opctx: &'a OpContext, sled_id: &Uuid, ) -> LookupResult> { - let sled = LookupPath::new(opctx, &self.db_datastore).sled_id(*sled_id); - Ok(sled) + nexus_networking::sled_lookup(&self.db_datastore, opctx, *sled_id) } // TODO-robustness we should have a limit on how many sled agents there can @@ -103,21 +102,14 @@ impl super::Nexus { // Frankly, returning an "Arc" here without a connection pool is a // little silly; it's not actually used if each client connection exists // as a one-shot. - let (.., sled) = - self.sled_lookup(&self.opctx_alloc, id)?.fetch().await?; - - let log = self.log.new(o!("SledAgent" => id.clone().to_string())); - let dur = std::time::Duration::from_secs(60); - let client = reqwest::ClientBuilder::new() - .connect_timeout(dur) - .timeout(dur) - .build() - .unwrap(); - Ok(Arc::new(SledAgentClient::new_with_client( - &format!("http://{}", sled.address()), - client, - log, - ))) + let client = nexus_networking::sled_client( + &self.db_datastore, + &self.opctx_alloc, + *id, + &self.log, + ) + .await?; + Ok(Arc::new(client)) } pub(crate) async fn reserve_on_random_sled( @@ -286,18 +278,13 @@ impl super::Nexus { opctx: &OpContext, sleds_filter: &[Uuid], ) -> Result<(), Error> { - let svcs_vpc = LookupPath::new(opctx, &self.db_datastore) - .vpc_id(*db::fixed_data::vpc::SERVICES_VPC_ID); - let svcs_fw_rules = - self.vpc_list_firewall_rules(opctx, &svcs_vpc).await?; - let (_, _, _, svcs_vpc) = svcs_vpc.fetch().await?; - self.send_sled_agents_firewall_rules( + nexus_networking::plumb_service_firewall_rules( + &self.db_datastore, opctx, - &svcs_vpc, - &svcs_fw_rules, sleds_filter, + &self.opctx_alloc, + &self.log, ) - .await?; - Ok(()) + .await } } diff --git a/nexus/src/app/vpc.rs b/nexus/src/app/vpc.rs index 44b676b853..0950e65b83 100644 --- a/nexus/src/app/vpc.rs +++ b/nexus/src/app/vpc.rs @@ -10,8 +10,6 @@ use nexus_db_queries::authn; use nexus_db_queries::authz; use nexus_db_queries::context::OpContext; use nexus_db_queries::db; -use nexus_db_queries::db::identity::Asset; -use nexus_db_queries::db::identity::Resource; use nexus_db_queries::db::lookup; use nexus_db_queries::db::lookup::LookupPath; use nexus_db_queries::db::model::Name; @@ -22,20 +20,12 @@ use omicron_common::api::external::CreateResult; use omicron_common::api::external::DeleteResult; use omicron_common::api::external::Error; use omicron_common::api::external::InternalContext; -use omicron_common::api::external::IpNet; use omicron_common::api::external::ListResultVec; use omicron_common::api::external::LookupResult; use omicron_common::api::external::LookupType; use omicron_common::api::external::NameOrId; use omicron_common::api::external::UpdateResult; use omicron_common::api::external::VpcFirewallRuleUpdateParams; -use omicron_common::api::internal::nexus::HostIdentifier; - -use futures::future::join_all; -use ipnetwork::IpNetwork; -use omicron_common::api::internal::shared::NetworkInterface; -use std::collections::{HashMap, HashSet}; -use std::net::IpAddr; use std::sync::Arc; use uuid::Uuid; @@ -170,13 +160,12 @@ impl super::Nexus { opctx: &OpContext, vpc_lookup: &lookup::Vpc<'_>, ) -> ListResultVec { - let (.., authz_vpc) = - vpc_lookup.lookup_for(authz::Action::Read).await?; - let rules = self - .db_datastore - .vpc_list_firewall_rules(&opctx, &authz_vpc) - .await?; - Ok(rules) + nexus_networking::vpc_list_firewall_rules( + &self.db_datastore, + opctx, + vpc_lookup, + ) + .await } pub(crate) async fn vpc_update_firewall_rules( @@ -252,56 +241,16 @@ impl super::Nexus { rules: &[db::model::VpcFirewallRule], sleds_filter: &[Uuid], ) -> Result<(), Error> { - let rules_for_sled = self - .resolve_firewall_rules_for_sled_agent(opctx, &vpc, rules) - .await?; - debug!(self.log, "resolved {} rules for sleds", rules_for_sled.len()); - let sled_rules_request = - sled_agent_client::types::VpcFirewallRulesEnsureBody { - vni: vpc.vni.0, - rules: rules_for_sled, - }; - - let vpc_to_sleds = self - .db_datastore - .vpc_resolve_to_sleds(vpc.id(), sleds_filter) - .await?; - debug!(self.log, "resolved sleds for vpc {}", vpc.name(); "vpc_to_sled" => ?vpc_to_sleds); - - let mut sled_requests = Vec::with_capacity(vpc_to_sleds.len()); - for sled in &vpc_to_sleds { - let sled_id = sled.id(); - let vpc_id = vpc.id(); - let sled_rules_request = sled_rules_request.clone(); - sled_requests.push(async move { - self.sled_client(&sled_id) - .await? - .vpc_firewall_rules_put(&vpc_id, &sled_rules_request) - .await - .map_err(|e| Error::internal_error(&e.to_string())) - }); - } - - debug!(self.log, "sending firewall rules to sled agents"); - let results = join_all(sled_requests).await; - // TODO-correctness: handle more than one failure in the sled-agent requests - // https://github.com/oxidecomputer/omicron/issues/1791 - for (sled, result) in vpc_to_sleds.iter().zip(results) { - if let Err(e) = result { - warn!(self.log, "failed to update firewall rules on sled agent"; - "sled_id" => %sled.id(), - "vpc_id" => %vpc.id(), - "error" => %e); - return Err(e); - } - } - info!( - self.log, - "updated firewall rules on {} sleds", - vpc_to_sleds.len() - ); - - Ok(()) + nexus_networking::send_sled_agents_firewall_rules( + &self.db_datastore, + opctx, + vpc, + rules, + sleds_filter, + &self.opctx_alloc, + &self.log, + ) + .await } pub(crate) async fn resolve_firewall_rules_for_sled_agent( @@ -310,326 +259,13 @@ impl super::Nexus { vpc: &db::model::Vpc, rules: &[db::model::VpcFirewallRule], ) -> Result, Error> { - // Collect the names of instances, subnets, and VPCs that are either - // targets or host filters. We have to find the sleds for all the - // targets, and we'll need information about the IP addresses or - // subnets for things that are specified as host filters as well. - let mut instances: HashSet = HashSet::new(); - let mut subnets: HashSet = HashSet::new(); - let mut vpcs: HashSet = HashSet::new(); - for rule in rules { - for target in &rule.targets { - match &target.0 { - external::VpcFirewallRuleTarget::Instance(name) => { - instances.insert(name.clone().into()); - } - external::VpcFirewallRuleTarget::Subnet(name) => { - subnets.insert(name.clone().into()); - } - external::VpcFirewallRuleTarget::Vpc(name) => { - if name != vpc.name() { - return Err(Error::invalid_request( - "cross-VPC firewall target unsupported", - )); - } - vpcs.insert(name.clone().into()); - } - external::VpcFirewallRuleTarget::Ip(_) - | external::VpcFirewallRuleTarget::IpNet(_) => { - vpcs.insert(vpc.name().clone().into()); - } - } - } - - for host in rule.filter_hosts.iter().flatten() { - match &host.0 { - external::VpcFirewallRuleHostFilter::Instance(name) => { - instances.insert(name.clone().into()); - } - external::VpcFirewallRuleHostFilter::Subnet(name) => { - subnets.insert(name.clone().into()); - } - external::VpcFirewallRuleHostFilter::Vpc(name) => { - if name != vpc.name() { - return Err(Error::invalid_request( - "cross-VPC firewall host filter unsupported", - )); - } - vpcs.insert(name.clone().into()); - } - // We don't need to resolve anything for Ip(Net)s. - external::VpcFirewallRuleHostFilter::Ip(_) => (), - external::VpcFirewallRuleHostFilter::IpNet(_) => (), - } - } - } - - // Resolve named instances, VPCs, and subnets. - // TODO-correctness: It's possible the resolving queries produce - // inconsistent results due to concurrent changes. They should be - // transactional. - type NetMap = HashMap>; - type NicMap = HashMap>; - let no_networks: Vec = Vec::new(); - let no_interfaces: Vec = Vec::new(); - - let mut instance_interfaces: NicMap = HashMap::new(); - for instance_name in &instances { - if let Ok((.., authz_instance)) = - LookupPath::new(opctx, &self.db_datastore) - .project_id(vpc.project_id) - .instance_name(instance_name) - .lookup_for(authz::Action::ListChildren) - .await - { - for iface in self - .db_datastore - .derive_guest_network_interface_info(opctx, &authz_instance) - .await? - { - instance_interfaces - .entry(instance_name.0.clone()) - .or_insert_with(Vec::new) - .push(iface); - } - } - } - - let mut vpc_interfaces: NicMap = HashMap::new(); - for vpc_name in &vpcs { - if let Ok((.., authz_vpc)) = - LookupPath::new(opctx, &self.db_datastore) - .project_id(vpc.project_id) - .vpc_name(vpc_name) - .lookup_for(authz::Action::ListChildren) - .await - { - for iface in self - .db_datastore - .derive_vpc_network_interface_info(opctx, &authz_vpc) - .await? - { - vpc_interfaces - .entry(vpc_name.0.clone()) - .or_insert_with(Vec::new) - .push(iface); - } - } - } - - let mut subnet_interfaces: NicMap = HashMap::new(); - for subnet_name in &subnets { - if let Ok((.., authz_subnet)) = - LookupPath::new(opctx, &self.db_datastore) - .project_id(vpc.project_id) - .vpc_name(&Name::from(vpc.name().clone())) - .vpc_subnet_name(subnet_name) - .lookup_for(authz::Action::ListChildren) - .await - { - for iface in self - .db_datastore - .derive_subnet_network_interface_info(opctx, &authz_subnet) - .await? - { - subnet_interfaces - .entry(subnet_name.0.clone()) - .or_insert_with(Vec::new) - .push(iface); - } - } - } - - let subnet_networks: NetMap = self - .db_datastore - .resolve_vpc_subnets_to_ip_networks(vpc, subnets) - .await? - .into_iter() - .map(|(name, v)| (name.0, v)) - .collect(); - - debug!( - self.log, - "resolved names for firewall rules"; - "instance_interfaces" => ?instance_interfaces, - "vpc_interfaces" => ?vpc_interfaces, - "subnet_interfaces" => ?subnet_interfaces, - "subnet_networks" => ?subnet_networks, - ); - - // Compile resolved rules for the sled agents. - let mut sled_agent_rules = Vec::with_capacity(rules.len()); - for rule in rules { - // TODO: what is the correct behavior when a name is not found? - // Options: - // (1) Fail update request (though note this can still arise - // from things like instance deletion) - // (2) Allow update request, ignore this rule (but store it - // in case it becomes valid later). This is consistent - // with the semantics of the rules. Rules with bad - // references should likely at least be flagged to users. - // We currently adopt option (2), as this allows users to add - // firewall rules (including default rules) before instances - // and their interfaces are instantiated. - - // Collect unique network interface targets. - // This would be easier if `NetworkInterface` were `Hash`, - // but that's not easy because it's a generated type. We - // use the pair (VNI, MAC) as a unique interface identifier. - let mut nics = HashSet::new(); - let mut targets = Vec::with_capacity(rule.targets.len()); - let mut push_target_nic = |nic: &NetworkInterface| { - if nics.insert((nic.vni, *nic.mac)) { - targets.push(nic.clone()); - } - }; - for target in &rule.targets { - match &target.0 { - external::VpcFirewallRuleTarget::Vpc(name) => { - vpc_interfaces - .get(&name) - .unwrap_or(&no_interfaces) - .iter() - .for_each(&mut push_target_nic); - } - external::VpcFirewallRuleTarget::Subnet(name) => { - subnet_interfaces - .get(&name) - .unwrap_or(&no_interfaces) - .iter() - .for_each(&mut push_target_nic); - } - external::VpcFirewallRuleTarget::Instance(name) => { - instance_interfaces - .get(&name) - .unwrap_or(&no_interfaces) - .iter() - .for_each(&mut push_target_nic); - } - external::VpcFirewallRuleTarget::Ip(addr) => { - vpc_interfaces - .get(vpc.name()) - .unwrap_or(&no_interfaces) - .iter() - .filter(|nic| nic.ip == *addr) - .for_each(&mut push_target_nic); - } - external::VpcFirewallRuleTarget::IpNet(net) => { - vpc_interfaces - .get(vpc.name()) - .unwrap_or(&no_interfaces) - .iter() - .filter(|nic| match (net, nic.ip) { - (IpNet::V4(net), IpAddr::V4(ip)) => { - net.contains(ip) - } - (IpNet::V6(net), IpAddr::V6(ip)) => { - net.contains(ip) - } - (_, _) => false, - }) - .for_each(&mut push_target_nic); - } - } - } - if !rule.targets.is_empty() && targets.is_empty() { - // Target not found; skip this rule. - continue; - } - - let filter_hosts = match &rule.filter_hosts { - None => None, - Some(hosts) => { - let mut host_addrs = Vec::with_capacity(hosts.len()); - for host in hosts { - match &host.0 { - external::VpcFirewallRuleHostFilter::Instance( - name, - ) => { - for interface in instance_interfaces - .get(&name) - .unwrap_or(&no_interfaces) - { - host_addrs.push( - HostIdentifier::Ip(IpNet::from( - interface.ip, - )) - .into(), - ) - } - } - external::VpcFirewallRuleHostFilter::Subnet( - name, - ) => { - for subnet in subnet_networks - .get(&name) - .unwrap_or(&no_networks) - { - host_addrs.push( - HostIdentifier::Ip(IpNet::from( - *subnet, - )) - .into(), - ); - } - } - external::VpcFirewallRuleHostFilter::Ip(addr) => { - host_addrs.push( - HostIdentifier::Ip(IpNet::from(*addr)) - .into(), - ) - } - external::VpcFirewallRuleHostFilter::IpNet(net) => { - host_addrs.push(HostIdentifier::Ip(*net).into()) - } - external::VpcFirewallRuleHostFilter::Vpc(name) => { - for interface in vpc_interfaces - .get(&name) - .unwrap_or(&no_interfaces) - { - host_addrs.push( - HostIdentifier::Vpc(interface.vni) - .into(), - ) - } - } - } - } - if !hosts.is_empty() && host_addrs.is_empty() { - // Filter host not found; skip this rule. - continue; - } - Some(host_addrs) - } - }; - - let filter_ports = rule - .filter_ports - .as_ref() - .map(|ports| ports.iter().map(|v| v.0.into()).collect()); - - let filter_protocols = - rule.filter_protocols.as_ref().map(|protocols| { - protocols.iter().map(|v| v.0.into()).collect() - }); - - sled_agent_rules.push(sled_agent_client::types::VpcFirewallRule { - status: rule.status.0.into(), - direction: rule.direction.0.into(), - targets, - filter_hosts, - filter_ports, - filter_protocols, - action: rule.action.0.into(), - priority: rule.priority.0 .0, - }); - } - debug!( - self.log, - "resolved firewall rules for sled agents"; - "sled_agent_rules" => ?sled_agent_rules, - ); - - Ok(sled_agent_rules) + nexus_networking::resolve_firewall_rules_for_sled_agent( + &self.db_datastore, + opctx, + vpc, + rules, + &self.log, + ) + .await } }