-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Smart submit mode #110
base: master
Are you sure you want to change the base?
Smart submit mode #110
Changes from 11 commits
917cdb9
0e55c15
2b0736a
32c6c04
acdd1c7
843574e
60ce443
b06d075
d2cfc6a
08c05e6
3532112
50385a3
f3f23c3
02f00d5
9cdeadd
371f2c1
2f91474
b925a95
99830f7
c33cd2e
6b6814e
615f718
df19d7c
c1619af
ab27fd8
4f3a8a8
76935ab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
#!/bin/bash | ||
|
||
# Log file submit.log will be created in the same directory submit.sh is located | ||
# submit.log is for storing stdout and stderr of sbatch command, for log info from individual jobs see {log_dir:s} directory | ||
LOGFILE="$(cd $(dirname $0) && pwd)/submit.log" | ||
echo -n "" > "$LOGFILE" | ||
|
||
# Create temporary files for parsing stdout and stderr output from sbatch command before storing them in submit.log | ||
OUT=`mktemp` | ||
ERR=`mktemp` | ||
# On exit or if the script is interrupted (i.e. by receiving SIGINT signal) delete temporary files | ||
trap "rm -f $OUT $ERR" EXIT | ||
{% for node_id in nodes %} | ||
sbatch {{options_args}} --nodelist={{node_id}} -n1 --output="{{log_dir}}/output_{{loop.index0}}_{{node_id}}.log" --error="{{log_dir}}/error_{{loop.index0}}_{{node_id}}.log" --parsable {{main_dir}}/{{"job_{0:04d}".format(loop.index)}}/run.sh > $OUT 2> $ERR | ||
{% endfor %} | ||
echo "Saving logs to $LOGFILE" | ||
|
||
# If sbatch command ended with a success log following info | ||
if [ $? -eq 0 ] ; then | ||
echo "Job ID: `cat $OUT | cut -d ";" -f 1`" > "$LOGFILE" | ||
echo "Submission time: `date +"%Y-%m-%d %H:%M:%S"`" >> "$LOGFILE" | ||
fi | ||
|
||
# If output from stderr isn't an empty string then log it as well to submit.log | ||
if [ "`cat $ERR`" != "" ] ; then | ||
echo "---------------------" >> "$LOGFILE" | ||
echo "ERROR MESSAGE" >>"$LOGFILE" | ||
echo "---------------------" >> "$LOGFILE" | ||
cat $ERR >> "$LOGFILE" | ||
fi |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,5 +11,6 @@ def __init__(self, options_content): | |
JobScheduler.__init__(self, options_content) | ||
|
||
submit_script_template = os.path.join('data', 'submit_slurm.sh') | ||
smart_submit_script_template = os.path.join('data', 'smart_submit_slurm.sh.j2') | ||
Review thread — Comment: "Why" · Reply: "It's a jinja2 template, I wanted this to be explicit." · Reply: "ok" |
||
|
||
main_run_script_template = os.path.join('data', 'run_slurm.sh') |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
import logging | ||
|
||
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Threshold is INFO, so the logger.debug() calls made while parsing node
# lines are dropped unless this level is lowered.
logger.setLevel(logging.INFO)
|
||
|
||
class NodeInfo:
    """One node record parsed from a single data line of ``sinfo`` output.

    The line must hold at least five whitespace-separated fields, in this
    order: node id, partition, CPU load, state and a CPU summary made of
    four ``/``-separated integers.
    """

    def __init__(self, line):
        """Parse ``line`` and store its fields as attributes.

        :param line: one data line of sinfo output
        :type line: str
        :raises IndexError: if the line has fewer than five fields or the
            CPU summary has fewer than four parts
        :raises ValueError: if a CPU counter is not an integer
        """
        logger.debug(line)
        # split() without a separator tolerates leading, trailing and
        # repeated whitespace, which split(" ") would turn into empty fields.
        parameters = line.split()
        logger.debug(parameters)
        self.node_id = parameters[0]
        self.partition = parameters[1]
        self.load = parameters[2]  # kept as the raw string from the line
        self.state = parameters[3]

        cpu = parameters[4].split('/')
        # NOTE(review): sinfo documents the %C field as
        # allocated/idle/other/total, so the first counter is presumably the
        # number of *allocated* CPUs. The attribute name is kept so existing
        # callers keep working.
        self.cpu_available = int(cpu[0])
        self.cpu_idle = int(cpu[1])
        self.cpu_other = int(cpu[2])
        self.cpu_total = int(cpu[3])

    def __repr__(self):
        return "NodeInfo(node_id={0!r}, state={1!r}, load={2!r}, cpu_idle={3!r})".format(
            self.node_id, self.state, self.load, self.cpu_idle)

    def is_idle(self):
        """Return True when the node state is exactly ``"idle"``."""
        return self.state == "idle"

    def is_mixed(self):
        """Return True when the node state is exactly ``"mixed"``."""
        return self.state == "mixed"
