From 10ff21b3645eff7d6e20c446e30c84f001eb4221 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 29 Feb 2024 13:36:33 -0800 Subject: [PATCH 1/3] named-pipes: fix receiving IOCP events after deregister --- src/sys/windows/event.rs | 7 ++++ src/sys/windows/named_pipe.rs | 78 +++++++++++++++++++++++++++++------ 2 files changed, 73 insertions(+), 12 deletions(-) diff --git a/src/sys/windows/event.rs b/src/sys/windows/event.rs index 731bd6067..8ce096c1b 100644 --- a/src/sys/windows/event.rs +++ b/src/sys/windows/event.rs @@ -41,6 +41,13 @@ impl Event { pub(super) fn to_completion_status(&self) -> CompletionStatus { CompletionStatus::new(self.flags, self.data as usize, std::ptr::null_mut()) } + + pub(super) fn to_completion_status_with_overlapped( + &self, + overlapped: *mut super::Overlapped, + ) -> CompletionStatus { + CompletionStatus::new(self.flags, self.data as usize, overlapped) + } } pub(crate) const READABLE_FLAGS: u32 = afd::POLL_RECEIVE diff --git a/src/sys/windows/named_pipe.rs b/src/sys/windows/named_pipe.rs index 23f85d1eb..8ec075d5a 100644 --- a/src/sys/windows/named_pipe.rs +++ b/src/sys/windows/named_pipe.rs @@ -84,6 +84,7 @@ struct Inner { connect: Overlapped, read: Overlapped, write: Overlapped, + event: Overlapped, // END NOTE. handle: Handle, connecting: AtomicBool, @@ -110,10 +111,16 @@ impl Inner { /// Same as [`ptr_from_conn_overlapped`] but for `Inner.write`. unsafe fn ptr_from_write_overlapped(ptr: *mut OVERLAPPED) -> *const Inner { - // `read` is after `connect: Overlapped` and `read: Overlapped`. + // `write` is after `connect: Overlapped` and `read: Overlapped`. (ptr as *mut Overlapped).wrapping_sub(2) as *const Inner } + /// Same as [`ptr_from_conn_overlapped`] but for `Inner.event`. + unsafe fn ptr_from_event_overlapped(ptr: *mut OVERLAPPED) -> *const Inner { + // `event` is after `connect: Overlapped`, `read: Overlapped`, and `write: Overlapped`. + (ptr as *mut Overlapped).wrapping_sub(3) as *const Inner + } + /// Issue a connection request with the specified overlapped operation. /// /// This function will issue a request to connect a client to this server, @@ -478,6 +485,7 @@ impl FromRawHandle for NamedPipe { connecting: AtomicBool::new(false), read: Overlapped::new(read_done), write: Overlapped::new(write_done), + event: Overlapped::new(event_done), io: Mutex::new(Io { cp: None, token: None, @@ -724,7 +732,7 @@ impl Inner { // out the error. Err(e) => { io.read = State::Err(e); - io.notify_readable(events); + io.notify_readable(me, events); true } } @@ -787,7 +795,7 @@ impl Inner { Ok(None) => (), Err(e) => { io.write = State::Err(e); - io.notify_writable(events); + io.notify_writable(me, events); } } } @@ -797,7 +805,7 @@ impl Inner { #[allow(clippy::needless_option_as_deref)] if Inner::schedule_read(me, &mut io, events.as_deref_mut()) { if let State::None = io.write { - io.notify_writable(events); + io.notify_writable(me, events); } } } @@ -877,7 +885,7 @@ fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec>) { } // Flag our readiness that we've got data. - io.notify_readable(events); + io.notify_readable(&me, events); } fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec>) { @@ -895,7 +903,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec>) { // `Ok` here means, that the operation was completed immediately // `bytes_transferred` is already reported to a client State::Ok(..) => { - io.notify_writable(events); + io.notify_writable(&me, events); return; } State::Pending(buf, pos) => (buf, pos), @@ -909,7 +917,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec>) { let new_pos = pos + (status.bytes_transferred() as usize); if new_pos == buf.len() { me.put_buffer(buf); - io.notify_writable(events); + io.notify_writable(&me, events); } else { Inner::schedule_write(&me, buf, new_pos, &mut io, events); } @@ -917,12 +925,38 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec>) { Err(e) => { debug_assert_eq!(status.bytes_transferred(), 0); io.write = State::Err(e); - io.notify_writable(events); + io.notify_writable(&me, events); } } } } +fn event_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec>) { + let status = CompletionStatus::from_entry(status); + + // Acquire the `Arc`. Note that we should be guaranteed that + // the refcount is available to us due to the `mem::forget` in + // `schedule_write` above. + let me = unsafe { Arc::from_raw(Inner::ptr_from_event_overlapped(status.overlapped())) }; + + let io = me.io.lock().unwrap(); + + // Make sure the I/O handle is still registered with the selector + if io.token.is_some() { + // This method is also called during `Selector::drop` to perform + // cleanup. In this case, `events` is `None` and we don't need to track + // the event. + if let Some(events) = events { + let mut ev = Event::from_completion_status(&status); + // Reverse the `.data` alteration done in `schedule_event`. This + // alteration was done so the selector recognized the event as one from + // a named pipe. + ev.data >>= 1; + events.push(ev); + } + } +} + impl Io { fn check_association(&self, registry: &Registry, required: bool) -> io::Result<()> { match self.cp { @@ -938,7 +972,7 @@ impl Io { } } - fn notify_readable(&self, events: Option<&mut Vec>) { + fn notify_readable(&self, me: &Arc, events: Option<&mut Vec>) { if let Some(token) = self.token { let mut ev = Event::new(token); ev.set_readable(); @@ -946,12 +980,12 @@ impl Io { if let Some(events) = events { events.push(ev); } else { - let _ = self.cp.as_ref().unwrap().post(ev.to_completion_status()); + self.schedule_event(me, ev); } } } - fn notify_writable(&self, events: Option<&mut Vec>) { + fn notify_writable(&self, me: &Arc, events: Option<&mut Vec>) { if let Some(token) = self.token { let mut ev = Event::new(token); ev.set_writable(); @@ -959,7 +993,27 @@ impl Io { if let Some(events) = events { events.push(ev); } else { - let _ = self.cp.as_ref().unwrap().post(ev.to_completion_status()); + self.schedule_event(me, ev); + } + } + } + + fn schedule_event(&self, me: &Arc, mut event: Event) { + // Alter the token so that the selector will identify the IOCP event as + // one for a named pipe. This will be reversed in `event_done` + event.data <<= 1; + event.data += 1; + + let completion_status = + event.to_completion_status_with_overlapped(me.event.as_ptr() as *mut _); + + match self.cp.as_ref().unwrap().post(completion_status) { + Ok(_) => { + // Increase the ref count of `Inner` for the completion event. + mem::forget(me.clone()); + } + Err(_) => { + // Nothing to do here } } } From 4d8c29849056a9f43b5f85d900a54f11dcf86ac5 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 1 Mar 2024 09:58:47 -0800 Subject: [PATCH 2/3] add an extra comment about the event.data shift --- src/sys/windows/named_pipe.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/sys/windows/named_pipe.rs b/src/sys/windows/named_pipe.rs index 8ec075d5a..eb35d3797 100644 --- a/src/sys/windows/named_pipe.rs +++ b/src/sys/windows/named_pipe.rs @@ -1001,6 +1001,11 @@ impl Io { fn schedule_event(&self, me: &Arc, mut event: Event) { // Alter the token so that the selector will identify the IOCP event as // one for a named pipe. This will be reversed in `event_done` + // + // `data` for named pipes is an auto-incrementing counter. Because + // `data` is `u64` we do not risk losing the most-significant bit + // (unless a user creates 2^62 named pipes during the lifetime of the + // process). event.data <<= 1; event.data += 1; From 931b581a7a81a85fe683de25baf5cb46e03b266f Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 1 Mar 2024 10:03:47 -0800 Subject: [PATCH 3/3] try fixing CI --- src/sys/windows/event.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sys/windows/event.rs b/src/sys/windows/event.rs index 8ce096c1b..66656d0e5 100644 --- a/src/sys/windows/event.rs +++ b/src/sys/windows/event.rs @@ -42,6 +42,7 @@ impl Event { CompletionStatus::new(self.flags, self.data as usize, std::ptr::null_mut()) } + #[cfg(feature = "os-ext")] pub(super) fn to_completion_status_with_overlapped( &self, overlapped: *mut super::Overlapped,