From d95c29bed07b65ded530a0077e8034293ac9a1d9 Mon Sep 17 00:00:00 2001 From: nazanjaffery <104909429+nazanjaffery@users.noreply.github.com> Date: Tue, 9 Apr 2024 18:02:31 +0530 Subject: [PATCH] implementation of drop trait for async stream (#6) --- CHANGELOG.md | 2 +- src/interface/async_stream.rs | 106 ++++++++++++++++++++++++++-------- src/model/tti.rs | 24 ++++++-- 3 files changed, 102 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c4d61b5..b50a815 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,7 @@ Check [Keep a Changelog](http://keepachangelog.com/) for recommendations on how ## [0.15.1] ### Changed -- Nothing yet +- - Implemented Drop for AsyncStream (TSP-584) ## [0.15.0] diff --git a/src/interface/async_stream.rs b/src/interface/async_stream.rs index 3a88b57..27cdbfb 100644 --- a/src/interface/async_stream.rs +++ b/src/interface/async_stream.rs @@ -21,14 +21,60 @@ use crate::interface::{Interface, NonBlock}; //create an Async version of the interface pub struct AsyncStream { - join: JoinHandle>>, - write_to: Sender>, + join: Option>>>, + write_to: Option>, read_from: Rc>>, buffer: Vec, nonblocking: bool, instrument_info: Option, } +enum AsyncMessage { + Message(Vec), + End, +} + +impl AsyncStream { + fn join_thread(&mut self) -> Result> { + self.drop_write_channel()?; + let socket = match self.join.take() { + Some(join) => match join.join() { + Ok(Ok(socket)) => socket, + _ => { + return Err(InstrumentError::ConnectionError { + details: "unable to retrieve synchronous stream".to_string(), + }); + } + }, + None => { + return Err(InstrumentError::ConnectionError { + details: "unable to close the asynchronous connection, could not retrieve synchronous stream".to_string(), + }); + } + }; + + Ok(socket) + } + + fn drop_write_channel(&mut self) -> Result<()> { + match self.write_to.take() { + Some(send) => { + match send.send(AsyncMessage::End) { + Ok(()) => {} + Err(_) => { + return Err(InstrumentError::IoError { source: (std::io::Error::new( + ErrorKind::NotConnected, + "attempted to write asynchronously to socket, but it was not connected".to_string(), + ))}); + } + } + } + None => {} + } + Ok(()) + } +} + impl TryFrom> for AsyncStream { type Error = InstrumentError; @@ -44,7 +90,7 @@ impl TryFrom> for AsyncStream { // get INstrumentInfo by call get_info of interface let join = builder.spawn(move || -> Result> { Arc::get_mut(&mut socket).unwrap().set_nonblocking(true)?; - let read_into: Receiver> = read_into; + let read_into: Receiver = read_into; let write_out: Sender> = write_out; 'rw_loop: loop { @@ -52,7 +98,7 @@ impl TryFrom> for AsyncStream { std::thread::sleep(Duration::from_millis(1)); match read_into.try_recv() { // It does, so send it - Ok(msg) => { + Ok(AsyncMessage::Message(msg)) => { let chunk_size = 1024; let mut start = 0; while start < msg.len() { @@ -93,11 +139,11 @@ impl TryFrom> for AsyncStream { } } - Err(e) => match e { - // The sender has disconnected, therefore we need to clean up - mpsc::TryRecvError::Disconnected => break 'rw_loop, - mpsc::TryRecvError::Empty => {} - }, + Ok(AsyncMessage::End) | Err(mpsc::TryRecvError::Disconnected) => { + break 'rw_loop; + } + + Err(mpsc::TryRecvError::Empty) => {} } let buf = &mut [0u8; 512]; if let Ok(size) = Arc::get_mut(&mut socket).unwrap().read(buf) { @@ -117,8 +163,8 @@ impl TryFrom> for AsyncStream { })?; Ok(Self { - join, - write_to, + join: Some(join), + write_to: Some(write_to), read_from: Rc::new(read_from), buffer: Vec::new(), nonblocking: true, @@ -169,11 +215,23 @@ impl Read for AsyncStream { impl Write for AsyncStream { fn write(&mut self, buf: &[u8]) -> std::io::Result { - let Ok(()) = self.write_to.send(Vec::from(buf)) else { - return Err(std::io::Error::new( - ErrorKind::NotConnected, - "attempted to write asynchronously to socket, but it was not connected".to_string(), - )); + match self.write_to { + Some(ref mut send) => match send.send(AsyncMessage::Message(Vec::from(buf))) { + Ok(()) => {} + Err(_) => { + return Err(std::io::Error::new( + ErrorKind::NotConnected, + "attempted to write asynchronously to socket, but it was not connected" + .to_string(), + )); + } + }, + None => { + return Err(std::io::Error::new( + ErrorKind::NotConnected, + "asynchronous connection was not found".to_string(), + )); + } }; Ok(buf.len()) @@ -201,17 +259,19 @@ impl Info for AsyncStream { } } +impl Drop for AsyncStream { + fn drop(&mut self) { + let _ = self.join_thread(); + } +} + impl TryFrom for Arc { type Error = InstrumentError; fn try_from(async_stream: AsyncStream) -> std::result::Result { - drop(async_stream.write_to); - match async_stream.join.join() { - Ok(Ok(stream)) => Ok(stream), - _ => Err(InstrumentError::ConnectionError { - details: "unable to retrieve synchronous stream".to_string(), - }), - } + let mut async_stream = async_stream; + let socket = async_stream.join_thread()?; + Ok(socket) } } diff --git a/src/model/tti.rs b/src/model/tti.rs index 1fc5a51..a27a1e0 100644 --- a/src/model/tti.rs +++ b/src/model/tti.rs @@ -1,6 +1,5 @@ use std::{ io::{BufRead, Read, Write}, - thread, time::Duration, }; @@ -69,10 +68,24 @@ impl Info for Instrument { impl Language for Instrument { fn get_language(&mut self) -> Result { self.write_all(b"*LANG?\n")?; - let mut lang: Vec = vec![0; 16]; - thread::sleep(Duration::from_millis(500)); - let _read = self.read(&mut lang)?; - String::from_utf8_lossy(&lang).to_string().as_str().parse() + for _i in 0..5 { + std::thread::sleep(Duration::from_millis(100)); + let mut lang: Vec = vec![0; 256]; + let _read = self.read(&mut lang)?; + let lang = std::str::from_utf8(lang.as_slice()) + .unwrap_or("") + .trim_matches(char::from(0)) + .trim(); + + if lang.contains("TSP") { + return Ok(CmdLanguage::Tsp); + } else if lang.contains("SCPI") { + return Ok(CmdLanguage::Scpi); + } + } + Err(InstrumentError::InformationRetrievalError { + details: ("could not read language of the instrument").to_string(), + }) } fn change_language(&mut self, lang: CmdLanguage) -> Result<(), InstrumentError> { @@ -172,7 +185,6 @@ impl NonBlock for Instrument { impl Drop for Instrument { fn drop(&mut self) { let _ = self.write_all(b"logout\n"); - thread::sleep(Duration::from_millis(500)); } }