-
Notifications
You must be signed in to change notification settings - Fork 231
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for sendmmsg/recvmmsg #494
base: master
Are you sure you want to change the base?
Changes from all commits
3c6a36a
75eaa14
338edb2
1947781
d297957
15dd43d
3570d60
8edaeb7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -727,3 +727,211 @@ impl<'name, 'bufs, 'control> fmt::Debug for MsgHdrMut<'name, 'bufs, 'control> { | |
"MsgHdrMut".fmt(fmt) | ||
} | ||
} | ||
|
||
/// Configuration of a `sendmmsg(2)` system call. | ||
/// | ||
/// This wraps `mmsghdr` on Unix. Also see [`MmsgHdrMut`] for the variant used | ||
/// by `recvmmsg(2)`. | ||
#[cfg(all( | ||
unix, | ||
not(any( | ||
target_os = "redox", | ||
target_os = "solaris", | ||
target_os = "hurd", | ||
target_os = "vita", | ||
target_os = "illumos", | ||
target_vendor = "apple" | ||
)) | ||
))] | ||
pub struct MmsgHdr<'addr, 'bufs, 'control> { | ||
inner: Vec<sys::mmsghdr>, | ||
#[allow(clippy::type_complexity)] | ||
_lifetimes: PhantomData<(&'addr SockAddr, &'bufs IoSlice<'bufs>, &'control [u8])>, | ||
} | ||
|
||
#[cfg(all( | ||
unix, | ||
not(any( | ||
target_os = "redox", | ||
target_os = "solaris", | ||
target_os = "hurd", | ||
target_os = "vita", | ||
target_os = "illumos", | ||
target_vendor = "apple" | ||
)) | ||
))] | ||
impl<'addr, 'bufs, 'control> MmsgHdr<'addr, 'bufs, 'control> { | ||
/// Create a new `MmsgHdr` with all empty/zero fields. | ||
#[allow(clippy::new_without_default)] | ||
pub fn new(len: usize) -> MmsgHdr<'addr, 'bufs, 'control> { | ||
// SAFETY: all zero is valid for `mmsghdr` | ||
MmsgHdr { | ||
inner: vec![unsafe { mem::zeroed() }; len], | ||
_lifetimes: PhantomData, | ||
} | ||
} | ||
|
||
/// Set the addresses (name) of the message. | ||
/// | ||
/// Corresponds to setting `msg_name` and `msg_namelen`. | ||
pub fn with_addrs(mut self, addrs: &'addr [SockAddr]) -> Self { | ||
for (msg, addr) in self.inner.iter_mut().zip(addrs) { | ||
sys::set_msghdr_name(&mut msg.msg_hdr, addr); | ||
} | ||
self | ||
} | ||
|
||
/// Set the buffer(s) of the message. | ||
/// | ||
/// Corresponds to setting `msg_iov` and `msg_iovlen` on Unix. | ||
pub fn with_buffers(mut self, bufs: &'bufs [IoSlice<'_>]) -> Self { | ||
for (msg, buf) in self.inner.iter_mut().zip(bufs) { | ||
sys::set_msghdr_iov(&mut msg.msg_hdr, buf as *const _ as *mut libc::iovec, 1); | ||
} | ||
self | ||
} | ||
|
||
/// Set the control buffer of the messages. | ||
/// | ||
/// Corresponds to setting `msg_control` and `msg_controllen` on Unix. | ||
pub fn with_control(mut self, bufs: &'control [&'control [u8]]) -> Self { | ||
for (msg, buf) in self.inner.iter_mut().zip(bufs) { | ||
sys::set_msghdr_control(&mut msg.msg_hdr, buf.as_ptr() as *mut _, buf.len()) | ||
} | ||
self | ||
} | ||
|
||
/// Set the flags on the messages | ||
/// | ||
/// Corresponds to setting `msg_flags` on Unix. | ||
pub fn with_flags(mut self, flags: &[sys::c_int]) -> Self { | ||
for (msg, flags) in self.inner.iter_mut().zip(flags) { | ||
sys::set_msghdr_flags(&mut msg.msg_hdr, *flags); | ||
} | ||
self | ||
} | ||
} | ||
|
||
#[cfg(all( | ||
unix, | ||
not(any( | ||
target_os = "redox", | ||
target_os = "solaris", | ||
target_os = "hurd", | ||
target_os = "vita", | ||
target_os = "illumos", | ||
target_vendor = "apple" | ||
)) | ||
))] | ||
impl<'name, 'bufs, 'control> fmt::Debug for MmsgHdr<'name, 'bufs, 'control> { | ||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
"MmsgHdr".fmt(fmt) | ||
} | ||
} | ||
|
||
/// Configuration of a `recvmmsg(2)` system call | ||
/// | ||
/// This wraps `mmsghdr` on Unix. Also see [`MmsgHdr`] for the variant used by | ||
/// `sendmmsg(2)`. | ||
#[cfg(all( | ||
unix, | ||
not(any( | ||
target_os = "redox", | ||
target_os = "solaris", | ||
target_os = "hurd", | ||
target_os = "vita", | ||
target_os = "illumos", | ||
target_vendor = "apple" | ||
)) | ||
))] | ||
pub struct MmsgHdrMut<'addr, 'bufs, 'control> { | ||
inner: Vec<sys::mmsghdr>, | ||
#[allow(clippy::type_complexity)] | ||
_lifetimes: PhantomData<( | ||
&'addr mut SockAddr, | ||
&'bufs mut MaybeUninitSlice<'bufs>, | ||
&'control mut [u8], | ||
)>, | ||
} | ||
|
||
#[cfg(all( | ||
unix, | ||
not(any( | ||
target_os = "redox", | ||
target_os = "solaris", | ||
target_os = "hurd", | ||
target_os = "vita", | ||
target_os = "illumos", | ||
target_vendor = "apple" | ||
)) | ||
))] | ||
impl<'addr, 'bufs, 'control> MmsgHdrMut<'addr, 'bufs, 'control> { | ||
/// Create a new `MmsgHdrMut` with all empty/zero fields. | ||
#[allow(clippy::new_without_default)] | ||
pub fn new(len: usize) -> MmsgHdrMut<'addr, 'bufs, 'control> { | ||
// SAFETY: all zero is valid for `msghdr` and `WSAMSG`. | ||
MmsgHdrMut { | ||
inner: vec![unsafe { mem::zeroed() }; len], | ||
_lifetimes: PhantomData, | ||
} | ||
} | ||
|
||
/// Set the mutable address (name) of the message. | ||
/// | ||
/// Corresponds to setting `msg_name` and `msg_namelen` on Unix and `name` | ||
/// and `namelen` on Windows. | ||
pub fn with_addrs(mut self, addrs: &'addr mut [SockAddr]) -> Self { | ||
for (msg, addr) in self.inner.iter_mut().zip(addrs) { | ||
sys::set_msghdr_name(&mut msg.msg_hdr, addr); | ||
} | ||
self | ||
} | ||
|
||
/// Set the mutable buffer(s) of the message. | ||
/// | ||
/// Corresponds to setting `msg_iov` and `msg_iovlen` on Unix and `lpBuffers` | ||
/// and `dwBufferCount` on Windows. | ||
pub fn with_buffers(mut self, bufs: &'bufs mut [MaybeUninitSlice<'_>]) -> Self { | ||
for (msg, buf) in self.inner.iter_mut().zip(bufs) { | ||
sys::set_msghdr_iov(&mut msg.msg_hdr, buf as *mut _ as *mut libc::iovec, 1); | ||
} | ||
self | ||
} | ||
|
||
/// Set the mutable control buffer of the message. | ||
/// | ||
/// Corresponds to setting `msg_control` and `msg_controllen` on Unix and | ||
/// `Control` on Windows. | ||
pub fn with_control(mut self, buf: &'control mut [&'control mut [MaybeUninit<u8>]]) -> Self { | ||
for (msg, buf) in self.inner.iter_mut().zip(buf) { | ||
sys::set_msghdr_control(&mut msg.msg_hdr, buf.as_mut_ptr().cast(), buf.len()); | ||
} | ||
self | ||
} | ||
|
||
/// Returns the flags of the message. | ||
pub fn flags(&self, n: usize) -> Vec<RecvFlags> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think accessors are also needed for Rather than the 3 vec allocations it might also be nice to have a method to return all 3 at once so we'd at least have only 1, or to avoid it completely with an iterator based API. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. diff --git a/src/lib.rs b/src/lib.rs
index 6c49cd1..51ebcc2 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -923,7 +923,43 @@ impl<'addr, 'bufs, 'control> MmsgHdrMut<'addr, 'bufs, 'control> {
self
}
- /// Returns the flags of the message.
+ /// Returns the lengths, flags and control lengths of the messages.
+ pub fn results(&self, n: usize) -> Vec<(usize, RecvFlags, usize)> {
+ self.inner
+ .iter()
+ .take(n)
+ .map(|msg| {
+ (
+ msg.msg_len as _,
+ sys::msghdr_flags(&msg.msg_hdr),
+ sys::msghdr_control_len(&msg.msg_hdr),
+ )
+ })
+ .collect()
+ }
+
+ /// Extends the vec with the length, flags and control lengths of the messages.
+ /// This avoids the need to allocate a new vec on each use which affects `results`.
+ pub fn extend_with_results(&self, v: &mut Vec<(usize, RecvFlags, usize)>, n: usize) {
+ v.extend(self.inner.iter().take(n).map(|msg| {
+ (
+ msg.msg_len as _,
+ sys::msghdr_flags(&msg.msg_hdr),
+ sys::msghdr_control_len(&msg.msg_hdr),
+ )
+ }));
+ }
+
+ /// Returns the lengths of the messages.
+ pub fn lens(&self, n: usize) -> Vec<usize> {
+ self.inner
+ .iter()
+ .take(n)
+ .map(|msg| msg.msg_len as _)
+ .collect()
+ }
+
+ /// Returns the flags of the messages.
pub fn flags(&self, n: usize) -> Vec<RecvFlags> {
self.inner
.iter()
@@ -931,6 +967,15 @@ impl<'addr, 'bufs, 'control> MmsgHdrMut<'addr, 'bufs, 'control> {
.map(|msg| sys::msghdr_flags(&msg.msg_hdr))
.collect()
}
+
+ /// Returns the control lengths of the messages.
+ pub fn control_lens(&self, n: usize) -> Vec<usize> {
+ self.inner
+ .iter()
+ .take(n)
+ .map(|msg| sys::msghdr_control_len(&msg.msg_hdr))
+ .collect()
+ }
}
#[cfg(all( Is what I ended up with. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I quite like the idea of an iterator-based API. This lets the user choose if they want a Vec. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not too bad for You basically setup the largest number of buffers you want to receive at once, which is fine if you use them all, but often you'll receive some smaller number. The overhead of repopulating from scratch on each iteration is noticeable enough that it leads to populating a smaller number of potential receives than you otherwise would want to. Ideally you'd want to repopulate the buffers which did get used but reuse the ones which didn't. I've absolutely failed to make that work with the lifetimes involved, since the AFAICT the same lifetime issue exists whether the collection in Is there some technique where the lifetime of the (used) head can be split from the (unused) tail? Footnotes
|
||
self.inner | ||
.iter() | ||
.take(n) | ||
.map(|msg| sys::msghdr_flags(&msg.msg_hdr)) | ||
.collect() | ||
} | ||
} | ||
|
||
#[cfg(all( | ||
unix, | ||
not(any( | ||
target_os = "redox", | ||
target_os = "solaris", | ||
target_os = "hurd", | ||
target_os = "vita", | ||
target_os = "illumos", | ||
target_vendor = "apple" | ||
)) | ||
))] | ||
impl<'name, 'bufs, 'control> fmt::Debug for MmsgHdrMut<'name, 'bufs, 'control> { | ||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
"MmsgHdrMut".fmt(fmt) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about matching
mmsghdr
here? For exampleThis way the caller can decide if they want to use a
Vec
, a slice an array or something else.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Being able to avoid the allocation which the vec implies in the hot I/O loop when the batch size is
const
would be nice for sure.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see two options:
Socket::sendmmsg
function, but not forSocket::send_multiple_to
as it needs some memory layout adaptation