diff --git a/flow/directives.py b/flow/directives.py index bdf5231e3..4517b0b7a 100644 --- a/flow/directives.py +++ b/flow/directives.py @@ -8,24 +8,20 @@ """ import datetime import functools -import operator import sys +import warnings from collections.abc import MutableMapping -from flow.errors import DirectivesError +import flow.util.misc +from flow.errors import DirectivesError, SubmitError class _Directive: """The definition of a single directive. - Logic for validation of values when setting, defaults, and the ability - for directives to inspect other directives (such as using ``nranks`` and - ``omp_num_threads`` for computing ``np``). This is only meant to work + Logic for validation when setting and providing defaults. This is only meant to work with the internals of signac-flow. - The validation of a directive occurs before the call to ``finalize``. It is - the caller's responsibility to ensure that finalized values are still valid. - Since directive values can be dependent on jobs we allow all directives to be set to a callable which is lazily validated. @@ -40,23 +36,6 @@ class _Directive: callable directly returns the passed value. Defaults to ``None``. default : any, optional Sets the default for the directive, defaults to ``None``. - serial : callable, optional - A callable that takes two inputs for the directive and returns the - appropriate value for these operations running in serial. If ``None`` or - not provided, the ``max`` function is used. Defaults to ``None``. - parallel : callable, optional - A callable that takes two inputs for the directive and returns the - appropriate value for these operations running in parallel. If ``None`` - or not provided, the ``operator.add`` function is used. Defaults to - ``None``. Defaults to ``None``. - finalize : callable, optional - A callable that takes the set value of the directive and the - :class:`~._Directives` object it is a child of and outputs the finalized - value for that directive. This is useful if some directives have - multiple ways to be set or are dependent in some way on other - directives. If ``None`` or not provided, the set value is returned. - Defaults to ``None``. - """ def __init__( @@ -65,23 +44,14 @@ def __init__( *, validator=None, default=None, - serial=max, - parallel=operator.add, - finalize=None, ): self._name = name self._default = default - self._serial = serial - self._parallel = parallel def identity(value): return value - def default_finalize(value, directives): - return value - self._validator = identity if validator is None else validator - self._finalize = default_finalize if finalize is None else finalize def __call__(self, value): """Return a validated value for the given directive. @@ -157,7 +127,7 @@ def _set_defined_directive(self, key, value): def __getitem__(self, key): if key in self._defined_directives and key in self._directive_definitions: value = self._defined_directives[key] - return self._directive_definitions[key]._finalize(value, self) + return value if key in self._user_directives: return self._user_directives[key] raise KeyError(f"{key} not in directives.") @@ -188,34 +158,15 @@ def __str__(self): def __repr__(self): return f"_Directives({str(self)})" - def update(self, other, aggregate=False, jobs=None, parallel=False): - """Update directives with another set of directives. - - This method accounts for serial/parallel behavior and aggregation. - - Parameters - ---------- - other : :class:`~._Directives` - The other set of directives. 
-        aggregate : bool
-            Whether to combine directives according to serial/parallel rules.
-        jobs : :class:`signac.job.Job` or tuple of :class:`signac.job.Job`
-            The jobs used to evaluate directives.
-        parallel : bool
-            Whether to aggregate according to parallel rules.
-
-        """
-        if aggregate:
-            self._aggregate(other, jobs=jobs, parallel=parallel)
-        else:
-            super().update(other)
-
     def evaluate(self, jobs):
         """Evaluate directives for the provided jobs.
 
         This method updates the directives in place, replacing callable
         directives with their evaluated values.
 
+        The method also computes and returns the common intermediate quantities
+        used for resource submission: total ``cpus``, ``gpus``, and ``memory``.
+
         Parameters
         ----------
         jobs : :class:`signac.job.Job` or tuple of :class:`signac.job.Job`
@@ -227,17 +178,14 @@ def evaluate(self, jobs):
             self[key] = _evaluate(value, jobs)
         self._evaluated = True
 
-    def _aggregate(self, other, jobs=None, parallel=False):
-        self.evaluate(jobs)
-        other.evaluate(jobs)
-        agg_func_attr = "_parallel" if parallel else "_serial"
-        for name in self._defined_directives:
-            agg_func = getattr(self._directive_definitions[name], agg_func_attr)
-            default_value = self._directive_definitions[name]._default
-            other_directive = other.get(name, default_value)
-            directive = self[name]
-            if other_directive is not None:
-                self._defined_directives[name] = agg_func(directive, other_directive)
+        directives = dict(self)
+        directives["cpus"] = directives["processes"] * directives["threads_per_process"]
+        directives["gpus"] = directives["processes"] * directives["gpus_per_process"]
+        if (memory := directives["memory_per_cpu"]) is not None:
+            directives["memory"] = directives["cpus"] * memory
+        else:
+            directives["memory"] = None
+        return directives
 
     @property
     def user_keys(self):  # noqa: D401
@@ -255,6 +203,165 @@ def _evaluate(value, jobs):
         return value
 
 
+def _list_of_dicts_to_dict_of_list(a):
+    """Convert a list of directive dicts to a dict of lists."""
+    if len(a) == 0:
+        return {}
+    # This None could be problematic, but we use it for directives that will
+    # always exist.
+    return {k: [m.get(k, None) for m in a] for k in a[0]}
+
+
+def _check_compatible_directives(directives_of_lists):
+    """Check that the directives within a group are compatible."""
+    mpi_directives = [
+        i
+        for i, launcher in enumerate(directives_of_lists["launcher"])
+        if launcher == "mpi"
+    ]
+    if len(mpi_directives) > 0:
+        base_directives = {
+            "processes": directives_of_lists["processes"][mpi_directives[0]],
+            "gpus_per_process": directives_of_lists["gpus_per_process"][
+                mpi_directives[0]
+            ],
+            "threads_per_process": directives_of_lists["threads_per_process"][
+                mpi_directives[0]
+            ],
+        }
+        if len(mpi_directives) > 1 and any(
+            directives_of_lists["processes"][i] != base_directives["processes"]
+            or directives_of_lists["gpus_per_process"][i]
+            != base_directives["gpus_per_process"]
+            or directives_of_lists["threads_per_process"][i]
+            != base_directives["threads_per_process"]
+            for i in mpi_directives[1:]
+        ):
+            raise SubmitError("Cannot submit non-homogeneous MPI jobs.")
+        if len(mpi_directives) != len(directives_of_lists["processes"]):
+            for i in range(len(directives_of_lists["processes"])):
+                if i in mpi_directives:
+                    continue
+                if (
+                    directives_of_lists["cpus"][i]
+                    > base_directives["threads_per_process"]
+                ):
+                    raise SubmitError(
+                        "Cannot submit non-MPI job that requires more cores than "
+                        "threads per MPI task."
+                    )
+                if directives_of_lists["gpus"][i] > base_directives["gpus_per_process"]:
+                    raise SubmitError(
+                        "Cannot submit non-MPI job that requires more GPUs than "
+                        "GPUs per MPI task."
+                    )
+    else:
+        if len(set(directives_of_lists["gpus"])) > 1:
+            warnings.warn(
+                "Operations with varying numbers of GPUs are being submitted together.",
+                RuntimeWarning,
+            )
+        if len(set(directives_of_lists["cpus"])) > 1:
+            warnings.warn(
+                "Operations with varying numbers of CPUs are being submitted together.",
+                RuntimeWarning,
+            )
+
+
+def _group_directive_aggregation(group_directives):
+    directives = _list_of_dicts_to_dict_of_list(group_directives)
+    _check_compatible_directives(directives)
+    # Each group has a primary operation: the one that requests the most
+    # resources. The choice may not be unique, but we have to pick one for
+    # scheduling purposes to request resources properly.
+    if "mpi" in directives["launcher"]:
+        # All MPI operations must be homogeneous, so we can pick any one; any
+        # non-MPI operations are subsets that should work correctly.
+        primary_directive = group_directives[0]
+    else:
+        primary_operation_index = flow.util.misc._tolerant_argmax(directives["cpus"])
+        primary_directive = group_directives[primary_operation_index]
+        primary_directive["gpus"] = max(directives["gpus"])
+        primary_directive["cpus"] = max(directives["cpus"])
+        primary_directive["memory"] = flow.util.misc._tolerant_max(directives["memory"])
+
+    primary_directive["walltime"] = flow.util.misc._tolerant_sum(
+        directives["walltime"], start=datetime.timedelta()
+    )
+    return primary_directive
+
+
+def _check_bundle_directives(directives_of_lists, parallel):
+    if "mpi" in directives_of_lists["launcher"] and parallel:
+        raise SubmitError("Cannot run MPI operations in parallel.")
+    _check_compatible_directives(directives_of_lists)
+
+
+def _bundle_directives_aggregation(list_of_directives, parallel):
+    directives_of_lists = _list_of_dicts_to_dict_of_list(list_of_directives)
+    _check_bundle_directives(directives_of_lists, parallel)
+    # MPI operations cannot run in parallel bundles (checked above), so the
+    # parallel branch only has to handle non-MPI operations.
+    if parallel:
+        cpus = sum(directives_of_lists["cpus"])
+        gpus = sum(directives_of_lists["gpus"])
+        memory = flow.util.misc._tolerant_sum(directives_of_lists["memory"])
+        memory_per_cpu = None if memory is None else memory / cpus
+        return {
+            "launcher": None,
+            "walltime": flow.util.misc._tolerant_max(directives_of_lists["walltime"]),
+            "processes": 1,
+            "threads_per_process": cpus,
+            "gpus_per_process": gpus,
+            "memory_per_cpu": memory_per_cpu,
+            "cpus": cpus,
+            "gpus": gpus,
+            "memory": memory,
+        }
+    walltime = flow.util.misc._tolerant_sum(
+        directives_of_lists["walltime"], start=datetime.timedelta()
+    )
+    if "mpi" in directives_of_lists["launcher"]:
+        # All MPI operations must be homogeneous, so we can pick any one; any
+        # non-MPI operations are subsets that should work correctly.
+ primary_operation = list_of_directives[ + directives_of_lists["launcher"].index("mpi") + ] + cpus = primary_operation["processes"] * primary_operation["threads_per_process"] + memory = flow.util.misc._tolerant_max(directives_of_lists["memory"]) + if memory is None: + memory_per_cpu = None + else: + memory_per_cpu = memory / cpus + return { + "launcher": primary_operation["launcher"], + "walltime": walltime, + "processes": primary_operation["processes"], + "threads_per_process": primary_operation["threads_per_process"], + "gpus_per_process": primary_operation["gpus_per_process"], + "memory_per_cpu": memory_per_cpu, + "cpus": cpus, + "gpus": primary_operation["processes"] + * primary_operation["gpus_per_process"], + "memory": memory, + } + # Serial non-MPI + cpus = max(directives_of_lists["cpus"]) + total_memory = flow.util.misc._tolerant_max(directives_of_lists["memory"]) + memory_per_cpu = None if total_memory is None else total_memory / cpus + gpus = max(directives_of_lists["gpus"]) + return { + "launcher": None, + "walltime": walltime, + "processes": 1, + "threads_per_process": cpus, + "gpus_per_process": gpus, + "memory_per_cpu": memory_per_cpu, + "cpus": cpus, + "gpus": gpus, + "memory": total_memory, + } + + class _OnlyTypes: def __init__(self, *types, preprocess=None, postprocess=None): def identity(value): @@ -297,36 +404,6 @@ def is_greater_or_equal(value): return is_greater_or_equal -_NP_DEFAULT = 1 - - -def _finalize_np(np, directives): - """Return the actual number of processes/threads to use. - - We check the default np because when aggregation occurs we multiply the - number of MPI ranks and OMP_NUM_THREADS. If we always took the greater of - the given NP and ranks * threads then after aggregating we will inflate the - number of processors needed as (r1 * t1) + (r2 * t2) <= (r1 + r2) * (t1 + t2) - for numbers greater than one. - """ - if callable(np) or np != _NP_DEFAULT: - return np - nranks = directives.get("nranks", 1) - omp_num_threads = directives.get("omp_num_threads", 1) - if callable(nranks) or callable(omp_num_threads): - return np - return max(np, max(1, nranks) * max(1, omp_num_threads)) - - -# Helper validators for defining _Directive -def _no_aggregation(value, other): - """Return the first argument. - - This is used for directives that ignore aggregation rules. - """ - return value - - def _is_fraction(value): if 0 <= value <= 1: return value @@ -354,9 +431,9 @@ def _parse_walltime(walltime): """ if walltime is None: return None - if not isinstance(walltime, datetime.timedelta): - walltime = datetime.timedelta(hours=walltime) - return walltime + if isinstance(walltime, datetime.timedelta): + return walltime + return datetime.timedelta(hours=walltime) def _parse_memory(memory): @@ -401,44 +478,10 @@ def _parse_memory(memory): ) -def _max_not_none(value, other): - """Return the max of two values, with special handling of None. - - This is used for memory directives in serial and walltime directives in - parallel. - """ - if value is None and other is None: - return None - elif other is None: - return value - elif value is None: - return other - else: - return max(value, other) - - -def _sum_not_none(value, other): - """Return the sum of two values, with special handling of None. - - This is used for memory directives in parallel and walltime directives in - serial. 
- """ - if value is None and other is None: - return None - elif other is None: - return value - elif value is None: - return other - else: - return operator.add(value, other) - - # Definitions used for validating directives -_bool = _OnlyTypes(bool) _natural_number = _OnlyTypes(int, postprocess=_raise_below(1)) _nonnegative_int = _OnlyTypes(int, postprocess=_raise_below(0)) _positive_real_walltime = _OnlyTypes( - float, datetime.timedelta, type(None), preprocess=_parse_walltime, @@ -461,10 +504,8 @@ def _GET_EXECUTABLE(): # This is because we mock `sys.executable` while generating template reference data. _EXECUTABLE = _Directive( "executable", - validator=_OnlyTypes(str), + validator=_OnlyTypes(str, type(None)), default=sys.executable, - serial=_no_aggregation, - parallel=_no_aggregation, ) _EXECUTABLE.__doc__ = """Return the path to the executable to be used for an operation. @@ -478,25 +519,27 @@ def _GET_EXECUTABLE(): return _EXECUTABLE -_FORK = _Directive("fork", validator=_bool, default=False) -_FORK.__doc__ = """The fork directive can be set to True to enforce that a -particular operation is always executed within a subprocess and not within the -Python interpreter's process even if there are no other reasons that would prevent that. +_LAUNCHER = _Directive("launcher", validator=_OnlyTypes(str, type(None)), default=None) +_LAUNCHER.__doc__ = """The launcher to use to execute this operation. + +A launcher is defined as a separate program used to launch an application. +Primarily this is designed to specify whether or not MPI should be used to +launch the operation. Set to "mpi" for this case. Defaults to ``None``. -.. note:: +For example: + +.. code-block:: python - Setting ``fork=False`` will not prevent forking if there are other reasons for forking, - such as a timeout. + @Project.operation(directives={"launcher": "mpi"}) + def op(job): + pass """ -_MEMORY = _Directive( - "memory", - validator=_positive_real_memory, - default=None, - serial=_max_not_none, - parallel=_sum_not_none, + +_MEMORY_PER_CPU = _Directive( + "memory_per_cpu", validator=_positive_real_memory, default=None ) -_MEMORY.__doc__ = """The memory to request for this operation. +_MEMORY_PER_CPU.__doc__ = """The memory to request per CPU for this operation. The memory to validate should be either a float, int, or string. A valid memory argument is defined as: @@ -536,36 +579,24 @@ def op2(job): pass """ -_NGPU = _Directive("ngpu", validator=_nonnegative_int, default=0) -_NGPU.__doc__ = """The number of GPUs to use for this operation. - -Expects a nonnegative integer. Defaults to 0. -""" - -_NP = _Directive( - "np", validator=_natural_number, default=_NP_DEFAULT, finalize=_finalize_np +_GPUS_PER_PROCESS = _Directive( + "gpus_per_process", validator=_nonnegative_int, default=0 ) -_NP.__doc__ = """The total number of CPU cores to request for a given operation. - -Expects a natural number (i.e. an integer >= 1). This directive introspects into -the "nranks" or "omp_num_threads" directives and uses their product if it is -greater than the current set value. Defaults to 1. +_GPUS_PER_PROCESS.__doc__ = """The number of GPUs to use per process. -Warning: - Generally for multicore applications, either this if not using MPI, or "nranks" and - "omp_num_threads" should be specified but not both. +Expects a nonnegative integer. Defaults to 0. """ -_NRANKS = _Directive("nranks", validator=_nonnegative_int, default=0) -_NRANKS.__doc__ = """The number of MPI ranks to use for this operation. Defaults to 0. 
+_PROCESSES = _Directive("processes", validator=_natural_number, default=1)
+_PROCESSES.__doc__ = """The number of processes the operation plans to use.
 
-Expects a nonnegative integer.
+Expects a natural number (i.e. an integer >= 1). Defaults to 1.
 """
 
-_OMP_NUM_THREADS = _Directive("omp_num_threads", validator=_nonnegative_int, default=0)
-_OMP_NUM_THREADS.__doc__ = """The number of OpenMP threads to use for this operation. Defaults to 0.
-
-When used in conjunction with "nranks" this specifies the OpenMP threads per rank.
+_THREADS_PER_PROCESS = _Directive(
+    "threads_per_process", validator=_nonnegative_int, default=1
+)
+_THREADS_PER_PROCESS.__doc__ = """The number of threads to use per process. Defaults to 1.
 
-Using this directive sets the environmental variable ``OMP_NUM_THREADS`` in the
+Using this directive sets the environment variable ``OMP_NUM_THREADS`` in the
 operation's execution environment.
@@ -573,30 +604,7 @@ def op2(job):
 Expects a nonnegative integer.
 """
 
-_PROCESSOR_FRACTION = _Directive(
-    "processor_fraction",
-    validator=_OnlyTypes(float, postprocess=_is_fraction),
-    default=1.0,
-    serial=_no_aggregation,
-    parallel=_no_aggregation,
-)
-_PROCESSOR_FRACTION.__doc__ = """Fraction of a resource to use on a single operation.
-
-If set to 0.5 for a bundled job with 20 operations (all with 'np' set to 1), 10
-CPUs will be used. Defaults to 1.
-
-.. note::
-
-    This can be particularly useful on Stampede2's launcher.
-"""
-
-_WALLTIME = _Directive(
-    "walltime",
-    validator=_positive_real_walltime,
-    default=None,
-    serial=_sum_not_none,
-    parallel=_max_not_none,
-)
+_WALLTIME = _Directive("walltime", validator=_positive_real_walltime, default=None)
 _WALLTIME.__doc__ = """The number of hours to request for executing this job.
 
 This directive expects a float representing the walltime in hours. Fractional
diff --git a/flow/environment.py b/flow/environment.py
index 79a04a383..f787939a4 100644
--- a/flow/environment.py
+++ b/flow/environment.py
@@ -17,15 +17,14 @@
 from functools import lru_cache
 
 from .directives import (
-    _FORK,
     _GET_EXECUTABLE,
-    _MEMORY,
-    _NGPU,
-    _NP,
-    _NRANKS,
-    _OMP_NUM_THREADS,
-    _PROCESSOR_FRACTION,
+    _GPUS_PER_PROCESS,
+    _LAUNCHER,
+    _MEMORY_PER_CPU,
+    _PROCESSES,
+    _THREADS_PER_PROCESS,
     _WALLTIME,
+    _bundle_directives_aggregation,
    _Directives,
 )
 from .errors import NoSchedulerError, SubmitError
@@ -36,7 +35,7 @@
 from .scheduling.simple_scheduler import SimpleScheduler
 from .scheduling.slurm import SlurmScheduler
 from .util.misc import _deprecated_warning
-from .util.template_filters import calc_num_nodes, calc_tasks
+from .util.template_filters import calc_num_nodes
 
 logger = logging.getLogger(__name__)
 
@@ -115,14 +114,12 @@ class _PartitionConfig:
     - GPUs for a partition
     - Node type of a partition
 
-    When querying a value for a specific partition, the logic first searches the
-    provided mapping, if any, for the partition. If it is not found, then the
-    mapping is searched for "default" if it exists. If not, the class default is
-    used. The list below shows the search order hierarchically.
+    When querying a value for a specific partition, the logic first searches
+    the provided mapping, if any, for the partition. If the partition is not
+    found, the mapping is searched for a "default" entry. Failing that, the
+    class default is used. The search order is:
 
     1. Partition specific
     2. Provided default
     3. _PartitionConfig default
 
     The class defaults are
 
@@ -360,12 +357,12 @@ def add_args(cls, parser):
         pass
 
     @classmethod
-    def _get_omp_prefix(cls, operation):
-        """Get the OpenMP prefix based on the ``omp_num_threads`` directive.
+    def _get_omp_prefix(cls, directives):
+        """Get the OpenMP prefix based on the ``threads_per_process`` directive.
 
         Parameters
         ----------
-        operation : :class:`flow.project._JobOperation`
-            The operation to be prefixed.
+        directives : dict[str, any]
+            The directives of the operation to be prefixed.
 
         Returns
         -------
         str
             The prefix to be added to the operation's command.
 
         """
-        return "export OMP_NUM_THREADS={}; ".format(
-            operation.directives["omp_num_threads"]
-        )
+        return "export OMP_NUM_THREADS={}; ".format(directives["threads_per_process"])
 
     @classmethod
-    def _get_mpi_prefix(cls, operation, parallel):
-        """Get the MPI prefix based on the ``nranks`` directives.
+    def _get_mpi_prefix(cls, directives):
+        """Get the MPI prefix based on the ``launcher`` and ``processes`` directives.
 
         Parameters
         ----------
-        operation : :class:`flow.project._JobOperation`
-            The operation to be prefixed.
-        parallel : bool
-            If True, operations are assumed to be executed in parallel, which
-            means that the number of total tasks is the sum of all tasks
-            instead of the maximum number of tasks. Default is set to False.
+        directives : dict[str, any]
+            The directives of the operation to be prefixed.
 
         Returns
         -------
         str
             The prefix to be added to the operation's command.
 
         """
-        if operation.directives.get("nranks"):
-            return "{} -n {} ".format(cls.mpi_cmd, operation.directives["nranks"])
-        return ""
+        if directives.get("launcher") != "mpi":
+            return ""
+        base_str = f"{cls.mpi_cmd} --ntasks={directives['processes']}"
+        base_str += f" --cpus-per-task={directives['threads_per_process']}"
+        base_str += f" --gpus-per-task={directives['gpus_per_process']}"
+        # The trailing space separates the prefix from the wrapped command.
+        return base_str + " "
 
     @template_filter
-    def get_prefix(cls, operation, parallel=False, mpi_prefix=None, cmd_prefix=None):
+    def get_prefix(cls, directives):
         """Template filter generating a command prefix from directives.
 
         Parameters
         ----------
-        operation : :class:`flow.project._JobOperation`
-            The operation to be prefixed.
-        parallel : bool
-            If True, operations are assumed to be executed in parallel, which means
-            that the number of total tasks is the sum of all tasks instead of the
-            maximum number of tasks. Default is set to False.
-        mpi_prefix : str
-            User defined mpi_prefix string. Default is set to None.
-            This will be deprecated and removed in the future.
-        cmd_prefix : str
-            User defined cmd_prefix string. Default is set to None.
-            This will be deprecated and removed in the future.
+        directives : dict[str, any]
+            The directives of the operation to be prefixed.
Returns ------- @@ -427,16 +412,9 @@ def get_prefix(cls, operation, parallel=False, mpi_prefix=None, cmd_prefix=None) """ prefix = "" - if operation.directives.get("omp_num_threads"): - prefix += cls._get_omp_prefix(operation) - if mpi_prefix: - prefix += mpi_prefix - else: - prefix += cls._get_mpi_prefix(operation, parallel) - if cmd_prefix: - prefix += cmd_prefix - # if cmd_prefix and if mpi_prefix for backwards compatibility - # Can change to get them from directives for future + if directives.get("threads_per_process"): + prefix += cls._get_omp_prefix(directives) + prefix += cls._get_mpi_prefix(directives) return prefix @classmethod @@ -444,13 +422,11 @@ def _get_default_directives(cls): return _Directives( ( _GET_EXECUTABLE(), - _FORK, - _MEMORY, - _NGPU, - _NP, - _NRANKS, - _OMP_NUM_THREADS, - _PROCESSOR_FRACTION, + _MEMORY_PER_CPU, + _GPUS_PER_PROCESS, + _PROCESSES, + _THREADS_PER_PROCESS, + _LAUNCHER, _WALLTIME, ) ) @@ -465,28 +441,18 @@ def _get_scheduler_values(cls, context): """ partition = cls._partition_config[context.get("partition", None)] force = context.get("force", False) - cpu_tasks_total = calc_tasks( - context["operations"], - "np", + directives = _bundle_directives_aggregation( + [op.primary_directives for op in context["operations"]], context.get("parallel", False), - context.get("force", False), - ) - gpu_tasks_total = calc_tasks( - context["operations"], - "ngpu", - context.get("parallel", False), - context.get("force", False), ) - num_nodes = partition.calculate_num_nodes( + cpu_tasks_total = directives["processes"] * directives["threads_per_process"] + gpu_tasks_total = directives["processes"] * directives["gpus_per_process"] + + directives["num_nodes"] = partition.calculate_num_nodes( cpu_tasks_total, gpu_tasks_total, force ) - - return { - "ncpu_tasks": cpu_tasks_total, - "ngpu_tasks": gpu_tasks_total, - "num_nodes": num_nodes, - } + return directives class StandardEnvironment(ComputeEnvironment): diff --git a/flow/environments/drexel.py b/flow/environments/drexel.py index 893364485..5dd485d01 100644 --- a/flow/environments/drexel.py +++ b/flow/environments/drexel.py @@ -2,7 +2,7 @@ # All rights reserved. # This software is licensed under the BSD 3-Clause License. """Drexel University HPC Environments.""" -from ..environment import DefaultSlurmEnvironment +from ..environment import DefaultSlurmEnvironment, _PartitionConfig class PicotteEnvironment(DefaultSlurmEnvironment): @@ -15,6 +15,10 @@ class PicotteEnvironment(DefaultSlurmEnvironment): hostname_pattern = r".*\.cm\.cluster$" template = "drexel-picotte.sh" + _partition_config = _PartitionConfig( + cpus_per_node={"default": 48}, gpus_per_node={"gpu": 4} + ) + @classmethod def add_args(cls, parser): """Add arguments to parser. 
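
[Reviewer note] The scheduler values above lean on the derived quantities that
``_Directives.evaluate`` now returns alongside the raw directives. A minimal
sketch of that arithmetic, with hypothetical values (the helper function below
is illustrative only and not part of this diff):

```python
def derived_quantities(d):
    """Compute cpus/gpus/memory exactly as _Directives.evaluate() does."""
    cpus = d["processes"] * d["threads_per_process"]
    gpus = d["processes"] * d["gpus_per_process"]
    # memory_per_cpu of None propagates to a None total memory.
    memory = None if d["memory_per_cpu"] is None else cpus * d["memory_per_cpu"]
    return {"cpus": cpus, "gpus": gpus, "memory": memory}

directives = {
    "processes": 4,            # e.g. 4 MPI ranks
    "threads_per_process": 2,  # 2 threads per rank
    "gpus_per_process": 1,
    "memory_per_cpu": 2,       # assumed to be GB per CPU
}
assert derived_quantities(directives) == {"cpus": 8, "gpus": 4, "memory": 16}
```
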
diff --git a/flow/environments/incite.py b/flow/environments/incite.py
index b89e656ea..ab5fcce2a 100644
--- a/flow/environments/incite.py
+++ b/flow/environments/incite.py
@@ -5,8 +5,6 @@
 
 http://www.doeleadershipcomputing.org/
 """
-from math import ceil, gcd
-
 from ..environment import (
     DefaultLSFEnvironment,
     DefaultSlurmEnvironment,
@@ -14,7 +12,6 @@
     _PartitionConfig,
     template_filter,
 )
-from ..util.template_filters import check_utilization
 
 
 class SummitEnvironment(DefaultLSFEnvironment):
@@ -23,9 +20,9 @@ class SummitEnvironment(DefaultLSFEnvironment):
     Example::
 
         @Project.operation(directives={
-            "nranks": 3,  # 3 MPI ranks per operation
-            "ngpu": 3,  # 3 GPUs
-            "np": 3,  # 3 CPU cores
+            "launcher": "mpi",  # use MPI
+            "processes": 3,  # 3 MPI ranks
+            "gpus_per_process": 1,  # 1 GPU per rank (3 GPUs total)
             "rs_tasks": 3,  # 3 tasks per resource set
             "extra_jsrun_args": '--smpiargs="-gpu"',  # extra jsrun arguments
         })
@@ -45,59 +42,13 @@ def my_operation(job):
     )
 
     @template_filter
-    def calc_num_nodes(cls, resource_sets, parallel=False):
-        """Compute the number of nodes needed.
-
-        Parameters
-        ----------
-        resource_sets : iterable of tuples
-            Resource sets for each operation, as a sequence of tuples of
-            *(Number of resource sets, tasks (MPI Ranks) per resource set,
-            physical cores (CPUs) per resource set, GPUs per resource set)*.
-        parallel : bool
-            Whether operations should run in parallel or serial. (Default value
-            = False)
-
-        Returns
-        -------
-        int
-            Number of nodes needed.
-
-        """
-        nodes_used_final = 0
-        cores_used = gpus_used = nodes_used = 0
-        for nsets, tasks, cpus_per_task, gpus in resource_sets:
-            if not parallel:
-                # In serial mode we reset for every operation.
-                cores_used = gpus_used = nodes_used = 0
-            for _ in range(nsets):
-                cores_used += tasks * cpus_per_task
-                gpus_used += gpus
-                while cores_used > cls.cores_per_node or gpus_used > cls.gpus_per_node:
-                    nodes_used += 1
-                    cores_used = max(0, cores_used - cls.cores_per_node)
-                    gpus_used = max(0, gpus_used - cls.gpus_per_node)
-            if not parallel:
-                # Note that when running in serial the "leftovers" must be
-                # accounted for on a per-operation basis.
-                if cores_used > 0 or gpus_used > 0:
-                    nodes_used += 1
-                nodes_used_final = max(nodes_used, nodes_used_final)
-        if parallel:
-            if cores_used > 0 or gpus_used > 0:
-                nodes_used += 1
-            nodes_used_final = nodes_used
-        return nodes_used_final
-
-    @template_filter
-    def guess_resource_sets(cls, operation):
-        """Determine the resources sets needed for an operation.
+    def guess_resource_sets(cls, directives):
+        """Determine the resource sets needed for an operation.
 
         Parameters
         ----------
-        operation : :class:`flow.BaseFlowOperation`
-            The operation whose directives will be used to compute the resource
-            set.
+        directives : dict
+            The directives to use to compute the resource set.
 
         Returns
         -------
@@ -111,26 +62,21 @@ def guess_resource_sets(cls, operation):
             Number of GPUs per resource set.
""" - ntasks = max(operation.directives.get("nranks", 1), 1) - np = operation.directives.get("np", ntasks) - - cpus_per_task = max(operation.directives.get("omp_num_threads", 1), 1) + ntasks = directives["processes"] + cpus_per_task = directives["threads_per_process"] # separate OMP threads (per resource sets) from tasks - np //= cpus_per_task - - np_per_task = max(1, np // ntasks) - ngpu = operation.directives.get("ngpu", 0) - g = gcd(ngpu, ntasks) - if ngpu >= ntasks: - nsets = ngpu // (ngpu // g) + gpus_per_process = directives["gpus_per_process"] + ngpus = gpus_per_process * ntasks + if ngpus >= ntasks: + nsets = gpus_per_process // (gpus_per_process // ngpus) else: - nsets = ntasks // (ntasks // g) + nsets = ntasks // (ntasks // ngpus) tasks_per_set = max(ntasks // nsets, 1) - tasks_per_set = max(tasks_per_set, operation.directives.get("rs_tasks", 1)) + tasks_per_set = max(tasks_per_set, directives.get("rs_tasks", 1)) - gpus_per_set = ngpu // nsets - cpus_per_set = tasks_per_set * cpus_per_task * np_per_task + gpus_per_set = gpus_per_process // nsets + cpus_per_set = tasks_per_set * cpus_per_task return nsets, tasks_per_set, cpus_per_set, gpus_per_set @@ -158,17 +104,13 @@ def jsrun_options(cls, resource_set): return f"-n {nsets} -a {tasks} -c {cpus} -g {gpus} {cuda_aware_mpi}" @classmethod - def _get_mpi_prefix(cls, operation, parallel): + def _get_mpi_prefix(cls, operation): """Get the jsrun options based on directives. Parameters ---------- operation : :class:`flow.project._JobOperation` The operation to be prefixed. - parallel : bool - If True, operations are assumed to be executed in parallel, which - means that the number of total tasks is the sum of all tasks - instead of the maximum number of tasks. Default is set to False. Returns ------- @@ -233,44 +175,6 @@ class CrusherEnvironment(DefaultSlurmEnvironment): mpi_cmd = "srun" - @template_filter - def calc_num_nodes(cls, ngpus, ncpus, threshold): - """Compute the number of nodes needed to meet the resource request. - - Also raise an error when the requested resource do not come close to saturating the asked - for nodes. - """ - nodes_gpu = max(1, int(ceil(ngpus / cls.gpus_per_node))) - nodes_cpu = max(1, int(ceil(ncpus / cls.cores_per_node))) - if nodes_gpu >= nodes_cpu: - check_utilization(nodes_gpu, ngpus, cls.gpus_per_node, threshold, "compute") - return nodes_gpu - check_utilization(nodes_cpu, ncpus, cls.cores_per_node, threshold, "compute") - return nodes_cpu - - @classmethod - def _get_mpi_prefix(cls, operation, parallel): - """Get the correct srun command for the job. - - We don't currently support CPU/GPU mapping and expect the program to do this in code. - """ - nranks = operation.directives.get("nranks", 0) - if nranks == 0: - return "" - ngpus = operation.directives["ngpu"] - np = operation.directives.get("np", 1) - omp_num_threads = max(operation.directives.get("omp_num_threads", 1), 1) - mpi_np_calc = nranks * omp_num_threads - if np > 1 and nranks > 1 and np != mpi_np_calc: - raise RuntimeWarning( - f"Using provided value for np={np}, which seems incompatible with MPI directives " - f"{mpi_np_calc}." - ) - base_str = f"{cls.mpi_cmd} --ntasks={nranks}" - threads = max(omp_num_threads, np) if nranks == 1 else max(1, omp_num_threads) - base_str += f" --cpus-per-task={threads} --gpus={ngpus}" - return base_str - class FrontierEnvironment(DefaultSlurmEnvironment): """Environment profile for the Frontier supercomputer. 
@@ -287,29 +191,6 @@ class FrontierEnvironment(DefaultSlurmEnvironment):
     )
     mpi_cmd = "srun"
 
-    @classmethod
-    def _get_mpi_prefix(cls, operation, parallel):
-        """Get the correct srun command for the job.
-
-        We don't currently support CPU/GPU mapping and expect the program to do this in code.
-        """
-        nranks = operation.directives.get("nranks", 0)
-        if nranks == 0:
-            return ""
-        ngpus = operation.directives["ngpu"]
-        np = operation.directives.get("np", 1)
-        omp_num_threads = max(operation.directives.get("omp_num_threads", 1), 1)
-        mpi_np_calc = nranks * omp_num_threads
-        if np > 1 and nranks > 1 and np != mpi_np_calc:
-            raise RuntimeWarning(
-                f"Using provided value for np={np}, which seems incompatible with MPI directives "
-                f"{mpi_np_calc}."
-            )
-        base_str = f"{cls.mpi_cmd} --ntasks={nranks}"
-        threads = max(omp_num_threads, np) if nranks == 1 else max(1, omp_num_threads)
-        base_str += f" --cpus-per-task={threads} --gpus={ngpus}"
-        return base_str
-
 
 __all__ = [
     "SummitEnvironment",
diff --git a/flow/project.py b/flow/project.py
index 6948f1a59..16a8cc16e 100644
--- a/flow/project.py
+++ b/flow/project.py
@@ -45,7 +45,7 @@
     aggregator,
     get_aggregate_id,
 )
-from .directives import _document_directive
+from .directives import _document_directive, _group_directive_aggregation
 from .environment import ComputeEnvironment, get_environment, registered_environments
 from .errors import (
     ConfigKeyError,
@@ -278,22 +278,21 @@ def _make_bundles(operations, size=None):
 
 
 class _JobOperation:
-    """Class containing execution information for one group and one job.
+    """Class containing execution or submission information for one group and one aggregate.
 
-    The execution or submission of a :class:`~.FlowGroup` uses a passed-in command
-    which can either be a string or function with no arguments that returns a shell
-    executable command. The shell executable command won't be used if it is
-    determined that the group can be executed without forking.
+    This class serves as the base class for :class:`~._RunOperation` and
+    :class:`~._SubmissionJobOperation`.
 
-    .. note::
-
-        This class is used by the :class:`~.FlowGroup` class for the execution and
-        submission process and should not be instantiated by users themselves.
+    Note
+    ----
+    This class and its subclasses are used by the :class:`~.FlowGroup` class for the
+    execution and submission process and should not be instantiated by users themselves.
 
     Parameters
     ----------
     id : str
-        The id of this _JobOperation instance. The id should be unique.
+        Unique id for the execution or submission unit. The id is used to count
+        executions during a run and to generate unique scheduler ids for
+        submission.
     name : str
         The name of the _JobOperation.
     jobs : tuple of :class:`~signac.job.Job`
         The jobs associated with this operation.
     cmd : callable or str
         The command that executes this operation. Can be a callable that when
         evaluated returns a string.
-    directives : dict
-        A `dict` object of additional parameters that provide instructions on
-        how to execute this operation, e.g., specifically required resources.
-    user_directives : set
-        Keys in ``directives`` that correspond to user-specified directives
-        that are not part of the environment's standard directives.
""" - def __init__(self, id, name, jobs, cmd, directives, user_directives): + def __init__(self, id, name, jobs, cmd): self._id = id self.name = name self._jobs = jobs @@ -317,41 +310,18 @@ def __init__(self, id, name, jobs, cmd, directives, user_directives): raise ValueError("cmd must be a callable or string.") self._cmd = cmd - # We use a special dictionary that tracks all keys that have been - # evaluated by the template engine and compare them to those explicitly - # set by the user. See also comment below. - self.directives = _TrackGetItemDict(directives) - - # Keys which were explicitly set by the user, but are not evaluated by - # the template engine are cause for concern and might hint at a bug in - # the template script or ill-defined directives. We are therefore - # keeping track of all keys set by the user and check whether they have - # been evaluated by the template script engine later. - self.directives._keys_set_by_user = user_directives - def __str__(self): aggregate_id = get_aggregate_id(self._jobs) return f"{self.name}({aggregate_id})" def __repr__(self): - return "{type}(name='{name}', jobs='{jobs}', cmd={cmd}, directives={directives})".format( + return "{type}(name='{name}', jobs='{jobs}', cmd={cmd})".format( type=type(self).__name__, name=self.name, jobs="(" + ", ".join(map(repr, self._jobs)) + ")", cmd=repr(self.cmd), - directives=self.directives, ) - def __hash__(self): - return hash(self.id) - - def __eq__(self, other): - return self.id == other.id - - @property - def id(self): - return self._id - @property def cmd(self): if callable(self._cmd): @@ -364,9 +334,62 @@ def cmd(self): return self._cmd() return self._cmd + @property + def id(self): + return self._id + + def __hash__(self): + return hash(self._id) + + def __eq__(self, other): + return self.id == other.id + + +class _RunOperation(_JobOperation): + """Class containing execution information for one operation and one aggregate. + + The execution of a :class:`~.FlowOperation` uses a passed-in command + which can either be a string or function with no arguments that returns a shell + executable command. The shell executable command won't be used if it is + determined that the group can be executed without forking. + + .. note:: + + This class is used by the :class:`~.FlowGroup` class for the execution + process and should not be instantiated by users themselves. + + Parameters + ---------- + id : str + Unique id for the execution unit. + name : str + The name of the _JobOperation. + jobs : tuple of :class:`~signac.job.Job` + The jobs associated with this operation. + cmd : callable or str + The command that executes this operation. Can be a callable that when + evaluated returns a string. + fork : bool + Whether the operation needs to fork to execute correctly. See + :meth:`FlowGroup._fork_op` for logic. + """ + + def __init__(self, id, name, jobs, cmd, fork): + super().__init__(id, name, jobs, cmd) + self.fork = fork + + def __repr__(self): + return "{type}(name='{name}', jobs='{jobs}', cmd={cmd}, fork={fork})".format( + type=type(self).__name__, + name=self.name, + jobs="(" + ", ".join(map(repr, self._jobs)) + ")", + cmd=repr(self.cmd), + fork=self.fork, + ) + class _SubmissionJobOperation(_JobOperation): - r"""Class containing submission information for one group and one job. + r"""Class containing submission information for one group and one aggregate. This class extends :class:`_JobOperation` to include a set of groups that will be executed via the "run" command. 
These groups are known at
@@ -374,8 +397,20 @@ class _SubmissionJobOperation(_JobOperation):
 
     Parameters
     ----------
-    \*args
-        Passed to the constructor of :class:`_JobOperation`.
+    id : str
+        Unique id for the submission unit.
+    name : str
+        The name of the _JobOperation.
+    jobs : tuple of :class:`~signac.job.Job`
+        The jobs associated with this operation.
+    cmd : callable or str
+        The command that executes this operation. Can be a callable that when
+        evaluated returns a string.
+    primary_directives : dict[str, any]
+        The aggregated directives that ensure all operations in the group have
+        their resource requests met.
+    directives_list : list[dict[str, any]]
+        List of directives for each operation in the flow group.
     eligible_operations : list
         A list of :class:`_JobOperation` that will be executed when this
         submitted job is executed.
@@ -395,13 +430,20 @@
 
     def __init__(
         self,
-        *args,
+        id,
+        name,
+        jobs,
+        cmd,
+        primary_directives,
+        directives_list,
         eligible_operations=None,
         operations_with_unmet_preconditions=None,
         operations_with_met_postconditions=None,
         **kwargs,
     ):
-        super().__init__(*args, **kwargs)
+        super().__init__(id, name, jobs, cmd)
+        self.primary_directives = primary_directives
+        self.directives_list = directives_list
 
         if eligible_operations is None:
             eligible_operations = []
@@ -742,20 +784,20 @@ class FlowGroup:
 
     Examples
     --------
-    In the example below, the directives will be ``{'nranks': 4}`` for op1 and
-    ``{'nranks': 2, 'executable': 'python3'}`` for op2.
+    In the example below, the directives will be ``{'processes': 4}`` for op1 and
+    ``{'processes': 2, 'executable': 'python3'}`` for op2.
 
     .. code-block:: python
 
         group = FlowProject.make_group(name='example_group')
 
-        @group(directives={"nranks": 4})
-        @FlowProject.operation({"nranks": 2, "executable": "python3"})
+        @group(directives={"processes": 4})
+        @FlowProject.operation({"processes": 2, "executable": "python3"})
         def op1(job):
             pass
 
         @group
-        @FlowProject.operation({"nranks": 2, "executable": "python3"})
+        @FlowProject.operation({"processes": 2, "executable": "python3"})
         def op2(job):
             pass
 
@@ -840,6 +882,16 @@ def _determine_entrypoint(self, entrypoint, directives, jobs):
         return "{} {}".format(entrypoint["executable"], entrypoint["path"]).lstrip()
 
     def _resolve_directives(self, name, defaults, env):
+        """Resolve a single operation's directives.
+
+        Search for the operation in ``operation_directives`` first; if it is
+        not found, fall back to the provided defaults, if any.
+
+        Note
+        ----
+        Any unevaluated function directives will remain unevaluated, and must
+        be evaluated before use.
+        """
         all_directives = env._get_default_directives()
         if name in self.operation_directives:
             all_directives.update(self.operation_directives[name])
@@ -856,12 +908,14 @@ def _submit_cmd(self, entrypoint, ignore_conditions, jobs):
             options += " --ignore-conditions=" + str(ignore_conditions)
         return " ".join((cmd, options)).strip()
 
-    def _run_cmd(self, entrypoint, operation_name, operation, directives, jobs):
+    def _run_cmd(
+        self, entrypoint, operation_name, operation, directives, jobs, environment
+    ):
         if isinstance(operation, FlowCmdOperation):
             return operation(*jobs).lstrip()
         entrypoint = self._determine_entrypoint(entrypoint, directives, jobs)
         cmd = f"{entrypoint} exec {operation_name} {get_aggregate_id(jobs)} {self.run_options}"
-        return cmd.strip()
+        return (environment.get_prefix(directives) + cmd).strip()
 
     def __iter__(self):
         yield from self.operations.values()
@@ -1083,6 +1137,7 @@ def _get_run_ops(ignore_ops, additional_ignores_flag):
         submission_directives = self._get_submission_directives(
             default_directives, jobs
         )
+        primary_directives = _group_directive_aggregation(submission_directives)
         eligible_operations = _get_run_ops([], IgnoreConditions.NONE)
         operations_with_unmet_preconditions = _get_run_ops(
             eligible_operations, IgnoreConditions.PRE
@@ -1091,18 +1146,17 @@ def _get_run_ops(ignore_ops, additional_ignores_flag):
             eligible_operations, IgnoreConditions.POST
         )
 
-        submission_job_operation = _SubmissionJobOperation(
-            self._generate_id(jobs),
-            self.name,
-            jobs,
+        return _SubmissionJobOperation(
+            id=self._generate_id(jobs),
+            name=self.name,
+            jobs=jobs,
            cmd=unevaluated_cmd,
-            directives=dict(submission_directives),
-            user_directives=set(submission_directives.user_keys),
+            primary_directives=primary_directives,
+            directives_list=submission_directives,
             eligible_operations=eligible_operations,
             operations_with_unmet_preconditions=operations_with_unmet_preconditions,
             operations_with_met_postconditions=operations_with_met_postconditions,
         )
-        return submission_job_operation
 
     def _create_run_job_operations(
         self,
@@ -1134,8 +1188,8 @@
 
         Returns
         -------
-        Iterator[_JobOperation]
-            Iterator of eligible instances of :class:`~._JobOperation`.
+        Iterator[_RunOperation]
+            Iterator of eligible instances of :class:`~._RunOperation`.
 
         """
         # Assuming all the jobs belong to the same FlowProject
         if operation._eligible(jobs, ignore_conditions):
             directives = self._resolve_directives(
                 operation_name, default_directives, env
-            )
-            directives.evaluate(jobs)
+            ).evaluate(jobs)
             # Return an unevaluated command to make evaluation lazy and
             # reduce side effects in callable FlowCmdOperations.
             unevaluated_cmd = _cached_partial(
@@ -1155,46 +1208,54 @@
                 operation=operation,
                 directives=directives,
                 jobs=jobs,
+                environment=env,
             )
-            job_op = _JobOperation(
+            yield _RunOperation(
                 self._generate_id(jobs, operation_name),
                 operation_name,
                 jobs,
                 cmd=unevaluated_cmd,
-                directives=dict(directives),
-                user_directives=set(directives.user_keys),
+                fork=self._fork_op(directives),
             )
-            # Get the prefix, and if it's non-empty, set the fork directive
-            # to True since we must launch a separate process. Override
-            # the command directly.
-            prefix = jobs[0]._project._environment.get_prefix(job_op)
-            if prefix != "" or self.run_options != "":
-                job_op.directives["fork"] = True
-                job_op._cmd = f"{prefix} {job_op.cmd}"
-            yield job_op
+
+    def _fork_op(self, directives):
+        # TODO: Since we use threads_per_process rather than OMP threads
+        # specifically, setting threads_per_process does not necessarily
+        # require a fork; using OMP correctly does, however. Perhaps this is an
+        # argument for an omp directive. Otherwise, we would need to fork here
+        # whenever threads_per_process is set, which we currently don't do. Or
+        # we could allow for multiple launchers (considering OMP a launcher)
+        # and check for compatibility.
+        return (
+            len(self.run_options) > 0
+            or directives["executable"] != sys.executable
+            or directives["launcher"] is not None
+        )
 
     def _get_submission_directives(self, default_directives, jobs):
-        """Get the combined resources for submission.
+        """Get the resolved and evaluated resources for submission.
 
         No checks are done to mitigate inappropriate aggregation of operations.
         This can lead to poor utilization of computing resources.
         """
         env = jobs[0]._project._environment
         operation_names = list(self.operations.keys())
-        # The first operation's directives are evaluated, then all other
-        # operations' directives are applied as updates with aggregate=True
-        directives = self._resolve_directives(
-            operation_names[0], default_directives, env
-        )
-        directives.evaluate(jobs)
-        for name in operation_names[1:]:
-            # get directives for operation
-            directives.update(
-                self._resolve_directives(name, default_directives, env),
-                aggregate=True,
-                jobs=jobs,
+        return [
+            self._directives_to_track_dict(
+                self._resolve_directives(name, default_directives, env).evaluate(jobs),
+                set(env._get_default_directives().keys()),
             )
-        return directives
+            for name in operation_names
+        ]
+
+    @staticmethod
+    def _directives_to_track_dict(directives, internal_keys):
+        """Convert evaluated directives to tracking dictionaries.
+
+        Pre-marks the given internal (environment-provided) keys as used so
+        that only user-specified directives can trigger the unused-directives
+        warning at submission.
+        """
+        dict_ = _TrackGetItemDict(**directives)
+        dict_._keys_used = internal_keys
+        return dict_
 
 
 class _FlowProjectClass(type):
@@ -1428,7 +1489,7 @@ def hello(job):
 
        .. code-block:: python
 
-            @FlowProject.operation({"nranks": 4})
+            @FlowProject.operation({"processes": 4, "launcher": "mpi"})
             def mpi_hello(job):
                 print("hello")
@@ -1851,7 +1912,6 @@ def _setup_template_environment(self):
         ] = template_filters.format_timedelta
         template_environment.filters["format_memory"] = template_filters.format_memory
         template_environment.filters["identical"] = template_filters.identical
-        template_environment.filters["with_np_offset"] = template_filters.with_np_offset
         template_environment.filters["calc_tasks"] = template_filters.calc_tasks
         template_environment.filters["calc_num_nodes"] = template_filters.calc_num_nodes
         template_environment.filters["calc_walltime"] = template_filters.calc_walltime
@@ -3387,21 +3447,19 @@ class _PickleError(Exception):
         """Indicates a pickling error while trying to parallelize the execution of operations."""
 
     @staticmethod
-    def _job_operation_to_tuple(operation):
+    def _run_operation_to_tuple(operation):
         return (
             operation.id,
             operation.name,
             [job.id for job in operation._jobs],
             operation.cmd,
-            operation.directives,
+            operation.fork,
         )
 
-    def _job_operation_from_tuple(self, data):
-        id, name, job_ids, cmd, directives = data
+    def _run_operation_from_tuple(self, data):
+        id_, name, job_ids, cmd, fork = data
         jobs = tuple(self.open_job(id=job_id) for job_id in job_ids)
-        return _JobOperation(
-            id, name, jobs, cmd, directives, directives._keys_set_by_user
-        )
+        return _RunOperation(id_, name, jobs, cmd, fork)
 
     def _run_operations_in_parallel(self, pool, operations, progress, timeout):
         """Execute operations in parallel.
@@ -3433,7 +3491,7 @@ def _run_operations_in_parallel(self, pool, operations, progress, timeout):
                 (
                     cloudpickle.loads,
                     serialized_project,
-                    self._job_operation_to_tuple(operation),
+                    self._run_operation_to_tuple(operation),
                 )
                 for operation in tqdm(
                     operations, desc="Serialize tasks", file=sys.stderr
@@ -3505,14 +3563,11 @@ def _execute_operation(self, operation, timeout=None, pretend=False):
         # Check if we need to fork for operation execution...
         if (
-            # The 'fork' directive was provided and evaluates to True:
-            operation.directives.get("fork", False)
+            operation.fork
             # A separate process is needed to cancel with timeout:
             or timeout is not None
             # The operation function is an instance of FlowCmdOperation:
             or isinstance(self._operations[operation.name], FlowCmdOperation)
-            # The specified executable is not the same as the interpreter instance:
-            or operation.directives.get("executable", sys.executable) != sys.executable
         ):
             # ... need to fork:
             logger.debug(
@@ -4069,7 +4124,7 @@ def _submit_operations(
 
         Parameters
         ----------
-        operations : A sequence of instances of :class:`~._JobOperation`
+        operations : A sequence of instances of :class:`~._SubmissionJobOperation`
             The operations to submit.
         _id : str
            The _id to be used for this submission.
(Default value = None) @@ -4130,10 +4185,8 @@ def _msg(group): keys_unused = { key for op in operations - for key in op.directives._keys_set_by_user.difference( - op.directives.keys_used - ) - if key not in ("fork", "nranks", "omp_num_threads") # ignore list + for directives in op.directives_list + for key in directives.keys() - directives.keys_used } if keys_unused: logger.warning( @@ -5242,7 +5295,7 @@ class MyProject(FlowProject): def _deserialize_and_run_operation(loads, project, operation_data): project = loads(project) - project._execute_operation(project._job_operation_from_tuple(operation_data)) + project._execute_operation(project._run_operation_from_tuple(operation_data)) return None diff --git a/flow/templates/andes.sh b/flow/templates/andes.sh index ee7cbd40c..c8e30eb76 100644 --- a/flow/templates/andes.sh +++ b/flow/templates/andes.sh @@ -2,20 +2,9 @@ {% extends "slurm.sh" %} {% block tasks %} {% if 'gpu' in partition %} - {% if resources.ncpu_tasks > resources.ngpu_tasks * 14 and not force %} + {% if resources.cpus > resources.gpus * 14 and not force %} {% raise "Cannot request more than 14 CPUs per GPU." %} {% endif %} {% endif %} -#SBATCH -N {{ resources.num_nodes }} -#SBATCH --ntasks={{ resources.ncpu_tasks }} - {% if partition == 'gpu' %} -#SBATCH --gpus={{ resources.ngpu_tasks }} - {% endif %} -{% endblock tasks %} -{% block header %} {{- super() -}} - {% set account = account|default(project|get_account_name, true) %} - {% if account %} -#SBATCH -A {{ account }} - {% endif %} -{% endblock header %} +{% endblock tasks %} diff --git a/flow/templates/anvil.sh b/flow/templates/anvil.sh index 023957606..a49e1b965 100644 --- a/flow/templates/anvil.sh +++ b/flow/templates/anvil.sh @@ -1,20 +1,7 @@ {# Templated in accordance with: https://www.rcac.purdue.edu/knowledge/anvil/ #} {% extends "slurm.sh" %} -{% block tasks %} - {% if resources.num_nodes > 1 %} -#SBATCH -N {{ resources.num_nodes }} - {% endif %} -#SBATCH --ntasks={{ resources.ncpu_tasks }} - {% if 'gpu' in partition %} -#SBATCH --gpus={{ resources.ngpu_tasks }} - {% endif %} -{% endblock tasks %} {% block header %} {{- super() -}} - {% set account = account|default(project|get_account_name, true) %} - {% if account %} -#SBATCH --account {{ account }} - {% endif %} # As of 2023-10-30, Anvil incorrectly binds ranks to cores with `mpirun -n`. # Disable core binding to work around this issue. export OMPI_MCA_hwloc_base_binding_policy="" diff --git a/flow/templates/base_script.sh b/flow/templates/base_script.sh index d46d03cfe..d4aeb4cf7 100644 --- a/flow/templates/base_script.sh +++ b/flow/templates/base_script.sh @@ -1,9 +1,4 @@ {# The following variables are available to all scripts. 
#} -{% if parallel %} - {% set np_global = operations|map(attribute='directives.np')|sum %} -{% else %} - {% set np_global = operations|map(attribute='directives.np')|max %} -{% endif %} {% block header %} {% block preamble %} {% endblock preamble %} diff --git a/flow/templates/bridges2.sh b/flow/templates/bridges2.sh index 25cdeb57f..acd683984 100644 --- a/flow/templates/bridges2.sh +++ b/flow/templates/bridges2.sh @@ -1,18 +1,2 @@ {# Templated in accordance with: https://www.psc.edu/resources/bridges-2/user-guide #} {% extends "slurm.sh" %} -{% block tasks %} - {% if resources.num_nodes > 1 or resources.ncpu_tasks >= 128 or resources.ngpu_tasks >= 8 %} -#SBATCH -N {{ resources.num_nodes }} - {% endif %} -#SBATCH --ntasks={{ resources.ncpu_tasks }} - {% if 'GPU' in partition %} -#SBATCH --gpus={{ resources.ngpu_tasks }} - {% endif %} -{% endblock tasks %} -{% block header %} - {{- super() -}} - {% set account = account|default(project|get_account_name, true) %} - {% if account %} -#SBATCH -A {{ account }} - {% endif %} -{% endblock header %} diff --git a/flow/templates/crusher.sh b/flow/templates/crusher.sh index 842d10bb5..4218df7c9 100644 --- a/flow/templates/crusher.sh +++ b/flow/templates/crusher.sh @@ -5,9 +5,5 @@ {% endblock tasks %} {% block header %} {{- super() -}} - {% set account = account|default(project|get_account_name, true) %} - {% if account %} -#SBATCH --account={{ account }} - {% endif %} #SBATCH --partition=batch {% endblock header %} diff --git a/flow/templates/delta.sh b/flow/templates/delta.sh index 4502ddf66..f3581a462 100644 --- a/flow/templates/delta.sh +++ b/flow/templates/delta.sh @@ -6,18 +6,5 @@ increased charges and is expected to be suitable for a minority of use cases." %} {% endif %} - {% if resources.num_nodes > 1 %} -#SBATCH -N {{ resources.num_nodes }} - {% endif %} -#SBATCH --ntasks={{ resources.ncpu_tasks }} - {% if "gpu" in partition %} -#SBATCH --gpus={{ resources.ngpu_tasks }} - {% endif %} -{% endblock tasks %} -{% block header %} {{- super() -}} - {% set account = account|default(project|get_account_name, true) %} - {% if account %} -#SBATCH -A {{ account }} - {% endif %} -{% endblock header %} +{% endblock tasks %} diff --git a/flow/templates/drexel-picotte.sh b/flow/templates/drexel-picotte.sh index 2a1003c4d..03cd84c92 100644 --- a/flow/templates/drexel-picotte.sh +++ b/flow/templates/drexel-picotte.sh @@ -1,28 +1,2 @@ {% extends "slurm.sh" %} {% set partition = partition|default('standard', true) %} -{% block tasks %} - {% set threshold = 0 if force else 0.9 %} - {% set cpu_tasks = operations|calc_tasks('np', parallel, force) %} - {% set gpu_tasks = operations|calc_tasks('ngpu', parallel, force) %} - {% if gpu_tasks and 'gpu' not in partition and not force %} - {% raise "Requesting GPUs requires a gpu partition!" 
%} - {% endif %} - {% set nn_cpu = cpu_tasks|calc_num_nodes(48) if 'gpu' not in partition else cpu_tasks|calc_num_nodes(48) %} - {% set nn_gpu = gpu_tasks|calc_num_nodes(4) if 'gpu' in partition else 0 %} - {% set nn = nn|default((nn_cpu, nn_gpu)|max, true) %} - {% if partition == 'gpu' %} -#SBATCH --nodes={{ nn|default(1, true) }} -#SBATCH --ntasks-per-node={{ (gpu_tasks, cpu_tasks)|max }} -#SBATCH --gres=gpu:{{ gpu_tasks }} - {% else %}{# def partition #} -#SBATCH --nodes={{ nn }} -#SBATCH --ntasks-per-node={{ (48, cpu_tasks)|min }} - {% endif %} -{% endblock tasks %} -{% block header %} - {{- super () -}} - {% set account = account|default(project|get_account_name, true) %} - {% if account %} -#SBATCH --account={{ account }} - {% endif %} -{% endblock header %} diff --git a/flow/templates/expanse.sh b/flow/templates/expanse.sh index d6c4efb46..1abbd9c62 100644 --- a/flow/templates/expanse.sh +++ b/flow/templates/expanse.sh @@ -1,16 +1,2 @@ {# Templated in accordance with: https://www.sdsc.edu/support/user_guides/expanse.html #} {% extends "slurm.sh" %} -{% block tasks %} -#SBATCH -N {{ resources.num_nodes }} -#SBATCH --ntasks={{ resources.ncpu_tasks }} - {% if 'gpu' in partition %} -#SBATCH --gpus={{ resources.ngpu_tasks }} - {% endif %} -{% endblock tasks %} -{% block header %} - {{- super() -}} - {% set account = account|default(project|get_account_name, true) %} - {% if account %} -#SBATCH -A {{ account }} - {% endif %} -{% endblock header %} diff --git a/flow/templates/frontier.sh b/flow/templates/frontier.sh index dd60a01c1..54ea371e4 100644 --- a/flow/templates/frontier.sh +++ b/flow/templates/frontier.sh @@ -3,10 +3,3 @@ {% block tasks %} #SBATCH --nodes={{ resources.num_nodes }} {% endblock tasks %} -{% block header %} - {{- super() -}} - {% set account = account|default(project|get_account_name, true) %} - {% if account %} -#SBATCH --account={{ account }} - {% endif %} -{% endblock header %} diff --git a/flow/templates/pbs.sh b/flow/templates/pbs.sh index 08c28f9f1..71475e99a 100644 --- a/flow/templates/pbs.sh +++ b/flow/templates/pbs.sh @@ -20,8 +20,8 @@ {% endblock preamble %} {% block tasks %} {% set threshold = 0 if force else 0.9 %} - {% set s_gpu = ':gpus=1' if resources.ngpu_tasks else '' %} - {% set ppn = ppn|default(operations|calc_tasks('omp_num_threads', parallel, force), true) %} + {% set s_gpu = ':gpus={}'|format(resources.gpus_per_process) if resources.gpus_per_process else '' %} + {% set ppn = ppn|default(operations|calc_tasks('threads_per_process', parallel, force), true) %} {% if ppn %} #PBS -l nodes={{ resources.num_nodes }}:ppn={{ ppn }}{{ s_gpu }} {% else %} diff --git a/flow/templates/slurm.sh b/flow/templates/slurm.sh index 88f4c2747..9503e5e3f 100644 --- a/flow/templates/slurm.sh +++ b/flow/templates/slurm.sh @@ -3,23 +3,29 @@ {% block preamble %} #!/bin/bash #SBATCH --job-name="{{ id }}" - {% set memory_requested = operations | calc_memory(parallel) %} - {% if memory_requested %} -#SBATCH --mem={{ memory_requested|format_memory }} - {% endif %} - {% if partition %} + {% if partition %} #SBATCH --partition={{ partition }} - {% endif %} - {% set walltime = operations | calc_walltime(parallel) %} - {% if walltime %} -#SBATCH -t {{ walltime|format_timedelta }} - {% endif %} - {% if job_output %} + {% endif %} + {% if resources.walltime %} +#SBATCH -t {{ resources.walltime|format_timedelta }} + {% endif %} + {% if job_output %} #SBATCH --output={{ job_output }} #SBATCH --error={{ job_output }} - {% endif %} + {% endif %} + {% set account = 
diff --git a/flow/templates/umich-greatlakes.sh b/flow/templates/umich-greatlakes.sh
index 1fdcd0439..20a1c3899 100644
--- a/flow/templates/umich-greatlakes.sh
+++ b/flow/templates/umich-greatlakes.sh
@@ -1,18 +1 @@
 {% extends "slurm.sh" %}
-{% set partition = partition|default('standard', true) %}
-{% set nranks = (operations|calc_tasks("nranks", parallel, force), 1) | max %}
-{% block tasks %}
-#SBATCH --nodes={{ resources.num_nodes }}-{{ resources.num_nodes }}
-#SBATCH --ntasks={{ nranks }}
-#SBATCH --cpus-per-task={{ resources.ncpu_tasks // nranks}}
-    {% if partition.startswith('gpu') %}
-#SBATCH --gpus-per-task={{ resources.ngpu_tasks // nranks }}
-    {% endif %}
-{% endblock tasks %}
-{% block header %}
-    {{- super () -}}
-    {% set account = account|default(project|get_account_name, true) %}
-    {% if account %}
-#SBATCH --account={{ account }}
-    {% endif %}
-{% endblock header %}
diff --git a/flow/util/misc.py b/flow/util/misc.py
index 7d93c9da8..571bc2b3c 100644
--- a/flow/util/misc.py
+++ b/flow/util/misc.py
@@ -432,3 +432,31 @@ def _deprecated_warning(
 __all__ = [
     "redirect_log",
 ]
+
+
+def _argmax(a):
+    max_i = 0
+    max_ = None
+    for i, value in enumerate(a):
+        if max_ is None:
+            max_ = value
+            max_i = i
+        elif max_ < value:
+            max_ = value
+            max_i = i
+    return max_i
+
+
+def _tolerant_iter_function(func):
+    def new_func(iterable, *args, **kwargs):
+        values = [i for i in iterable if i is not None]
+        if len(values) == 0:
+            return None
+        return func(values, *args, **kwargs)
+
+    return new_func
+
+
+_tolerant_argmax = _tolerant_iter_function(_argmax)
+_tolerant_max = _tolerant_iter_function(max)
+_tolerant_sum = _tolerant_iter_function(sum)
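The `_tolerant_*` wrappers treat `None` as "not requested" rather than as zero, which is what lets the filters below distinguish "nothing requested" from "zero requested". A quick sketch of the behavior (illustrative assertions only):

```python
# None entries are filtered before applying the wrapped function; an all-None
# or empty input yields None instead of raising or returning 0.
from flow.util.misc import _tolerant_argmax, _tolerant_max, _tolerant_sum

assert _tolerant_max([4, None, 2]) == 4
assert _tolerant_sum([4, None, 2]) == 6
assert _tolerant_sum([None, None]) is None
# Note: _tolerant_argmax returns an index into the None-filtered sequence.
assert _tolerant_argmax([None, 3, 7, None]) == 1
```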
diff --git a/flow/util/template_filters.py b/flow/util/template_filters.py
index f7e4e8fe8..9d1f3240c 100644
--- a/flow/util/template_filters.py
+++ b/flow/util/template_filters.py
@@ -4,11 +4,11 @@
 """Provide jinja2 template environment filter functions."""
 import datetime
 import sys
-from functools import partial
 from math import ceil
 
 from ..errors import ConfigKeyError, SubmitError
 from .config import get_config_value
+from .misc import _tolerant_max, _tolerant_sum
 
 
 def identical(iterable):
@@ -39,7 +39,11 @@ def homogeneous_openmp_mpi_config(operations):
     return (
         len(
             {
-                (op.directives.get("nranks"), op.directives.get("omp_num_threads"))
+                (
+                    op.directives.get("processes"),
+                    op.directives.get("threads_per_process"),
+                    op.directives.get("gpus_per_process"),
+                )
                 for op in operations
             }
         )
@@ -47,28 +51,19 @@ def homogeneous_openmp_mpi_config(operations):
     )
 
 
-def with_np_offset(operations):
-    """Add the np_offset variable to the operations' directives."""
-    offset = 0
-    for operation in operations:
-        operation.directives.setdefault("np_offset", offset)
-        offset += operation.directives["np"]
-    return operations
-
-
 def calc_tasks(operations, name, parallel=False, allow_mixed=False):
     """Compute the number of tasks required for the given set of operations.
 
     Calculates the number of tasks for a specific processing unit requested in
-    the operations' directive, e.g., 'np' or 'ngpu'.
+    the operations' directives, e.g., 'processes' or 'gpus_per_process'.
 
     Parameters
     ----------
     operations : :class:`~._JobOperation`
         The operations used to calculate the total number of required tasks.
     name : str
-        The name of the processing unit to calculate the tasks for, e.g., 'np'
-        or 'ngpu'.
+        The name of the processing unit to calculate the tasks for, e.g.,
+        'processes'.
     parallel : bool
         If True, operations are assumed to be executed in parallel, which means
         that the number of total tasks is the sum of all tasks instead of the
@@ -91,10 +86,7 @@ def calc_tasks(operations, name, parallel=False, allow_mixed=False):
         set to True.
 
     """
-    processing_units = [
-        op.directives[name] * op.directives.get("processor_fraction", 1)
-        for op in operations
-    ]
+    processing_units = [op.primary_directives[name] for op in operations]
     if identical(processing_units) or allow_mixed:
         if len(processing_units) > 0:
             sum_processing_units = round(sum(processing_units))
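A short usage sketch of `calc_tasks` under the new directive names; the mock operation class is hypothetical, and only `primary_directives` is consulted by the filter:

```python
# Hypothetical operations for calc_tasks: per the docstring, mixed values
# raise unless allow_mixed=True.
from flow.util.template_filters import calc_tasks

class FakeOp:
    def __init__(self, processes):
        self.primary_directives = {"processes": processes}

ops = [FakeOp(4), FakeOp(4)]
assert calc_tasks(ops, "processes", parallel=True) == 8   # parallel: sum
assert calc_tasks(ops, "processes", parallel=False) == 4  # serial: the shared value
```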
@@ -139,7 +131,7 @@ def calc_memory(operations, parallel=False):
     Parameters
     ----------
     operations : list
-        A list of :class:`~._JobOperation`\ s used to calculate the maximum
+        A list of :class:`~._SubmissionOperation`\ s used to calculate the maximum
         memory required.
     parallel : bool
         If True, operations are assumed to be executed in parallel, which
@@ -152,8 +144,8 @@ def calc_memory(operations, parallel=False):
     float
         The reserved memory (numeric value) in gigabytes.
     """
-    func = sum if parallel else max
-    return func(operation.directives["memory"] or 0 for operation in operations)
+    func = _tolerant_sum if parallel else _tolerant_max
+    return func(op.primary_directives["memory"] for op in operations)
 
 
 def calc_walltime(operations, parallel=False):
@@ -175,17 +167,11 @@ def calc_walltime(operations, parallel=False):
     :class:`datetime.timedelta`
         The total walltime.
     """
-    # Replace the sum function with partial(sum, start=datetime.timedelta())
-    # when dropping Python 3.7 support.
-    func = (
-        max
-        if parallel
-        else partial(lambda start, iterable: sum(iterable, start), datetime.timedelta())
-    )
-    return func(
-        operation.directives["walltime"] or datetime.timedelta()
-        for operation in operations
-    )
+    walltimes = (op.primary_directives["walltime"] for op in operations)
+    if parallel:
+        return _tolerant_max(walltimes)
+    else:
+        return _tolerant_sum(walltimes, start=datetime.timedelta())
 
 
 def check_utilization(nn, np, ppn, threshold=0.9, name=None):
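Taken together with the tolerant helpers, the aggregation rules read: memory adds up when operations run side by side and peaks when they run back to back, while walltime is the reverse, and `None` requests simply drop out. A hedged sketch with hypothetical mock operations:

```python
import datetime

from flow.util.template_filters import calc_memory, calc_walltime

# Hypothetical mocks; the filters read only primary_directives.
class FakeOp:
    def __init__(self, memory=None, walltime=None):
        self.primary_directives = {"memory": memory, "walltime": walltime}

hour = datetime.timedelta(hours=1)
ops = [FakeOp(memory=4, walltime=hour), FakeOp(), FakeOp(memory=2, walltime=3 * hour)]

assert calc_memory(ops, parallel=False) == 4           # serial: peak usage
assert calc_memory(ops, parallel=True) == 6            # parallel: concurrent total
assert calc_walltime(ops, parallel=False) == 4 * hour  # serial: back to back
assert calc_walltime(ops, parallel=True) == 3 * hour   # parallel: longest operation
```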
"bad_directive": 0, # But not this one: - "np": 1, + "processes": 1, }, ) def op1(job): return f'echo "hello" > {job.path}/world.txt' -def _need_to_fork(job): - return job.doc.get("fork") - - @group1 @_TestProject.post.true("test") -@_TestProject.operation(directives={"fork": _need_to_fork}) +@_TestProject.operation def op2(job): job.document.test = os.getpid() -@group2(directives={"omp_num_threads": 4}) +@group2(directives={"threads_per_process": 4}) @_TestProject.post.true("test3_true") @_TestProject.post.false("test3_false") @_TestProject.post.not_(lambda job: job.doc.test3_false) -@_TestProject.operation(directives={"ngpu": 1, "omp_num_threads": 1}) +@_TestProject.operation(directives={"gpus_per_process": 1, "threads_per_process": 1}) def op3(job): job.document.test3_true = True job.document.test3_false = False diff --git a/tests/generate_template_reference_data.py b/tests/generate_template_reference_data.py index ecc077c23..c6dce0890 100755 --- a/tests/generate_template_reference_data.py +++ b/tests/generate_template_reference_data.py @@ -9,12 +9,12 @@ import operator import os import sys +from contextlib import redirect_stdout from hashlib import sha1 import jinja2 import signac from define_template_test_project import TestProject -from test_project import redirect_stdout import flow import flow.environments @@ -27,6 +27,17 @@ ) PROJECT_DIRECTORY = '/home/user/path with spaces and "quotes" and \\backslashes/' MOCK_EXECUTABLE = "/usr/local/bin/python" +DEFAULT_BUNDLES = [ + {"bundles": [("omp_op", "parallel_op")], "parallel": [True, False]}, + {"bundles": [("mpi_op", "op", "memory_op")], "parallel": [False]}, + {"bundles": [("hybrid_op", "omp_op")], "parallel": [False]}, +] + + +def set_bundles(partitions=None): + if partitions is None: + return DEFAULT_BUNDLES + return [{"partition": partitions, **bundle} for bundle in DEFAULT_BUNDLES] def cartesian(**kwargs): @@ -65,94 +76,37 @@ def init(project): environments = { "environment.StandardEnvironment": [], "environments.xsede.Bridges2Environment": [ - { - "partition": ["RM", "RM-shared", "GPU", "GPU-shared"], - }, - { - "partition": ["RM"], - "parallel": [False, True], - "bundle": [["mpi_op", "omp_op"]], - }, + {"partition": ["RM", "RM-shared", "GPU", "GPU-shared"]}, + *set_bundles(["RM"]), ], "environments.umich.GreatLakesEnvironment": [ - { - "partition": ["standard", "gpu", "gpu_mig40"], - }, - { - "parallel": [False, True], - "bundle": [["mpi_op", "omp_op"]], - }, - ], - "environments.incite.SummitEnvironment": [ - {}, - { - "parallel": [False, True], - "bundle": [["mpi_op", "omp_op"]], - }, + {"partition": ["standard", "gpu", "gpu_mig40"]}, + *set_bundles(), ], + "environments.incite.SummitEnvironment": [{}, *set_bundles()], "environments.incite.AndesEnvironment": [ - { - "partition": ["batch", "gpu"], - }, - { - "partition": ["batch"], - "parallel": [False, True], - "bundle": [["mpi_op", "omp_op"]], - }, - ], - "environments.umn.MangiEnvironment": [ - {}, - { - "parallel": [False, True], - "bundle": [["mpi_op", "omp_op"]], - }, + {"partition": ["batch", "gpu"]}, + *set_bundles(["batch"]), ], + "environments.umn.MangiEnvironment": [{}, *set_bundles()], "environments.xsede.ExpanseEnvironment": [ { "partition": ["compute", "shared", "gpu", "gpu-shared", "large-shared"], }, - { - "partition": ["compute"], - "parallel": [False, True], - "bundle": [["mpi_op", "omp_op"]], - }, + *set_bundles(["compute"]), ], "environments.drexel.PicotteEnvironment": [ - { - "partition": ["def", "gpu"], - }, - { - "partition": ["def"], - "parallel": 
@@ -65,94 +76,37 @@ def init(project):
     environments = {
         "environment.StandardEnvironment": [],
         "environments.xsede.Bridges2Environment": [
-            {
-                "partition": ["RM", "RM-shared", "GPU", "GPU-shared"],
-            },
-            {
-                "partition": ["RM"],
-                "parallel": [False, True],
-                "bundle": [["mpi_op", "omp_op"]],
-            },
+            {"partition": ["RM", "RM-shared", "GPU", "GPU-shared"]},
+            *set_bundles(["RM"]),
         ],
         "environments.umich.GreatLakesEnvironment": [
-            {
-                "partition": ["standard", "gpu", "gpu_mig40"],
-            },
-            {
-                "parallel": [False, True],
-                "bundle": [["mpi_op", "omp_op"]],
-            },
-        ],
-        "environments.incite.SummitEnvironment": [
-            {},
-            {
-                "parallel": [False, True],
-                "bundle": [["mpi_op", "omp_op"]],
-            },
+            {"partition": ["standard", "gpu", "gpu_mig40"]},
+            *set_bundles(),
         ],
+        "environments.incite.SummitEnvironment": [{}, *set_bundles()],
         "environments.incite.AndesEnvironment": [
-            {
-                "partition": ["batch", "gpu"],
-            },
-            {
-                "partition": ["batch"],
-                "parallel": [False, True],
-                "bundle": [["mpi_op", "omp_op"]],
-            },
-        ],
-        "environments.umn.MangiEnvironment": [
-            {},
-            {
-                "parallel": [False, True],
-                "bundle": [["mpi_op", "omp_op"]],
-            },
+            {"partition": ["batch", "gpu"]},
+            *set_bundles(["batch"]),
         ],
+        "environments.umn.MangiEnvironment": [{}, *set_bundles()],
         "environments.xsede.ExpanseEnvironment": [
             {
                 "partition": ["compute", "shared", "gpu", "gpu-shared", "large-shared"],
             },
-            {
-                "partition": ["compute"],
-                "parallel": [False, True],
-                "bundle": [["mpi_op", "omp_op"]],
-            },
+            *set_bundles(["compute"]),
         ],
         "environments.drexel.PicotteEnvironment": [
-            {
-                "partition": ["def", "gpu"],
-            },
-            {
-                "partition": ["def"],
-                "parallel": [False, True],
-                "bundle": [["mpi_op", "omp_op"]],
-            },
+            {"partition": ["def", "gpu"]},
+            *set_bundles(["def"]),
         ],
         "environments.xsede.DeltaEnvironment": [
-            {
-                "partition": ["cpu", "gpuA40x4", "gpuA100x4"],
-            },
-            {
-                "partition": ["cpu"],
-                "parallel": [False, True],
-                "bundle": [["mpi_op", "omp_op"]],
-            },
-        ],
-        "environments.incite.CrusherEnvironment": [
-            {},
-            {
-                "parallel": [False, True],
-                "bundle": [["mpi_op", "omp_op"]],
-            },
+            {"partition": ["cpu", "gpuA40x4", "gpuA100x4"]},
+            *set_bundles(["cpu"]),
         ],
+        "environments.incite.CrusherEnvironment": [{}, *set_bundles()],
         # Frontier cannot use partitions as logic requires gpu
         # in the name of partitions that are gpu nodes.
-        "environments.incite.FrontierEnvironment": [
-            {},
-            {
-                "parallel": [False, True],
-                "bundle": [["mpi_op", "omp_op"]],
-            },
-        ],
+        "environments.incite.FrontierEnvironment": [{}, *set_bundles()],
         "environments.purdue.AnvilEnvironment": [
             {
                 "partition": [
@@ -165,11 +119,7 @@ def init(project):
                 "gpu",
             ],
         },
-        {
-            "partition": ["wholenode"],
-            "parallel": [False, True],
-            "bundle": [["mpi_op", "omp_op"]],
-        },
+        *set_bundles(["wholenode"]),
     ],
 }
diff --git a/tests/template_reference_data.tar.gz b/tests/template_reference_data.tar.gz
index 9c9a19202..2f5c468e3 100644
Binary files a/tests/template_reference_data.tar.gz and b/tests/template_reference_data.tar.gz differ
diff --git a/tests/test_directives.py b/tests/test_directives.py
index 5ea20c1aa..319bfe79f 100644
--- a/tests/test_directives.py
+++ b/tests/test_directives.py
@@ -10,16 +10,14 @@
 from flow import FlowProject
 from flow.directives import (
     _GET_EXECUTABLE,
-    _MEMORY,
-    _NGPU,
-    _NP,
-    _NRANKS,
-    _OMP_NUM_THREADS,
-    _PROCESSOR_FRACTION,
+    _GPUS_PER_PROCESS,
+    _LAUNCHER,
+    _MEMORY_PER_CPU,
+    _PROCESSES,
+    _THREADS_PER_PROCESS,
     _WALLTIME,
     _Directive,
     _Directives,
-    _no_aggregation,
 )
 from flow.errors import DirectivesError
 
@@ -27,14 +25,13 @@
 @pytest.fixture()
 def available_directives_list():
     return [
-        _NP,
-        _NRANKS,
-        _NGPU,
-        _OMP_NUM_THREADS,
+        _MEMORY_PER_CPU,
+        _GPUS_PER_PROCESS,
+        _PROCESSES,
+        _THREADS_PER_PROCESS,
+        _LAUNCHER,
         _GET_EXECUTABLE(),
         _WALLTIME,
-        _MEMORY,
-        _PROCESSOR_FRACTION,
     ]
 
 
@@ -50,22 +47,7 @@ def val(v):
             raise ValueError("Price cannot be less than 10 units")
         return v
 
-    def finalize(value, dict):
-        discount = dict.get("discount", 0)
-        free = dict.get("free", False)
-        value = value - discount
-        if value < 0 or free:
-            return 0
-        return value
-
-    product = _Directive(
-        name="product",
-        validator=val,
-        default=10,
-        serial=_no_aggregation,
-        parallel=_no_aggregation,
-        finalize=finalize,
-    )
+    product = _Directive(name="product", validator=val, default=10)
     return product
 
 
@@ -73,24 +55,20 @@ def finalize(value, dict):
 def non_default_directive_values():
     return [
         {
-            "np": 1,
-            "ngpu": 10,
-            "nranks": 5,
-            "omp_num_threads": 20,
+            "processes": 5,
+            "threads_per_process": 20,
             "executable": "Non Default Path",
             "walltime": 64.0,
-            "memory": 32,
-            "processor_fraction": 0.5,
+            "memory_per_cpu": 2,
+            "launcher": "mpi",
         },
         {
-            "np": 4,
-            "ngpu": 1,
-            "nranks": 0,
-            "omp_num_threads": 10,
+            "processes": 4,
+            "gpus_per_process": 1,
+            "threads_per_process": 10,
             "executable": "PathFinder",
             "walltime": 20.0,
-            "memory": 16,
-            "processor_fraction": 0.5,
+            "memory_per_cpu": 1,
         },
     ]
@@ -99,31 +77,28 @@ class TestItems:
     """Tests for _Directive class."""
 
     def test_default(self):
-        assert _NP._default == 1
-        assert _NGPU._default == 0
-        assert _NRANKS._default == 0
-        assert _OMP_NUM_THREADS._default == 0
+        assert _PROCESSES._default == 1
+        assert _GPUS_PER_PROCESS._default == 0
+        assert _THREADS_PER_PROCESS._default == 1
+        assert _MEMORY_PER_CPU._default is None
         assert _GET_EXECUTABLE()._default == sys.executable
         assert _WALLTIME._default is None
-        assert _MEMORY._default is None
-        assert _PROCESSOR_FRACTION._default == 1.0
+        assert _LAUNCHER._default is None
 
     def test_invalid_values(self, available_directives_list):
         invalid_values = {
-            "np": [-1, "foo", {}, None],
-            "ngpu": [-1, "foo", {}, None],
-            "nranks": [-1, "foo", {}, None],
-            "omp_num_threads": [-1, "foo", {}, None],
+            "processes": [-1, "foo", {}, None],
+            "gpus_per_process": [-1, "foo", {}, None],
+            "threads_per_process": [-1, "foo", {}, None],
             "walltime": [-1, "foo", {}],
-            "memory": [-1, "foo", {}],
-            "processor_fraction": [-0.5, 2.5, "foo", {}, None],
+            "memory_per_cpu": [-1, "foo", {}],
         }
         for directive in available_directives_list:
-            if directive._name == "executable":
-                # Executable expect a string, if not found, then it tries to convert
-                # it into a string and becomes successful almost every time.
-                # Hence skipping Executable.
+            if directive._name in ("executable", "launcher"):
+                # Executable and launcher expect strings; other values are
+                # converted to strings, which almost always succeeds, so we
+                # skip them here.
                 continue
             for i, value in enumerate(invalid_values[directive._name]):
                 with pytest.raises((ValueError, TypeError)):
@@ -133,49 +108,6 @@ def test_defaults_are_valid(self, available_directives_list):
         for directive in available_directives_list:
             directive._validator(directive._default)
 
-    def test_serial(self):
-        assert _NP._serial(4, 2) == 4
-        assert _NRANKS._serial(4, 2) == 4
-        assert _NGPU._serial(4, 2) == 4
-        assert _OMP_NUM_THREADS._serial(4, 2) == 4
-        assert _GET_EXECUTABLE()._serial("Path1", "Path2") == "Path1"
-        assert _WALLTIME._serial(4, 2) == 6
-        assert _WALLTIME._serial(4, None) == 4
-        assert _WALLTIME._serial(None, 4) == 4
-        assert _WALLTIME._serial(None, None) is None
-        assert _MEMORY._serial(4, 2) == 4
-        assert _MEMORY._serial(4, None) == 4
-        assert _MEMORY._serial(None, 4) == 4
-        assert _MEMORY._serial(None, None) is None
-        assert _PROCESSOR_FRACTION._serial(0.4, 0.2) == 0.4
-
-    def test_parallel(self):
-        assert _NP._parallel(4, 2) == 6
-        assert _NRANKS._parallel(4, 2) == 6
-        assert _NGPU._parallel(4, 2) == 6
-        assert _OMP_NUM_THREADS._parallel(4, 2) == 6
-        assert _GET_EXECUTABLE()._parallel("Path1", "Path2") == "Path1"
-        assert _WALLTIME._parallel(4, 2) == 4
-        assert _WALLTIME._parallel(4, None) == 4
-        assert _WALLTIME._parallel(None, 4) == 4
-        assert _WALLTIME._parallel(None, None) is None
-        assert _MEMORY._parallel(4, 2) == 6
-        assert _MEMORY._parallel(4, None) == 4
-        assert _MEMORY._parallel(None, 4) == 4
-        assert _MEMORY._parallel(None, None) is None
-        assert _PROCESSOR_FRACTION._parallel(0.4, 0.2) == 0.4
-
-    def test_finalize(self):
-        dict_directives = {
-            "nranks": _NRANKS._default,
-            "omp_num_threads": _OMP_NUM_THREADS._default,
-        }
-        assert _NP._finalize(2, dict_directives) == 2
-        dict_directives["nranks"] = 2
-        dict_directives["omp_num_threads"] = 4
-        assert _NP._finalize(2, dict_directives) == 2
-        assert _NP._finalize(1, dict_directives) == 8
-
     def test_manual_item_default(self, product_directive):
         assert product_directive._default == 10
@@ -186,19 +118,6 @@ def test_manual_item_validation(self, product_directive):
         with pytest.raises(ValueError):
             product_directive._validator(0)
 
-    def test_manual_item_serial(self, product_directive):
-        product_directive._serial(10, 20) == 10
-        product_directive._serial(20, 10) == 20
-
-    def test_manual_item_parallel(self, product_directive):
-        product_directive._parallel(10, 20) == 10
-
-    def test_manual_item_finalize(self, product_directive):
-        asset_dict = {"free": False, "discount": 5}
-        assert product_directive._finalize(50, asset_dict) == 45
-        asset_dict["free"] = True
-        assert product_directive._finalize(50, asset_dict) == 0
-
 
 class TestDirectives:
     """Tests for _Directives Class."""
@@ -208,21 +127,22 @@ def test_get_directive(self, directives, available_directives_list):
         assert directives[item._name] == item._default
 
     def test_add_directive(self, available_directives_list):
-        directives = _Directives(available_directives_list[:-1])
-        directives._add_directive(_PROCESSOR_FRACTION)
-        assert directives[_PROCESSOR_FRACTION._name] == _PROCESSOR_FRACTION._default
+        last_directive = available_directives_list.pop()
+        directives = _Directives(available_directives_list)
+        directives._add_directive(last_directive)
+        assert directives[last_directive._name] == last_directive._default
         with pytest.raises(TypeError):
             directives._add_directive("Test")
         with pytest.raises(ValueError):
-            directives._add_directive(_PROCESSOR_FRACTION)
+            directives._add_directive(last_directive)
 
     def test_set_defined_directive(self, directives):
-        directives._set_defined_directive(_NP._name, 10)
-        assert directives[_NP._name] == 10
+        directives._set_defined_directive(_PROCESSES._name, 10)
+        assert directives[_PROCESSES._name] == 10
 
     def test_set_defined_directive_invalid(self, directives):
         with pytest.raises(ValueError):
-            directives._set_defined_directive(_NP._name, 0)
+            directives._set_defined_directive(_PROCESSES._name, 0)
 
     def test_set_undefined_directive(self, directives):
         with pytest.raises(DirectivesError):
@@ -234,85 +154,32 @@ def test_set_directives_item(self, directives):
 
     def test_del_directive(self, directives):
         directives["test"] = True
-        directives._set_defined_directive(_NP._name, 100)
-        assert directives[_NP._name] == 100
+        directives._set_defined_directive(_PROCESSES._name, 100)
+        assert directives[_PROCESSES._name] == 100
         assert directives["test"]
-        del directives[_NP._name]
-        assert directives[_NP._name] == _NP._default
+        del directives[_PROCESSES._name]
+        assert directives[_PROCESSES._name] == _PROCESSES._default
        del directives["test"]
        with pytest.raises(KeyError):
            directives["test"]
 
-    def test_update_directive_without_aggregate(
-        self, directives, non_default_directive_values
-    ):
-        valid_values_1 = non_default_directive_values[1]
-        expected_values = {
-            "np": 4,
-            "ngpu": 1,
-            "nranks": 0,
-            "omp_num_threads": 10,
-            "executable": "PathFinder",
-            "walltime": datetime.timedelta(hours=20.0),
-            "memory": 16,
-            "processor_fraction": 0.5,
-        }
-        directives.update(valid_values_1)
-        for dirs in directives:
-            assert directives[dirs] == expected_values[dirs]
-
-    def test_update_directive_serial(
-        self, available_directives_list, non_default_directive_values
-    ):
-        directives1 = _Directives(available_directives_list)
-        directives2 = _Directives(available_directives_list)
-        valid_values_0 = non_default_directive_values[0]
-        valid_values_1 = non_default_directive_values[1]
-        expected_values = {
-            "np": 100,
-            "ngpu": 10,
-            "nranks": 5,
-            "omp_num_threads": 20,
-            "executable": "Non Default Path",
-            "walltime": datetime.timedelta(hours=84.0),
-            "memory": 32,
-            "processor_fraction": 0.5,
-        }
-        directives1.update(valid_values_0)
-        directives2.update(valid_values_1)
-        directives1.update(directives2, aggregate=True)
-        for dirs in directives1:
-            assert directives1[dirs] == expected_values[dirs]
-
-    def test_update_directive_parallel(
-        self, available_directives_list, non_default_directive_values
-    ):
-        directives1 = _Directives(available_directives_list)
-        directives2 = _Directives(available_directives_list)
-        valid_values_0 = non_default_directive_values[0]
-        valid_values_1 = non_default_directive_values[1]
-        expected_values = {
-            "np": 104,
-            "ngpu": 11,
-            "nranks": 5,
-            "omp_num_threads": 30,
-            "executable": "Non Default Path",
-            "walltime": datetime.timedelta(hours=64.0),
-            "memory": 48,
-            "processor_fraction": 0.5,
-        }
-        directives1.update(valid_values_0)
-        directives2.update(valid_values_1)
-        directives1.update(directives2, aggregate=True, parallel=True)
-        for dirs in directives1:
-            assert directives1[dirs] == expected_values[dirs]
+    def test_update(self, directives, non_default_directive_values):
+        new_directives = non_default_directive_values[1]
+        directives.update(new_directives)
+        for dir_ in new_directives:
+            if dir_ == "walltime":
+                assert directives[dir_] == datetime.timedelta(
+                    hours=new_directives[dir_]
+                )
+            else:
+                assert directives[dir_] == new_directives[dir_]
 
     def test_evaluate_directive_none_job(
         self, directives, non_default_directive_values
     ):
         directives.evaluate(None)
         valid_values = non_default_directive_values[0]
-        valid_values["processor_fraction"] = lambda job: job.sp.i / 10
+        valid_values["processes"] = lambda job: job.sp.i + 1
         directives.update(valid_values)
         with pytest.raises(RuntimeError):
             directives.evaluate(None)
@@ -326,13 +193,8 @@ def test_evaluate_directive_valid_job(
         for i in range(5):
             project.open_job(dict(i=i)).init()
 
-        valid_values = non_default_directive_values[0]
-        valid_values["processor_fraction"] = lambda job: round(job.sp.i / 10, 1)
-
         for job in project:
             directives = _Directives(available_directives_list)
-            directives.update(
-                {"processor_fraction": lambda job: round(job.sp.i / 10, 1)}
-            )
+            directives.update({"processes": lambda job: job.sp.i + 1})
             directives.evaluate((job,))
-            assert directives["processor_fraction"] == round(job.sp.i / 10, 1)
+            assert directives["processes"] == job.sp.i + 1
diff --git a/tests/test_environment.py b/tests/test_environment.py
index 5b179b161..8b1ee688b 100644
--- a/tests/test_environment.py
+++ b/tests/test_environment.py
@@ -45,15 +45,15 @@ class TestEnvironments(conftest.TestProjectBase):
     class Project(flow.FlowProject):
         pass
 
-    @Project.operation(directives={"ngpu": 1})
+    @Project.operation(directives={"gpus_per_process": 1})
     def gpu_op(job):
         pass
 
-    @Project.operation(directives={"np": 1_000})
+    @Project.operation(directives={"processes": 1_000})
     def large_cpu_op(job):
         pass
 
-    @Project.operation(directives={"np": 1})
+    @Project.operation(directives={"processes": 1})
     def small_cpu_op(job):
         pass
diff --git a/tests/test_project.py b/tests/test_project.py
index 68dd4fbbf..e886a4be8 100644
--- a/tests/test_project.py
+++ b/tests/test_project.py
@@ -27,6 +27,7 @@
 from deprecation import fail_if_not_removed
 
 import flow
+import flow.directives
 from flow import FlowProject, aggregator, get_aggregate_id, init
 from flow.environment import ComputeEnvironment
 from flow.errors import (
@@ -73,6 +74,41 @@ def is_present(cls):
         return True
 
 
+def get_unevaluated_directive(
+    project: flow.FlowProject,
+    job: signac.job.Job,
+    operation_name: str,
+    directive_name: str,
+    group_name: str = None,
+):
+    base_directives = project._environment._get_default_directives()
+    if group_name is None:
+        operation_directives = project.groups[operation_name].operation_directives[
+            operation_name
+        ]
+    else:
+        operation_directives = project.groups[group_name].operation_directives[
+            operation_name
+        ]
+    base_directives.update(operation_directives)
+    return base_directives[directive_name]
+
+
+def get_evaluated_directive(
+    project: flow.FlowProject,
+    job: signac.job.Job,
+    operation_name: str,
+    directive_name: str,
+    group_name: str = None,
+):
+    unevaluated_directives = get_unevaluated_directive(
+        project, job, operation_name, directive_name, group_name
+    )
+    if isinstance(job, signac.job.Job):
+        job = (job,)
+    return flow.directives._evaluate(unevaluated_directives, job)
+
+
 class MockSchedulerSubmitError(Scheduler):
     def jobs(self):
         pass
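The two helpers above separate what an operation declares from what the declaration evaluates to for a concrete job. A hypothetical usage, mirroring the tests that follow:

```python
# Hypothetical project for the helpers above; "processes" is declared as a
# callable and only resolves once evaluated against a job.
import flow

class A(flow.FlowProject):
    pass

@A.operation(directives={"processes": lambda job: job.sp.i + 1})
def compute(job):
    pass

# With an initialized project and a job whose statepoint is {"i": 3}:
#   get_unevaluated_directive(project, job, "compute", "processes") -> the lambda
#   get_evaluated_directive(project, job, "compute", "processes") -> 4
```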
""" class A(FlowProject): @@ -632,8 +669,10 @@ class A(FlowProject): @A.operation( directives={ - "nranks": lambda job: job.doc.get("nranks", 1), - "omp_num_threads": lambda job: job.doc.get("omp_num_threads", 1), + "processes": lambda job: job.doc.get("processes", 1), + "threads_per_process": lambda job: job.doc.get( + "threads_per_process", 1 + ), } ) def a(job): @@ -641,37 +680,27 @@ def a(job): project = self.mock_project(A) - # test setting neither nranks nor omp_num_threads + # test setting not setting directives for job in project: for next_op in project._next_operations([(job,)]): - assert next_op.directives["np"] == 1 - - # test only setting nranks - for i, job in enumerate(project): - job.doc.nranks = i + 1 - for next_op in project._next_operations([(job,)]): - assert next_op.directives["np"] == next_op.directives["nranks"] - del job.doc["nranks"] + processes = get_evaluated_directive(project, job, "a", "processes") + assert processes == 1 + threads_per_process = get_evaluated_directive( + project, job, "a", "threads_per_process" + ) + assert threads_per_process == 1 - # test only setting omp_num_threads + # test setting threads_per_process for i, job in enumerate(project): - job.doc.omp_num_threads = i + 1 - for next_op in project._next_operations([(job,)]): - assert next_op.directives["np"] == next_op.directives["omp_num_threads"] - del job.doc["omp_num_threads"] - # test setting both nranks and omp_num_threads - for i, job in enumerate(project): - job.doc.omp_num_threads = i + 1 - job.doc.nranks = i % 3 + 1 - expected_np = (i + 1) * (i % 3 + 1) - for next_op in project._next_operations([(job,)]): - assert next_op.directives["np"] == expected_np + job.doc.threads_per_process = i + 1 + threads_per_process = get_evaluated_directive( + project, job, "a", "threads_per_process" + ) + assert threads_per_process == i + 1 + del job.doc["threads_per_process"] # test for proper evaluation of all directives job = next(iter(project)) - job_operation = next(project._next_operations([(job,)])) - assert all(not callable(value) for value in job_operation.directives.values()) - # Also test for submitting operations submit_job_operation = next( project._get_submission_operations( aggregates=[(job,)], @@ -679,7 +708,8 @@ def a(job): ) ) assert all( - not callable(value) for value in submit_job_operation.directives.values() + not callable(value) + for value in submit_job_operation.primary_directives.values() ) def test_callable_directives_with_groups(self): @@ -700,8 +730,10 @@ class A(FlowProject): @group @A.operation( directives={ - "nranks": lambda job: job.doc.get("nranks", 1), - "omp_num_threads": lambda job: job.doc.get("omp_num_threads", 1), + "processes": lambda job: job.doc.get("processes", 1), + "threads_per_process": lambda job: job.doc.get( + "threads_per_process", 1 + ), } ) def a(job): @@ -710,8 +742,10 @@ def a(job): @group @A.operation( directives={ - "nranks": lambda job: job.doc.get("nranks", 1), - "omp_num_threads": lambda job: job.doc.get("omp_num_threads", 1), + "processes": lambda job: job.doc.get("processes", 1), + "threads_per_process": lambda job: job.doc.get( + "threads_per_process", 1 + ), } ) def b(job): @@ -728,7 +762,8 @@ def b(job): if submit_job_operation.name == "group": break assert all( - not callable(value) for value in submit_job_operation.directives.values() + not callable(value) + for value in submit_job_operation.primary_directives.values() ) def test_copy_conditions(self): @@ -1113,21 +1148,6 @@ def op3(job): good_ops = all_ops.difference(bad_ops) assert 
@@ -1113,21 +1148,6 @@ def op3(job):
         good_ops = all_ops.difference(bad_ops)
         assert all([job.doc.get(op, False) for op in good_ops for job in project])
 
-    def test_run_fork(self):
-        project = self.mock_project()
-        for job in project:
-            job.doc.fork = True
-            break
-
-        with setup_project_subprocess_execution(project):
-            project.run()
-
-        for job in project:
-            if job.doc.get("fork"):
-                assert os.getpid() != job.doc.test
-            else:
-                assert os.getpid() == job.doc.test
-
     def test_run_invalid_ops(self):
         class A(FlowProject):
             pass
@@ -1147,9 +1167,12 @@ def op1(job):
 
     def test_submit_operations(self):
         project = self.mock_project()
-        operations = []
-        for job in project:
-            operations.extend(project._next_operations([(job,)]))
+        operations = list(
+            project._get_submission_operations(
+                [(job,) for job in project],
+                project._environment._get_default_directives(),
+            )
+        )
         assert len(list(MockScheduler.jobs())) == 0
         cluster_job_id = project._store_bundled(operations)
         with redirect_stderr(StringIO()):
@@ -1271,16 +1294,19 @@ def test_submit_status(self):
 
     def test_submit_operations_bad_directive(self):
         project = self.mock_project()
-        operations = []
-        for job in project:
-            operations.extend(
-                project._next_operations([(job,)], operation_names=["op1"])
+        operations = list(
+            project._get_submission_operations(
+                [(job,) for job in project],
+                project._environment._get_default_directives(),
             )
+        )
         assert len(list(MockScheduler.jobs())) == 0
         cluster_job_id = project._store_bundled(operations)
         stderr = StringIO()
         with redirect_stderr(stderr):
-            project._submit_operations(_id=cluster_job_id, operations=operations)
+            project._submit_operations(
+                _id=cluster_job_id, operations=operations, force=True
+            )
         assert len(list(MockScheduler.jobs())) == 1
         assert "Some of the keys provided as part of the directives were not used by the template "
         "script, including: bad_directive\n" in stderr.getvalue()
@@ -1680,7 +1707,10 @@ def test_directives_hierarchy(self):
         )
         assert len(job_ops) == 1
         assert all(
-            [job_op.directives.get("omp_num_threads", 0) == 4 for job_op in job_ops]
+            [
+                job_op.primary_directives.get("threads_per_process", 0) == 4
+                for job_op in job_ops
+            ]
         )
         job_ops = list(
             project._get_submission_operations(
@@ -1691,26 +1721,10 @@ def test_directives_hierarchy(self):
         )
         assert len(job_ops) == 1
         assert all(
-            [job_op.directives.get("omp_num_threads", 0) == 1 for job_op in job_ops]
-        )
-        # Test run JobOperations
-        job_ops = list(
-            project.groups["group2"]._create_run_job_operations(
-                project._entrypoint, project._get_default_directives(), (job,)
-            )
-        )
-        assert len(job_ops) == 1
-        assert all(
-            [job_op.directives.get("omp_num_threads", 0) == 4 for job_op in job_ops]
-        )
-        job_ops = list(
-            project.groups["op3"]._create_run_job_operations(
-                project._entrypoint, project._get_default_directives(), (job,)
-            )
-        )
-        assert len(job_ops) == 1
-        assert all(
-            [job_op.directives.get("omp_num_threads", 0) == 1 for job_op in job_ops]
+            [
+                job_op.primary_directives.get("threads_per_process", 0) == 1
+                for job_op in job_ops
+            ]
         )
 
     def test_unique_group_operation_names(self):
@@ -1801,29 +1815,34 @@ class A(flow.FlowProject):
 
         group = A.make_group("group")
 
-        @group(directives={"ngpu": 2, "nranks": 4})
+        @group(directives={"gpus_per_process": 1, "processes": 4})
         @A.operation
         def op1(job):
             pass
 
-        @group(directives={"ngpu": 2, "nranks": 4})
+        @group(directives={"gpus_per_process": 1, "processes": 4})
         @A.operation
         def op2(job):
             pass
 
         @group
-        @A.operation(directives={"nranks": 2})
+        @A.operation(directives={"processes": 2})
         def op3(job):
             pass
 
         project = self.mock_project(A)
         group = project.groups["group"]
         job = [j for j in project][0]
-        directives = group._get_submission_directives(
-            project._get_default_directives(), (job,)
+        submission_operations = list(
+            project._get_submission_operations(
+                [(job,)],
+                project._get_default_directives(),
+            )
         )
         assert all(
-            [directives["ngpu"] == 2, directives["nranks"] == 4, directives["np"] == 4]
+            directives["gpus_per_process"] == 1 and directives["processes"] == 4
+            for submission_op in submission_operations
+            for directives in submission_op.directives_list
         )
 
     def test_flowgroup_repr(self):
@@ -1832,7 +1851,7 @@ class A(flow.FlowProject):
 
         group = A.make_group("group")
 
-        @group(directives={"ngpu": 2, "nranks": 4})
+        @group(directives={"gpus_per_process": 1, "processes": 4})
         @A.operation
         def op1(job):
             pass
@@ -1843,15 +1862,15 @@ def op2(job):
             pass
 
         @group
-        @A.operation(directives={"nranks": 2})
+        @A.operation(directives={"processes": 2})
         def op3(job):
             pass
 
         project = self.mock_project(A)
         rep_string = repr(project.groups["group"])
         assert all(op in rep_string for op in ["op1", "op2", "op3"])
-        assert "'nranks': 4" in rep_string
-        assert "'ngpu': 2" in rep_string
+        assert "'processes': 4" in rep_string
+        assert "'gpus_per_process': 1" in rep_string
         assert "submit_options=''" in rep_string
         assert "run_options=''" in rep_string
         assert "name='group'" in rep_string
@@ -1897,7 +1916,7 @@ def op1(job):
             )
         )
         assert all(" --debug" in op.cmd for op in run_ops)
-        assert all(op.directives["fork"] for op in run_ops)
+        assert all(op.fork for op in run_ops)
 
 
 class TestGroupExecutionProject(TestProjectBase):
diff --git a/tests/test_util.py b/tests/test_util.py
index 392d76b09..dacadaf48 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -77,7 +77,14 @@ class TestTemplateFilters:
     def mock_operations(self):
         class MockOp:
             def __init__(self, memory=None):
-                self.directives = {"memory": memory}
+                self.primary_directives = {
+                    "processes": 1,
+                    "threads_per_process": 1,
+                    "memory_per_cpu": memory,
+                    "memory": memory,
+                    "cpus": 1,
+                    "gpus": 0,
+                }
 
         return [MockOp(1), MockOp(8), MockOp()]
 
@@ -94,7 +101,7 @@ def test_calc_memory_serial(self, mock_operations):
     def test_calc_memory_parallel(self, mock_operations):
         # Test when operations run in parallel
         assert calc_memory([mock_operations[0], mock_operations[1]], True) == 9
-        assert calc_memory([mock_operations[2]], True) == 0
+        assert calc_memory([mock_operations[2]], True) is None
         assert (
             calc_memory(
                 [mock_operations[0], mock_operations[1], mock_operations[2]], True