From 4b12c4ca1c1abdc3522be21266bdcea5d911bf30 Mon Sep 17 00:00:00 2001 From: Wez Furlong Date: Tue, 17 Dec 2024 09:53:35 -0700 Subject: [PATCH] add kumo-address crate for HostAddress and SocketAddress types One frustration I have with the Rust standard library types is that while it has a SocketAddr type, that type doesn't include unix domain addressing like the underlying unix OS does in the underlying system type. I understand why it is that way, it's still a bit of a PITA. Since we want to enable the use of unix domain sockets for outbound LMTP support, our internal addressing type needs to be able to represent a unix domain address. This commit introduces our own HostAddress (equivalent to IpAddr, but not limited to IP addresses) and SocketAddress (equivalent to SockAddr, but not limited to IP sockets) types. The ResolvedAddress type has been cut over from IpAddr to HostAddress and the fan out addressed. This commit does not add support for establishing unix domain connections, it is merely the ability to recognize/report on them. refs: https://github.com/KumoCorp/kumomta/issues/267 --- Cargo.lock | 9 + Cargo.toml | 1 + crates/dns-resolver/src/lib.rs | 10 +- crates/kumo-address/Cargo.toml | 8 + crates/kumo-address/src/host.rs | 218 +++++++++++++++++++++ crates/kumo-address/src/lib.rs | 2 + crates/kumo-address/src/socket.rs | 222 ++++++++++++++++++++++ crates/kumo-log-types/Cargo.toml | 1 + crates/kumo-log-types/src/lib.rs | 5 +- crates/kumod/src/http_server/inject_v1.rs | 4 +- crates/kumod/src/logging/disposition.rs | 2 +- crates/kumod/src/smtp_dispatcher.rs | 48 +++-- crates/kumod/src/smtp_server.rs | 4 +- 13 files changed, 503 insertions(+), 31 deletions(-) create mode 100644 crates/kumo-address/Cargo.toml create mode 100644 crates/kumo-address/src/host.rs create mode 100644 crates/kumo-address/src/lib.rs create mode 100644 crates/kumo-address/src/socket.rs diff --git a/Cargo.lock b/Cargo.lock index 941b0c09f..124a1483c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2968,6 +2968,14 @@ dependencies = [ "libc", ] +[[package]] +name = "kumo-address" +version = "0.1.0" +dependencies = [ + "serde", + "thiserror 1.0.69", +] + [[package]] name = "kumo-api-types" version = "0.1.0" @@ -3059,6 +3067,7 @@ dependencies = [ "chrono", "data-encoding", "k9", + "kumo-address", "mailparsing", "rfc5321", "serde", diff --git a/Cargo.toml b/Cargo.toml index 6f1abbf93..973e7a66a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ members = [ "crates/kumo-dmarc", "crates/kumo-template", "crates/summarize-memory", + "crates/kumo-address", ] resolver = "2" diff --git a/crates/dns-resolver/src/lib.rs b/crates/dns-resolver/src/lib.rs index 4a3b2f226..180fa2944 100644 --- a/crates/dns-resolver/src/lib.rs +++ b/crates/dns-resolver/src/lib.rs @@ -186,7 +186,7 @@ pub async fn resolve_a_or_aaaa(domain_name: &str) -> anyhow::Result { return Ok(vec![ResolvedAddress { name: domain_name.to_string(), - addr: std::net::IpAddr::V6(addr), + addr: std::net::IpAddr::V6(addr).into(), }]); } Err(err) => { @@ -202,7 +202,7 @@ pub async fn resolve_a_or_aaaa(domain_name: &str) -> anyhow::Result { return Ok(vec![ResolvedAddress { name: domain_name.to_string(), - addr, + addr: addr.into(), }]); } Err(err) => { @@ -217,7 +217,7 @@ pub async fn resolve_a_or_aaaa(domain_name: &str) -> anyhow::Result() { by_pref.push(ResolvedAddress { name: mx_host.to_string(), - addr, + addr: addr.into(), }); continue; } @@ -391,7 +391,7 @@ impl MailExchanger { for addr in addresses.iter() { by_pref.push(ResolvedAddress { name: mx_host.to_string(), - addr: *addr, + addr: (*addr).into(), }); } } diff --git a/crates/kumo-address/Cargo.toml b/crates/kumo-address/Cargo.toml new file mode 100644 index 000000000..eee7952f5 --- /dev/null +++ b/crates/kumo-address/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "kumo-address" +version = "0.1.0" +edition = "2021" + +[dependencies] +serde.workspace = true +thiserror.workspace = true diff --git a/crates/kumo-address/src/host.rs b/crates/kumo-address/src/host.rs new file mode 100644 index 000000000..6321cdb17 --- /dev/null +++ b/crates/kumo-address/src/host.rs @@ -0,0 +1,218 @@ +use serde::{Deserialize, Serialize}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::os::unix::net::SocketAddr as UnixSocketAddr; +use std::path::Path; +use std::str::FromStr; +use thiserror::Error; + +#[derive(Error, Debug)] +#[error( + "Failed to parse '{candidate}' as an address. \ + Got '{net_err}' when considering it as an IP address and \ + '{unix_err}' when considering it as a unix domain socket path." +)] +pub struct AddressParseError { + pub(crate) candidate: String, + pub(crate) net_err: std::net::AddrParseError, + pub(crate) unix_err: std::io::Error, +} + +impl PartialEq for AddressParseError { + fn eq(&self, other: &Self) -> bool { + self.to_string().eq(&other.to_string()) + } +} + +#[derive(Clone, Deserialize, Serialize)] +#[serde(try_from = "String", into = "String")] +pub enum HostAddress { + UnixDomain(Box), + V4(std::net::Ipv4Addr), + V6(std::net::Ipv6Addr), +} + +impl HostAddress { + /// Returns the unix domain socket representation of the address + pub fn unix(&self) -> Option { + match self { + Self::V4(_) | Self::V6(_) => None, + Self::UnixDomain(unix) => Some((**unix).clone()), + } + } + + /// Returns the ip representation of the address + pub fn ip(&self) -> Option { + match self { + Self::V4(a) => Some(a.clone().into()), + Self::V6(a) => Some(a.clone().into()), + Self::UnixDomain(_) => None, + } + } +} + +impl PartialEq for HostAddress { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::UnixDomain(a), Self::UnixDomain(b)) => { + match (a.as_pathname(), b.as_pathname()) { + (Some(a), Some(b)) => a.eq(b), + (None, None) => true, + _ => false, + } + } + (Self::V4(a), Self::V4(b)) => a.eq(b), + (Self::V6(a), Self::V6(b)) => a.eq(b), + _ => false, + } + } +} + +impl Eq for HostAddress {} + +impl From for String { + fn from(a: HostAddress) -> String { + format!("{a}") + } +} + +impl std::fmt::Debug for HostAddress { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + ::fmt(self, fmt) + } +} + +impl std::fmt::Display for HostAddress { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Self::UnixDomain(unix) => match unix.as_pathname() { + Some(path) => path.display().fmt(fmt), + None => write!(fmt, ""), + }, + Self::V4(a) => a.fmt(fmt), + Self::V6(a) => a.fmt(fmt), + } + } +} + +impl FromStr for HostAddress { + type Err = AddressParseError; + fn from_str(s: &str) -> Result { + match IpAddr::from_str(s) { + Ok(a) => Ok(a.into()), + Err(net_err) => { + if s.starts_with('[') && s.ends_with(']') { + let alternative = &s[1..s.len() - 1]; + if let Ok(a) = IpAddr::from_str(alternative) { + return Ok(a.into()); + } + } + + let path: &Path = s.as_ref(); + if path.is_relative() { + Err(AddressParseError { + candidate: s.to_string(), + net_err, + unix_err: std::io::Error::new( + std::io::ErrorKind::Other, + "unix domain path must be absolute", + ), + }) + } else { + match UnixSocketAddr::from_pathname(path) { + Ok(unix) => Ok(HostAddress::UnixDomain(unix.into())), + Err(unix_err) => Err(AddressParseError { + candidate: s.to_string(), + net_err, + unix_err, + }), + } + } + } + } + } +} + +impl TryFrom for HostAddress { + type Error = AddressParseError; + fn try_from(s: String) -> Result { + HostAddress::from_str(&s) + } +} + +impl From for HostAddress { + fn from(unix: UnixSocketAddr) -> HostAddress { + HostAddress::UnixDomain(unix.into()) + } +} + +impl From for HostAddress { + fn from(ip: Ipv4Addr) -> HostAddress { + HostAddress::V4(ip) + } +} + +impl From for HostAddress { + fn from(ip: Ipv6Addr) -> HostAddress { + HostAddress::V6(ip) + } +} + +impl From for HostAddress { + fn from(ip: IpAddr) -> HostAddress { + match ip { + IpAddr::V4(a) => HostAddress::V4(a), + IpAddr::V6(a) => HostAddress::V6(a), + } + } +} + +impl From for HostAddress { + fn from(a: SocketAddr) -> HostAddress { + a.ip().into() + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn parse() { + assert_eq!( + "10.0.0.1".parse::(), + Ok(HostAddress::V4(Ipv4Addr::new(10, 0, 0, 1))) + ); + assert_eq!( + "[10.0.0.1]".parse::(), + Ok(HostAddress::V4(Ipv4Addr::new(10, 0, 0, 1))) + ); + assert_eq!( + "::1".parse::(), + Ok(HostAddress::V6(Ipv6Addr::LOCALHOST)) + ); + assert_eq!( + "[::1]".parse::(), + Ok(HostAddress::V6(Ipv6Addr::LOCALHOST)) + ); + assert_eq!( + "/some/path".parse::(), + Ok(HostAddress::UnixDomain( + UnixSocketAddr::from_pathname("/some/path").unwrap().into() + )) + ); + assert_eq!( + format!("{:#}", "[/some/path]".parse::().unwrap_err()), + "Failed to parse '[/some/path]' as an address. \ + Got 'invalid IP address syntax' when considering it as \ + an IP address and 'unix domain path must be absolute' \ + when considering it as a unix domain socket path." + ); + assert_eq!( + format!("{:#}", "hello there".parse::().unwrap_err()), + "Failed to parse 'hello there' as an address. \ + Got 'invalid IP address syntax' when considering it as \ + an IP address and 'unix domain path must be absolute' \ + when considering it as a unix domain socket path." + ); + } +} diff --git a/crates/kumo-address/src/lib.rs b/crates/kumo-address/src/lib.rs new file mode 100644 index 000000000..fc7678ba0 --- /dev/null +++ b/crates/kumo-address/src/lib.rs @@ -0,0 +1,2 @@ +pub mod host; +pub mod socket; diff --git a/crates/kumo-address/src/socket.rs b/crates/kumo-address/src/socket.rs new file mode 100644 index 000000000..aa9eb462b --- /dev/null +++ b/crates/kumo-address/src/socket.rs @@ -0,0 +1,222 @@ +use crate::host::{AddressParseError, HostAddress}; +use serde::{Deserialize, Serialize}; +use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6}; +use std::os::unix::net::SocketAddr as UnixSocketAddr; +use std::path::Path; +use std::str::FromStr; + +#[derive(Clone, Serialize, Deserialize)] +#[serde(try_from = "String", into = "String")] +pub enum SocketAddress { + UnixDomain(Box), + V4(std::net::SocketAddrV4), + V6(std::net::SocketAddrV6), +} + +impl std::fmt::Debug for SocketAddress { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + ::fmt(self, fmt) + } +} + +impl std::fmt::Display for SocketAddress { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Self::UnixDomain(unix) => match unix.as_pathname() { + Some(path) => path.display().fmt(fmt), + None => write!(fmt, ""), + }, + Self::V4(a) => a.fmt(fmt), + Self::V6(a) => a.fmt(fmt), + } + } +} + +impl From for String { + fn from(a: SocketAddress) -> String { + format!("{a}") + } +} + +impl TryFrom for SocketAddress { + type Error = AddressParseError; + fn try_from(s: String) -> Result { + SocketAddress::from_str(&s) + } +} + +impl SocketAddress { + /// Returns the "host" portion of the address + pub fn host(&self) -> HostAddress { + match self { + Self::UnixDomain(p) => HostAddress::UnixDomain(p.clone()), + Self::V4(a) => HostAddress::V4(a.ip().clone()), + Self::V6(a) => HostAddress::V6(a.ip().clone()), + } + } + + /// Returns the unix domain socket representation of the address + pub fn unix(&self) -> Option { + match self { + Self::V4(_) | Self::V6(_) => None, + Self::UnixDomain(unix) => Some((**unix).clone()), + } + } + + /// Returns the ip representation of the address + pub fn ip(&self) -> Option { + match self { + Self::V4(a) => Some(a.clone().into()), + Self::V6(a) => Some(a.clone().into()), + Self::UnixDomain(_) => None, + } + } +} + +impl FromStr for SocketAddress { + type Err = AddressParseError; + fn from_str(s: &str) -> Result { + // At the time of writing, Rust's IPv6 SockAddr parsing + // interally only accepts `[address]:port` while its IPv4 + // SockAddr parsing only accepts `address:port`. + // In the email world, `[]` is used to indicate a literal + // IP address so we desire the ability to uniformly use + // the `[]` syntax in both cases, so we check for that + // first and parse the internal address out. + + if s.starts_with('[') { + if let Some(host_end) = s.find(']') { + let (host, remainder) = s.split_at(host_end); + let host = &host[1..]; + + if let Some(port) = remainder.strip_prefix("]:") { + if let Ok(port) = port.parse::() { + match HostAddress::from_str(host) { + Ok(HostAddress::V4(a)) => { + return Ok(SocketAddress::V4(SocketAddrV4::new(a, port))) + } + Ok(HostAddress::V6(a)) => { + return Ok(SocketAddress::V6(SocketAddrV6::new(a, port, 0, 0))) + } + + _ => {} + } + } + } + } + } + + match SocketAddr::from_str(s) { + Ok(a) => Ok(a.into()), + Err(net_err) => { + let path: &Path = s.as_ref(); + if path.is_relative() { + Err(AddressParseError { + candidate: s.to_string(), + net_err, + unix_err: std::io::Error::new( + std::io::ErrorKind::Other, + "unix domain path must be absolute", + ), + }) + } else { + match UnixSocketAddr::from_pathname(path) { + Ok(unix) => Ok(SocketAddress::UnixDomain(unix.into())), + Err(unix_err) => Err(AddressParseError { + candidate: s.to_string(), + net_err, + unix_err, + }), + } + } + } + } + } +} + +impl PartialEq for SocketAddress { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::UnixDomain(a), Self::UnixDomain(b)) => { + match (a.as_pathname(), b.as_pathname()) { + (Some(a), Some(b)) => a.eq(b), + (None, None) => true, + _ => false, + } + } + (Self::V4(a), Self::V4(b)) => a.eq(b), + (Self::V6(a), Self::V6(b)) => a.eq(b), + _ => false, + } + } +} + +impl Eq for SocketAddress {} + +impl From for SocketAddress { + fn from(unix: UnixSocketAddr) -> SocketAddress { + SocketAddress::UnixDomain(unix.into()) + } +} + +impl From for SocketAddress { + fn from(ip: SocketAddr) -> SocketAddress { + match ip { + SocketAddr::V4(a) => SocketAddress::V4(a), + SocketAddr::V6(a) => SocketAddress::V6(a), + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::net::{Ipv4Addr, Ipv6Addr}; + + #[test] + fn parse() { + assert_eq!( + "10.0.0.1:25".parse::(), + Ok(SocketAddress::V4(SocketAddrV4::new( + Ipv4Addr::new(10, 0, 0, 1), + 25 + ))) + ); + assert_eq!( + "[10.0.0.1]:25".parse::(), + Ok(SocketAddress::V4(SocketAddrV4::new( + Ipv4Addr::new(10, 0, 0, 1), + 25 + ))) + ); + assert_eq!( + "[::1]:100".parse::(), + Ok(SocketAddress::V6(SocketAddrV6::new( + Ipv6Addr::LOCALHOST, + 100, + 0, + 0 + ))) + ); + assert_eq!( + "/some/path".parse::(), + Ok(SocketAddress::UnixDomain( + UnixSocketAddr::from_pathname("/some/path").unwrap().into() + )) + ); + assert_eq!( + format!("{:#}", "hello there".parse::().unwrap_err()), + "Failed to parse 'hello there' as an address. \ + Got 'invalid socket address syntax' when considering it as \ + an IP address and 'unix domain path must be absolute' \ + when considering it as a unix domain socket path." + ); + assert_eq!( + format!("{:#}", "[10.0.0.1]".parse::().unwrap_err()), + "Failed to parse '[10.0.0.1]' as an address. \ + Got 'invalid socket address syntax' when considering it as \ + an IP address and 'unix domain path must be absolute' \ + when considering it as a unix domain socket path." + ); + } +} diff --git a/crates/kumo-log-types/Cargo.toml b/crates/kumo-log-types/Cargo.toml index 03a91da7a..99bbf5d43 100644 --- a/crates/kumo-log-types/Cargo.toml +++ b/crates/kumo-log-types/Cargo.toml @@ -8,6 +8,7 @@ anyhow = {workspace=true} bounce-classify = {path="../bounce-classify"} chrono = {workspace=true, default-features=false, features=["serde", "std"]} data-encoding = {workspace=true} +kumo-address = {path="../kumo-address"} mailparsing = {path="../mailparsing"} rfc5321 = {path="../rfc5321", default-features=false} serde = {workspace=true} diff --git a/crates/kumo-log-types/src/lib.rs b/crates/kumo-log-types/src/lib.rs index e24e8fb8a..abfdd5591 100644 --- a/crates/kumo-log-types/src/lib.rs +++ b/crates/kumo-log-types/src/lib.rs @@ -1,12 +1,13 @@ use crate::rfc5965::ARFReport; use bounce_classify::BounceClass; use chrono::{DateTime, Utc}; +use kumo_address::host::HostAddress; use rfc5321::Response; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::borrow::Cow; use std::collections::HashMap; -use std::net::{IpAddr, SocketAddr}; +use std::net::SocketAddr; use uuid::Uuid; pub mod rfc3464; @@ -15,7 +16,7 @@ pub mod rfc5965; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ResolvedAddress { pub name: String, - pub addr: IpAddr, + pub addr: HostAddress, } impl std::fmt::Display for ResolvedAddress { diff --git a/crates/kumod/src/http_server/inject_v1.rs b/crates/kumod/src/http_server/inject_v1.rs index 16d4465c5..9791a40a7 100644 --- a/crates/kumod/src/http_server/inject_v1.rs +++ b/crates/kumod/src/http_server/inject_v1.rs @@ -569,7 +569,7 @@ async fn process_recipient<'a>( site: "", peer_address: Some(&ResolvedAddress { name: "".to_string(), - addr: peer_address, + addr: peer_address.into(), }), response: Response { code: 250, @@ -625,7 +625,7 @@ async fn queue_deferred( site: "", peer_address: Some(&ResolvedAddress { name: "".to_string(), - addr: peer_address, + addr: peer_address.into(), }), response: Response { code: 250, diff --git a/crates/kumod/src/logging/disposition.rs b/crates/kumod/src/logging/disposition.rs index eb1eb5bd5..77ec5ec83 100644 --- a/crates/kumod/src/logging/disposition.rs +++ b/crates/kumod/src/logging/disposition.rs @@ -221,7 +221,7 @@ pub async fn log_disposition(args: LogDisposition<'_>) { peer_address: Some(ResolvedAddress { name: report.per_message.reporting_mta.name.to_string(), addr: peer_address - .map(|a| a.addr) + .map(|a| a.addr.clone()) .unwrap_or_else(|| Ipv4Addr::UNSPECIFIED.into()), }), response: Response { diff --git a/crates/kumod/src/smtp_dispatcher.rs b/crates/kumod/src/smtp_dispatcher.rs index e8b378c24..676bbc6e5 100644 --- a/crates/kumod/src/smtp_dispatcher.rs +++ b/crates/kumod/src/smtp_dispatcher.rs @@ -169,27 +169,32 @@ impl SmtpDispatcher { } for addr in &addresses { - if path_config.prohibited_hosts.contains(addr.addr) { - dispatcher - .bulk_ready_queue_operation(Response { - code: 550, - enhanced_code: Some(EnhancedStatusCode { - class: 5, - subject: 4, - detail: 4, - }), - content: format!( - "{addr} is on the list of prohibited_hosts {:?}", - path_config.prohibited_hosts - ), - command: None, - }) - .await; - return Ok(None); + if let Some(ip) = addr.addr.ip() { + if path_config.prohibited_hosts.contains(ip) { + dispatcher + .bulk_ready_queue_operation(Response { + code: 550, + enhanced_code: Some(EnhancedStatusCode { + class: 5, + subject: 4, + detail: 4, + }), + content: format!( + "{addr} is on the list of prohibited_hosts {:?}", + path_config.prohibited_hosts + ), + command: None, + }) + .await; + return Ok(None); + } } } - addresses.retain(|addr| !path_config.skip_hosts.contains(addr.addr)); + addresses.retain(|addr| match addr.addr.ip() { + Some(ip) => !path_config.skip_hosts.contains(ip), + None => true, + }); if addresses.is_empty() { dispatcher @@ -304,7 +309,12 @@ impl SmtpDispatcher { tokio::spawn(async move { let (stream, source_address) = egress_source .connect_to( - SocketAddr::new(address.addr, port), + SocketAddr::new( + address.addr.ip().ok_or_else(|| { + anyhow::anyhow!("only ip addresses are currently supported") + })?, + port, + ), timeouts.connect_timeout, ) .await?; diff --git a/crates/kumod/src/smtp_server.rs b/crates/kumod/src/smtp_server.rs index eeeb77e30..7c85b3333 100644 --- a/crates/kumod/src/smtp_server.rs +++ b/crates/kumod/src/smtp_server.rs @@ -741,7 +741,7 @@ impl SmtpServer { meta: self.meta.clone_inner(), peer_address: ResolvedAddress { name: self.said_hello.as_deref().unwrap_or("").to_string(), - addr: self.peer_address.ip(), + addr: self.peer_address.ip().into(), }, response, sender, @@ -1773,7 +1773,7 @@ impl SmtpServer { site: "", peer_address: Some(&ResolvedAddress { name: self.said_hello.as_deref().unwrap_or("").to_string(), - addr: self.peer_address.ip(), + addr: self.peer_address.ip().into(), }), response: Response { code: 250,