|
||
|
||
class ClusterState:
    """Snapshot of cluster nodes, used to choose nodes for job submission."""

    # Starting fraction of every node's idle CPUs that may be claimed.
    efficiency_ratio = 0.5
    # Factor by which the fraction grows until enough slots are available.
    step = 1.08

    def __init__(self, nodes_info):
        """
        :type nodes_info: list of NodeInfo
        """
        self.nodes_info = nodes_info

    def get_idle_nodes(self):
        """Return the nodes whose ``is_idle()`` is true, in input order."""
        return [node for node in self.nodes_info if node.is_idle()]

    def get_mixed_nodes(self):
        """Return the nodes whose ``is_mixed()`` is true, in input order."""
        return [node for node in self.nodes_info if node.is_mixed()]

    def max_capacity(self):
        """Return the total number of idle CPUs over all nodes (0 if none)."""
        # sum() needs no import on Python 3 and, unlike reduce() without an
        # initial value, returns 0 for an empty node list instead of raising.
        return sum(node.cpu_idle for node in self.nodes_info)

    def get_nodes_for_scheduling(self, jobs_no):
        """Return a list of ``jobs_no`` node ids, one entry per job.

        Each node contributes ``round(cpu_idle * ratio)`` entries, where
        ``ratio`` starts at ``efficiency_ratio`` and is multiplied by
        ``step`` until the scaled capacity covers ``jobs_no`` (capped at 1).
        Nodes are visited in the order produced by the private sort.

        :param jobs_no: number of jobs to place
        :raises AssertionError: if ``jobs_no`` exceeds ``max_capacity()``
        """
        capacity = self.max_capacity()
        if jobs_no > capacity:
            raise AssertionError("Jobs count exceeds maximum cluster capacity.")
        nodes_sorted = self.__sort(self.nodes_info)

        ratio = self.efficiency_ratio
        while int(capacity * ratio) < jobs_no:
            ratio = ratio * self.step

        if ratio > 1:
            ratio = 1

        node_ids = []
        for node in nodes_sorted:
            count = int(round(node.cpu_idle * ratio))
            node_ids.extend([node.node_id] * count)

        # Rounding may yield more slots than requested; keep the first ones.
        return node_ids[:jobs_no]

    @staticmethod
    def _numeric_load(node):
        """Return the node load as a float; unparsable loads sort last."""
        try:
            return float(node.load)
        except (TypeError, ValueError):
            return float('inf')

    def __sort(self, nodes):
        """Sort nodes by state, then numeric load, then idle CPU count."""
        # Load is compared as a number: as strings "10.00" would sort
        # before "3.92" and a busier node would be preferred.
        return sorted(
            nodes,
            key=lambda node: (node.state, self._numeric_load(node), node.cpu_idle))
|
||
|
||
def cluster_status_from_raw_stdout(std_out):
    """Build a ClusterState from the raw text printed by ``sinfo``.

    The first line is treated as the column header and skipped. Every
    following line is parsed with NodeInfo; a line that cannot be parsed
    is logged at INFO level and left out.

    :param std_out: sinfo output, lines separated by ``"\\n"``
    :type std_out: str
    :return: ClusterState holding one NodeInfo per parsable line
    """
    nodes = []
    for line in std_out.split("\n")[1:]:
        try:
            nodes.append(NodeInfo(line))
        except (IndexError, ValueError):
            # These are the errors NodeInfo raises for a short or malformed
            # line. StandardError does not exist on Python 3, so catching
            # it there would turn a bad line into a NameError.
            logger.info("Unable to parse line, skipping: %s", line)
    return ClusterState(nodes)
