Skip to content

Commit

Permalink
implementation of drop trait for async stream (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
nazanjaffery authored Apr 9, 2024
1 parent 811d30d commit d95c29b
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 30 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Check [Keep a Changelog](http://keepachangelog.com/) for recommendations on how
## [0.15.1]

### Changed
- Nothing yet
- - Implemented Drop for AsyncStream (TSP-584)

## [0.15.0]

Expand Down
106 changes: 83 additions & 23 deletions src/interface/async_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,60 @@ use crate::interface::{Interface, NonBlock};

//create an Async version of the interface
pub struct AsyncStream {
join: JoinHandle<Result<Arc<dyn Interface + Send + Sync>>>,
write_to: Sender<Vec<u8>>,
join: Option<JoinHandle<Result<Arc<dyn Interface + Send + Sync>>>>,
write_to: Option<Sender<AsyncMessage>>,
read_from: Rc<Receiver<Vec<u8>>>,
buffer: Vec<u8>,
nonblocking: bool,
instrument_info: Option<InstrumentInfo>,
}

enum AsyncMessage {
Message(Vec<u8>),
End,
}

impl AsyncStream {
fn join_thread(&mut self) -> Result<Arc<dyn Interface + Send + Sync>> {
self.drop_write_channel()?;
let socket = match self.join.take() {
Some(join) => match join.join() {
Ok(Ok(socket)) => socket,
_ => {
return Err(InstrumentError::ConnectionError {
details: "unable to retrieve synchronous stream".to_string(),
});
}
},
None => {
return Err(InstrumentError::ConnectionError {
details: "unable to close the asynchronous connection, could not retrieve synchronous stream".to_string(),
});
}
};

Ok(socket)
}

fn drop_write_channel(&mut self) -> Result<()> {
match self.write_to.take() {
Some(send) => {
match send.send(AsyncMessage::End) {
Ok(()) => {}
Err(_) => {
return Err(InstrumentError::IoError { source: (std::io::Error::new(
ErrorKind::NotConnected,
"attempted to write asynchronously to socket, but it was not connected".to_string(),
))});
}
}
}
None => {}
}
Ok(())
}
}

impl TryFrom<Arc<dyn Interface + Send + Sync>> for AsyncStream {
type Error = InstrumentError;

Expand All @@ -44,15 +90,15 @@ impl TryFrom<Arc<dyn Interface + Send + Sync>> for AsyncStream {
// get INstrumentInfo by call get_info of interface
let join = builder.spawn(move || -> Result<Arc<dyn Interface + Send + Sync>> {
Arc::get_mut(&mut socket).unwrap().set_nonblocking(true)?;
let read_into: Receiver<Vec<u8>> = read_into;
let read_into: Receiver<AsyncMessage> = read_into;
let write_out: Sender<Vec<u8>> = write_out;

'rw_loop: loop {
// see if the application has anything to send to the instrument.
std::thread::sleep(Duration::from_millis(1));
match read_into.try_recv() {
// It does, so send it
Ok(msg) => {
Ok(AsyncMessage::Message(msg)) => {
let chunk_size = 1024;
let mut start = 0;
while start < msg.len() {
Expand Down Expand Up @@ -93,11 +139,11 @@ impl TryFrom<Arc<dyn Interface + Send + Sync>> for AsyncStream {
}
}

Err(e) => match e {
// The sender has disconnected, therefore we need to clean up
mpsc::TryRecvError::Disconnected => break 'rw_loop,
mpsc::TryRecvError::Empty => {}
},
Ok(AsyncMessage::End) | Err(mpsc::TryRecvError::Disconnected) => {
break 'rw_loop;
}

Err(mpsc::TryRecvError::Empty) => {}
}
let buf = &mut [0u8; 512];
if let Ok(size) = Arc::get_mut(&mut socket).unwrap().read(buf) {
Expand All @@ -117,8 +163,8 @@ impl TryFrom<Arc<dyn Interface + Send + Sync>> for AsyncStream {
})?;

Ok(Self {
join,
write_to,
join: Some(join),
write_to: Some(write_to),
read_from: Rc::new(read_from),
buffer: Vec::new(),
nonblocking: true,
Expand Down Expand Up @@ -169,11 +215,23 @@ impl Read for AsyncStream {

impl Write for AsyncStream {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let Ok(()) = self.write_to.send(Vec::from(buf)) else {
return Err(std::io::Error::new(
ErrorKind::NotConnected,
"attempted to write asynchronously to socket, but it was not connected".to_string(),
));
match self.write_to {
Some(ref mut send) => match send.send(AsyncMessage::Message(Vec::from(buf))) {
Ok(()) => {}
Err(_) => {
return Err(std::io::Error::new(
ErrorKind::NotConnected,
"attempted to write asynchronously to socket, but it was not connected"
.to_string(),
));
}
},
None => {
return Err(std::io::Error::new(
ErrorKind::NotConnected,
"asynchronous connection was not found".to_string(),
));
}
};

Ok(buf.len())
Expand Down Expand Up @@ -201,17 +259,19 @@ impl Info for AsyncStream {
}
}

impl Drop for AsyncStream {
fn drop(&mut self) {
let _ = self.join_thread();
}
}

impl TryFrom<AsyncStream> for Arc<dyn Interface + Send + Sync> {
type Error = InstrumentError;

fn try_from(async_stream: AsyncStream) -> std::result::Result<Self, Self::Error> {
drop(async_stream.write_to);
match async_stream.join.join() {
Ok(Ok(stream)) => Ok(stream),
_ => Err(InstrumentError::ConnectionError {
details: "unable to retrieve synchronous stream".to_string(),
}),
}
let mut async_stream = async_stream;
let socket = async_stream.join_thread()?;
Ok(socket)
}
}

Expand Down
24 changes: 18 additions & 6 deletions src/model/tti.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{
io::{BufRead, Read, Write},
thread,
time::Duration,
};

Expand Down Expand Up @@ -69,10 +68,24 @@ impl Info for Instrument {
impl Language for Instrument {
fn get_language(&mut self) -> Result<CmdLanguage, InstrumentError> {
self.write_all(b"*LANG?\n")?;
let mut lang: Vec<u8> = vec![0; 16];
thread::sleep(Duration::from_millis(500));
let _read = self.read(&mut lang)?;
String::from_utf8_lossy(&lang).to_string().as_str().parse()
for _i in 0..5 {
std::thread::sleep(Duration::from_millis(100));
let mut lang: Vec<u8> = vec![0; 256];
let _read = self.read(&mut lang)?;
let lang = std::str::from_utf8(lang.as_slice())
.unwrap_or("")
.trim_matches(char::from(0))
.trim();

if lang.contains("TSP") {
return Ok(CmdLanguage::Tsp);
} else if lang.contains("SCPI") {
return Ok(CmdLanguage::Scpi);
}
}
Err(InstrumentError::InformationRetrievalError {
details: ("could not read language of the instrument").to_string(),
})
}

fn change_language(&mut self, lang: CmdLanguage) -> Result<(), InstrumentError> {
Expand Down Expand Up @@ -172,7 +185,6 @@ impl NonBlock for Instrument {
impl Drop for Instrument {
fn drop(&mut self) {
let _ = self.write_all(b"logout\n");
thread::sleep(Duration::from_millis(500));
}
}

Expand Down

0 comments on commit d95c29b

Please sign in to comment.