Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

named-pipes: fix receiving IOCP events after deregister #1760

Merged
merged 3 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/sys/windows/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ impl Event {
pub(super) fn to_completion_status(&self) -> CompletionStatus {
CompletionStatus::new(self.flags, self.data as usize, std::ptr::null_mut())
}

#[cfg(feature = "os-ext")]
pub(super) fn to_completion_status_with_overlapped(
&self,
overlapped: *mut super::Overlapped,
) -> CompletionStatus {
CompletionStatus::new(self.flags, self.data as usize, overlapped)
}
}

pub(crate) const READABLE_FLAGS: u32 = afd::POLL_RECEIVE
Expand Down
83 changes: 71 additions & 12 deletions src/sys/windows/named_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ struct Inner {
connect: Overlapped,
read: Overlapped,
write: Overlapped,
event: Overlapped,
// END NOTE.
handle: Handle,
connecting: AtomicBool,
Expand All @@ -110,10 +111,16 @@ impl Inner {

/// Same as [`ptr_from_conn_overlapped`] but for `Inner.write`.
unsafe fn ptr_from_write_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
// `read` is after `connect: Overlapped` and `read: Overlapped`.
// `write` is after `connect: Overlapped` and `read: Overlapped`.
(ptr as *mut Overlapped).wrapping_sub(2) as *const Inner
}

/// Same as [`ptr_from_conn_overlapped`] but for `Inner.event`.
unsafe fn ptr_from_event_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
// `event` is after `connect: Overlapped`, `read: Overlapped`, and `write: Overlapped`.
(ptr as *mut Overlapped).wrapping_sub(3) as *const Inner
}

/// Issue a connection request with the specified overlapped operation.
///
/// This function will issue a request to connect a client to this server,
Expand Down Expand Up @@ -478,6 +485,7 @@ impl FromRawHandle for NamedPipe {
connecting: AtomicBool::new(false),
read: Overlapped::new(read_done),
write: Overlapped::new(write_done),
event: Overlapped::new(event_done),
io: Mutex::new(Io {
cp: None,
token: None,
Expand Down Expand Up @@ -724,7 +732,7 @@ impl Inner {
// out the error.
Err(e) => {
io.read = State::Err(e);
io.notify_readable(events);
io.notify_readable(me, events);
true
}
}
Expand Down Expand Up @@ -787,7 +795,7 @@ impl Inner {
Ok(None) => (),
Err(e) => {
io.write = State::Err(e);
io.notify_writable(events);
io.notify_writable(me, events);
}
}
}
Expand All @@ -797,7 +805,7 @@ impl Inner {
#[allow(clippy::needless_option_as_deref)]
if Inner::schedule_read(me, &mut io, events.as_deref_mut()) {
if let State::None = io.write {
io.notify_writable(events);
io.notify_writable(me, events);
}
}
}
Expand Down Expand Up @@ -877,7 +885,7 @@ fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
}

// Flag our readiness that we've got data.
io.notify_readable(events);
io.notify_readable(&me, events);
}

fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
Expand All @@ -895,7 +903,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// `Ok` here means, that the operation was completed immediately
// `bytes_transferred` is already reported to a client
State::Ok(..) => {
io.notify_writable(events);
io.notify_writable(&me, events);
return;
}
State::Pending(buf, pos) => (buf, pos),
Expand All @@ -909,20 +917,46 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
let new_pos = pos + (status.bytes_transferred() as usize);
if new_pos == buf.len() {
me.put_buffer(buf);
io.notify_writable(events);
io.notify_writable(&me, events);
} else {
Inner::schedule_write(&me, buf, new_pos, &mut io, events);
}
}
Err(e) => {
debug_assert_eq!(status.bytes_transferred(), 0);
io.write = State::Err(e);
io.notify_writable(events);
io.notify_writable(&me, events);
}
}
}
}

fn event_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
let status = CompletionStatus::from_entry(status);

// Acquire the `Arc<Inner>`. Note that we should be guaranteed that
// the refcount is available to us due to the `mem::forget` in
// `schedule_write` above.
let me = unsafe { Arc::from_raw(Inner::ptr_from_event_overlapped(status.overlapped())) };

let io = me.io.lock().unwrap();

// Make sure the I/O handle is still registered with the selector
if io.token.is_some() {
// This method is also called during `Selector::drop` to perform
// cleanup. In this case, `events` is `None` and we don't need to track
// the event.
if let Some(events) = events {
let mut ev = Event::from_completion_status(&status);
// Reverse the `.data` alteration done in `schedule_event`. This
// alteration was done so the selector recognized the event as one from
// a named pipe.
ev.data >>= 1;
events.push(ev);
}
}
}

impl Io {
fn check_association(&self, registry: &Registry, required: bool) -> io::Result<()> {
match self.cp {
Expand All @@ -938,28 +972,53 @@ impl Io {
}
}

fn notify_readable(&self, events: Option<&mut Vec<Event>>) {
fn notify_readable(&self, me: &Arc<Inner>, events: Option<&mut Vec<Event>>) {
if let Some(token) = self.token {
let mut ev = Event::new(token);
ev.set_readable();

if let Some(events) = events {
events.push(ev);
} else {
let _ = self.cp.as_ref().unwrap().post(ev.to_completion_status());
self.schedule_event(me, ev);
}
}
}

fn notify_writable(&self, events: Option<&mut Vec<Event>>) {
fn notify_writable(&self, me: &Arc<Inner>, events: Option<&mut Vec<Event>>) {
if let Some(token) = self.token {
let mut ev = Event::new(token);
ev.set_writable();

if let Some(events) = events {
events.push(ev);
} else {
let _ = self.cp.as_ref().unwrap().post(ev.to_completion_status());
self.schedule_event(me, ev);
}
}
}

fn schedule_event(&self, me: &Arc<Inner>, mut event: Event) {
// Alter the token so that the selector will identify the IOCP event as
// one for a named pipe. This will be reversed in `event_done`
//
// `data` for named pipes is an auto-incrementing counter. Because
// `data` is `u64` we do not risk losing the most-significant bit
// (unless a user creates 2^62 named pipes during the lifetime of the
// process).
event.data <<= 1;
Thomasdezeeuw marked this conversation as resolved.
Show resolved Hide resolved
event.data += 1;
Thomasdezeeuw marked this conversation as resolved.
Show resolved Hide resolved

let completion_status =
event.to_completion_status_with_overlapped(me.event.as_ptr() as *mut _);

match self.cp.as_ref().unwrap().post(completion_status) {
Ok(_) => {
// Increase the ref count of `Inner` for the completion event.
mem::forget(me.clone());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: we could use Arc::increment_strong_count, but I don't know if we use it anywhere else, if not it's ok to keep the current code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the style in the rest of the file. It probably should be updated in one go.

}
Err(_) => {
// Nothing to do here
}
}
}
Expand Down