diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 586ee03..7aa11e9 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -2,9 +2,9 @@ name: Build & Test on: push: - branches: [ "rust_dev" ] + branches: [ "master" ] pull_request: - branches: [ "rust_dev" ] + branches: [ "master" ] env: CARGO_TERM_COLOR: always diff --git a/Cargo.lock b/Cargo.lock index f5ef9a3..9cfb98b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,9 +279,9 @@ checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" [[package]] name = "libc" -version = "0.2.152" +version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" +checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" [[package]] name = "libudev" @@ -535,9 +535,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.30" +version = "0.38.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" +checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" dependencies = [ "bitflags 2.4.2", "errno", diff --git a/lib/filevec/src/lib.rs b/lib/filevec/src/lib.rs index 04f008c..91ece30 100644 --- a/lib/filevec/src/lib.rs +++ b/lib/filevec/src/lib.rs @@ -27,8 +27,13 @@ //! ['MessagePack']: https://msgpack.org/index.html use serde::{de::DeserializeOwned, Serialize}; -use std::io::{Read, Seek, SeekFrom, Write}; +use std::{ + io::{Read, Seek, SeekFrom, Write}, + ops::{Deref, DerefMut}, + path::Path, +}; +#[derive(Debug)] pub struct FileVec { vec: Vec, file: std::fs::File, @@ -39,7 +44,7 @@ impl FileVec { /// /// **Note:** If the file exists and contains invalid data, it is interpreted as /// empty and overwritten. - pub fn open(path: String) -> Result { + pub fn open(path: impl AsRef) -> Result { let mut file = std::fs::OpenOptions::new().read(true).write(true).create(true).open(path)?; let metadata = file.metadata()?; @@ -88,6 +93,14 @@ impl FileVec { Ok(t) } + + /// Obtain a mutable reference to vector, which only writes to the underlying file + /// once this guard is dropped. + /// ### Note + /// Any io::Error that happens is dropped. Call `self.write_to_file` manually to handle them + pub fn as_mut(&mut self) -> FileVecGuard<'_, T> { + FileVecGuard(self) + } } impl AsRef> for FileVec { @@ -114,11 +127,34 @@ impl Extend for FileVec { } } +pub struct FileVecGuard<'a, T: Serialize + DeserializeOwned>(&'a mut FileVec); + +impl<'a, T: Serialize + DeserializeOwned> Deref for FileVecGuard<'a, T> { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.0.vec + } +} + +impl<'a, T: Serialize + DeserializeOwned> DerefMut for FileVecGuard<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0.vec + } +} + +impl<'a, T: Serialize + DeserializeOwned> Drop for FileVecGuard<'a, T> { + fn drop(&mut self) { + let _ = self.0.write_to_file(); + } +} + #[cfg(test)] mod test { use std::{ fs::File, io::{Read, Write}, + ops::DerefMut, }; use super::FileVec; @@ -209,4 +245,18 @@ mod test { let _ = std::fs::remove_file("__pop"); } + + #[test] + fn as_mut_writes_to_file() { + { + let mut f = FileVec::open("__as_mut").unwrap(); + let mut guard = f.as_mut(); + guard.push(123); + guard.push(456); + } + + assert_eq!(FileVec::::open("__as_mut").unwrap().vec, &[123, 456]); + + let _ = std::fs::remove_file("__as_mut"); + } } diff --git a/src/command/execute_program.rs b/src/command/execute_program.rs index 7b889bf..70dc846 100644 --- a/src/command/execute_program.rs +++ b/src/command/execute_program.rs @@ -8,7 +8,7 @@ use subprocess::Popen; use crate::{ command::{ - check_length, terminate_student_program, truncate_to_size, Event, ProgramStatus, ResultId, + check_length, terminate_student_program, truncate_to_size, Event, ProgramStatus, ResultId, RetryEvent, }, communication::{CEPPacket, CommunicationHandle}, }; @@ -51,8 +51,8 @@ pub fn execute_program( build_result_archive(rid).unwrap(); // create the tar file with result and log let mut context = wd_context.lock().unwrap(); - context.event_vec.push(Event::Status(sid)).unwrap(); - context.event_vec.push(Event::Result(rid)).unwrap(); + context.event_vec.push(RetryEvent::new(Event::Status(sid))).unwrap(); + context.event_vec.push(RetryEvent::new(Event::Result(rid))).unwrap(); context.running_flag = false; context.update_pin.set_high(); drop(context); diff --git a/src/command/execution_context.rs b/src/command/execution_context.rs index 6654d5c..634afd0 100644 --- a/src/command/execution_context.rs +++ b/src/command/execution_context.rs @@ -1,10 +1,11 @@ +use filevec::FileVec; use std::{ str::FromStr, sync::{Arc, Mutex}, thread, }; -use filevec::FileVec; +const EVENT_SEND_TRIES: u32 = 5; /// This type makes the ExecutionContext thread-safe pub type SyncExecutionContext = Arc>; @@ -20,7 +21,7 @@ pub struct ExecutionContext { /// This integer is the pin number of the EDU_Update pin pub update_pin: UpdatePin, /// Vector containing events that should be sent to the COBC - pub event_vec: FileVec, + pub event_vec: FileVec>, } impl ExecutionContext { @@ -35,13 +36,13 @@ impl ExecutionContext { event_vec: FileVec::open(event_file_path).unwrap(), }; - ec.check_update_pin(); + ec.configure_update_pin(); Ok(Arc::new(Mutex::new(ec))) } /// Checks and sets/resets the update pin accordingly - pub fn check_update_pin(&mut self) { + pub fn configure_update_pin(&mut self) { if self.has_data_ready() { self.update_pin.set_high(); } else { @@ -123,6 +124,18 @@ pub enum Event { DisableDosimeter, } +#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq, Debug)] +pub struct RetryEvent { + pub retries: u32, + pub event: T, +} + +impl RetryEvent { + pub fn new(event: T) -> Self { + Self { retries: EVENT_SEND_TRIES, event } + } +} + impl From for Vec { fn from(value: Event) -> Self { let mut v = Vec::new(); diff --git a/src/command/get_status.rs b/src/command/get_status.rs index 03c3039..b3df7e4 100644 --- a/src/command/get_status.rs +++ b/src/command/get_status.rs @@ -17,22 +17,18 @@ pub fn get_status( return Ok(()); } - if let Some(index) = - l_exec.event_vec.as_ref().iter().position(|x| matches!(x, Event::Status(_))) - { - let event = l_exec.event_vec[index]; - com.send_packet(&CEPPacket::Data(event.into()))?; - l_exec.event_vec.remove(index)?; - } else { - let event = *l_exec.event_vec.as_ref().first().unwrap(); // Safe, because we know it is not empty - com.send_packet(&CEPPacket::Data(event.into()))?; + let result = { + let mut events = l_exec.event_vec.as_mut(); + let index = events.iter().position(|x| matches!(x.event, Event::Status(_))).unwrap_or(0); - if !matches!(event, Event::Result(_)) { - // Results are removed when deleted - l_exec.event_vec.remove(0)?; + events[index].retries -= 1; + let result = com.send_packet(&CEPPacket::Data(events[index].event.into())); + if !matches!(events[index].event, Event::Result(_)) || events[index].retries == 0 { + events.remove(index); } - } + result + }; - l_exec.check_update_pin(); - Ok(()) + l_exec.configure_update_pin(); + Ok(result?) } diff --git a/src/command/return_result.rs b/src/command/return_result.rs index 86f9531..ecbd461 100644 --- a/src/command/return_result.rs +++ b/src/command/return_result.rs @@ -34,11 +34,16 @@ pub fn return_result( delete_result(result_id)?; let mut l_exec = exec.lock().unwrap(); - let event_index = - l_exec.event_vec.as_ref().iter().position(|x| x == &Event::Result(result_id)).unwrap(); - l_exec.event_vec.remove(event_index)?; - l_exec.check_update_pin(); + if let Some(event_index) = + l_exec.event_vec.as_ref().iter().position(|x| x.event == Event::Result(result_id)) + { + l_exec.event_vec.remove(event_index)?; + } + else { + log::error!("Could not find event entry for existing result file {program_id}:{timestamp}"); + } + l_exec.configure_update_pin(); Ok(()) } diff --git a/src/main.rs b/src/main.rs index 9c94aed..7ff8768 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ #![allow(non_snake_case)] -use command::ExecutionContext; +use command::{ExecutionContext, RetryEvent}; use communication::socket::UnixSocketParser; use core::time; use rppal::gpio::Gpio; @@ -96,8 +96,8 @@ fn event_socket_loop(context: Arc>, mut socket: UnixSock log::info!("Received on socket: {event:?}"); let mut context = context.lock().unwrap(); - context.event_vec.push(event).unwrap(); - context.check_update_pin(); + context.event_vec.push(RetryEvent::new(event)).unwrap(); + context.configure_update_pin(); } } diff --git a/tests/simulation/command_execution.rs b/tests/simulation/command_execution.rs index d306168..a441dfc 100644 --- a/tests/simulation/command_execution.rs +++ b/tests/simulation/command_execution.rs @@ -2,8 +2,7 @@ use crate::simulation::*; #[test] fn simulate_archive_is_stored_correctly() -> Result<(), std::io::Error> { - let (mut com, _socat) = SimulationComHandle::with_socat_proc("archive_is_stored_correctly"); - let _sched = start_scheduler("archive_is_stored_correctly")?; + let (_sched, mut com, _socat) = start_scheduler("archive_is_stored_correctly").unwrap(); simulate_test_store_archive(&mut com, 1).unwrap(); std::thread::sleep(std::time::Duration::from_millis(400)); @@ -24,3 +23,37 @@ fn simulate_archive_is_stored_correctly() -> Result<(), std::io::Error> { Ok(()) } + +#[test] +fn return_result_is_retried_n_times() { + let (_sched, mut com, _socat) = start_scheduler("return_result_retries").unwrap(); + + simulate_test_store_archive(&mut com, 8).unwrap(); + simulate_execute_program(&mut com, 8, 0, 5).unwrap(); + std::thread::sleep(std::time::Duration::from_millis(400)); + + assert_eq!(get_status_program_finished(8, 0, 0), simulate_get_status(&mut com).unwrap()); + for i in 0..5 { + assert_eq!(get_status_result_ready(8, 0), simulate_get_status(&mut com).unwrap()); + dbg!(i); + } + assert_eq!([0u8], *simulate_get_status(&mut com).unwrap()); +} + +#[test] +fn result_is_deleted_after_transfer() { + let (_sched, mut com, _socat) = start_scheduler("results_deleted").unwrap(); + + simulate_test_store_archive(&mut com, 8).unwrap(); + simulate_execute_program(&mut com, 8, 3, 5).unwrap(); + std::thread::sleep(std::time::Duration::from_millis(400)); + assert_eq!(simulate_get_status(&mut com).unwrap(), get_status_program_finished(8, 3, 0)); + assert_eq!(simulate_get_status(&mut com).unwrap(), get_status_result_ready(8, 3)); + + simulate_return_result(&mut com, 8, 3).unwrap(); + com.send_packet(&CEPPacket::Ack).unwrap(); + + assert_eq!(simulate_get_status(&mut com).unwrap(), vec![0]); + assert_eq!(std::fs::read_dir("tests/tmp/results_deleted/data").unwrap().count(), 0); + assert_eq!(std::fs::read_dir("tests/tmp/results_deleted/archives/8/results").unwrap().count(), 0); +} diff --git a/tests/simulation/full_run.rs b/tests/simulation/full_run.rs index e357112..f52b5c6 100644 --- a/tests/simulation/full_run.rs +++ b/tests/simulation/full_run.rs @@ -3,8 +3,7 @@ use std::{io::Cursor, time::Duration}; #[test] fn full_run() { - let (mut com, _socat) = SimulationComHandle::with_socat_proc("full_run"); - let _sched = start_scheduler("full_run").unwrap(); + let (_sched, mut com, _socat) = start_scheduler("full_run").unwrap(); // store and execute program simulate_test_store_archive(&mut com, 1).unwrap(); diff --git a/tests/simulation/logging.rs b/tests/simulation/logging.rs index 01e1b67..e3503d1 100644 --- a/tests/simulation/logging.rs +++ b/tests/simulation/logging.rs @@ -2,8 +2,8 @@ use crate::simulation::*; #[test] fn logfile_is_created() -> Result<(), std::io::Error> { - let (_, _proc) = SimulationComHandle::with_socat_proc("log_created"); - let _sched = start_scheduler("log_created")?; + let (_sched, _com, _socat) = start_scheduler("log_created").unwrap(); + std::thread::sleep(std::time::Duration::from_millis(400)); assert!(std::path::Path::new("./tests/tmp/log_created/log").exists()); @@ -12,8 +12,7 @@ fn logfile_is_created() -> Result<(), std::io::Error> { #[test] fn logfile_is_cleared_after_sent() -> std::io::Result<()> { - let (mut com, _socat) = SimulationComHandle::with_socat_proc("log_is_cleared_after_sent"); - let _sched = start_scheduler("log_is_cleared_after_sent")?; + let (_sched, mut com, _socat) = start_scheduler("log_is_cleared_after_sent").unwrap(); simulate_test_store_archive(&mut com, 1).unwrap(); com.send_packet(&CEPPacket::Data(execute_program(1, 0, 3))).unwrap(); diff --git a/tests/simulation/mod.rs b/tests/simulation/mod.rs index 1caed40..315faa0 100644 --- a/tests/simulation/mod.rs +++ b/tests/simulation/mod.rs @@ -17,17 +17,17 @@ pub struct SimulationComHandle { } impl SimulationComHandle { - fn with_socat_proc(unique: &str) -> (Self, PoisonedChild) { + fn with_socat_proc(socket_path: &str) -> (Self, PoisonedChild) { let mut proc = std::process::Command::new("socat") .arg("stdio") - .arg(format!("pty,raw,echo=0,link=/tmp/ttySTS1-{},b921600,wait-slave", unique)) + .arg(format!("pty,raw,echo=0,link={},b921600,wait-slave", socket_path)) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .spawn() .unwrap(); loop { - if std::path::Path::new(&format!("/tmp/ttySTS1-{unique}")).exists() { + if std::path::Path::new(socket_path).exists() { break; } std::thread::sleep(Duration::from_millis(50)); @@ -66,7 +66,7 @@ impl CommunicationHandle for SimulationComHandle { fn get_config_str(unique: &str) -> String { format!( " - uart = \"/tmp/ttySTS1-{unique}\" + uart = \"uart\" baudrate = 921600 heartbeat_pin = 34 update_pin = 35 @@ -85,17 +85,18 @@ impl Drop for PoisonedChild { } } -fn start_scheduler(unique: &str) -> Result { +fn start_scheduler(unique: &str) -> Result<(PoisonedChild, SimulationComHandle, PoisonedChild), std::io::Error> { let test_dir = format!("./tests/tmp/{}", unique); let scheduler_bin = std::fs::canonicalize("./target/release/STS1_EDU_Scheduler")?; let _ = std::fs::remove_dir_all(&test_dir); std::fs::create_dir_all(&test_dir)?; std::fs::write(format!("{}/config.toml", &test_dir), get_config_str(unique))?; + let (handle, socat) = SimulationComHandle::with_socat_proc(&format!("{}/uart", test_dir)); let scheduler = std::process::Command::new(scheduler_bin).current_dir(test_dir).spawn().unwrap(); - Ok(PoisonedChild(scheduler)) + Ok((PoisonedChild(scheduler), handle, socat)) } pub fn simulate_test_store_archive( @@ -174,3 +175,18 @@ pub fn return_result(program_id: u16, timestamp: u32) -> Vec { vec.extend(timestamp.to_le_bytes()); vec } + +pub fn get_status_program_finished(program_id: u16, timestamp: u32, exit_code: u8) -> Vec { + let mut vec = vec![1]; + vec.extend(program_id.to_le_bytes()); + vec.extend(timestamp.to_le_bytes()); + vec.push(exit_code); + vec +} + +pub fn get_status_result_ready(program_id: u16, timestamp: u32) -> Vec { + let mut vec = vec![2]; + vec.extend(program_id.to_le_bytes()); + vec.extend(timestamp.to_le_bytes()); + vec +} diff --git a/tests/simulation/socket.rs b/tests/simulation/socket.rs index df17615..71dd8d8 100644 --- a/tests/simulation/socket.rs +++ b/tests/simulation/socket.rs @@ -2,12 +2,12 @@ use std::io::Write; use std::os::unix::net::UnixStream; use std::time::Duration; -use super::{simulate_get_status, start_scheduler, SimulationComHandle}; +use super::{simulate_get_status, start_scheduler}; #[test] fn dosimeter_events_are_added() { - let (mut com, _socat) = SimulationComHandle::with_socat_proc("dosimeter"); - let _sched = start_scheduler("dosimeter").unwrap(); + let (_sched, mut com, _socat) = start_scheduler("dosimeter").unwrap(); + std::thread::sleep(Duration::from_millis(200)); { @@ -21,11 +21,11 @@ fn dosimeter_events_are_added() { #[test] fn multiple_dosimeter_events() { - let (mut com, _socat) = SimulationComHandle::with_socat_proc("dosimeter-multi"); - let _sched = start_scheduler("dosimeter-multi").unwrap(); + let (_sched, mut com, _socat) = start_scheduler("dosimeter_multi").unwrap(); + std::thread::sleep(Duration::from_millis(200)); - let mut socket = UnixStream::connect("/tmp/STS1_EDU_Scheduler_SIM_dosimeter-multi").unwrap(); + let mut socket = UnixStream::connect("/tmp/STS1_EDU_Scheduler_SIM_dosimeter_multi").unwrap(); for _ in 0..10 { writeln!(socket, "dosimeter/on").unwrap(); writeln!(socket, "dosimeter/off").unwrap(); diff --git a/tests/simulation/timeout.rs b/tests/simulation/timeout.rs index 632962f..98c1910 100644 --- a/tests/simulation/timeout.rs +++ b/tests/simulation/timeout.rs @@ -1,11 +1,10 @@ -use super::{get_status, start_scheduler, SimulationComHandle}; +use super::{get_status, start_scheduler}; use std::time::Duration; use STS1_EDU_Scheduler::communication::{CEPPacket, CommunicationHandle}; #[test] fn integrity_ack_timeout_is_honored() { - let (mut cobc, _socat) = SimulationComHandle::with_socat_proc("integrity_timeout"); - let _sched = start_scheduler("integrity_timeout").unwrap(); + let (_sched, mut cobc, _socat) = start_scheduler("integrity_timeout").unwrap(); // Check that delayed ACK is allowed cobc.send_packet(&CEPPacket::Data(get_status())).unwrap();