diff --git a/src/command/common.rs b/src/command/common.rs new file mode 100644 index 0000000..e6e516f --- /dev/null +++ b/src/command/common.rs @@ -0,0 +1,53 @@ +use std::time::Duration; + +use super::{CommandError, CommandResult, SyncExecutionContext}; + +pub const COM_TIMEOUT_DURATION: std::time::Duration = std::time::Duration::new(2, 0); + +pub fn check_length(vec: &Vec, n: usize) -> Result<(), CommandError> { + let actual_len = vec.len(); + if actual_len != n { + log::error!("Command came with {actual_len} bytes, should have {n}"); + Err(CommandError::ProtocolViolation( + format!("Received command with {actual_len} bytes, expected {n}").into(), + )) + } else { + Ok(()) + } +} + +/// Truncates the file at `path` to the given size. Returns wether it actually had to truncate. +pub fn truncate_to_size(path: &str, n_bytes: u64) -> Result { + log::info!("Truncating {:?}", &path); + let file = std::fs::File::options().write(true).open(path)?; + let size = file.metadata()?.len(); + if size > n_bytes { + file.set_len(n_bytes)?; + file.sync_all()?; + Ok(true) + } else { + Ok(false) + } +} + +/// If no program is currently running, this function simply returns. Otherwise it signals the +/// supervisor thread to kill the student program and waits for a maximum of 2s before returning +/// and error +pub fn terminate_student_program(exec: &mut SyncExecutionContext) -> CommandResult { + let mut con = exec.lock().unwrap(); + if !con.is_student_program_running() { + return Ok(()); + } + con.running_flag = false; // Signal watchdog thread to terminate + drop(con); // Release mutex + + for _ in 0..20 { + std::thread::sleep(Duration::from_millis(100)); // Sensible amount? + let con = exec.lock().unwrap(); + if con.thread_handle.as_ref().unwrap().is_finished() { + return Ok(()); + } + } + + Err(CommandError::NonRecoverable("Supervisor thread did not finish in time".into())) +} diff --git a/src/command/execute_program.rs b/src/command/execute_program.rs new file mode 100644 index 0000000..46dcdcb --- /dev/null +++ b/src/command/execute_program.rs @@ -0,0 +1,163 @@ +use std::{path::Path, process::Command, time::Duration}; + +use subprocess::Popen; + +use crate::{ + command::{ + check_length, terminate_student_program, truncate_to_size, Event, ProgramStatus, ResultId, + }, + communication::{CEPPacket, CommunicationHandle}, +}; + +use super::{CommandError, CommandResult, SyncExecutionContext}; + +/// Executes a students program and starts a watchdog for it. The watchdog also creates entries in the +/// status and result queue found in `context`. The result, including logs, is packed into +/// `./data/{program_id}_{timestamp}` +pub fn execute_program( + data: Vec, + com: &mut impl CommunicationHandle, + exec: &mut SyncExecutionContext, +) -> CommandResult { + check_length(&data, 9)?; + com.send_packet(CEPPacket::ACK)?; + + let program_id = u16::from_le_bytes([data[1], data[2]]); + let timestamp = u32::from_le_bytes([data[3], data[4], data[5], data[6]]); + let timeout = Duration::from_secs(u16::from_le_bytes([data[7], data[8]]).into()); + log::info!("Executing Program {}:{} for {}s", program_id, timestamp, timeout.as_secs()); + + terminate_student_program(exec).expect("to terminate a running program"); + + let student_process = create_student_process(program_id, timestamp)?; + + // WATCHDOG THREAD + let mut wd_context = exec.clone(); + let wd_handle = std::thread::spawn(move || { + let exit_code = match supervise_process(student_process, timeout, &mut wd_context) { + Ok(code) => code, + Err(()) => 255, + }; + + log::info!("Program {}:{} finished with {}", program_id, timestamp, exit_code); + let sid = ProgramStatus { program_id, timestamp, exit_code }; + let rid = ResultId { program_id, timestamp }; + build_result_archive(rid).unwrap(); // create the zip file with result and log + + let mut context = wd_context.lock().unwrap(); + context.event_vec.push(Event::Status(sid)).unwrap(); + context.event_vec.push(Event::Result(rid)).unwrap(); + context.running_flag = false; + context.update_pin.set_high(); + drop(context); + }); + + // After spawning the watchdog thread, store its handle and set flag + let mut l_context = exec.lock().unwrap(); + l_context.thread_handle = Some(wd_handle); + l_context.running_flag = true; + drop(l_context); + + com.send_packet(CEPPacket::ACK)?; + Ok(()) +} + +/// This function creates and executes a student process. Its stdout/stderr is written into +/// `./data/[program_id]_[timestamp].log` +fn create_student_process(program_id: u16, timestamp: u32) -> Result { + let program_path = format!("./archives/{}/main.py", program_id); + if !Path::new(&program_path).exists() { + return Err(CommandError::ProtocolViolation("Could not find matching program".into())); + } + + // TODO run the program from a student user (setuid) + let output_file = std::fs::File::create(format!("./data/{}_{}.log", program_id, timestamp))?; // will contain the stdout and stderr of the execute program + let config = subprocess::PopenConfig { + cwd: Some(format!("./archives/{}", program_id).into()), + detached: false, // do not spawn as separate process + stdout: subprocess::Redirection::File(output_file), + stderr: subprocess::Redirection::Merge, + ..Default::default() + }; + + let process = Popen::create(&["python", "main.py", ×tamp.to_string()], config)?; + Ok(process) +} + +/// A function intended to be run in a separate process, which checks every seconds if the given +/// timeout has passed or the process terminated itself. If it didnt, the process is killed. +fn supervise_process( + mut process: Popen, + timeout: Duration, + exec: &mut SyncExecutionContext, +) -> Result { + match run_until_timeout(&mut process, timeout, exec) { + Ok(code) => Ok(code), + Err(()) => { + log::warn!("Student Process timed out or is stopped"); + process.kill().unwrap(); // send SIGKILL + process + .wait_timeout(Duration::from_millis(200)) // wait for it to do its magic + .unwrap() + .unwrap(); // Panic if not stopped + Err(()) + } + } +} + +/// This function allows the program to run for timeout (rounded to seconds) +/// If the program terminates, it exit code is returned +/// If it times out or the running flag is reset, an Err is returned instead +fn run_until_timeout( + process: &mut Popen, + timeout: Duration, + exec: &mut SyncExecutionContext, +) -> Result { + // Loop over timeout in 1s steps + for _ in 0..timeout.as_secs() { + if let Some(status) = process // if student program terminates with exit code + .wait_timeout(Duration::from_secs(1)) + .unwrap() + { + if let subprocess::ExitStatus::Exited(n) = status { + return Ok(n as u8); + } else { + return Ok(0); + } + } + + if !exec.lock().unwrap().running_flag { + // if student program should be stopped + break; + } + } + + Err(()) +} + +/// The function uses `zip` to create an uncompressed archive that includes the result file specified, as well as +/// the programs stdout/stderr and the schedulers log file. If any of the files is missing, the archive +/// is created without them. +fn build_result_archive(res: ResultId) -> Result<(), std::io::Error> { + let res_path = format!("./archives/{}/results/{}", res.program_id, res.timestamp); + let log_path = format!("./data/{}_{}.log", res.program_id, res.timestamp); + let out_path = format!("./data/{}_{}.zip", res.program_id, res.timestamp); + + const MAXIMUM_FILE_SIZE: u64 = 1_000_000; + for path in [&res_path, &log_path, &out_path, &"log".into()] { + if let Ok(true) = truncate_to_size(path, MAXIMUM_FILE_SIZE) { + log::warn!("Truncating {} from {} bytes", path, MAXIMUM_FILE_SIZE); + } + } + + let _ = Command::new("zip") + .arg("-0") + .arg(out_path) + .arg("--junk-paths") + .arg("log") + .arg(res_path) + .arg(log_path) + .status(); + + Ok(()) +} diff --git a/src/command/get_status.rs b/src/command/get_status.rs new file mode 100644 index 0000000..0175d6d --- /dev/null +++ b/src/command/get_status.rs @@ -0,0 +1,39 @@ +use crate::communication::{CEPPacket, CommunicationHandle}; + +use super::{check_length, CommandResult, Event, SyncExecutionContext}; + +/// The function handles the get status command, by checking if either a status or result is enqueued. +/// A status always has priority over a result. +pub fn get_status( + data: Vec, + com: &mut impl CommunicationHandle, + exec: &mut SyncExecutionContext, +) -> CommandResult { + check_length(&data, 1)?; + com.send_packet(CEPPacket::ACK)?; + + let mut l_exec = exec.lock().unwrap(); + if !l_exec.has_data_ready() { + com.send_packet(CEPPacket::DATA(vec![0]))?; + return Ok(()); + } + + if let Some(index) = + l_exec.event_vec.as_ref().iter().position(|x| matches!(x, Event::Status(_))) + { + let event = l_exec.event_vec[index]; + com.send_packet(CEPPacket::DATA(event.to_bytes()))?; + l_exec.event_vec.remove(index)?; + } else { + let event = *l_exec.event_vec.as_ref().last().unwrap(); // Safe, because we know it is not empty + com.send_packet(CEPPacket::DATA(event.to_bytes()))?; + + if !matches!(event, Event::Result(_)) { + // Results are removed when deleted + l_exec.event_vec.pop()?; + } + } + + l_exec.check_update_pin(); + Ok(()) +} diff --git a/src/command/handlers.rs b/src/command/handlers.rs deleted file mode 100644 index 290b866..0000000 --- a/src/command/handlers.rs +++ /dev/null @@ -1,405 +0,0 @@ -use subprocess::Popen; - -use crate::communication::CEPPacket; -use crate::communication::CommunicationHandle; -use std::fs::File; -use std::io::prelude::*; -use std::path::Path; -use std::process::Command; -use std::thread; -use std::time::Duration; - -use super::Event; -use super::ProgramStatus; -use super::ResultId; -use super::{CommandError, CommandResult, SyncExecutionContext}; - -const COM_TIMEOUT_DURATION: std::time::Duration = std::time::Duration::new(2, 0); - -/// This function implements the Store Archive command, including the reception of the archive itself -pub fn store_archive( - data: Vec, - com: &mut impl CommunicationHandle, - _exec: &mut SyncExecutionContext, -) -> CommandResult { - check_length(&data, 3)?; - com.send_packet(CEPPacket::ACK)?; - - let id = u16::from_le_bytes([data[1], data[2]]).to_string(); - log::info!("Storing Archive {}", id); - - let bytes = com.receive_multi_packet(&COM_TIMEOUT_DURATION, || false)?; // !! TODO !! - unpack_archive(id, bytes)?; - - com.send_packet(CEPPacket::ACK)?; - Ok(()) -} - -/// Stores a received program in the appropriate folder and unzips it -/// -/// * `folder` The folder to unzip into, subsequently the program id -/// * `bytes` A vector containing the raw bytes of the zip archive -/// -/// Returns Ok or passes along a file access/unzip process error -fn unpack_archive(folder: String, bytes: Vec) -> CommandResult { - // Store bytes into temporary file - let zip_path = format!("./data/{}.zip", folder); - let mut zip_file = File::create(&zip_path)?; - zip_file.write_all(&bytes)?; - zip_file.sync_all()?; - - let exit_status = Command::new("unzip") - .arg("-o") // overwrite silently - .arg(&zip_path) - .arg("-d") // target directory - .arg(format!("./archives/{}", folder)) - .status(); - - // Remove the temporary file, even if unzip failed - std::fs::remove_file(zip_path)?; - - match exit_status { - Ok(status) => { - if !status.success() { - return Err(CommandError::NonRecoverable("unzip failed".into())); - } - } - Err(err) => { - return Err(err.into()); - } - } - - Ok(()) -} - -/// Executes a students program and starts a watchdog for it. The watchdog also creates entries in the -/// status and result queue found in `context`. The result, including logs, is packed into -/// `./data/{program_id}_{timestamp}` -pub fn execute_program( - data: Vec, - com: &mut impl CommunicationHandle, - exec: &mut SyncExecutionContext, -) -> CommandResult { - check_length(&data, 9)?; - com.send_packet(CEPPacket::ACK)?; - - let program_id = u16::from_le_bytes([data[1], data[2]]); - let timestamp = u32::from_le_bytes([data[3], data[4], data[5], data[6]]); - let timeout = Duration::from_secs(u16::from_le_bytes([data[7], data[8]]).into()); - log::info!("Executing Program {}:{} for {}s", program_id, timestamp, timeout.as_secs()); - - terminate_student_program(exec).expect("to terminate a running program"); - - let student_process = create_student_process(program_id, timestamp)?; - - // WATCHDOG THREAD - let mut wd_context = exec.clone(); - let wd_handle = thread::spawn(move || { - let exit_code = match supervise_process(student_process, timeout, &mut wd_context) { - Ok(code) => code, - Err(()) => 255, - }; - - log::info!("Program {}:{} finished with {}", program_id, timestamp, exit_code); - let sid = ProgramStatus { program_id, timestamp, exit_code }; - let rid = ResultId { program_id, timestamp }; - build_result_archive(rid).unwrap(); // create the zip file with result and log - - let mut context = wd_context.lock().unwrap(); - context.event_vec.push(Event::Status(sid)).unwrap(); - context.event_vec.push(Event::Result(rid)).unwrap(); - context.running_flag = false; - context.update_pin.set_high(); - drop(context); - }); - - // After spawning the watchdog thread, store its handle and set flag - let mut l_context = exec.lock().unwrap(); - l_context.thread_handle = Some(wd_handle); - l_context.running_flag = true; - drop(l_context); - - com.send_packet(CEPPacket::ACK)?; - Ok(()) -} - -/// This function creates and executes a student process. Its stdout/stderr is written into -/// `./data/[program_id]_[timestamp].log` -fn create_student_process(program_id: u16, timestamp: u32) -> Result { - let program_path = format!("./archives/{}/main.py", program_id); - if !Path::new(&program_path).exists() { - return Err(CommandError::ProtocolViolation("Could not find matching program".into())); - } - - // TODO run the program from a student user (setuid) - let output_file = File::create(format!("./data/{}_{}.log", program_id, timestamp))?; // will contain the stdout and stderr of the execute program - let config = subprocess::PopenConfig { - cwd: Some(format!("./archives/{}", program_id).into()), - detached: false, // do not spawn as separate process - stdout: subprocess::Redirection::File(output_file), - stderr: subprocess::Redirection::Merge, - ..Default::default() - }; - - let process = Popen::create(&["python", "main.py", ×tamp.to_string()], config)?; - Ok(process) -} - -/// A function intended to be run in a separate process, which checks every seconds if the given -/// timeout has passed or the process terminated itself. If it didnt, the process is killed. -fn supervise_process( - mut process: Popen, - timeout: Duration, - exec: &mut SyncExecutionContext, -) -> Result { - match run_until_timeout(&mut process, timeout, exec) { - Ok(code) => Ok(code), - Err(()) => { - log::warn!("Student Process timed out or is stopped"); - process.kill().unwrap(); // send SIGKILL - process - .wait_timeout(Duration::from_millis(200)) // wait for it to do its magic - .unwrap() - .unwrap(); // Panic if not stopped - Err(()) - } - } -} - -/// This function allows the program to run for timeout (rounded to seconds) -/// If the program terminates, it exit code is returned -/// If it times out or the running flag is reset, an Err is returned instead -fn run_until_timeout( - process: &mut Popen, - timeout: Duration, - exec: &mut SyncExecutionContext, -) -> Result { - // Loop over timeout in 1s steps - for _ in 0..timeout.as_secs() { - if let Some(status) = process // if student program terminates with exit code - .wait_timeout(Duration::from_secs(1)) - .unwrap() - { - if let subprocess::ExitStatus::Exited(n) = status { - return Ok(n as u8); - } else { - return Ok(0); - } - } - - if !exec.lock().unwrap().running_flag { - // if student program should be stopped - break; - } - } - - Err(()) -} - -/// The function uses `zip` to create an uncompressed archive that includes the result file specified, as well as -/// the programs stdout/stderr and the schedulers log file. If any of the files is missing, the archive -/// is created without them. -fn build_result_archive(res: ResultId) -> Result<(), std::io::Error> { - let res_path = format!("./archives/{}/results/{}", res.program_id, res.timestamp); - let log_path = format!("./data/{}_{}.log", res.program_id, res.timestamp); - let out_path = format!("./data/{}_{}.zip", res.program_id, res.timestamp); - - const MAXIMUM_FILE_SIZE: u64 = 1_000_000; - for path in [&res_path, &log_path, &out_path, &"log".into()] { - if let Ok(true) = truncate_to_size(path, MAXIMUM_FILE_SIZE) { - log::warn!("Truncating {} from {} bytes", path, MAXIMUM_FILE_SIZE); - } - } - - let _ = Command::new("zip") - .arg("-0") - .arg(out_path) - .arg("--junk-paths") - .arg("log") - .arg(res_path) - .arg(log_path) - .status(); - - Ok(()) -} - -/// Truncates the file at `path` to the given size. Returns wether it actually had to truncate. -fn truncate_to_size(path: &str, n_bytes: u64) -> Result { - log::info!("Truncating {:?}", &path); - let file = std::fs::File::options().write(true).open(path)?; - let size = file.metadata()?.len(); - if size > n_bytes { - file.set_len(n_bytes)?; - file.sync_all()?; - Ok(true) - } - else { - Ok(false) - } -} - -/// Stops the currently running student program -pub fn stop_program( - data: Vec, - com: &mut impl CommunicationHandle, - exec: &mut SyncExecutionContext, -) -> CommandResult { - check_length(&data, 1)?; - com.send_packet(CEPPacket::ACK)?; - - terminate_student_program(exec).expect("to terminate student program"); - - com.send_packet(CEPPacket::ACK)?; - Ok(()) -} - -/// If no program is currently running, this function simply returns. Otherwise it signals the -/// supervisor thread to kill the student program and waits for a maximum of 2s before returning -/// and error -fn terminate_student_program(exec: &mut SyncExecutionContext) -> CommandResult { - let mut con = exec.lock().unwrap(); - if !con.is_student_program_running() { - return Ok(()); - } - con.running_flag = false; // Signal watchdog thread to terminate - drop(con); // Release mutex - - for _ in 0..20 { - std::thread::sleep(Duration::from_millis(100)); // Sensible amount? - let con = exec.lock().unwrap(); - if con.thread_handle.as_ref().unwrap().is_finished() { - return Ok(()); - } - } - - Err(CommandError::NonRecoverable("Supervisor thread did not finish in time".into())) -} - -/// The function handles the get status command, by checking if either a status or result is enqueued. -/// A status always has priority over a result. -pub fn get_status( - data: Vec, - com: &mut impl CommunicationHandle, - exec: &mut SyncExecutionContext, -) -> CommandResult { - check_length(&data, 1)?; - com.send_packet(CEPPacket::ACK)?; - - let mut l_exec = exec.lock().unwrap(); - if !l_exec.has_data_ready() { - com.send_packet(CEPPacket::DATA(vec![0]))?; - return Ok(()); - } - - if let Some(index) = - l_exec.event_vec.as_ref().iter().position(|x| matches!(x, Event::Status(_))) - { - let event = l_exec.event_vec[index]; - com.send_packet(CEPPacket::DATA(event.to_bytes()))?; - l_exec.event_vec.remove(index)?; - } else { - let event = *l_exec.event_vec.as_ref().last().unwrap(); // Safe, because we know it is not empty - com.send_packet(CEPPacket::DATA(event.to_bytes()))?; - - if !matches!(event, Event::Result(_)) { - // Results are removed when deleted - l_exec.event_vec.pop()?; - } - } - - l_exec.check_update_pin(); - Ok(()) -} - -/// Handles a complete return result command. The result zip file is only deleted if a final ACK is -/// received. -pub fn return_result( - data: Vec, - com: &mut impl CommunicationHandle, - exec: &mut SyncExecutionContext, -) -> CommandResult { - check_length(&data, 7)?; - com.send_packet(CEPPacket::ACK)?; - - let program_id = u16::from_le_bytes([data[1], data[2]]); - let timestamp = u32::from_le_bytes([data[3], data[4], data[5], data[6]]); - let result_path = format!("./data/{}_{}.zip", program_id, timestamp); - - if !std::path::Path::new(&result_path).exists() { - return Err(CommandError::ProtocolViolation( - format!("Result {}:{} does not exist", program_id, timestamp).into(), - )); - } - - let bytes = std::fs::read(result_path)?; - log::info!("Returning result for {}:{}", program_id, timestamp); - com.send_multi_packet(bytes, &COM_TIMEOUT_DURATION)?; - - let response = com.receive_packet(&COM_TIMEOUT_DURATION)?; - if response == CEPPacket::ACK { - let result_id = ResultId { program_id, timestamp }; - delete_result(result_id)?; - - let mut l_exec = exec.lock().unwrap(); - let event_index = - l_exec.event_vec.as_ref().iter().position(|x| x == &Event::Result(result_id)).unwrap(); - l_exec.event_vec.remove(event_index)?; - l_exec.check_update_pin(); - drop(l_exec); - } else { - log::error!("COBC did not acknowledge result"); - } - - Ok(()) -} - -/// Deletes the result archive corresponding to the next element in the result queue and removes -/// that element from the queue. The update pin is updated accordingly -fn delete_result(res: ResultId) -> CommandResult { - let res_path = format!("./archives/{}/results/{}", res.program_id, res.timestamp); - let log_path = format!("./data/{}_{}.log", res.program_id, res.timestamp); - let out_path = format!("./data/{}_{}.zip", res.program_id, res.timestamp); - let _ = std::fs::remove_file(res_path); - let _ = std::fs::remove_file(log_path); - let _ = std::fs::remove_file(out_path); - let _ = truncate_to_size("log", 0); - - Ok(()) -} - -/// Handles the update time command -pub fn update_time( - data: Vec, - com: &mut impl CommunicationHandle, - _exec: &mut SyncExecutionContext, -) -> CommandResult { - check_length(&data, 5)?; - com.send_packet(CEPPacket::ACK)?; - - let time = i32::from_le_bytes([data[1], data[2], data[3], data[4]]); - set_system_time(time)?; - - com.send_packet(CEPPacket::ACK)?; - Ok(()) -} - -fn set_system_time(s_since_epoch: i32) -> CommandResult { - let exit_status = Command::new("date").arg("-s").arg(format!("@{}", s_since_epoch)).status()?; - if !exit_status.success() { - return Err(CommandError::NonRecoverable("date utility failed".into())); - } - - Ok(()) -} - -fn check_length(vec: &Vec, n: usize) -> Result<(), CommandError> { - let actual_len = vec.len(); - if actual_len != n { - log::error!("Command came with {actual_len} bytes, should have {n}"); - Err(CommandError::ProtocolViolation( - format!("Received command with {actual_len} bytes, expected {n}").into(), - )) - } else { - Ok(()) - } -} diff --git a/src/command/mod.rs b/src/command/mod.rs index b98a326..9df9bc4 100644 --- a/src/command/mod.rs +++ b/src/command/mod.rs @@ -1,13 +1,27 @@ use crate::communication::{CEPPacket, CommunicationHandle}; use std::time::Duration; -mod handlers; -pub use handlers::*; +mod common; +pub use common::*; mod execution_context; pub use execution_context::*; mod error; pub use error::CommandError; +mod execute_program; +mod get_status; +mod return_result; +mod stop_program; +mod store_archive; +mod update_time; + +use execute_program::execute_program; +use get_status::get_status; +use return_result::return_result; +use stop_program::stop_program; +use store_archive::store_archive; +use update_time::update_time; + type CommandResult = Result<(), CommandError>; /// Main routine. Waits for a command to be received from the COBC, then parses and executes it. diff --git a/src/command/return_result.rs b/src/command/return_result.rs new file mode 100644 index 0000000..31b3c57 --- /dev/null +++ b/src/command/return_result.rs @@ -0,0 +1,62 @@ +use crate::{ + command::{check_length, CommandError, Event, ResultId, COM_TIMEOUT_DURATION}, + communication::{CEPPacket, CommunicationHandle}, +}; + +use super::{truncate_to_size, CommandResult, SyncExecutionContext}; + +/// Handles a complete return result command. The result zip file is only deleted if a final ACK is +/// received. +pub fn return_result( + data: Vec, + com: &mut impl CommunicationHandle, + exec: &mut SyncExecutionContext, +) -> CommandResult { + check_length(&data, 7)?; + com.send_packet(CEPPacket::ACK)?; + + let program_id = u16::from_le_bytes([data[1], data[2]]); + let timestamp = u32::from_le_bytes([data[3], data[4], data[5], data[6]]); + let result_path = format!("./data/{}_{}.zip", program_id, timestamp); + + if !std::path::Path::new(&result_path).exists() { + return Err(CommandError::ProtocolViolation( + format!("Result {}:{} does not exist", program_id, timestamp).into(), + )); + } + + let bytes = std::fs::read(result_path)?; + log::info!("Returning result for {}:{}", program_id, timestamp); + com.send_multi_packet(bytes, &COM_TIMEOUT_DURATION)?; + + let response = com.receive_packet(&COM_TIMEOUT_DURATION)?; + if response == CEPPacket::ACK { + let result_id = ResultId { program_id, timestamp }; + delete_result(result_id)?; + + let mut l_exec = exec.lock().unwrap(); + let event_index = + l_exec.event_vec.as_ref().iter().position(|x| x == &Event::Result(result_id)).unwrap(); + l_exec.event_vec.remove(event_index)?; + l_exec.check_update_pin(); + drop(l_exec); + } else { + log::error!("COBC did not acknowledge result"); + } + + Ok(()) +} + +/// Deletes the result archive corresponding to the next element in the result queue and removes +/// that element from the queue. The update pin is updated accordingly +fn delete_result(res: ResultId) -> CommandResult { + let res_path = format!("./archives/{}/results/{}", res.program_id, res.timestamp); + let log_path = format!("./data/{}_{}.log", res.program_id, res.timestamp); + let out_path = format!("./data/{}_{}.zip", res.program_id, res.timestamp); + let _ = std::fs::remove_file(res_path); + let _ = std::fs::remove_file(log_path); + let _ = std::fs::remove_file(out_path); + let _ = truncate_to_size("log", 0); + + Ok(()) +} diff --git a/src/command/stop_program.rs b/src/command/stop_program.rs new file mode 100644 index 0000000..3899fe6 --- /dev/null +++ b/src/command/stop_program.rs @@ -0,0 +1,18 @@ +use crate::communication::{CEPPacket, CommunicationHandle}; + +use super::{check_length, terminate_student_program, CommandResult, SyncExecutionContext}; + +/// Stops the currently running student program +pub fn stop_program( + data: Vec, + com: &mut impl CommunicationHandle, + exec: &mut SyncExecutionContext, +) -> CommandResult { + check_length(&data, 1)?; + com.send_packet(CEPPacket::ACK)?; + + terminate_student_program(exec).expect("to terminate student program"); + + com.send_packet(CEPPacket::ACK)?; + Ok(()) +} diff --git a/src/command/store_archive.rs b/src/command/store_archive.rs new file mode 100644 index 0000000..3e93d61 --- /dev/null +++ b/src/command/store_archive.rs @@ -0,0 +1,63 @@ +use std::{io::Write, process::Command}; + +use super::{CommandError, CommandResult, SyncExecutionContext}; +use crate::{ + command::{check_length, COM_TIMEOUT_DURATION}, + communication::{CEPPacket, CommunicationHandle}, +}; + +/// This function implements the Store Archive command, including the reception of the archive itself +pub fn store_archive( + data: Vec, + com: &mut impl CommunicationHandle, + _exec: &mut SyncExecutionContext, +) -> CommandResult { + check_length(&data, 3)?; + com.send_packet(CEPPacket::ACK)?; + + let id = u16::from_le_bytes([data[1], data[2]]).to_string(); + log::info!("Storing Archive {}", id); + + let bytes = com.receive_multi_packet(&COM_TIMEOUT_DURATION, || false)?; // !! TODO !! + unpack_archive(id, bytes)?; + + com.send_packet(CEPPacket::ACK)?; + Ok(()) +} + +/// Stores a received program in the appropriate folder and unzips it +/// +/// * `folder` The folder to unzip into, subsequently the program id +/// * `bytes` A vector containing the raw bytes of the zip archive +/// +/// Returns Ok or passes along a file access/unzip process error +fn unpack_archive(folder: String, bytes: Vec) -> CommandResult { + // Store bytes into temporary file + let zip_path = format!("./data/{}.zip", folder); + let mut zip_file = std::fs::File::create(&zip_path)?; + zip_file.write_all(&bytes)?; + zip_file.sync_all()?; + + let exit_status = Command::new("unzip") + .arg("-o") // overwrite silently + .arg(&zip_path) + .arg("-d") // target directory + .arg(format!("./archives/{}", folder)) + .status(); + + // Remove the temporary file, even if unzip failed + std::fs::remove_file(zip_path)?; + + match exit_status { + Ok(status) => { + if !status.success() { + return Err(CommandError::NonRecoverable("unzip failed".into())); + } + } + Err(err) => { + return Err(err.into()); + } + } + + Ok(()) +} diff --git a/src/command/update_time.rs b/src/command/update_time.rs new file mode 100644 index 0000000..8ac08b0 --- /dev/null +++ b/src/command/update_time.rs @@ -0,0 +1,30 @@ +use std::process::Command; + +use crate::communication::{CEPPacket, CommunicationHandle}; + +use super::{check_length, CommandError, CommandResult, SyncExecutionContext}; + +/// Handles the update time command +pub fn update_time( + data: Vec, + com: &mut impl CommunicationHandle, + _exec: &mut SyncExecutionContext, +) -> CommandResult { + check_length(&data, 5)?; + com.send_packet(CEPPacket::ACK)?; + + let time = i32::from_le_bytes([data[1], data[2], data[3], data[4]]); + set_system_time(time)?; + + com.send_packet(CEPPacket::ACK)?; + Ok(()) +} + +fn set_system_time(s_since_epoch: i32) -> CommandResult { + let exit_status = Command::new("date").arg("-s").arg(format!("@{}", s_since_epoch)).status()?; + if !exit_status.success() { + return Err(CommandError::NonRecoverable("date utility failed".into())); + } + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 3677948..7c2698c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -56,7 +56,7 @@ fn heartbeat_loop(heartbeat_pin: u8, freq: u64) -> ! { if cfg!(feature = "mock") { loop {} } - + let toogle_time = time::Duration::from_millis((1000 / freq / 2) as u64); let gpio = Gpio::new().unwrap(); @@ -73,8 +73,8 @@ fn heartbeat_loop(heartbeat_pin: u8, freq: u64) -> ! { /// Tries to create a directory, but only returns an error if the path does not already exists fn create_directory_if_not_exists(path: impl AsRef) -> std::io::Result<()> { match std::fs::create_dir(path) { - Ok(_) => Ok(()), + Ok(_) => Ok(()), Err(ref e) if e.kind() == std::io::ErrorKind::AlreadyExists => Ok(()), - Err(e) => Err(e) + Err(e) => Err(e), } -} \ No newline at end of file +} diff --git a/tests/simulation/logging.rs b/tests/simulation/logging.rs index 6f51b95..1fe4de5 100644 --- a/tests/simulation/logging.rs +++ b/tests/simulation/logging.rs @@ -22,7 +22,7 @@ fn logfile_is_cleared_after_sent() -> std::io::Result<()> { let _ = simulate_return_result(&mut cobc_in, &mut cobc_out, 1, 0)?; cobc_out.write_all(&CEPPacket::ACK.serialize())?; std::thread::sleep(std::time::Duration::from_millis(100)); - + scheduler.kill().unwrap(); let log_metadata = std::fs::metadata("./tests/tmp/log_is_cleared_after_sent/log")?; diff --git a/tests/simulation/mod.rs b/tests/simulation/mod.rs index 1ba782a..4a96d33 100644 --- a/tests/simulation/mod.rs +++ b/tests/simulation/mod.rs @@ -121,7 +121,10 @@ pub fn read_data_packet(input: &mut impl std::io::Read, data: &mut Vec) -> s } /// Reads a multi packet round without checking the CRC and returns the concatenated contents -pub fn read_multi_data_packets(input: &mut impl std::io::Read, output: &mut impl std::io::Write) -> std::io::Result> { +pub fn read_multi_data_packets( + input: &mut impl std::io::Read, + output: &mut impl std::io::Write, +) -> std::io::Result> { let mut eof_byte = [0; 1]; let mut data = Vec::new(); loop { @@ -133,7 +136,7 @@ pub fn read_multi_data_packets(input: &mut impl std::io::Read, output: &mut impl break; } } - + output.write_all(&CEPPacket::ACK.serialize())?; Ok(data) -} \ No newline at end of file +} diff --git a/tests/software_tests/common.rs b/tests/software_tests/common.rs index a24185e..1de366b 100644 --- a/tests/software_tests/common.rs +++ b/tests/software_tests/common.rs @@ -1,4 +1,8 @@ -use std::{sync::{Arc, Mutex}, process::{Stdio, Child}, io::{Write, Read}}; +use std::{ + io::{Read, Write}, + process::{Child, Stdio}, + sync::{Arc, Mutex}, +}; use STS1_EDU_Scheduler::{ command::{ExecutionContext, SyncExecutionContext}, diff --git a/tests/software_tests/execute_program.rs b/tests/software_tests/execute_program.rs index a186983..f0e29f7 100644 --- a/tests/software_tests/execute_program.rs +++ b/tests/software_tests/execute_program.rs @@ -1,10 +1,10 @@ use std::io::Read; -use STS1_EDU_Scheduler::command::{self}; -use STS1_EDU_Scheduler::communication::CEPPacket::*; use crate::software_tests::common; use crate::software_tests::common::ComEvent::*; use common::*; +use STS1_EDU_Scheduler::command::{self}; +use STS1_EDU_Scheduler::communication::CEPPacket::*; type TestResult = Result<(), Box>; diff --git a/tests/software_tests/get_status.rs b/tests/software_tests/get_status.rs index 99ba448..acf9ca4 100644 --- a/tests/software_tests/get_status.rs +++ b/tests/software_tests/get_status.rs @@ -1,8 +1,8 @@ -use STS1_EDU_Scheduler::command::{self}; -use STS1_EDU_Scheduler::communication::CEPPacket::*; use crate::software_tests::common; use crate::software_tests::common::ComEvent::*; use common::*; +use STS1_EDU_Scheduler::command::{self}; +use STS1_EDU_Scheduler::communication::CEPPacket::*; type TestResult = Result<(), Box>; diff --git a/tests/software_tests/mod.rs b/tests/software_tests/mod.rs index e525cc8..9893d83 100644 --- a/tests/software_tests/mod.rs +++ b/tests/software_tests/mod.rs @@ -1,8 +1,8 @@ -pub mod common; mod command_integration; +pub mod common; mod communication_tests; -mod store_archive; mod execute_program; -mod stop_program; mod get_status; mod return_result; +mod stop_program; +mod store_archive; diff --git a/tests/software_tests/return_result.rs b/tests/software_tests/return_result.rs index cf4add3..b108e36 100644 --- a/tests/software_tests/return_result.rs +++ b/tests/software_tests/return_result.rs @@ -1,10 +1,10 @@ use std::io::Write; -use STS1_EDU_Scheduler::command::{self}; -use STS1_EDU_Scheduler::communication::CEPPacket::*; use crate::software_tests::common; use crate::software_tests::common::ComEvent::*; use common::*; +use STS1_EDU_Scheduler::command::{self}; +use STS1_EDU_Scheduler::communication::CEPPacket::*; type TestResult = Result<(), Box>; diff --git a/tests/software_tests/stop_program.rs b/tests/software_tests/stop_program.rs index 0c9770f..f8cd0ef 100644 --- a/tests/software_tests/stop_program.rs +++ b/tests/software_tests/stop_program.rs @@ -1,8 +1,8 @@ -use STS1_EDU_Scheduler::command::{self}; -use STS1_EDU_Scheduler::communication::CEPPacket::*; use crate::software_tests::common; use crate::software_tests::common::ComEvent::*; use common::*; +use STS1_EDU_Scheduler::command::{self}; +use STS1_EDU_Scheduler::communication::CEPPacket::*; type TestResult = Result<(), Box>; diff --git a/tests/software_tests/store_archive.rs b/tests/software_tests/store_archive.rs index 80353e8..ac82d57 100644 --- a/tests/software_tests/store_archive.rs +++ b/tests/software_tests/store_archive.rs @@ -1,7 +1,7 @@ -use STS1_EDU_Scheduler::command::{self}; -use STS1_EDU_Scheduler::communication::CEPPacket::*; use crate::software_tests::common; use crate::software_tests::common::ComEvent::*; +use STS1_EDU_Scheduler::command::{self}; +use STS1_EDU_Scheduler::communication::CEPPacket::*; type TestResult = Result<(), Box>;