Skip to content

Commit

Permalink
Build result archive with simple-archive
Browse files Browse the repository at this point in the history
This now also compresses the log files.

Closes: #147 #95
  • Loading branch information
Florian Guggi committed Sep 15, 2024
1 parent 7b4186b commit 6c3c354
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 130 deletions.
76 changes: 2 additions & 74 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 9 additions & 11 deletions scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,26 @@ version = "0.1.0"
edition = "2021"

[dependencies]
log = "0.4.22"
simplelog = "0.12.2"
subprocess = "0.2.9"
anyhow = { version = "1.0.86", features = ["backtrace"] }
crc = "3.2.1"
toml = "0.8.19"
filevec = { path = "../filevec" }
log = "0.4.22"
rppal = "0.18.0"
serde = { version = "1.0.204", features = ["derive"] }
strum = { version = "0.26.3", features = ["derive"] }
serialport = "4.4.0"
test-case = "3.3.1"
tar = "0.4.41"
simple-archive = { path = "../simple-archive" }
simplelog = "0.12.2"
strum = { version = "0.26.3", features = ["derive"] }
subprocess = "0.2.9"
thiserror = "1.0.63"
anyhow = { version = "1.0.86", features = ["backtrace"] }

[dependencies.filevec]
path = "../filevec"
toml = "0.8.19"

[features]
mock = []
rpi = []

[dev-dependencies]
test-case = "3.3.1"
file-per-thread-logger = "0.2.0"
inquire = "0.7.5"

