diff --git a/Cargo.toml b/Cargo.toml index b3baf9c..1c7bf68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "std-embedded-nal" version = "0.1.3" authors = ["chrysn "] -edition = "2018" +edition = "2021" description = "Implementation of the `embedded-nal` traits for large devices that support the standard library" categories = ["embedded", "hardware-support"] @@ -12,15 +12,19 @@ repository = "https://gitlab.com/chrysn/std-embedded-nal" [dependencies] embedded-nal = "0.6" -embedded-nal-async = { version = "0.1", optional = true } +embedded-nal-async = { git = "https://github.com/rust-embedded-community/embedded-nal", optional = true } +embedded-io = { version = "0.4", optional = true, features = [ "std" ] } async-std = { version = "1.12", optional = true } # If enabled, these traits are implemented as well; they're experimental and # will hopefully wind up in alter embedded-nal versions, so enabling this has # no stability guarantees. embedded-nal-tcpextensions = { version = "0.1", optional = true } +async-io = "^1.9" +nix = { version = "0.25", features = [ "socket", "net", "uio" ] } + [features] -async = [ "embedded-nal-async", "async-std" ] +async = [ "embedded-nal-async", "async-std", "embedded-io" ] [dev-dependencies] mio = { version = "0.8", features = [ "os-ext" ] } diff --git a/examples/coapclient_async.rs b/examples/coapclient_async.rs index 61c8235..498cdd4 100644 --- a/examples/coapclient_async.rs +++ b/examples/coapclient_async.rs @@ -1,24 +1,24 @@ //! A brutally oversimplified CoAP client that GETs /.well-known/core from localhost:5683 -use embedded_nal_async::UdpClientStack; +use embedded_nal_async::{UdpStack, ConnectedUdp}; async fn run(stack: &mut S) -> Result<(), E> where E: core::fmt::Debug, // Might go away when MSRV goes up to 1.49, see https://github.com/rust-lang/rust/issues/80821 - S: UdpClientStack, + S: UdpStack, + S::Connected: ConnectedUdp, { let target = embedded_nal::SocketAddr::new( "::1".parse().unwrap(), 5683, ); - let mut sock = stack.socket().await?; - stack.connect(&mut sock, target).await?; + let (_local, mut sock) = stack.connect(target).await?; // Data, V1 NON no token, GET, message ID 0x0000, 2x Uri-Path - stack.send(&mut sock, b"\x50\x01\0\0\xbb.well-known\x04core").await?; + sock.send(b"\x50\x01\0\0\xbb.well-known\x04core").await?; let mut respbuf = [0; 1500]; - let (resplen, _) = stack.receive(&mut sock, &mut respbuf).await?; + let resplen = sock.receive_into(&mut respbuf).await?; let response = &respbuf[..resplen]; println!("Response: {}", String::from_utf8_lossy(response)); diff --git a/src/conversion.rs b/src/conversion.rs index a102d5d..a21780d 100644 --- a/src/conversion.rs +++ b/src/conversion.rs @@ -26,6 +26,31 @@ impl From for IpAddr { } } +// That this is missing zone info on IPv6 probably just makes std::net::IpAddr a bad intermediate +// type. +impl From for IpAddr { + fn from(input: nix::libc::in6_pktinfo) -> Self { + // FIXME why isn't this having zone infos?? + Self(input.ipi6_addr.s6_addr.into()) + } +} + +impl From for nix::libc::in6_pktinfo { + fn from(input: IpAddr) -> nix::libc::in6_pktinfo { + let input = match input.0 { + std::net::IpAddr::V6(a) => a, + _ => panic!("IPv6 only so far"), + }; + nix::libc::in6_pktinfo { + ipi6_addr: nix::libc::in6_addr { + s6_addr: input.octets(), + }, + // FIXME and here it really hurts + ipi6_ifindex: 0, + } + } +} + impl From for embedded_nal::IpAddr { fn from(s: IpAddr) -> embedded_nal::IpAddr { match s.0 { diff --git a/src/lib.rs b/src/lib.rs index 058d19e..b89a2ab 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,9 +9,7 @@ //! All implementations use `std::io::Error` as their error type. //! //! [embedded-nal]: https://crates.io/crates/embedded-nal -#![cfg_attr(feature = "async", feature(generic_associated_types))] // for implementing -async -#![cfg_attr(feature = "async", feature(type_alias_impl_trait))] // as async blocks - // don't provide types +#![cfg_attr(feature = "async", feature(async_fn_in_trait))] mod conversion; mod dns; diff --git a/src/udp_async.rs b/src/udp_async.rs index dfa5e4d..92c6131 100644 --- a/src/udp_async.rs +++ b/src/udp_async.rs @@ -6,138 +6,201 @@ //! the author is not sure whether that should be improved (would it make the future zero-sized), //! whether it can (as a rule of thumb, if it worked for nb it should support zero-sized futures) //! or whether it even makes a difference after an LTO pass. +//! +//! Known bugs: +//! * Excessive lengths are not reported correctly except for the recvmsg version. This would be +//! fixed by using recvmsg more widely. -use crate::conversion::SocketAddr; +use crate::conversion; use crate::SocketState; +use core::future::Future; use std::io::Error; -use std::net::{self, IpAddr, Ipv4Addr, Ipv6Addr}; -pub struct UdpSocket { - state: SocketState, -} +use std::os::unix::io::AsRawFd; +use std::os::unix::io::FromRawFd; -impl UdpSocket { - fn new() -> Self { - Self { - state: SocketState::new(), - } - } - - // Not providing as_raw_fd: The only reason this should be here is to enable async, which here - // is automatic. +pub struct ConnectedSocket(async_std::net::UdpSocket); +pub struct UniquelyBoundSocket { + socket: async_std::net::UdpSocket, + // By storing this, we avoid the whole recvmsg hell, which we can because there's really only + // one relevant address. (Alternatively, we could call `.local_addr()` over and over). + bound_address: embedded_nal::SocketAddr, +} +pub struct MultiplyBoundSocket { + socket: async_io::Async, + // Storing this so we can return a full SocketAddr, even though pktinfo doesn't provide that + // information + port: u16, } -impl embedded_nal_async::UdpClientStack for crate::Stack { - type UdpSocket = UdpSocket; +impl embedded_nal_async::UdpStack for crate::Stack { type Error = Error; - type SocketFuture<'m> = impl std::future::Future> where Self: 'm; - type ConnectFuture<'m> = impl std::future::Future> where Self: 'm; - type SendFuture<'m> = impl std::future::Future> + 'm - where - Self: 'm; - type ReceiveFuture<'m> = impl std::future::Future> + 'm - where - Self: 'm; - type CloseFuture<'m> = std::future::Ready> where Self: 'm; - - fn socket<'m>(&'m mut self) -> Self::SocketFuture<'m> { - std::future::ready(Ok(UdpSocket::new())) + type Connected = ConnectedSocket; + type UniquelyBound = UniquelyBoundSocket; + type MultiplyBound = MultiplyBoundSocket; + + async fn connect_from(&self, local: embedded_nal::SocketAddr, remote: embedded_nal::SocketAddr) -> Result<(embedded_nal::SocketAddr, Self::Connected), Self::Error> { + let sock = async_std::net::UdpSocket::bind( + async_std::net::SocketAddr::from(conversion::SocketAddr::from(local)) + ).await?; + + sock.connect( + async_std::net::SocketAddr::from(conversion::SocketAddr::from(remote)) + ) + .await?; + + let final_local = sock.local_addr()?; + + Ok(( + conversion::SocketAddr::from(final_local).into(), + ConnectedSocket(sock) + )) } - fn connect<'m>( - &'m mut self, - socket: &'m mut Self::UdpSocket, - remote: embedded_nal::SocketAddr, - ) -> Self::ConnectFuture<'m> { - async move { - let any = match remote { - embedded_nal::SocketAddr::V4(_) => { - net::SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0) - } - embedded_nal::SocketAddr::V6(_) => { - net::SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0) - } - }; - - let sock = async_std::net::UdpSocket::bind(any).await?; - - use std::net::ToSocketAddrs; - sock.connect(SocketAddr::from(remote).to_socket_addrs()? - .next() - .expect("Addresses converted from an embedded_nal address have exactly one socket address")) - .await?; - socket.state = SocketState::Connected(sock); - Ok(()) - } + async fn bind_single(&self, local: embedded_nal::SocketAddr) -> Result<(embedded_nal::SocketAddr, Self::UniquelyBound), Self::Error> { + let sock = async_std::net::UdpSocket::bind( + async_std::net::SocketAddr::from(conversion::SocketAddr::from(local)) + ).await?; + + let final_local = sock.local_addr()?; + let final_local = conversion::SocketAddr::from(final_local).into(); + + Ok(( + final_local, + UniquelyBoundSocket { socket: sock, bound_address: final_local } + )) } - fn send<'m>( - &'m mut self, - socket: &'m mut Self::UdpSocket, - buffer: &'m [u8], - ) -> Self::SendFuture<'m> { - async move { - let sock = socket.state.get_running()?; - sock.send(buffer).await.map(drop) + async fn bind_multiple(&self, local: embedded_nal::SocketAddr) -> Result { + let sock = async_std::net::UdpSocket::bind( + async_std::net::SocketAddr::from(conversion::SocketAddr::from(local)) + ).await?; + + // Due to https://github.com/async-rs/async-std/issues/1040 we have to leave the + // friendly async_std territory and are on our own now. + let sock: async_io::Async = unsafe { core::mem::transmute(sock) }; + let sock = sock.into_inner().unwrap(); + + nix::sys::socket::setsockopt( + sock.as_raw_fd(), + nix::sys::socket::sockopt::Ipv6RecvPacketInfo, + &true + )?; + + let mut local_port = local.port(); + if local_port == 0 { + local_port = sock.local_addr()?.port(); } + + let sock = async_io::Async::new(sock)?; + + Ok(MultiplyBoundSocket { socket: sock, port: local_port }) } +} - fn receive<'m>( - &'m mut self, - socket: &'m mut Self::UdpSocket, - buffer: &'m mut [u8], - ) -> Self::ReceiveFuture<'m> { - async move { - let sock = socket.state.get_any_mut()?; - sock.recv_from(buffer) - .await - .map(|(length, peer_addr)| (length, SocketAddr::from(peer_addr).into())) - } +impl embedded_nal_async::ConnectedUdp for ConnectedSocket { + type Error = Error; + + async fn send(&mut self, data: &[u8]) -> Result<(), Self::Error> { + let sent_len = self.0.send(data).await?; + assert!(sent_len == data.len(), "Datagram was not sent in a single operation"); + Ok(()) } - fn close<'m>(&'m mut self, _socket: Self::UdpSocket) -> Self::CloseFuture<'m> { - // No-op: Socket gets closed when it is freed - // - // Could wrap it in an Option, but really that'll only make things messier; users will - // probably drop the socket anyway after closing, and can't expect it to be usable with - // this API. - std::future::ready(Ok(())) + + async fn receive_into(& mut self, buffer: & mut [u8]) -> Result { + self.0.recv(buffer).await } } -impl embedded_nal_async::UdpFullStack for crate::Stack { - type BindFuture<'m> = impl std::future::Future> + 'm - where - Self: 'm; - type SendToFuture<'m> = impl std::future::Future> + 'm - where - Self: 'm; +impl embedded_nal_async::UnconnectedUdp for UniquelyBoundSocket { + type Error = Error; - fn bind<'m>(&'m mut self, socket: &'m mut UdpSocket, port: u16) -> Self::BindFuture<'m> { - async move { - let anyaddressthisport = async_std::net::SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port); + async fn send(&mut self, local: embedded_nal::SocketAddr, remote: embedded_nal::SocketAddr, data: &[u8]) -> Result<(), Self::Error> { + debug_assert!( + local == self.bound_address, + "A socket created from bind_single must always provide its original local address (or the one returned from a receive) in send" + ); + let remote: async_std::net::SocketAddr = conversion::SocketAddr::from(remote).into(); + let sent_len = self.socket.send_to(data, remote).await?; + assert!(sent_len == data.len(), "Datagram was not sent in a single operation"); + Ok(()) + } - let sock = async_std::net::UdpSocket::bind(anyaddressthisport).await?; + async fn receive_into(&mut self, buffer: &mut [u8]) -> Result<(usize, embedded_nal::SocketAddr, embedded_nal::SocketAddr), Self::Error> { + let (length, remote) = self.socket.recv_from(buffer).await?; + let remote = conversion::SocketAddr::from(remote).into(); + Ok((length, self.bound_address, remote)) + } +} - socket.state = SocketState::Bound(sock); - Ok(()) +impl embedded_nal_async::UnconnectedUdp for MultiplyBoundSocket { + type Error = Error; + + async fn send(&mut self, local: embedded_nal::SocketAddr, remote: embedded_nal::SocketAddr, data: &[u8]) -> Result<(), Self::Error> { + if local.port() != 0 { + debug_assert_eq!(local.port(), self.port, "Packets can only be sent from the locally bound to port"); } + let remote: async_std::net::SocketAddr = conversion::SocketAddr::from(remote).into(); + // Taking this step on foot due to https://github.com/nix-rust/nix/issues/1754 + // FIXME v6-only + let remote = match remote { + async_std::net::SocketAddr::V6(a) => a, + _ => panic!("Only IPv6 supported right now"), + }; + let remote = std::net::SocketAddrV6::from(remote); + let remote = nix::sys::socket::SockaddrIn6::from(remote); + let local_pktinfo = conversion::IpAddr::from(local.ip()).into(); + self.socket.write_with(|s| { + let sent_len = nix::sys::socket::sendmsg( + s.as_raw_fd(), + &[std::io::IoSlice::new(data)], + // FIXME this ignores the IP part of the local address + &[nix::sys::socket::ControlMessage::Ipv6PacketInfo(&local_pktinfo)], + nix::sys::socket::MsgFlags::empty(), + Some(&remote))?; + assert!(sent_len == data.len(), "Datagram was not sent in a single operation"); + Ok(()) + }).await } - fn send_to<'m>( - &'m mut self, - socket: &'m mut UdpSocket, - remote: embedded_nal::SocketAddr, - buffer: &'m [u8], - ) -> Self::SendToFuture<'m> { - async move { - let sock = socket.state.get_bound()?; - - use std::net::ToSocketAddrs; - sock.send_to(buffer, SocketAddr::from(remote).to_socket_addrs()? - .next() - .expect("Addresses converted from an embedded_nal address have exactly one socket address")) - .await - .map(drop) - } + async fn receive_into(&mut self, buffer: &mut [u8]) -> Result<(usize, embedded_nal::SocketAddr, embedded_nal::SocketAddr), Self::Error> { + let (length, remote, local) = self.socket.read_with(|s| { + let mut iov = [std::io::IoSliceMut::new(buffer)]; + let mut cmsg = nix::cmsg_space!(nix::libc::in6_pktinfo); + let received = nix::sys::socket::recvmsg( + s.as_raw_fd(), + &mut iov, + Some(&mut cmsg), + nix::sys::socket::MsgFlags::MSG_TRUNC, + ) + .map_err(Error::from)?; + let local; + if let Some(nix::sys::socket::ControlMessageOwned::Ipv6PacketInfo(pi)) = received.cmsgs().next() { + local = embedded_nal::SocketAddr::new( + conversion::IpAddr::from(pi).into(), + self.port, + ); + } else { + panic!("Operating system failed to send IPv6 packet info after acknowledging the socket option"); + } + Ok((received.bytes, received.address, local)) + }).await?; + + let remote: nix::sys::socket::SockaddrStorage = remote + .expect("recvmsg on UDP always returns a remote address"); + // Taking this step on foot due to https://github.com/nix-rust/nix/issues/1754 + let remote = remote.as_sockaddr_in6() + .expect("Right now this is IPv6 only"); + let remote = std::net::SocketAddr::V6( + std::net::SocketAddrV6::new( + remote.ip(), + remote.port(), + remote.flowinfo(), + remote.scope_id(), + )); + + // We could probably shorten things by going more directly from SockaddrLike + let remote = conversion::SocketAddr::from(remote).into(); + Ok((length, local, remote)) } } diff --git a/tests/udp_echo_async.rs b/tests/udp_echo_async.rs index 116ee06..a672134 100644 --- a/tests/udp_echo_async.rs +++ b/tests/udp_echo_async.rs @@ -1,22 +1,23 @@ #![cfg(feature = "async")] -async fn echo(stack: &mut impl embedded_nal_async::UdpFullStack, addr: &str) { - let addr: embedded_nal_async::SocketAddr = addr.parse().unwrap(); - let mut servsock = stack.socket().await.unwrap(); - let mut clisock = stack.socket().await.unwrap(); +use embedded_nal_async::{UdpStack, ConnectedUdp, UnconnectedUdp, SocketAddr}; - stack.bind(&mut servsock, addr.port()).await.unwrap(); - stack.connect(&mut clisock, addr).await.unwrap(); +async fn echo(stack: &mut impl UdpStack, addr: &str) { + let addr: SocketAddr = addr.parse().unwrap(); - stack.send(&mut clisock, b"ping").await.unwrap(); + let mut servsock = stack.bind_multiple(addr).await.unwrap(); + let (cli_local, mut clisock) = stack.connect(addr).await.unwrap(); + + clisock.send(b"ping").await.unwrap(); let mut buffer = [0u8; 10]; - let (received, cliaddr) = stack.receive(&mut servsock, &mut buffer).await.unwrap(); + let (received, servaddr, server_cliaddr) = servsock.receive_into(&mut buffer).await.unwrap(); assert_eq!(received, 4); assert_eq!(&buffer[..4], b"ping"); + assert_eq!(server_cliaddr, cli_local, "Client local and server remote address differ; NAT on loopback??"); - stack.send_to(&mut servsock, cliaddr, b"pong").await.unwrap(); + servsock.send(servaddr, server_cliaddr, b"pong").await.unwrap(); let mut buffer = [0u8; 10]; - let (received, _) = stack.receive(&mut clisock, &mut buffer).await.unwrap(); + let received = clisock.receive_into(&mut buffer).await.unwrap(); assert_eq!(received, 4); assert_eq!(&buffer[..4], b"pong"); }