Skip to content

Commit

Permalink
load file implemented in data_thread
Browse files Browse the repository at this point in the history
  • Loading branch information
hacknus committed Dec 22, 2024
1 parent d49a431 commit f32be2b
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 34 deletions.
15 changes: 11 additions & 4 deletions src/gui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ pub struct MyApp {
connected_lock: Arc<RwLock<bool>>,
data_lock: Arc<RwLock<DataContainer>>,
save_tx: Sender<FileOptions>,
load_tx: Sender<PathBuf>,
send_tx: Sender<String>,
clear_tx: Sender<bool>,
history: Vec<String>,
Expand Down Expand Up @@ -155,6 +156,7 @@ impl MyApp {
connected_lock: Arc<RwLock<bool>>,
gui_conf: GuiSettingsContainer,
save_tx: Sender<FileOptions>,
load_tx: Sender<PathBuf>,
send_tx: Sender<String>,
clear_tx: Sender<bool>,
) -> Self {
Expand Down Expand Up @@ -214,6 +216,7 @@ impl MyApp {
gui_conf,
data_lock,
save_tx,
load_tx,
send_tx,
clear_tx,
plotting_range: usize::MAX,
Expand Down Expand Up @@ -805,6 +808,8 @@ impl MyApp {
.clicked()
{
self.file_opened = false;
let _ = self.load_tx.send(PathBuf::new());
self.file_dialog_state = FileDialogState::None;
}
});
}
Expand Down Expand Up @@ -1157,9 +1162,9 @@ impl MyApp {
self.picked_path = path.to_path_buf();
self.file_opened = true;
self.file_dialog_state = FileDialogState::None;
// if let Err(e) = self.load_tx.send(self.picked_path.clone()) {
// log::error!("load_tx thread send failed: {:?}", e);
// }
if let Err(e) = self.load_tx.send(self.picked_path.clone()) {
log::error!("load_tx thread send failed: {:?}", e);
}
}
}
FileDialogState::SavePlot => {
Expand Down Expand Up @@ -1189,7 +1194,9 @@ impl MyApp {
}
}
}
FileDialogState::None => {}
FileDialogState::None => {
self.file_opened = false;
}
}
});
});
Expand Down
53 changes: 52 additions & 1 deletion src/io.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::error::Error;
use std::path::PathBuf;

use csv::WriterBuilder;
use csv::{ReaderBuilder, WriterBuilder};

use crate::DataContainer;

Expand All @@ -14,6 +14,57 @@ pub struct FileOptions {
pub names: Vec<String>,
}

pub fn open_from_csv(
data: &mut DataContainer,
csv_options: &mut FileOptions,
) -> Result<(), Box<dyn Error>> {
let mut rdr = ReaderBuilder::new()
.has_headers(true)
.from_path(&csv_options.file_path)?;

csv_options.names = rdr
.headers()
.unwrap()
.into_iter()
.skip(1)
.map(|s| s.to_string())
.collect::<Vec<String>>();

// Clear any existing data in the DataContainer
data.absolute_time.clear();
data.time.clear();
data.dataset = vec![vec![]; csv_options.names.len()];

// Read and parse each record in the CSV
for result in rdr.records() {
let record = result?;

// Ensure the record has the correct number of fields
if record.len() != csv_options.names.len() + 1 {
return Err("CSV record does not match the expected number of columns".into());
}

// Parse the time field (first column)
let time_value = record.get(0).unwrap();
if csv_options.save_absolute_time {
data.absolute_time.push(time_value.parse()?);
} else {
data.time.push(time_value.parse()?);
}

// Parse the remaining columns and populate the dataset
for (i, value) in record.iter().skip(1).enumerate() {
if let Some(dataset_column) = data.dataset.get_mut(i) {
dataset_column.push(value.parse()?);
} else {
return Err("Unexpected number of data columns in the CSV".into());
}
}
}

Ok(())
}

