Skip to content

Commit

Permalink
create individual files for commands and format source
Browse files Browse the repository at this point in the history
  • Loading branch information
Florian Guggi committed Oct 22, 2023
1 parent 1cdff79 commit 3652fb3
Show file tree
Hide file tree
Showing 19 changed files with 473 additions and 429 deletions.
53 changes: 53 additions & 0 deletions src/command/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use std::time::Duration;

use super::{CommandError, CommandResult, SyncExecutionContext};

pub const COM_TIMEOUT_DURATION: std::time::Duration = std::time::Duration::new(2, 0);

pub fn check_length(vec: &Vec<u8>, n: usize) -> Result<(), CommandError> {
let actual_len = vec.len();
if actual_len != n {
log::error!("Command came with {actual_len} bytes, should have {n}");
Err(CommandError::ProtocolViolation(
format!("Received command with {actual_len} bytes, expected {n}").into(),
))
} else {
Ok(())
}
}

/// Truncates the file at `path` to the given size. Returns wether it actually had to truncate.
pub fn truncate_to_size(path: &str, n_bytes: u64) -> Result<bool, std::io::Error> {
log::info!("Truncating {:?}", &path);
let file = std::fs::File::options().write(true).open(path)?;
let size = file.metadata()?.len();
if size > n_bytes {
file.set_len(n_bytes)?;
file.sync_all()?;
Ok(true)
} else {
Ok(false)
}
}

/// If no program is currently running, this function simply returns. Otherwise it signals the
/// supervisor thread to kill the student program and waits for a maximum of 2s before returning
/// and error
pub fn terminate_student_program(exec: &mut SyncExecutionContext) -> CommandResult {
let mut con = exec.lock().unwrap();
if !con.is_student_program_running() {
return Ok(());
}
con.running_flag = false; // Signal watchdog thread to terminate
drop(con); // Release mutex

for _ in 0..20 {
std::thread::sleep(Duration::from_millis(100)); // Sensible amount?
let con = exec.lock().unwrap();
if con.thread_handle.as_ref().unwrap().is_finished() {
return Ok(());
}
}

Err(CommandError::NonRecoverable("Supervisor thread did not finish in time".into()))
}
163 changes: 163 additions & 0 deletions src/command/execute_program.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
use std::{path::Path, process::Command, time::Duration};

use subprocess::Popen;

use crate::{
command::{
check_length, terminate_student_program, truncate_to_size, Event, ProgramStatus, ResultId,
},
communication::{CEPPacket, CommunicationHandle},
};

use super::{CommandError, CommandResult, SyncExecutionContext};

/// Executes a students program and starts a watchdog for it. The watchdog also creates entries in the
/// status and result queue found in `context`. The result, including logs, is packed into
/// `./data/{program_id}_{timestamp}`
pub fn execute_program(
data: Vec<u8>,
com: &mut impl CommunicationHandle,
exec: &mut SyncExecutionContext,
) -> CommandResult {
check_length(&data, 9)?;
com.send_packet(CEPPacket::ACK)?;

let program_id = u16::from_le_bytes([data[1], data[2]]);
let timestamp = u32::from_le_bytes([data[3], data[4], data[5], data[6]]);
let timeout = Duration::from_secs(u16::from_le_bytes([data[7], data[8]]).into());
log::info!("Executing Program {}:{} for {}s", program_id, timestamp, timeout.as_secs());

terminate_student_program(exec).expect("to terminate a running program");

let student_process = create_student_process(program_id, timestamp)?;

// WATCHDOG THREAD
let mut wd_context = exec.clone();
let wd_handle = std::thread::spawn(move || {
let exit_code = match supervise_process(student_process, timeout, &mut wd_context) {
Ok(code) => code,
Err(()) => 255,
};

log::info!("Program {}:{} finished with {}", program_id, timestamp, exit_code);
let sid = ProgramStatus { program_id, timestamp, exit_code };
let rid = ResultId { program_id, timestamp };
build_result_archive(rid).unwrap(); // create the zip file with result and log

let mut context = wd_context.lock().unwrap();
context.event_vec.push(Event::Status(sid)).unwrap();
context.event_vec.push(Event::Result(rid)).unwrap();
context.running_flag = false;
context.update_pin.set_high();
drop(context);
});

// After spawning the watchdog thread, store its handle and set flag
let mut l_context = exec.lock().unwrap();
l_context.thread_handle = Some(wd_handle);
l_context.running_flag = true;
drop(l_context);

com.send_packet(CEPPacket::ACK)?;
Ok(())
}