Expand Down
2 changes: 1 addition & 1 deletion scheduler/examples/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ fn inquire_and_send_command(
let program_id = inquire::Text::new("Program id:").prompt()?.parse()?;
let timestamp = inquire::Text::new("Timestamp:").prompt()?.parse()?;
let result_path = inquire::Text::new("File path for returned result:")
.with_default("./result.tar")
.with_default("./result")
.prompt()?;
edu.send_packet(&CEPPacket::Data(return_result(program_id, timestamp)))?;
match edu.receive_multi_packet() {
Expand Down
46 changes: 28 additions & 18 deletions scheduler/src/command/execute_program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ use crate::{
communication::{CEPPacket, CommunicationHandle},
};
use anyhow::anyhow;
use simple_archive::Compression;
use std::{
io::ErrorKind,
fs::File,
io::{ErrorKind, Read, Write},
path::{Path, PathBuf},
time::Duration,
};
use subprocess::Popen;

const MAXIMUM_FILE_SIZE: u64 = 1_000_000;
const MAXIMUM_FILE_SIZE: usize = 1_000_000;

/// Executes a students program and starts a watchdog for it. The watchdog also creates entries in the
/// status and result queue found in `context`. The result, including logs, is packed into
Expand Down Expand Up @@ -145,27 +147,35 @@ fn run_until_timeout(
/// the programs stdout/stderr and the schedulers log file. If any of the files is missing, the archive
/// is created without them.
fn build_result_archive(res: ResultId) -> Result<(), std::io::Error> {
let out_path = PathBuf::from(&format!("./data/{res}"));
let mut archive = simple_archive::Writer::new(std::fs::File::create(out_path)?);

let res_path =
PathBuf::from(format!("./archives/{}/results/{}", res.program_id, res.timestamp));
let student_log_path =
PathBuf::from(format!("./data/{}_{}.log", res.program_id, res.timestamp));
let log_path = PathBuf::from("log");
PathBuf::from(format!("./data/{res}.log"));
let log_path = PathBuf::from("./log");

let out_path = PathBuf::from(&format!("./data/{}_{}.tar", res.program_id, res.timestamp));
let mut archive = tar::Builder::new(std::fs::File::create(out_path)?);
add_to_archive_if_exists(&mut archive, &res.to_string(), res_path, Compression::None)?;
add_to_archive_if_exists(&mut archive, "student_log", student_log_path, Compression::Zopfli)?;
add_to_archive_if_exists(&mut archive, "log", log_path, Compression::Zopfli)?;

for path in &[res_path, student_log_path, log_path] {
let mut file = match std::fs::File::options().read(true).write(true).open(path) {
Ok(f) => f,
Err(e) if e.kind() == ErrorKind::NotFound => continue,
Err(e) => return Err(e),
};
Ok(())
}

truncate_to_size(&mut file, MAXIMUM_FILE_SIZE)?;
archive.append_file(path.file_name().unwrap(), &mut file)?;
fn add_to_archive_if_exists<T: Write>(
archive: &mut simple_archive::Writer<T>,
name: &str,
path: impl AsRef<Path>,
compression: simple_archive::Compression,
) -> std::io::Result<()> {
match std::fs::read(path) {
Ok(mut data) => {
data.truncate(MAXIMUM_FILE_SIZE);
archive.append_data(name, &data, compression)?;
Ok(())
}
Err(ref e) if e.kind() == ErrorKind::NotFound => Ok(()),
Err(e) => Err(e),
}

archive.finish()?;

Ok(())
}
10 changes: 7 additions & 3 deletions scheduler/src/command/execution_context.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use filevec::FileVec;
use std::{
str::FromStr,
sync::{Arc, Mutex},
thread,
fmt::Display, str::FromStr, sync::{Arc, Mutex}, thread
};

const EVENT_SEND_TRIES: u32 = 5;
Expand Down Expand Up @@ -119,6 +117,12 @@ pub struct ResultId {
pub timestamp: u32,
}

impl Display for ResultId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}_{}", self.program_id, self.timestamp)
}
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq, Debug)]
pub enum Event {
Status(ProgramStatus),
Expand Down
4 changes: 2 additions & 2 deletions scheduler/src/command/return_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub fn return_result(

let program_id = u16::from_le_bytes([data[1], data[2]]);
let timestamp = u32::from_le_bytes([data[3], data[4], data[5], data[6]]);
let result_path = format!("./data/{program_id}_{timestamp}.tar");
let result_path = format!("./data/{program_id}_{timestamp}");

if !std::path::Path::new(&result_path).exists() {
com.send_packet(&CEPPacket::Nack)?;
Expand Down Expand Up @@ -51,7 +51,7 @@ pub fn return_result(
fn delete_result(res: ResultId) -> CommandResult {
let res_path = format!("./archives/{}/results/{}", res.program_id, res.timestamp);
let log_path = format!("./data/{}_{}.log", res.program_id, res.timestamp);
let out_path = format!("./data/{}_{}.tar", res.program_id, res.timestamp);
let out_path = format!("./data/{}_{}", res.program_id, res.timestamp);
let _ = std::fs::remove_file(res_path);
let _ = std::fs::remove_file(log_path);
let _ = std::fs::remove_file(out_path);
Expand Down
13 changes: 4 additions & 9 deletions scheduler/tests/simulation/full_run.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::simulation::*;
use std::{io::Cursor, time::Duration};
use std::time::Duration;

#[test]
fn full_run() {
Expand All @@ -16,16 +16,11 @@ fn full_run() {

// Check result
let result = simulate_return_result(&mut com, 1, 3).unwrap();
let mut result_archive = tar::Archive::new(Cursor::new(result));
com.send_packet(&CEPPacket::Ack).unwrap();

let result_file = result_archive
.entries()
.unwrap()
.find(|x| x.as_ref().unwrap().header().path().unwrap().ends_with("3"))
.unwrap()
.unwrap();
assert_eq!(result_file.bytes().map(|b| b.unwrap()).collect::<Vec<_>>(), vec![0xde, 0xad]);
let decoded_result = simple_archive::Reader::new(&result[..]);
let result = decoded_result.map(Result::unwrap).find(|entry| entry.path == "1_3").unwrap();
assert_eq!(result.data, vec![0xde, 0xad]);

assert_eq!(simulate_get_status(&mut com).unwrap(), [0]);
}
22 changes: 10 additions & 12 deletions scheduler/tests/software_tests/return_result.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::software_tests::common;
use crate::software_tests::common::ComEvent::*;
use common::*;
use simple_archive::Entry;
use STS1_EDU_Scheduler::command::{self};
use STS1_EDU_Scheduler::communication::CEPPacket::*;

Expand All @@ -25,7 +26,7 @@ fn returns_result_correctly() -> TestResult {
Edu(Ack),
Action(Box::new(|packet| {
let bytes = packet.clone().serialize();
std::fs::write("tests/tmp/7.tar", &bytes[3..bytes.len() - 4]).unwrap();
std::fs::write("tests/tmp/7_3", &bytes[3..bytes.len() - 4]).unwrap();
})),
Cobc(Ack),
Edu(Eof),
Expand All @@ -42,15 +43,12 @@ fn returns_result_correctly() -> TestResult {
command::handle_command(&mut com, &mut exec);
assert!(com.is_complete());

let _ = std::fs::create_dir("./tests/tmp/7_unpack");
std::process::Command::new("tar")
.current_dir("./tests/tmp/7_unpack")
.arg("xf")
.arg("../7.tar")
.status()?;

assert_eq!(std::fs::read("./tests/tmp/7_unpack/3")?, vec![0xde, 0xad]);
assert!(std::fs::read("./tests/tmp/7_unpack/7_3.log").is_ok());
let results = simple_archive::Reader::new(std::fs::File::open("tests/tmp/7_3")?)
.map(Result::unwrap)
.collect::<Vec<_>>();
dbg!(&results);
assert!(results.contains(&Entry { path: "7_3".to_string(), data: vec![0xde, 0xad] }));
assert!(results.iter().any(|e| e.path == "student_log"));

common::cleanup("7");
Ok(())
Expand All @@ -77,7 +75,7 @@ fn truncate_result() -> TestResult {
command::handle_command(&mut com, &mut exec);
assert!(com.is_complete());

assert!(std::fs::File::open("./data/8_5.tar")?.metadata()?.len() < 1_005_000);
assert!(std::fs::File::open("./data/8_5")?.metadata()?.len() < 1_005_000);

common::cleanup("8");
Ok(())
Expand Down Expand Up @@ -116,7 +114,7 @@ fn result_is_not_deleted_after_corrupted_transfer() {
command::handle_command(&mut com, &mut exec);
assert!(com.is_complete());

assert!(std::fs::File::open("./data/50_0.tar").is_ok());
assert!(std::fs::File::open("./data/50_0").is_ok());

common::cleanup("50");
}

0 comments on commit 6c3c354

Please sign in to comment.