Skip to content

Commit

Permalink
TCP SNDBUF and RCVBUF
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Jul 17, 2024
1 parent 3255739 commit ad821d6
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 55 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ stop-token = "0.7.0"
syn = "2.0"
tide = "0.16.0"
token-cell = { version = "1.4.2", default-features = false }
tokio = { version = "1.35.1", default-features = false } # Default features are disabled due to some crates' requirements
tokio = { version = "1.38.0", default-features = false } # Default features are disabled due to some crates' requirements
tokio-util = "0.7.10"
tokio-tungstenite = "0.23.1"
tokio-rustls = { version = "0.26.0", default-features = false }
Expand Down
3 changes: 3 additions & 0 deletions io/zenoh-link-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ use zenoh_result::ZResult;
/* GENERAL */
/*************************************/

// Common endpoint configuration options
pub const BIND_INTERFACE: &str = "iface";
pub const SO_SNDBUF: &str = "so_sndbuf";
pub const SO_RCVBUF: &str = "so_rcvbuf";

#[derive(Clone, Debug, Serialize, Hash, PartialEq, Eq)]
pub struct Link {
Expand Down
128 changes: 78 additions & 50 deletions io/zenoh-links/zenoh-link-tcp/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
// ZettaScale Zenoh Team, <[email protected]>4
//
use std::{cell::UnsafeCell, convert::TryInto, fmt, net::SocketAddr, sync::Arc, time::Duration};

Expand All @@ -21,7 +21,7 @@ use tokio::{
use tokio_util::sync::CancellationToken;
use zenoh_link_commons::{
get_ip_interface_names, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait,
ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE,
ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, SO_RCVBUF, SO_SNDBUF,
};
use zenoh_protocol::{
core::{EndPoint, Locator},
Expand Down Expand Up @@ -51,28 +51,6 @@ unsafe impl Sync for LinkUnicastTcp {}

impl LinkUnicastTcp {
fn new(socket: TcpStream, src_addr: SocketAddr, dst_addr: SocketAddr) -> LinkUnicastTcp {
// Set the TCP nodelay option
if let Err(err) = socket.set_nodelay(true) {
tracing::warn!(
"Unable to set NODEALY option on TCP link {} => {}: {}",
src_addr,
dst_addr,
err
);
}

// Set the TCP linger option
if let Err(err) = socket.set_linger(Some(Duration::from_secs(
(*TCP_LINGER_TIMEOUT).try_into().unwrap(),
))) {
tracing::warn!(
"Unable to set LINGER option on TCP link {} => {}: {}",
src_addr,
dst_addr,
err
);
}

// Compute the MTU
// See IETF RFC6691: https://datatracker.ietf.org/doc/rfc6691/
let header = match src_addr.ip() {
Expand Down Expand Up @@ -241,21 +219,68 @@ impl LinkManagerUnicastTcp {
}
}

#[derive(Clone, Debug, Default)]
struct LinkUnicastTcpConfig<'a> {
iface: Option<&'a str>,
so_sndbuf: Option<&'a str>,
so_rcvbuf: Option<&'a str>,
}

impl LinkManagerUnicastTcp {
async fn new_link_inner(
&self,
dst_addr: &SocketAddr,
iface: Option<&str>,
) -> ZResult<(TcpStream, SocketAddr, SocketAddr)> {
let socket = match dst_addr {
fn new_tcp_socket(addr: &SocketAddr, config: &LinkUnicastTcpConfig) -> ZResult<TcpSocket> {
let socket = match addr {
SocketAddr::V4(_) => TcpSocket::new_v4(),
SocketAddr::V6(_) => TcpSocket::new_v6(),
}?;

if let Some(iface) = iface {
if let Some(iface) = config.iface {
zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?;
}

// Set the TCP SNDBUF option
if let Some(sndbuf) = config.so_sndbuf.as_ref() {
let sndbuf: u32 = sndbuf.parse().map_err(|e| {
zerror!("Invalid SO_SNDBUF config option on TCP socket {addr:?}: {e}")
})?;
if let Err(err) = socket.set_send_buffer_size(sndbuf) {
tracing::warn!("Unable to set SO_SNDBUF option on TCP socket {addr:?}: {err}");
}
}

// Set the TCP RCVBUF option
if let Some(rcvbuf) = config.so_rcvbuf.as_ref() {
let rcvbuf: u32 = rcvbuf.parse().map_err(|e| {
zerror!("Invalid SO_RCVBUF config option on TCP socket {addr:?}: {e}")
})?;
if let Err(err) = socket.set_recv_buffer_size(rcvbuf) {
tracing::warn!("Unable to set SO_RCVBUF option on TCP socket {addr:?}: {err}");
}
}

// Set the TCP nodelay option
if let Err(err) = socket.set_nodelay(true) {
tracing::warn!("Unable to set TCP_NODELAY option on TCP socket {addr:?}: {err}");
}

// Set the TCP linger option
if let Err(err) = socket.set_linger(Some(Duration::from_secs(
(*TCP_LINGER_TIMEOUT).try_into().map_err(|e| {
zerror!("Invalid SO_LINGER config option on TCP socket {addr:?}: {e}")
})?,
))) {
tracing::warn!("Unable to set SO_LINGER option on TCP socket {addr:?}: {err}",);
}

Ok(socket)
}

async fn new_link_inner(
&self,
dst_addr: &SocketAddr,
config: &LinkUnicastTcpConfig<'_>,
) -> ZResult<(TcpStream, SocketAddr, SocketAddr)> {
let socket = Self::new_tcp_socket(dst_addr, config)?;

// Build a TcpStream from TcpSocket
// https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html
let stream = socket
Expand All @@ -276,30 +301,25 @@ impl LinkManagerUnicastTcp {

async fn new_listener_inner(
&self,
addr: &SocketAddr,
iface: Option<&str>,
src_addr: &SocketAddr,
config: &LinkUnicastTcpConfig<'_>,
) -> ZResult<(TcpListener, SocketAddr)> {
let socket = match addr {
SocketAddr::V4(_) => TcpSocket::new_v4(),
SocketAddr::V6(_) => TcpSocket::new_v6(),
}?;

if let Some(iface) = iface {
zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?;
}
let socket = Self::new_tcp_socket(src_addr, config)?;

// Build a TcpListener from TcpSocket
// https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html
socket.set_reuseaddr(true)?;
socket.bind(*addr).map_err(|e| zerror!("{}: {}", addr, e))?;
socket
.bind(*src_addr)
.map_err(|e| zerror!("{}: {}", src_addr, e))?;
// backlog (the maximum number of pending connections are queued): 1024
let listener = socket
.listen(1024)
.map_err(|e| zerror!("{}: {}", addr, e))?;
.map_err(|e| zerror!("{}: {}", src_addr, e))?;

let local_addr = listener
.local_addr()
.map_err(|e| zerror!("{}: {}", addr, e))?;
.map_err(|e| zerror!("{}: {}", src_addr, e))?;

Ok((listener, local_addr))
}
Expand All @@ -309,12 +329,16 @@ impl LinkManagerUnicastTcp {
impl LinkManagerUnicastTrait for LinkManagerUnicastTcp {
async fn new_link(&self, endpoint: EndPoint) -> ZResult<LinkUnicast> {
let dst_addrs = get_tcp_addrs(endpoint.address()).await?;
let config = endpoint.config();
let iface = config.get(BIND_INTERFACE);
let c = endpoint.config();
let config = LinkUnicastTcpConfig {
iface: c.get(BIND_INTERFACE),
so_rcvbuf: c.get(SO_RCVBUF),
so_sndbuf: c.get(SO_SNDBUF),
};

let mut errs: Vec<ZError> = vec![];
for da in dst_addrs {
match self.new_link_inner(&da, iface).await {
match self.new_link_inner(&da, &config).await {
Ok((stream, src_addr, dst_addr)) => {
let link = Arc::new(LinkUnicastTcp::new(stream, src_addr, dst_addr));
return Ok(LinkUnicast(link));
Expand All @@ -338,12 +362,16 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp {

async fn new_listener(&self, mut endpoint: EndPoint) -> ZResult<Locator> {
let addrs = get_tcp_addrs(endpoint.address()).await?;
let config = endpoint.config();
let iface = config.get(BIND_INTERFACE);
let c = endpoint.config();
let config = LinkUnicastTcpConfig {
iface: c.get(BIND_INTERFACE),
so_rcvbuf: c.get(SO_RCVBUF),
so_sndbuf: c.get(SO_SNDBUF),
};

let mut errs: Vec<ZError> = vec![];
for da in addrs {
match self.new_listener_inner(&da, iface).await {
match self.new_listener_inner(&da, &config).await {
Ok((socket, local_addr)) => {
// Update the endpoint locator address
endpoint = EndPoint::new(
Expand Down

0 comments on commit ad821d6

Please sign in to comment.