diff --git a/Cargo.lock b/Cargo.lock index 44002406fa..3b8099add4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4424,9 +4424,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.36.0" +version = "1.38.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" +checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a" dependencies = [ "backtrace", "bytes", @@ -4441,9 +4441,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index ce8ae1643d..b623f1c425 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -164,7 +164,7 @@ stop-token = "0.7.0" syn = "2.0" tide = "0.16.0" token-cell = { version = "1.4.2", default-features = false } -tokio = { version = "1.35.1", default-features = false } # Default features are disabled due to some crates' requirements +tokio = { version = "1.38.0", default-features = false } # Default features are disabled due to some crates' requirements tokio-util = "0.7.10" tokio-tungstenite = "0.23.1" tokio-rustls = { version = "0.26.0", default-features = false } diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index 56d99806a2..7a56f2448b 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -39,7 +39,10 @@ use zenoh_result::ZResult; /* GENERAL */ /*************************************/ +// Common endpoint configuration options pub const BIND_INTERFACE: &str = "iface"; +pub const SO_SNDBUF: &str = "so_sndbuf"; +pub const SO_RCVBUF: &str = "so_rcvbuf"; #[derive(Clone, Debug, Serialize, Hash, PartialEq, Eq)] pub struct Link { diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 7532055f8e..49580d78aa 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -9,7 +9,7 @@ // SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 // // Contributors: -// ZettaScale Zenoh Team, +// ZettaScale Zenoh Team, 4 // use std::{cell::UnsafeCell, convert::TryInto, fmt, net::SocketAddr, sync::Arc, time::Duration}; @@ -21,7 +21,7 @@ use tokio::{ use tokio_util::sync::CancellationToken; use zenoh_link_commons::{ get_ip_interface_names, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, - ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, + ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, SO_RCVBUF, SO_SNDBUF, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -51,28 +51,6 @@ unsafe impl Sync for LinkUnicastTcp {} impl LinkUnicastTcp { fn new(socket: TcpStream, src_addr: SocketAddr, dst_addr: SocketAddr) -> LinkUnicastTcp { - // Set the TCP nodelay option - if let Err(err) = socket.set_nodelay(true) { - tracing::warn!( - "Unable to set NODEALY option on TCP link {} => {}: {}", - src_addr, - dst_addr, - err - ); - } - - // Set the TCP linger option - if let Err(err) = socket.set_linger(Some(Duration::from_secs( - (*TCP_LINGER_TIMEOUT).try_into().unwrap(), - ))) { - tracing::warn!( - "Unable to set LINGER option on TCP link {} => {}: {}", - src_addr, - dst_addr, - err - ); - } - // Compute the MTU // See IETF RFC6691: https://datatracker.ietf.org/doc/rfc6691/ let header = match src_addr.ip() { @@ -241,21 +219,68 @@ impl LinkManagerUnicastTcp { } } +#[derive(Clone, Debug, Default)] +struct LinkUnicastTcpConfig<'a> { + iface: Option<&'a str>, + so_sndbuf: Option<&'a str>, + so_rcvbuf: Option<&'a str>, +} + impl LinkManagerUnicastTcp { - async fn new_link_inner( - &self, - dst_addr: &SocketAddr, - iface: Option<&str>, - ) -> ZResult<(TcpStream, SocketAddr, SocketAddr)> { - let socket = match dst_addr { + fn new_tcp_socket(addr: &SocketAddr, config: &LinkUnicastTcpConfig) -> ZResult { + let socket = match addr { SocketAddr::V4(_) => TcpSocket::new_v4(), SocketAddr::V6(_) => TcpSocket::new_v6(), }?; - if let Some(iface) = iface { + if let Some(iface) = config.iface { zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?; } + // Set the TCP SNDBUF option + if let Some(sndbuf) = config.so_sndbuf.as_ref() { + let sndbuf: u32 = sndbuf.parse().map_err(|e| { + zerror!("Invalid SO_SNDBUF config option on TCP socket {addr:?}: {e}") + })?; + if let Err(err) = socket.set_send_buffer_size(sndbuf) { + tracing::warn!("Unable to set SO_SNDBUF option on TCP socket {addr:?}: {err}"); + } + } + + // Set the TCP RCVBUF option + if let Some(rcvbuf) = config.so_rcvbuf.as_ref() { + let rcvbuf: u32 = rcvbuf.parse().map_err(|e| { + zerror!("Invalid SO_RCVBUF config option on TCP socket {addr:?}: {e}") + })?; + if let Err(err) = socket.set_recv_buffer_size(rcvbuf) { + tracing::warn!("Unable to set SO_RCVBUF option on TCP socket {addr:?}: {err}"); + } + } + + // Set the TCP nodelay option + if let Err(err) = socket.set_nodelay(true) { + tracing::warn!("Unable to set TCP_NODELAY option on TCP socket {addr:?}: {err}"); + } + + // Set the TCP linger option + if let Err(err) = socket.set_linger(Some(Duration::from_secs( + (*TCP_LINGER_TIMEOUT).try_into().map_err(|e| { + zerror!("Invalid SO_LINGER config option on TCP socket {addr:?}: {e}") + })?, + ))) { + tracing::warn!("Unable to set SO_LINGER option on TCP socket {addr:?}: {err}",); + } + + Ok(socket) + } + + async fn new_link_inner( + &self, + dst_addr: &SocketAddr, + config: &LinkUnicastTcpConfig<'_>, + ) -> ZResult<(TcpStream, SocketAddr, SocketAddr)> { + let socket = Self::new_tcp_socket(dst_addr, config)?; + // Build a TcpStream from TcpSocket // https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html let stream = socket @@ -276,30 +301,25 @@ impl LinkManagerUnicastTcp { async fn new_listener_inner( &self, - addr: &SocketAddr, - iface: Option<&str>, + src_addr: &SocketAddr, + config: &LinkUnicastTcpConfig<'_>, ) -> ZResult<(TcpListener, SocketAddr)> { - let socket = match addr { - SocketAddr::V4(_) => TcpSocket::new_v4(), - SocketAddr::V6(_) => TcpSocket::new_v6(), - }?; - - if let Some(iface) = iface { - zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?; - } + let socket = Self::new_tcp_socket(src_addr, config)?; // Build a TcpListener from TcpSocket // https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html socket.set_reuseaddr(true)?; - socket.bind(*addr).map_err(|e| zerror!("{}: {}", addr, e))?; + socket + .bind(*src_addr) + .map_err(|e| zerror!("{}: {}", src_addr, e))?; // backlog (the maximum number of pending connections are queued): 1024 let listener = socket .listen(1024) - .map_err(|e| zerror!("{}: {}", addr, e))?; + .map_err(|e| zerror!("{}: {}", src_addr, e))?; let local_addr = listener .local_addr() - .map_err(|e| zerror!("{}: {}", addr, e))?; + .map_err(|e| zerror!("{}: {}", src_addr, e))?; Ok((listener, local_addr)) } @@ -309,12 +329,16 @@ impl LinkManagerUnicastTcp { impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { async fn new_link(&self, endpoint: EndPoint) -> ZResult { let dst_addrs = get_tcp_addrs(endpoint.address()).await?; - let config = endpoint.config(); - let iface = config.get(BIND_INTERFACE); + let c = endpoint.config(); + let config = LinkUnicastTcpConfig { + iface: c.get(BIND_INTERFACE), + so_rcvbuf: c.get(SO_RCVBUF), + so_sndbuf: c.get(SO_SNDBUF), + }; let mut errs: Vec = vec![]; for da in dst_addrs { - match self.new_link_inner(&da, iface).await { + match self.new_link_inner(&da, &config).await { Ok((stream, src_addr, dst_addr)) => { let link = Arc::new(LinkUnicastTcp::new(stream, src_addr, dst_addr)); return Ok(LinkUnicast(link)); @@ -338,12 +362,16 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { async fn new_listener(&self, mut endpoint: EndPoint) -> ZResult { let addrs = get_tcp_addrs(endpoint.address()).await?; - let config = endpoint.config(); - let iface = config.get(BIND_INTERFACE); + let c = endpoint.config(); + let config = LinkUnicastTcpConfig { + iface: c.get(BIND_INTERFACE), + so_rcvbuf: c.get(SO_RCVBUF), + so_sndbuf: c.get(SO_SNDBUF), + }; let mut errs: Vec = vec![]; for da in addrs { - match self.new_listener_inner(&da, iface).await { + match self.new_listener_inner(&da, &config).await { Ok((socket, local_addr)) => { // Update the endpoint locator address endpoint = EndPoint::new(