Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implementation of drop trait for async stream #6

Merged
merged 2 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ Check [Keep a Changelog](http://keepachangelog.com/) for recommendations on how
Security -- in case of vulnerabilities.
-->

## [Unreleased]

### Changed
- Implemented Drop for AsyncStream (TSP-584)

esarver marked this conversation as resolved.
Show resolved Hide resolved
## [0.15.1]

### Changed
Expand Down
106 changes: 83 additions & 23 deletions src/interface/async_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,60 @@ use crate::interface::{Interface, NonBlock};

//create an Async version of the interface
pub struct AsyncStream {
join: JoinHandle<Result<Arc<dyn Interface + Send + Sync>>>,
write_to: Sender<Vec<u8>>,
join: Option<JoinHandle<Result<Arc<dyn Interface + Send + Sync>>>>,
write_to: Option<Sender<AsyncMessage>>,
read_from: Rc<Receiver<Vec<u8>>>,
buffer: Vec<u8>,
nonblocking: bool,
instrument_info: Option<InstrumentInfo>,
}

enum AsyncMessage {
Message(Vec<u8>),
End,
}

impl AsyncStream {
fn join_thread(&mut self) -> Result<Arc<dyn Interface + Send + Sync>> {
self.drop_write_channel()?;
let socket = match self.join.take() {
Some(join) => match join.join() {
Ok(Ok(socket)) => socket,
_ => {
return Err(InstrumentError::ConnectionError {
details: "unable to retrieve synchronous stream".to_string(),
});
}
},
None => {
return Err(InstrumentError::ConnectionError {
details: "unable to close the asynchronous connection, could not retrieve synchronous stream".to_string(),
});
}
};

Ok(socket)
}

fn drop_write_channel(&mut self) -> Result<()> {
match self.write_to.take() {
Some(send) => {
match send.send(AsyncMessage::End) {
Ok(()) => {}
Err(_) => {
return Err(InstrumentError::IoError { source: (std::io::Error::new(
ErrorKind::NotConnected,
"attempted to write asynchronously to socket, but it was not connected".to_string(),
))});
}
}
}
None => {}
}
Ok(())
}
}

impl TryFrom<Arc<dyn Interface + Send + Sync>> for AsyncStream {
type Error = InstrumentError;

Expand All @@ -44,15 +90,15 @@ impl TryFrom<Arc<dyn Interface + Send + Sync>> for AsyncStream {
// get INstrumentInfo by call get_info of interface
let join = builder.spawn(move || -> Result<Arc<dyn Interface + Send + Sync>> {
Arc::get_mut(&mut socket).unwrap().set_nonblocking(true)?;
let read_into: Receiver<Vec<u8>> = read_into;
let read_into: Receiver<AsyncMessage> = read_into;
let write_out: Sender<Vec<u8>> = write_out;

'rw_loop: loop {
// see if the application has anything to send to the instrument.
std::thread::sleep(Duration::from_millis(1));
match read_into.try_recv() {
// It does, so send it
Ok(msg) => {
Ok(AsyncMessage::Message(msg)) => {
let chunk_size = 1024;
let mut start = 0;
while start < msg.len() {
Expand Down Expand Up @@ -93,11 +139,11 @@ impl TryFrom<Arc<dyn Interface + Send + Sync>> for AsyncStream {
}
}

Err(e) => match e {
// The sender has disconnected, therefore we need to clean up
mpsc::TryRecvError::Disconnected => break 'rw_loop,
mpsc::TryRecvError::Empty => {}
},
Ok(AsyncMessage::End) | Err(mpsc::TryRecvError::Disconnected) => {
break 'rw_loop;
}

Err(mpsc::TryRecvError::Empty) => {}
}
let buf = &mut [0u8; 512];
if let Ok(size) = Arc::get_mut(&mut socket).unwrap().read(buf) {
Expand All @@ -117,8 +163,8 @@ impl TryFrom<Arc<dyn Interface + Send + Sync>> for AsyncStream {
})?;

Ok(Self {
join,
write_to,
join: Some(join),
write_to: Some(write_to),
read_from: Rc::new(read_from),
buffer: Vec::new(),
nonblocking: true,
Expand Down Expand Up @@ -169,11 +215,23 @@ impl Read for AsyncStream {

impl Write for AsyncStream {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let Ok(()) = self.write_to.send(Vec::from(buf)) else {
return Err(std::io::Error::new(
ErrorKind::NotConnected,
"attempted to write asynchronously to socket, but it was not connected".to_string(),
));
match self.write_to {
Some(ref mut send) => match send.send(AsyncMessage::Message(Vec::from(buf))) {
Ok(()) => {}
Err(_) => {
return Err(std::io::Error::new(
ErrorKind::NotConnected,
"attempted to write asynchronously to socket, but it was not connected"
.to_string(),
));
}
},
None => {
return Err(std::io::Error::new(
ErrorKind::NotConnected,
"asynchronous connection was not found".to_string(),
));
}
};

Ok(buf.len())
Expand Down Expand Up @@ -201,17 +259,19 @@ impl Info for AsyncStream {
}
}

impl Drop for AsyncStream {
fn drop(&mut self) {
let _ = self.join_thread();
}
}

impl TryFrom<AsyncStream> for Arc<dyn Interface + Send + Sync> {
type Error = InstrumentError;

fn try_from(async_stream: AsyncStream) -> std::result::Result<Self, Self::Error> {
drop(async_stream.write_to);
match async_stream.join.join() {
Ok(Ok(stream)) => Ok(stream),
_ => Err(InstrumentError::ConnectionError {
details: "unable to retrieve synchronous stream".to_string(),
}),
}
let mut async_stream = async_stream;
let socket = async_stream.join_thread()?;
Ok(socket)
}
}

Expand Down
24 changes: 18 additions & 6 deletions src/model/tti.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{
io::{BufRead, Read, Write},
thread,
time::Duration,
};

Expand Down Expand Up @@ -69,10 +68,24 @@ impl Info for Instrument {
impl Language for Instrument {
fn get_language(&mut self) -> Result<CmdLanguage, InstrumentError> {
self.write_all(b"*LANG?\n")?;
let mut lang: Vec<u8> = vec![0; 16];
thread::sleep(Duration::from_millis(500));
let _read = self.read(&mut lang)?;
String::from_utf8_lossy(&lang).to_string().as_str().parse()
for _i in 0..5 {
std::thread::sleep(Duration::from_millis(100));
let mut lang: Vec<u8> = vec![0; 256];
let _read = self.read(&mut lang)?;
let lang = std::str::from_utf8(lang.as_slice())
.unwrap_or("")
.trim_matches(char::from(0))
.trim();

if lang.contains("TSP") {
return Ok(CmdLanguage::Tsp);
} else if lang.contains("SCPI") {
return Ok(CmdLanguage::Scpi);
}
}
Err(InstrumentError::InformationRetrievalError {
details: ("could not read language of the instrument").to_string(),
})
}

fn change_language(&mut self, lang: CmdLanguage) -> Result<(), InstrumentError> {
Expand Down Expand Up @@ -172,7 +185,6 @@ impl NonBlock for Instrument {
impl Drop for Instrument {
fn drop(&mut self) {
let _ = self.write_all(b"logout\n");
thread::sleep(Duration::from_millis(500));
}
}

Expand Down
Loading