Skip to content

Commit

Permalink
Merge pull request #146 from SpaceTeam/limited-retry-events
Browse files Browse the repository at this point in the history
Limited retry events
  • Loading branch information
florg-32 authored Jul 10, 2024
2 parents 0bdb79d + c6057a3 commit 2578d0a
Show file tree
Hide file tree
Showing 14 changed files with 170 additions and 60 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: Build & Test

on:
push:
branches: [ "rust_dev" ]
branches: [ "master" ]
pull_request:
branches: [ "rust_dev" ]
branches: [ "master" ]

env:
CARGO_TERM_COLOR: always
Expand Down
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 52 additions & 2 deletions lib/filevec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@
//! ['MessagePack']: https://msgpack.org/index.html
use serde::{de::DeserializeOwned, Serialize};
use std::io::{Read, Seek, SeekFrom, Write};
use std::{
io::{Read, Seek, SeekFrom, Write},
ops::{Deref, DerefMut},
path::Path,
};

#[derive(Debug)]
pub struct FileVec<T: Serialize + DeserializeOwned> {
vec: Vec<T>,
file: std::fs::File,
Expand All @@ -39,7 +44,7 @@ impl<T: Serialize + DeserializeOwned> FileVec<T> {
///
/// **Note:** If the file exists and contains invalid data, it is interpreted as
/// empty and overwritten.
pub fn open(path: String) -> Result<Self, std::io::Error> {
pub fn open(path: impl AsRef<Path>) -> Result<Self, std::io::Error> {
let mut file =
std::fs::OpenOptions::new().read(true).write(true).create(true).open(path)?;
let metadata = file.metadata()?;
Expand Down Expand Up @@ -88,6 +93,14 @@ impl<T: Serialize + DeserializeOwned> FileVec<T> {

Ok(t)
}

/// Obtain a mutable reference to vector, which only writes to the underlying file
/// once this guard is dropped.
/// ### Note
/// Any io::Error that happens is dropped. Call `self.write_to_file` manually to handle them
pub fn as_mut(&mut self) -> FileVecGuard<'_, T> {
FileVecGuard(self)
}
}

impl<T: Serialize + DeserializeOwned> AsRef<Vec<T>> for FileVec<T> {
Expand All @@ -114,11 +127,34 @@ impl<T: Serialize + DeserializeOwned> Extend<T> for FileVec<T> {
}
}

pub struct FileVecGuard<'a, T: Serialize + DeserializeOwned>(&'a mut FileVec<T>);

impl<'a, T: Serialize + DeserializeOwned> Deref for FileVecGuard<'a, T> {
type Target = Vec<T>;

fn deref(&self) -> &Self::Target {
&self.0.vec
}
}

impl<'a, T: Serialize + DeserializeOwned> DerefMut for FileVecGuard<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0.vec
}
}

impl<'a, T: Serialize + DeserializeOwned> Drop for FileVecGuard<'a, T> {
fn drop(&mut self) {
let _ = self.0.write_to_file();
}
}

#[cfg(test)]
mod test {
use std::{
fs::File,
io::{Read, Write},
ops::DerefMut,
};

use super::FileVec;
Expand Down Expand Up @@ -209,4 +245,18 @@ mod test {

let _ = std::fs::remove_file("__pop");
}

#[test]
fn as_mut_writes_to_file() {
{
let mut f = FileVec::open("__as_mut").unwrap();
let mut guard = f.as_mut();
guard.push(123);
guard.push(456);
}

assert_eq!(FileVec::<i32>::open("__as_mut").unwrap().vec, &[123, 456]);

let _ = std::fs::remove_file("__as_mut");
}
}
6 changes: 3 additions & 3 deletions src/command/execute_program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use subprocess::Popen;

