Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New directives syntax #819

Draft
wants to merge 34 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
9e220e0
refactor: Create partition and partition config classes
b-butler Nov 1, 2023
48d3c03
fix: argument ordering of _Partition
b-butler Nov 2, 2023
a18bf6a
refactor: Switch shared_partition for node_types
b-butler Nov 2, 2023
45f47f8
refactor: Raise more template errors in Python
b-butler Nov 2, 2023
ba63e0f
doc/refactor: Document new classes
b-butler Nov 2, 2023
14f4076
test: add force=True to submit (handle ngpu directives)
b-butler Nov 2, 2023
c3ccf3d
test: Test new errors in _Partition
b-butler Nov 6, 2023
aaa0e7f
refactor (WIP): Switch over directives to new structure
b-butler Nov 7, 2023
5be1991
refactor: Split _JobOperation into _JobOperation and _RunOperation
b-butler Nov 8, 2023
20bf55a
test: Remove fork directive tests.
b-butler Nov 8, 2023
92a6ec6
fix: Errors in run operations from no _RunOperation.id
b-butler Nov 9, 2023
0244e00
test: Fix test of run_options
b-butler Nov 9, 2023
2782aaf
feat: Make _Directives.evaluate returns self.
b-butler Nov 9, 2023
fed4c6e
Merge branch 'main' into refactor/environment-config
b-butler Nov 9, 2023
2c9ba3a
Merge branch 'refactor/environment-config' into refactor/new-directives
b-butler Nov 9, 2023
dcd9250
Merge branch 'main' into refactor/new-directives
b-butler Nov 16, 2023
e40dc2b
feat (WIP): "Working" submission logic for slurm.
b-butler Nov 16, 2023
7ad0419
Merge branch 'main' into refactor/new-directives
b-butler Jan 18, 2024
ad56f23
Merge branch 'main' into refactor/new-directives
b-butler Feb 16, 2024
5f3a845
feat: Add None tolerance max, sum, argmax functions
b-butler Feb 29, 2024
36b303e
refactor: _Directives.evaluate now computes total cpus, gpus, and memory
b-butler Feb 29, 2024
76f300c
refactor: Use total cpu, gpu, and memory internal directives
b-butler Feb 29, 2024
2123989
refactor: MPI/cmd prefix logic
b-butler Feb 29, 2024
ca805eb
refactor: Remove Summit's calc_num_nodes
b-butler Feb 29, 2024
4701573
refactor: Update template filters
b-butler Feb 29, 2024
3b03c5a
refactor: Attempt to update drexel configuration
b-butler Feb 29, 2024
ea51b43
refactor: Update remaining templates to new directives
b-butler Feb 29, 2024
1156b4f
fix: some documention and miscellaneous code
b-butler Feb 29, 2024
1aec7f5
test: Update tests to new directives
b-butler Feb 29, 2024
b523c7a
feat: Allow mixed MPI/nonMPI operation submission.
b-butler Feb 29, 2024
70b2778
Merge branch 'main' into refactor/new-directives
b-butler Mar 15, 2024
8f007b5
(WIP)fix: bugs surrounding directives processing
b-butler Mar 15, 2024
c86c4d4
test: Correct template test project launcher values
b-butler Mar 15, 2024
c17fd6f
test: Update template tests configuration generation
b-butler Mar 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
407 changes: 208 additions & 199 deletions flow/directives.py

Large diffs are not rendered by default.

116 changes: 41 additions & 75 deletions flow/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@
from functools import lru_cache

from .directives import (
_FORK,
_GET_EXECUTABLE,
_MEMORY,
_NGPU,
_NP,
_NRANKS,
_OMP_NUM_THREADS,
_PROCESSOR_FRACTION,
_GPUS_PER_PROCESS,
_LAUNCHER,
_MEMORY_PER_CPU,
_PROCESSES,
_THREADS_PER_PROCESS,
_WALLTIME,
_bundle_directives_aggregation,
_Directives,
)
from .errors import NoSchedulerError, SubmitError
Expand All @@ -36,7 +35,7 @@
from .scheduling.simple_scheduler import SimpleScheduler
from .scheduling.slurm import SlurmScheduler
from .util.misc import _deprecated_warning
from .util.template_filters import calc_num_nodes, calc_tasks
from .util.template_filters import calc_num_nodes

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -115,14 +114,12 @@ class _PartitionConfig:
- GPUs for a partition
- Node type of a partition

When querying a value for a specific partition, the logic first searches the
provided mapping, if any, for the partition. If it is not found, then the
mapping is searched for "default" if it exists. If not, the class default is
used. The list below shows the search order hierarchically.
When querying a value for a specific partition, the logic first searches the
provided mapping, if any, for the partition. If it is not found, then the
mapping is searched for "default" if it exists. If not, the class default is
used.

1. Partition specific
2. Provided default
3. _PartitionConfig default
1. Partition specific -> 2. Provided default -> 3. _PartitionConfig default

The class defaults are

Expand Down Expand Up @@ -360,12 +357,12 @@ def add_args(cls, parser):
pass

@classmethod
def _get_omp_prefix(cls, operation):
"""Get the OpenMP prefix based on the ``omp_num_threads`` directive.
def _get_omp_prefix(cls, directives):
"""Get the OpenMP prefix based on the ``threads_per_process`` directive.

