From c21b4598ed4e72baf60783a4633d313d61071c0b Mon Sep 17 00:00:00 2001 From: "Joshua A. Anderson" Date: Tue, 14 May 2024 10:23:09 -0400 Subject: [PATCH] Refactor default actions * Support actions with duplicate names. * Add: [default.action] table. Set the default for any action key (or sub-key). * Add: [action.from] string. Copy any omitted keys from the given action. * Add: Include operators ">=" and "<=" * Breaking: Rename "greater_than" to ">" * Breaking: Rename "less_than" to "<" * Breaking: Rename "equal_to" to "==" * Breaking: [submit_options] is now [default.action.submit_options] --- .github/CODEOWNERS | 2 +- DESIGN.md | 1 + doc/src/SUMMARY.md | 10 +- doc/src/clusters/built-in.md | 31 +- doc/src/clusters/cluster.md | 30 +- doc/src/clusters/index.md | 10 +- doc/src/developers/contributing.md | 37 +- doc/src/developers/style.md | 5 +- doc/src/developers/testing.md | 9 +- doc/src/env.md | 18 +- doc/src/guide/concepts/best-practices.md | 47 - doc/src/guide/concepts/cache.md | 12 +- doc/src/guide/concepts/process-parallelism.md | 7 +- doc/src/guide/concepts/status.md | 5 +- doc/src/guide/concepts/thread-parallelism.md | 2 + doc/src/guide/howto/account.md | 25 + doc/src/guide/howto/best-practices.md | 52 + doc/src/guide/howto/index.md | 4 + doc/src/guide/howto/same.md | 30 + doc/src/guide/howto/summarize.md | 31 + doc/src/guide/python/actions.md | 51 +- doc/src/guide/python/signac-workflow.toml | 6 +- doc/src/guide/python/signac.md | 2 +- doc/src/guide/tutorial/group-workflow2.toml | 4 +- doc/src/guide/tutorial/group-workflow3.toml | 4 +- doc/src/guide/tutorial/group-workflow4.toml | 4 +- doc/src/guide/tutorial/group-workflow5.toml | 4 +- doc/src/guide/tutorial/group.md | 8 +- doc/src/guide/tutorial/hello.md | 4 +- doc/src/guide/tutorial/multiple.md | 2 +- doc/src/guide/tutorial/resources.md | 2 +- doc/src/guide/tutorial/scheduler.md | 4 +- doc/src/guide/tutorial/submit.md | 10 +- doc/src/launchers/built-in.md | 2 + doc/src/launchers/index.md | 8 +- 
doc/src/launchers/launcher.md | 6 +- doc/src/row/index.md | 6 +- doc/src/row/scan.md | 2 +- doc/src/row/show/launchers.md | 2 +- doc/src/row/submit.md | 6 +- doc/src/signac-flow.md | 24 +- doc/src/workflow/action/group.md | 24 +- doc/src/workflow/action/index.md | 49 +- doc/src/workflow/action/resources.md | 25 +- doc/src/workflow/action/submit-options.md | 54 +- doc/src/workflow/default.md | 31 + doc/src/workflow/index.md | 2 +- doc/src/workflow/submit-options.md | 57 -- src/cli/directories.rs | 186 ++-- src/cli/scan.rs | 6 +- src/cli/status.rs | 6 +- src/cli/submit.rs | 28 +- src/cluster.rs | 8 +- src/expr.rs | 52 +- src/launcher.rs | 8 +- src/lib.rs | 27 +- src/project.rs | 40 +- src/scheduler/bash.rs | 79 +- src/scheduler/slurm.rs | 20 +- src/state.rs | 8 +- src/workflow.rs | 953 +++++++++++++++--- src/workspace.rs | 4 +- 62 files changed, 1604 insertions(+), 592 deletions(-) delete mode 100644 doc/src/guide/concepts/best-practices.md create mode 100644 doc/src/guide/howto/account.md create mode 100644 doc/src/guide/howto/best-practices.md create mode 100644 doc/src/guide/howto/index.md create mode 100644 doc/src/guide/howto/same.md create mode 100644 doc/src/guide/howto/summarize.md create mode 100644 doc/src/workflow/default.md delete mode 100644 doc/src/workflow/submit-options.md diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 3c6c1aa..4f3ea66 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1 @@ -* joaander +* @joaander diff --git a/DESIGN.md b/DESIGN.md index 79d67de..4d3e55b 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -299,3 +299,4 @@ status may take a long time, so it should display a progress bar. 
# TODO: Dependabot configuration # TODO: readthedocs builds # TODO: logo +# TODO: release CI diff --git a/doc/src/SUMMARY.md b/doc/src/SUMMARY.md index bce7bd0..3180188 100644 --- a/doc/src/SUMMARY.md +++ b/doc/src/SUMMARY.md @@ -16,12 +16,17 @@ - [Working with signac projects](guide/python/signac.md) - [Writing action commands in Python](guide/python/actions.md) - [Concepts](guide/concepts/index.md) - - [Best practices](guide/concepts/best-practices.md) - [Process parallelism](guide/concepts/process-parallelism.md) - [Thread parallelism](guide/concepts/thread-parallelism.md) - [Directory status](guide/concepts/status.md) - [JSON pointers](guide/concepts/json-pointers.md) - [Cache files](guide/concepts/cache.md) +- [How-to](guide/howto/index.md) + - [Best practices](guide/howto/best-practices.md) + - [Set the cluster account](guide/howto/account.md) + - [Submit the same action to different groups/resources](guide/howto/same.md) + - [Summarize directory groups with an action](guide/howto/summarize.md) + # Reference - [row](row/index.md) @@ -34,14 +39,13 @@ - [show launchers](row/show/launchers.md) - [scan](row/scan.md) - [clean](row/clean.md) - - [`workflow.toml`](workflow/index.md) - [workspace](workflow/workspace.md) - - [submit_options](workflow/submit-options.md) - [action](workflow/action/index.md) - [group](workflow/action/group.md) - [resources](workflow/action/resources.md) - [submit_options](workflow/action/submit-options.md) + - [default](workflow/default.md) - [`clusters.toml`](clusters/index.md) - [cluster](clusters/cluster.md) - [Built-in clusters](clusters/built-in.md) diff --git a/doc/src/clusters/built-in.md b/doc/src/clusters/built-in.md index 68dc76e..06a3477 100644 --- a/doc/src/clusters/built-in.md +++ b/doc/src/clusters/built-in.md @@ -4,47 +4,50 @@ ## Anvil (Purdue) -[Anvil documentation](https://www.rcac.purdue.edu/knowledge/anvil). 
- -**Row** automatically selects from the following partitions: +**Row** automatically selects from the following partitions on [Anvil]: * `shared` * `wholenode` * `gpu` Other partitions may be selected manually. -There is no need to set `--mem-per-*` options on Anvil as the cluster automatically +There is no need to set `--mem-per-*` options on [Anvil] as the cluster automatically chooses the largest amount of memory available per core by default. -## Delta (NCSA) +> Note: The whole node partitions **require** that each job submitted request an +> integer multiple of 128 CPU cores. -[Delta documentation](https://docs.ncsa.illinois.edu/systems/delta). +[Anvil]: https://www.rcac.purdue.edu/knowledge/anvil -**Row** automatically selects from the following partitions: +## Delta (NCSA) + +**Row** automatically selects from the following partitions on [Delta]: * `cpu` * `gpuA100x4` Other partitions may be selected manually. -Delta jobs default to a small amount of memory per core. **Row** inserts `--mem-per-cpu` -or `--mem-per-gpu` to select the maximum amount of memory possible that allows full-node -jobs and does not incur extra charges. +[Delta] jobs default to a small amount of memory per core. **Row** inserts +`--mem-per-cpu` or `--mem-per-gpu` to select the maximum amount of memory possible that +allows full-node jobs and does not incur extra charges. -## Great Lakes (University of Michigan) +[Delta]: https://docs.ncsa.illinois.edu/systems/delta -[Great Lakes documentation](https://arc.umich.edu/greatlakes/). +## Great Lakes (University of Michigan) -**Row** automatically selects from the following partitions: +**Row** automatically selects from the following partitions on [Great Lakes]: * `standard` * `gpu_mig40,gpu` * `gpu` Other partitions may be selected manually. -Great Lakes jobs default to a small amount of memory per core. **Row** inserts +[Great Lakes] jobs default to a small amount of memory per core. 
**Row** inserts `--mem-per-cpu` or `--mem-per-gpu` to select the maximum amount of memory possible that allows full-node jobs and does not incur extra charges. > Note: The `gpu_mig40,gpu` partition is selected only when there is one GPU per job. > This is a combination of 2 partitions which decreases queue wait time due to the > larger number of nodes that can run your job. + +[Great Lakes]: https://arc.umich.edu/greatlakes/ diff --git a/doc/src/clusters/cluster.md b/doc/src/clusters/cluster.md index a06885f..92712e3 100644 --- a/doc/src/clusters/cluster.md +++ b/doc/src/clusters/cluster.md @@ -38,10 +38,10 @@ on this cluster. The table **must** have one of the following keys: * `by_environment`: **array** of two strings - Identify the cluster when the environment variable `by_environment[0]` is set and equal to `by_environment[1]`. * `always`: **bool** - Set to `true` to always identify this cluster. When `false`, - this cluster can only be chosen by an explicit `--cluster` option. + this cluster may only be chosen by an explicit `--cluster` option. > Note: The *first* cluster in the list that sets `identify.always = true` will prevent -> any later cluster from being identified. +> any later cluster from being identified (except by explicit `--cluster=name`). ## scheduler @@ -87,16 +87,18 @@ will pass this option to the scheduler. For example SLURM schedulers will set ### cpus_per_node -`cluster.partition.cpus_per_node`: **string** - Number of CPUs per node. When -`cpus_per_node` is not set, **row** will ask the scheduler to schedule only a given -number of tasks. In this case, some schedulers are free to spread tasks among any -number of nodes (for example, shared partitions on Slurm schedulers). +`cluster.partition.cpus_per_node`: **string** - Number of CPUs per node. -When `cpus_per_node` is set, **row** will request the minimal number of nodes needed -to satisfy `n_nodes * cpus_per_node >= total_cpus`. 
This may result in longer queue -times, but will lead to more stable performance for users. +When `cpus_per_node` is not set, **row** will request `n_processes` tasks. In this case, +some schedulers are free to spread tasks among any number of nodes (for example, shared +partitions on Slurm schedulers). -Set `cpus_per_node` only when all nodes in the partition have the same number of CPUs. +When `cpus_per_node` is set, **row** will **also** request the minimal number of nodes +needed to satisfy `n_nodes * cpus_per_node >= total_cpus`. This may result in longer +queue times, but will lead to more stable performance for users. + +> Note: Set `cpus_per_node` only when all nodes in the partition have the same number +> of CPUs. ### minimum_gpus_per_job @@ -131,7 +133,7 @@ will pass this option to the scheduler. For example SLURM schedulers will set ### gpus_per_node `cluster.partition.gpus_per_node`: **string** - Number of GPUs per node. Like -`cpus_per_node` but used on jobs that request GPUs. +`cpus_per_node` but used when jobs request GPUs. ### prevent_auto_select @@ -140,6 +142,6 @@ automatically selecting this partition. ### account_suffix -`cluster.partition.account_suffix`: **string** - Set to provide an account suffix -when submitting jobs to this partition. Useful when clusters define separate -`aacount-cpu` and `account-gpu` accounts. +`cluster.partition.account_suffix`: **string** - An account suffix when submitting jobs +to this partition. Useful when clusters define separate `account-cpu` and `account-gpu` +accounts. diff --git a/doc/src/clusters/index.md b/doc/src/clusters/index.md index cccc058..9f521ec 100644 --- a/doc/src/clusters/index.md +++ b/doc/src/clusters/index.md @@ -18,13 +18,15 @@ name = "cluster2" ``` User-provided clusters in `$HOME/.config/row/clusters.toml` are placed first in the -array. +array. Execute [`row show cluster --all`](../row/show/cluster.md) to see the complete +cluster configuration. 
## Cluster identification On startup, **row** iterates over the array of clusters in order. If `--cluster` is not set, **row** checks the `identify` condition in the configuration. If `--cluster` is -set, **row** checks to see if the name matches. +set, **row** checks to see if the name matches. **Row** selects the *first* cluster +that matches. -> Note: **Row** uses the *first* such match. To override a built-in, your configuration -> should include a cluster by the same name and `identify` condition. +> To override a built-in, your configuration should include a cluster by the same name +> and `identify` condition. diff --git a/doc/src/developers/contributing.md b/doc/src/developers/contributing.md index fe8cf03..7fb793d 100644 --- a/doc/src/developers/contributing.md +++ b/doc/src/developers/contributing.md @@ -2,7 +2,7 @@ Contributions are welcomed via [pull requests on GitHub][github]. Contact the **row** developers before starting work to ensure it meshes well with the planned development -direction and standards set for the project. +direction and follows standards set for the project. [github]: https://github.com/glotzerlab/gsd/row @@ -17,27 +17,31 @@ assist you in designing flexible interfaces. Expensive code paths should only execute when requested. +### Maintain compatibility + +New features should be opt-in and *preserve the behavior* of all existing user scripts. + ## Version control ### Base your work off the correct branch -- Base all new work on `trunk`. +Base all bug fixes and new features on `trunk`. ### Propose a minimal set of related changes -All changes in a pull request should be closely related. Multiple change sets that are -loosely coupled should be proposed in separate pull requests. +All changes in a pull request should be *closely related*. Multiple change sets that are +loosely coupled should be proposed in *separate pull requests*. 
### Agree to the Contributor Agreement -All contributors must agree to the Contributor Agreement before their pull request can -be merged. +All contributors must agree to the **Contributor Agreement** before their pull request +can be merged. ### Set your git identity Git identifies every commit you make with your name and e-mail. [Set your identity][id] -to correctly identify your work and set it identically on all systems and accounts where -you make commits. +to correctly identify your work and set it *identically on all systems* and accounts +where you make commits. [id]: http://www.git-scm.com/book/en/v2/Getting-Started-First-Time-Git-Setup @@ -45,12 +49,12 @@ you make commits. ### Use a consistent style -The **Code style** section of the documentation sets the style guidelines for **row** -code. +Follow all guidelines outlined in the [Code style](style.md) section of the +documentation. ### Document code with comments -Use **Rust** documentation comments for classes, functions, etc. Also comment complex +Write Rust documentation comments for traits, functions, etc. Also comment complex sections of code so that other developers can understand them. ### Compile without warnings @@ -61,12 +65,12 @@ Your changes should compile without warnings. ### Write unit tests -Add unit tests for all new functionality. +Add unit tests for all new functionality and bug fixes. -### Validity tests +### Test validity -The developer should run research-scale simulations using the new functionality and -ensure that it behaves as intended. When appropriate, add a new test to `validate.py`. +Run research-scale simulations using new functionality and ensure that it behaves as +intended. ## User documentation @@ -77,8 +81,7 @@ and any important user-facing change in the mdBook documentation. ### Tutorial -When applicable, update or write a new tutorial. - +When applicable, update or write a new tutorial or how-to guide. 
### Add developer to the credits diff --git a/doc/src/developers/style.md b/doc/src/developers/style.md index 6718f61..ca96b58 100644 --- a/doc/src/developers/style.md +++ b/doc/src/developers/style.md @@ -3,7 +3,8 @@ ## Rust **Row's** rust code follows the [Rust style guide][1]. **Row's** [pre-commit][2] -configuration applies style fixes with `rustfmt` checks for common errors with `clippy`. +configuration applies style fixes with `rustfmt` and checks for common errors with +`clippy`. [1]: https://doc.rust-lang.org/style-guide/index.html [2]: https://pre-commit.com/ @@ -16,7 +17,7 @@ configuration applies style fixes with `rustfmt` checks for common errors with ` Wrap **Markdown** files at 88 characters wide, except when not possible (e.g. when formatting a table). Follow layout and design patterns established in existing markdown -files. +files. Use reference-style links for long URLs. ## Spelling/grammar diff --git a/doc/src/developers/testing.md b/doc/src/developers/testing.md index 4f67fa6..22280db 100644 --- a/doc/src/developers/testing.md +++ b/doc/src/developers/testing.md @@ -8,9 +8,12 @@ cargo test ``` in the source directory to execute the unit and integration tests. -All tests must be marked either `#[serial]` or `#[parallel]` explicitly. Some serial -tests set environment variables and/or the current working directory, which may conflict -with any test that is automatically run concurrently. Check for this with: +## Writing unit tests + +Write tests using standard Rust conventions. All tests must be marked either `#[serial]` +or `#[parallel]` explicitly. Some serial tests set environment variables and/or the +current working directory, which may conflict with any test that is automatically run +concurrently. 
Check for this with: ```bash rg --multiline "#\[test\]\n *fn" ``` diff --git a/doc/src/env.md b/doc/src/env.md index 0222864..ef10a50 100644 --- a/doc/src/env.md +++ b/doc/src/env.md @@ -1,7 +1,6 @@ # Environment variables -> Note: Environment variables that influence the execution of **row** are documented in -> [the command line options](row/index.md). +## In job scripts **Row** sets the following environment variables in generated job scripts: @@ -14,3 +13,18 @@ | `ACTION_PROCESSES_PER_DIRECTORY` | Set to the value of `action.resources.processes_per_directory`. Unset when `processes_per_submission`.| | `ACTION_THREADS_PER_PROCESS` | Set to the value of `action.resources.threads_per_process`. Unset when `threads_per_process` is omitted. | | `ACTION_GPUS_PER_PROCESS` | Set to the value of `action.resources.gpus_per_process`. Unset when `gpus_per_process` is omitted. | + +# Set row options + +Set any of these environment variables to provide default values for +[command line options]. + +| Environment variable | Option | +|----------------------|-------------| +| `ROW_CLEAR_PROGRESS`| --clear-progress | +| `ROW_CLUSTER` | --cluster | +| `ROW_COLOR` | --color | +| `ROW_IO_THREADS` | --io-threads | +| `ROW_NO_PROGRESS` | --no-progress | + +[command line options]: row/index.md diff --git a/doc/src/guide/concepts/best-practices.md b/doc/src/guide/concepts/best-practices.md deleted file mode 100644 index 2731a94..0000000 --- a/doc/src/guide/concepts/best-practices.md +++ /dev/null @@ -1,47 +0,0 @@ -# Best practices - -Follow these guidelines to use **row** effectively. - -## Exit actions early when products already exist. - -There are some cases where **row** may fail to identify when your action completes: - -* Software exits with an unrecoverable error. -* Your job exceeds its walltime and is killed. -* And many others... 
- -To ensure that your action executes as intended, you should **check for the existence -of product files** when your action starts and **exit immediately** when they already -exist. This way, resubmitting an already completed job will not needlessly recompute -results or overwrite files you intended to keep. - -## Write to temporary files and move them to the final product location. - -For example, say `products = ["output.dat"]`. Write to `output.dat.in_progress` -while your calculation executes. Once the action is fully complete, *move* -`output.dat.in_progress` to `output.dat`. - -If you wrote directly to `output.dat`, **row** might identify your computation as -**complete** right after it starts. This pattern also allows you to *continue* running -one calculation over several job submissions. Move the output file to its final location -only after the final submission completes the calculation. - -## Group directories whenever possible, but not to an extreme degree. - -The **scheduler** does an excellent job handling the queue. However, there is some -overhead and the scheduler can only process so many jobs at a time. Your cluster may -even limit how many jobs you are allowed to queue. So please don't submit thousands of -jobs at a time to your cluster. You can improve your workflow's throughput by grouping -directories together into a smaller number of jobs. - -Group jobs that execute quickly in serial with `processes.per_submission` and -`walltime.per_directory`. After a given job has waited in the queue, it can process many -directories before exiting. Limit group sizes so that the total wall time of the job -remains reasonable. - -Group jobs that take a longer time in parallel using MPI partitions, -`processes.per_directory` and `walltime.per_submission`. Limit the group sizes to a -relatively small fraction of the cluster (*except on Leadership class machines*). -Huge parallel jobs may wait a long time in queue before starting. 
Experiment with the -`group.maximum_size` value and find a good job size (in number of nodes) that balances -queue time vs. scheduler overhead. diff --git a/doc/src/guide/concepts/cache.md index 1fdbc04..9bcea36 100644 --- a/doc/src/guide/concepts/cache.md +++ b/doc/src/guide/concepts/cache.md @@ -3,7 +3,7 @@ **Row** stores cache files in `/.row` to improve performance. In most usage environments **row** will automatically update the cache and keep it synchronized with the state of the workflow and workspace. The rest of this document describes -some scenarios where they cache may not be updated and how you fix the problem. +some scenarios where the cache may not be updated and how you can fix the problem. ## Directory values @@ -19,13 +19,13 @@ invalid when: ## Submitted jobs -**Row** caches the job ID, directory, and cluster name for every job it submits +**Row** caches the *job ID*, *directory*, and *cluster name* for every job it submits to a cluster via `row submit`. **Row** will be unaware of any jobs that you manually submit with `sbatch`. > You should submit all jobs via: > ```bash -> `row submit` +> row submit > ``` Copying a project directory (including `.row/`) from one cluster to another (or from @@ -34,11 +34,11 @@ access the job queue of the first, so all jobs will remain in the cache. *Submit jobs on the 2nd cluster will inevitably lead to changes in the submitted cache on both clusters that cannot be merged. -> Before you copy your project directory, wait for all jobs to finish, then execute +> Wait for all jobs to finish, then execute > ```bash > row show status > ``` -> to update the cache. +> to update the cache. Now the submitted cache is empty and safe to copy. ## Completed directories @@ -50,7 +50,7 @@ if: * *You change products* in `workflow.toml`. * *You change the name of an action* in `workflow.toml`. 
-> To discover new completed directories, execute +> To discover all completed directories, execute > ```bash > row scan > ``` diff --git a/doc/src/guide/concepts/process-parallelism.md b/doc/src/guide/concepts/process-parallelism.md index e72cb1b..7bbfe01 100644 --- a/doc/src/guide/concepts/process-parallelism.md +++ b/doc/src/guide/concepts/process-parallelism.md @@ -13,8 +13,7 @@ processes: e.g. `launcher = ["mpi"]`. > **processes**. At this time **MPI** is the only **process** launcher that **row** supports. You can -configure additional launchers in [`launchers.toml`](../../launchers/index.md) if your -cluster and application use a different launcher. +configure additional launchers in [`launchers.toml`](../../launchers/index.md). Use **MPI** parallelism to launch: * MPI-enabled applications on one directory (`processes.per_submission = N`, @@ -26,3 +25,7 @@ Use **MPI** parallelism to launch: * MPI-enable applications on many directories in parallel (`processes.per_directory = N`). Instruct your application to *partition* the MPI communicator. 
+ +TODO: Provide a concrete example using HOOMD + +TODO: Provide a concrete example using mpi4py diff --git a/doc/src/guide/concepts/status.md index 2928b6c..9a387c7 100644 --- a/doc/src/guide/concepts/status.md +++ b/doc/src/guide/concepts/status.md @@ -1,7 +1,8 @@ # Directory status -For each action, each directory in the workspace that matches the action's -[include condition](../../workflow/action/group.md#include) has a single status: +Each directory in the workspace that matches the action's +[include condition](../../workflow/action/group.md#include) has a *single* status for +that action: | Status | Description | |--------|-------------| diff --git a/doc/src/guide/concepts/thread-parallelism.md index 5e86e03..a427b47 100644 --- a/doc/src/guide/concepts/thread-parallelism.md +++ b/doc/src/guide/concepts/thread-parallelism.md @@ -23,3 +23,5 @@ For all other cases, refer to the documentation of your application or library. provide some way to set the number of threads/processes. Use the environment variable `ACTION_THREADS_PER_PROCESS` to ensure that the number of executed threads matches that requested. + +TODO: Provide a concrete example using the Python multiprocessing library diff --git a/doc/src/guide/howto/account.md new file mode 100644 index 0000000..5e514d2 --- /dev/null +++ b/doc/src/guide/howto/account.md @@ -0,0 +1,25 @@ +# Set the cluster account + +Use the default action to conveniently set the account (or accounts) once in your +`workflow.toml`. It will apply to all actions that do not override the account. + +```toml +[default.action.submit_options.cluster1] +account = "cluster1-account" +[default.action.submit_options.cluster2] +account = "cluster2-account" + +[[action]] +# Will use the defaults above. + +[[action]] +# Will use the defaults above. 
+ +[[action]] +submit_options.cluster1.account = "alternate-account" +# Will use the "alternate-account" on cluster1 and "cluster2-account" on cluster2. +``` + +> Note: NCSA Delta assigns `-cpu` and `-gpu` accounts. Set +> `submit_options.delta.account = ""`. **Row** will automatically append the +> `-cpu` or `-gpu` when submitting to the CPU or GPU partitions respectively. diff --git a/doc/src/guide/howto/best-practices.md b/doc/src/guide/howto/best-practices.md new file mode 100644 index 0000000..496eeb0 --- /dev/null +++ b/doc/src/guide/howto/best-practices.md @@ -0,0 +1,52 @@ +# Best practices + +Follow these guidelines to use **row** effectively. + +## Exit actions early when products already exist. + +There are some cases where **row** may fail to identify when your action completes: + +* Software exits with an unrecoverable error. +* Your job exceeds its walltime and is killed. +* And many others... + +To ensure that your action executes as intended, you should **check for the existence +of product files** when your action starts and **exit immediately** when they already +exist. This way, resubmitting an already completed job will not needlessly recompute +results or overwrite files you intended to keep. + +## Write to temporary files and move them to the final product location. + +For example, say `products = ["output.dat"]`. Write to `output.dat.in_progress` +while your calculation executes. Once the action is fully complete, *move* +`output.dat.in_progress` to `output.dat`. + +This pattern also allows you to *continue* running one calculation over several job +submissions. Move the output file to its final location only after the final submission +completes the calculation. + +> Note: If you wrote directly to `output.dat`, **row** might identify your computation +> as **complete** right after it starts. + +## Group directories whenever possible, but not to an extreme degree. + +The **scheduler** can effectively schedule **many** jobs. 
However, there is some +overhead. Each job takes a certain amount of time to launch at the start and clean up +at the end. Additionally, the scheduler can only process so many jobs efficiently. Your +cluster may even limit how many jobs you are allowed to queue. So please don't submit +thousands of jobs at a time to your cluster. You can improve your workflow's throughput +by grouping directories together into a smaller number of jobs. + +For actions that execute quickly in serial: Group directories and use with +`processes.per_submission` and `walltime.per_directory`. After a given job has waited in +the queue, it can process many directories before exiting. Limit group sizes so that the +total wall time of the job remains reasonable. + +For actions that take longer: Group directories and execute the action in parallel using +[MPI partitions], `processes.per_directory` and `walltime.per_submission`. You should +typically limit the group sizes to a relatively small fraction of the cluster. Unless +you are using a Leadership class machine, huge parallel jobs may wait a long time in +queue before starting. Experiment with the `group.maximum_size` value and find a good +job size (in number of nodes) that balances queue time vs. scheduler overhead. + +[MPI partitions]: ../concepts/process-parallelism.md diff --git a/doc/src/guide/howto/index.md b/doc/src/guide/howto/index.md new file mode 100644 index 0000000..96c2dce --- /dev/null +++ b/doc/src/guide/howto/index.md @@ -0,0 +1,4 @@ +# How-to + +This section shows how to accomplish various tasks in Row that are not covered in the +tutorial. diff --git a/doc/src/guide/howto/same.md b/doc/src/guide/howto/same.md new file mode 100644 index 0000000..16f4fb2 --- /dev/null +++ b/doc/src/guide/howto/same.md @@ -0,0 +1,30 @@ +# Submit the same action to different groups/resources + +You can submit the same action to different groups and resources. To do so, +create multiple elements in the action array *with the same name*. 
Each must use +[`group.include`](../../workflow/action/group.md#include) to select *non-overlapping +subsets*. You can use [`action.from`](../../workflow/action/index.md#from) to copy all +fields from one action and selectively override others. + +For example, this `workflow.toml` uses 4 processors on directories with small *N* and 8 on +those with a large *N*. + +```toml +[default.action] +command = "python actions.py --action $ACTION_NAME {directories}" + +[[action]] +name = "compute" +products = ["results.out"] +[action.resources] +walltime.per_submission = "12:00:00" +processes.per_directory = 4 +[action.group] +include = [["/N", "<=", "4096"]] +maximum_size = 32 + +[[action]] +from = "compute" +resources.processes.per_directory = 8 +group.include = [["/N", ">", "4096"]] +``` diff --git a/doc/src/guide/howto/summarize.md new file mode 100644 index 0000000..7583579 --- /dev/null +++ b/doc/src/guide/howto/summarize.md @@ -0,0 +1,31 @@ +# Summarize directory groups with an action + +Set [`submit_whole=true`] to ensure that an action is always submitted on the +*whole* group of included directories. For example, you could use this in an analysis +action that averages over replicates. Say your directories have values like +```json +{ + "temperature": 1.0, + "pressure": 0.3, + "replicate": 2 +} +``` +with many directories at the same *temperature* and *pressure* and different +values of *replicate*. You could average over all replicates at the same *temperature* +and *pressure* with an action like this: +```toml +[[action]] +name = "average" +[action.group] +sort_by = ["/temperature", "/pressure"] +split_by_sort_key = true +submit_whole = true +``` + +Actions that summarize output have no clear location to place output files (such as +plots). Many users will write summary output to the project root. +You may omit `products` in this case so that you do not need to create empty files in +each directory. 
This also makes it easy to rerun the analysis whenever needed as **row** +will never consider it **complete**. + +[`submit_whole=true`]: ../../workflow/action/group.md#submit_whole diff --git a/doc/src/guide/python/actions.md b/doc/src/guide/python/actions.md index 692ba2a..95a843b 100644 --- a/doc/src/guide/python/actions.md +++ b/doc/src/guide/python/actions.md @@ -5,7 +5,7 @@ In **row**, actions execute arbitrary **shell commands**. When your action is that takes directories as arguments. There are many ways you can achieve this goal. This guide will show you how to structure all of your actions in a single file: -`actions.py`. This layout is inspired by **row's** predecessor: **signac-flow** +`actions.py`. This layout is inspired by **row's** predecessor **signac-flow** and its `project.py`. > Note: If you are familiar with **signac-fow**, see [migrating from signac-flow][1] @@ -37,12 +37,12 @@ Execute: ``` to initialize the signac workspace and populate it with directories. -> Note: If you aren't familiar with **signac**, then go read the [*basic* tutorial][2]. -> Come back to the **row** documentation when you get to the section on *workflows*. Or, -> for extra credit, reimplement the **signac** tutorial workflow in **row** after you +> Note: If you are not familiar with **signac**, then go read the [*basic* tutorial]. +> Come back to the **row** documentation when you get to the section on *workflows*. +> For extra credit, reimplement the **signac** tutorial workflow in **row** after you > finish reading this guide. -[2]: https://docs.signac.io/en/latest/tutorial.html#basics +[*basic* tutorial]: https://docs.signac.io/en/latest/tutorial.html#basics ## Write actions.py @@ -68,22 +68,21 @@ Next, replace the contents of `workflow.toml` with the corresponding workflow: {{#include signac-workflow.toml}} ``` -You should be familiar with all of these options from previous tutorials. 
The main point -of interest here is that *both* actions have the same **command**: +*Both* actions have the same **command**, set once by the +[**default action**](../../workflow/default.md): ```toml -{{#include signac-workflow.toml:6}} +{{#include signac-workflow.toml:5}} ``` `python actions.py` executes the `actions.py` file above. It is given the argument `--action $ACTION_NAME` which selects the Python function to call. Here `$ACTION_NAME` -is an [environment variable](../../env.md) that **row** sets in job scripts. You could -type the action name explicitly here, but then you should take extra care when copying -and pasting commands to avoid executing the wrong action! The last arguments are given -by `{directories}`. Unlike `{directory}` shown in previous tutorials, `{directories}` -expands to *ALL* directories in the submitted **group**. In this way, `action.py` is -executed only once and is free to process the list of directories in any way it chooses -(e.g. in serial, with multiprocessing parallelism, using MPI parallelism, multiple -threads, ...). +is an [environment variable](../../env.md) that **row** sets in job scripts. The +last arguments are given by `{directories}`. Unlike `{directory}` shown in previous +tutorials, `{directories}` expands to *ALL* directories in the submitted **group**. In +this way, `action.py` is executed once and is free to process the list of directories in +any way it chooses (e.g. in serial, with +[multiprocessing parallelism, multiple threads](../concepts/thread-parallelism.md), +using [MPI parallelism](../concepts/process-parallelism.md), ...). ## Execute the workflow @@ -112,19 +111,21 @@ Proceed? [Y/n]: y It worked! `sum` printed the result `285`. +> Note: If you are on a cluster, use `--cluster=none` or wait for jobs to complete +> after submitting. 
+ ## Applying this structure to your workflows With this structure in place, you can add new **actions** to your workflow following these steps: -First, write a function `def action(*jobs)` in `actions.py`. -Then add: -```toml -[[action]] -name = "action" -command = "python actions.py --action $ACTION_NAME {directories}" -# And other relevant options -``` -to your `workflow.toml` file. +1) Write a function `def action(*jobs)` in `actions.py`. +2) Add: + ```toml + [[action]] + name = "action" + # And other relevant options + ``` + to your `workflow.toml` file. > Note: You may write functions that take only one job `def action(job)` without > modifying the given implementation of `__main__`. However, you will need to set diff --git a/doc/src/guide/python/signac-workflow.toml b/doc/src/guide/python/signac-workflow.toml index 950f4f5..67e589e 100644 --- a/doc/src/guide/python/signac-workflow.toml +++ b/doc/src/guide/python/signac-workflow.toml @@ -1,14 +1,16 @@ [workspace] value_file = "signac_statepoint.json" +[default.action] +command = "python actions.py --action $ACTION_NAME {directories}" + [[action]] name = "square" -command = "python actions.py --action $ACTION_NAME {directories}" products = ["square.out"] resources.walltime.per_directory = "00:00:01" [[action]] name = "compute_sum" -command = "python actions.py --action $ACTION_NAME {directories}" previous_actions = ["square"] resources.walltime.per_directory = "00:00:01" +group.submit_whole = true diff --git a/doc/src/guide/python/signac.md b/doc/src/guide/python/signac.md index 07d40ba..afa5382 100644 --- a/doc/src/guide/python/signac.md +++ b/doc/src/guide/python/signac.md @@ -12,7 +12,7 @@ project and add the lines: value_file = "signac_statepoint.json" ``` -That is all. Now you can use any values in your state points to form **groups**. +Now you can use any values in your state points to form **groups**. > Note: **signac** has a rich command line interface as well. 
You should consider using > **signac** even if you are not a Python user. diff --git a/doc/src/guide/tutorial/group-workflow2.toml b/doc/src/guide/tutorial/group-workflow2.toml index 218f925..96ac15e 100644 --- a/doc/src/guide/tutorial/group-workflow2.toml +++ b/doc/src/guide/tutorial/group-workflow2.toml @@ -5,10 +5,10 @@ value_file = "value.json" name = "process_point" command = "echo {directory}" [action.group] -include = [["/type", "equal_to", "point"]] +include = [["/type", "==", "point"]] [[action]] name = "process_letter" command = "echo {directory}" [action.group] -include = [["/type", "equal_to", "letter"]] +include = [["/type", "==", "letter"]] diff --git a/doc/src/guide/tutorial/group-workflow3.toml b/doc/src/guide/tutorial/group-workflow3.toml index 06bc686..85b162c 100644 --- a/doc/src/guide/tutorial/group-workflow3.toml +++ b/doc/src/guide/tutorial/group-workflow3.toml @@ -5,11 +5,11 @@ value_file = "value.json" name = "process_point" command = "echo {directory}" [action.group] -include = [["/type", "equal_to", "point"]] +include = [["/type", "==", "point"]] sort_by = ["/x"] [[action]] name = "process_letter" command = "echo {directory}" [action.group] -include = [["/type", "equal_to", "letter"]] +include = [["/type", "==", "letter"]] diff --git a/doc/src/guide/tutorial/group-workflow4.toml b/doc/src/guide/tutorial/group-workflow4.toml index b7ab6a0..593261f 100644 --- a/doc/src/guide/tutorial/group-workflow4.toml +++ b/doc/src/guide/tutorial/group-workflow4.toml @@ -5,7 +5,7 @@ value_file = "value.json" name = "process_point" command = "echo {directory}" [action.group] -include = [["/type", "equal_to", "point"]] +include = [["/type", "==", "point"]] sort_by = ["/x"] split_by_sort_key = true @@ -13,4 +13,4 @@ split_by_sort_key = true name = "process_letter" command = "echo {directory}" [action.group] -include = [["/type", "equal_to", "letter"]] +include = [["/type", "==", "letter"]] diff --git a/doc/src/guide/tutorial/group-workflow5.toml 
b/doc/src/guide/tutorial/group-workflow5.toml index a573ac9..ae3166c 100644 --- a/doc/src/guide/tutorial/group-workflow5.toml +++ b/doc/src/guide/tutorial/group-workflow5.toml @@ -5,7 +5,7 @@ value_file = "value.json" name = "process_point" command = "echo {directory}" [action.group] -include = [["/type", "equal_to", "point"]] +include = [["/type", "==", "point"]] sort_by = ["/x"] maximum_size = 4 @@ -13,4 +13,4 @@ maximum_size = 4 name = "process_letter" command = "echo {directory}" [action.group] -include = [["/type", "equal_to", "letter"]] +include = [["/type", "==", "letter"]] diff --git a/doc/src/guide/tutorial/group.md b/doc/src/guide/tutorial/group.md index 6cde54f..b0f9822 100644 --- a/doc/src/guide/tutorial/group.md +++ b/doc/src/guide/tutorial/group.md @@ -11,7 +11,7 @@ on a **group** of directories. So far, this tutorial has demonstrated small toy examples. In practice, any workflow that you need to execute on a cluster likely has hundreds or thousands of directories - each with different parameters. You could try to encode these parameters into the -directory names, but *please don't*. This quickly becomes unmanageable. Instead, you +directory names, but *please don't* - it quickly becomes unmanageable. Instead, you should include a [JSON](https://www.json.org) file in each directory that identifies its **value**. @@ -56,9 +56,9 @@ This workflow will apply the `process_point` action to the directories where `include` is an array. Each element is a length 3 array with the contents: `[JSON pointer, operator, operand]`. Think of each element as an expression. The [*JSON pointer*](../concepts/json-pointers.md) is a string that reads a particular value -from the directory's **value**. The *operator* is a comparison operator: `"equal_to", -"greater_than", or "less_than"`. The *operand* is the value to compare to. Together, -these 3 elements make a *condition*. +from the directory's **value**. 
The *operator* is a comparison operator: `"<"`, `"<="`, +`"=="`, `">="`, or `">"`. The *operand* is the value to compare to. Together, these 3 +elements make a *condition*. **Row** applies these *conditions* to all directories in the workspace. When all *conditions* are true, the directory is included in the action's **groups**. diff --git a/doc/src/guide/tutorial/hello.md b/doc/src/guide/tutorial/hello.md index bd2e75e..ea92640 100644 --- a/doc/src/guide/tutorial/hello.md +++ b/doc/src/guide/tutorial/hello.md @@ -54,8 +54,8 @@ Submitting 1 job that may cost up to 3 CPU-hours. Proceed? [Y/n]: ``` -The cost is 3 CPU-hours because **action** defaults to 1 CPU-hour per directory. -Later sections in this tutorial will cover resource costs in more detail. +The cost is 3 CPU-hours because **action** defaults to 1 CPU-hour per directory +(later sections in this tutorial will cover resource costs in more detail). `echo "Hello, {directory}!"` is certainly not going to take that long, so confirm with `y` and then press enter. You should then see the action execute: ```plaintext diff --git a/doc/src/guide/tutorial/multiple.md b/doc/src/guide/tutorial/multiple.md index 438e2fa..d2a8512 100644 --- a/doc/src/guide/tutorial/multiple.md +++ b/doc/src/guide/tutorial/multiple.md @@ -63,7 +63,7 @@ Execute: {{#include hello.sh:submit2}} ``` -Go ahead, run `row show status` and see if the output is what you expect. +Run `row show status` and see if the output is what you expect. ## Getting more detailed information diff --git a/doc/src/guide/tutorial/resources.md b/doc/src/guide/tutorial/resources.md index bc3a126..544503d 100644 --- a/doc/src/guide/tutorial/resources.md +++ b/doc/src/guide/tutorial/resources.md @@ -65,7 +65,7 @@ walltime.per_directory = "00:10:00" # Execute MPI parallel calculations -To launch MPI enabled applications, request more than one *process* and request the +To launch MPI enabled applications, request more than one *process* and the `"mpi"` launcher. 
`launchers = ["mpi"]` will add the appropriate MPI launcher prefix before your command (e.g. `srun --ntasks 16 parallel_application $directory`). diff --git a/doc/src/guide/tutorial/scheduler.md b/doc/src/guide/tutorial/scheduler.md index 63dd87d..8240242 100644 --- a/doc/src/guide/tutorial/scheduler.md +++ b/doc/src/guide/tutorial/scheduler.md @@ -112,7 +112,7 @@ pid 830675's current affinity list: 99 > your **cluster's** documentation to see specific details on how jobs are allocated > to nodes and charged for resource usage. Remember, it is **YOUR RESPONSIBILITY** (not > **row's**) to understand whether `--ntasks=1` costs 1 CPU-hour per hour or more (e.g. -> 128) CPU-hours per hour. If your cluster lacks a *shared* partition, then you need to +> 128 CPU-hours per hour). If your cluster lacks a *shared* partition, then you need to > structure your **actions** and **groups** in such a way to use all the cores you are > given or else the resources are wasted. @@ -142,7 +142,7 @@ Submit this script and see if the output is what you expect. You can also request GPUs, memory, licenses, and others. In the next section, you will learn how to use **row** to automatically generate job scripts that request **CPUs**, **GPUs**, and **time**. You can set -[`custom` submit options](../../workflow/submit-options.md) to request others. +[`custom` submit options](../../workflow/action/submit-options.md) to request others. Most **clusters** also have separate **partitions** (requested with `--partition=` for certain resources (e.g. GPU). See your **cluster's** diff --git a/doc/src/guide/tutorial/submit.md b/doc/src/guide/tutorial/submit.md index 84c45c3..515fe55 100644 --- a/doc/src/guide/tutorial/submit.md +++ b/doc/src/guide/tutorial/submit.md @@ -14,8 +14,8 @@ This section explains how to **submit** jobs to the **scheduler** with **row**. You can skip to the [next heading](#checking-your-job-script) if you are using one of these clusters. 
-If not, then you need to create one or two configuration files that describe your -cluster and its launchers. +If not, then you need to create a configuration files that describe your +cluster. You may also need to define launchers specific to your cluster. * [`$HOME/.config/row/clusters.toml`](../../clusters/index.md) gives your cluster a name, instructions on how to identify it, and lists the partitions your cluster @@ -97,15 +97,15 @@ row submit > If your cluster does not default to the correct account, you can set it in > `workflow.toml`: > ```toml -> [submit_options] -> .account = "" +> [default.action.submit_options.] +> account = "" > ``` ### The submitted status **Row** tracks the **Job IDs** that it submits. Every time you execute `row show status` (or just about any `row` command), it will execute `squeue` in the background to see -which jobs are still **submitted** (in any state). +which jobs are still **submitted**. Use the `row show` family of commands to query details about submitted jobs. For the `hello` workflow: diff --git a/doc/src/launchers/built-in.md b/doc/src/launchers/built-in.md index 6a90dac..0aae5ea 100644 --- a/doc/src/launchers/built-in.md +++ b/doc/src/launchers/built-in.md @@ -6,6 +6,8 @@ You may need to add new configurations for your specific cluster or adjust the ` launcher to match your system. Execute [`row show launchers`](../row/show/launchers.md) to see the current launcher configuration. +## Hybrid OpenMP/MPI + When using OpenMP/MPI hybrid applications, place `"openmp"` first in the list of launchers (`launchers = ["openmp", "mpi"]`) to generate the appropriate command: ```bash diff --git a/doc/src/launchers/index.md b/doc/src/launchers/index.md index 14715ca..fcd93a9 100644 --- a/doc/src/launchers/index.md +++ b/doc/src/launchers/index.md @@ -2,9 +2,11 @@ **Row** includes [built-in launchers](built-in.md) to enable OpenMP and MPI on the [built-in clusters](../clusters/built-in.md). 
You can override these configurations -and add new launchers in the file `$HOME/.config/row/launchers.toml`. It defines how -each **launcher** expands into a **command prefix**, with the possibility for specific -settings on each [**cluster**](../clusters/index.md). For example, an +and add new launchers in the file `$HOME/.config/row/launchers.toml`. + +The launcher configuration defines how each **launcher** expands into a **command +prefix**, with the possibility for specific settings on each +[**cluster**](../clusters/index.md). For example, an [**action**](../workflow/action/index.md) with the configuration: ```toml [[action]] diff --git a/doc/src/launchers/launcher.md b/doc/src/launchers/launcher.md index 3e61fff..b811106 100644 --- a/doc/src/launchers/launcher.md +++ b/doc/src/launchers/launcher.md @@ -6,7 +6,7 @@ prefix constructed from this configuration will be: {launcher.executable} [option1] [option2] ... ``` -See [Built-in launchers](built-in.md) for examples. +Execute [`row show launchers`](../row/show/launchers.md) to see examples. ## executable @@ -34,8 +34,8 @@ When `launcher.processes` is set, add the following option to the launcher prefi where `total_processes` is `n_directories * resources.processes.per_directory` or `resources.processes.per_submission` depending on the resource configuration. -It is an error when `total_processes > 1` and the action requests *no* launchers that -set `processes`. +> Note: **Row** exits with an error when `total_processes > 1` and the action requests +> *no* launchers that set `processes`. ## threads_per_process diff --git a/doc/src/row/index.md b/doc/src/row/index.md index b72e251..2a1d364 100644 --- a/doc/src/row/index.md +++ b/doc/src/row/index.md @@ -7,14 +7,14 @@ row [OPTIONS] `` must be one of: * [`init`](init.md) -* [`show`](show/index.md) * [`submit`](submit.md) +* [`show`](show/index.md) * [`scan`](scan.md) * [`clean`](clean.md)
-You should execute only one instance of row at a time for a given project. -Row maintains a cache and concurrent invocations may corrupt it. The +You should execute at most one instance of row at a time for a given +project. Row maintains a cache and concurrent invocations may corrupt it. The scan command is excepted from this rule.
diff --git a/doc/src/row/scan.md b/doc/src/row/scan.md index 7539778..13e4408 100644 --- a/doc/src/row/scan.md +++ b/doc/src/row/scan.md @@ -9,7 +9,7 @@ row scan [OPTIONS] [DIRECTORIES] [products](../workflow/action/index.md#products) and updates the cache of completed directories accordingly. -Under normal usage, you should not need to execute `row scan` manually. +Under normal usage, you should not need to execute `row scan`. [`row submit`](submit.md) automatically scans the submitted directories after it executes the action's command. diff --git a/doc/src/row/show/launchers.md b/doc/src/row/show/launchers.md index fedd069..0ade229 100644 --- a/doc/src/row/show/launchers.md +++ b/doc/src/row/show/launchers.md @@ -9,7 +9,7 @@ Print the [launchers](../../launchers/index.md) defined for the current cluster cluster given in `--cluster`). The output is TOML formatted. This includes the user-provided launchers in [`launchers.toml`](../../launchers/index.md) -and the built-in launchers (or the user-provided overrides). +and the built-in launchers. ## `[OPTIONS]` diff --git a/doc/src/row/submit.md b/doc/src/row/submit.md index 66a58df..8b15404 100644 --- a/doc/src/row/submit.md +++ b/doc/src/row/submit.md @@ -8,7 +8,11 @@ row submit [OPTIONS] [DIRECTORIES] `row submit` submits jobs to the scheduler. First it determines the [status](../guide/concepts/status.md) of all the given directories for the selected actions. Then it forms [groups](../workflow/action/group.md) and submits one job for -each group. Pass `--dry-run` to see the script(s) that will be submitted. +each group. Pass `--dry-run` to see the script(s) that will be submitted. Execute +``` +row show directories action --eligible +``` +to see the specific directory groups that will be submitted. 
## `[DIRECTORIES]` diff --git a/doc/src/signac-flow.md b/doc/src/signac-flow.md index ce9915f..19cb1bc 100644 --- a/doc/src/signac-flow.md +++ b/doc/src/signac-flow.md @@ -8,13 +8,14 @@ Concepts: | flow | row | |------|-----| | *job* | *directory* | +| *cluster job* | *job* | | *statepoint* | *value* | | *operation* | [`action`](workflow/action/index.md) in `workflow.toml`| | *group* | A command may perform multiple steps. | | *label* | Not implemented. | | *hooks* | Not implemented. | | *environments* | [`clusters.toml`](clusters/index.md) | -| `project.py` | [`workflow.toml`](workflow/index.md) and [`actions.py`](guide/python/actions.md) | +| `project.py` | [`workflow.toml`](workflow/index.md) combined with [`actions.py`](guide/python/actions.md) | Commands: | flow | row | @@ -22,20 +23,19 @@ Commands: | `project.py status` | [`row show status`](row/show/status.md) | | `project.py status --detailed` | [`row show directories `](row/show/directories.md) | | `project.py run` | [`row submit --cluster=none`](row/submit.md) | -| `project.py run --parallel` | A command may execute groups in parallel. | +| `project.py run --parallel` | A command *may* execute [group members][group] in [parallel]. | | `project.py exec ...` | Execute your action's command in the shell. | | `project.py submit` | [`row submit`](row/submit.md) | -| `project.py submit --partition ` | `row submit` automatically selects appropriate partitions. | +| `project.py submit --partition ` | `row submit` *automatically* selects appropriate partitions. | | `project.py submit -n ` | [`row submit -n `](row/submit.md) | | `project.py submit --pretend` | [`row submit --dry-run`](row/submit.md) | -| `project.py submit --bundle ` | [`group`](workflow/action/group.md) in `workflow.toml` | -| `project.py submit --bundle --parallel` | A command may execute groups in parallel. 
| +| `project.py submit --bundle ` | [`group`][group] in `workflow.toml` | +| `project.py submit --bundle --parallel` | A command *may* execute [group members][group] in [parallel]. | | `project.py submit -o ` | [`row submit --action `](row/submit.md) | | `project.py -j [JOB_ID1] [JOB_ID2] ...` | `row [JOB_ID1] [JOB_ID2] ...` | | `project.py -j a1234` | `cd workspace; row a1234` | | `project.py -f ` | `row $(signac find )` | - Conditions: | flow | row | |------|-----| @@ -44,10 +44,10 @@ Conditions: | precondition: `after` | [`previous_actions`](workflow/action/index.md#previous_actions) | | precondition: state point comparison | [`include`](workflow/action/group.md#include) | | precondition: others | Not implemented. | -| aggregation | [`group`](workflow/action/group.md) in `workflow.toml` | +| aggregation | [`group`][group] in `workflow.toml` | | aggregation: `select` | [`include`](workflow/action/group.md#include) | -| aggregation: `sort_by` | [`sort_by`](workflow/action/group.md#sort_by) | -| aggregation: `groupby` | `sort_by` and [`split_by_sort_key=true`](workflow/action/group.md#split_by_sort_key) | +| aggregation: `sort_by` | [`sort_by`] | +| aggregation: `groupby` | [`sort_by`] and [`split_by_sort_key=true`](workflow/action/group.md#split_by_sort_key) | | aggregation: `groupsof` | [`maximum_size`](workflow/action/group.md#maximum_size) | Execution: @@ -58,4 +58,8 @@ Execution: | directives: `np`, `ngpu`, `omp_num_threads`, `walltime` | [resources](workflow/action/resources.md) in `workflow.toml` | | directives: Launch with MPI | [`launchers`](workflow/action/index.md#launchers) `= ["mpi"]` | | directives: Launch with OpenMP | [`launchers`](workflow/action/index.md#launchers) `= ["openmp"]` | -| template job script: `script.sh` | [`submit_options`](workflow/submit-options.md) in `workflow.toml` | +| template job script: `script.sh` | [`submit_options`](workflow/action/submit-options.md) in `workflow.toml` | + +[group]: workflow/action/group.md +[parallel]: 
guide/concepts/thread-parallelism.md +[`sort_by`]: workflow/action/group.md#sort_by diff --git a/doc/src/workflow/action/group.md b/doc/src/workflow/action/group.md index e4109d0..e9a7a13 100644 --- a/doc/src/workflow/action/group.md +++ b/doc/src/workflow/action/group.md @@ -6,7 +6,7 @@ that it submits. Example: ```toml [action.group] -include = [["/subproject", "equal_to", "project_one"]] +include = [["/subproject", "==", "project_one"]] sort_by = ["/value"] split_by_sort_key = true maximum_size = 16 @@ -25,24 +25,23 @@ groups of directories included in a given action. all be true for a directory to be included in this group. Each condition is an **array** of three elements: The *JSON pointer*, *the operator*, and the *operand*. The [JSON pointer](../../guide/concepts/json-pointers.md) points to a specific element -from the directory's value. The operator may be `"less_than"`, `"greater_than"`, or -`"equal_to"`. +from the directory's value. The operator may be `"<"`, `"<="`, `"=="`, `">="`, or `">"`. For example, select all directories where a value is in the given range: ```toml -include = [["/value, "less_than", 0.9], ["/value", "greater_than", 0.2]] +include = [["/value", ">", 0.2], ["/value", "<", 0.9]] ``` Choose directories where an array element is equal to a specific value: ```toml -include = [["/array/1", "equal_to", 12]] +include = [["/array/1", "==", 12]] ``` Match against strings: ```toml -include = [["/map/name", "equal_to", "string"]] +include = [["/map/name", "==", "string"]] ``` Compare by array: ```toml -include = [["/array", "eqal_to", [1, "string", 14.0]]] +include = [["/array", "==", [1, "string", 14.0]]] ``` Both operands **must** have the same data type. The JSON pointer must be present in the @@ -61,8 +60,8 @@ pointers to specific keys in objects. `action.group.sort_by`: **array** of **strings** - An array of [JSON pointers](../../guide/concepts/json-pointers.md) to elements of each directory's -value. 
**Row** will sort directories matched by `include` by these quantities -*lexicographically*. For example, +value. **Row** will sort directories by these quantities *lexicographically*. For +example, ```toml action.group.sort_by = ["/a", "/b"] ``` @@ -100,10 +99,9 @@ by `include` are placed in a single group. `action.group.maximum_size`: **integer** - Maximum size of a group. -**Row** further splits the groups into smaller groups up to the given `maximum_size`. -When the number of directories is not evenly divisible by `maximum_size`, **row** -creates the first **n** groups with `maximum_size` elements and places one remainder -group at the end. +Split included directories into groups up to the given `maximum_size`. When the number +of directories is not evenly divisible by `maximum_size`, **row** creates the first +**n** groups with `maximum_size` elements and places one remainder group at the end. For example, with `maximum_size = 2` the directories: `[dir1, dir2, dir3, dir4, dir5]` diff --git a/doc/src/workflow/action/index.md b/doc/src/workflow/action/index.md index d48b8d8..03eb218 100644 --- a/doc/src/workflow/action/index.md +++ b/doc/src/workflow/action/index.md @@ -1,6 +1,6 @@ # action -`action` is an **array** where each element defines one action in your workflow. +Each element in the `action` **array** defines one action in your workflow. Examples: ```toml @@ -15,7 +15,8 @@ command = "python action_two.py {directories}" previous_actions = ["action_one"] products = ["two.data", "log.txt"] launchers = ["openmp", "mpi"] -action.group.maximum_size = 8 +[action.group] +maximum_size = 8 [action.resources] processes.per_directory = 16 threads_per_process = 4 @@ -25,7 +26,13 @@ walltime.per_submission = "04:00:00" ## name `action.name`: **string** - The action's name. You must set a name for each -action. +action. The name may be set by [from](#from). + +> Note: Two or more conceptually identical elements in the actions array *may* have +> the same name. 
All elements with the same name **must** have identical +> [`products`](#products) and [`previous_actions`](#previous_actions). All elements +> with the same name **must also** select non-intersecting subsets of directories with +> [`group.include`](group.md#include). ## command @@ -62,7 +69,7 @@ command = "echo Message && python action.py {directory}" `action.launchers`: **array** of **strings** - The launchers to apply when executing a command. A launcher is a prefix placed before the command in the submission script. The -cluster configuration [`clusters.toml`](../../clusters/index.md) defines what launchers +launcher configuration [`lauchers.toml`](../../launchers/index.md) defines what launchers are available on each cluster and how they are invoked. The example for `action_two` above (`launchers = ["openmp", "mpi"]`) would expand into something like: ```bash @@ -82,3 +89,37 @@ must *all* be completed before this action may be executed. When omitted, action produces in the directory. When *all* products are present, that directory has *completed* the action. When omitted, `products` defaults to an empty array. + +## `[group]` + +See [group](group.md). + +## `[resources]` + +See [resources](resources.md). + +## `[submit_options]` + +See [submit_options](submit-options.md). + +## from + +`action.from`: **string** - Name of the **action** to copy settings from. + +Every key in an `[[action]]` table (including sub-keys in `[action.group]`, +`[action.resources]`, and `[action.submit_options]`) may be set in one of 3 locations: + +1. This action: `action.key[.sub_key]`. +2. The action named by `from`: `action_from.key[.sub_key]` (when `action.from` is set). +3. The default action: `default.action.key[.sub_key]`. + +The action will take on the value set in the **first** location that does not omit +the key. When all 3 locations omit the key, the "when omitted" behavior takes effect +(documented separately for each key). 
+ +`from` is a convenient way to [submit the same action to different groups/resources]. + +> Note: `name` and `command` may be provided by `from` or `action.default` but may not +> be omitted entirely. + +[submit the same action to different groups/resources]: ../../guide/howto/same.md diff --git a/doc/src/workflow/action/resources.md b/doc/src/workflow/action/resources.md index aa86889..7080eac 100644 --- a/doc/src/workflow/action/resources.md +++ b/doc/src/workflow/action/resources.md @@ -16,8 +16,7 @@ walltime.per_submission = "04:00:00" `action.resources.processes`: **table** - Set the number of processes this action will execute on (launched by `mpi` or similarly capable launcher). The table **must** -have one of two keys: `per_submission` or `per_directory` which both have **integer** -values. +have one of two keys: `per_submission` or `per_directory`. Examples: ```toml @@ -27,7 +26,7 @@ processes.per_submission = 16 processes.per_directory = 8 ``` -When set to `per_submission`, **row** always asks the scheduler to allocate the given +When set to `per_submission`, **row** asks the scheduler to allocate the given number of processes for each job. When set to `per_directory`, **row** requests the given value multiplied by the number of directories in the submission group. Use `per_submission` when your action loops over directories and reuses the same processes @@ -53,10 +52,9 @@ from the scheduler. Most schedulers default to 0 GPUs per process in this case. ## walltime `action.resources.walltime`: **table** - Set the walltime that this action takes to -execute. The table **must** have one of two keys: `per_submission` or `per_directory` -which both have **string** values. Valid walltime strings include `"HH:MM:SS"`, `"D -days, HH:MM:SS"`, and all other valid `Duration` formats parsed by -[speedate](https://docs.rs/speedate/latest/speedate/). +execute. The table **must** have one of two keys: `per_submission` or `per_directory`. 
+Valid walltime strings include `"HH:MM:SS"`, `"D days, HH:MM:SS"`, and all other valid +`Duration` formats parsed by [speedate](https://docs.rs/speedate/latest/speedate/). Examples: ```toml @@ -66,12 +64,11 @@ walltime.per_submission = "4 days, 12:00:00" walltime.per_directory = "00:10:00" ``` -When set to `per_submission`, **row** always asks the scheduler to allocate the given -walltime for each job. When set to `per_directory`, **row** requests the given value -multiplied by the number of directories in the submission group. Use `per_submission` -when your action parallelizes over directories and therefore takes the same amount of -time independent of the submission group size. Use `per_directory` when your action -loops over the directories and therefore the walltime scales with the number of -directories. +When set to `per_submission`, **row** asks the scheduler to allocate the given walltime +for each job. When set to `per_directory`, **row** requests the given value multiplied +by the number of directories in the submission group. Use `per_submission` when your +action parallelizes over directories and therefore takes the same amount of time +independent of the submission group size. Use `per_directory` when your action loops +over the directories and therefore the walltime scales with the number of directories. When omitted, `walltime` defaults to `per_directories = 01:00:00`. diff --git a/doc/src/workflow/action/submit-options.md b/doc/src/workflow/action/submit-options.md index c4ccd5c..c1d7a40 100644 --- a/doc/src/workflow/action/submit-options.md +++ b/doc/src/workflow/action/submit-options.md @@ -1,12 +1,56 @@ # submit_options -`action.submit_options`: **table** - Override the global cluster-specific -submission options with values specific to this action. Any key that can be set -in the global [`submit_options.`](../submit-options.md) can be overridden in -`action.submit_options.`. 
+`action.submit_options`: **table** - sets the cluster-specific submission options. Keys +in `submit_options` must be one of the named clusters defined in + [`clusters.toml`](../../clusters/index.md). Example: ```toml [action.submit_options.cluster1] -setup = "echo Executing action on cluster1..." +account = "my_account" +setup = """ +module reset +module load cuda +""" +custom = ["--mail-user=user@example.com"] +partition = "shared" + +[action.submit_options.cluster2] +account = "other_account" +setup = "module load openmpi" ``` + +> Note: You may omit `[submit_options]` entirely. + +## `.account` + +`action.submit_options..account`: **string** - Submit jobs to this account on +cluster ``. When you omit `account`, **row** does not add the `--account=` line +to the submission script. + +## `.setup` + +`action.submit_options..setup`: **string** - Lines to include in the submission +script on cluster ``. The setup is executed *before* the action's command. You may +omit `setup` to leave this portion of the script blank. + +## `.custom` + +`action.submit_options..custom`: **array** of **strings** - List of additional +command line options to pass to the batch submission script on cluster ``. For +example. `custom = ["--mail-user=user@example.com"]` will add the line +``` +#SBATCH --mail-user=user@example.com +``` +to the top of a SLURM submission script. `custom` defaults to an empty array when +omitted. + +## `.partition` + +`action.submit_options..partition`: **string** - Force the use of a particular +partition when submitting jobs to the queue on cluster ``. When omitted, **row** +will automatically determine the correct partition based on the configuration in +[`clusters.toml`](../../clusters/index.md). + +> Note: You should almost always omit `partition`. Set it *only* when your action +> **requires** a *specialty* partition that is not automatically selected. 
diff --git a/doc/src/workflow/default.md b/doc/src/workflow/default.md new file mode 100644 index 0000000..20de170 --- /dev/null +++ b/doc/src/workflow/default.md @@ -0,0 +1,31 @@ +# default + +The `default` table sets default options. + +Example: + +```toml +[default.action.submit_options.cluster1] +account = "my_account" +``` + +## action + +`default.action`: **table** - accepts *any* key that is valid in +an [action array element](action/index.md). When an action array element omits a key, +the default key is used. When both the action **and** the default action omit a key, +the individually documented "when omitted" behavior takes effect. + +> Note: This rule applies to all sub-keys as well. For example: +> ```toml +> [default.action.resources] +> processes.per_submission = 8 +> walltime.per_directory = "02:00:00" +> +> [[action]] +> name = "action" +> command = "command {directory}" +> resources.processes.per_submission = 16 +> ``` +> Will result in an action that sets `processes.per_submission == 16` and +> `walltime.per_directory == "02:00:00"`. diff --git a/doc/src/workflow/index.md b/doc/src/workflow/index.md index 9b2dbd9..c36395a 100644 --- a/doc/src/workflow/index.md +++ b/doc/src/workflow/index.md @@ -1,7 +1,7 @@ # workflow.toml The file `workflow.toml` defines the [workspace](workspace.md), -[actions](action/index.md), and [submission options](submit-options.md). Place +[actions](action/index.md), and [default](default.md) values. Place `workflow.toml` in a directory to identify it as a **row** *project*. 
The [`row` command line tool](../row/index.md) will identify the current project by finding `workflow.toml` in the current working directory or any parent directory, diff --git a/doc/src/workflow/submit-options.md b/doc/src/workflow/submit-options.md deleted file mode 100644 index 0464344..0000000 --- a/doc/src/workflow/submit-options.md +++ /dev/null @@ -1,57 +0,0 @@ -# submit_options - -The `submit_options` table sets the default cluster-specific submission options for all -*actions*. You can set action-specific submission options in -[`action.submit_options`](action/submit-options.md). Keys in `submit_options` must be -one of the named clusters defined in [`clusters.toml`](../clusters/index.md). - -Example: -```toml -[submit_options.cluster1] -account = "my_account" -setup = """ -module reset -module load cuda -""" -custom = ["--mail-user=user@example.com"] -partition = "shared" - -[submit_options.cluster2] -account = "other_account" -setup = "module load openmpi" -``` - -> Note: You may omit `[submit_options]` entirely. - -## account - -`submit_options..account`: **string** - Submit jobs to this account on cluster -``. When you omit `account`, **row** does not add the `--account=` line to the -submission script. - -## setup - -`submit_options..setup`: **string** - Lines to include in the submission script on -cluster ``. The setup is executed *before* the action's command. You may omit -`setup` to leave this portion of the script blank. - -## custom - -`submit_options..custom`: **array** of **strings** - List of additional command -line options to pass to the batch submission script on cluster ``. For example. -`custom = ["--mail-user=user@example.com"]` will add the line -``` -#SBATCH --mail-user=user@example.com -``` -to the top of a SLURM submission script. `custom` defaults to an empty array when -omitted. - -## partition - -`submit_options..partition`: **string** - Force the use of a particular partition -when submitting jobs to the queue on cluster `. 
When omitted, **row** -will automatically determine the correct partition based on the configuration in -[`clusters.toml`](../clusters/index.md). - -> Note: You should almost always omit `partition`. Set it *only* when you need a -> specialty partition that is not automatically selected. diff --git a/src/cli/directories.rs b/src/cli/directories.rs index 69aecfc..631fa07 100644 --- a/src/cli/directories.rs +++ b/src/cli/directories.rs @@ -69,25 +69,6 @@ pub fn directories( ) -> Result<(), Box> { debug!("Showing directories."); - let mut project = Project::open(options.io_threads, &options.cluster, multi_progress)?; - - let query_directories = - cli::parse_directories(args.directories, || Ok(project.state().list_directories()))?; - - let action = project - .workflow() - .action_by_name(&args.action) - .ok_or_else(|| row::Error::ActionNotFound(args.action))?; - - let matching_directories = - project.find_matching_directories(action, query_directories.clone())?; - - let status = project.separate_by_status(action, matching_directories.clone())?; - let completed = HashSet::::from_iter(status.completed.clone()); - let submitted = HashSet::::from_iter(status.submitted.clone()); - let eligible = HashSet::::from_iter(status.eligible.clone()); - let waiting = HashSet::::from_iter(status.waiting.clone()); - // Show directories with selected statuses. 
let mut show_completed = args.completed; let mut show_submitted = args.submitted; @@ -100,21 +81,15 @@ pub fn directories( show_waiting = true; } - let mut selected_directories = Vec::with_capacity(matching_directories.len()); - if show_completed { - selected_directories.extend(status.completed); - } - if show_submitted { - selected_directories.extend(status.submitted); - } - if show_eligible { - selected_directories.extend(status.eligible); - } - if show_waiting { - selected_directories.extend(status.waiting); - } + let mut project = Project::open(options.io_threads, &options.cluster, multi_progress)?; - let groups = project.separate_into_groups(action, selected_directories)?; + let query_directories = + cli::parse_directories(args.directories, || Ok(project.state().list_directories()))?; + + project + .workflow() + .action_by_name(&args.action) + .ok_or_else(|| row::Error::ActionNotFound(args.action.clone()))?; let mut table = Table::new().with_hide_header(args.no_header); table.header = vec![ @@ -132,73 +107,106 @@ pub fn directories( .push(Item::new(pointer.clone(), Style::new().underlined())); } - for (group_idx, group) in groups.iter().enumerate() { - if let Some(n) = args.n_groups { - if group_idx >= n { - break; - } + for action in &project.workflow().action { + if action.name() != args.action { + continue; } - for directory in group { - // Format the directory status. 
- let status = if completed.contains(directory) { - Item::new("completed".to_string(), Style::new().green().italic()) - } else if submitted.contains(directory) { - Item::new("submitted".to_string(), Style::new().yellow().italic()) - } else if eligible.contains(directory) { - Item::new("eligible".to_string(), Style::new().blue().italic()) - } else if waiting.contains(directory) { - Item::new("waiting".to_string(), Style::new().cyan().dim().italic()) - } else { - panic!("Directory not found in status.") - }; - - let mut row = Vec::new(); - - // The directory name - row.push(Item::new( - directory.display().to_string(), - Style::new().bold(), - )); - - // Status - row.push(status); - - // Job ID - if show_submitted || show_completed { - let submitted = project.state().submitted(); - - // Values - if let Some((cluster, job_id)) = - submitted.get(&action.name).and_then(|d| d.get(directory)) - { - row.push(Item::new(format!("{cluster}/{job_id}"), Style::new())); - } else { - row.push(Item::new(String::new(), Style::new())); + let matching_directories = + project.find_matching_directories(action, query_directories.clone())?; + + let status = project.separate_by_status(action, matching_directories.clone())?; + let completed = HashSet::::from_iter(status.completed.clone()); + let submitted = HashSet::::from_iter(status.submitted.clone()); + let eligible = HashSet::::from_iter(status.eligible.clone()); + let waiting = HashSet::::from_iter(status.waiting.clone()); + + let mut selected_directories = Vec::with_capacity(matching_directories.len()); + if show_completed { + selected_directories.extend(status.completed); + } + if show_submitted { + selected_directories.extend(status.submitted); + } + if show_eligible { + selected_directories.extend(status.eligible); + } + if show_waiting { + selected_directories.extend(status.waiting); + } + + let groups = project.separate_into_groups(action, selected_directories)?; + + for (group_idx, group) in groups.iter().enumerate() { + if let 
Some(n) = args.n_groups { + if group_idx >= n { + break; } } - for pointer in &args.value { - if !pointer.is_empty() && !pointer.starts_with('/') { - warn!("The JSON pointer '{pointer}' does not appear valid. Did you mean '/{pointer}'?"); + for directory in group { + // Format the directory status. + let status = if completed.contains(directory) { + Item::new("completed".to_string(), Style::new().green().italic()) + } else if submitted.contains(directory) { + Item::new("submitted".to_string(), Style::new().yellow().italic()) + } else if eligible.contains(directory) { + Item::new("eligible".to_string(), Style::new().blue().italic()) + } else if waiting.contains(directory) { + Item::new("waiting".to_string(), Style::new().cyan().dim().italic()) + } else { + panic!("Directory not found in status.") + }; + + let mut row = Vec::new(); + + // The directory name + row.push(Item::new( + directory.display().to_string(), + Style::new().bold(), + )); + + // Status + row.push(status); + + // Job ID + if show_submitted || show_completed { + let submitted = project.state().submitted(); + + // Values + if let Some((cluster, job_id)) = + submitted.get(action.name()).and_then(|d| d.get(directory)) + { + row.push(Item::new(format!("{cluster}/{job_id}"), Style::new())); + } else { + row.push(Item::new(String::new(), Style::new())); + } + } + + for pointer in &args.value { + if !pointer.is_empty() && !pointer.starts_with('/') { + warn!("The JSON pointer '{pointer}' does not appear valid. 
Did you mean '/{pointer}'?"); + } + + let value = project.state().values()[directory] + .pointer(pointer) + .ok_or_else(|| { + row::Error::JSONPointerNotFound(directory.clone(), pointer.clone()) + })?; + row.push( + Item::new(value.to_string(), Style::new()).with_alignment(Alignment::Right), + ); } - let value = project.state().values()[directory] - .pointer(pointer) - .ok_or_else(|| { - row::Error::JSONPointerNotFound(directory.clone(), pointer.clone()) - })?; - row.push( - Item::new(value.to_string(), Style::new()).with_alignment(Alignment::Right), - ); + table.rows.push(Row::Items(row)); } - table.rows.push(Row::Items(row)); + if !args.no_separate_groups && group_idx != groups.len() - 1 { + table.rows.push(Row::Separator); + } } - if !args.no_separate_groups && group_idx != groups.len() - 1 { - table.rows.push(Row::Separator); - } + table.rows.push(Row::Separator); } table.write(output)?; diff --git a/src/cli/scan.rs b/src/cli/scan.rs index 544fd15..9e1a150 100644 --- a/src/cli/scan.rs +++ b/src/cli/scan.rs @@ -53,14 +53,14 @@ pub fn scan( let mut matching_action_count = 0; for action in workflow.action { if let Some(selection) = args.action.as_ref() { - if selection != &action.name { - complete.remove(&action.name); + if selection != action.name() { + complete.remove(action.name()); continue; } } trace!( "Including complete directories for action '{}'.", - action.name + action.name() ); matching_action_count += 1; diff --git a/src/cli/status.rs b/src/cli/status.rs index b7a1e99..07085b1 100644 --- a/src/cli/status.rs +++ b/src/cli/status.rs @@ -105,10 +105,10 @@ pub fn status( let mut matching_action_count = 0; for action in &project.workflow().action { - if !action_matcher.matches(&action.name) { + if !action_matcher.matches(action.name()) { trace!( "Skipping action '{}'. 
It does not match the pattern '{}'.", - action.name, + action.name(), args.action ); continue; @@ -136,7 +136,7 @@ pub fn status( table .rows - .push(Row::Items(make_row(&action.name, &status, &cost))); + .push(Row::Items(make_row(action.name(), &status, &cost))); } if matching_action_count == 0 { diff --git a/src/cli/submit.rs b/src/cli/submit.rs index bb07528..4774cbc 100644 --- a/src/cli/submit.rs +++ b/src/cli/submit.rs @@ -7,6 +7,7 @@ use indicatif::HumanCount; use log::{debug, info, trace, warn}; use signal_hook::consts::{SIGINT, SIGTERM}; use signal_hook::flag; +use std::collections::HashSet; use std::error::Error; use std::io::prelude::*; use std::io::{self, IsTerminal}; @@ -65,14 +66,15 @@ pub fn submit( }; let mut matching_action_count = 0; + let mut action_directory_set = HashSet::new(); let mut action_groups: Vec<(&Action, Vec>)> = Vec::with_capacity(project.workflow().action.len()); for action in &project.workflow().action { - if !action_matcher.matches(&action.name) { + if !action_matcher.matches(action.name()) { trace!( "Skipping action '{}'. 
It does not match the pattern '{}'.", - action.name, + action.name(), args.action ); continue; @@ -86,17 +88,29 @@ pub fn submit( let status = project.separate_by_status(action, matching_directories)?; let groups = project.separate_into_groups(action, status.eligible)?; - if action.group.submit_whole { + if action.group.submit_whole() { let whole_groups = project.separate_into_groups(action, project.state().list_directories())?; for group in &groups { if !whole_groups.contains(group) { return Err(Box::new(row::Error::PartialGroupSubmission( - action.name.clone(), + action.name().into(), ))); } } } + + for group in &groups { + for directory in group { + if !action_directory_set.insert((action.name.clone(), directory.clone())) { + return Err(Box::new(row::Error::WouldSubmitMultipleTimes( + directory.clone(), + action.name().into(), + ))); + } + } + } + action_groups.push((&action, groups)); } @@ -129,7 +143,7 @@ pub fn submit( job_count, if job_count == 1 { "job" } else { "jobs" }, cost, - action.name + action.name() ); } total_cost = total_cost + cost; @@ -231,7 +245,7 @@ pub fn submit( "[{}/{}] Submitting action '{}' on directory {}", HumanCount((index + 1) as u64), HumanCount(action_directories.len() as u64), - style(action.name.clone()).blue(), + style(action.name().to_string()).blue(), style(directories[0].display().to_string()).bold() ); if directories.len() > 1 { @@ -257,7 +271,7 @@ pub fn submit( } Ok(Some(job_id)) => { println!("Row submitted job {job_id}."); - project.add_submitted(&action.name, directories, job_id); + project.add_submitted(action.name(), directories, job_id); continue; } Ok(None) => continue, diff --git a/src/cluster.rs b/src/cluster.rs index e47a071..396f481 100644 --- a/src/cluster.rs +++ b/src/cluster.rs @@ -446,7 +446,7 @@ mod tests { let partition = Partition::default(); let resources = Resources { - processes: Processes::PerDirectory(1), + processes: Some(Processes::PerDirectory(1)), threads_per_process: Some(2), gpus_per_process: 
Some(3), ..Resources::default() @@ -461,7 +461,7 @@ mod tests { setup(); let resources = Resources { - processes: Processes::PerDirectory(1), + processes: Some(Processes::PerDirectory(1)), threads_per_process: Some(2), gpus_per_process: Some(3), ..Resources::default() @@ -564,12 +564,12 @@ mod tests { }; let cpu_resources = Resources { - processes: Processes::PerDirectory(1), + processes: Some(Processes::PerDirectory(1)), ..Resources::default() }; let gpu_resources = Resources { - processes: Processes::PerDirectory(1), + processes: Some(Processes::PerDirectory(1)), gpus_per_process: Some(1), ..Resources::default() }; diff --git a/src/expr.rs b/src/expr.rs index f12c3b8..a6fbb70 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -61,9 +61,11 @@ pub(crate) fn evaluate_json_comparison( ) -> Option { #[allow(clippy::match_same_arms)] match (comparison, partial_cmp_json_values(a, b)) { + (Comparison::LessThan, Some(Ordering::Less)) => Some(true), + (Comparison::LessThanOrEqualTo, Some(Ordering::Less | Ordering::Equal)) => Some(true), (Comparison::EqualTo, Some(Ordering::Equal)) => Some(true), + (Comparison::GreaterThanOrEqualTo, Some(Ordering::Greater | Ordering::Equal)) => Some(true), (Comparison::GreaterThan, Some(Ordering::Greater)) => Some(true), - (Comparison::LessThan, Some(Ordering::Less)) => Some(true), (_, None) => None, (_, _) => Some(false), } @@ -163,6 +165,22 @@ mod tests { evaluate_json_comparison(&Comparison::EqualTo, &Value::from(5), &Value::from(5)), Some(true) ); + assert_eq!( + evaluate_json_comparison( + &Comparison::GreaterThanOrEqualTo, + &Value::from(5), + &Value::from(5) + ), + Some(true) + ); + assert_eq!( + evaluate_json_comparison( + &Comparison::LessThanOrEqualTo, + &Value::from(5), + &Value::from(5) + ), + Some(true) + ); assert_eq!( evaluate_json_comparison(&Comparison::EqualTo, &Value::from(5), &Value::from(10)), Some(false) @@ -171,9 +189,41 @@ mod tests { evaluate_json_comparison(&Comparison::GreaterThan, &Value::from(5), &Value::from(10)), 
Some(false) ); + assert_eq!( + evaluate_json_comparison( + &Comparison::GreaterThanOrEqualTo, + &Value::from(5), + &Value::from(10) + ), + Some(false) + ); + assert_eq!( + evaluate_json_comparison( + &Comparison::GreaterThanOrEqualTo, + &Value::from(6), + &Value::from(5) + ), + Some(true) + ); assert_eq!( evaluate_json_comparison(&Comparison::LessThan, &Value::from(5), &Value::from(10)), Some(true) ); + assert_eq!( + evaluate_json_comparison( + &Comparison::LessThanOrEqualTo, + &Value::from(5), + &Value::from(10) + ), + Some(true) + ); + assert_eq!( + evaluate_json_comparison( + &Comparison::LessThanOrEqualTo, + &Value::from(5), + &Value::from(4) + ), + Some(false) + ); } } diff --git a/src/launcher.rs b/src/launcher.rs index 4cd87b1..41bf747 100644 --- a/src/launcher.rs +++ b/src/launcher.rs @@ -268,14 +268,14 @@ mod tests { assert_eq!(mpi.prefix(&one_proc, 1), "mpirun -n 1 "); let procs_per_directory = Resources { - processes: Processes::PerDirectory(2), + processes: Some(Processes::PerDirectory(2)), ..Resources::default() }; assert_eq!(mpi.prefix(&procs_per_directory, 11), "mpirun -n 22 "); assert_eq!(mpi.prefix(&procs_per_directory, 1), "mpirun -n 2 "); let all = Resources { - processes: Processes::PerDirectory(6), + processes: Some(Processes::PerDirectory(6)), threads_per_process: Some(3), gpus_per_process: Some(8), ..Resources::default() @@ -297,14 +297,14 @@ mod tests { assert_eq!(mpi.prefix(&one_proc, 1), "srun --ntasks=1 "); let procs_per_directory = Resources { - processes: Processes::PerDirectory(2), + processes: Some(Processes::PerDirectory(2)), ..Resources::default() }; assert_eq!(mpi.prefix(&procs_per_directory, 11), "srun --ntasks=22 "); assert_eq!(mpi.prefix(&procs_per_directory, 1), "srun --ntasks=2 "); let all = Resources { - processes: Processes::PerDirectory(6), + processes: Some(Processes::PerDirectory(6)), threads_per_process: Some(3), gpus_per_process: Some(8), ..Resources::default() diff --git a/src/lib.rs b/src/lib.rs index 12ff797..bf8b6d3 
100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -105,9 +105,6 @@ pub enum Error { PostcardSerialize(PathBuf, #[source] postcard::Error), // workflow errors - #[error("Found duplicate action definition '{0}'.")] - DuplicateAction(String), - #[error("Previous action '{0}' not found in action '{1}'.")] PreviousActionNotFound(String, String), @@ -129,6 +126,27 @@ pub enum Error { #[error("Cannot compare {0} and {1} while checking directory '{2}'.")] CannotCompareInclude(Value, Value, PathBuf), + #[error("Action at index {0} is missing `name`.")] + ActionMissingName(usize), + + #[error("Action '{0}' is missing `command`.")] + ActionMissingCommand(String), + + #[error("Default action must not set `from`.")] + DefaultActionSetsFrom(), + + #[error("Action '{0}' set in `from` not found.")] + FromActionNotFound(String), + + #[error("Cannot resolve recursive `from={0}`.")] + RecursiveFrom(String), + + #[error("Duplicate actions '{0}' must have the same `products`.")] + DuplicateActionsDifferentProducts(String), + + #[error("Duplicate actions '{0}' must have the same `previous_actions`.")] + DuplicateActionsDifferentPreviousActions(String), + // submission errors #[error("Error encountered while executing action '{0}': {1}.")] ExecuteAction(String, String), @@ -145,6 +163,9 @@ pub enum Error { #[error("Interrupted")] Interrupted, + #[error("'{0}' would be submitted multiple times in action '{1}'.\nCheck that duplicate actions include non-overlapping groups.")] + WouldSubmitMultipleTimes(PathBuf, String), + // launcher errors #[error("Launcher '{0}' does not contain a default configuration")] LauncherMissingDefault(String), diff --git a/src/project.rs b/src/project.rs index 38a0798..96d1d24 100644 --- a/src/project.rs +++ b/src/project.rs @@ -177,14 +177,14 @@ impl Project { ) -> Result, Error> { trace!( "Finding directories that action '{}' includes.", - &action.name + action.name() ); let mut matching_directories = Vec::with_capacity(directories.len()); 'outer: for name in 
directories { if let Some(value) = self.state.values().get(&name) { - for (include, comparison, expected) in &action.group.include { + for (include, comparison, expected) in action.group.include() { let actual = value .pointer(include) .ok_or_else(|| Error::JSONPointerNotFound(name.clone(), include.clone()))?; @@ -230,7 +230,7 @@ impl Project { trace!( "Separating {} directories by status for '{}'.", directories.len(), - action.name + action.name() ); let capacity = directories.capacity(); let mut status = Status { @@ -247,12 +247,12 @@ impl Project { let completed = self.state.completed(); - if completed[&action.name].contains(&directory_name) { + if completed[action.name()].contains(&directory_name) { status.completed.push(directory_name); - } else if self.state.is_submitted(&action.name, &directory_name) { + } else if self.state.is_submitted(action.name(), &directory_name) { status.submitted.push(directory_name); } else if action - .previous_actions + .previous_actions() .iter() .all(|a| completed[a].contains(&directory_name)) { @@ -282,7 +282,7 @@ impl Project { trace!( "Separating {} directories into groups for '{}'.", directories.len(), - action.name + action.name() ); if directories.is_empty() { @@ -302,7 +302,7 @@ impl Project { .ok_or_else(|| Error::DirectoryNotFound(directory_name.clone()))?; let mut sort_key = Vec::new(); - for pointer in &action.group.sort_by { + for pointer in action.group.sort_by() { let element = value.pointer(pointer).ok_or_else(|| { Error::JSONPointerNotFound(directory_name.clone(), pointer.clone()) })?; @@ -313,8 +313,8 @@ impl Project { // Sort by key when there are keys to sort by. 
let mut result = Vec::new(); - if action.group.sort_by.is_empty() { - if action.group.reverse_sort { + if action.group.sort_by().is_empty() { + if action.group.reverse_sort() { directories.reverse(); } result.push(directories); @@ -324,13 +324,13 @@ impl Project { .expect("Valid JSON comparison") }); - if action.group.reverse_sort { + if action.group.reverse_sort() { directories.reverse(); } // Split by the sort key when requested. #[allow(clippy::redundant_closure_for_method_calls)] - if action.group.split_by_sort_key { + if action.group.split_by_sort_key() { result.extend( directories .chunk_by(|a, b| { @@ -424,7 +424,7 @@ products = ["one"] name = "two" command = "c" products = ["two"] -group.include = [["/i", "less_than", {}]] +group.include = [["/i", "<", {}]] [[action]] name = "three" @@ -465,10 +465,8 @@ previous_actions = ["two"] ); let mut action = project.workflow.action[1].clone(); - action - .group - .include - .push(("/i".into(), Comparison::GreaterThan, Value::from(4))); + let include = action.group.include.as_mut().unwrap(); + include.push(("/i".into(), Comparison::GreaterThan, Value::from(4))); assert_eq!( project .find_matching_directories(&action, all_directories.clone()) @@ -539,7 +537,7 @@ previous_actions = ["two"] reversed.reverse(); let mut action = project.workflow.action[0].clone(); - action.group.reverse_sort = true; + action.group.reverse_sort = Some(true); let groups = project .separate_into_groups(&action, all_directories.clone()) .unwrap(); @@ -578,7 +576,7 @@ previous_actions = ["two"] all_directories.sort_unstable(); let mut action = project.workflow.action[0].clone(); - action.group.sort_by = vec!["/j".to_string()]; + action.group.sort_by = Some(vec!["/j".to_string()]); let groups = project .separate_into_groups(&action, all_directories.clone()) .unwrap(); @@ -606,8 +604,8 @@ previous_actions = ["two"] all_directories.sort_unstable(); let mut action = project.workflow.action[0].clone(); - action.group.sort_by = 
vec!["/j".to_string()]; - action.group.split_by_sort_key = true; + action.group.sort_by = Some(vec!["/j".to_string()]); + action.group.split_by_sort_key = Some(true); let groups = project .separate_into_groups(&action, all_directories.clone()) .unwrap(); diff --git a/src/scheduler/bash.rs b/src/scheduler/bash.rs index 28c8357..e9f011a 100644 --- a/src/scheduler/bash.rs +++ b/src/scheduler/bash.rs @@ -96,10 +96,14 @@ export ACTION_NAME="{}" export ACTION_PROCESSES="{}" export ACTION_WALLTIME_IN_MINUTES="{}" "#, - self.cluster_name, self.action.name, self.total_processes, self.walltime_in_minutes, + self.cluster_name, + self.action.name(), + self.total_processes, + self.walltime_in_minutes, ); - if let Processes::PerDirectory(processes_per_directory) = self.action.resources.processes { + if let Processes::PerDirectory(processes_per_directory) = self.action.resources.processes() + { let _ = writeln!( result, "export ACTION_PROCESSES_PER_DIRECTORY=\"{processes_per_directory}\"", @@ -141,7 +145,7 @@ export ACTION_WALLTIME_IN_MINUTES="{}" ); } - let action_name = &self.action.name; + let action_name = self.action.name(); let row_executable = env::current_exe().map_err(Error::FindCurrentExecutable)?; let row_executable = row_executable.to_str().expect("UTF-8 path to executable."); let _ = write!( @@ -154,20 +158,20 @@ trap 'printf %s\\n "${{directories[@]}}" | {row_executable} scan --no-progress - } fn execution(&self) -> Result { - let contains_directory = self.action.command.contains("{directory}"); - let contains_directories = self.action.command.contains("{directories}"); + let contains_directory = self.action.command().contains("{directory}"); + let contains_directories = self.action.command().contains("{directories}"); if contains_directory && contains_directories { return Err(Error::ActionContainsMultipleTemplates( - self.action.name.clone(), + self.action.name().into(), )); } // Build up launcher prefix let mut launcher_prefix = String::new(); let mut 
process_launchers = 0; - for launcher in &self.action.launchers { + for launcher in self.action.launchers() { let launcher = self.launchers.get(launcher).ok_or_else(|| { - Error::LauncherNotFound(launcher.clone(), self.action.name.clone()) + Error::LauncherNotFound(launcher.clone(), self.action.name().into()) })?; launcher_prefix .push_str(&launcher.prefix(&self.action.resources, self.directories.len())); @@ -178,16 +182,16 @@ trap 'printf %s\\n "${{directories[@]}}" | {row_executable} scan --no-progress - if self.total_processes > 1 && process_launchers == 0 { return Err(Error::NoProcessLauncher( - self.action.name.clone(), + self.action.name().into(), self.total_processes, )); } if process_launchers > 1 { - return Err(Error::TooManyProcessLaunchers(self.action.name.clone())); + return Err(Error::TooManyProcessLaunchers(self.action.name().into())); } if contains_directory { - let command = self.action.command.replace("{directory}", "$directory"); + let command = self.action.command().replace("{directory}", "$directory"); Ok(format!( r#" for directory in "${{directories[@]}}" @@ -199,7 +203,7 @@ done } else if contains_directories { let command = self .action - .command + .command() .replace("{directories}", r#""${directories[@]}""#); Ok(format!( r#" @@ -207,7 +211,7 @@ done "# )) } else { - Err(Error::ActionContainsNoTemplate(self.action.name.clone())) + Err(Error::ActionContainsNoTemplate(self.action.name().into())) } } @@ -243,7 +247,7 @@ impl Scheduler for Bash { directories: &[PathBuf], should_terminate: Arc, ) -> Result, Error> { - debug!("Executing '{}' in bash.", action.name); + debug!("Executing '{}' in bash.", action.name()); let script = self.make_script(action, directories)?; let mut child = Command::new("bash") @@ -283,7 +287,7 @@ impl Scheduler for Bash { }, Some(code) => format!("exited with code {code}"), }; - return Err(Error::ExecuteAction(action.name.clone(), message)); + return Err(Error::ExecuteAction(action.name().into(), message)); } Ok(None) 
@@ -318,18 +322,18 @@ mod tests { fn setup() -> (Action, Vec, HashMap) { let resources = Resources { - processes: Processes::PerDirectory(2), + processes: Some(Processes::PerDirectory(2)), threads_per_process: Some(4), gpus_per_process: Some(1), - walltime: Walltime::PerSubmission( + walltime: Some(Walltime::PerSubmission( Duration::new(true, 0, 240, 0).expect("Valid duration."), - ), + )), }; let action = Action { - name: "action".to_string(), - command: "command {directory}".to_string(), - launchers: vec!["mpi".into()], + name: Some("action".to_string()), + command: Some("command {directory}".to_string()), + launchers: Some(vec!["mpi".into()]), resources, ..Action::default() }; @@ -415,7 +419,7 @@ mod tests { #[parallel] fn execution_directories() { let (mut action, directories, launchers) = setup(); - action.command = "command {directories}".to_string(); + action.command = Some("command {directories}".to_string()); let script = BashScriptBuilder::new("cluster", &action, &directories, &launchers) .build() @@ -429,9 +433,9 @@ mod tests { #[parallel] fn execution_openmp() { let (mut action, directories, launchers) = setup(); - action.resources.processes = Processes::PerSubmission(1); - action.launchers = vec!["openmp".into()]; - action.command = "command {directories}".to_string(); + action.resources.processes = Some(Processes::PerSubmission(1)); + action.launchers = Some(vec!["openmp".into()]); + action.command = Some("command {directories}".to_string()); let script = BashScriptBuilder::new("cluster", &action, &directories, &launchers) .build() @@ -445,8 +449,8 @@ mod tests { #[parallel] fn execution_mpi() { let (mut action, directories, launchers) = setup(); - action.launchers = vec!["mpi".into()]; - action.command = "command {directories}".to_string(); + action.launchers = Some(vec!["mpi".into()]); + action.command = Some("command {directories}".to_string()); let script = BashScriptBuilder::new("cluster", &action, &directories, &launchers) .build() @@ -462,7 
+466,7 @@ mod tests { #[parallel] fn command_errors() { let (mut action, directories, launchers) = setup(); - action.command = "command {directory} {directories}".to_string(); + action.command = Some("command {directory} {directories}".to_string()); let result = BashScriptBuilder::new("cluster", &action, &directories, &launchers).build(); @@ -471,7 +475,7 @@ mod tests { Err(Error::ActionContainsMultipleTemplates { .. }) )); - action.command = "command".to_string(); + action.command = Some("command".to_string()); let result = BashScriptBuilder::new("cluster", &action, &directories, &launchers).build(); @@ -504,9 +508,10 @@ mod tests { #[parallel] fn more_variables() { let (mut action, directories, launchers) = setup(); - action.resources.processes = Processes::PerSubmission(10); - action.resources.walltime = - Walltime::PerDirectory(Duration::new(true, 0, 60, 0).expect("Valid duration.")); + action.resources.processes = Some(Processes::PerSubmission(10)); + action.resources.walltime = Some(Walltime::PerDirectory( + Duration::new(true, 0, 60, 0).expect("Valid duration."), + )); action.resources.threads_per_process = None; action.resources.gpus_per_process = None; @@ -547,8 +552,8 @@ mod tests { #[parallel] fn launcher_required() { let (mut action, directories, launchers) = setup(); - action.launchers = vec![]; - action.command = "command {directories}".to_string(); + action.launchers = Some(vec![]); + action.command = Some("command {directories}".to_string()); let result = BashScriptBuilder::new("cluster", &action, &directories, &launchers).build(); @@ -559,9 +564,9 @@ mod tests { #[parallel] fn too_many_launchers() { let (mut action, directories, launchers) = setup(); - action.resources.processes = Processes::PerSubmission(1); - action.launchers = vec!["mpi".into(), "mpi".into()]; - action.command = "command {directories}".to_string(); + action.resources.processes = Some(Processes::PerSubmission(1)); + action.launchers = Some(vec!["mpi".into(), "mpi".into()]); + 
action.command = Some("command {directories}".to_string()); let result = BashScriptBuilder::new("cluster", &action, &directories, &launchers).build(); diff --git a/src/scheduler/slurm.rs b/src/scheduler/slurm.rs index df25492..231d714 100644 --- a/src/scheduler/slurm.rs +++ b/src/scheduler/slurm.rs @@ -45,7 +45,7 @@ impl Scheduler for Slurm { let mut preamble = String::with_capacity(512); let mut user_partition = &None; - write!(preamble, "#SBATCH --job-name={}", action.name).expect("valid format"); + write!(preamble, "#SBATCH --job-name={}", action.name()).expect("valid format"); let _ = match directories.first() { Some(directory) => match directories.len() { 0..=1 => writeln!(preamble, "-{}", directory.display()), @@ -59,7 +59,7 @@ impl Scheduler for Slurm { None => writeln!(preamble), }; - let _ = writeln!(preamble, "#SBATCH --output={}-%j.out", action.name); + let _ = writeln!(preamble, "#SBATCH --output={}-%j.out", action.name()); if let Some(submit_options) = action.submit_options.get(&self.cluster.name) { user_partition = &submit_options.partition; @@ -141,7 +141,7 @@ impl Scheduler for Slurm { directories: &[PathBuf], should_terminate: Arc, ) -> Result, Error> { - debug!("Submtitting '{}' with sbatch.", action.name); + debug!("Submtitting '{}' with sbatch.", action.name()); // output() below is blocking with no convenient way to interrupt it. 
// If the user pressed ctrl-C, let the current call to submit() finish @@ -190,7 +190,7 @@ impl Scheduler for Slurm { }, Some(code) => format!("sbatch exited with code {code}"), }; - Err(Error::SubmitAction(action.name.clone(), message)) + Err(Error::SubmitAction(action.name().into(), message)) } } @@ -286,9 +286,9 @@ mod tests { fn setup() -> (Action, Vec, Slurm) { let action = Action { - name: "action".to_string(), - command: "command {directory}".to_string(), - launchers: vec!["mpi".into()], + name: Some("action".to_string()), + command: Some("command {directory}".to_string()), + launchers: Some(vec!["mpi".into()]), ..Action::default() }; @@ -328,7 +328,7 @@ mod tests { fn ntasks() { let (mut action, directories, slurm) = setup(); - action.resources.processes = Processes::PerDirectory(3); + action.resources.processes = Some(Processes::PerDirectory(3)); let script = slurm .make_script(&action, &directories) @@ -483,7 +483,7 @@ mod tests { let slurm = Slurm::new(cluster, launchers.by_cluster("cluster")); - action.resources.processes = Processes::PerSubmission(81); + action.resources.processes = Some(Processes::PerSubmission(81)); let script = slurm .make_script(&action, &directories) @@ -511,7 +511,7 @@ mod tests { let slurm = Slurm::new(cluster, launchers.by_cluster("cluster")); - action.resources.processes = Processes::PerSubmission(81); + action.resources.processes = Some(Processes::PerSubmission(81)); action.resources.gpus_per_process = Some(1); let script = slurm diff --git a/src/state.rs b/src/state.rs index 3f75423..ef33cae 100644 --- a/src/state.rs +++ b/src/state.rs @@ -180,8 +180,8 @@ impl State { // Ensure that completed has keys for all actions in the workflow. 
for action in &workflow.action { - if !state.completed.contains_key(&action.name) { - state.completed.insert(action.name.clone(), HashSet::new()); + if !state.completed.contains_key(action.name()) { + state.completed.insert(action.name().into(), HashSet::new()); } } @@ -527,7 +527,7 @@ impl State { /// Remove missing completed actions and directories. fn remove_missing_completed(&mut self, workflow: &Workflow) { let current_actions: HashSet = - workflow.action.iter().map(|a| a.name.clone()).collect(); + workflow.action.iter().map(|a| a.name().into()).collect(); let actions_to_remove: Vec = self .completed @@ -560,7 +560,7 @@ impl State { /// Remove missing submitted actions and directories. fn remove_missing_submitted(&mut self, workflow: &Workflow) { let current_actions: HashSet = - workflow.action.iter().map(|a| a.name.clone()).collect(); + workflow.action.iter().map(|a| a.name().into()).collect(); let actions_to_remove: Vec = self .submitted diff --git a/src/workflow.rs b/src/workflow.rs index 17ef623..c2c0e77 100644 --- a/src/workflow.rs +++ b/src/workflow.rs @@ -33,10 +33,10 @@ pub struct Workflow { #[serde(default)] pub workspace: Workspace, - /// The submission options + /// Default tables #[serde(default)] - pub submit_options: HashMap, - // TODO: refactor handling of submit options into more general action defaults. + pub default: DefaultTables, + /// The actions. #[serde(default)] pub action: Vec, @@ -87,22 +87,22 @@ pub struct SubmitOptions { #[serde(deny_unknown_fields)] pub struct Action { /// Unique name defining the action. - pub name: String, + pub name: Option, /// The command to execute for this action. - pub command: String, + pub command: Option, /// Names of the launchers to use when executing the action. #[serde(default)] - pub launchers: Vec, + pub launchers: Option>, /// The names of the previous actions that must be completed before this action. 
#[serde(default)] - pub previous_actions: Vec, + pub previous_actions: Option>, /// The product files this action creates. #[serde(default)] - pub products: Vec, + pub products: Option>, /// Resources used by this action. #[serde(default)] @@ -115,6 +115,20 @@ pub struct Action { /// The group of jobs to submit. #[serde(default)] pub group: Group, + + /// Name of the action to copy defaults from. + pub from: Option, +} + +/// Default tables +/// +/// Store default options for other tables in the file. +/// +#[derive(Clone, Debug, Default, Deserialize, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +pub struct DefaultTables { + #[serde(default)] + pub action: Action, } #[derive(Clone, Debug, Deserialize, PartialEq, Eq)] @@ -138,8 +152,7 @@ pub enum Processes { #[serde(deny_unknown_fields)] pub struct Resources { /// Number of processes. - #[serde(default)] - pub processes: Processes, + pub processes: Option, /// Threads per process. pub threads_per_process: Option, @@ -148,16 +161,22 @@ pub struct Resources { pub gpus_per_process: Option, // Walltime. - #[serde(default)] - pub walltime: Walltime, + pub walltime: Option, } /// Comparison operations #[derive(Clone, Debug, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum Comparison { + #[serde(rename(deserialize = "<"))] LessThan, + #[serde(rename(deserialize = "<="))] + LessThanOrEqualTo, + #[serde(rename(deserialize = "=="))] EqualTo, + #[serde(rename(deserialize = ">="))] + GreaterThanOrEqualTo, + #[serde(rename(deserialize = ">"))] GreaterThan, } @@ -167,26 +186,26 @@ pub enum Group { /// Include members of the group where all JSON elements match the given values. #[serde(default)] - pub include: Vec<(String, Comparison, serde_json::Value)>, + pub include: Option>, /// Sort by the given set of JSON elements. #[serde(default)] - pub sort_by: Vec, + pub sort_by: Option>, /// Split into groups by the sort keys.
#[serde(default)] - pub split_by_sort_key: bool, + pub split_by_sort_key: Option, /// Reverse the sort. #[serde(default)] - pub reverse_sort: bool, + pub reverse_sort: Option, /// Maximum size of the submitted group. pub maximum_size: Option, /// Submit only whole groups when true. #[serde(default)] - pub submit_whole: bool, + pub submit_whole: Option, } /// Resource cost to execute an action. @@ -275,7 +294,7 @@ impl Resources { /// `n_directories`: Number of directories in the submission. /// pub fn total_processes(&self, n_directories: usize) -> usize { - match self.processes { + match self.processes() { Processes::PerDirectory(p) => p * n_directories, Processes::PerSubmission(p) => p, } @@ -308,7 +327,7 @@ impl Resources { /// When the resulting walltime cannot be represented. /// pub fn total_walltime(&self, n_directories: usize) -> Duration { - match self.walltime { + match self.walltime() { Walltime::PerDirectory(ref w) => Duration::new( true, 0, @@ -350,6 +369,200 @@ impl Resources { gpu_hours: 0.0, } } + + /// Resolve omitted keys from the given template. + fn resolve(&mut self, template: &Resources) { + if self.processes.is_none() { + self.processes = template.processes.clone(); + } + if self.threads_per_process.is_none() { + self.threads_per_process = template.threads_per_process; + } + if self.gpus_per_process.is_none() { + self.gpus_per_process = template.gpus_per_process; + } + if self.walltime.is_none() { + self.walltime = template.walltime.clone(); + } + } + + pub fn processes(&self) -> Processes { + if let Some(processes) = self.processes.as_ref() { + processes.clone() + } else { + Processes::default() + } + } + pub fn walltime(&self) -> Walltime { + if let Some(walltime) = self.walltime.as_ref() { + walltime.clone() + } else { + Walltime::default() + } + } +} + +impl Action { + /// Get the action's `name`. + pub fn name(&self) -> &str { + if let Some(name) = self.name.as_ref() { + name + } else { + "" + } + } + + /// Get the action's `command`. 
+ pub fn command(&self) -> &str { + if let Some(command) = self.command.as_ref() { + command + } else { + "" + } + } + + /// Get the action's `launchers`. + pub fn launchers(&self) -> &[String] { + if let Some(launchers) = self.launchers.as_ref() { + launchers + } else { + &[] + } + } + + /// Get the action's `previous_actions`. + pub fn previous_actions(&self) -> &[String] { + if let Some(previous_actions) = self.previous_actions.as_ref() { + previous_actions + } else { + &[] + } + } + + /// Get the action's products + pub fn products(&self) -> &[String] { + if let Some(products) = self.products.as_ref() { + products + } else { + &[] + } + } + + /// Resolve the action's omitted keys with defaults + fn resolve(&mut self, template: &Action) { + if self.name.is_none() { + self.name = template.name.clone(); + } + if self.command.is_none() { + self.command = template.command.clone(); + } + if self.launchers.is_none() { + self.launchers = template.launchers.clone(); + } + if self.previous_actions.is_none() { + self.previous_actions = template.previous_actions.clone(); + } + if self.products.is_none() { + self.products = template.products.clone(); + } + + self.resources.resolve(&template.resources); + self.group.resolve(&template.group); + + // Populate each action's submit_options with the global ones. 
+ for (name, template_options) in &template.submit_options { + if self.submit_options.contains_key(name) { + let action_options = self + .submit_options + .get_mut(name) + .expect("Key should be present"); + if action_options.account.is_none() { + action_options.account = template_options.account.clone(); + } + if action_options.setup.is_none() { + action_options.setup = template_options.setup.clone(); + } + if action_options.partition.is_none() { + action_options.partition = template_options.partition.clone(); + } + if action_options.custom.is_empty() { + action_options.custom = template_options.custom.clone(); + } + } else { + self.submit_options + .insert(name.clone(), template_options.clone()); + } + } + } +} + +impl Group { + /// Get the group's `include`. + pub fn include(&self) -> &[(String, Comparison, serde_json::Value)] { + if let Some(include) = self.include.as_ref() { + include + } else { + &[] + } + } + + /// Get the group's `sort_by`. + pub fn sort_by(&self) -> &[String] { + if let Some(sort_by) = self.sort_by.as_ref() { + sort_by + } else { + &[] + } + } + + /// Get the group's `split_by_sort_key`. + pub fn split_by_sort_key(&self) -> bool { + if let Some(split_by_sort_key) = self.split_by_sort_key { + split_by_sort_key + } else { + false + } + } + + /// Get the group's `reverse_sort`. + pub fn reverse_sort(&self) -> bool { + if let Some(reverse_sort) = self.reverse_sort { + reverse_sort + } else { + false + } + } + + /// Get the group's `submit_whole`. + pub fn submit_whole(&self) -> bool { + if let Some(submit_whole) = self.submit_whole { + submit_whole + } else { + false + } + } + + /// Resolve omitted keys from the given template. 
+ fn resolve(&mut self, template: &Group) { + if self.include.is_none() { + self.include = template.include.clone(); + } + if self.sort_by.is_none() { + self.sort_by = template.sort_by.clone(); + } + if self.split_by_sort_key.is_none() { + self.split_by_sort_key = template.split_by_sort_key; + } + if self.reverse_sort.is_none() { + self.reverse_sort = template.reverse_sort; + } + if self.maximum_size.is_none() { + self.maximum_size = template.maximum_size; + } + if self.submit_whole.is_none() { + self.submit_whole = template.submit_whole; + } + } } impl Workflow { @@ -391,7 +604,7 @@ impl Workflow { /// Find the action that matches the given name. pub fn action_by_name(&self, name: &str) -> Option<&Action> { - if let Some(action_index) = self.action.iter().position(|a| a.name == name) { + if let Some(action_index) = self.action.iter().position(|a| a.name() == name) { Some(&self.action[action_index]) } else { None @@ -400,48 +613,46 @@ impl Workflow { /// Validate a `Workflow` and populate defaults. /// - /// Most defaults are populated by the serde configuration. This method handles cases where - /// users provide no walltime and/or no processes. + /// Resolve each action to a fully defined struct with defaults populated + /// from: The current action, the action named by "from", and the default + /// action (in that order). /// fn validate_and_set_defaults(mut self) -> Result { let mut action_names = HashSet::with_capacity(self.action.len()); - for action in &mut self.action { - trace!("Validating action '{}'.", action.name); + if self.default.action.from.is_some() { + return Err(Error::DefaultActionSetsFrom()); + } - // Verify action names are unique. - if !action_names.insert(action.name.clone()) { - return Err(Error::DuplicateAction(action.name.clone())); - } + let source_actions = self.action.clone(); - // Populate each action's submit_options with the global ones. 
- for (name, global_options) in &self.submit_options { - if action.submit_options.contains_key(name) { - let action_options = action - .submit_options - .get_mut(name) - .expect("Key should be present"); - if action_options.account.is_none() { - action_options.account = global_options.account.clone(); - } - if action_options.setup.is_none() { - action_options.setup = global_options.setup.clone(); - } - if action_options.partition.is_none() { - action_options.partition = global_options.partition.clone(); - } - if action_options.custom.is_empty() { - action_options.custom = global_options.custom.clone(); + for (action_idx, action) in self.action.iter_mut().enumerate() { + if let Some(from) = &action.from { + if let Some(action_index) = source_actions.iter().position(|a| a.name() == from) { + if let Some(recursive_from) = &source_actions[action_index].from { + return Err(Error::RecursiveFrom(recursive_from.clone())); } + + action.resolve(&source_actions[action_index]); } else { - action - .submit_options - .insert(name.clone(), global_options.clone()); + return Err(Error::FromActionNotFound(from.clone())); } } + action.resolve(&self.default.action); + + action_names.insert(action.name().to_string()); + trace!("Validating action '{}'.", action.name()); + + if action.name.is_none() { + return Err(Error::ActionMissingName(action_idx)); + } + if action.command.is_none() { + return Err(Error::ActionMissingCommand(action.name().into())); + } + // Warn for apparently invalid sort_by. - for pointer in &action.group.sort_by { + for pointer in action.group.sort_by() { if !pointer.is_empty() && !pointer.starts_with('/') { warn!("The JSON pointer '{pointer}' does not appear valid. 
Did you mean '/{pointer}'?"); } @@ -449,11 +660,24 @@ impl Workflow { } for action in &self.action { - for previous_action in &action.previous_actions { + for previous_action in action.previous_actions() { if !action_names.contains(previous_action) { return Err(Error::PreviousActionNotFound( previous_action.clone(), - action.name.clone(), + action.name().into(), + )); + } + } + + if let Some(first_action) = self.action_by_name(action.name()) { + if action.previous_actions != first_action.previous_actions { + return Err(Error::DuplicateActionsDifferentPreviousActions( + action.name().to_string(), + )); + } + if action.products != first_action.products { + return Err(Error::DuplicateActionsDifferentProducts( + action.name().to_string(), )); } } @@ -584,7 +808,7 @@ mod tests { assert_eq!(workflow.root, temp.path().canonicalize().unwrap()); assert_eq!(workflow.workspace.path, PathBuf::from("workspace")); assert!(workflow.workspace.value_file.is_none()); - assert!(workflow.submit_options.is_empty()); + assert_eq!(workflow.default.action, Action::default()); assert!(workflow.action.is_empty()); } @@ -607,7 +831,7 @@ value_file = "s" #[parallel] fn submit_options_defaults() { let temp = TempDir::new().unwrap(); - let workflow = "[submit_options.a]"; + let workflow = "[default.action.submit_options.a]"; let workflow = Workflow::open_str(temp.path(), workflow).unwrap(); assert_eq!( @@ -615,10 +839,10 @@ value_file = "s" temp.path().canonicalize().unwrap() ); - assert_eq!(workflow.submit_options.len(), 1); - assert!(workflow.submit_options.contains_key("a")); + assert_eq!(workflow.default.action.submit_options.len(), 1); + assert!(workflow.default.action.submit_options.contains_key("a")); - let submit_options = workflow.submit_options.get("a").unwrap(); + let submit_options = workflow.default.action.submit_options.get("a").unwrap(); assert_eq!(submit_options.account, None); assert_eq!(submit_options.setup, None); assert!(submit_options.custom.is_empty()); @@ -630,7 +854,7 @@ 
value_file = "s" fn submit_options_nondefault() { let temp = TempDir::new().unwrap(); let workflow = r#" -[submit_options.a] +[default.action.submit_options.a] account = "my_account" setup = "module load openmpi" custom = ["--option1", "--option2"] @@ -643,10 +867,10 @@ partition = "gpu" temp.path().canonicalize().unwrap() ); - assert_eq!(workflow.submit_options.len(), 1); - assert!(workflow.submit_options.contains_key("a")); + assert_eq!(workflow.default.action.submit_options.len(), 1); + assert!(workflow.default.action.submit_options.contains_key("a")); - let submit_options = workflow.submit_options.get("a").unwrap(); + let submit_options = workflow.default.action.submit_options.get("a").unwrap(); assert_eq!(submit_options.account, Some(String::from("my_account"))); assert_eq!( submit_options.setup, @@ -670,27 +894,65 @@ command = "c" assert_eq!(workflow.action.len(), 1); let action = workflow.action.first().unwrap(); - assert_eq!(action.name, "b"); - assert_eq!(action.command, "c"); - assert!(action.previous_actions.is_empty()); - assert!(action.products.is_empty()); - assert!(action.launchers.is_empty()); - - assert_eq!(action.resources.processes, Processes::PerSubmission(1)); + assert_eq!(action.name(), "b"); + assert_eq!(action.command(), "c"); + assert!(action.previous_actions.is_none()); + assert!(action.products.is_none()); + assert!(action.launchers.is_none()); + + assert_eq!(action.resources.processes, None); + assert_eq!(action.resources.processes(), Processes::PerSubmission(1)); assert_eq!(action.resources.threads_per_process, None); assert_eq!(action.resources.gpus_per_process, None); + assert_eq!(action.resources.walltime, None,); assert_eq!( - action.resources.walltime, + action.resources.walltime(), Walltime::PerDirectory(Duration::new(true, 0, 3600, 0).unwrap()) ); assert!(action.submit_options.is_empty()); - assert!(action.group.include.is_empty()); - assert!(action.group.sort_by.is_empty()); - assert!(!action.group.split_by_sort_key); + 
assert_eq!(action.group.include, None); + assert!(action.group.include().is_empty()); + assert_eq!(action.group.sort_by, None); + assert!(action.group.sort_by().is_empty()); + assert_eq!(action.group.split_by_sort_key, None); + assert!(!action.group.split_by_sort_key()); assert_eq!(action.group.maximum_size, None); - assert!(!action.group.submit_whole); - assert!(!action.group.reverse_sort); + assert_eq!(action.group.submit_whole, None); + assert!(!action.group.submit_whole()); + assert_eq!(action.group.reverse_sort, None); + assert!(!action.group.reverse_sort()); + } + + #[test] + #[parallel] + fn action_no_name() { + let temp = TempDir::new().unwrap(); + let workflow = r#" +[[action]] +command = "c" +"#; + let result = Workflow::open_str(temp.path(), workflow); + assert!(result.is_err()); + + assert!(result.unwrap_err().to_string().contains("missing `name`")); + } + + #[test] + #[parallel] + fn action_no_command() { + let temp = TempDir::new().unwrap(); + let workflow = r#" +[[action]] +name = "a" +"#; + let result = Workflow::open_str(temp.path(), workflow); + assert!(result.is_err()); + + assert!(result + .unwrap_err() + .to_string() + .contains("missing `command`")); } #[test] @@ -708,18 +970,14 @@ command = "c" assert_eq!(workflow.action.len(), 1); let action = workflow.action.first().unwrap(); - assert_eq!( - action.resources.walltime, - Walltime::PerDirectory(Duration::new(true, 0, 3600, 0).unwrap()) - ); assert!(action.submit_options.is_empty()); - assert!(action.group.include.is_empty()); - assert!(action.group.sort_by.is_empty()); - assert!(!action.group.split_by_sort_key); + assert!(action.group.include().is_empty()); + assert!(action.group.sort_by().is_empty()); + assert!(!action.group.split_by_sort_key()); assert_eq!(action.group.maximum_size, None); - assert!(!action.group.submit_whole); - assert!(!action.group.reverse_sort); + assert!(!action.group.submit_whole()); + assert!(!action.group.reverse_sort()); } #[test] @@ -736,15 +994,64 @@ name = "b" 
command = "d" "#; let result = Workflow::open_str(temp.path(), workflow); - assert!( - result.is_err(), - "Expected duplicate action error, but got {result:?}" - ); + assert!(result.is_ok()); + } + + #[test] + #[parallel] + fn action_duplicate_different_products() { + let temp = TempDir::new().unwrap(); + let workflow = r#" +[[action]] +name = "b" +command = "c" +products = ["e"] + +[[action]] +name = "b" +command = "d" +products = ["b"] +"#; + let result = Workflow::open_str(temp.path(), workflow); + assert!(matches!( + result, + Err(Error::DuplicateActionsDifferentProducts(_)) + )); assert!(result .unwrap_err() .to_string() - .starts_with("Found duplicate action")); + .contains("must have the same `products`")); + } + + #[test] + #[parallel] + fn action_duplicate_different_previous_actions() { + let temp = TempDir::new().unwrap(); + let workflow = r#" +[[action]] +name = "b" +command = "c" + +[[action]] +name = "b" +command = "d" +previous_actions = ["a"] + +[[action]] +name = "a" +command = "e" +"#; + let result = Workflow::open_str(temp.path(), workflow); + assert!(matches!( + result, + Err(Error::DuplicateActionsDifferentPreviousActions(_)) + )); + + assert!(result + .unwrap_err() + .to_string() + .contains("must have the same `previous_actions`")); } #[test] @@ -763,7 +1070,10 @@ launchers = ["openmp", "mpi"] assert_eq!(workflow.action.len(), 1); let action = workflow.action.first().unwrap(); - assert_eq!(action.launchers, vec!["openmp", "mpi"]); + assert_eq!( + action.launchers(), + vec!["openmp".to_string(), "mpi".to_string()] + ); } #[test] @@ -786,13 +1096,13 @@ previous_actions = ["b"] assert_eq!(workflow.action.len(), 2); let action = workflow.action.get(1).unwrap(); - assert_eq!(action.previous_actions, vec!["b"]); + assert_eq!(action.previous_actions(), vec!["b".to_string()]); let action_a = workflow.action_by_name("b"); - assert_eq!(action_a.unwrap().command, "c"); + assert_eq!(action_a.unwrap().command(), "c"); let action_d = 
workflow.action_by_name("d"); - assert_eq!(action_d.unwrap().command, "e"); + assert_eq!(action_d.unwrap().command(), "e"); assert!(workflow.action_by_name("f").is_none()); } @@ -839,11 +1149,11 @@ walltime.per_submission = "4d, 05:32:11" assert_eq!(workflow.action.len(), 1); let action = workflow.action.first().unwrap(); - assert_eq!(action.resources.processes, Processes::PerSubmission(12)); + assert_eq!(action.resources.processes(), Processes::PerSubmission(12)); assert_eq!(action.resources.threads_per_process, Some(8)); assert_eq!(action.resources.gpus_per_process, Some(1)); assert_eq!( - action.resources.walltime, + action.resources.walltime(), Walltime::PerSubmission( Duration::new(true, 4, 5 * 3600 + 32 * 60 + 11, 0) .expect("this should be a valid Duration"), @@ -869,10 +1179,10 @@ walltime.per_directory = "00:01" assert_eq!(workflow.action.len(), 1); let action = workflow.action.first().unwrap(); - assert_eq!(action.resources.processes, Processes::PerDirectory(1)); + assert_eq!(action.resources.processes(), Processes::PerDirectory(1)); assert_eq!( - action.resources.walltime, + action.resources.walltime(), Walltime::PerDirectory( Duration::new(true, 0, 60, 0).expect("this should be a valid Duration") ) @@ -944,7 +1254,7 @@ products = ["d", "e"] assert_eq!(workflow.action.len(), 1); let action = workflow.action.first().unwrap(); - assert_eq!(action.products, vec!["d".to_string(), "e".to_string()]); + assert_eq!(action.products(), vec!["d".to_string(), "e".to_string()]); } #[test] @@ -956,7 +1266,7 @@ products = ["d", "e"] name = "b" command = "c" [action.group] -include = [["/d", "equal_to", 5], ["/float", "greater_than", 6.5], ["/string", "less_than", "str"], ["/array", "equal_to", [1,2,3]], ["/bool", "equal_to", false]] +include = [["/d", "==", 5], ["/float", ">", 6.5], ["/string", "<", "str"], ["/array", "==", [1,2,3]], ["/bool", "==", false]] sort_by = ["/sort"] split_by_sort_key = true maximum_size = 10 @@ -970,7 +1280,7 @@ reverse_sort = true let 
action = workflow.action.first().unwrap(); assert_eq!( - action.group.include, + action.group.include(), vec![ ( "/d".to_string(), @@ -999,11 +1309,11 @@ reverse_sort = true ) ] ); - assert_eq!(action.group.sort_by, vec![String::from("/sort")]); - assert!(action.group.split_by_sort_key); + assert_eq!(action.group.sort_by(), vec![String::from("/sort")]); + assert!(action.group.split_by_sort_key()); assert_eq!(action.group.maximum_size, Some(10)); - assert!(action.group.submit_whole); - assert!(action.group.reverse_sort); + assert!(action.group.submit_whole()); + assert!(action.group.reverse_sort()); } #[test] @@ -1087,7 +1397,7 @@ partition = "i" fn action_submit_options_global() { let temp = TempDir::new().unwrap(); let workflow = r#" -[submit_options.d] +[default.action.submit_options.d] account = "e" setup = "f" custom = ["g", "h"] @@ -1118,7 +1428,7 @@ command = "c" fn action_submit_options_no_override() { let temp = TempDir::new().unwrap(); let workflow = r#" -[submit_options.d] +[default.action.submit_options.d] account = "e" setup = "f" custom = ["g", "h"] @@ -1155,7 +1465,7 @@ partition = "n" fn action_submit_options_override() { let temp = TempDir::new().unwrap(); let workflow = r#" -[submit_options.d] +[default.action.submit_options.d] account = "e" setup = "f" custom = ["g", "h"] @@ -1183,11 +1493,404 @@ command = "c" assert_eq!(submit_options.partition, Some("i".to_string())); } + #[test] + #[parallel] + fn default_action_from() { + let temp = TempDir::new().unwrap(); + let workflow = r#" +[default.action] +from = "a" +"#; + let result = Workflow::open_str(temp.path(), workflow); + assert!(result.is_err()); + + assert!(result + .unwrap_err() + .to_string() + .contains("must not set `from`")); + } + + #[test] + #[parallel] + fn empty_action_default() { + let temp = TempDir::new().unwrap(); + let workflow = " +[default.action] +"; + + let workflow = Workflow::open_str(temp.path(), workflow).unwrap(); + + assert_eq!(workflow.action.len(), 0); + + let action 
= workflow.default.action; + assert_eq!(action.name, None); + assert_eq!(action.command, None); + assert_eq!(action.launchers, None); + assert_eq!(action.previous_actions, None); + assert_eq!(action.products, None); + assert_eq!(action.resources.processes, None); + assert_eq!(action.resources.threads_per_process, None); + assert_eq!(action.resources.gpus_per_process, None); + assert_eq!(action.resources.walltime, None); + assert!(action.submit_options.is_empty()); + assert_eq!(action.group.include, None); + assert_eq!(action.group.sort_by, None); + assert_eq!(action.group.split_by_sort_key, None); + assert_eq!(action.group.reverse_sort, None); + assert_eq!(action.group.maximum_size, None); + assert_eq!(action.group.submit_whole, None); + assert_eq!(action.from, None); + } + + #[test] + #[parallel] + fn action_default() { + let temp = TempDir::new().unwrap(); + let workflow = r#" +[default.action] +name = "a" +command = "b" +launchers = ["c"] +previous_actions = ["d"] +products = ["e"] + +[default.action.resources] +processes.per_directory = 2 +threads_per_process = 3 +gpus_per_process = 4 +walltime.per_submission = "00:00:01" + +# submit_options is tested above + +[default.action.group] +include = [["/f", "==", 5]] +sort_by = ["/g"] +split_by_sort_key = true +reverse_sort = true +maximum_size = 6 +submit_whole = true + +[[action]] + +[[action]] +name = "d" +"#; + + let workflow = Workflow::open_str(temp.path(), workflow).unwrap(); + + assert_eq!(workflow.action.len(), 2); + + let action = workflow.action.first().unwrap(); + assert_eq!(action.name(), "a"); + assert_eq!(action.command(), "b"); + assert_eq!(action.launchers(), vec!["c"]); + assert_eq!(action.previous_actions(), vec!["d"]); + assert_eq!(action.products(), vec!["e"]); + assert_eq!(action.resources.processes(), Processes::PerDirectory(2)); + assert_eq!(action.resources.threads_per_process, Some(3)); + assert_eq!(action.resources.gpus_per_process, Some(4)); + assert_eq!( + action.resources.walltime(), + 
Walltime::PerSubmission(Duration::new(true, 0, 1, 0).unwrap()) + ); + assert!(action.submit_options.is_empty()); + assert_eq!( + action.group.include(), + vec![("/f".into(), Comparison::EqualTo, serde_json::Value::from(5))] + ); + assert_eq!(action.group.sort_by(), vec!["/g"]); + assert!(action.group.split_by_sort_key()); + assert!(action.group.reverse_sort()); + assert_eq!(action.group.maximum_size, Some(6)); + assert!(action.group.submit_whole()); + assert_eq!(action.from, None); + } + + #[test] + #[parallel] + fn action_override_default() { + let temp = TempDir::new().unwrap(); + let workflow = r#" +[default.action] +name = "a" +command = "b" +launchers = ["c"] +products = ["e"] + +[default.action.resources] +processes.per_directory = 2 +threads_per_process = 3 +gpus_per_process = 4 +walltime.per_submission = "00:00:01" + +# submit_options is tested above + +[default.action.group] +include = [["/f", "==", 5]] +sort_by = ["/g"] +split_by_sort_key = true +reverse_sort = true +maximum_size = 6 +submit_whole = true + +[[action]] +name = "aa" +command = "bb" +launchers = ["cc"] +previous_actions = ["dd"] +products = ["ee"] + +[action.resources] +processes.per_directory = 4 +threads_per_process = 6 +gpus_per_process = 8 +walltime.per_submission = "00:00:02" + +# submit_options is tested above + +[action.group] +include = [["/ff", "==", 10]] +sort_by = ["/gg"] +split_by_sort_key = false +reverse_sort = false +maximum_size = 12 +submit_whole = false + +[[action]] +name = "dd" +"#; + + let workflow = Workflow::open_str(temp.path(), workflow).unwrap(); + + assert_eq!(workflow.action.len(), 2); + + let action = workflow.action.first().unwrap(); + assert_eq!(action.name(), "aa"); + assert_eq!(action.command(), "bb"); + assert_eq!(action.launchers(), vec!["cc"]); + assert_eq!(action.previous_actions(), vec!["dd"]); + assert_eq!(action.products(), vec!["ee"]); + assert_eq!(action.resources.processes(), Processes::PerDirectory(4)); + 
assert_eq!(action.resources.threads_per_process, Some(6)); + assert_eq!(action.resources.gpus_per_process, Some(8)); + assert_eq!( + action.resources.walltime(), + Walltime::PerSubmission(Duration::new(true, 0, 2, 0).unwrap()) + ); + assert!(action.submit_options.is_empty()); + assert_eq!( + action.group.include(), + vec![( + "/ff".into(), + Comparison::EqualTo, + serde_json::Value::from(10) + )] + ); + assert_eq!(action.group.sort_by(), vec!["/gg"]); + assert!(!action.group.split_by_sort_key()); + assert!(!action.group.reverse_sort()); + assert_eq!(action.group.maximum_size, Some(12)); + assert!(!action.group.submit_whole()); + assert_eq!(action.from, None); + } + + #[test] + #[parallel] + fn action_from() { + let temp = TempDir::new().unwrap(); + let workflow = r#" +[[action]] +name = "a" +command = "b" +launchers = ["c"] +previous_actions = ["d"] +products = ["e"] + +[default.action.resources] +processes.per_directory = 2 +threads_per_process = 3 +gpus_per_process = 4 +walltime.per_submission = "00:00:01" + +# submit_options is tested above + +[default.action.group] +include = [["/f", "==", 5]] +sort_by = ["/g"] +split_by_sort_key = true +reverse_sort = true +maximum_size = 6 +submit_whole = true + +[[action]] +from = "a" + +[[action]] +name = "d" +command = "e" +"#; + + let workflow = Workflow::open_str(temp.path(), workflow).unwrap(); + + assert_eq!(workflow.action.len(), 3); + + let action = &workflow.action[1]; + assert_eq!(action.name(), "a"); + assert_eq!(action.command(), "b"); + assert_eq!(action.launchers(), vec!["c"]); + assert_eq!(action.previous_actions(), vec!["d"]); + assert_eq!(action.products(), vec!["e"]); + assert_eq!(action.resources.processes(), Processes::PerDirectory(2)); + assert_eq!(action.resources.threads_per_process, Some(3)); + assert_eq!(action.resources.gpus_per_process, Some(4)); + assert_eq!( + action.resources.walltime(), + Walltime::PerSubmission(Duration::new(true, 0, 1, 0).unwrap()) + ); + 
assert!(action.submit_options.is_empty()); + assert_eq!( + action.group.include(), + vec![("/f".into(), Comparison::EqualTo, serde_json::Value::from(5))] + ); + assert_eq!(action.group.sort_by(), vec!["/g"]); + assert!(action.group.split_by_sort_key()); + assert!(action.group.reverse_sort()); + assert_eq!(action.group.maximum_size, Some(6)); + assert!(action.group.submit_whole()); + assert_eq!(action.from, Some("a".into())); + } + + #[test] + #[parallel] + fn action_override_from() { + let temp = TempDir::new().unwrap(); + let workflow = r#" +[[action]] +name = "a" +command = "b" +launchers = ["c"] +previous_actions = ["d"] +products = ["e"] + +[default.action.resources] +processes.per_directory = 2 +threads_per_process = 3 +gpus_per_process = 4 +walltime.per_submission = "00:00:01" + +# submit_options is tested above + +[default.action.group] +include = [["/f", "==", 5]] +sort_by = ["/g"] +split_by_sort_key = true +reverse_sort = true +maximum_size = 6 +submit_whole = true + +[[action]] +from = "a" + +name = "aa" +command = "bb" +launchers = ["cc"] +previous_actions = ["dd"] +products = ["ee"] + +[action.resources] +processes.per_directory = 4 +threads_per_process = 6 +gpus_per_process = 8 +walltime.per_submission = "00:00:02" + +# submit_options is tested above + +[action.group] +include = [["/ff", "==", 10]] +sort_by = ["/gg"] +split_by_sort_key = false +reverse_sort = false +maximum_size = 12 +submit_whole = false + +[[action]] +name = "dd" +command = "ee" + +[[action]] +name = "d" +command = "e" +"#; + + let workflow = Workflow::open_str(temp.path(), workflow).unwrap(); + + assert_eq!(workflow.action.len(), 4); + + let action = &workflow.action[1]; + assert_eq!(action.name(), "aa"); + assert_eq!(action.command(), "bb"); + assert_eq!(action.launchers(), vec!["cc"]); + assert_eq!(action.previous_actions(), vec!["dd"]); + assert_eq!(action.products(), vec!["ee"]); + assert_eq!(action.resources.processes(), Processes::PerDirectory(4)); + 
assert_eq!(action.resources.threads_per_process, Some(6)); + assert_eq!(action.resources.gpus_per_process, Some(8)); + assert_eq!( + action.resources.walltime(), + Walltime::PerSubmission(Duration::new(true, 0, 2, 0).unwrap()) + ); + assert!(action.submit_options.is_empty()); + assert_eq!( + action.group.include(), + vec![( + "/ff".into(), + Comparison::EqualTo, + serde_json::Value::from(10) + )] + ); + assert_eq!(action.group.sort_by(), vec!["/gg"]); + assert!(!action.group.split_by_sort_key()); + assert!(!action.group.reverse_sort()); + assert_eq!(action.group.maximum_size, Some(12)); + assert!(!action.group.submit_whole()); + assert_eq!(action.from, Some("a".into())); + } + + #[test] + #[parallel] + fn action_override_mixed() { + let temp = TempDir::new().unwrap(); + let workflow = r#" +[default.action] +resources.threads_per_process = 2 + +[[action]] +name = "a" +command = "b" +resources.gpus_per_process = 4 + +[[action]] +from = "a" +resources.processes.per_directory = 8 +"#; + + let workflow = Workflow::open_str(temp.path(), workflow).unwrap(); + + assert_eq!(workflow.action.len(), 2); + + let action = &workflow.action[1]; + assert_eq!(action.name(), "a"); + assert_eq!(action.command(), "b"); + assert_eq!(action.resources.processes(), Processes::PerDirectory(8)); + assert_eq!(action.resources.threads_per_process, Some(2)); + assert_eq!(action.resources.gpus_per_process, Some(4)); + } + #[test] #[parallel] fn total_processes() { let r = Resources { - processes: Processes::PerSubmission(10), + processes: Some(Processes::PerSubmission(10)), ..Resources::default() }; @@ -1196,7 +1899,7 @@ command = "c" assert_eq!(r.total_processes(1000), 10); let r = Resources { - processes: Processes::PerDirectory(10), + processes: Some(Processes::PerDirectory(10)), ..Resources::default() }; @@ -1209,7 +1912,7 @@ command = "c" #[parallel] fn total_cpus() { let r = Resources { - processes: Processes::PerSubmission(10), + processes: Some(Processes::PerSubmission(10)), 
threads_per_process: Some(2), ..Resources::default() }; @@ -1219,7 +1922,7 @@ command = "c" assert_eq!(r.total_cpus(1000), 20); let r = Resources { - processes: Processes::PerDirectory(10), + processes: Some(Processes::PerDirectory(10)), threads_per_process: None, ..Resources::default() }; @@ -1233,7 +1936,7 @@ command = "c" #[parallel] fn total_gpus() { let r = Resources { - processes: Processes::PerSubmission(10), + processes: Some(Processes::PerSubmission(10)), gpus_per_process: Some(2), ..Resources::default() }; @@ -1243,7 +1946,7 @@ command = "c" assert_eq!(r.total_gpus(1000), 20); let r = Resources { - processes: Processes::PerDirectory(10), + processes: Some(Processes::PerDirectory(10)), gpus_per_process: None, ..Resources::default() }; @@ -1257,7 +1960,9 @@ command = "c" #[parallel] fn total_walltime() { let r = Resources { - walltime: Walltime::PerDirectory(Duration::new(true, 1, 3600, 0).unwrap()), + walltime: Some(Walltime::PerDirectory( + Duration::new(true, 1, 3600, 0).unwrap(), + )), ..Resources::default() }; @@ -1275,7 +1980,9 @@ command = "c" ); let r = Resources { - walltime: Walltime::PerSubmission(Duration::new(true, 1, 3600, 0).unwrap()), + walltime: Some(Walltime::PerSubmission( + Duration::new(true, 1, 3600, 0).unwrap(), + )), ..Resources::default() }; @@ -1297,8 +2004,10 @@ command = "c" #[parallel] fn resource_cost() { let r = Resources { - processes: Processes::PerSubmission(10), - walltime: Walltime::PerDirectory(Duration::new(true, 0, 3600, 0).unwrap()), + processes: Some(Processes::PerSubmission(10)), + walltime: Some(Walltime::PerDirectory( + Duration::new(true, 0, 3600, 0).unwrap(), + )), ..Resources::default() }; @@ -1307,8 +2016,10 @@ command = "c" assert_eq!(r.cost(4), ResourceCost::with_values(40.0, 0.0)); let r = Resources { - processes: Processes::PerSubmission(10), - walltime: Walltime::PerDirectory(Duration::new(true, 0, 3600, 0).unwrap()), + processes: Some(Processes::PerSubmission(10)), + walltime: 
Some(Walltime::PerDirectory( + Duration::new(true, 0, 3600, 0).unwrap(), + )), threads_per_process: Some(4), ..Resources::default() }; @@ -1318,8 +2029,10 @@ command = "c" assert_eq!(r.cost(4), ResourceCost::with_values(160.0, 0.0)); let r = Resources { - processes: Processes::PerSubmission(10), - walltime: Walltime::PerDirectory(Duration::new(true, 0, 3600, 0).unwrap()), + processes: Some(Processes::PerSubmission(10)), + walltime: Some(Walltime::PerDirectory( + Duration::new(true, 0, 3600, 0).unwrap(), + )), threads_per_process: Some(4), gpus_per_process: Some(2), }; diff --git a/src/workspace.rs b/src/workspace.rs index 313a5f9..c1068e9 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -110,8 +110,8 @@ pub fn find_completed_directories( let mut action_products: Vec<(String, Vec)> = Vec::new(); for action in &workflow.action { - if !action.products.is_empty() { - action_products.push((action.name.clone(), action.products.clone())); + if !action.products().is_empty() { + action_products.push((action.name().into(), action.products().into())); } }