-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #122 from SpaceTeam/121-src-structure
create individual files for commands and format source
- Loading branch information
Showing
19 changed files
with
473 additions
and
429 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
use std::time::Duration; | ||
|
||
use super::{CommandError, CommandResult, SyncExecutionContext}; | ||
|
||
pub const COM_TIMEOUT_DURATION: std::time::Duration = std::time::Duration::new(2, 0); | ||
|
||
pub fn check_length(vec: &Vec<u8>, n: usize) -> Result<(), CommandError> { | ||
let actual_len = vec.len(); | ||
if actual_len != n { | ||
log::error!("Command came with {actual_len} bytes, should have {n}"); | ||
Err(CommandError::ProtocolViolation( | ||
format!("Received command with {actual_len} bytes, expected {n}").into(), | ||
)) | ||
} else { | ||
Ok(()) | ||
} | ||
} | ||
|
||
/// Truncates the file at `path` to the given size. Returns wether it actually had to truncate. | ||
pub fn truncate_to_size(path: &str, n_bytes: u64) -> Result<bool, std::io::Error> { | ||
log::info!("Truncating {:?}", &path); | ||
let file = std::fs::File::options().write(true).open(path)?; | ||
let size = file.metadata()?.len(); | ||
if size > n_bytes { | ||
file.set_len(n_bytes)?; | ||
file.sync_all()?; | ||
Ok(true) | ||
} else { | ||
Ok(false) | ||
} | ||
} | ||
|
||
/// If no program is currently running, this function simply returns. Otherwise it signals the | ||
/// supervisor thread to kill the student program and waits for a maximum of 2s before returning | ||
/// and error | ||
pub fn terminate_student_program(exec: &mut SyncExecutionContext) -> CommandResult { | ||
let mut con = exec.lock().unwrap(); | ||
if !con.is_student_program_running() { | ||
return Ok(()); | ||
} | ||
con.running_flag = false; // Signal watchdog thread to terminate | ||
drop(con); // Release mutex | ||
|
||
for _ in 0..20 { | ||
std::thread::sleep(Duration::from_millis(100)); // Sensible amount? | ||
let con = exec.lock().unwrap(); | ||
if con.thread_handle.as_ref().unwrap().is_finished() { | ||
return Ok(()); | ||
} | ||
} | ||
|
||
Err(CommandError::NonRecoverable("Supervisor thread did not finish in time".into())) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
use std::{path::Path, process::Command, time::Duration}; | ||
|
||
use subprocess::Popen; | ||
|
||
use crate::{ | ||
command::{ | ||
check_length, terminate_student_program, truncate_to_size, Event, ProgramStatus, ResultId, | ||
}, | ||
communication::{CEPPacket, CommunicationHandle}, | ||
}; | ||
|
||
use super::{CommandError, CommandResult, SyncExecutionContext}; | ||
|
||
/// Executes a students program and starts a watchdog for it. The watchdog also creates entries in the | ||
/// status and result queue found in `context`. The result, including logs, is packed into | ||
/// `./data/{program_id}_{timestamp}` | ||
pub fn execute_program( | ||
data: Vec<u8>, | ||
com: &mut impl CommunicationHandle, | ||
exec: &mut SyncExecutionContext, | ||
) -> CommandResult { | ||
check_length(&data, 9)?; | ||
com.send_packet(CEPPacket::ACK)?; | ||
|
||
let program_id = u16::from_le_bytes([data[1], data[2]]); | ||
let timestamp = u32::from_le_bytes([data[3], data[4], data[5], data[6]]); | ||
let timeout = Duration::from_secs(u16::from_le_bytes([data[7], data[8]]).into()); | ||
log::info!("Executing Program {}:{} for {}s", program_id, timestamp, timeout.as_secs()); | ||
|
||
terminate_student_program(exec).expect("to terminate a running program"); | ||
|
||
let student_process = create_student_process(program_id, timestamp)?; | ||
|
||
// WATCHDOG THREAD | ||
let mut wd_context = exec.clone(); | ||
let wd_handle = std::thread::spawn(move || { | ||
let exit_code = match supervise_process(student_process, timeout, &mut wd_context) { | ||
Ok(code) => code, | ||
Err(()) => 255, | ||
}; | ||
|
||
log::info!("Program {}:{} finished with {}", program_id, timestamp, exit_code); | ||
let sid = ProgramStatus { program_id, timestamp, exit_code }; | ||
let rid = ResultId { program_id, timestamp }; | ||
build_result_archive(rid).unwrap(); // create the zip file with result and log | ||
|
||
let mut context = wd_context.lock().unwrap(); | ||
context.event_vec.push(Event::Status(sid)).unwrap(); | ||
context.event_vec.push(Event::Result(rid)).unwrap(); | ||
context.running_flag = false; | ||
context.update_pin.set_high(); | ||
drop(context); | ||
}); | ||
|
||
// After spawning the watchdog thread, store its handle and set flag | ||
let mut l_context = exec.lock().unwrap(); | ||
l_context.thread_handle = Some(wd_handle); | ||
l_context.running_flag = true; | ||
drop(l_context); | ||
|
||
com.send_packet(CEPPacket::ACK)?; | ||
Ok(()) | ||
} | ||
|
||
/// This function creates and executes a student process. Its stdout/stderr is written into | ||
/// `./data/[program_id]_[timestamp].log` | ||
fn create_student_process(program_id: u16, timestamp: u32) -> Result<Popen, CommandError> { | ||
let program_path = format!("./archives/{}/main.py", program_id); | ||
if !Path::new(&program_path).exists() { | ||
return Err(CommandError::ProtocolViolation("Could not find matching program".into())); | ||
} | ||
|
||
// TODO run the program from a student user (setuid) | ||
let output_file = std::fs::File::create(format!("./data/{}_{}.log", program_id, timestamp))?; // will contain the stdout and stderr of the execute program | ||
let config = subprocess::PopenConfig { | ||
cwd: Some(format!("./archives/{}", program_id).into()), | ||
detached: false, // do not spawn as separate process | ||
stdout: subprocess::Redirection::File(output_file), | ||
stderr: subprocess::Redirection::Merge, | ||
..Default::default() | ||
}; | ||
|
||
let process = Popen::create(&["python", "main.py", ×tamp.to_string()], config)?; | ||
Ok(process) | ||
} | ||
|
||
/// A function intended to be run in a separate process, which checks every seconds if the given | ||
/// timeout has passed or the process terminated itself. If it didnt, the process is killed. | ||
fn supervise_process( | ||
mut process: Popen, | ||
timeout: Duration, | ||
exec: &mut SyncExecutionContext, | ||
) -> Result<u8, ()> { | ||
match run_until_timeout(&mut process, timeout, exec) { | ||
Ok(code) => Ok(code), | ||
Err(()) => { | ||
log::warn!("Student Process timed out or is stopped"); | ||
process.kill().unwrap(); // send SIGKILL | ||
process | ||
.wait_timeout(Duration::from_millis(200)) // wait for it to do its magic | ||
.unwrap() | ||
.unwrap(); // Panic if not stopped | ||
Err(()) | ||
} | ||
} | ||
} | ||
|
||
/// This function allows the program to run for timeout (rounded to seconds) | ||
/// If the program terminates, it exit code is returned | ||
/// If it times out or the running flag is reset, an Err is returned instead | ||
fn run_until_timeout( | ||
process: &mut Popen, | ||
timeout: Duration, | ||
exec: &mut SyncExecutionContext, | ||
) -> Result<u8, ()> { | ||
// Loop over timeout in 1s steps | ||
for _ in 0..timeout.as_secs() { | ||
if let Some(status) = process // if student program terminates with exit code | ||
.wait_timeout(Duration::from_secs(1)) | ||
.unwrap() | ||
{ | ||
if let subprocess::ExitStatus::Exited(n) = status { | ||
return Ok(n as u8); | ||
} else { | ||
return Ok(0); | ||
} | ||
} | ||
|
||
if !exec.lock().unwrap().running_flag { | ||
// if student program should be stopped | ||
break; | ||
} | ||
} | ||
|
||
Err(()) | ||
} | ||
|
||
/// The function uses `zip` to create an uncompressed archive that includes the result file specified, as well as | ||
/// the programs stdout/stderr and the schedulers log file. If any of the files is missing, the archive | ||
/// is created without them. | ||
fn build_result_archive(res: ResultId) -> Result<(), std::io::Error> { | ||
let res_path = format!("./archives/{}/results/{}", res.program_id, res.timestamp); | ||
let log_path = format!("./data/{}_{}.log", res.program_id, res.timestamp); | ||
let out_path = format!("./data/{}_{}.zip", res.program_id, res.timestamp); | ||
|
||
const MAXIMUM_FILE_SIZE: u64 = 1_000_000; | ||
for path in [&res_path, &log_path, &out_path, &"log".into()] { | ||
if let Ok(true) = truncate_to_size(path, MAXIMUM_FILE_SIZE) { | ||
log::warn!("Truncating {} from {} bytes", path, MAXIMUM_FILE_SIZE); | ||
} | ||
} | ||
|
||
let _ = Command::new("zip") | ||
.arg("-0") | ||
.arg(out_path) | ||
.arg("--junk-paths") | ||
.arg("log") | ||
.arg(res_path) | ||
.arg(log_path) | ||
.status(); | ||
|
||
Ok(()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
use crate::communication::{CEPPacket, CommunicationHandle}; | ||
|
||
use super::{check_length, CommandResult, Event, SyncExecutionContext}; | ||
|
||
/// The function handles the get status command, by checking if either a status or result is enqueued. | ||
/// A status always has priority over a result. | ||
pub fn get_status( | ||
data: Vec<u8>, | ||
com: &mut impl CommunicationHandle, | ||
exec: &mut SyncExecutionContext, | ||
) -> CommandResult { | ||
check_length(&data, 1)?; | ||
com.send_packet(CEPPacket::ACK)?; | ||
|
||
let mut l_exec = exec.lock().unwrap(); | ||
if !l_exec.has_data_ready() { | ||
com.send_packet(CEPPacket::DATA(vec![0]))?; | ||
return Ok(()); | ||
} | ||
|
||
if let Some(index) = | ||
l_exec.event_vec.as_ref().iter().position(|x| matches!(x, Event::Status(_))) | ||
{ | ||
let event = l_exec.event_vec[index]; | ||
com.send_packet(CEPPacket::DATA(event.to_bytes()))?; | ||
l_exec.event_vec.remove(index)?; | ||
} else { | ||
let event = *l_exec.event_vec.as_ref().last().unwrap(); // Safe, because we know it is not empty | ||
com.send_packet(CEPPacket::DATA(event.to_bytes()))?; | ||
|
||
if !matches!(event, Event::Result(_)) { | ||
// Results are removed when deleted | ||
l_exec.event_vec.pop()?; | ||
} | ||
} | ||
|
||
l_exec.check_update_pin(); | ||
Ok(()) | ||
} |
Oops, something went wrong.