Skip to content

Commit

Permalink
all async function returned futures should now be Send, despite rust-…
Browse files Browse the repository at this point in the history
  • Loading branch information
jimy-byerley committed Jul 7, 2023
1 parent eaaf458 commit 92d17f2
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 86 deletions.
2 changes: 1 addition & 1 deletion src/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ use core::{
use std::{
cell::RefCell,
collections::{HashMap, BTreeSet},
sync::{Arc, Weak, Mutex, MutexGuard, RwLock, RwLockWriteGuard},
sync::{Arc, Weak, Mutex, RwLock, RwLockWriteGuard},
};
use bilge::prelude::*;

Expand Down
174 changes: 89 additions & 85 deletions src/rawmaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,101 +249,105 @@ impl RawMaster {
SlaveAddress::Logical => memory,
};

let token;
let (ready, _finisher) = {
let (token, ready, _finisher);
loop {
// buffering the pdu sending
let mut state = self.pdu_state.lock().unwrap();

while state.free.is_empty() {
let notification = self.received.notified();
drop(state);
notification.await;
state = self.pdu_state.lock().unwrap();
}

// sending the buffer if necessary
while self.socket.max_frame() < state.last_end + data.len() + PduHeader::packed_size() + PduFooter::packed_size() {
assert!(self.socket.max_frame() >
EthercatHeader::packed_size()
+ data.len()
+ PduHeader::packed_size()
+ PduFooter::packed_size(), "data too big for an ethercat frame");
state.ready = true;
self.sendable.notify_one();
let notification = self.sent.notified();
drop(state);
notification.await;
state = self.pdu_state.lock().unwrap();
}

// reserving a token number to ensure no other task will exchange a PDU with the same token and receive our data
token = state.free.pop().unwrap();
state.receive[token] = Some(PduStorage {
// cast lifetime as static
// memory safety: this slice is pinned by the caller and its access is managed by field `ready`
data: unsafe {std::slice::from_raw_parts_mut(
data.as_mut_ptr(),
data.len(),
)},
ready: false,
answers: 0,
});

// change last value's PduHeader.next
if state.last_start <= state.last_end {
let range = state.last_start .. state.last_end;
let place = &mut state.send[range];
let mut header = PduHeader::unpack(place).unwrap();
header.set_next(true);
header.pack(place).unwrap();
}
else {
state.last_end = state.last_start;
}

// stacking the PDU in self.pdu_receive
let advance = {
let range = state.last_end ..;
let mut cursor = Cursor::new(&mut state.send[range]);
cursor.pack(&PduHeader::new(
u8::from(command),
token as u8,
address,
u11::new(data.len().try_into().unwrap()),
false,
false,
u16::new(0),
)).unwrap();
cursor.write(data).unwrap();
cursor.pack(&PduFooter::new(0)).unwrap();
cursor.position()
};
state.last_start = state.last_end;
state.last_end = state.last_start + advance;

self.sendable.notify_one();

// memory safety: this item in the array cannot be moved since self is borrowed, and will only be removed later by the current function
// we will access it potentially concurrently, but since we only want to detect a change in the value, that's fine
let ready = unsafe {&*(&state.receive[token].as_ref().unwrap().ready as *const bool)};
// clean up the receive table at function end, or in case the async runtime cancels this task
let finisher = Finisher::new(|| {
// this weird scope is here to prevent the rust thread checker to set this async future `!Send` just because there is remaining freed variables with `MutexGuard` type
// TODO: restore the previous code (more readable and flexible) once https://github.com/rust-lang/rust/issues/104883 is fixed
{
let mut state = self.pdu_state.lock().unwrap();
state.receive[token] = None;
state.free.push(token).unwrap();
});
(ready, finisher)
};

let space_available = || self.socket.max_frame() > state.last_end + data.len() + PduHeader::packed_size() + PduFooter::packed_size();
let token_available = || ! state.free.is_empty();
if ! token_available() {
// there is nothing to do except waiting
}
else if ! space_available() {
// sending the current buffer
assert!(self.socket.max_frame() >
EthercatHeader::packed_size()
+ data.len()
+ PduHeader::packed_size()
+ PduFooter::packed_size(), "data too big for an ethercat frame");
state.ready = true;
self.sendable.notify_one();
}
else {
// reserving a token number to ensure no other task will exchange a PDU with the same token and receive our data
token = state.free.pop().unwrap();
state.receive[token] = Some(PduStorage {
// cast lifetime as static
// memory safety: this slice is pinned by the caller and its access is managed by field `ready`
data: unsafe {std::slice::from_raw_parts_mut(
data.as_mut_ptr(),
data.len(),
)},
ready: false,
answers: 0,
});

// change last value's PduHeader.next
if state.last_start <= state.last_end {
let range = state.last_start .. state.last_end;
let place = &mut state.send[range];
let mut header = PduHeader::unpack(place).unwrap();
header.set_next(true);
header.pack(place).unwrap();
}
else {
state.last_end = state.last_start;
}

// stacking the PDU in self.pdu_receive
let advance = {
let range = state.last_end ..;
let mut cursor = Cursor::new(&mut state.send[range]);
cursor.pack(&PduHeader::new(
u8::from(command),
token as u8,
address,
u11::new(data.len().try_into().unwrap()),
false,
false,
u16::new(0),
)).unwrap();
cursor.write(data).unwrap();
cursor.pack(&PduFooter::new(0)).unwrap();
cursor.position()
};
state.last_start = state.last_end;
state.last_end = state.last_start + advance;

self.sendable.notify_one();

// memory safety: this item in the array cannot be moved since self is borrowed, and will only be removed later by the current function
// we will access it potentially concurrently, but since we only want to detect a change in the value, that's fine
ready = unsafe {&*(&state.receive[token].as_ref().unwrap().ready as *const bool)};
// clean up the receive table at function end, or in case the async runtime cancels this task
_finisher = Finisher::new(|| {
let mut state = self.pdu_state.lock().unwrap();
state.receive[token] = None;
state.free.push(token).unwrap();
});

break
}
}
self.received.notified().await;
}

// waiting for the answer
loop {
let notification = self.received.notified();
if *ready {break}
notification.await;
}

let state = self.pdu_state.lock().unwrap();
state.receive[token].as_ref().unwrap().answers
{
// free the token
let state = self.pdu_state.lock().unwrap();
state.receive[token].as_ref().unwrap().answers
}
}

/// trigger sending the buffered PDUs, they will be sent as soon as possible by [Self::send] instead of waiting for the frame to be full or for the timeout
Expand Down

0 comments on commit 92d17f2

Please sign in to comment.