|
||
|
||
def get_cluster_state_from_os(partition="plgrid"):
    """Query ``sinfo`` for idle and mixed nodes and return a ClusterState.

    :param partition: partition passed to sinfo's ``--partition`` option
    :type partition: str
    :return: result of ``cluster_status_from_raw_stdout`` on the sinfo output
    :raises subprocess.CalledProcessError: if sinfo exits with a non-zero status
    """
    from subprocess import check_output, STDOUT
    # Argument vector built directly, so no shell and no quoting are involved.
    command = [
        "sinfo",
        "--states=idle,mixed",
        "--partition={0}".format(partition),
        "--format=%n %P %O %T %C",
    ]
    # universal_newlines=True makes check_output return text; on Python 3 it
    # would otherwise return bytes, which the parser cannot split with a str
    # separator.
    output = check_output(command, shell=False, stderr=STDOUT, universal_newlines=True)
    return cluster_status_from_raw_stdout(output)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
from itertools import repeat | ||
from unittest import TestCase | ||
|
||
from mcpartools.scheduler.smart.slurm import cluster_status_from_raw_stdout | ||
|
||
|
||
class TestClusterInfo(TestCase):
    """Tests for parsing sinfo output and choosing nodes for scheduling."""

    # Well-formed output: header line followed by three node lines.
    raw_stdout = """HOSTNAMES PARTITION NODES CPU_LOAD STATE CPUS(A/I/O/T)
p0615 plgrid* 3.92 mixed 16/8/0/24
p0620 plgrid* 0.41 idle 0/24/0/24
p0627 plgrid* 4.00 mixed 16/8/0/24"""
    # Same nodes plus one line with a malformed CPU field and trailing blank lines.
    invalid_raw_stdout = """HOSTNAMES PARTITION NODES CPU_LOAD STATE CPUS(A/I/O/T)
p0615 plgrid* 3.92 mixed [16/8/0/24
p0615 plgrid* 3.92 mixed 16/8/0/24
p0620 plgrid* 0.41 idle 0/24/0/24
p0627 plgrid* 4.00 mixed 16/8/0/24

"""

    def test_cluster_status_from_raw_stdout(self):
        cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
        self.assertEqual(len(cluster_status.nodes_info), 3)

    def test_should_skip_invalid_line_while_building_cluster_status_from_raw_stdout(self):
        cluster_status = cluster_status_from_raw_stdout(self.invalid_raw_stdout)
        self.assertEqual(len(cluster_status.nodes_info), 3)

    def test_get_idle_nodes(self):
        cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
        idle = cluster_status.get_idle_nodes()
        self.assertEqual(len(idle), 1)

        idle_ids = [node.node_id for node in idle]
        self.assertEqual(idle_ids, ["p0620"])

    def test_get_mixed_nodes(self):
        cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
        mixed = cluster_status.get_mixed_nodes()
        self.assertEqual(len(mixed), 2)

        mixed_ids = [node.node_id for node in mixed]
        self.assertEqual(mixed_ids, ["p0615", "p0627"])

    def test_get_nodes_for_scheduling_1(self):
        cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
        nodes = cluster_status.get_nodes_for_scheduling(5)
        self.assertEqual(nodes, ['p0620', 'p0620', 'p0620', 'p0620', 'p0620'])

    def test_get_nodes_for_scheduling_2(self):
        cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
        nodes = cluster_status.get_nodes_for_scheduling(20)
        expected = []
        expected.extend(repeat('p0620', 12))
        expected.extend(repeat('p0615', 4))
        expected.extend(repeat('p0627', 4))
        self.assertEqual(nodes, expected)

    def test_get_nodes_for_scheduling_3(self):
        cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
        nodes = cluster_status.get_nodes_for_scheduling(30)
        expected = []
        expected.extend(repeat('p0620', 19))
        expected.extend(repeat('p0615', 6))
        expected.extend(repeat('p0627', 5))
        self.assertEqual(nodes, expected)

    def test_get_nodes_for_scheduling_4(self):
        cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
        nodes = cluster_status.get_nodes_for_scheduling(31)
        expected = []
        expected.extend(repeat('p0620', 19))
        expected.extend(repeat('p0615', 6))
        expected.extend(repeat('p0627', 6))
        self.assertEqual(nodes, expected)

    def test_get_nodes_for_scheduling_5(self):
        cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
        nodes = cluster_status.get_nodes_for_scheduling(39)
        expected = []
        expected.extend(repeat('p0620', 24))
        expected.extend(repeat('p0615', 8))
        expected.extend(repeat('p0627', 7))
        self.assertEqual(nodes, expected)

    def test_get_nodes_for_scheduling_6(self):
        cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
        nodes = cluster_status.get_nodes_for_scheduling(40)
        expected = []
        expected.extend(repeat('p0620', 24))
        expected.extend(repeat('p0615', 8))
        expected.extend(repeat('p0627', 8))
        self.assertEqual(nodes, expected)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please avoid whitespace changes