Smart submit mode #110
Open
olszowski wants to merge 27 commits into DataMedSci:master from olszowski:feature/smart_submit_mode
Changes from 16 commits
27 commits
917cdb9 v1 - test (olszowski)
0e55c15 refactoring + tests (olszowski)
2b0736a smart submit (olszowski)
32c6c04 fixing file name for stdout and stderr (olszowski)
acdd1c7 removing testing code (olszowski)
843574e ignore invalid lines from command output (olszowski)
60ce443 fixing command call (olszowski)
b06d075 fixing script generation (olszowski)
d2cfc6a improved log message (olszowski)
08c05e6 Merge remote-tracking branch 'mcpartools/master' into feature/smart_s… (olszowski)
3532112 removing unnecessary files (olszowski)
50385a3 import for reduce method (olszowski)
f3f23c3 fixing missing error class in python 3.x (olszowski)
02f00d5 removing unnecessary whitespaces (olszowski)
9cdeadd include j2 file in manifest (olszowski)
371f2c1 fixing submit file (olszowski)
2f91474 improved options format, configurable utilisation and ratio (olszowski)
b925a95 configurable partition (olszowski)
99830f7 fix partition support (olszowski)
c33cd2e improved logging (olszowski)
6b6814e experimental: append sbatch log info to the end of file instead of (olszowski)
615f718 log nodes in order (olszowski)
df19d7c fixing python3 vs python2 ways to deal with std out from subprocess (olszowski)
c1619af Merge remote-tracking branch 'mcpartools/master' into feature/smart_s… (olszowski)
ab27fd8 change string in tests to bstring to better match real output (olszowski)
4f3a8a8 python 2 vs 3 test fix (olszowski)
76935ab python 2 vs 3 test fix (olszowski)
@@ -0,0 +1,30 @@
#!/bin/bash

# Log file submit.log will be created in the same directory where submit.sh is located.
# submit.log stores stdout and stderr of the sbatch command; for log info from individual jobs see the {{log_dir}} directory.
LOGFILE="$(cd "$(dirname "$0")" && pwd)/submit.log"
echo -n "" > "$LOGFILE"

# Create temporary files for parsing stdout and stderr output from sbatch command before storing them in submit.log
OUT=`mktemp`
ERR=`mktemp`
# On exit or if the script is interrupted (e.g. by receiving SIGINT signal) delete temporary files
trap "rm -f $OUT $ERR" EXIT
echo "Saving logs to $LOGFILE"
{% for node_id in nodes %}
sbatch {{options_args}} --nodelist={{node_id}} -n1 --output="{{log_dir}}/output_{{loop.index0}}_{{node_id}}.log" --error="{{log_dir}}/error_{{loop.index0}}_{{node_id}}.log" --parsable {{workspace_dir}}/{{"job_{0:04d}".format(loop.index)}}/run.sh > $OUT 2> $ERR
{% endfor %}

# If the last sbatch command above ended with a success, log the following info
# ($? must be read directly after sbatch, before any other command runs)
if [ $? -eq 0 ] ; then
echo "Job ID: `cat $OUT | cut -d ";" -f 1`" > "$LOGFILE"
echo "Submission time: `date +"%Y-%m-%d %H:%M:%S"`" >> "$LOGFILE"
fi

# If output from stderr isn't an empty string then log it as well to submit.log
if [ "`cat $ERR`" != "" ] ; then
echo "---------------------" >> "$LOGFILE"
echo "ERROR MESSAGE" >> "$LOGFILE"
echo "---------------------" >> "$LOGFILE"
cat $ERR >> "$LOGFILE"
fi
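The loop in the template emits one sbatch line per entry in nodes. As a rough illustration of what the rendered script contains, the sketch below rebuilds the same lines in plain Python. The function name sbatch_commands and the sample argument values are assumptions made for this example and are not part of the pull request; the only detail taken from the template is that log file names use the zero-based loop.index0 while the job directory uses the one-based loop.index.

```python
def sbatch_commands(nodes, options_args, log_dir, workspace_dir):
    """Build one sbatch command line per entry in nodes (hypothetical helper)."""
    commands = []
    for index0, node_id in enumerate(nodes):
        # loop.index in Jinja2 is 1-based, loop.index0 is 0-based
        job_dir = "job_{0:04d}".format(index0 + 1)
        commands.append(
            'sbatch {opts} --nodelist={node} -n1 '
            '--output="{log}/output_{i}_{node}.log" '
            '--error="{log}/error_{i}_{node}.log" '
            '--parsable {ws}/{job}/run.sh'.format(
                opts=options_args, node=node_id, log=log_dir,
                i=index0, ws=workspace_dir, job=job_dir))
    return commands


# Sample values chosen only for illustration
commands = sbatch_commands(["p0620", "p0620", "p0615"],
                           "--time=0:30:00", "logs", "workspace")
for line in commands:
    print(line)
```

A node that appears several times in nodes receives several jobs, each with its own job directory and its own pair of log files.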
Empty file.
@@ -0,0 +1,95 @@
import logging
from itertools import repeat
from operator import attrgetter
from shlex import split
from subprocess import STDOUT, check_output

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class NodeInfo(object):
    def __init__(self, line):
        # Expected fields follow the sinfo format "%n %P %O %T %C":
        # hostname, partition, CPU load, state, CPUs as allocated/idle/other/total
        logger.debug(line)
        parameters = line.split(" ")
        logger.debug(parameters)
        self.node_id = parameters[0]
        self.partition = parameters[1]
        # Stored as a number so that sorting by load is numeric, not lexicographic
        self.load = float(parameters[2])
        self.state = parameters[3]

        cpu = parameters[4].split('/')
        # The first field of %C is the number of allocated CPUs
        self.cpu_available = int(cpu[0])
        self.cpu_idle = int(cpu[1])
        self.cpu_other = int(cpu[2])
        self.cpu_total = int(cpu[3])

    def is_idle(self):
        return self.state == "idle"

    def is_mixed(self):
        return self.state == "mixed"


class ClusterState(object):
    efficiency_ratio = 0.5
    step = 1.08

    def __init__(self, nodes_info):
        """
        :type nodes_info: list of NodeInfo
        """
        self.nodes_info = nodes_info

    def get_idle_nodes(self):
        return [node for node in self.nodes_info if node.is_idle()]

    def get_mixed_nodes(self):
        return [node for node in self.nodes_info if node.is_mixed()]

    def max_capacity(self):
        # Total number of idle CPUs; sum() also handles an empty node list
        return sum(node.cpu_idle for node in self.nodes_info)

    def get_nodes_for_scheduling(self, jobs_no):
        if jobs_no > self.max_capacity():
            raise AssertionError("Jobs count exceeds maximum cluster capacity.")
        nodes_sorted = self.__sort(self.nodes_info)

        # Grow the utilisation ratio until the cluster can hold all jobs
        ratio = self.efficiency_ratio
        while int(self.max_capacity() * ratio) < jobs_no:
            ratio = ratio * self.step

        if ratio > 1:
            ratio = 1

        # Each node contributes its id once per CPU it is allowed to use
        node_ids = []
        for node in nodes_sorted:
            count = int(round(node.cpu_idle * ratio))
            node_ids.extend(repeat(node.node_id, times=count))

        return node_ids[:jobs_no]

    def __sort(self, nodes):
        # "idle" sorts before "mixed"; ties are broken by load, then idle CPUs
        return sorted(nodes, key=attrgetter('state', 'load', 'cpu_idle'))


def cluster_status_from_raw_stdout(std_out):
    # check_output returns bytes on Python 3; decode before splitting
    if isinstance(std_out, bytes):
        std_out = std_out.decode("utf-8")
    # The first line is the sinfo header
    splitted_output = std_out.split("\n")[1:]
    nodes = []
    for line in splitted_output:
        try:
            nodeinfo = NodeInfo(line)
            nodes.append(nodeinfo)
        except Exception:
            logger.info("Unable to parse line, skipping: %s", line)
    cluster_info = ClusterState(nodes)
    return cluster_info


def get_cluster_state_from_os():
    command = "sinfo --states='idle,mixed' --partition=plgrid --format='%n %P %O %T %C'"
    output = check_output(split(command), shell=False, stderr=STDOUT)
    cluster_info = cluster_status_from_raw_stdout(output)
    return cluster_info

Review comment (on the line defining command): why
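The node selection in get_nodes_for_scheduling can be easier to follow in isolation. The sketch below is a standalone restatement under stated assumptions, not the code from the pull request: nodes_for_scheduling is a hypothetical function that takes plain (node_id, idle_cpus) pairs already in priority order, and it raises ValueError where the original raises AssertionError. The starting ratio 0.5 and the step 1.08 are the class attributes shown in the diff.

```python
from collections import Counter


def nodes_for_scheduling(idle_cpus, jobs_no, ratio=0.5, step=1.08):
    """idle_cpus: list of (node_id, idle_cpu_count) in priority order."""
    capacity = sum(count for _, count in idle_cpus)
    if jobs_no > capacity:
        raise ValueError("Jobs count exceeds maximum cluster capacity.")
    # Raise the utilisation ratio in 8% steps until all jobs fit
    while int(capacity * ratio) < jobs_no:
        ratio *= step
    ratio = min(ratio, 1)
    node_ids = []
    for node_id, count in idle_cpus:
        # Each node is listed once per CPU it may use at this ratio
        node_ids.extend([node_id] * int(round(count * ratio)))
    return node_ids[:jobs_no]


# Idle CPU counts taken from the test data in this pull request
cluster = [("p0620", 24), ("p0615", 8), ("p0627", 8)]
plan = nodes_for_scheduling(cluster, 30)
print(Counter(plan))
```

For 30 jobs the ratio grows from 0.5 to roughly 0.79, which yields 19, 6 and 6 slots; trimming to 30 leaves 19, 6 and 5. Because each node's share is rounded separately, the rounded shares do not always add up to jobs_no: with many nodes that each have a single idle CPU and a ratio of 0.5, Python 3 rounds every share to 0 and the list comes back shorter than requested.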
Empty file.
Empty file.
@@ -0,0 +1,92 @@
from itertools import repeat
from unittest import TestCase

from mcpartools.scheduler.smart.slurm import cluster_status_from_raw_stdout


class TestClusterInfo(TestCase):
    raw_stdout = """HOSTNAMES PARTITION NODES CPU_LOAD STATE CPUS(A/I/O/T)
p0615 plgrid* 3.92 mixed 16/8/0/24
p0620 plgrid* 0.41 idle 0/24/0/24
p0627 plgrid* 4.00 mixed 16/8/0/24"""
    invalid_raw_stdout = """HOSTNAMES PARTITION NODES CPU_LOAD STATE CPUS(A/I/O/T)
p0615 plgrid* 3.92 mixed [16/8/0/24
p0615 plgrid* 3.92 mixed 16/8/0/24
p0620 plgrid* 0.41 idle 0/24/0/24
p0627 plgrid* 4.00 mixed 16/8/0/24

"""

    def test_cluster_status_from_raw_stdout(self):
        cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
        self.assertEqual(len(cluster_status.nodes_info), 3)

    def test_should_skip_invalid_line_while_building_cluster_status_from_raw_stdout(self):
        cluster_status = cluster_status_from_raw_stdout(self.invalid_raw_stdout)
        self.assertEqual(len(cluster_status.nodes_info), 3)

    def test_get_idle_nodes(self):
        cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
        idle = cluster_status.get_idle_nodes()
        self.assertEqual(len(idle), 1)

        idle_ids = [node.node_id for node in idle]
        self.assertEqual(idle_ids, ["p0620"])

    def test_get_mixed_nodes(self):
        cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
        mixed = cluster_status.get_mixed_nodes()
        self.assertEqual(len(mixed), 2)

        mixed_ids = [node.node_id for node in mixed]
        self.assertEqual(mixed_ids, ["p0615", "p0627"])

    def test_get_nodes_for_scheduling_1(self):
        cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
        nodes = cluster_status.get_nodes_for_scheduling(5)
        self.assertEqual(nodes, ['p0620', 'p0620', 'p0620', 'p0620', 'p0620'])

    def test_get_nodes_for_scheduling_2(self):
        cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
        nodes = cluster_status.get_nodes_for_scheduling(20)
        expected = []
        expected.extend(repeat('p0620', 12))
        expected.extend(repeat('p0615', 4))
        expected.extend(repeat('p0627', 4))
        self.assertEqual(nodes, expected)

    def test_get_nodes_for_scheduling_3(self):
        cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
        nodes = cluster_status.get_nodes_for_scheduling(30)
        expected = []
        expected.extend(repeat('p0620', 19))
        expected.extend(repeat('p0615', 6))
        expected.extend(repeat('p0627', 5))
        self.assertEqual(nodes, expected)

    def test_get_nodes_for_scheduling_4(self):
        cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
        nodes = cluster_status.get_nodes_for_scheduling(31)
        expected = []
        expected.extend(repeat('p0620', 19))
        expected.extend(repeat('p0615', 6))
        expected.extend(repeat('p0627', 6))
        self.assertEqual(nodes, expected)

    def test_get_nodes_for_scheduling_5(self):
        cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
        nodes = cluster_status.get_nodes_for_scheduling(39)
        expected = []
        expected.extend(repeat('p0620', 24))
        expected.extend(repeat('p0615', 8))
        expected.extend(repeat('p0627', 7))
        self.assertEqual(nodes, expected)

    def test_get_nodes_for_scheduling_6(self):
        cluster_status = cluster_status_from_raw_stdout(self.raw_stdout)
        nodes = cluster_status.get_nodes_for_scheduling(40)
        expected = []
        expected.extend(repeat('p0620', 24))
        expected.extend(repeat('p0615', 8))
        expected.extend(repeat('p0627', 8))
        self.assertEqual(nodes, expected)
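The invalid-line test passes because the malformed CPU field and the trailing blank lines both fail during parsing and are skipped. The sketch below shows one way to get that behaviour in a standalone form; parse_cpus and parse_valid_lines are hypothetical names for this illustration and do not appear in the pull request, and the sample text is a shortened variant of the test data.

```python
def parse_cpus(field):
    """Split an 'allocated/idle/other/total' field into four ints."""
    allocated, idle, other, total = (int(part) for part in field.split("/"))
    return allocated, idle, other, total


def parse_valid_lines(raw_stdout):
    """Return (host, state, cpus) for every line that parses cleanly."""
    parsed = []
    for line in raw_stdout.split("\n")[1:]:  # skip the header line
        try:
            host, partition, load, state, cpus = line.split()
            parsed.append((host, state, parse_cpus(cpus)))
        except ValueError:
            # Wrong number of fields (e.g. a blank line) or a non-numeric CPU count
            continue
    return parsed


sample = (
    "HOSTNAMES PARTITION CPU_LOAD STATE CPUS(A/I/O/T)\n"
    "p0615 plgrid* 3.92 mixed [16/8/0/24\n"
    "p0615 plgrid* 3.92 mixed 16/8/0/24\n"
    "p0620 plgrid* 0.41 idle 0/24/0/24\n"
    "\n"
)
parsed = parse_valid_lines(sample)
print(parsed)
```

Catching only ValueError here is a narrower choice than the broad except Exception used in the diff; it keeps unrelated errors visible while still skipping malformed lines.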
Why the .j2 suffix?
It's a jinja2 template, I wanted this to be explicit.
ok