pub fn save_to_csv(data: &DataContainer, csv_options: &FileOptions) -> Result<(), Box<dyn Error>> {
let mut wtr = WriterBuilder::new()
.has_headers(false)
Expand Down
107 changes: 78 additions & 29 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ extern crate preferences;
extern crate serde;

use std::cmp::max;
use std::path::PathBuf;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{mpsc, Arc, RwLock};
use std::thread;
use std::time::Duration;

use crate::data::{DataContainer, Packet};
use crate::gui::{load_gui_settings, MyApp, RIGHT_PANEL_WIDTH};
use crate::io::{save_to_csv, FileOptions};
use crate::io::{open_from_csv, save_to_csv, FileOptions};
use crate::serial::{load_serial_settings, serial_thread, Device};
use eframe::egui::{vec2, ViewportBuilder, Visuals};
use eframe::{egui, icon_data};
Expand Down Expand Up @@ -51,51 +52,90 @@ fn main_thread(
data_lock: Arc<RwLock<DataContainer>>,
raw_data_rx: Receiver<Packet>,
save_rx: Receiver<FileOptions>,
load_rx: Receiver<PathBuf>,
clear_rx: Receiver<bool>,
) {
// reads data from mutex, samples and saves if needed
let mut data = DataContainer::default();
let mut failed_format_counter = 0;

let mut file_opened = false;

loop {
if let Ok(cl) = clear_rx.recv_timeout(Duration::from_millis(1)) {
if cl {
data = DataContainer::default();
failed_format_counter = 0;
}
}

if let Ok(packet) = raw_data_rx.recv_timeout(Duration::from_millis(1)) {
if !packet.payload.is_empty() {
sync_tx.send(true).expect("unable to send sync tx");
data.raw_traffic.push(packet.clone());
let split_data = split(&packet.payload);
if data.dataset.is_empty() || failed_format_counter > 10 {
// resetting dataset
data.dataset = vec![vec![]; max(split_data.len(), 1)];
failed_format_counter = 0;
// println!("resetting dataset. split length = {}, length data.dataset = {}", split_data.len(), data.dataset.len());
} else if split_data.len() == data.dataset.len() {
// appending data
for (i, set) in data.dataset.iter_mut().enumerate() {
set.push(split_data[i]);
failed_format_counter = 0;
}
data.time.push(packet.relative_time);
data.absolute_time.push(packet.absolute_time);
if data.time.len() != data.dataset[0].len() {
if !file_opened {
if let Ok(packet) = raw_data_rx.recv_timeout(Duration::from_millis(1)) {
if !packet.payload.is_empty() {
sync_tx.send(true).expect("unable to send sync tx");
data.raw_traffic.push(packet.clone());
let split_data = split(&packet.payload);
if data.dataset.is_empty() || failed_format_counter > 10 {
// resetting dataset
data.time = vec![];
data.dataset = vec![vec![]; max(split_data.len(), 1)];
failed_format_counter = 0;
// println!("resetting dataset. split length = {}, length data.dataset = {}", split_data.len(), data.dataset.len());
} else if split_data.len() == data.dataset.len() {
// appending data
for (i, set) in data.dataset.iter_mut().enumerate() {
set.push(split_data[i]);
failed_format_counter = 0;
}
data.time.push(packet.relative_time);
data.absolute_time.push(packet.absolute_time);
if data.time.len() != data.dataset[0].len() {
// resetting dataset
data.time = vec![];
data.dataset = vec![vec![]; max(split_data.len(), 1)];
}
} else {
// not same length
failed_format_counter += 1;
// println!("not same length in main! length split_data = {}, length data.dataset = {}", split_data.len(), data.dataset.len())
}
} else {
// not same length
failed_format_counter += 1;
// println!("not same length in main! length split_data = {}, length data.dataset = {}", split_data.len(), data.dataset.len())
}
if let Ok(mut write_guard) = data_lock.write() {
*write_guard = data.clone();
}
}
if let Ok(fp) = load_rx.recv_timeout(Duration::from_millis(10)) {
if let Some(file_ending) = fp.extension() {
match file_ending.to_str().unwrap() {
"csv" => {
file_opened = true;
let mut file_options = FileOptions {
file_path: fp.clone(),
save_absolute_time: false,
save_raw_traffic: false,
names: vec![],
};
match open_from_csv(&mut data, &mut file_options) {
Ok(_) => {
log::info!("opened {:?}", fp);
}
Err(err) => {
file_opened = false;
log::error!("failed opening {:?}: {:?}", fp, err);
}
};
}
_ => {
file_opened = false;
log::error!("file not supported: {:?} \n Close the file to connect to a spectrometer or open another file.", fp);
continue;
}
}
} else {
file_opened = false;
}
} else {
file_opened = false;
}

if let Ok(mut write_guard) = data_lock.write() {
*write_guard = data.clone();
}

if let Ok(csv_options) = save_rx.recv_timeout(Duration::from_millis(1)) {
Expand Down Expand Up @@ -129,6 +169,7 @@ fn main() {
let connected_lock = Arc::new(RwLock::new(false));

let (save_tx, save_rx): (Sender<FileOptions>, Receiver<FileOptions>) = mpsc::channel();
let (load_tx, load_rx): (Sender<PathBuf>, Receiver<PathBuf>) = mpsc::channel();
let (send_tx, send_rx): (Sender<String>, Receiver<String>) = mpsc::channel();
let (clear_tx, clear_rx): (Sender<bool>, Receiver<bool>) = mpsc::channel();
let (raw_data_tx, raw_data_rx): (Sender<Packet>, Receiver<Packet>) = mpsc::channel();
Expand All @@ -151,7 +192,14 @@ fn main() {
let main_data_lock = data_lock.clone();

let _main_thread_handler = thread::spawn(|| {
main_thread(sync_tx, main_data_lock, raw_data_rx, save_rx, clear_rx);
main_thread(
sync_tx,
main_data_lock,
raw_data_rx,
save_rx,
load_rx,
clear_rx,
);
});

let options = eframe::NativeOptions {
Expand Down Expand Up @@ -196,6 +244,7 @@ fn main() {
gui_connected_lock,
gui_settings,
save_tx,
load_tx,
send_tx,
clear_tx,
)))
Expand Down

0 comments on commit f32be2b

Please sign in to comment.