use crate::{
command::{
check_length, terminate_student_program, truncate_to_size, Event, ProgramStatus, ResultId,
check_length, terminate_student_program, truncate_to_size, Event, ProgramStatus, ResultId, RetryEvent,
},
communication::{CEPPacket, CommunicationHandle},
};
Expand Down Expand Up @@ -51,8 +51,8 @@ pub fn execute_program(
build_result_archive(rid).unwrap(); // create the tar file with result and log

let mut context = wd_context.lock().unwrap();
context.event_vec.push(Event::Status(sid)).unwrap();
context.event_vec.push(Event::Result(rid)).unwrap();
context.event_vec.push(RetryEvent::new(Event::Status(sid))).unwrap();
context.event_vec.push(RetryEvent::new(Event::Result(rid))).unwrap();
context.running_flag = false;
context.update_pin.set_high();
drop(context);
Expand Down
21 changes: 17 additions & 4 deletions src/command/execution_context.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use filevec::FileVec;
use std::{
str::FromStr,
sync::{Arc, Mutex},
thread,
};

use filevec::FileVec;
const EVENT_SEND_TRIES: u32 = 5;

/// This type makes the ExecutionContext thread-safe
pub type SyncExecutionContext = Arc<Mutex<ExecutionContext>>;
Expand All @@ -20,7 +21,7 @@ pub struct ExecutionContext {
/// This integer is the pin number of the EDU_Update pin
pub update_pin: UpdatePin,
/// Vector containing events that should be sent to the COBC
pub event_vec: FileVec<Event>,
pub event_vec: FileVec<RetryEvent<Event>>,
}

impl ExecutionContext {
Expand All @@ -35,13 +36,13 @@ impl ExecutionContext {
event_vec: FileVec::open(event_file_path).unwrap(),
};

ec.check_update_pin();
ec.configure_update_pin();

Ok(Arc::new(Mutex::new(ec)))
}

/// Checks and sets/resets the update pin accordingly
pub fn check_update_pin(&mut self) {
pub fn configure_update_pin(&mut self) {
if self.has_data_ready() {
self.update_pin.set_high();
} else {
Expand Down Expand Up @@ -123,6 +124,18 @@ pub enum Event {
DisableDosimeter,
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq, Debug)]
pub struct RetryEvent<T> {
pub retries: u32,
pub event: T,
}

impl<T> RetryEvent<T> {
pub fn new(event: T) -> Self {
Self { retries: EVENT_SEND_TRIES, event }
}
}

impl From<Event> for Vec<u8> {
fn from(value: Event) -> Self {
let mut v = Vec::new();
Expand Down
26 changes: 11 additions & 15 deletions src/command/get_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,18 @@ pub fn get_status(
return Ok(());
}

if let Some(index) =
l_exec.event_vec.as_ref().iter().position(|x| matches!(x, Event::Status(_)))
{
let event = l_exec.event_vec[index];
com.send_packet(&CEPPacket::Data(event.into()))?;
l_exec.event_vec.remove(index)?;
} else {
let event = *l_exec.event_vec.as_ref().first().unwrap(); // Safe, because we know it is not empty
com.send_packet(&CEPPacket::Data(event.into()))?;
let result = {
let mut events = l_exec.event_vec.as_mut();
let index = events.iter().position(|x| matches!(x.event, Event::Status(_))).unwrap_or(0);

if !matches!(event, Event::Result(_)) {
// Results are removed when deleted
l_exec.event_vec.remove(0)?;
events[index].retries -= 1;
let result = com.send_packet(&CEPPacket::Data(events[index].event.into()));
if !matches!(events[index].event, Event::Result(_)) || events[index].retries == 0 {
events.remove(index);
}
}
result
};

l_exec.check_update_pin();
Ok(())
l_exec.configure_update_pin();
Ok(result?)
}
13 changes: 9 additions & 4 deletions src/command/return_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@ pub fn return_result(
delete_result(result_id)?;

let mut l_exec = exec.lock().unwrap();
let event_index =
l_exec.event_vec.as_ref().iter().position(|x| x == &Event::Result(result_id)).unwrap();
l_exec.event_vec.remove(event_index)?;
l_exec.check_update_pin();
if let Some(event_index) =
l_exec.event_vec.as_ref().iter().position(|x| x.event == Event::Result(result_id))
{
l_exec.event_vec.remove(event_index)?;
}
else {
log::error!("Could not find event entry for existing result file {program_id}:{timestamp}");
}

l_exec.configure_update_pin();
Ok(())
}

Expand Down
6 changes: 3 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![allow(non_snake_case)]
use command::ExecutionContext;
use command::{ExecutionContext, RetryEvent};
use communication::socket::UnixSocketParser;
use core::time;
use rppal::gpio::Gpio;
Expand Down Expand Up @@ -96,8 +96,8 @@ fn event_socket_loop(context: Arc<Mutex<ExecutionContext>>, mut socket: UnixSock

log::info!("Received on socket: {event:?}");
let mut context = context.lock().unwrap();
context.event_vec.push(event).unwrap();
context.check_update_pin();
context.event_vec.push(RetryEvent::new(event)).unwrap();
context.configure_update_pin();
}
}

Expand Down
37 changes: 35 additions & 2 deletions tests/simulation/command_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use crate::simulation::*;

#[test]
fn simulate_archive_is_stored_correctly() -> Result<(), std::io::Error> {
let (mut com, _socat) = SimulationComHandle::with_socat_proc("archive_is_stored_correctly");
let _sched = start_scheduler("archive_is_stored_correctly")?;
let (_sched, mut com, _socat) = start_scheduler("archive_is_stored_correctly").unwrap();

simulate_test_store_archive(&mut com, 1).unwrap();
std::thread::sleep(std::time::Duration::from_millis(400));
Expand All @@ -24,3 +23,37 @@ fn simulate_archive_is_stored_correctly() -> Result<(), std::io::Error> {

Ok(())
}

#[test]
fn return_result_is_retried_n_times() {
let (_sched, mut com, _socat) = start_scheduler("return_result_retries").unwrap();

simulate_test_store_archive(&mut com, 8).unwrap();
simulate_execute_program(&mut com, 8, 0, 5).unwrap();
std::thread::sleep(std::time::Duration::from_millis(400));

assert_eq!(get_status_program_finished(8, 0, 0), simulate_get_status(&mut com).unwrap());
for i in 0..5 {
assert_eq!(get_status_result_ready(8, 0), simulate_get_status(&mut com).unwrap());
dbg!(i);
}
assert_eq!([0u8], *simulate_get_status(&mut com).unwrap());
}

#[test]
fn result_is_deleted_after_transfer() {
let (_sched, mut com, _socat) = start_scheduler("results_deleted").unwrap();

simulate_test_store_archive(&mut com, 8).unwrap();
simulate_execute_program(&mut com, 8, 3, 5).unwrap();
std::thread::sleep(std::time::Duration::from_millis(400));
assert_eq!(simulate_get_status(&mut com).unwrap(), get_status_program_finished(8, 3, 0));
assert_eq!(simulate_get_status(&mut com).unwrap(), get_status_result_ready(8, 3));

simulate_return_result(&mut com, 8, 3).unwrap();
com.send_packet(&CEPPacket::Ack).unwrap();

assert_eq!(simulate_get_status(&mut com).unwrap(), vec![0]);
assert_eq!(std::fs::read_dir("tests/tmp/results_deleted/data").unwrap().count(), 0);
assert_eq!(std::fs::read_dir("tests/tmp/results_deleted/archives/8/results").unwrap().count(), 0);
}
3 changes: 1 addition & 2 deletions tests/simulation/full_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use std::{io::Cursor, time::Duration};

#[test]
fn full_run() {
let (mut com, _socat) = SimulationComHandle::with_socat_proc("full_run");
let _sched = start_scheduler("full_run").unwrap();
let (_sched, mut com, _socat) = start_scheduler("full_run").unwrap();

// store and execute program
simulate_test_store_archive(&mut com, 1).unwrap();
Expand Down
7 changes: 3 additions & 4 deletions tests/simulation/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::simulation::*;

#[test]
fn logfile_is_created() -> Result<(), std::io::Error> {
let (_, _proc) = SimulationComHandle::with_socat_proc("log_created");
let _sched = start_scheduler("log_created")?;
let (_sched, _com, _socat) = start_scheduler("log_created").unwrap();

std::thread::sleep(std::time::Duration::from_millis(400));

assert!(std::path::Path::new("./tests/tmp/log_created/log").exists());
Expand All @@ -12,8 +12,7 @@ fn logfile_is_created() -> Result<(), std::io::Error> {

#[test]
fn logfile_is_cleared_after_sent() -> std::io::Result<()> {
let (mut com, _socat) = SimulationComHandle::with_socat_proc("log_is_cleared_after_sent");
let _sched = start_scheduler("log_is_cleared_after_sent")?;
let (_sched, mut com, _socat) = start_scheduler("log_is_cleared_after_sent").unwrap();

simulate_test_store_archive(&mut com, 1).unwrap();
com.send_packet(&CEPPacket::Data(execute_program(1, 0, 3))).unwrap();
Expand Down
Loading

0 comments on commit 2578d0a

Please sign in to comment.