Smart submit mode #110

Open: wants to merge 27 commits into base branch master

Commits (27 in total; the diff below shows changes from 16 commits)
917cdb9
v1 - test
olszowski Jun 3, 2018
0e55c15
refactoring + tests
olszowski Jun 11, 2018
2b0736a
smart submit
olszowski Jun 18, 2018
32c6c04
fixing file name for stdout and stderr
olszowski Jun 26, 2018
acdd1c7
removing testing code
olszowski Jun 26, 2018
843574e
ignore invalid lines from command output
olszowski Jun 26, 2018
60ce443
fixing command call
olszowski Jun 26, 2018
b06d075
fixing script generation
olszowski Jun 30, 2018
d2cfc6a
improved log message
olszowski Jun 30, 2018
08c05e6
Merge remote-tracking branch 'mcpartools/master' into feature/smart_s…
olszowski Jun 30, 2018
3532112
removing unnecessary files
olszowski Jun 30, 2018
50385a3
import for reduce method
olszowski Jun 30, 2018
f3f23c3
fixing missing error class in python 3.x
olszowski Jun 30, 2018
02f00d5
removing unnecessary whitespaces
olszowski Jun 30, 2018
9cdeadd
include j2 file in manifest
olszowski Jun 30, 2018
371f2c1
fixing submit file
olszowski Jul 17, 2018
2f91474
improved options format, configurable utilisation and ratio
olszowski Jul 22, 2018
b925a95
configurable partition
olszowski Jul 22, 2018
99830f7
fix partition support
olszowski Jul 25, 2018
c33cd2e
improved logging
olszowski Jul 25, 2018
6b6814e
experimental: append sbatch log info to the end of file instead of
olszowski Jul 25, 2018
615f718
log nodes in order
olszowski Jul 25, 2018
df19d7c
fixing python3 vs python2 ways to deal with std out from subprocess
olszowski Jul 26, 2018
c1619af
Merge remote-tracking branch 'mcpartools/master' into feature/smart_s…
olszowski Jul 29, 2018
ab27fd8
change string in tests to bstring to better match real output
olszowski Jul 29, 2018
4f3a8a8
python 2 vs 3 test fix
olszowski Jul 29, 2018
76935ab
python 2 vs 3 test fix
olszowski Jul 29, 2018
2 changes: 1 addition & 1 deletion MANIFEST.in
@@ -1,4 +1,4 @@
recursive-include mcpartools *.py *.sh *.json
recursive-include mcpartools *.py *.sh *.j2 *.json
exclude *.sh *.yml *.ini *.txt *.spec .gitkeep
exclude versioneer.py LICENSE CODE_OF_CONDUCT.md
prune docs
3 changes: 3 additions & 0 deletions mcpartools/generatemc.py
@@ -29,6 +29,9 @@ def main(args=sys.argv[1:]):
parser.add_argument('-w', '--workspace',
type=str,
help='workspace directory')
parser.add_argument('--smart',
action='store_true',
help='smart mode on (only for slurm)')
parser.add_argument('-m', '--mc_run_template',
type=str,
default=None,
22 changes: 18 additions & 4 deletions mcpartools/generator.py
Expand Up @@ -23,6 +23,8 @@ class Options:
def __init__(self, args):
self._valid = True

self.is_smart = args.smart

self.particle_no = args.particle_no
if self.particle_no < 1:
logger.error("Number of particles should be positive integer (got " + str(self.particle_no) + " instead")
@@ -128,7 +130,10 @@ def run(self):
if scheduler_class: # if not empty
# list should have only 1 element - that's why we call scheduler_class[0] (list is not callable)
self.scheduler = scheduler_class[0](self.options.scheduler_options)
logger.info("Using: " + self.scheduler.id)
if self.options.is_smart:
logger.info("Using: " + self.scheduler.id + " (smart mode)")
else:
logger.info("Using: " + self.scheduler.id)
else:
logger.error("Given scheduler: \'%s\' is not on the list of supported batch systems: %s",
self.options.batch, [supported.id for supported in SchedulerDiscover.supported])
@@ -138,7 +143,7 @@
self.generate_workspace()

# generate submit script
self.generate_submit_script()
self.generate_submit_script(is_smart=self.options.is_smart)

# copy input files
self.copy_input()
@@ -191,15 +196,24 @@ def generate_workspace(self):
self.scheduler.write_main_run_script(jobs_no=self.options.jobs_no, output_dir=self.workspace_dir)
self.mc_engine.write_collect_script(self.main_dir)

def generate_submit_script(self):
def generate_submit_script(self, is_smart=False):
if is_smart:
from mcpartools.scheduler.smart.slurm import get_cluster_state_from_os
cluster_state = get_cluster_state_from_os()
nodes = cluster_state.get_nodes_for_scheduling(int(self.options.jobs_no))
else:
nodes = []

script_path = os.path.join(self.main_dir, self.scheduler.submit_script)
logger.debug("Preparation to generate " + script_path)
logger.debug("Jobs no " + str(self.options.jobs_no))
self.scheduler.write_submit_script(
main_dir=self.main_dir,
script_basename=self.scheduler.submit_script,
jobs_no=self.options.jobs_no,
workspace_dir=self.workspace_dir)
workspace_dir=self.workspace_dir,
is_smart=is_smart,
nodes=nodes)

def copy_input(self):
indir_name = 'input'
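For orientation, a hedged sketch of how the smart path added here fits together end to end; the jobs count is only an example value, and this runs only where mcpartools and SLURM's sinfo are available:

from mcpartools.scheduler.smart.slurm import get_cluster_state_from_os

cluster_state = get_cluster_state_from_os()          # wraps a sinfo call (see smart/slurm.py below)
nodes = cluster_state.get_nodes_for_scheduling(100)  # one node id per job to be submitted
# generate_submit_script then forwards the list via write_submit_script(..., is_smart=True, nodes=nodes)
# and the scheduler renders smart_submit_slurm.sh.j2 with one sbatch line per entry.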
38 changes: 26 additions & 12 deletions mcpartools/scheduler/base.py
@@ -26,22 +26,36 @@ def __init__(self, scheduler_options):
submit_script = 'submit.sh'
main_run_script = 'main_run.sh'

def submit_script_body(self, jobs_no, main_dir, workspace_dir):
def submit_script_body(self, jobs_no, main_dir, workspace_dir, is_smart=False, nodes=[]):
from pkg_resources import resource_string
tpl = resource_string(__name__, self.submit_script_template)
self.submit_script = tpl.decode('ascii')

log_dir = os.path.join(main_dir, "log")
if not os.path.exists(log_dir):
os.mkdir(log_dir)

return self.submit_script.format(options_args=self.options_args,
jobs_no=jobs_no,
log_dir=log_dir,
script_dir=workspace_dir,
calculate_script_name='main_run.sh',
main_dir=main_dir,
collect_script_name='collect.sh')
if is_smart:
from jinja2.environment import Template
tpl = resource_string(__name__, self.smart_submit_script_template)

self.submit_script = tpl.decode('ascii')

return Template(self.submit_script).render(options_args=self.options_args,
jobs_no=jobs_no,
log_dir=log_dir,
workspace_dir=workspace_dir,
nodes=nodes)
else:
tpl = resource_string(__name__, self.submit_script_template)

self.submit_script = tpl.decode('ascii')

return self.submit_script.format(options_args=self.options_args,
jobs_no=jobs_no,
log_dir=log_dir,
script_dir=workspace_dir,
calculate_script_name='main_run.sh',
main_dir=main_dir,
collect_script_name='collect.sh')

def main_run_script_body(self, jobs_no, workspace_dir):
from pkg_resources import resource_string
@@ -51,12 +65,12 @@ def main_run_script_body(self, jobs_no, workspace_dir):
jobs_no=jobs_no)
return self.main_run_script

def write_submit_script(self, main_dir, script_basename, jobs_no, workspace_dir):
def write_submit_script(self, main_dir, script_basename, jobs_no, workspace_dir, is_smart, nodes):
script_path = os.path.join(main_dir, script_basename)
fd = open(script_path, 'w')
abs_path_workspace = os.path.abspath(workspace_dir)
abs_path_main_dir = os.path.abspath(main_dir)
fd.write(self.submit_script_body(jobs_no, abs_path_main_dir, abs_path_workspace))
fd.write(self.submit_script_body(jobs_no, abs_path_main_dir, abs_path_workspace, is_smart, nodes))
fd.close()
os.chmod(script_path, 0o750)
logger.debug("Saved submit script: " + script_path)
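For reference, a minimal sketch (not code from this PR; the placeholder names are made up) of the two rendering mechanisms submit_script_body now switches between: str.format for the classic submit_slurm.sh template, and jinja2 for the smart .j2 template, which needs a loop over nodes that str.format cannot express.

from jinja2 import Template

# classic path: plain str.format placeholders (hypothetical names)
classic = "sbatch --array=0-{jobs_no} --output={log_dir}/output.log"
print(classic.format(jobs_no=3, log_dir="log"))

# smart path: jinja2 expressions and filters (hypothetical single-line variant)
smart = "sbatch --nodelist={{ nodes | join(',') }} -n1 --output={{ log_dir }}/output.log"
print(Template(smart).render(nodes=["p0620", "p0615"], log_dir="log"))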
30 changes: 30 additions & 0 deletions mcpartools/scheduler/data/smart_submit_slurm.sh.j2
@@ -0,0 +1,30 @@
#!/bin/bash

# Log file submit.log will be created in the same directory submit.sh is located
# submit.log is for storing stdout and stderr of sbatch command, for log info from individual jobs see {log_dir:s} directory
LOGFILE="$(cd $(dirname $0) && pwd)/submit.log"
echo -n "" > "$LOGFILE"

# Create temporary files for parsing stdout and stderr output from sbatch command before storing them in submit.log
OUT=`mktemp`
ERR=`mktemp`
# On exit or if the script is interrupted (i.e. by receiving SIGINT signal) delete temporary files
trap "rm -f $OUT $ERR" EXIT
{% for node_id in nodes %}
sbatch {{options_args}} --nodelist={{node_id}} -n1 --output="{{log_dir}}/output_{{loop.index0}}_{{node_id}}.log" --error="{{log_dir}}/error_{{loop.index0}}_{{node_id}}.log" --parsable {{workspace_dir}}/{{"job_{0:04d}".format(loop.index)}}/run.sh > $OUT 2> $ERR
{% endfor %}
echo "Saving logs to $LOGFILE"

# If sbatch command ended with a success log following info
if [ $? -eq 0 ] ; then
echo "Job ID: `cat $OUT | cut -d ";" -f 1`" > "$LOGFILE"
echo "Submission time: `date +"%Y-%m-%d %H:%M:%S"`" >> "$LOGFILE"
fi

# If output from stderr isn't an empty string then log it as well to submit.log
if [ "`cat $ERR`" != "" ] ; then
echo "---------------------" >> "$LOGFILE"
echo "ERROR MESSAGE" >>"$LOGFILE"
echo "---------------------" >> "$LOGFILE"
cat $ERR >> "$LOGFILE"
fi
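To see what the loop above expands to, here is a cut-down render (simplified from the template: options_args and the stdout/stderr redirection are trimmed) with a hypothetical three-entry node list. Note that loop.index0 (0-based) names the log files while loop.index (1-based) picks the job directory, so directories start at job_0001.

from jinja2 import Template

line = ('{% for node_id in nodes %}'
        'sbatch --nodelist={{node_id}} -n1 --output={{log_dir}}/output_{{loop.index0}}_{{node_id}}.log '
        '{{workspace_dir}}/{{"job_{0:04d}".format(loop.index)}}/run.sh\n'
        '{% endfor %}')

print(Template(line).render(nodes=["p0620", "p0620", "p0615"], log_dir="log", workspace_dir="workspace"))
# sbatch --nodelist=p0620 -n1 --output=log/output_0_p0620.log workspace/job_0001/run.sh
# sbatch --nodelist=p0620 -n1 --output=log/output_1_p0620.log workspace/job_0002/run.sh
# sbatch --nodelist=p0615 -n1 --output=log/output_2_p0615.log workspace/job_0003/run.sh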
1 change: 1 addition & 0 deletions mcpartools/scheduler/slurm.py
@@ -11,5 +11,6 @@ def __init__(self, options_content):
JobScheduler.__init__(self, options_content)

submit_script_template = os.path.join('data', 'submit_slurm.sh')
smart_submit_script_template = os.path.join('data', 'smart_submit_slurm.sh.j2')

Review comment (Collaborator): Why the .j2 suffix?

Reply (Author): It's a jinja2 template; I wanted this to be explicit.

Reply (Collaborator): ok

main_run_script_template = os.path.join('data', 'run_slurm.sh')
Empty file.
95 changes: 95 additions & 0 deletions mcpartools/scheduler/smart/slurm.py
@@ -0,0 +1,95 @@
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class NodeInfo:
def __init__(self, line):
logger.debug(line)
parameters = line.split(" ")
logger.debug(parameters)
self.node_id = parameters[0]
self.partition = parameters[1]
self.load = parameters[2]
self.state = parameters[3]

cpu = parameters[4].split('/')
self.cpu_available = int(cpu[0])
self.cpu_idle = int(cpu[1])
self.cpu_other = int(cpu[2])
self.cpu_total = int(cpu[3])

def is_idle(self):
return self.state == "idle"

def is_mixed(self):
return self.state == "mixed"


class ClusterState:
efficiency_ratio = 0.5
step = 1.08

def __init__(self, nodes_info):
"""
:type nodes_info: list of NodeInfo
"""
self.nodes_info = nodes_info

def get_idle_nodes(self):
return [node for node in self.nodes_info if node.is_idle()]

def get_mixed_nodes(self):
return [node for node in self.nodes_info if node.is_mixed()]

def max_capacity(self):
capacities = [node.cpu_idle for node in self.nodes_info]
from functools import reduce
return reduce((lambda x, y: x + y), capacities)

def get_nodes_for_scheduling(self, jobs_no):
if jobs_no > self.max_capacity():
raise AssertionError("Jobs count exceeds maximum cluster capacity.")
nodes_sorted = self.__sort(self.nodes_info)

ratio = self.efficiency_ratio
while int(self.max_capacity() * ratio) < jobs_no:
ratio = ratio * self.step

if ratio > 1:
ratio = 1

node_ids = []
from itertools import repeat
for node in nodes_sorted:
count = int(round(node.cpu_idle * ratio))
node_ids.extend(repeat(node.node_id, times=count))

return node_ids[:jobs_no]

def __sort(self, nodes):
from operator import attrgetter
return sorted(nodes, key=attrgetter('state', 'load', 'cpu_idle'))


def cluster_status_from_raw_stdout(std_out):
splitted_output = std_out.split("\n")[1:]
nodes = []
for line in splitted_output:
try:
nodeinfo = NodeInfo(line)
nodes.append(nodeinfo)
except Exception:
logger.info("Unable to parse line, skipping: " + line)
cluster_info = ClusterState(nodes)
return cluster_info


def get_cluster_state_from_os():
from subprocess import check_output, STDOUT
from shlex import split
command = "sinfo --states='idle,mixed' --partition=plgrid --format='%n %P %O %T %C'"

Review comment (Collaborator): Why is the plgrid partition hardcoded?

output = check_output(split(command), shell=False, stderr=STDOUT)
cluster_info = cluster_status_from_raw_stdout(output)
return cluster_info
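To make the efficiency_ratio / step heuristic above concrete, a standalone walk-through (it mirrors the ClusterState logic rather than importing the package) of how 30 jobs are spread over the three-node example used in the tests below:

from itertools import repeat

# (node_id, idle CPUs), already in the order produced by __sort: the idle
# node first, then the mixed nodes ordered by load
nodes = [("p0620", 24), ("p0615", 8), ("p0627", 8)]
max_capacity = sum(idle for _, idle in nodes)  # 40

jobs_no = 30
ratio, step = 0.5, 1.08
while int(max_capacity * ratio) < jobs_no:  # raise the target utilisation until it covers all jobs
    ratio *= step
ratio = min(ratio, 1.0)  # ends up at roughly 0.79 for this input

node_ids = []
for node_id, idle in nodes:
    node_ids.extend(repeat(node_id, int(round(idle * ratio))))  # 19, 6 and 6 entries

print(node_ids[:jobs_no])  # 19 x p0620, 6 x p0615, 5 x p0627, as in test_get_nodes_for_scheduling_3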
Empty file added tests/scheduler/__init__.py
Empty file.
Empty file.
92 changes: 92 additions & 0 deletions tests/scheduler/smart/test_slurm.py
@@ -0,0 +1,92 @@
from itertools import repeat
from unittest import TestCase

from mcpartools.scheduler.smart.slurm import cluster_status_from_raw_stdout


class TestClusterInfo(TestCase):
raw_stdout = """HOSTNAMES PARTITION NODES CPU_LOAD STATE CPUS(A/I/O/T)
p0615 plgrid* 3.92 mixed 16/8/0/24
p0620 plgrid* 0.41 idle 0/24/0/24
p0627 plgrid* 4.00 mixed 16/8/0/24"""
invalid_raw_stdout = """HOSTNAMES PARTITION NODES CPU_LOAD STATE CPUS(A/I/O/T)
p0615 plgrid* 3.92 mixed [16/8/0/24
p0615 plgrid* 3.92 mixed 16/8/0/24
p0620 plgrid* 0.41 idle 0/24/0/24
p0627 plgrid* 4.00 mixed 16/8/0/24

"""

def test_cluster_status_from_raw_stdout(self):
cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
self.assertEquals(len(cluster_status.nodes_info), 3)

def test_should_skip_invalid_line_while_building_cluster_status_from_raw_stdout(self):
cluster_status = cluster_status_from_raw_stdout(self.invalid_raw_stdout)
self.assertEquals(len(cluster_status.nodes_info), 3)

def test_get_idle_nodes(self):
cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
idle = cluster_status.get_idle_nodes()
self.assertEquals(len(idle), 1)

idle_ids = [node.node_id for node in idle]
self.assertEquals(idle_ids, ["p0620"])

def test_get_mixed_nodes(self):
cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
idle = cluster_status.get_mixed_nodes()
self.assertEquals(len(idle), 2)

idle_ids = [node.node_id for node in idle]
self.assertEquals(idle_ids, ["p0615", "p0627"])

def test_get_nodes_for_scheduling_1(self):
cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
nodes = cluster_status.get_nodes_for_scheduling(5)
self.assertEquals(nodes, ['p0620', 'p0620', 'p0620', 'p0620', 'p0620'])

def test_get_nodes_for_scheduling_2(self):
cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
nodes = cluster_status.get_nodes_for_scheduling(20)
expected = []
expected.extend(repeat('p0620', 12))
expected.extend(repeat('p0615', 4))
expected.extend(repeat('p0627', 4))
self.assertEquals(nodes, expected)

def test_get_nodes_for_scheduling_3(self):
cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
nodes = cluster_status.get_nodes_for_scheduling(30)
expected = []
expected.extend(repeat('p0620', 19))
expected.extend(repeat('p0615', 6))
expected.extend(repeat('p0627', 5))
self.assertEquals(nodes, expected)

def test_get_nodes_for_scheduling_4(self):
cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
nodes = cluster_status.get_nodes_for_scheduling(31)
expected = []
expected.extend(repeat('p0620', 19))
expected.extend(repeat('p0615', 6))
expected.extend(repeat('p0627', 6))
self.assertEquals(nodes, expected)

def test_get_nodes_for_scheduling_5(self):
cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
nodes = cluster_status.get_nodes_for_scheduling(39)
expected = []
expected.extend(repeat('p0620', 24))
expected.extend(repeat('p0615', 8))
expected.extend(repeat('p0627', 7))
self.assertEquals(nodes, expected)

def test_get_nodes_for_scheduling_6(self):
cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
nodes = cluster_status.get_nodes_for_scheduling(40)
expected = []
expected.extend(repeat('p0620', 24))
expected.extend(repeat('p0615', 8))
expected.extend(repeat('p0627', 8))
self.assertEquals(nodes, expected)