Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce allocations in poll(2) implementation #1705

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 61 additions & 53 deletions src/sys/unix/selector/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@
// Permission to use this code has been granted by original author:
// https://github.com/tokio-rs/mio/pull/1602#issuecomment-1218441031

use crate::sys::unix::selector::LOWEST_FD;
use crate::sys::unix::waker::WakerInternal;
use crate::{Interest, Token};
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::mem::swap;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
use std::{cmp, fmt, io};

use crate::sys::unix::selector::LOWEST_FD;
use crate::sys::unix::waker::WakerInternal;
use crate::{Interest, Token};

/// Unique id for use as `SelectorId`.
#[cfg(debug_assertions)]
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
Expand All @@ -25,34 +26,36 @@ pub struct Selector {
/// Whether this selector currently has an associated waker.
#[cfg(debug_assertions)]
has_waker: AtomicBool,
/// Buffer used in [`SelectorState::select`].
fd_bufs0: Vec<RawFd>,
fd_bufs1: Vec<RawFd>,
}

impl Selector {
pub fn new() -> io::Result<Selector> {
let state = SelectorState::new()?;

Ok(Selector {
state: Arc::new(state),
state: Arc::new(SelectorState::new()?),
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(false),
fd_bufs0: Vec::new(),
fd_bufs1: Vec::new(),
})
}

pub fn try_clone(&self) -> io::Result<Selector> {
// Just to keep the compiler happy :)
let _ = LOWEST_FD;

let state = self.state.clone();

let _ = LOWEST_FD; // Just to keep the compiler happy :)
Ok(Selector {
state,
state: self.state.clone(),
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
fd_bufs0: Vec::new(),
fd_bufs1: Vec::new(),
})
}

pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
self.state.select(events, timeout)
pub fn select(&mut self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
self.state
.select(events, timeout, &mut self.fd_bufs0, &mut self.fd_bufs1)
}

pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
Expand Down Expand Up @@ -85,6 +88,7 @@ impl Selector {
pub fn wake(&self, token: Token) -> io::Result<()> {
self.state.wake(token)
}

cfg_io_source! {
#[cfg(debug_assertions)]
pub fn id(&self) -> usize {
Expand All @@ -98,33 +102,30 @@ impl Selector {
struct SelectorState {
/// File descriptors to poll.
fds: Mutex<Fds>,

/// File descriptors which will be removed before the next poll call.
///
/// When a file descriptor is deregistered while a poll is running, we need to filter
/// out all removed descriptors after that poll is finished running.
/// When a file descriptor is deregistered while a poll is running, we need
/// to filter out all removed descriptors after that poll is finished
/// running.
pending_removal: Mutex<Vec<RawFd>>,

/// Token associated with Waker that have recently asked to wake. This will
/// cause a synthetic behaviour where on any wakeup we add all pending tokens
/// to the list of emitted events.
/// cause a synthetic behaviour where on any wakeup we add all pending
/// tokens to the list of emitted events.
pending_wake_token: Mutex<Option<Token>>,

/// Data is written to this to wake up the current instance of `wait`, which can occur when the
/// user notifies it (in which case `notified` would have been set) or when an operation needs
/// to occur (in which case `waiting_operations` would have been incremented).
/// Data is written to this to wake up the current instance of `wait`, which
/// can occur when the user notifies it (in which case `notified` would have
/// been set) or when an operation needs to occur (in which case
/// `waiting_operations` would have been incremented).
notify_waker: WakerInternal,

/// The number of operations (`add`, `modify` or `delete`) that are currently waiting on the
/// mutex to become free. When this is nonzero, `wait` must be suspended until it reaches zero
/// again.
/// The number of operations (`add`, `modify` or `delete`) that are
/// currently waiting on the mutex to become free. When this is nonzero,
/// `wait` must be suspended until it reaches zero again.
waiting_operations: AtomicUsize,
/// The condition variable that gets notified when `waiting_operations` reaches zero or
/// `notified` becomes true.
/// The condition variable that gets notified when `waiting_operations`
/// reaches zero or `notified` becomes true.
///
/// This is used with the `fds` mutex.
operations_complete: Condvar,

/// This selectors id.
#[cfg(debug_assertions)]
#[allow(dead_code)]
Expand All @@ -136,15 +137,16 @@ struct SelectorState {
struct Fds {
/// The list of `pollfds` taken by poll.
///
/// The first file descriptor is always present and is used to notify the poller.
/// The first file descriptor is always present and is used to notify the
/// poller.
poll_fds: Vec<PollFd>,
/// The map of each file descriptor to data associated with it. This does not include the file
/// descriptors created by the internal notify waker.
/// The map of each file descriptor to data associated with it. This does
/// not include the file descriptors created by the internal notify waker.
fd_data: HashMap<RawFd, FdData>,
}

/// Transparent wrapper around `libc::pollfd`, used to support `Debug` derives without adding the
/// `extra_traits` feature of `libc`.
/// Transparent wrapper around `libc::pollfd`, used to support `Debug` derives
/// without adding the `extra_traits` feature of `libc`.
#[repr(transparent)]
#[derive(Clone)]
struct PollFd(libc::pollfd);
Expand All @@ -166,16 +168,15 @@ struct FdData {
poll_fds_index: usize,
/// The key of the `Event` associated with this file descriptor.
token: Token,
/// Used to communicate with IoSourceState when we need to internally deregister
/// based on a closed fd.
/// Used to communicate with IoSourceState when we need to internally
/// deregister based on a closed fd.
shared_record: Arc<RegistrationRecord>,
}

impl SelectorState {
pub fn new() -> io::Result<SelectorState> {
let notify_waker = WakerInternal::new()?;

Ok(Self {
Ok(SelectorState {
fds: Mutex::new(Fds {
poll_fds: vec![PollFd(libc::pollfd {
fd: notify_waker.as_raw_fd(),
Expand All @@ -194,17 +195,19 @@ impl SelectorState {
})
}

pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
pub fn select(
&self,
events: &mut Events,
timeout: Option<Duration>,
closed_raw_fds: &mut Vec<RawFd>,
pending_removal: &mut Vec<RawFd>,
) -> io::Result<()> {
events.clear();
closed_raw_fds.clear();
pending_removal.clear();

let mut fds = self.fds.lock().unwrap();

// Keep track of fds that receive POLLHUP or POLLERR (i.e. won't receive further
// events) and internally deregister them before they are externally deregister'd. See
// IoSourceState below to track how the external deregister call will be handled
// when this state occurs.
let mut closed_raw_fds = Vec::new();

loop {
// Complete all current operations.
loop {
Expand Down Expand Up @@ -240,7 +243,7 @@ impl SelectorState {
// We now check whether this poll was performed with descriptors which were pending
// for removal and filter out any matching.
let mut pending_removal_guard = self.pending_removal.lock().unwrap();
let mut pending_removal = std::mem::replace(pending_removal_guard.as_mut(), Vec::new());
swap(pending_removal_guard.as_mut(), pending_removal);
drop(pending_removal_guard);

// Store the events if there were any.
Expand All @@ -258,7 +261,7 @@ impl SelectorState {
}

for fd_data in fds.fd_data.values_mut() {
let PollFd(poll_fd) = &mut fds.poll_fds[fd_data.poll_fds_index];
let poll_fd = &mut fds.poll_fds[fd_data.poll_fds_index].0;

if pending_removal.contains(&poll_fd.fd) {
// Fd was removed while poll was running
Expand All @@ -272,6 +275,12 @@ impl SelectorState {
events: poll_fd.revents,
});

// Keep track of fds that receive POLLHUP or POLLERR
// (i.e. won't receive further events) and internally
// deregister them before they are externally
// deregistered. See IoSourceState below to track how
// the external deregister call will be handled when
// this state occurs.
if poll_fd.revents & (libc::POLLHUP | libc::POLLERR) != 0 {
pending_removal.push(poll_fd.fd);
closed_raw_fds.push(poll_fd.fd);
Expand Down Expand Up @@ -299,7 +308,6 @@ impl SelectorState {

drop(fds);
let _ = self.deregister_all(&closed_raw_fds);

Ok(())
}

Expand Down Expand Up @@ -665,7 +673,7 @@ cfg_io_source! {

pub fn do_io<T, F, R>(&self, f: F, io: &T) -> io::Result<R>
where
F: FnOnce(&T) -> io::Result<R>,
F: FnOnce(&T) -> io::Result<R>,
{
let result = f(io);

Expand Down