diff --git a/Cargo.lock b/Cargo.lock index 941b0c09f..124a1483c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2968,6 +2968,14 @@ dependencies = [ "libc", ] +[[package]] +name = "kumo-address" +version = "0.1.0" +dependencies = [ + "serde", + "thiserror 1.0.69", +] + [[package]] name = "kumo-api-types" version = "0.1.0" @@ -3059,6 +3067,7 @@ dependencies = [ "chrono", "data-encoding", "k9", + "kumo-address", "mailparsing", "rfc5321", "serde", diff --git a/Cargo.toml b/Cargo.toml index 6f1abbf93..973e7a66a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ members = [ "crates/kumo-dmarc", "crates/kumo-template", "crates/summarize-memory", + "crates/kumo-address", ] resolver = "2" diff --git a/crates/dns-resolver/src/lib.rs b/crates/dns-resolver/src/lib.rs index 4a3b2f226..180fa2944 100644 --- a/crates/dns-resolver/src/lib.rs +++ b/crates/dns-resolver/src/lib.rs @@ -186,7 +186,7 @@ pub async fn resolve_a_or_aaaa(domain_name: &str) -> anyhow::Result { return Ok(vec![ResolvedAddress { name: domain_name.to_string(), - addr: std::net::IpAddr::V6(addr), + addr: std::net::IpAddr::V6(addr).into(), }]); } Err(err) => { @@ -202,7 +202,7 @@ pub async fn resolve_a_or_aaaa(domain_name: &str) -> anyhow::Result { return Ok(vec![ResolvedAddress { name: domain_name.to_string(), - addr, + addr: addr.into(), }]); } Err(err) => { @@ -217,7 +217,7 @@ pub async fn resolve_a_or_aaaa(domain_name: &str) -> anyhow::Result() { by_pref.push(ResolvedAddress { name: mx_host.to_string(), - addr, + addr: addr.into(), }); continue; } @@ -391,7 +391,7 @@ impl MailExchanger { for addr in addresses.iter() { by_pref.push(ResolvedAddress { name: mx_host.to_string(), - addr: *addr, + addr: (*addr).into(), }); } } diff --git a/crates/kumo-address/Cargo.toml b/crates/kumo-address/Cargo.toml new file mode 100644 index 000000000..eee7952f5 --- /dev/null +++ b/crates/kumo-address/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "kumo-address" +version = "0.1.0" +edition = "2021" + +[dependencies] +serde.workspace = true +thiserror.workspace = true diff --git a/crates/kumo-address/src/host.rs b/crates/kumo-address/src/host.rs new file mode 100644 index 000000000..6321cdb17 --- /dev/null +++ b/crates/kumo-address/src/host.rs @@ -0,0 +1,218 @@ +use serde::{Deserialize, Serialize}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::os::unix::net::SocketAddr as UnixSocketAddr; +use std::path::Path; +use std::str::FromStr; +use thiserror::Error; + +#[derive(Error, Debug)] +#[error( + "Failed to parse '{candidate}' as an address. \ + Got '{net_err}' when considering it as an IP address and \ + '{unix_err}' when considering it as a unix domain socket path." +)] +pub struct AddressParseError { + pub(crate) candidate: String, + pub(crate) net_err: std::net::AddrParseError, + pub(crate) unix_err: std::io::Error, +} + +impl PartialEq for AddressParseError { + fn eq(&self, other: &Self) -> bool { + self.to_string().eq(&other.to_string()) + } +} + +#[derive(Clone, Deserialize, Serialize)] +#[serde(try_from = "String", into = "String")] +pub enum HostAddress { + UnixDomain(Box), + V4(std::net::Ipv4Addr), + V6(std::net::Ipv6Addr), +} + +impl HostAddress { + /// Returns the unix domain socket representation of the address + pub fn unix(&self) -> Option { + match self { + Self::V4(_) | Self::V6(_) => None, + Self::UnixDomain(unix) => Some((**unix).clone()), + } + } + + /// Returns the ip representation of the address + pub fn ip(&self) -> Option { + match self { + Self::V4(a) => Some(a.clone().into()), + Self::V6(a) => Some(a.clone().into()), + Self::UnixDomain(_) => None, + } + } +} + +impl PartialEq for HostAddress { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::UnixDomain(a), Self::UnixDomain(b)) => { + match (a.as_pathname(), b.as_pathname()) { + (Some(a), Some(b)) => a.eq(b), + (None, None) => true, + _ => false, + } + } + (Self::V4(a), Self::V4(b)) => a.eq(b), + (Self::V6(a), Self::V6(b)) => a.eq(b), + _ => false, + } + } +} + +impl Eq for HostAddress {} + +impl From for String { + fn from(a: HostAddress) -> String { + format!("{a}") + } +} + +impl std::fmt::Debug for HostAddress { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + ::fmt(self, fmt) + } +} + +impl std::fmt::Display for HostAddress { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Self::UnixDomain(unix) => match unix.as_pathname() { + Some(path) => path.display().fmt(fmt), + None => write!(fmt, ""), + }, + Self::V4(a) => a.fmt(fmt), + Self::V6(a) => a.fmt(fmt), + } + } +} + +impl FromStr for HostAddress { + type Err = AddressParseError; + fn from_str(s: &str) -> Result { + match IpAddr::from_str(s) { + Ok(a) => Ok(a.into()), + Err(net_err) => { + if s.starts_with('[') && s.ends_with(']') { + let alternative = &s[1..s.len() - 1]; + if let Ok(a) = IpAddr::from_str(alternative) { + return Ok(a.into()); + } + } + + let path: &Path = s.as_ref(); + if path.is_relative() { + Err(AddressParseError { + candidate: s.to_string(), + net_err, + unix_err: std::io::Error::new( + std::io::ErrorKind::Other, + "unix domain path must be absolute", + ), + }) + } else { + match UnixSocketAddr::from_pathname(path) { + Ok(unix) => Ok(HostAddress::UnixDomain(unix.into())), + Err(unix_err) => Err(AddressParseError { + candidate: s.to_string(), + net_err, + unix_err, + }), + } + } + } + } + } +} + +impl TryFrom for HostAddress { + type Error = AddressParseError; + fn try_from(s: String) -> Result { + HostAddress::from_str(&s) + } +} + +impl From for HostAddress { + fn from(unix: UnixSocketAddr) -> HostAddress { + HostAddress::UnixDomain(unix.into()) + } +} + +impl From for HostAddress { + fn from(ip: Ipv4Addr) -> HostAddress { + HostAddress::V4(ip) + } +} + +impl From for HostAddress { + fn from(ip: Ipv6Addr) -> HostAddress { + HostAddress::V6(ip) + } +} + +impl From for HostAddress { + fn from(ip: IpAddr) -> HostAddress { + match ip { + IpAddr::V4(a) => HostAddress::V4(a), + IpAddr::V6(a) => HostAddress::V6(a), + } + } +} + +impl From for HostAddress { + fn from(a: SocketAddr) -> HostAddress { + a.ip().into() + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn parse() { + assert_eq!( + "10.0.0.1".parse::(), + Ok(HostAddress::V4(Ipv4Addr::new(10, 0, 0, 1))) + ); + assert_eq!( + "[10.0.0.1]".parse::(), + Ok(HostAddress::V4(Ipv4Addr::new(10, 0, 0, 1))) + ); + assert_eq!( + "::1".parse::(), + Ok(HostAddress::V6(Ipv6Addr::LOCALHOST)) + ); + assert_eq!( + "[::1]".parse::(), + Ok(HostAddress::V6(Ipv6Addr::LOCALHOST)) + ); + assert_eq!( + "/some/path".parse::(), + Ok(HostAddress::UnixDomain( + UnixSocketAddr::from_pathname("/some/path").unwrap().into() + )) + ); + assert_eq!( + format!("{:#}", "[/some/path]".parse::().unwrap_err()), + "Failed to parse '[/some/path]' as an address. \ + Got 'invalid IP address syntax' when considering it as \ + an IP address and 'unix domain path must be absolute' \ + when considering it as a unix domain socket path." + ); + assert_eq!( + format!("{:#}", "hello there".parse::().unwrap_err()), + "Failed to parse 'hello there' as an address. \ + Got 'invalid IP address syntax' when considering it as \ + an IP address and 'unix domain path must be absolute' \ + when considering it as a unix domain socket path." + ); + } +} diff --git a/crates/kumo-address/src/lib.rs b/crates/kumo-address/src/lib.rs new file mode 100644 index 000000000..fc7678ba0 --- /dev/null +++ b/crates/kumo-address/src/lib.rs @@ -0,0 +1,2 @@ +pub mod host; +pub mod socket; diff --git a/crates/kumo-address/src/socket.rs b/crates/kumo-address/src/socket.rs new file mode 100644 index 000000000..aa9eb462b --- /dev/null +++ b/crates/kumo-address/src/socket.rs @@ -0,0 +1,222 @@ +use crate::host::{AddressParseError, HostAddress}; +use serde::{Deserialize, Serialize}; +use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6}; +use std::os::unix::net::SocketAddr as UnixSocketAddr; +use std::path::Path; +use std::str::FromStr; + +#[derive(Clone, Serialize, Deserialize)] +#[serde(try_from = "String", into = "String")] +pub enum SocketAddress { + UnixDomain(Box), + V4(std::net::SocketAddrV4), + V6(std::net::SocketAddrV6), +} + +impl std::fmt::Debug for SocketAddress { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + ::fmt(self, fmt) + } +} + +impl std::fmt::Display for SocketAddress { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Self::UnixDomain(unix) => match unix.as_pathname() { + Some(path) => path.display().fmt(fmt), + None => write!(fmt, ""), + }, + Self::V4(a) => a.fmt(fmt), + Self::V6(a) => a.fmt(fmt), + } + } +} + +impl From for String { + fn from(a: SocketAddress) -> String { + format!("{a}") + } +} + +impl TryFrom for SocketAddress { + type Error = AddressParseError; + fn try_from(s: String) -> Result { + SocketAddress::from_str(&s) + } +} + +impl SocketAddress { + /// Returns the "host" portion of the address + pub fn host(&self) -> HostAddress { + match self { + Self::UnixDomain(p) => HostAddress::UnixDomain(p.clone()), + Self::V4(a) => HostAddress::V4(a.ip().clone()), + Self::V6(a) => HostAddress::V6(a.ip().clone()), + } + } + + /// Returns the unix domain socket representation of the address + pub fn unix(&self) -> Option { + match self { + Self::V4(_) | Self::V6(_) => None, + Self::UnixDomain(unix) => Some((**unix).clone()), + } + } + + /// Returns the ip representation of the address + pub fn ip(&self) -> Option { + match self { + Self::V4(a) => Some(a.clone().into()), + Self::V6(a) => Some(a.clone().into()), + Self::UnixDomain(_) => None, + } + } +} + +impl FromStr for SocketAddress { + type Err = AddressParseError; + fn from_str(s: &str) -> Result { + // At the time of writing, Rust's IPv6 SockAddr parsing + // interally only accepts `[address]:port` while its IPv4 + // SockAddr parsing only accepts `address:port`. + // In the email world, `[]` is used to indicate a literal + // IP address so we desire the ability to uniformly use + // the `[]` syntax in both cases, so we check for that + // first and parse the internal address out. + + if s.starts_with('[') { + if let Some(host_end) = s.find(']') { + let (host, remainder) = s.split_at(host_end); + let host = &host[1..]; + + if let Some(port) = remainder.strip_prefix("]:") { + if let Ok(port) = port.parse::() { + match HostAddress::from_str(host) { + Ok(HostAddress::V4(a)) => { + return Ok(SocketAddress::V4(SocketAddrV4::new(a, port))) + } + Ok(HostAddress::V6(a)) => { + return Ok(SocketAddress::V6(SocketAddrV6::new(a, port, 0, 0))) + } + + _ => {} + } + } + } + } + } + + match SocketAddr::from_str(s) { + Ok(a) => Ok(a.into()), + Err(net_err) => { + let path: &Path = s.as_ref(); + if path.is_relative() { + Err(AddressParseError { + candidate: s.to_string(), + net_err, + unix_err: std::io::Error::new( + std::io::ErrorKind::Other, + "unix domain path must be absolute", + ), + }) + } else { + match UnixSocketAddr::from_pathname(path) { + Ok(unix) => Ok(SocketAddress::UnixDomain(unix.into())), + Err(unix_err) => Err(AddressParseError { + candidate: s.to_string(), + net_err, + unix_err, + }), + } + } + } + } + } +} + +impl PartialEq for SocketAddress { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::UnixDomain(a), Self::UnixDomain(b)) => { + match (a.as_pathname(), b.as_pathname()) { + (Some(a), Some(b)) => a.eq(b), + (None, None) => true, + _ => false, + } + } + (Self::V4(a), Self::V4(b)) => a.eq(b), + (Self::V6(a), Self::V6(b)) => a.eq(b), + _ => false, + } + } +} + +impl Eq for SocketAddress {} + +impl From for SocketAddress { + fn from(unix: UnixSocketAddr) -> SocketAddress { + SocketAddress::UnixDomain(unix.into()) + } +} + +impl From for SocketAddress { + fn from(ip: SocketAddr) -> SocketAddress { + match ip { + SocketAddr::V4(a) => SocketAddress::V4(a), + SocketAddr::V6(a) => SocketAddress::V6(a), + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::net::{Ipv4Addr, Ipv6Addr}; + + #[test] + fn parse() { + assert_eq!( + "10.0.0.1:25".parse::(), + Ok(SocketAddress::V4(SocketAddrV4::new( + Ipv4Addr::new(10, 0, 0, 1), + 25 + ))) + ); + assert_eq!( + "[10.0.0.1]:25".parse::(), + Ok(SocketAddress::V4(SocketAddrV4::new( + Ipv4Addr::new(10, 0, 0, 1), + 25 + ))) + ); + assert_eq!( + "[::1]:100".parse::(), + Ok(SocketAddress::V6(SocketAddrV6::new( + Ipv6Addr::LOCALHOST, + 100, + 0, + 0 + ))) + ); + assert_eq!( + "/some/path".parse::(), + Ok(SocketAddress::UnixDomain( + UnixSocketAddr::from_pathname("/some/path").unwrap().into() + )) + ); + assert_eq!( + format!("{:#}", "hello there".parse::().unwrap_err()), + "Failed to parse 'hello there' as an address. \ + Got 'invalid socket address syntax' when considering it as \ + an IP address and 'unix domain path must be absolute' \ + when considering it as a unix domain socket path." + ); + assert_eq!( + format!("{:#}", "[10.0.0.1]".parse::().unwrap_err()), + "Failed to parse '[10.0.0.1]' as an address. \ + Got 'invalid socket address syntax' when considering it as \ + an IP address and 'unix domain path must be absolute' \ + when considering it as a unix domain socket path." + ); + } +} diff --git a/crates/kumo-log-types/Cargo.toml b/crates/kumo-log-types/Cargo.toml index 03a91da7a..99bbf5d43 100644 --- a/crates/kumo-log-types/Cargo.toml +++ b/crates/kumo-log-types/Cargo.toml @@ -8,6 +8,7 @@ anyhow = {workspace=true} bounce-classify = {path="../bounce-classify"} chrono = {workspace=true, default-features=false, features=["serde", "std"]} data-encoding = {workspace=true} +kumo-address = {path="../kumo-address"} mailparsing = {path="../mailparsing"} rfc5321 = {path="../rfc5321", default-features=false} serde = {workspace=true} diff --git a/crates/kumo-log-types/src/lib.rs b/crates/kumo-log-types/src/lib.rs index e24e8fb8a..abfdd5591 100644 --- a/crates/kumo-log-types/src/lib.rs +++ b/crates/kumo-log-types/src/lib.rs @@ -1,12 +1,13 @@ use crate::rfc5965::ARFReport; use bounce_classify::BounceClass; use chrono::{DateTime, Utc}; +use kumo_address::host::HostAddress; use rfc5321::Response; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::borrow::Cow; use std::collections::HashMap; -use std::net::{IpAddr, SocketAddr}; +use std::net::SocketAddr; use uuid::Uuid; pub mod rfc3464; @@ -15,7 +16,7 @@ pub mod rfc5965; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ResolvedAddress { pub name: String, - pub addr: IpAddr, + pub addr: HostAddress, } impl std::fmt::Display for ResolvedAddress { diff --git a/crates/kumod/src/http_server/inject_v1.rs b/crates/kumod/src/http_server/inject_v1.rs index 16d4465c5..9791a40a7 100644 --- a/crates/kumod/src/http_server/inject_v1.rs +++ b/crates/kumod/src/http_server/inject_v1.rs @@ -569,7 +569,7 @@ async fn process_recipient<'a>( site: "", peer_address: Some(&ResolvedAddress { name: "".to_string(), - addr: peer_address, + addr: peer_address.into(), }), response: Response { code: 250, @@ -625,7 +625,7 @@ async fn queue_deferred( site: "", peer_address: Some(&ResolvedAddress { name: "".to_string(), - addr: peer_address, + addr: peer_address.into(), }), response: Response { code: 250, diff --git a/crates/kumod/src/logging/disposition.rs b/crates/kumod/src/logging/disposition.rs index eb1eb5bd5..77ec5ec83 100644 --- a/crates/kumod/src/logging/disposition.rs +++ b/crates/kumod/src/logging/disposition.rs @@ -221,7 +221,7 @@ pub async fn log_disposition(args: LogDisposition<'_>) { peer_address: Some(ResolvedAddress { name: report.per_message.reporting_mta.name.to_string(), addr: peer_address - .map(|a| a.addr) + .map(|a| a.addr.clone()) .unwrap_or_else(|| Ipv4Addr::UNSPECIFIED.into()), }), response: Response { diff --git a/crates/kumod/src/smtp_dispatcher.rs b/crates/kumod/src/smtp_dispatcher.rs index e8b378c24..676bbc6e5 100644 --- a/crates/kumod/src/smtp_dispatcher.rs +++ b/crates/kumod/src/smtp_dispatcher.rs @@ -169,27 +169,32 @@ impl SmtpDispatcher { } for addr in &addresses { - if path_config.prohibited_hosts.contains(addr.addr) { - dispatcher - .bulk_ready_queue_operation(Response { - code: 550, - enhanced_code: Some(EnhancedStatusCode { - class: 5, - subject: 4, - detail: 4, - }), - content: format!( - "{addr} is on the list of prohibited_hosts {:?}", - path_config.prohibited_hosts - ), - command: None, - }) - .await; - return Ok(None); + if let Some(ip) = addr.addr.ip() { + if path_config.prohibited_hosts.contains(ip) { + dispatcher + .bulk_ready_queue_operation(Response { + code: 550, + enhanced_code: Some(EnhancedStatusCode { + class: 5, + subject: 4, + detail: 4, + }), + content: format!( + "{addr} is on the list of prohibited_hosts {:?}", + path_config.prohibited_hosts + ), + command: None, + }) + .await; + return Ok(None); + } } } - addresses.retain(|addr| !path_config.skip_hosts.contains(addr.addr)); + addresses.retain(|addr| match addr.addr.ip() { + Some(ip) => !path_config.skip_hosts.contains(ip), + None => true, + }); if addresses.is_empty() { dispatcher @@ -304,7 +309,12 @@ impl SmtpDispatcher { tokio::spawn(async move { let (stream, source_address) = egress_source .connect_to( - SocketAddr::new(address.addr, port), + SocketAddr::new( + address.addr.ip().ok_or_else(|| { + anyhow::anyhow!("only ip addresses are currently supported") + })?, + port, + ), timeouts.connect_timeout, ) .await?; diff --git a/crates/kumod/src/smtp_server.rs b/crates/kumod/src/smtp_server.rs index eeeb77e30..7c85b3333 100644 --- a/crates/kumod/src/smtp_server.rs +++ b/crates/kumod/src/smtp_server.rs @@ -741,7 +741,7 @@ impl SmtpServer { meta: self.meta.clone_inner(), peer_address: ResolvedAddress { name: self.said_hello.as_deref().unwrap_or("").to_string(), - addr: self.peer_address.ip(), + addr: self.peer_address.ip().into(), }, response, sender, @@ -1773,7 +1773,7 @@ impl SmtpServer { site: "", peer_address: Some(&ResolvedAddress { name: self.said_hello.as_deref().unwrap_or("").to_string(), - addr: self.peer_address.ip(), + addr: self.peer_address.ip().into(), }), response: Response { code: 250,