diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index f794679..6c6faa6 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -11,6 +11,8 @@ repos:
       - id: check
       - id: clippy
         args:
+          - --all-targets
+          - --all-features
          - --
          - -Dwarnings
  - repo: https://github.com/pre-commit/pre-commit-hooks
diff --git a/doc/src/developers/style.md b/doc/src/developers/style.md
index 996429b..b0bb84a 100644
--- a/doc/src/developers/style.md
+++ b/doc/src/developers/style.md
@@ -5,5 +5,3 @@
 **Row's** [pre-commit](https://pre-commit.com/) configuration applies style
 fixes with `rustfmt` and checks for common errors with `clippy`.
-
-TODO: Investigate clippy configuration and see if more stringent rules can be applied.
diff --git a/src/builtin.rs b/src/builtin.rs
index 39f21a7..2ce01bc 100644
--- a/src/builtin.rs
+++ b/src/builtin.rs
@@ -1,15 +1,13 @@
 use std::collections::HashMap;

-use crate::cluster::{
-    Cluster, ClusterConfiguration, IdentificationMethod, Partition, SchedulerType,
-};
-use crate::launcher::{Launcher, LauncherConfiguration};
+use crate::cluster::{self, Cluster, IdentificationMethod, Partition, SchedulerType};
+use crate::launcher::{self, Launcher};

 pub(crate) trait BuiltIn {
     fn built_in() -> Self;
 }

-impl BuiltIn for LauncherConfiguration {
+impl BuiltIn for launcher::Configuration {
     /// Construct the built-in launchers
     ///
     fn built_in() -> Self {
@@ -64,206 +62,208 @@ impl BuiltIn for LauncherConfiguration {
     }
 }

-impl BuiltIn for ClusterConfiguration {
-    fn built_in() -> Self {
-        let cluster = vec![
-            ////////////////////////////////////////////////////////////////////////////////////////
-            // Purdue Anvil
-            Cluster {
-                name: "anvil".into(),
-                identify: IdentificationMethod::ByEnvironment(
-                    "RCAC_CLUSTER".into(),
-                    "anvil".into(),
-                ),
-                scheduler: SchedulerType::Slurm,
-                partition: vec![
-                    // Auto-detected partitions: shared | wholenode | gpu
-                    Partition {
-                        name: "shared".into(),
-                        maximum_cpus_per_job: Some(127),
-                        maximum_gpus_per_job: Some(0),
-                        ..Partition::default()
-                    },
-                    Partition {
-                        name: "wholenode".into(),
-                        require_cpus_multiple_of: Some(128),
-                        maximum_gpus_per_job: Some(0),
-                        ..Partition::default()
-                    },
-                    Partition {
-                        name: "gpu".into(),
-                        minimum_gpus_per_job: Some(1),
-                        gpus_per_node: Some(4),
-                        ..Partition::default()
-                    },
-                    // The following partitions may only be selected manually.
-                    Partition {
-                        name: "wide".into(),
-                        require_cpus_multiple_of: Some(128),
-                        maximum_gpus_per_job: Some(0),
-                        prevent_auto_select: true,
-                        ..Partition::default()
-                    },
-                    Partition {
-                        name: "highmem".into(),
-                        maximum_gpus_per_job: Some(0),
-                        prevent_auto_select: true,
-                        ..Partition::default()
-                    },
-                    Partition {
-                        name: "debug".into(),
-                        maximum_gpus_per_job: Some(0),
-                        prevent_auto_select: true,
-                        ..Partition::default()
-                    },
-                    Partition {
-                        name: "gpu-debug".into(),
-                        minimum_gpus_per_job: Some(1),
-                        prevent_auto_select: true,
-                        ..Partition::default()
-                    },
-                ],
-            },
-            ////////////////////////////////////////////////////////////////////////////////////////
-            // NCSA delta
-            Cluster {
-                name: "delta".into(),
-                identify: IdentificationMethod::ByEnvironment(
-                    "LMOD_SYSTEM_NAME".into(),
-                    "Delta".into(),
-                ),
-                scheduler: SchedulerType::Slurm,
-                partition: vec![
-                    // Auto-detected partitions: cpu | gpuA100x4
-                    Partition {
-                        name: "cpu".into(),
-                        maximum_gpus_per_job: Some(0),
-                        cpus_per_node: Some(128),
-                        memory_per_cpu: Some("1970M".into()),
-                        account_suffix: Some("-cpu".into()),
-                        ..Partition::default()
-                    },
-                    Partition {
-                        name: "gpuA100x4".into(),
-                        minimum_gpus_per_job: Some(1),
-                        memory_per_gpu: Some("62200M".into()),
-                        gpus_per_node: Some(4),
-                        account_suffix: Some("-gpu".into()),
-                        ..Partition::default()
-                    },
-                    // The following partitions may only be selected manually.
-                    Partition {
-                        name: "gpuA100x8".into(),
-                        minimum_gpus_per_job: Some(1),
-                        memory_per_gpu: Some("256000M".into()),
-                        gpus_per_node: Some(8),
-                        account_suffix: Some("-gpu".into()),
-                        prevent_auto_select: true,
-                        ..Partition::default()
-                    },
-                    Partition {
-                        name: "gpuA40x4".into(),
-                        minimum_gpus_per_job: Some(1),
-                        memory_per_gpu: Some("62200M".into()),
-                        gpus_per_node: Some(4),
-                        account_suffix: Some("-gpu".into()),
-                        prevent_auto_select: true,
-                        ..Partition::default()
-                    },
-                    Partition {
-                        name: "gpuMI100x8".into(),
-                        minimum_gpus_per_job: Some(1),
-                        memory_per_gpu: Some("256000M".into()),
-                        gpus_per_node: Some(8),
-                        account_suffix: Some("-gpu".into()),
-                        prevent_auto_select: true,
-                        ..Partition::default()
-                    },
-                ],
-            },
-            ////////////////////////////////////////////////////////////////////////////////////////
-            // Great Lakes
-            Cluster {
-                name: "greatlakes".into(),
-                identify: IdentificationMethod::ByEnvironment(
-                    "CLUSTER_NAME".into(),
-                    "greatlakes".into(),
-                ),
-                scheduler: SchedulerType::Slurm,
-                partition: vec![
-                    // Auto-detected partitions: standard | gpu_mig40,gpu | gpu.
-                    Partition {
-                        name: "standard".into(),
-                        maximum_gpus_per_job: Some(0),
-                        cpus_per_node: Some(36),
-                        memory_per_cpu: Some("5G".into()),
-                        ..Partition::default()
-                    },
-                    Partition {
-                        name: "gpu_mig40,gpu".into(),
-                        minimum_gpus_per_job: Some(1),
-                        maximum_gpus_per_job: Some(1),
-                        memory_per_cpu: Some("60G".into()),
-                        ..Partition::default()
-                    },
-                    Partition {
-                        name: "gpu".into(),
-                        minimum_gpus_per_job: Some(1),
-                        memory_per_cpu: Some("60G".into()),
-                        // cannot set gpus_per_node, the partition is heterogeneous
-                        ..Partition::default()
-                    },
-                    // The following partitions may only be selected manually.
-                    Partition {
-                        name: "gpu_mig40".into(),
-                        minimum_gpus_per_job: Some(1),
-                        memory_per_cpu: Some("125G".into()),
-                        prevent_auto_select: true,
-                        ..Partition::default()
-                    },
-                    Partition {
-                        name: "spgpu".into(),
-                        minimum_gpus_per_job: Some(1),
-                        memory_per_cpu: Some("47000M".into()),
-                        prevent_auto_select: true,
-                        ..Partition::default()
-                    },
-                    Partition {
-                        name: "largemem".into(),
-                        maximum_gpus_per_job: Some(0),
-                        prevent_auto_select: true,
-                        ..Partition::default()
-                    },
-                    Partition {
-                        name: "standard-oc".into(),
-                        maximum_gpus_per_job: Some(0),
-                        cpus_per_node: Some(36),
-                        memory_per_cpu: Some("5G".into()),
-                        prevent_auto_select: true,
-                        ..Partition::default()
-                    },
-                    Partition {
-                        name: "debug".into(),
-                        maximum_gpus_per_job: Some(0),
-                        cpus_per_node: Some(36),
-                        memory_per_cpu: Some("5G".into()),
-                        prevent_auto_select: true,
-                        ..Partition::default()
-                    },
-                ],
-            },
-            // Fallback none cluster.
-            Cluster {
-                name: "none".into(),
-                identify: IdentificationMethod::Always(true),
-                scheduler: SchedulerType::Bash,
-                partition: vec![Partition {
-                    name: "none".into(),
-                    ..Partition::default()
-                }],
-            },
-        ];
+fn anvil() -> Cluster {
+    ////////////////////////////////////////////////////////////////////////////////////////
+    // Purdue Anvil
+    Cluster {
+        name: "anvil".into(),
+        identify: IdentificationMethod::ByEnvironment("RCAC_CLUSTER".into(), "anvil".into()),
+        scheduler: SchedulerType::Slurm,
+        partition: vec![
+            // Auto-detected partitions: shared | wholenode | gpu
+            Partition {
+                name: "shared".into(),
+                maximum_cpus_per_job: Some(127),
+                maximum_gpus_per_job: Some(0),
+                ..Partition::default()
+            },
+            Partition {
+                name: "wholenode".into(),
+                require_cpus_multiple_of: Some(128),
+                maximum_gpus_per_job: Some(0),
+                ..Partition::default()
+            },
+            Partition {
+                name: "gpu".into(),
+                minimum_gpus_per_job: Some(1),
+                gpus_per_node: Some(4),
+                ..Partition::default()
+            },
+            // The following partitions may only be selected manually.
+            Partition {
+                name: "wide".into(),
+                require_cpus_multiple_of: Some(128),
+                maximum_gpus_per_job: Some(0),
+                prevent_auto_select: true,
+                ..Partition::default()
+            },
+            Partition {
+                name: "highmem".into(),
+                maximum_gpus_per_job: Some(0),
+                prevent_auto_select: true,
+                ..Partition::default()
+            },
+            Partition {
+                name: "debug".into(),
+                maximum_gpus_per_job: Some(0),
+                prevent_auto_select: true,
+                ..Partition::default()
+            },
+            Partition {
+                name: "gpu-debug".into(),
+                minimum_gpus_per_job: Some(1),
+                prevent_auto_select: true,
+                ..Partition::default()
+            },
+        ],
+    }
+}
+
+fn delta() -> Cluster {
+    ////////////////////////////////////////////////////////////////////////////////////////
+    // NCSA delta
+    Cluster {
+        name: "delta".into(),
+        identify: IdentificationMethod::ByEnvironment("LMOD_SYSTEM_NAME".into(), "Delta".into()),
+        scheduler: SchedulerType::Slurm,
+        partition: vec![
+            // Auto-detected partitions: cpu | gpuA100x4
+            Partition {
+                name: "cpu".into(),
+                maximum_gpus_per_job: Some(0),
+                cpus_per_node: Some(128),
+                memory_per_cpu: Some("1970M".into()),
+                account_suffix: Some("-cpu".into()),
+                ..Partition::default()
+            },
+            Partition {
+                name: "gpuA100x4".into(),
+                minimum_gpus_per_job: Some(1),
+                memory_per_gpu: Some("62200M".into()),
+                gpus_per_node: Some(4),
+                account_suffix: Some("-gpu".into()),
+                ..Partition::default()
+            },
+            // The following partitions may only be selected manually.
+            Partition {
+                name: "gpuA100x8".into(),
+                minimum_gpus_per_job: Some(1),
+                memory_per_gpu: Some("256000M".into()),
+                gpus_per_node: Some(8),
+                account_suffix: Some("-gpu".into()),
+                prevent_auto_select: true,
+                ..Partition::default()
+            },
+            Partition {
+                name: "gpuA40x4".into(),
+                minimum_gpus_per_job: Some(1),
+                memory_per_gpu: Some("62200M".into()),
+                gpus_per_node: Some(4),
+                account_suffix: Some("-gpu".into()),
+                prevent_auto_select: true,
+                ..Partition::default()
+            },
+            Partition {
+                name: "gpuMI100x8".into(),
+                minimum_gpus_per_job: Some(1),
+                memory_per_gpu: Some("256000M".into()),
+                gpus_per_node: Some(8),
+                account_suffix: Some("-gpu".into()),
+                prevent_auto_select: true,
+                ..Partition::default()
+            },
+        ],
+    }
+}
+
+fn greatlakes() -> Cluster {
+    ////////////////////////////////////////////////////////////////////////////////////////
+    // Great Lakes
+    Cluster {
+        name: "greatlakes".into(),
+        identify: IdentificationMethod::ByEnvironment("CLUSTER_NAME".into(), "greatlakes".into()),
+        scheduler: SchedulerType::Slurm,
+        partition: vec![
+            // Auto-detected partitions: standard | gpu_mig40,gpu | gpu.
+            Partition {
+                name: "standard".into(),
+                maximum_gpus_per_job: Some(0),
+                cpus_per_node: Some(36),
+                memory_per_cpu: Some("5G".into()),
+                ..Partition::default()
+            },
+            Partition {
+                name: "gpu_mig40,gpu".into(),
+                minimum_gpus_per_job: Some(1),
+                maximum_gpus_per_job: Some(1),
+                memory_per_cpu: Some("60G".into()),
+                ..Partition::default()
+            },
+            Partition {
+                name: "gpu".into(),
+                minimum_gpus_per_job: Some(1),
+                memory_per_cpu: Some("60G".into()),
+                // cannot set gpus_per_node, the partition is heterogeneous
+                ..Partition::default()
+            },
+            // The following partitions may only be selected manually.
+            Partition {
+                name: "gpu_mig40".into(),
+                minimum_gpus_per_job: Some(1),
+                memory_per_cpu: Some("125G".into()),
+                prevent_auto_select: true,
+                ..Partition::default()
+            },
+            Partition {
+                name: "spgpu".into(),
+                minimum_gpus_per_job: Some(1),
+                memory_per_cpu: Some("47000M".into()),
+                prevent_auto_select: true,
+                ..Partition::default()
+            },
+            Partition {
+                name: "largemem".into(),
+                maximum_gpus_per_job: Some(0),
+                prevent_auto_select: true,
+                ..Partition::default()
+            },
+            Partition {
+                name: "standard-oc".into(),
+                maximum_gpus_per_job: Some(0),
+                cpus_per_node: Some(36),
+                memory_per_cpu: Some("5G".into()),
+                prevent_auto_select: true,
+                ..Partition::default()
+            },
+            Partition {
+                name: "debug".into(),
+                maximum_gpus_per_job: Some(0),
+                cpus_per_node: Some(36),
+                memory_per_cpu: Some("5G".into()),
+                prevent_auto_select: true,
+                ..Partition::default()
+            },
+        ],
+    }
+}
+
+fn none() -> Cluster {
+    // Fallback none cluster.
+    Cluster {
+        name: "none".into(),
+        identify: IdentificationMethod::Always(true),
+        scheduler: SchedulerType::Bash,
+        partition: vec![Partition {
+            name: "none".into(),
+            ..Partition::default()
+        }],
+    }
+}
+
+impl BuiltIn for cluster::Configuration {
+    fn built_in() -> Self {
+        let cluster = vec![anvil(), delta(), greatlakes(), none()];

-        ClusterConfiguration { cluster }
+        cluster::Configuration { cluster }
     }
 }
diff --git a/src/cli.rs b/src/cli.rs
index a1a84d3..517185d 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -11,13 +11,6 @@ use log::trace;
 use std::io;
 use std::path::PathBuf;

-use cluster::ClusterArgs;
-use directories::DirectoriesArgs;
-use launchers::LaunchersArgs;
-use scan::ScanArgs;
-use status::StatusArgs;
-use submit::SubmitArgs;
-
 #[derive(Parser, Debug)]
 #[command(version, about, long_about = None, subcommand_required = true)]
 pub struct Options {
@@ -25,7 +18,7 @@ pub struct Options {
     pub command: Option<Commands>,

     #[command(flatten)]
-    pub global_options: GlobalOptions,
+    pub global: GlobalOptions,

     #[command(flatten)]
     pub verbose: Verbosity,
@@ -67,31 +60,31 @@ pub enum ColorMode {
 }

 #[derive(Subcommand, Debug)]
-pub enum ShowArgs {
+pub enum ShowCommands {
     /// Show the current state of the workflow.
-    Status(StatusArgs),
+    Status(status::Arguments),

     /// List directories in the workspace.
-    Directories(DirectoriesArgs),
+    Directories(directories::Arguments),

     /// Show the cluster configuration.
-    Cluster(ClusterArgs),
+    Cluster(cluster::Arguments),

     /// Show launcher configurations.
-    Launchers(LaunchersArgs),
+    Launchers(launchers::Arguments),
 }

 #[derive(Subcommand, Debug)]
 pub enum Commands {
     /// Show properties of the workspace.
     #[command(subcommand)]
-    Show(ShowArgs),
+    Show(ShowCommands),

     /// Scan the workspace for completed actions.
-    Scan(ScanArgs),
+    Scan(scan::Arguments),

     /// Submit workflow actions to the scheduler.
-    Submit(SubmitArgs),
+    Submit(submit::Arguments),
 }

 /// Parse directories passed in on the command line.
diff --git a/src/cli/cluster.rs b/src/cli/cluster.rs
index 4c695a5..548cc82 100644
--- a/src/cli/cluster.rs
+++ b/src/cli/cluster.rs
@@ -4,10 +4,10 @@ use std::error::Error;
 use std::io::Write;

 use crate::cli::GlobalOptions;
-use row::cluster::ClusterConfiguration;
+use row::cluster;

 #[derive(Args, Debug)]
-pub struct ClusterArgs {
+pub struct Arguments {
     /// Show all clusters.
     #[arg(long, group = "select", display_order = 0)]
     all: bool,
@@ -22,13 +22,13 @@ pub struct ClusterArgs {
 /// Print the cluster to stdout in toml format.
 ///
 pub fn cluster<W: Write>(
-    options: GlobalOptions,
-    args: ClusterArgs,
+    options: &GlobalOptions,
+    args: &Arguments,
     output: &mut W,
 ) -> Result<(), Box<dyn Error>> {
     debug!("Showing clusters.");

-    let clusters = ClusterConfiguration::open()?;
+    let clusters = cluster::Configuration::open()?;

     if args.all {
         info!("All cluster configurations:");
diff --git a/src/cli/directories.rs b/src/cli/directories.rs
index 7ba23d8..099e63c 100644
--- a/src/cli/directories.rs
+++ b/src/cli/directories.rs
@@ -12,7 +12,7 @@ use row::project::Project;
 use row::MultiProgressContainer;

 #[derive(Args, Debug)]
-pub struct DirectoriesArgs {
+pub struct Arguments {
     /// Select the action to scan (defaults to all).
     action: String,

@@ -37,14 +37,14 @@ pub struct DirectoriesArgs {
 /// Print a human-readable list of directories, their status, job ID, and value(s).
 ///
 pub fn directories<W: Write>(
-    options: GlobalOptions,
-    args: DirectoriesArgs,
+    options: &GlobalOptions,
+    args: Arguments,
     multi_progress: &mut MultiProgressContainer,
     output: &mut W,
 ) -> Result<(), Box<dyn Error>> {
     debug!("Showing directories.");

-    let mut project = Project::open(options.io_threads, options.cluster, multi_progress)?;
+    let mut project = Project::open(options.io_threads, &options.cluster, multi_progress)?;

     let query_directories =
         cli::parse_directories(args.directories, || Ok(project.state().list_directories()))?;
@@ -111,9 +111,9 @@ pub fn directories<W: Write>(
             if let Some((cluster, job_id)) =
                 submitted.get(&action.name).and_then(|d| d.get(directory))
             {
-                row.push(Item::new(format!("{}/{}", cluster, job_id), Style::new()));
+                row.push(Item::new(format!("{cluster}/{job_id}"), Style::new()));
             } else {
-                row.push(Item::new("".into(), Style::new()));
+                row.push(Item::new(String::new(), Style::new()));
             }
             for pointer in &args.value {
                 let value = project.state().values()[directory]
@@ -131,11 +131,11 @@ pub fn directories<W: Write>(
         if !args.no_separate_groups && group_idx != groups.len() - 1 {
             let mut row = vec![
-                Item::new("".to_string(), Style::new()),
-                Item::new("".to_string(), Style::new()),
+                Item::new(String::new(), Style::new()),
+                Item::new(String::new(), Style::new()),
             ];
             for _ in &args.value {
-                row.push(Item::new("".to_string(), Style::new()))
+                row.push(Item::new(String::new(), Style::new()));
             }
             table.items.push(row);
         }
diff --git a/src/cli/launchers.rs b/src/cli/launchers.rs
index 4d9ca0a..2c044a1 100644
--- a/src/cli/launchers.rs
+++ b/src/cli/launchers.rs
@@ -4,11 +4,11 @@ use std::error::Error;
 use std::io::Write;

 use crate::cli::GlobalOptions;
-use row::cluster::ClusterConfiguration;
-use row::launcher::LauncherConfiguration;
+use row::cluster;
+use row::launcher;

 #[derive(Args, Debug)]
-pub struct LaunchersArgs {
+pub struct Arguments {
     /// Show all launchers.
     #[arg(long, display_order = 0)]
     all: bool,
@@ -19,13 +19,13 @@ pub struct LaunchersArgs {
 /// Print the launchers to stdout in toml format.
 ///
 pub fn launchers<W: Write>(
-    options: GlobalOptions,
-    args: LaunchersArgs,
+    options: &GlobalOptions,
+    args: &Arguments,
     output: &mut W,
 ) -> Result<(), Box<dyn Error>> {
     debug!("Showing launchers.");

-    let launchers = LauncherConfiguration::open()?;
+    let launchers = launcher::Configuration::open()?;

     if args.all {
         info!("All launcher configurations:");
@@ -35,7 +35,7 @@ pub fn launchers<W: Write>(
             &toml::to_string_pretty(launchers.full_config())?
         )?;
     } else {
-        let clusters = ClusterConfiguration::open()?;
+        let clusters = cluster::Configuration::open()?;
         let cluster = clusters.identify(options.cluster.as_deref())?;

         info!("Launcher configurations for cluster '{}':", cluster.name);
diff --git a/src/cli/scan.rs b/src/cli/scan.rs
index fb0f114..dcb5a04 100644
--- a/src/cli/scan.rs
+++ b/src/cli/scan.rs
@@ -13,7 +13,7 @@ use row::{
 };

 #[derive(Args, Debug)]
-pub struct ScanArgs {
+pub struct Arguments {
     /// Select the action to scan (defaults to all).
     #[arg(short, long, display_order = 0)]
     action: Option<String>,
@@ -27,8 +27,8 @@ pub struct ScanArgs {
 /// Write the resulting list of completed directories to a completion pack file.
 ///
 pub fn scan(
-    options: GlobalOptions,
-    args: ScanArgs,
+    options: &GlobalOptions,
+    args: Arguments,
     multi_progress: &mut MultiProgressContainer,
 ) -> Result<(), Box<dyn Error>> {
     debug!("Scanning the workspace for completed actions.");
diff --git a/src/cli/status.rs b/src/cli/status.rs
index ac94a8b..9303b5b 100644
--- a/src/cli/status.rs
+++ b/src/cli/status.rs
@@ -14,7 +14,7 @@ use row::workflow::ResourceCost;
 use row::MultiProgressContainer;

 #[derive(Args, Debug)]
-pub struct StatusArgs {
+pub struct Arguments {
     /// Select the actions to summarize with a wildcard pattern.
     #[arg(short, long, value_name = "pattern", default_value_t=String::from("*"), display_order=0)]
     action: String,
@@ -28,7 +28,7 @@ pub struct StatusArgs {
 }

 /// Format a status string for non-terminal outputs.
-fn make_row(action_name: &str, status: &Status, cost: ResourceCost) -> Vec<Item> {
+fn make_row(action_name: &str, status: &Status, cost: &ResourceCost) -> Vec<Item> {
     let mut result = Vec::with_capacity(6);

     result.push(Item::new(action_name.to_string(), Style::new().bold()));
     result.push(
@@ -62,7 +62,7 @@ fn make_row(action_name: &str, status: &Status, cost: ResourceCost) -> Vec<Item>
     if !cost.is_zero() {
         result.push(
-            Item::new(format!("{}", cost), Style::new().italic().dim())
+            Item::new(format!("{cost}"), Style::new().italic().dim())
                 .with_alignment(Alignment::Right),
         );
     }
@@ -75,15 +75,15 @@ fn make_row(action_name: &str, status: &Status, cost: ResourceCost) -> Vec<Item>
 /// Print a human-readable summary of the workflow.
 ///
 pub fn status<W: Write>(
-    options: GlobalOptions,
-    args: StatusArgs,
+    options: &GlobalOptions,
+    args: Arguments,
     multi_progress: &mut MultiProgressContainer,
     output: &mut W,
 ) -> Result<(), Box<dyn Error>> {
     debug!("Showing the workflow's status.");
     let action_matcher = WildMatch::new(&args.action);

-    let mut project = Project::open(options.io_threads, options.cluster, multi_progress)?;
+    let mut project = Project::open(options.io_threads, &options.cluster, multi_progress)?;

     let query_directories =
         cli::parse_directories(args.directories, || Ok(project.state().list_directories()))?;
@@ -131,7 +131,7 @@ pub fn status<W: Write>(
             cost = cost + action.resources.cost(group.len());
         }

-        table.items.push(make_row(&action.name, &status, cost));
+        table.items.push(make_row(&action.name, &status, &cost));
     }

     if matching_action_count == 0 {
diff --git a/src/cli/submit.rs b/src/cli/submit.rs
index d5f542f..76c940f 100644
--- a/src/cli/submit.rs
+++ b/src/cli/submit.rs
@@ -1,6 +1,6 @@
 use clap::Args;
 use console::style;
-use indicatif::{HumanCount, HumanDuration};
+use indicatif::HumanCount;
 use log::{debug, info, trace, warn};
 use signal_hook::consts::{SIGINT, SIGTERM};
 use signal_hook::flag;
@@ -14,12 +14,13 @@ use std::time::Instant;
 use wildmatch::WildMatch;

 use crate::cli::GlobalOptions;
+use row::format::HumanDuration;
 use row::project::Project;
 use row::workflow::{Action, ResourceCost};
 use row::MultiProgressContainer;

 #[derive(Args, Debug)]
-pub struct SubmitArgs {
+pub struct Arguments {
     /// Select the actions to summarize with a wildcard pattern.
     #[arg(short, long, value_name = "pattern", default_value_t=String::from("*"), display_order=0)]
     action: String,
@@ -42,16 +43,17 @@ pub struct SubmitArgs {
 /// Submit workflow actions to the scheduler.
 ///
+#[allow(clippy::too_many_lines)]
 pub fn submit<W: Write>(
-    options: GlobalOptions,
-    args: SubmitArgs,
+    options: &GlobalOptions,
+    args: Arguments,
     multi_progress: &mut MultiProgressContainer,
     output: &mut W,
 ) -> Result<(), Box<dyn Error>> {
     debug!("Submitting workflow actions to the scheduler.");
     let action_matcher = WildMatch::new(&args.action);

-    let mut project = Project::open(options.io_threads, options.cluster, multi_progress)?;
+    let mut project = Project::open(options.io_threads, &options.cluster, multi_progress)?;

     let query_directories = if args.directories.is_empty() {
         project.state().list_directories()
@@ -114,7 +116,7 @@ pub fn submit<W: Write>(
                 if job_count == 1 { "job" } else { "jobs" },
                 cost,
                 action.name
-            )
+            );
         }

         total_cost = total_cost + cost;
@@ -140,7 +142,7 @@ pub fn submit<W: Write>(
             info!("script {}/{}:", index + 1, action_directories.len());
             let script = scheduler.make_script(action, directories)?;

-            write!(output, "{}", script)?;
+            write!(output, "{script}")?;
             output.flush()?;
         }
         project.close(multi_progress)?;
@@ -174,7 +176,7 @@ pub fn submit<W: Write>(
     if std::io::stdout().is_terminal() && !args.yes {
         let mut input = String::new();

-        multi_progress.multi_progress.suspend(|| {
+        multi_progress.suspend(|| {
             print!("Proceed? [Y/n]: ");
             io::stdout().flush().expect("Can flush stdout");
             io::stdin()
@@ -198,8 +200,7 @@ pub fn submit<W: Write>(
     // stdin and stdout directly.
     project.close(multi_progress)?;

-    multi_progress.progress_bars.clear();
-    multi_progress.multi_progress.clear().unwrap();
+    multi_progress.clear().unwrap();

     // Install the Ctrl-C signal handler to gracefully kill spawned processes
     // and save the pending scheduled job cache before exiting. Allow the user
@@ -226,7 +227,7 @@ pub fn submit<W: Write>(
                 .italic()
                 .to_string();
         }
-        message += &format!(" ({}).", style(HumanDuration(instant.elapsed())).dim());
+        message += &format!(" ({:#}).", style(HumanDuration(instant.elapsed())).dim());
         println!("{message}");

         let result = scheduler.submit(
diff --git a/src/cluster.rs b/src/cluster.rs
index 9e36451..8cb50f7 100644
--- a/src/cluster.rs
+++ b/src/cluster.rs
@@ -1,6 +1,7 @@
 use log::{debug, info, trace};
 use serde::{Deserialize, Serialize};
 use std::env;
+use std::fmt::Write as _;
 use std::fs::File;
 use std::io::prelude::*;
 use std::io::{self, BufReader};
@@ -12,12 +13,12 @@ use crate::Error;

 /// Cluster configuration
 ///
-/// `ClusterConfiguration` stores the cluster configuration for each defined
+/// `Configuration` stores the cluster configuration for each defined
 /// cluster.
 ///
 #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
 #[serde(deny_unknown_fields)]
-pub struct ClusterConfiguration {
+pub struct Configuration {
     /// The cluster configurations.
     #[serde(default)]
     pub(crate) cluster: Vec<Cluster>,
 }
@@ -106,10 +107,16 @@ pub struct Partition {
     pub account_suffix: Option<String>,
 }

-impl ClusterConfiguration {
+impl Configuration {
     /// Identify the cluster.
     ///
-    /// Identifying the current cluster consumes the `ClusterConfiguration`.
+    /// Identifying the current cluster consumes the `Configuration`.
+    ///
+    /// # Errors
+    /// * `row::Error::ClusterNameNotFound` when a cluster by the given name
+    ///   is not present in the configuration (when `name = Some(_)`).
+    /// * `row::Error::ClusterNotFound` when the automatic identification
+    ///   fails to find a cluster in the configuration.
     ///
     pub fn identify(self, name: Option<&str>) -> Result<Cluster, Error> {
         let cluster = if let Some(name) = name {
@@ -120,7 +127,7 @@ impl ClusterConfiguration {
         } else {
             self.cluster
                 .into_iter()
-                .find(|c| c.identity_matches())
+                .find(Cluster::identity_matches)
                 .ok_or_else(Error::ClusterNotFound)?
         };
@@ -171,16 +178,16 @@ impl ClusterConfiguration {
         trace!("Parsing '{}'.", &clusters_toml_path.display());
         let user_config = Self::parse_str(&clusters_toml_path, &clusters_string)?;

-        clusters.merge(user_config);
+        clusters.merge(&user_config);
         Ok(clusters)
     }

-    /// Parse a `ClusterConfiguration` from a TOML string
+    /// Parse a `Configuration` from a TOML string
     ///
     /// Does *NOT* merge with the built-in configuration.
     ///
     pub(crate) fn parse_str(path: &Path, toml: &str) -> Result<Self, Error> {
-        let cluster: ClusterConfiguration =
+        let cluster: Configuration =
             toml::from_str(toml).map_err(|e| Error::TOMLParse(path.join("clusters.toml"), e))?;
         Ok(cluster)
     }
@@ -190,10 +197,10 @@ impl ClusterConfiguration {
     /// Merging adds new keys from `b` into self. It also overrides any keys in
     /// both with the value in `b`.
     ///
-    fn merge(&mut self, b: Self) {
+    fn merge(&mut self, b: &Self) {
         let mut new_cluster = b.cluster.clone();
         new_cluster.extend(self.cluster.clone());
-        self.cluster = new_cluster
+        self.cluster = new_cluster;
     }
 }

@@ -214,6 +221,10 @@ impl Cluster {
     }

     /// Find the partition to use for the given job.
+    ///
+    /// # Errors
+    /// Returns `Err` when the partition is not found.
+    ///
     pub fn find_partition(
         &self,
         partition_name: Option<&str>,
@@ -252,6 +263,7 @@ impl Cluster {

 impl Partition {
     /// Check if a given job may use this partition.
+    #[allow(clippy::similar_names)]
     fn matches(&self, resources: &Resources, n_directories: usize, reason: &mut String) -> bool {
         let total_cpus = resources.total_cpus(n_directories);
         let total_gpus = resources.total_gpus(n_directories);
@@ -259,12 +271,12 @@ impl Partition {
         trace!("Checking partition '{}'.", self.name);

         if self.prevent_auto_select {
-            reason.push_str(&format!("{}: Must be manually selected.\n", self.name));
+            let _ = writeln!(reason, "{}: Must be manually selected.", self.name);
             return false;
         }

         if self.maximum_cpus_per_job.map_or(false, |x| total_cpus > x) {
-            reason.push_str(&format!("{}: Too many CPUs ({}).\n", self.name, total_cpus));
+            let _ = writeln!(reason, "{}: Too many CPUs ({}).", self.name, total_cpus);
             return false;
         }

@@ -272,23 +284,21 @@ impl Partition {
             .require_cpus_multiple_of
             .map_or(false, |x| total_cpus % x != 0)
         {
-            reason.push_str(&format!(
-                "{}: CPUs ({}) not a required multiple.\n",
+            let _ = writeln!(
+                reason,
+                "{}: CPUs ({}) not a required multiple.",
                 self.name, total_cpus
-            ));
+            );
             return false;
         }

         if self.minimum_gpus_per_job.map_or(false, |x| total_gpus < x) {
-            reason.push_str(&format!(
-                "{}: Not enough GPUs ({}).\n",
-                self.name, total_gpus
-            ));
+            let _ = writeln!(reason, "{}: Not enough GPUs ({}).", self.name, total_gpus);
             return false;
         }

         if self.maximum_gpus_per_job.map_or(false, |x| total_gpus > x) {
-            reason.push_str(&format!("{}: Too many GPUs ({}).\n", self.name, total_gpus));
+            let _ = writeln!(reason, "{}: Too many GPUs ({}).", self.name, total_gpus);
             return false;
         }

@@ -300,10 +310,11 @@ impl Partition {
             .require_gpus_multiple_of
             .map_or(false, |x| total_gpus == 0 || total_gpus % x != 0)
         {
-            reason.push_str(&format!(
-                "{}: GPUs ({}) not a required multiple.\n",
+            let _ = writeln!(
+                reason,
+                "{}: GPUs ({}) not a required multiple.",
                 self.name, total_gpus
-            ));
+            );
             return false;
         }

@@ -382,7 +393,7 @@ mod tests {
                 partition: Vec::new(),
             },
         ];
-        let cluster_configuration = ClusterConfiguration { cluster: clusters };
+        let cluster_configuration = Configuration { cluster: clusters };
         assert_eq!(
             cluster_configuration
                 .clone()
@@ -645,9 +656,8 @@ mod tests {
     fn open_no_file() {
         setup();
         let temp = TempDir::new().unwrap().child("clusters.json");
-        let clusters =
-            ClusterConfiguration::open_from_path(temp.path().into()).expect("valid clusters");
-        assert_eq!(clusters, ClusterConfiguration::built_in());
+        let clusters = Configuration::open_from_path(temp.path().into()).expect("valid clusters");
+        assert_eq!(clusters, Configuration::built_in());
     }

 #[test]
@@ -656,9 +666,8 @@ mod tests {
         setup();
         let temp = TempDir::new().unwrap().child("clusters.json");
         temp.write_str("").unwrap();
-        let clusters =
-            ClusterConfiguration::open_from_path(temp.path().into()).expect("valid clusters");
-        assert_eq!(clusters, ClusterConfiguration::built_in());
+        let clusters = Configuration::open_from_path(temp.path().into()).expect("valid clusters");
+        assert_eq!(clusters, Configuration::built_in());
     }

 #[test]
@@ -678,8 +687,8 @@ mod tests {
 name = "b"
 "#,
         )
         .unwrap();
-        let clusters = ClusterConfiguration::open_from_path(temp.path().into()).unwrap();
-        let built_in_clusters = ClusterConfiguration::built_in();
+        let clusters = Configuration::open_from_path(temp.path().into()).unwrap();
+        let built_in_clusters = Configuration::built_in();
         assert_eq!(clusters.cluster.len(), 1 + built_in_clusters.cluster.len());

         let cluster = clusters.cluster.first().unwrap();
@@ -722,8 +731,8 @@ account_suffix = "-gpu"
 "#,
         )
         .unwrap();
-        let clusters = ClusterConfiguration::open_from_path(temp.path().into()).unwrap();
-        let built_in_clusters = ClusterConfiguration::built_in();
+        let clusters = Configuration::open_from_path(temp.path().into()).unwrap();
+        let built_in_clusters = Configuration::built_in();
         assert_eq!(clusters.cluster.len(), 1 + built_in_clusters.cluster.len());

         let cluster = clusters.cluster.first().unwrap();
diff --git a/src/expr.rs b/src/expr.rs
index 037897a..fb1f7c1 100644
--- a/src/expr.rs
+++ b/src/expr.rs
@@ -56,6 +56,7 @@ pub(crate) fn evaluate_json_comparison(
     a: &Value,
     b: &Value,
 ) -> Option<bool> {
+    #[allow(clippy::match_same_arms)]
     match (comparison, partial_cmp_json_values(a, b)) {
         (Comparison::EqualTo, Some(Ordering::Equal)) => Some(true),
         (Comparison::GreaterThan, Some(Ordering::Greater)) => Some(true),
diff --git a/src/format.rs b/src/format.rs
new file mode 100644
index 0000000..a9bdfc2
--- /dev/null
+++ b/src/format.rs
@@ -0,0 +1,23 @@
+use std::fmt;
+use std::time::Duration;
+
+/// Extend `indicatif::HumanDuration` with milliseconds
+#[derive(Debug)]
+pub struct HumanDuration(pub Duration);
+
+impl fmt::Display for HumanDuration {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        if self.0.as_secs_f64() > 1.0 {
+            indicatif::HumanDuration(self.0).fmt(f)
+        } else {
+            #[allow(clippy::cast_sign_loss)]
+            let t = (self.0.as_secs_f64() / 1e-3).round() as usize;
+
+            match (f.alternate(), t) {
+                (true, _) => write!(f, "{t}ms"),
+                (false, 1) => write!(f, "{t} millisecond"),
+                (false, _) => write!(f, "{t} milliseconds"),
+            }
+        }
+    }
+}
diff --git a/src/launcher.rs b/src/launcher.rs
index df41b3c..03421bc 100644
--- a/src/launcher.rs
+++ b/src/launcher.rs
@@ -2,6 +2,7 @@ use log::trace;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::env;
+use std::fmt::Write as _;
 use std::fs::File;
 use std::io::prelude::*;
 use std::io::{self, BufReader};
@@ -13,11 +14,11 @@ use crate::Error;

 /// Launcher configuration
 ///
-/// `LauncherConfiguration` stores the launcher configuration for each defined
+/// `Configuration` stores the launcher configuration for each defined
 /// launcher/cluster.
 ///
 #[derive(Clone, Debug, Default, PartialEq, Eq)]
-pub struct LauncherConfiguration {
+pub struct Configuration {
     /// The launcher configurations.
     pub(crate) launchers: HashMap<String, HashMap<String, Launcher>>,
 }
@@ -50,10 +51,11 @@ impl Launcher {
         if need_space {
             result.push(' ');
         }
-        result.push_str(&format!(
+        let _ = write!(
+            result,
             "{processes}{}",
             resources.total_processes(n_directories)
-        ));
+        );
         need_space = true;
     }

@@ -63,7 +65,7 @@ impl Launcher {
         if need_space {
             result.push(' ');
         }
-        result.push_str(&format!("{self_threads}{resources_threads}"));
+        let _ = write!(result, "{self_threads}{resources_threads}");
         need_space = true;
     }

@@ -73,7 +75,7 @@ impl Launcher {
         if need_space {
             result.push(' ');
         }
-        result.push_str(&format!("{self_gpus}{resources_gpus}"));
+        let _ = write!(result, "{self_gpus}{resources_gpus}");
         need_space = true;
     }

@@ -84,7 +86,7 @@ impl Launcher {
     }
 }

-impl LauncherConfiguration {
+impl Configuration {
     /// Open the launcher configuration
     ///
     /// Open `$HOME/.config/row/launchers.toml` if it exists and merge it with
@@ -133,12 +135,12 @@ impl LauncherConfiguration {
         Ok(launchers)
     }

-    /// Parse a `LauncherConfiguration` from a TOML string
+    /// Parse a `Configuration` from a TOML string
     ///
     /// Does *NOT* merge with the built-in configuration.
     ///
     pub(crate) fn parse_str(path: &Path, toml: &str) -> Result<Self, Error> {
-        Ok(LauncherConfiguration {
+        Ok(Configuration {
             launchers: toml::from_str(toml)
                 .map_err(|e| Error::TOMLParse(path.join("launchers.toml"), e))?,
         })
@@ -173,6 +175,10 @@ impl LauncherConfiguration {
     }

     /// Get all launchers for a specific cluster.
+    ///
+    /// # Panics
+    /// When a given launcher has no default.
+    ///
     pub fn by_cluster(&self, cluster_name: &str) -> HashMap<String, Launcher> {
         let mut result = HashMap::with_capacity(self.launchers.len());

@@ -219,7 +225,7 @@ mod tests {
     #[parallel]
     fn unset_launcher() {
         setup();
-        let launchers = LauncherConfiguration::built_in();
+        let launchers = Configuration::built_in();
         let launchers_by_cluster = launchers.by_cluster("any_cluster");
         assert!(launchers_by_cluster.get("unset_launcher").is_none());
     }
@@ -228,7 +234,7 @@ mod tests {
     #[parallel]
     fn openmp_prefix() {
         setup();
-        let launchers = LauncherConfiguration::built_in();
+        let launchers = Configuration::built_in();
         let launchers_by_cluster = launchers.by_cluster("any_cluster");
         let openmp = launchers_by_cluster
             .get("openmp")
@@ -250,7 +256,7 @@ mod tests {
     #[parallel]
     fn mpi_prefix_none() {
         setup();
-        let launchers = LauncherConfiguration::built_in();
+        let launchers = Configuration::built_in();
         let launchers_by_cluster = launchers.by_cluster("none");
         let mpi = launchers_by_cluster.get("mpi").expect("a valid Launcher");

@@ -279,7 +285,7 @@ mod tests {
     #[parallel]
     fn mpi_prefix_default() {
         setup();
-        let launchers = LauncherConfiguration::built_in();
+        let launchers = Configuration::built_in();
         let launchers_by_cluster = launchers.by_cluster("any_cluster");
         let mpi = launchers_by_cluster.get("mpi").expect("a valid Launcher");

@@ -315,9 +321,8 @@ mod tests {
     fn open_no_file() {
         setup();
         let temp = TempDir::new().unwrap().child("launchers.json");
-        let launchers =
-            LauncherConfiguration::open_from_path(temp.path().into()).expect("valid launchers");
-        assert_eq!(launchers, LauncherConfiguration::built_in());
+        let launchers = Configuration::open_from_path(temp.path().into()).expect("valid launchers");
+        assert_eq!(launchers, Configuration::built_in());
     }

 #[test]
@@ -326,9 +331,8 @@ mod tests {
         setup();
         let temp = TempDir::new().unwrap().child("launchers.json");
         temp.write_str("").unwrap();
-        let launchers =
-            LauncherConfiguration::open_from_path(temp.path().into()).expect("valid launchers");
-        assert_eq!(launchers, LauncherConfiguration::built_in());
+        let launchers = Configuration::open_from_path(temp.path().into()).expect("valid launchers");
+        assert_eq!(launchers, Configuration::built_in());
     }

 #[test]
@@ -337,12 +341,12 @@ mod tests {
         setup();
         let temp = TempDir::new().unwrap().child("launchers.json");
         temp.write_str(
-            r#"
+            r"
 [new_launcher.not_default]
-"#,
+",
         )
         .unwrap();
-        let error = LauncherConfiguration::open_from_path(temp.path().into());
+        let error = Configuration::open_from_path(temp.path().into());
         assert!(matches!(error, Err(Error::LauncherMissingDefault(_))));
     }

@@ -364,10 +368,9 @@ mod tests {
 executable = "e"
 "#,
         )
         .unwrap();
-        let launchers =
-            LauncherConfiguration::open_from_path(temp.path().into()).expect("valid launcher");
+        let launchers = Configuration::open_from_path(temp.path().into()).expect("valid launcher");

-        let built_in = LauncherConfiguration::built_in();
+        let built_in = Configuration::built_in();
         assert_eq!(launchers.launchers.len(), 3);
         assert_eq!(launchers.launchers["openmp"], built_in.launchers["openmp"]);
         assert_eq!(launchers.launchers["mpi"], built_in.launchers["mpi"]);
diff --git a/src/lib.rs b/src/lib.rs
index f35dae9..bd038f4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,6 +1,14 @@
+#![warn(clippy::pedantic)]
+#![allow(clippy::cast_precision_loss)]
+#![allow(clippy::cast_possible_wrap)]
+#![allow(clippy::cast_possible_truncation)]
+#![allow(clippy::must_use_candidate)]
+#![warn(clippy::format_push_string)]
+
 pub(crate) mod builtin;
 pub mod cluster;
 mod expr;
+pub mod format;
 pub mod launcher;
 pub mod progress_styles;
 pub mod project;
@@ -9,7 +17,7 @@
 pub mod state;
 pub mod workflow;
 pub mod workspace;

-use indicatif::{MultiProgress, ProgressBar};
+use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget};
 use serde_json::{self, Value};
 use std::io;
 use std::path::PathBuf;
@@ -23,14 +31,14 @@ const VALUE_CACHE_FILE_NAME: &str = "values.json";
 const COMPLETED_CACHE_FILE_NAME: &str = "completed.postcard";
 const SUBMITTED_CACHE_FILE_NAME: &str = "submitted.postcard";

-/// Hold a MultiProgress and all of its progress bars.
+/// Hold a `MultiProgress` and all of its progress bars.
 ///
-/// This is necessary because a dropped ProgressBar will be automatically
-/// removed from MultiProgress (https://github.com/console-rs/indicatif/issues/614)
+/// This is necessary because a dropped `ProgressBar` will be automatically
+/// removed from [MultiProgress](https://github.com/console-rs/indicatif/issues/614)
 ///
 pub struct MultiProgressContainer {
-    pub progress_bars: Vec<ProgressBar>,
-    pub multi_progress: MultiProgress,
+    progress_bars: Vec<ProgressBar>,
+    multi_progress: MultiProgress,
 }

 /// Errors that may be encountered when using the row crate.
@@ -179,3 +187,45 @@ pub enum Error {
     // #[error("Evalexpr error: {0}")]
     // Evalexpr(#[from] EvalexprError),
 }
+
+impl MultiProgressContainer {
+    /// Create a new multi-progress container.
+    pub fn new(multi_progress: MultiProgress) -> MultiProgressContainer {
+        MultiProgressContainer {
+            progress_bars: Vec::new(),
+            multi_progress,
+        }
+    }
+
+    /// Add a progress bar to the container or hide it.
+    pub fn add_or_hide(&mut self, mut progress_bar: ProgressBar, hide: bool) -> ProgressBar {
+        if hide {
+            progress_bar.set_draw_target(ProgressDrawTarget::hidden());
+        } else {
+            progress_bar = self.multi_progress.add(progress_bar);
+            self.progress_bars.push(progress_bar.clone());
+        }
+
+        progress_bar
+    }
+
+    /// Add a progress bar to the container.
+    pub fn add(&mut self, progress_bar: ProgressBar) -> ProgressBar {
+        self.progress_bars.push(progress_bar.clone());
+        self.multi_progress.add(progress_bar)
+    }
+
+    /// Clear all progress bars
+    ///
+    /// # Errors
+    /// Forwards the error from `indicatif::MultiProgress::clear`.
+    pub fn clear(&mut self) -> Result<(), std::io::Error> {
+        self.progress_bars.clear();
+        self.multi_progress.clear()
+    }
+
+    /// Suspend the progress bar updates while executing f.
+    pub fn suspend<F: FnOnce() -> R, R>(&self, f: F) -> R {
+        self.multi_progress.suspend(f)
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index a3ced34..512eacf 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,7 @@
+#![warn(clippy::pedantic)]
+
 use clap::Parser;
-use indicatif::{HumanDuration, MultiProgress, ProgressDrawTarget};
+use indicatif::{MultiProgress, ProgressDrawTarget};
 use indicatif_log_bridge::LogWrapper;
 use log::{error, info};
 use std::error::Error;
@@ -10,7 +12,8 @@ use std::time::Instant;
 mod cli;
 mod ui;

-use cli::{ColorMode, Commands, Options, ShowArgs};
+use cli::{ColorMode, Commands, Options, ShowCommands};
+use row::format::HumanDuration;
 use row::MultiProgressContainer;
 use ui::MultiProgressWriter;

@@ -19,7 +22,7 @@ fn main_detail() -> Result<(), Box<dyn Error>> {
     let options = Options::parse();

     let log_style;
-    match options.global_options.color {
+    match options.global.color {
         ColorMode::Never => {
             log_style = "never";
             console::set_colors_enabled(false);
@@ -43,7 +46,7 @@ fn main_detail() -> Result<(), Box<dyn Error>> {
         clap_verbosity_flag::LevelFilter::Trace => "trace",
     };

-    let multi_progress = if options.global_options.no_progress {
+    let multi_progress = if options.global.no_progress {
         MultiProgress::with_draw_target(ProgressDrawTarget::hidden())
     } else {
         MultiProgress::new()
@@ -61,39 +64,34 @@ fn main_detail() -> Result<(), Box<dyn Error>> {

     LogWrapper::new(multi_progress.clone(), logger).try_init()?;

-    let mut multi_progress_container = MultiProgressContainer {
-        progress_bars: Vec::new(),
-        multi_progress: multi_progress.clone(),
-    };
+    let mut multi_progress_container = MultiProgressContainer::new(multi_progress.clone());

     match options.command {
         Some(Commands::Show(show)) => match show {
-            ShowArgs::Status(args) => cli::status::status(
-                options.global_options.clone(),
+            ShowCommands::Status(args) => cli::status::status(
+                &options.global,
                 args,
                 &mut multi_progress_container,
                 &mut output,
             )?,
-            ShowArgs::Directories(args) => cli::directories::directories(
-                options.global_options.clone(),
+            ShowCommands::Directories(args) => cli::directories::directories(
+                &options.global,
                 args,
                 &mut multi_progress_container,
                 &mut output,
             )?,
-            ShowArgs::Cluster(args) => {
-                cli::cluster::cluster(options.global_options.clone(), args, &mut output)?
+            ShowCommands::Cluster(args) => {
+                cli::cluster::cluster(&options.global, &args, &mut output)?;
             }
-            ShowArgs::Launchers(args) => {
-                cli::launchers::launchers(options.global_options.clone(), args, &mut output)?
+            ShowCommands::Launchers(args) => {
+                cli::launchers::launchers(&options.global, &args, &mut output)?;
             }
         },
-        Some(Commands::Scan(args)) => cli::scan::scan(
-            options.global_options.clone(),
-            args,
-            &mut multi_progress_container,
-        )?,
+        Some(Commands::Scan(args)) => {
+            cli::scan::scan(&options.global, args, &mut multi_progress_container)?;
+        }
         Some(Commands::Submit(args)) => cli::submit::submit(
-            options.global_options.clone(),
+            &options.global,
             args,
             &mut multi_progress_container,
             &mut output,
@@ -108,7 +106,7 @@ fn main_detail() -> Result<(), Box<dyn Error>> {

     info!("Completed in {}.", HumanDuration(instant.elapsed()));

-    if options.global_options.clear_progress {
+    if options.global.clear_progress {
         multi_progress.clear().unwrap();
     }
diff --git a/src/progress_styles.rs b/src/progress_styles.rs
index 8972c78..45ba33e 100644
--- a/src/progress_styles.rs
+++ b/src/progress_styles.rs
@@ -1,23 +1,49 @@
-use indicatif::ProgressStyle;
+use indicatif::{ProgressState, ProgressStyle};
+use std::fmt::Write;
+
+use crate::format::HumanDuration;

 pub(crate) const STEADY_TICK: u64 = 110;

+/// Format progress duration in milliseconds
+fn elapsed(state: &ProgressState, w: &mut dyn Write) {
+    let _ = write!(w, "{:#}", HumanDuration(state.elapsed()));
+}
+
+/// Create a named spinner.
+///
+/// # Panics
+/// When the progress style is invalid.
+///
 pub fn uncounted_spinner() -> ProgressStyle {
     ProgressStyle::with_template("{spinner:.green.bold} {msg:.bold}... ({elapsed:.dim})")
         .expect("Valid template")
+        .with_key("elapsed", elapsed)
         .tick_strings(&["◐", "◓", "◑", "◒", "⊙"])
 }

+/// Create a spinner that displays the current counted position.
+///
+/// # Panics
+/// When the progress style is invalid.
+///
 pub fn counted_spinner() -> ProgressStyle {
     ProgressStyle::with_template("{spinner:.green.bold} {msg:.bold}: {human_pos} ({elapsed:.dim})")
         .expect("Valid template")
+        .with_key("elapsed", elapsed)
         .tick_strings(&["◐", "◓", "◑", "◒", "⊙"])
 }

+/// Create a progress bar that displays the current counted position.
+///
+/// # Panics
+/// When the progress style is invalid.
+///
 pub fn counted_bar() -> ProgressStyle {
     ProgressStyle::with_template(
         "|{bar:32.green}| {msg:.bold}: {human_pos}/{human_len} ({elapsed:.dim})",
     )
     .expect("Valid template")
+    .with_key("elapsed", elapsed)
     .progress_chars("█▉▊▋▌▍▎▏ ")
 }
diff --git a/src/project.rs b/src/project.rs
index 0fd54c7..4d7d91c 100644
--- a/src/project.rs
+++ b/src/project.rs
@@ -1,4 +1,4 @@
-use indicatif::{ProgressBar, ProgressDrawTarget};
+use indicatif::ProgressBar;
 use log::{debug, trace, warn};
 use serde_json::Value;
 use std::cmp::Ordering;
@@ -6,9 +6,9 @@ use std::collections::HashMap;
 use std::path::PathBuf;
 use std::time::Duration;

-use crate::cluster::{ClusterConfiguration, SchedulerType};
+use crate::cluster::{self, SchedulerType};
 use crate::expr;
-use crate::launcher::LauncherConfiguration;
+use crate::launcher;
 use crate::progress_styles;
 use crate::scheduler::bash::Bash;
 use crate::scheduler::slurm::Slurm;
@@ -72,14 +72,14 @@ impl Project {
     ///
     pub fn open(
         io_threads: u16,
-        cluster_name: Option<String>,
+        cluster_name: &Option<String>,
         multi_progress: &mut MultiProgressContainer,
     ) -> Result<Project, Error> {
         trace!("Opening project.");
         let workflow = Workflow::open()?;
-        let clusters = ClusterConfiguration::open()?;
+        let clusters = cluster::Configuration::open()?;
         let cluster = clusters.identify(cluster_name.as_deref())?;
-        let launchers = LauncherConfiguration::open()?.by_cluster(&cluster.name);
+        let launchers = launcher::Configuration::open()?.by_cluster(&cluster.name);
         let cluster_name = cluster.name.clone();

         let scheduler: Box<dyn Scheduler> = match cluster.scheduler {
@@ -93,15 +93,9 @@ impl Project {
         let jobs = state.jobs_submitted_on(&cluster_name);
         let mut progress =
             ProgressBar::new_spinner().with_message("Checking submitted job statuses");
-        if !jobs.is_empty() {
-            progress = multi_progress.multi_progress.add(progress);
-            multi_progress.progress_bars.push(progress.clone());

-            progress.enable_steady_tick(Duration::from_millis(progress_styles::STEADY_TICK));
-        } else {
-            progress.set_draw_target(ProgressDrawTarget::hidden());
-            // TODO: Refactor these types of code blocks into the MultiProgressContainer?
-        }
+        progress = multi_progress.add_or_hide(progress, jobs.is_empty());
+        progress.enable_steady_tick(Duration::from_millis(progress_styles::STEADY_TICK));
         progress.set_style(progress_styles::uncounted_spinner());
         progress.tick();
@@ -166,6 +160,7 @@ impl Project {
     /// `Ok(Vec<PathBuf>)` listing directories from `directories` that match
     /// the action's **include** directive.
     ///
+    /// # Errors
     /// `Err(row::Error)` when any action's include pointer cannot be resolved.
     ///
     /// # Warnings
@@ -250,7 +245,7 @@ impl Project {
             let completed = self.state.completed();

             if completed[&action.name].contains(&directory_name) {
-                status.completed.push(directory_name)
+                status.completed.push(directory_name);
             } else if self.state.is_submitted(&action.name, &directory_name) {
                 status.submitted.push(directory_name);
             } else if action
@@ -268,6 +263,14 @@ impl Project {
     }

     /// Separate directories into groups based on the given parameters
+    ///
+    /// # Errors
+    /// `Err(row::Error)` when a given directory is not present or a JSON
+    /// pointer used for sorting is not present.
+    ///
+    /// # Panics
+    /// When two JSON pointers are not valid for comparison.
+    ///
     pub fn separate_into_groups(
         &self,
         action: &Action,
@@ -307,7 +310,12 @@ impl Project {
         // Sort by key when there are keys to sort by.
         let mut result = Vec::new();
-        if !action.group.sort_by.is_empty() {
+        if action.group.sort_by.is_empty() {
+            if action.group.reverse_sort {
+                directories.reverse();
+            }
+            result.push(directories);
+        } else {
             directories.sort_by(|a, b| {
                 expr::partial_cmp_json_values(&sort_keys[a], &sort_keys[b])
                     .expect("Valid JSON comparison")
@@ -318,6 +326,7 @@ impl Project {
             }

             // Split by the sort key when requested.
+            #[allow(clippy::redundant_closure_for_method_calls)]
             if action.group.split_by_sort_key {
                 result.extend(
                     directories
@@ -331,20 +340,16 @@ impl Project {
             } else {
                 result.push(directories);
             }
-        } else {
-            if action.group.reverse_sort {
-                directories.reverse();
-            }
-            result.push(directories);
         }

         if let Some(maximum_size) = action.group.maximum_size {
             let mut new_result = Vec::new();

             for array in result {
+                #[allow(clippy::redundant_closure_for_method_calls)]
                 new_result.extend(array.chunks(maximum_size).map(|v| v.to_vec()));
             }

-            result = new_result
+            result = new_result;
         }

         Ok(result)
@@ -429,7 +434,7 @@ previous_actions = ["two"]

         temp.child("workflow.toml").write_str(&workflow).unwrap();

-        Project::open(2, None, &mut multi_progress).unwrap()
+        Project::open(2, &None, &mut multi_progress).unwrap()
     }

 #[test]
diff --git a/src/scheduler.rs b/src/scheduler.rs
index e3dc341..38997e4 100644
--- a/src/scheduler.rs
+++ b/src/scheduler.rs
@@ -18,6 +18,9 @@ pub trait Scheduler {
     /// # Returns
     /// A `String` containing the job script.
     ///
+    /// # Errors
+    /// Returns `Err` when the script cannot be created.
+    ///
     fn make_script(&self, action: &Action, directories: &[PathBuf]) -> Result<String, Error>;

     /// Submit a job to the scheduler.
@@ -30,8 +33,6 @@ pub trait Scheduler {
     ///
     /// # Returns
     /// `Ok(job_id_option)` on success.
-    /// `Err(row::Error)` on error, which may be due to a non-zero exit status
-    /// from the submission.
     /// Schedulers that queue jobs should set `job_id_option = Some(job_id)`.
     /// Schedulers that execute jobs immediately should set `job_id_option = None`.
     ///
@@ -39,6 +40,10 @@ pub trait Scheduler {
     /// Implementations should periodically check `should_terminate` and
     /// exit early (if possible) with `Err(Error::Interrupted)` when set.
     ///
+    /// # Errors
+    /// Returns `Err(row::Error)` on error, which may be due to a non-zero exit
+    /// status from the submission.
+    ///
     fn submit(
         &self,
         working_directory: &Path,
@@ -52,16 +57,23 @@ pub trait Scheduler {
     /// # Arguments
     /// * `jobs`: Identifiers to query
     ///
-    /// `active_jobs` returns a ActiveJobs object, which provides the final
+    /// `active_jobs` returns an `ActiveJobs` object, which provides the final
     /// result via a method. This allows implementations to be asynchronous so
     /// that long-running subprocesses can complete in the background while the
     /// caller performs other work.
     ///
+    /// # Errors
+    /// Returns `Err` when the job queue query cannot be executed.
+    ///
     fn active_jobs(&self, jobs: &[u32]) -> Result<Box<dyn ActiveJobs>, Error>;
 }

 /// Deferred result containing jobs that are still active on the cluster.
 pub trait ActiveJobs {
     /// Complete the operation and return the currently active jobs.
+    ///
+    /// # Errors
+    /// Returns `Err` when the job queue query cannot be executed.
+ /// fn get(self: Box) -> Result, Error>; } diff --git a/src/scheduler/bash.rs b/src/scheduler/bash.rs index 09e983b..7ea5145 100644 --- a/src/scheduler/bash.rs +++ b/src/scheduler/bash.rs @@ -3,6 +3,7 @@ use nix::sys::signal::{self, Signal}; use nix::unistd::Pid; use std::collections::{HashMap, HashSet}; use std::env; +use std::fmt::Write as _; use std::io::Write; use std::os::unix::process::ExitStatusExt; use std::path::{Path, PathBuf}; @@ -84,7 +85,8 @@ impl<'a> BashScriptBuilder<'a> { } result.push_str(")\n"); - result.push_str(&format!( + let _ = write!( + result, r#" export ACTION_CLUSTER="{}" export ACTION_NAME="{}" @@ -92,27 +94,27 @@ export ACTION_PROCESSES="{}" export ACTION_WALLTIME_IN_MINUTES="{}" "#, self.cluster_name, self.action.name, self.total_processes, self.walltime_in_minutes, - )); + ); if let Processes::PerDirectory(processes_per_directory) = self.action.resources.processes { - result.push_str(&format!( - "export ACTION_PROCESSES_PER_DIRECTORY=\"{}\"\n", - processes_per_directory - )); + let _ = writeln!( + result, + "export ACTION_PROCESSES_PER_DIRECTORY=\"{processes_per_directory}\"", + ); } if let Some(threads_per_process) = self.action.resources.threads_per_process { - result.push_str(&format!( - "export ACTION_THREADS_PER_PROCESS=\"{}\"\n", - threads_per_process - )); + let _ = writeln!( + result, + "export ACTION_THREADS_PER_PROCESS=\"{threads_per_process}\"", + ); } if let Some(gpus_per_process) = self.action.resources.gpus_per_process { - result.push_str(&format!( - "export ACTION_GPUS_PER_PROCESS=\"{}\"\n", - gpus_per_process - )); + let _ = writeln!( + result, + "export ACTION_GPUS_PER_PROCESS=\"{gpus_per_process}\"", + ); } Ok(result) @@ -139,10 +141,11 @@ export ACTION_WALLTIME_IN_MINUTES="{}" let action_name = &self.action.name; let row_executable = env::current_exe().map_err(Error::FindCurrentExecutable)?; let row_executable = row_executable.to_str().expect("UTF-8 path to executable."); - result.push_str(&format!( + let _ = write!( + result, r#" trap 'printf %s\\n "${{directories[@]}}" | {row_executable} scan --no-progress -a {action_name} - || exit 3' EXIT"# - )); + ); Ok(result) } @@ -150,7 +153,7 @@ trap 'printf %s\\n "${{directories[@]}}" | {row_executable} scan --no-progress - fn execution(&self) -> Result { let contains_directory = self.action.command.contains("{directory}"); let contains_directories = self.action.command.contains("{directories}"); - if contains_directory as u32 + contains_directories as u32 > 1 { + if contains_directory && contains_directories { return Err(Error::ActionContainsMultipleTemplates( self.action.name.clone(), )); @@ -247,7 +250,7 @@ impl Scheduler for Bash { .map_err(|e| Error::SpawnProcess("bash".into(), e))?; let mut stdin = child.stdin.take().expect("Piped stdin"); - write!(stdin, "{}", script)?; + write!(stdin, "{script}")?; drop(stdin); trace!("Waiting for bash to complete."); @@ -306,7 +309,7 @@ mod tests { use crate::builtin::BuiltIn; use crate::cluster::{IdentificationMethod, SchedulerType}; - use crate::launcher::LauncherConfiguration; + use crate::launcher; use crate::workflow::Walltime; use crate::workflow::{Resources, SubmitOptions}; @@ -329,7 +332,7 @@ mod tests { }; let directories = vec![PathBuf::from("a"), PathBuf::from("b"), PathBuf::from("c")]; - let launchers = LauncherConfiguration::built_in(); + let launchers = launcher::Configuration::built_in(); (action, directories, launchers.by_cluster("cluster")) } diff --git a/src/scheduler/slurm.rs b/src/scheduler/slurm.rs index a78489a..f309227 100644 --- 
a/src/scheduler/slurm.rs +++ b/src/scheduler/slurm.rs @@ -162,7 +162,7 @@ impl Scheduler for Slurm { let mut stdin = child.stdin.take().expect("Piped stdin"); let input_thread = thread::spawn(move || { - let _ = write!(stdin, "{}", script); + let _ = write!(stdin, "{script}"); }); trace!("Waiting for sbatch to complete."); @@ -172,7 +172,14 @@ impl Scheduler for Slurm { input_thread.join().expect("The thread should not panic"); - if !output.status.success() { + if output.status.success() { + let job_id_string = str::from_utf8(&output.stdout).expect("Valid UTF-8 output"); + let job_id = job_id_string + .trim_end_matches(char::is_whitespace) + .parse::() + .map_err(|_| Error::UnexpectedOutput("sbatch".into(), job_id_string.into()))?; + Ok(Some(job_id)) + } else { let message = match output.status.code() { None => match output.status.signal() { None => "sbatch was terminated by a unknown signal".to_string(), @@ -181,13 +188,6 @@ impl Scheduler for Slurm { Some(code) => format!("sbatch exited with code {code}"), }; Err(Error::SubmitAction(action.name.clone(), message)) - } else { - let job_id_string = str::from_utf8(&output.stdout).expect("Valid UTF-8 output"); - let job_id = job_id_string - .trim_end_matches(char::is_whitespace) - .parse::() - .map_err(|_| Error::UnexpectedOutput("sbatch".into(), job_id_string.into()))?; - Ok(Some(job_id)) } } @@ -213,7 +213,7 @@ impl Scheduler for Slurm { jobs_string.push_str("1,"); } for job in jobs { - let _ = write!(jobs_string, "{},", job); + let _ = write!(jobs_string, "{job},"); } let squeue = Command::new("squeue") @@ -278,7 +278,7 @@ mod tests { use crate::builtin::BuiltIn; use crate::cluster::{Cluster, IdentificationMethod, Partition, SchedulerType}; - use crate::launcher::LauncherConfiguration; + use crate::launcher; use crate::workflow::{Processes, SubmitOptions}; fn setup() -> (Action, Vec, Slurm) { @@ -290,7 +290,7 @@ mod tests { }; let directories = vec![PathBuf::from("a"), PathBuf::from("b"), PathBuf::from("c")]; - let launchers = LauncherConfiguration::built_in(); + let launchers = launcher::Configuration::built_in(); let cluster = Cluster { name: "cluster".into(), identify: IdentificationMethod::Always(false), @@ -413,7 +413,7 @@ mod tests { fn mem_per_cpu() { let (action, directories, _) = setup(); - let launchers = LauncherConfiguration::built_in(); + let launchers = launcher::Configuration::built_in(); let cluster = Cluster { name: "cluster".into(), identify: IdentificationMethod::Always(false), @@ -439,7 +439,7 @@ mod tests { fn mem_per_gpu() { let (mut action, directories, _) = setup(); - let launchers = LauncherConfiguration::built_in(); + let launchers = launcher::Configuration::built_in(); let cluster = Cluster { name: "cluster".into(), identify: IdentificationMethod::Always(false), @@ -467,7 +467,7 @@ mod tests { fn cpus_per_node() { let (mut action, directories, _) = setup(); - let launchers = LauncherConfiguration::built_in(); + let launchers = launcher::Configuration::built_in(); let cluster = Cluster { name: "cluster".into(), identify: IdentificationMethod::Always(false), @@ -495,7 +495,7 @@ mod tests { fn gpus_per_node() { let (mut action, directories, _) = setup(); - let launchers = LauncherConfiguration::built_in(); + let launchers = launcher::Configuration::built_in(); let cluster = Cluster { name: "cluster".into(), identify: IdentificationMethod::Always(false), diff --git a/src/state.rs b/src/state.rs index a1a7f25..09fb8c3 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,4 +1,4 @@ -use indicatif::{ProgressBar, 
diff --git a/src/state.rs b/src/state.rs
index a1a7f25..09fb8c3 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -1,4 +1,4 @@
-use indicatif::{ProgressBar, ProgressDrawTarget};
+use indicatif::ProgressBar;
 use log::{debug, trace, warn};
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
@@ -124,7 +124,7 @@ impl State {
             }
         }
 
-        Vec::from_iter(set.drain())
+        set.drain().collect::<Vec<u32>>()
     }
 
     /// List all directories in the state.
@@ -136,6 +136,10 @@ impl State {
     }
 
     /// Read the state cache from disk.
+    ///
+    /// # Errors
+    /// Returns `Err` when the cache files cannot be read or parsed.
+    ///
     pub fn from_cache(workflow: &Workflow) -> Result<Self, Error> {
         let mut state = State {
             values: Self::read_value_cache(workflow)?,
@@ -242,6 +246,10 @@ impl State {
     }
 
     /// Save the state cache to the filesystem.
+    ///
+    /// # Errors
+    /// Returns `Err` when a cache file cannot be saved.
+    ///
     pub fn save_cache(
         &mut self,
         workflow: &Workflow,
@@ -314,12 +322,10 @@ impl State {
         // Then remove the staged files.
         let mut progress = ProgressBar::new(self.completed_file_names.len() as u64)
             .with_message("Removing staged completed actions");
-        if self.completed_file_names.len() >= MIN_PROGRESS_BAR_SIZE {
-            progress = multi_progress.multi_progress.add(progress);
-            multi_progress.progress_bars.push(progress.clone());
-        } else {
-            progress.set_draw_target(ProgressDrawTarget::hidden());
-        }
+        progress = multi_progress.add_or_hide(
+            progress,
+            self.completed_file_names.len() < MIN_PROGRESS_BAR_SIZE,
+        );
         progress.set_style(progress_styles::counted_bar());
         progress.tick();
@@ -493,7 +499,7 @@ impl State {
             self.completed_modified = true;
         }
 
-        for (_, directories) in self.completed.iter_mut() {
+        for directories in self.completed.values_mut() {
             let directories_to_remove: Vec<PathBuf> = directories
                 .iter()
                 .filter(|d| !self.values.contains_key(*d))
@@ -526,7 +532,7 @@ impl State {
             self.submitted_modified = true;
         }
 
-        for (_, directory_map) in self.submitted.iter_mut() {
+        for directory_map in self.submitted.values_mut() {
             let directories_to_remove: Vec<PathBuf> = directory_map
                 .keys()
                 .filter(|d| !self.values.contains_key(*d))
@@ -598,12 +604,10 @@ impl State {
 
         let mut progress = ProgressBar::new(self.completed_file_names.len() as u64)
             .with_message("Reading staged completed actions");
-        if self.completed_file_names.len() >= MIN_PROGRESS_BAR_SIZE {
-            progress = multi_progress.multi_progress.add(progress);
-            multi_progress.progress_bars.push(progress.clone());
-        } else {
-            progress.set_draw_target(ProgressDrawTarget::hidden());
-        }
+        progress = multi_progress.add_or_hide(
+            progress,
+            self.completed_file_names.len() < MIN_PROGRESS_BAR_SIZE,
+        );
         progress.set_style(progress_styles::counted_bar());
         progress.tick();
@@ -795,7 +799,9 @@ products = ["g"]
         assert!(state.completed.contains_key("e"));
         for i in 0..n {
             let directory = PathBuf::from(format!("dir{i}"));
-            assert_eq!(state.values[&directory].as_i64().unwrap() as usize, i);
+            #[allow(clippy::cast_sign_loss)]
+            let value = state.values[&directory].as_i64().unwrap() as usize;
+            assert_eq!(value, i);
 
             if i < n / 2 {
                 assert!(state.completed["b"].contains(&directory));
@@ -877,7 +883,9 @@ products = ["g"]
 
         for i in 0..n {
             let directory = PathBuf::from(format!("dir{i}"));
-            assert_eq!(state.values[&directory].as_i64().unwrap() as usize, i);
+            #[allow(clippy::cast_sign_loss)]
+            let value = state.values[&directory].as_i64().unwrap() as usize;
+            assert_eq!(value, i);
 
             if i < n / 2 {
                 assert!(state.completed["b"].contains(&directory));
@@ -928,7 +936,7 @@ products = ["g"]
         assert_eq!(state.jobs_submitted_on("cluster1"), vec![11]);
 
         let mut jobs_on_cluster2 = state.jobs_submitted_on("cluster2");
-        jobs_on_cluster2.sort();
+        jobs_on_cluster2.sort_unstable();
         assert_eq!(jobs_on_cluster2, vec![12, 13]);
 
         state
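Reviewer note: state.rs (above) and workspace.rs (below) replace the duplicated show-or-hide progress-bar blocks with a single `multi_progress.add_or_hide(progress, hide)` call. The helper itself is not visible in this excerpt; reconstructing its likely shape from the removed blocks, with the container fields assumed:

    use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget};

    // Assumed shape of the container; the real MultiProgressContainer is
    // defined elsewhere in this repository.
    struct MultiProgressContainer {
        multi_progress: MultiProgress,
        progress_bars: Vec<ProgressBar>,
    }

    impl MultiProgressContainer {
        // Mirrors the removed blocks: register and track the bar when it
        // should be shown, otherwise hide its draw target.
        fn add_or_hide(&mut self, mut progress: ProgressBar, hide: bool) -> ProgressBar {
            if hide {
                progress.set_draw_target(ProgressDrawTarget::hidden());
            } else {
                progress = self.multi_progress.add(progress);
                self.progress_bars.push(progress.clone());
            }
            progress
        }
    }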
diff --git a/src/ui.rs b/src/ui.rs
index 56bb18e..59cb6eb 100644
--- a/src/ui.rs
+++ b/src/ui.rs
@@ -7,10 +7,10 @@ use std::io::{self, Write};
 /// The default writer buffer size.
 const DEFAULT_BUFFER_SIZE: usize = 1024;
 
-/// Buffered writer that interoperates with a MultiProgress.
+/// Buffered writer that interoperates with a `MultiProgress`.
 ///
 /// Use this writer to buffer writes to stdout/stderr. When flushed, the
-/// writer will suspend the MultiProgress and write the output.
+/// writer will suspend the `MultiProgress` and write the output.
 ///
 pub struct MultiProgressWriter<T: Write> {
     inner: T,
@@ -50,9 +50,9 @@ impl<T: Write> Write for MultiProgressWriter<T> {
     fn flush(&mut self) -> io::Result<()> {
         if let Some(last_newline) = memmem::rfind(&self.buffer, b"\n") {
             self.multi_progress.suspend(|| -> io::Result<()> {
-                self.inner.write_all(&self.buffer[0..last_newline + 1])
+                self.inner.write_all(&self.buffer[0..=last_newline])
             })?;
-            self.buffer.drain(0..last_newline + 1);
+            self.buffer.drain(0..=last_newline);
             self.inner.flush()?;
         }
         Ok(())
@@ -120,12 +120,7 @@ impl Table {
         self
     }
 
-    fn write_row<W: Write>(
-        &self,
-        writer: &mut W,
-        row: &[Item],
-        column_width: &[usize],
-    ) -> io::Result<()> {
+    fn write_row<W: Write>(writer: &mut W, row: &[Item], column_width: &[usize]) -> io::Result<()> {
         for (i, item) in row.iter().enumerate() {
             let text = match item.alignment {
                 Alignment::Left => format!("{:<width$}", item.text, width = column_width[i]),
diff --git a/src/workflow.rs b/src/workflow.rs
--- a/src/workflow.rs
+++ b/src/workflow.rs
@@ ... @@
     ) -> Self {
         Self {
             cpu_hours,
@@ -303,6 +304,9 @@ impl Resources {
     /// # Arguments
     /// `n_directories`: Number of directories in the submission.
     ///
+    /// # Panics
+    /// When the resulting walltime cannot be represented.
+    ///
     pub fn total_walltime(&self, n_directories: usize) -> Duration {
         match self.walltime {
             Walltime::PerDirectory(ref w) => Duration::new(
@@ -412,11 +416,7 @@ impl Workflow {
 
         // Populate each action's submit_options with the global ones.
         for (name, global_options) in &self.submit_options {
-            if !action.submit_options.contains_key(name) {
-                action
-                    .submit_options
-                    .insert(name.clone(), global_options.clone());
-            } else {
+            if action.submit_options.contains_key(name) {
                 let action_options = action
                     .submit_options
                     .get_mut(name)
@@ -433,6 +433,10 @@ impl Workflow {
                 if action_options.custom.is_empty() {
                     action_options.custom = global_options.custom.clone();
                 }
+            } else {
+                action
+                    .submit_options
+                    .insert(name.clone(), global_options.clone());
             }
         }
     }
@@ -532,8 +536,7 @@ mod tests {
         let result = find_and_open_workflow();
         assert!(
             result.is_err(),
-            "Expected to find no workflow file, but got {:?}",
-            result
+            "Expected to find no workflow file, but got {result:?}"
         );
 
         assert!(result
@@ -560,7 +563,7 @@ mod tests {
                 temp.path().canonicalize().unwrap()
             );
         } else {
-            panic!("Expected to find a workflow file, but got {:?}", result);
+            panic!("Expected to find a workflow file, but got {result:?}");
         }
     }
 
@@ -728,8 +731,7 @@ command = "d"
         let result = Workflow::open_str(temp.path(), workflow);
         assert!(
             result.is_err(),
-            "Expected duplicate action error, but got {:?}",
-            result
+            "Expected duplicate action error, but got {result:?}"
         );
 
         assert!(result
@@ -801,8 +803,7 @@ previous_actions = ["a"]
         let result = Workflow::open_str(temp.path(), workflow);
         assert!(
             result.is_err(),
-            "Expected previous action error, but got {:?}",
-            result
+            "Expected previous action error, but got {result:?}"
        );
 
         assert!(result
@@ -886,15 +887,13 @@ processes.per_directory = 2
         let result = Workflow::open_str(temp.path(), workflow);
         assert!(
             matches!(result, Err(Error::TOMLParse(..))),
-            "Expected duplicate processes error, but got {:?}",
-            result
+            "Expected duplicate processes error, but got {result:?}"
         );
 
         let err = result.unwrap_err().to_string();
         assert!(
             err.contains("wanted exactly 1 element"),
-            "Expected 'wanted exactly 1 element', got {:?}",
-            err
+            "Expected 'wanted exactly 1 element', got {err:?}"
         );
     }
 
@@ -913,15 +912,13 @@ walltime.per_directory = "01:00"
         let result = Workflow::open_str(temp.path(), workflow);
         assert!(
             matches!(result, Err(Error::TOMLParse(..))),
-            "Expected duplicate walltime error, but got {:?}",
-            result
+            "Expected duplicate walltime error, but got {result:?}"
         );
 
         let err = result.unwrap_err().to_string();
         assert!(
             err.contains("wanted exactly 1 element"),
-            "Expected 'wanted exactly 1 element', got {:?}",
-            err
+            "Expected 'wanted exactly 1 element', got {err:?}"
         );
     }
     #[test]
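Reviewer note: the workflow.rs submit_options hunks only flip the if/else so the positive `contains_key` branch comes first; the merge semantics are unchanged. Global options fill in whatever an action left unset, field by field. A simplified sketch of that merge; `Options` and its two fields stand in for the real `SubmitOptions`:

    use std::collections::HashMap;

    #[derive(Clone)]
    struct Options {
        account: Option<String>,
        custom: Vec<String>,
    }

    fn merge(action: &mut HashMap<String, Options>, global: &HashMap<String, Options>) {
        for (name, global_options) in global {
            if let Some(action_options) = action.get_mut(name) {
                // Keep the action's own settings; copy only unset fields.
                if action_options.account.is_none() {
                    action_options.account = global_options.account.clone();
                }
                if action_options.custom.is_empty() {
                    action_options.custom = global_options.custom.clone();
                }
            } else {
                // The action has no entry under this name: take the global one.
                action.insert(name.clone(), global_options.clone());
            }
        }
    }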
diff --git a/src/workspace.rs b/src/workspace.rs
index dcc4ff2..39bc722 100644
--- a/src/workspace.rs
+++ b/src/workspace.rs
@@ -1,4 +1,4 @@
-use indicatif::{ProgressBar, ProgressDrawTarget};
+use indicatif::ProgressBar;
 use log::debug;
 use serde_json::Value;
 use std::collections::{HashMap, HashSet};
@@ -15,16 +15,16 @@ use crate::{progress_styles, Error, MultiProgressContainer, MIN_PROGRESS_BAR_SIZ
 
 /// List all directories in the workspace as found on the filesystem.
 ///
+/// # Errors
+/// Returns `Err` when the workspace directory cannot be accessed.
+///
 pub fn list_directories(
     workflow: &Workflow,
     multi_progress: &mut MultiProgressContainer,
 ) -> Result<Vec<PathBuf>, Error> {
     let workspace_path = workflow.root.join(&workflow.workspace.path);
 
-    let progress = multi_progress
-        .multi_progress
-        .add(ProgressBar::new_spinner().with_message("Listing workspace"));
-    multi_progress.progress_bars.push(progress.clone());
+    let progress = multi_progress.add(ProgressBar::new_spinner().with_message("Listing workspace"));
     progress.set_style(progress_styles::counted_spinner());
     progress.enable_steady_tick(Duration::from_millis(progress_styles::STEADY_TICK));
@@ -82,6 +82,9 @@ pub struct CompletedDirectories {
 /// * `directories` - The directories to scan. Must be present in the workspace.
 /// * `io_threads` - Number of threads to use while scanning directories.
 ///
+/// # Panics
+/// When unable to spawn threads.
+///
 pub fn find_completed_directories(
     workflow: &Workflow,
     directories: Vec<PathBuf>,
@@ -90,13 +93,7 @@ pub fn find_completed_directories(
 ) -> CompletedDirectories {
     let mut progress =
         ProgressBar::new(directories.len() as u64).with_message("Scanning directories");
-    if directories.len() >= MIN_PROGRESS_BAR_SIZE {
-        progress = multi_progress.multi_progress.add(progress);
-        multi_progress.progress_bars.push(progress.clone());
-    } else {
-        progress.set_draw_target(ProgressDrawTarget::hidden());
-    }
-
+    progress = multi_progress.add_or_hide(progress, directories.len() < MIN_PROGRESS_BAR_SIZE);
     progress.set_style(progress_styles::counted_bar());
     progress.tick();
@@ -186,6 +183,13 @@ pub fn find_completed_directories(
 impl CompletedDirectories {
     /// Get the directories that have been completed for each action.
+    ///
+    /// # Errors
+    /// Returns `Err` when the workspace directories cannot be accessed.
+    ///
+    /// # Panics
+    /// This method should not panic.
+    ///
     pub fn get(self) -> Result<HashMap<String, Vec<PathBuf>>, Error> {
         let mut result = HashMap::new();
         for (directory, action) in &self.receiver {
@@ -240,12 +244,7 @@ pub(crate) fn read_values(
     let (sender, receiver) = mpsc::channel();
 
     let mut progress = ProgressBar::new(directories.len() as u64).with_message("Reading values");
-    if directories.len() >= MIN_PROGRESS_BAR_SIZE {
-        progress = multi_progress.multi_progress.add(progress);
-        multi_progress.progress_bars.push(progress.clone());
-    } else {
-        progress.set_draw_target(ProgressDrawTarget::hidden());
-    }
+    progress = multi_progress.add_or_hide(progress, directories.len() < MIN_PROGRESS_BAR_SIZE);
     progress.set_style(progress_styles::counted_bar());
     progress.tick();
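Reviewer note: `find_completed_directories` and `read_values` share the pattern the new `# Panics` section documents: spawn I/O worker threads that report per-directory results over an `mpsc` channel while the progress bar counts them. A minimal sketch of that fan-out; the chunk size and the per-directory check are illustrative, not taken from this codebase:

    use std::path::PathBuf;
    use std::sync::mpsc;
    use std::thread;

    fn scan(directories: &[PathBuf]) -> Vec<(PathBuf, bool)> {
        let (sender, receiver) = mpsc::channel();
        thread::scope(|scope| {
            for chunk in directories.chunks(64) {
                let sender = sender.clone();
                scope.spawn(move || {
                    for directory in chunk {
                        // Stand-in for the real work (e.g. checking products).
                        let completed = directory.exists();
                        let _ = sender.send((directory.clone(), completed));
                    }
                });
            }
            // Drop the original sender so the receiver sees the end of input
            // once every worker finishes.
            drop(sender);
        });
        receiver.into_iter().collect()
    }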