Parameters
----------
operation : :class:`flow.project._JobOperation`
directives : dict[str, any]
The directives of the operation to be prefixed.

Returns
Expand All @@ -374,51 +371,39 @@ def _get_omp_prefix(cls, operation):
The prefix to be added to the operation's command.

"""
return "export OMP_NUM_THREADS={}; ".format(
operation.directives["omp_num_threads"]
)
return "export OMP_NUM_THREADS={}; ".format(directives["threads_per_process"])

@classmethod
def _get_mpi_prefix(cls, operation, parallel):
def _get_mpi_prefix(cls, directives):
"""Get the MPI prefix based on the ``processes`` and per-process directives.

Parameters
----------
operation : :class:`flow.project._JobOperation`
directives : dict[str, any]
The directives of the operation to be prefixed.
parallel : bool
If True, operations are assumed to be executed in parallel, which
means that the number of total tasks is the sum of all tasks
instead of the maximum number of tasks. Default is set to False.

Returns
-------
str
The prefix to be added to the operation's command.

"""
if operation.directives.get("nranks"):
return "{} -n {} ".format(cls.mpi_cmd, operation.directives["nranks"])
return ""
processes = directives.get("processes", 0)
if processes == 0:
return ""
base_str = f"{cls.mpi_cmd} --ntasks={processes}"
base_str += f" --cpus-per-task={directives['threads_per_process']}"
base_str += f" --gpus-per-task={directives['gpus_per_process']}"
return base_str

@template_filter
def get_prefix(cls, operation, parallel=False, mpi_prefix=None, cmd_prefix=None):
def get_prefix(cls, directives):
"""Template filter generating a command prefix from directives.

Parameters
----------
operation : :class:`flow.project._JobOperation`
The operation to be prefixed.
parallel : bool
If True, operations are assumed to be executed in parallel, which means
that the number of total tasks is the sum of all tasks instead of the
maximum number of tasks. Default is set to False.
mpi_prefix : str
User defined mpi_prefix string. Default is set to None.
This will be deprecated and removed in the future.
cmd_prefix : str
User defined cmd_prefix string. Default is set to None.
This will be deprecated and removed in the future.

Returns
-------
Expand All @@ -427,30 +412,21 @@ def get_prefix(cls, operation, parallel=False, mpi_prefix=None, cmd_prefix=None)

"""
prefix = ""
if operation.directives.get("omp_num_threads"):
prefix += cls._get_omp_prefix(operation)
if mpi_prefix:
prefix += mpi_prefix
else:
prefix += cls._get_mpi_prefix(operation, parallel)
if cmd_prefix:
prefix += cmd_prefix
# if cmd_prefix and if mpi_prefix for backwards compatibility
# Can change to get them from directives for future
if directives.get("threads_per_process"):
prefix += cls._get_omp_prefix(directives)
prefix += cls._get_mpi_prefix(directives)
return prefix

@classmethod
def _get_default_directives(cls):
return _Directives(
(
_GET_EXECUTABLE(),
_FORK,
_MEMORY,
_NGPU,
_NP,
_NRANKS,
_OMP_NUM_THREADS,
_PROCESSOR_FRACTION,
_MEMORY_PER_CPU,
_GPUS_PER_PROCESS,
_PROCESSES,
_THREADS_PER_PROCESS,
_LAUNCHER,
_WALLTIME,
)
)
Expand All @@ -465,28 +441,18 @@ def _get_scheduler_values(cls, context):
"""
partition = cls._partition_config[context.get("partition", None)]
force = context.get("force", False)
cpu_tasks_total = calc_tasks(
context["operations"],
"np",
directives = _bundle_directives_aggregation(
[op.primary_directives for op in context["operations"]],
context.get("parallel", False),
context.get("force", False),
)
gpu_tasks_total = calc_tasks(
context["operations"],
"ngpu",
context.get("parallel", False),
context.get("force", False),
)

num_nodes = partition.calculate_num_nodes(
cpu_tasks_total = directives["processes"] * directives["threads_per_process"]
gpu_tasks_total = directives["processes"] * directives["gpus_per_process"]

directives["num_nodes"] = partition.calculate_num_nodes(
cpu_tasks_total, gpu_tasks_total, force
)

return {
"ncpu_tasks": cpu_tasks_total,
"ngpu_tasks": gpu_tasks_total,
"num_nodes": num_nodes,
}
return directives


class StandardEnvironment(ComputeEnvironment):
Expand Down
6 changes: 5 additions & 1 deletion flow/environments/drexel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# All rights reserved.
# This software is licensed under the BSD 3-Clause License.
"""Drexel University HPC Environments."""
from ..environment import DefaultSlurmEnvironment
from ..environment import DefaultSlurmEnvironment, _PartitionConfig


class PicotteEnvironment(DefaultSlurmEnvironment):
Expand All @@ -15,6 +15,10 @@ class PicotteEnvironment(DefaultSlurmEnvironment):
hostname_pattern = r".*\.cm\.cluster$"
template = "drexel-picotte.sh"

_partition_config = _PartitionConfig(
cpus_per_node={"default": 48}, gpus_per_node={"gpu": 4}
)

@classmethod
def add_args(cls, parser):
"""Add arguments to parser.
Expand Down
Loading
Loading