diff --git a/MANIFEST.in b/MANIFEST.in
index 05a8763..af8e343 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,4 +1,4 @@
-recursive-include mcpartools *.py *.sh *.json
+recursive-include mcpartools *.py *.sh *.j2 *.json
 exclude *.sh *.yml *.ini *.txt *.spec .gitkeep
 exclude versioneer.py LICENSE CODE_OF_CONDUCT.md
 prune docs
diff --git a/mcpartools/generatemc.py b/mcpartools/generatemc.py
index 55ade53..e49f9b7 100644
--- a/mcpartools/generatemc.py
+++ b/mcpartools/generatemc.py
@@ -14,61 +14,86 @@ def main(args=sys.argv[1:]):
     """
     import mcpartools
     parser = argparse.ArgumentParser()

-    parser.add_argument('-V', '--version',
-                        action='version',
-                        version=mcpartools.__version__)
-    parser.add_argument('-v', '--verbose',
-                        action='count',
-                        default=0,
-                        help='Give more output. Option is additive, '
-                             'and can be used up to 3 times')
-    parser.add_argument('-q', '--quiet',
-                        action='count',
-                        default=0,
-                        help='Be silent')
-    parser.add_argument('-w', '--workspace',
-                        type=str,
-                        help='workspace directory')
-    parser.add_argument('-m', '--mc_run_template',
-                        type=str,
-                        default=None,
-                        help='path to optional MC run script')
-    parser.add_argument('-s', '--scheduler_options',
-                        type=str,
-                        default=None,
-                        help='optional scheduler options: path to a file or list of options in square brackets')
-    parser.add_argument('-e', '--mc_engine_options',
-                        type=str,
-                        default=None,
-                        help='optional MC engine options: path to a file or list of options in square brackets')
-    parser.add_argument('-x', '--external_files',
-                        nargs='+',  # list may be empty
-                        type=str,
-                        help='list of external files to be copied into each job working directory')
-    parser.add_argument('-b', '--batch',
-                        type=str,
-                        default=None,
-                        choices=[b.id for b in SchedulerDiscover.supported],
-                        help='Available batch systems: {}'.format([b.id for b in SchedulerDiscover.supported]))
-    parser.add_argument('-c', '--collect',
-                        type=str,
-                        default='mv',
-                        choices=Options.collect_methods,
-                        help='Available collect methods')
-    parser.add_argument('-p', '--particle_no',
-                        dest='particle_no',
-                        metavar='particle_no',
-                        type=int,
-                        required=True,
-                        help='number of primary particles per job')
-    parser.add_argument('-j', '--jobs_no',
-                        type=int,
-                        required=True,
-                        help='number of parallel jobs')
+
+    general_args = parser.add_argument_group(title=None, description='General options')
+    config_args = parser.add_argument_group(title=None, description='Generator configuration options')
+    mc_args = parser.add_argument_group(title=None, description='Monte Carlo options')
+    smart_args = parser.add_argument_group(title=None, description='Smart submit options.')
+
     parser.add_argument('input', type=str, help='path to input configuration')
-    # TODO add grouping of options
+
+    general_args.add_argument('-V', '--version',
+                              action='version',
+                              version=mcpartools.__version__)
+    general_args.add_argument('-v', '--verbose',
+                              action='count',
+                              default=0,
+                              help='Give more output. Option is additive, '
+                                   'and can be used up to 3 times')
+    general_args.add_argument('-q', '--quiet',
+                              action='count',
+                              default=0,
+                              help='Be silent')
+
+    config_args.add_argument('-w', '--workspace',
+                             type=str,
+                             help='workspace directory')
+
+    config_args.add_argument('-m', '--mc_run_template',
+                             type=str,
+                             default=None,
+                             help='path to optional MC run script')
+    config_args.add_argument('-s', '--scheduler_options',
+                             type=str,
+                             default=None,
+                             help='optional scheduler options: path to a file or list of options in square brackets')
+    config_args.add_argument('-e', '--mc_engine_options',
+                             type=str,
+                             default=None,
+                             help='optional MC engine options: path to a file or list of options in square brackets')
+    config_args.add_argument('-x', '--external_files',
+                             nargs='+',  # list may be empty
+                             type=str,
+                             help='list of external files to be copied into each job working directory')
+    config_args.add_argument('-b', '--batch',
+                             type=str,
+                             default=None,
+                             choices=[b.id for b in SchedulerDiscover.supported],
+                             help='Available batch systems: {}'.format([b.id for b in SchedulerDiscover.supported]))
+    config_args.add_argument('-c', '--collect',
+                             type=str,
+                             default='mv',
+                             choices=Options.collect_methods,
+                             help='Available collect methods')
+
+    mc_args.add_argument('-p', '--particle_no',
+                         dest='particle_no',
+                         metavar='particle_no',
+                         type=int,
+                         required=True,
+                         help='number of primary particles per job')
+    mc_args.add_argument('-j', '--jobs_no',
+                         type=int,
+                         required=True,
+                         help='number of parallel jobs')
+
+    smart_args.add_argument('--smart',
+                            action='store_true',
+                            help='smart mode on (only for slurm)')
+    smart_args.add_argument('--partition',
+                            default="plgrid",
+                            help='partition (default: %(default)s)')
+    smart_args.add_argument('--utilisation',
+                            type=float,
+                            default=0.5,
+                            help='cluster node utilisation (default: %(default)s), 1 stands for 100%%')
+    smart_args.add_argument('--ratio',
+                            type=float,
+                            default=1.08,
+                            help='multiply utilisation by ratio (default: %(default)s) to contain all jobs')
+
     args = parser.parse_args(args)

     if args.quiet:
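Note on the argparse change above: `add_argument_group` only affects how the `--help` output is sectioned; parsing and the resulting namespace are unchanged. A minimal standalone sketch of the pattern (option names and values here are illustrative, not taken from the patch):

```python
# Minimal sketch of the grouping pattern used in generatemc.py above.
# Groups only organise the --help text; parsed values still land in one namespace.
import argparse

parser = argparse.ArgumentParser()
smart_args = parser.add_argument_group(title=None, description='Smart submit options')
smart_args.add_argument('--smart', action='store_true', help='smart mode on (only for slurm)')
smart_args.add_argument('--utilisation', type=float, default=0.5)

args = parser.parse_args(['--smart', '--utilisation', '0.7'])
print(args.smart, args.utilisation)  # -> True 0.7
```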
diff --git a/mcpartools/generator.py b/mcpartools/generator.py
index 8fcfc19..4672c31 100644
--- a/mcpartools/generator.py
+++ b/mcpartools/generator.py
@@ -92,11 +92,31 @@ def __init__(self, args):
         # no checks needed - argparse does it
         self.batch = args.batch

+        self.smart_options = SmartOptions(args)
+
+    def is_smart_enabled(self):
+        return self.smart_options.is_smart_enabled()
+
     @property
     def valid(self):
         return self._valid


+class SmartOptions:
+    def __init__(self, args):
+        self.is_smart = args.smart
+        self.utilisation = args.utilisation
+        self.ratio = args.ratio
+        self.partition = args.partition
+        self.nodes = None
+
+    def is_smart_enabled(self):
+        return self.is_smart is True
+
+    def set_nodes(self, nodes):
+        self.nodes = nodes
+
+
 class Generator:
     def __init__(self, options):
         self.options = options
@@ -128,7 +148,10 @@ def run(self):
         if scheduler_class:  # if not empty
             # list should have only 1 element - that's why we call scheduler_class[0] (list is not callable)
             self.scheduler = scheduler_class[0](self.options.scheduler_options)
-            logger.info("Using: " + self.scheduler.id)
+            if self.options.is_smart_enabled():
+                logger.info("Using: " + self.scheduler.id + " (smart mode)")
+            else:
+                logger.info("Using: " + self.scheduler.id)
         else:
             logger.error("Given scheduler: \'%s\' is not on the list of supported batch systems: %s",
                          self.options.batch, [supported.id for supported in SchedulerDiscover.supported])
@@ -138,7 +161,7 @@ def run(self):
         self.generate_workspace()

         # generate submit script
-        self.generate_submit_script()
+        self.generate_submit_script(smart=self.options.smart_options)

         # copy input files
         self.copy_input()
@@ -191,7 +214,15 @@ def generate_workspace(self):
         self.scheduler.write_main_run_script(jobs_no=self.options.jobs_no, output_dir=self.workspace_dir)
         self.mc_engine.write_collect_script(self.main_dir)

-    def generate_submit_script(self):
+    def generate_submit_script(self, smart):
+        if smart.is_smart_enabled():
+            from mcpartools.scheduler.smart.slurm import get_cluster_state_from_os
+            cluster_state = get_cluster_state_from_os(partition=smart.partition)
+            smart.set_nodes(
+                cluster_state.get_nodes_for_scheduling(
+                    int(self.options.jobs_no), smart.utilisation, smart.ratio))
+            self._log_selected_nodes(smart)
+
         script_path = os.path.join(self.main_dir, self.scheduler.submit_script)
         logger.debug("Preparation to generate " + script_path)
         logger.debug("Jobs no " + str(self.options.jobs_no))
@@ -199,7 +230,14 @@ def generate_submit_script(self):
                                            main_dir=self.main_dir,
                                            script_basename=self.scheduler.submit_script,
                                            jobs_no=self.options.jobs_no,
-                                           workspace_dir=self.workspace_dir)
+                                           workspace_dir=self.workspace_dir,
+                                           smart=smart)
+
+    def _log_selected_nodes(self, smart):
+        logger.info("Nodes selected for smart mode submit:")
+        nodes_tuple = tuple(smart.nodes)
+        for node in sorted(set(smart.nodes)):
+            logger.info("Node " + node + " used " + str(nodes_tuple.count(node)) + " times")

     def copy_input(self):
         indir_name = 'input'
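For clarity, this is roughly what `_log_selected_nodes` reports for a given node assignment. The node list below is made up for illustration; in the real flow it comes from `get_nodes_for_scheduling`:

```python
# Hypothetical node list, illustrating the per-node counting done by _log_selected_nodes.
nodes = ['p0620', 'p0620', 'p0620', 'p0615', 'p0627']
nodes_tuple = tuple(nodes)
for node in sorted(set(nodes)):
    print("Node " + node + " used " + str(nodes_tuple.count(node)) + " times")
# Node p0615 used 1 times
# Node p0620 used 3 times
# Node p0627 used 1 times
```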
diff --git a/mcpartools/scheduler/base.py b/mcpartools/scheduler/base.py
index 3f58efa..0cc4472 100644
--- a/mcpartools/scheduler/base.py
+++ b/mcpartools/scheduler/base.py
@@ -26,22 +26,37 @@ def __init__(self, scheduler_options):
     submit_script = 'submit.sh'
     main_run_script = 'main_run.sh'

-    def submit_script_body(self, jobs_no, main_dir, workspace_dir):
+    def submit_script_body(self, jobs_no, main_dir, workspace_dir, smart):
         from pkg_resources import resource_string
-        tpl = resource_string(__name__, self.submit_script_template)
-        self.submit_script = tpl.decode('ascii')

         log_dir = os.path.join(main_dir, "log")
         if not os.path.exists(log_dir):
             os.mkdir(log_dir)

-        return self.submit_script.format(options_args=self.options_args,
-                                         jobs_no=jobs_no,
-                                         log_dir=log_dir,
-                                         script_dir=workspace_dir,
-                                         calculate_script_name='main_run.sh',
-                                         main_dir=main_dir,
-                                         collect_script_name='collect.sh')
+        if smart.is_smart_enabled():
+            from jinja2.environment import Template
+            tpl = resource_string(__name__, self.smart_submit_script_template)
+
+            self.submit_script = tpl.decode('ascii')
+
+            return Template(self.submit_script).render(options_args=self.options_args,
+                                                       jobs_no=jobs_no,
+                                                       log_dir=log_dir,
+                                                       workspace_dir=workspace_dir,
+                                                       nodes=smart.nodes,
+                                                       partition=smart.partition)
+        else:
+            tpl = resource_string(__name__, self.submit_script_template)
+
+            self.submit_script = tpl.decode('ascii')
+
+            return self.submit_script.format(options_args=self.options_args,
+                                             jobs_no=jobs_no,
+                                             log_dir=log_dir,
+                                             script_dir=workspace_dir,
+                                             calculate_script_name='main_run.sh',
+                                             main_dir=main_dir,
+                                             collect_script_name='collect.sh')

     def main_run_script_body(self, jobs_no, workspace_dir):
         from pkg_resources import resource_string
@@ -51,12 +66,12 @@ def main_run_script_body(self, jobs_no, workspace_dir):
                                                            jobs_no=jobs_no)
         return self.main_run_script

-    def write_submit_script(self, main_dir, script_basename, jobs_no, workspace_dir):
+    def write_submit_script(self, main_dir, script_basename, jobs_no, workspace_dir, smart):
         script_path = os.path.join(main_dir, script_basename)
         fd = open(script_path, 'w')
         abs_path_workspace = os.path.abspath(workspace_dir)
         abs_path_main_dir = os.path.abspath(main_dir)
-        fd.write(self.submit_script_body(jobs_no, abs_path_main_dir, abs_path_workspace))
+        fd.write(self.submit_script_body(jobs_no, abs_path_main_dir, abs_path_workspace, smart))
         fd.close()
         os.chmod(script_path, 0o750)
         logger.debug("Saved submit script: " + script_path)
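The smart branch above switches from `str.format` to jinja2 rendering because the submit script now needs a loop over the selected nodes. A minimal sketch of that rendering path follows; the template string here is a toy that only mirrors the constructs used by the real `smart_submit_slurm.sh.j2` (for-loop, `loop.index`-based job numbering, plain substitutions):

```python
# Toy template mirroring the jinja2 constructs used by the smart submit template.
from jinja2 import Template

tpl = ('{% for node_id in nodes %}'
       'sbatch --nodelist={{node_id}} {{workspace_dir}}/{{"job_{0:04d}".format(loop.index)}}/run.sh\n'
       '{% endfor %}')
print(Template(tpl).render(nodes=['p0620', 'p0620', 'p0615'], workspace_dir='/path/to/workspace'))
# sbatch --nodelist=p0620 /path/to/workspace/job_0001/run.sh
# sbatch --nodelist=p0620 /path/to/workspace/job_0002/run.sh
# sbatch --nodelist=p0615 /path/to/workspace/job_0003/run.sh
```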
diff --git a/mcpartools/scheduler/data/smart_submit_slurm.sh.j2 b/mcpartools/scheduler/data/smart_submit_slurm.sh.j2
new file mode 100644
index 0000000..e3b4591
--- /dev/null
+++ b/mcpartools/scheduler/data/smart_submit_slurm.sh.j2
@@ -0,0 +1,30 @@
+#!/bin/bash
+
+# Log file submit.log will be created in the same directory submit.sh is located
+# submit.log is for storing stdout and stderr of sbatch command, for log info from individual jobs see {log_dir:s} directory
+LOGFILE="$(cd $(dirname $0) && pwd)/submit.log"
+echo -n "" > "$LOGFILE"
+
+# Create temporary files for parsing stdout and stderr output from sbatch command before storing them in submit.log
+OUT=`mktemp`
+ERR=`mktemp`
+# On exit or if the script is interrupted (i.e. by receiving SIGINT signal) delete temporary files
+trap "rm -f $OUT $ERR" EXIT
+{% for node_id in nodes %}
+sbatch {{options_args}} --partition={{partition}} --nodelist={{node_id}} -n1 --output="{{log_dir}}/output_{{loop.index0}}_{{node_id}}.log" --error="{{log_dir}}/error_{{loop.index0}}_{{node_id}}.log" --parsable {{workspace_dir}}/{{"job_{0:04d}".format(loop.index)}}/run.sh >> $OUT 2>> $ERR
+{% endfor %}
+echo "Saving logs to $LOGFILE"
+
+# If sbatch command ended with a success log following info
+if [ $? -eq 0 ] ; then
+    echo "Job ID: `cat $OUT | cut -d ";" -f 1`" > "$LOGFILE"
+    echo "Submission time: `date +"%Y-%m-%d %H:%M:%S"`" >> "$LOGFILE"
+fi
+
+# If output from stderr isn't an empty string then log it as well to submit.log
+if [ "`cat $ERR`" != "" ] ; then
+    echo "---------------------" >> "$LOGFILE"
+    echo "ERROR MESSAGE" >>"$LOGFILE"
+    echo "---------------------" >> "$LOGFILE"
+    cat $ERR >> "$LOGFILE"
+fi
diff --git a/mcpartools/scheduler/slurm.py b/mcpartools/scheduler/slurm.py
index b809ac2..dda8210 100644
--- a/mcpartools/scheduler/slurm.py
+++ b/mcpartools/scheduler/slurm.py
@@ -11,5 +11,6 @@ def __init__(self, options_content):
         JobScheduler.__init__(self, options_content)

     submit_script_template = os.path.join('data', 'submit_slurm.sh')
+    smart_submit_script_template = os.path.join('data', 'smart_submit_slurm.sh.j2')
     main_run_script_template = os.path.join('data', 'run_slurm.sh')

diff --git a/mcpartools/scheduler/smart/__init__.py b/mcpartools/scheduler/smart/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/mcpartools/scheduler/smart/slurm.py b/mcpartools/scheduler/smart/slurm.py
new file mode 100644
index 0000000..87ab8bd
--- /dev/null
+++ b/mcpartools/scheduler/smart/slurm.py
@@ -0,0 +1,101 @@
+import logging
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+
+
+class NodeInfo:
+    def __init__(self, line):
+        logger.debug(line)
+        parameters = line.split(" ")
+        logger.debug(parameters)
+        self.node_id = parameters[0]
+        self.partition = parameters[1]
+        self.load = parameters[2]
+        self.state = parameters[3]
+
+        cpu = parameters[4].split('/')
+        self.cpu_available = int(cpu[0])
+        self.cpu_idle = int(cpu[1])
+        self.cpu_other = int(cpu[2])
+        self.cpu_total = int(cpu[3])
+
+    def is_idle(self):
+        return self.state == "idle"
+
+    def is_mixed(self):
+        return self.state == "mixed"
+
+
+class ClusterState:
+    def __init__(self, nodes_info):
+        """
+        :type nodes_info: list of NodeInfo
+        """
+        self.nodes_info = nodes_info
+
+    def get_idle_nodes(self):
+        return [node for node in self.nodes_info if node.is_idle()]
+
+    def get_mixed_nodes(self):
+        return [node for node in self.nodes_info if node.is_mixed()]
+
+    def max_capacity(self):
+        capacities = [node.cpu_idle for node in self.nodes_info]
+        from functools import reduce
+        return reduce((lambda x, y: x + y), capacities)
+
+    def get_nodes_for_scheduling(self, jobs_no, utilisation, ratio):
+        if jobs_no > self.max_capacity():
+            raise AssertionError("Jobs count exceeds maximum cluster capacity.")
+        nodes_sorted = self.__sort(self.nodes_info)
+
+        '''
+        Iteratively increase cluster nodes utilisation based on given params.
+        If, for the given initial utilisation, it is impossible to contain all required jobs,
+        multiply utilisation by given ratio.
+        In other words, utilisation 0.5 means that at minimum half of the available cores on single node
+        will be used (if necessary). Ratio 1.08 means that if initial utilisation is not enough, in the next iteration
+        algorithm will try to use utilisation of 0.5*1.08 = 0.54 (8% bigger).
+        '''
+        util = utilisation
+        while int(self.max_capacity() * util) < jobs_no:
+            util = util * ratio
+
+        if util > 1:
+            util = 1
+
+        node_ids = []
+        from itertools import repeat
+        for node in nodes_sorted:
+            count = int(round(node.cpu_idle * util))
+            node_ids.extend(repeat(node.node_id, times=count))
+
+        return node_ids[:jobs_no]
+
+    def __sort(self, nodes):
+        from operator import attrgetter
+        return sorted(nodes, key=attrgetter('state', 'load', 'cpu_idle'))
+
+
+def cluster_status_from_stdout(std_out):
+    splitted_output = std_out.split("\n")[1:]
+    nodes = []
+    for line in splitted_output:
+        try:
+            nodeinfo = NodeInfo(line)
+            nodes.append(nodeinfo)
+        except Exception:
+            logger.info("Unable to parse line, skipping: " + line)
+    cluster_info = ClusterState(nodes)
+    return cluster_info
+
+
+def get_cluster_state_from_os(partition):
+    from subprocess import check_output, STDOUT
+    from shlex import split
+    command = "sinfo --states='idle,mixed' --partition={partition} --format='%n %P %O %T %C'" \
+        .format(partition=partition)
+    output = check_output(split(command), shell=False, stderr=STDOUT).decode("UTF-8")
+    cluster_info = cluster_status_from_stdout(output)
+    return cluster_info
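A quick check of the parsing helpers added above, using made-up `sinfo` output in the `%n %P %O %T %C` layout the module requests (the hostnames and counts are illustrative only):

```python
# Made-up sinfo output (one header line, then one node per line) exercising
# cluster_status_from_stdout and the ClusterState helpers from the patch.
from mcpartools.scheduler.smart.slurm import cluster_status_from_stdout

stdout = ("HOSTNAMES PARTITION CPU_LOAD STATE CPUS(A/I/O/T)\n"
          "p0620 plgrid* 0.41 idle 0/24/0/24\n"
          "p0615 plgrid* 3.92 mixed 16/8/0/24")

state = cluster_status_from_stdout(stdout)
print([n.node_id for n in state.get_idle_nodes()])   # ['p0620']
print(state.max_capacity())                          # 32 (sum of idle CPUs: 24 + 8)
```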
diff --git a/tests/scheduler/__init__.py b/tests/scheduler/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/scheduler/smart/__init__.py b/tests/scheduler/smart/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/scheduler/smart/test_slurm.py b/tests/scheduler/smart/test_slurm.py
new file mode 100644
index 0000000..3820c2e
--- /dev/null
+++ b/tests/scheduler/smart/test_slurm.py
@@ -0,0 +1,94 @@
+from itertools import repeat
+from unittest import TestCase
+
+from mcpartools.scheduler.smart.slurm import cluster_status_from_stdout
+
+
+class TestClusterInfo(TestCase):
+    stdout = """HOSTNAMES PARTITION NODES CPU_LOAD STATE CPUS(A/I/O/T)
+p0615 plgrid* 3.92 mixed 16/8/0/24
+p0620 plgrid* 0.41 idle 0/24/0/24
+p0627 plgrid* 4.00 mixed 16/8/0/24"""
+    invalid_stdout = """HOSTNAMES PARTITION NODES CPU_LOAD STATE CPUS(A/I/O/T)
+p0615 plgrid* 3.92 mixed [16/8/0/24
+p0615 plgrid* 3.92 mixed 16/8/0/24
+p0620 plgrid* 0.41 idle 0/24/0/24
+p0627 plgrid* 4.00 mixed 16/8/0/24
+
+"""
+    utilisation = 0.5
+    ratio = 1.08
+
+    def test_cluster_status_from_raw_stdout(self):
+        cluster_status = cluster_status_from_stdout(self.stdout)
+        self.assertEquals(len(cluster_status.nodes_info), 3)
+
+    def test_should_skip_invalid_line_while_building_cluster_status_from_raw_stdout(self):
+        cluster_status = cluster_status_from_stdout(self.invalid_stdout)
+        self.assertEquals(len(cluster_status.nodes_info), 3)
+
+    def test_get_idle_nodes(self):
+        cluster_status = cluster_status_from_stdout(self.stdout)
+        idle = cluster_status.get_idle_nodes()
+        self.assertEquals(len(idle), 1)
+
+        idle_ids = [node.node_id for node in idle]
+        self.assertEquals(idle_ids, ["p0620"])
+
+    def test_get_mixed_nodes(self):
+        cluster_status = cluster_status_from_stdout(self.stdout)
+        idle = cluster_status.get_mixed_nodes()
+        self.assertEquals(len(idle), 2)
+
+        idle_ids = [node.node_id for node in idle]
+        self.assertEquals(idle_ids, ["p0615", "p0627"])
+
+    def test_get_nodes_for_scheduling_1(self):
+        cluster_status = cluster_status_from_stdout(self.stdout)
+        nodes = cluster_status.get_nodes_for_scheduling(5, self.utilisation, self.ratio)
+        self.assertEquals(nodes, ['p0620', 'p0620', 'p0620', 'p0620', 'p0620'])
+
+    def test_get_nodes_for_scheduling_2(self):
+        cluster_status = cluster_status_from_stdout(self.stdout)
+        nodes = cluster_status.get_nodes_for_scheduling(20, self.utilisation, self.ratio)
+        expected = []
+        expected.extend(repeat('p0620', 12))
+        expected.extend(repeat('p0615', 4))
+        expected.extend(repeat('p0627', 4))
+        self.assertEquals(nodes, expected)
+
+    def test_get_nodes_for_scheduling_3(self):
+        cluster_status = cluster_status_from_stdout(self.stdout)
+        nodes = cluster_status.get_nodes_for_scheduling(30, self.utilisation, self.ratio)
+        expected = []
+        expected.extend(repeat('p0620', 19))
+        expected.extend(repeat('p0615', 6))
+        expected.extend(repeat('p0627', 5))
+        self.assertEquals(nodes, expected)
+
+    def test_get_nodes_for_scheduling_4(self):
+        cluster_status = cluster_status_from_stdout(self.stdout)
+        nodes = cluster_status.get_nodes_for_scheduling(31, self.utilisation, self.ratio)
+        expected = []
+        expected.extend(repeat('p0620', 19))
+        expected.extend(repeat('p0615', 6))
+        expected.extend(repeat('p0627', 6))
+        self.assertEquals(nodes, expected)
+
+    def test_get_nodes_for_scheduling_5(self):
+        cluster_status = cluster_status_from_stdout(self.stdout)
+        nodes = cluster_status.get_nodes_for_scheduling(39, self.utilisation, self.ratio)
+        expected = []
+        expected.extend(repeat('p0620', 24))
+        expected.extend(repeat('p0615', 8))
+        expected.extend(repeat('p0627', 7))
+        self.assertEquals(nodes, expected)
+
+    def test_get_nodes_for_scheduling_6(self):
+        cluster_status = cluster_status_from_stdout(self.stdout)
+        nodes = cluster_status.get_nodes_for_scheduling(40, self.utilisation, self.ratio)
+        expected = []
+        expected.extend(repeat('p0620', 24))
+        expected.extend(repeat('p0615', 8))
+        expected.extend(repeat('p0627', 8))
+        self.assertEquals(nodes, expected)
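As a worked illustration of where the expected lists above come from, the numbers in `test_get_nodes_for_scheduling_3` can be recomputed by hand from the algorithm in `get_nodes_for_scheduling` (the snippet below redoes the arithmetic for 30 jobs; it is a walk-through, not part of the patch):

```python
# Utilisation search for 30 jobs on the test cluster:
# idle CPUs per node are p0620: 24, p0615: 8, p0627: 8 (40 in total).
jobs_no, util, ratio = 30, 0.5, 1.08
capacity = 40
while int(capacity * util) < jobs_no:     # 20, 21, 23, 25, 27, 29 ... all below 30
    util *= ratio
print(round(util, 4))                     # ~0.7934 after six increases
print(round(24 * util), round(8 * util))  # 19 and 6 slots per node
# Nodes sorted by (state, load): p0620 (idle), then p0615, then p0627,
# giving 19 + 6 + 6 = 31 candidate slots; truncating to the first 30 yields
# 19 x p0620, 6 x p0615 and 5 x p0627, which is exactly what the test expects.
```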