From 92d17f2bedf3b19d59e7582735a166f40bc573b6 Mon Sep 17 00:00:00 2001 From: Jimy Byerley Date: Fri, 7 Jul 2023 18:36:43 +0200 Subject: [PATCH] all async function returned futures should now be Send, despite https://github.com/rust-lang/rust/issues/104883 --- src/mapping.rs | 2 +- src/rawmaster.rs | 174 ++++++++++++++++++++++++----------------------- 2 files changed, 90 insertions(+), 86 deletions(-) diff --git a/src/mapping.rs b/src/mapping.rs index 71c76f4..6e96c28 100644 --- a/src/mapping.rs +++ b/src/mapping.rs @@ -69,7 +69,7 @@ use core::{ use std::{ cell::RefCell, collections::{HashMap, BTreeSet}, - sync::{Arc, Weak, Mutex, MutexGuard, RwLock, RwLockWriteGuard}, + sync::{Arc, Weak, Mutex, RwLock, RwLockWriteGuard}, }; use bilge::prelude::*; diff --git a/src/rawmaster.rs b/src/rawmaster.rs index 7c851a8..e683c0f 100644 --- a/src/rawmaster.rs +++ b/src/rawmaster.rs @@ -249,92 +249,93 @@ impl RawMaster { SlaveAddress::Logical => memory, }; - let token; - let (ready, _finisher) = { + let (token, ready, _finisher); + loop { // buffering the pdu sending - let mut state = self.pdu_state.lock().unwrap(); - - while state.free.is_empty() { - let notification = self.received.notified(); - drop(state); - notification.await; - state = self.pdu_state.lock().unwrap(); - } - - // sending the buffer if necessary - while self.socket.max_frame() < state.last_end + data.len() + PduHeader::packed_size() + PduFooter::packed_size() { - assert!(self.socket.max_frame() > - EthercatHeader::packed_size() - + data.len() - + PduHeader::packed_size() - + PduFooter::packed_size(), "data too big for an ethercat frame"); - state.ready = true; - self.sendable.notify_one(); - let notification = self.sent.notified(); - drop(state); - notification.await; - state = self.pdu_state.lock().unwrap(); - } - - // reserving a token number to ensure no other task will exchange a PDU with the same token and receive our data - token = state.free.pop().unwrap(); - state.receive[token] = Some(PduStorage { - // cast lifetime as static - // memory safety: this slice is pinned by the caller and its access is managed by field `ready` - data: unsafe {std::slice::from_raw_parts_mut( - data.as_mut_ptr(), - data.len(), - )}, - ready: false, - answers: 0, - }); - - // change last value's PduHeader.next - if state.last_start <= state.last_end { - let range = state.last_start .. state.last_end; - let place = &mut state.send[range]; - let mut header = PduHeader::unpack(place).unwrap(); - header.set_next(true); - header.pack(place).unwrap(); - } - else { - state.last_end = state.last_start; - } - // stacking the PDU in self.pdu_receive - let advance = { - let range = state.last_end ..; - let mut cursor = Cursor::new(&mut state.send[range]); - cursor.pack(&PduHeader::new( - u8::from(command), - token as u8, - address, - u11::new(data.len().try_into().unwrap()), - false, - false, - u16::new(0), - )).unwrap(); - cursor.write(data).unwrap(); - cursor.pack(&PduFooter::new(0)).unwrap(); - cursor.position() - }; - state.last_start = state.last_end; - state.last_end = state.last_start + advance; - - self.sendable.notify_one(); - - // memory safety: this item in the array cannot be moved since self is borrowed, and will only be removed later by the current function - // we will access it potentially concurrently, but since we only want to detect a change in the value, that's fine - let ready = unsafe {&*(&state.receive[token].as_ref().unwrap().ready as *const bool)}; - // clean up the receive table at function end, or in case the async runtime cancels this task - let finisher = Finisher::new(|| { + // this weird scope is here to prevent the rust thread checker to set this async future `!Send` just because there is remaining freed variables with `MutexGuard` type + // TODO: restore the previous code (more readable and flexible) once https://github.com/rust-lang/rust/issues/104883 is fixed + { let mut state = self.pdu_state.lock().unwrap(); - state.receive[token] = None; - state.free.push(token).unwrap(); - }); - (ready, finisher) - }; - + let space_available = || self.socket.max_frame() > state.last_end + data.len() + PduHeader::packed_size() + PduFooter::packed_size(); + let token_available = || ! state.free.is_empty(); + if ! token_available() { + // there is nothing to do except waiting + } + else if ! space_available() { + // sending the current buffer + assert!(self.socket.max_frame() > + EthercatHeader::packed_size() + + data.len() + + PduHeader::packed_size() + + PduFooter::packed_size(), "data too big for an ethercat frame"); + state.ready = true; + self.sendable.notify_one(); + } + else { + // reserving a token number to ensure no other task will exchange a PDU with the same token and receive our data + token = state.free.pop().unwrap(); + state.receive[token] = Some(PduStorage { + // cast lifetime as static + // memory safety: this slice is pinned by the caller and its access is managed by field `ready` + data: unsafe {std::slice::from_raw_parts_mut( + data.as_mut_ptr(), + data.len(), + )}, + ready: false, + answers: 0, + }); + + // change last value's PduHeader.next + if state.last_start <= state.last_end { + let range = state.last_start .. state.last_end; + let place = &mut state.send[range]; + let mut header = PduHeader::unpack(place).unwrap(); + header.set_next(true); + header.pack(place).unwrap(); + } + else { + state.last_end = state.last_start; + } + + // stacking the PDU in self.pdu_receive + let advance = { + let range = state.last_end ..; + let mut cursor = Cursor::new(&mut state.send[range]); + cursor.pack(&PduHeader::new( + u8::from(command), + token as u8, + address, + u11::new(data.len().try_into().unwrap()), + false, + false, + u16::new(0), + )).unwrap(); + cursor.write(data).unwrap(); + cursor.pack(&PduFooter::new(0)).unwrap(); + cursor.position() + }; + state.last_start = state.last_end; + state.last_end = state.last_start + advance; + + self.sendable.notify_one(); + + // memory safety: this item in the array cannot be moved since self is borrowed, and will only be removed later by the current function + // we will access it potentially concurrently, but since we only want to detect a change in the value, that's fine + ready = unsafe {&*(&state.receive[token].as_ref().unwrap().ready as *const bool)}; + // clean up the receive table at function end, or in case the async runtime cancels this task + _finisher = Finisher::new(|| { + let mut state = self.pdu_state.lock().unwrap(); + state.receive[token] = None; + state.free.push(token).unwrap(); + }); + + break + } + } + self.received.notified().await; + } + // waiting for the answer loop { let notification = self.received.notified(); @@ -342,8 +343,11 @@ impl RawMaster { notification.await; } - let state = self.pdu_state.lock().unwrap(); - state.receive[token].as_ref().unwrap().answers + { + // free the token + let state = self.pdu_state.lock().unwrap(); + state.receive[token].as_ref().unwrap().answers + } } /// trigger sending the buffered PDUs, they will be sent as soon as possible by [Self::send] instead of waiting for the frame to be full or for the timeout