/// This function creates and executes a student process. Its stdout/stderr is written into
/// `./data/[program_id]_[timestamp].log`
fn create_student_process(program_id: u16, timestamp: u32) -> Result<Popen, CommandError> {
let program_path = format!("./archives/{}/main.py", program_id);
if !Path::new(&program_path).exists() {
return Err(CommandError::ProtocolViolation("Could not find matching program".into()));
}

// TODO run the program from a student user (setuid)
let output_file = std::fs::File::create(format!("./data/{}_{}.log", program_id, timestamp))?; // will contain the stdout and stderr of the execute program
let config = subprocess::PopenConfig {
cwd: Some(format!("./archives/{}", program_id).into()),
detached: false, // do not spawn as separate process
stdout: subprocess::Redirection::File(output_file),
stderr: subprocess::Redirection::Merge,
..Default::default()
};

let process = Popen::create(&["python", "main.py", &timestamp.to_string()], config)?;
Ok(process)
}

/// A function intended to be run in a separate process, which checks every seconds if the given
/// timeout has passed or the process terminated itself. If it didnt, the process is killed.
fn supervise_process(
mut process: Popen,
timeout: Duration,
exec: &mut SyncExecutionContext,
) -> Result<u8, ()> {
match run_until_timeout(&mut process, timeout, exec) {
Ok(code) => Ok(code),
Err(()) => {
log::warn!("Student Process timed out or is stopped");
process.kill().unwrap(); // send SIGKILL
process
.wait_timeout(Duration::from_millis(200)) // wait for it to do its magic
.unwrap()
.unwrap(); // Panic if not stopped
Err(())
}
}
}

/// This function allows the program to run for timeout (rounded to seconds)
/// If the program terminates, it exit code is returned
/// If it times out or the running flag is reset, an Err is returned instead
fn run_until_timeout(
process: &mut Popen,
timeout: Duration,
exec: &mut SyncExecutionContext,
) -> Result<u8, ()> {
// Loop over timeout in 1s steps
for _ in 0..timeout.as_secs() {
if let Some(status) = process // if student program terminates with exit code
.wait_timeout(Duration::from_secs(1))
.unwrap()
{
if let subprocess::ExitStatus::Exited(n) = status {
return Ok(n as u8);
} else {
return Ok(0);
}
}

if !exec.lock().unwrap().running_flag {
// if student program should be stopped
break;
}
}

Err(())
}

/// The function uses `zip` to create an uncompressed archive that includes the result file specified, as well as
/// the programs stdout/stderr and the schedulers log file. If any of the files is missing, the archive
/// is created without them.
fn build_result_archive(res: ResultId) -> Result<(), std::io::Error> {
let res_path = format!("./archives/{}/results/{}", res.program_id, res.timestamp);
let log_path = format!("./data/{}_{}.log", res.program_id, res.timestamp);
let out_path = format!("./data/{}_{}.zip", res.program_id, res.timestamp);

const MAXIMUM_FILE_SIZE: u64 = 1_000_000;
for path in [&res_path, &log_path, &out_path, &"log".into()] {
if let Ok(true) = truncate_to_size(path, MAXIMUM_FILE_SIZE) {
log::warn!("Truncating {} from {} bytes", path, MAXIMUM_FILE_SIZE);
}
}

let _ = Command::new("zip")
.arg("-0")
.arg(out_path)
.arg("--junk-paths")
.arg("log")
.arg(res_path)
.arg(log_path)
.status();

Ok(())
}
39 changes: 39 additions & 0 deletions src/command/get_status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use crate::communication::{CEPPacket, CommunicationHandle};

use super::{check_length, CommandResult, Event, SyncExecutionContext};

/// The function handles the get status command, by checking if either a status or result is enqueued.
/// A status always has priority over a result.
pub fn get_status(
data: Vec<u8>,
com: &mut impl CommunicationHandle,
exec: &mut SyncExecutionContext,
) -> CommandResult {
check_length(&data, 1)?;
com.send_packet(CEPPacket::ACK)?;

let mut l_exec = exec.lock().unwrap();
if !l_exec.has_data_ready() {
com.send_packet(CEPPacket::DATA(vec![0]))?;
return Ok(());
}

if let Some(index) =
l_exec.event_vec.as_ref().iter().position(|x| matches!(x, Event::Status(_)))
{
let event = l_exec.event_vec[index];
com.send_packet(CEPPacket::DATA(event.to_bytes()))?;
l_exec.event_vec.remove(index)?;
} else {
let event = *l_exec.event_vec.as_ref().last().unwrap(); // Safe, because we know it is not empty
com.send_packet(CEPPacket::DATA(event.to_bytes()))?;

if !matches!(event, Event::Result(_)) {
// Results are removed when deleted
l_exec.event_vec.pop()?;
}
}

l_exec.check_update_pin();
Ok(())
}
Loading

0 comments on commit 3652fb3

Please sign in to comment.