Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add prediction feature for shieldhit #172

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions mcpartools/generatemc.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ def main(args=sys.argv[1:]):
type=int,
required=True,
help='number of parallel jobs')
parser.add_argument('-P', '--prediction',
action='store_true',
help='Calculate best configuration')
parser.add_argument('-d', '--dry_run',
action='store_true',
help='Dry run without creating a workspace')
parser.add_argument('input',
type=str,
help='path to input configuration')
Expand Down
50 changes: 49 additions & 1 deletion mcpartools/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from mcpartools.mcengine.common import EngineDiscover
from mcpartools.scheduler.common import SchedulerDiscover
from progress.bar import ChargingBar
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add try/catch


logger = logging.getLogger(__name__)

Expand All @@ -17,7 +18,6 @@


class Options:

collect_methods = ('mv', 'cp', 'plotdata', 'image', 'custom')

def __init__(self, args):
Expand Down Expand Up @@ -92,6 +92,12 @@ def __init__(self, args):
# no checks needed - argparse does it
self.batch = args.batch

# no checks needed - argparse does it
self.prediction = args.prediction

# no checks needed - argparse does it
self.dry_run = args.dry_run

@property
def valid(self):
    """Read-only validation flag; checked by run() before generating anything
    (invalid options abort the run).  Set during option parsing —
    NOTE(review): the assignment to self._valid is outside this hunk, confirm."""
    return self._valid
Expand All @@ -115,6 +121,45 @@ def run(self):
logger.error("Invalid options, aborting run")
return None

logger.info("Given configuration:")
logger.info("Particles per job - {0}".format(self.options.particle_no))
logger.info("Number of jobs - {0}".format(self.options.jobs_no))
estimated_time = self.mc_engine.calculation_time(self.options.particle_no, self.options.jobs_no,
self.options.collect)
m, s = divmod(estimated_time, 60)
logger.info("Estimated calculation time: {} minute(s) {} second(s)\n".format(int(m), int(s)))

# predict jobs_no for particle_no if option was chosen
if self.options.prediction:
try:
total_part_no = self.options.particle_no * self.options.jobs_no
predicted_jobs_no = self.mc_engine.predict_best(total_part_no, self.options.collect)
if predicted_jobs_no:
self.options.jobs_no = predicted_jobs_no
self.options.particle_no = total_part_no // self.options.jobs_no

logger.info("Predicted configuration:")
logger.info("Particles per job - {0}".format(self.options.particle_no))
logger.info("Number of jobs - {0}".format(self.options.jobs_no))

estimated_time = self.mc_engine.calculation_time(self.options.particle_no, self.options.jobs_no,
self.options.collect)
m, s = divmod(estimated_time, 60)
logger.info("Estimated calculation time: {} minute(s) {} second(s)\n".format(int(m), int(s)))

if total_part_no - self.options.particle_no * self.options.jobs_no > 0:
logger.warn("{0} is not divided by {1} !".format(total_part_no, self.options.jobs_no))
logger.warn("{0} particles will be calculated! NOT {1} !\n".format(
self.options.particle_no * self.options.jobs_no, total_part_no))
else:
return None

except NotImplementedError:
logger.error("Prediction feature is not supported for {0}".format(self.mc_engine))

if self.options.dry_run:
return None

# generate main dir according to date
self.generate_main_dir()

Expand Down Expand Up @@ -174,6 +219,7 @@ def generate_workspace(self):
logger.debug("Generated workspace directory path: " + wspdir_path)
os.mkdir(wspdir_path)
self.workspace_dir = wspdir_path
bar = ChargingBar("Creating workspace", max=self.options.jobs_no)

for jobid in range(self.options.jobs_no):
jobdir_name = "job_{0:04d}".format(jobid + 1)
Expand All @@ -187,9 +233,11 @@ def generate_workspace(self):
self.mc_engine.save_input(jobdir_path)

self.mc_engine.save_run_script(jobdir_path, jobid + 1)
bar.next()

self.scheduler.write_main_run_script(jobs_no=self.options.jobs_no, output_dir=self.workspace_dir)
self.mc_engine.write_collect_script(self.main_dir)
bar.finish()

def generate_submit_script(self):
script_path = os.path.join(self.main_dir, self.scheduler.submit_script)
Expand Down
24 changes: 24 additions & 0 deletions mcpartools/mcengine/data/regression.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
; Configuration file for regressions.

[SHIELDHIT]

; Experimentally found parameters of functions estimating calculation time
JOBS_AND_PARTICLES = 0.00795388
JOBS_AND_SIZE_A = 0.00765724
JOBS_AND_SIZE_B = 0.00001393
FILES_AND_SIZE_A = 0.00066932
FILES_AND_SIZE_B = 85.3317364
FILES_AND_SIZE_C = 0.35205707
DENSITY_AND_SIZE = 7.66666667

COLLECT_STANDARD_DEVIATION = 1.38
CALCULATION_STANDARD_DEVIATION = 1.31
COLLECT_SIZE_MIN = 10

; Relative correlation between different collect types
IMAGE_COLLECT_COEF = 1
PLOTDATA_COLLECT_COEF = 1.2
MV_COLLECT_COEF = 0
CP_COLLECT_COEF = 0.5
CUSTOM_COLLECT_COEF = 1
MV_COLLECT_TIME = 1
3 changes: 3 additions & 0 deletions mcpartools/mcengine/fluka.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ def __init__(self, input_path, mc_run_script, collect_method, mc_engine_options)

self.collect_script_content = resource_string(__name__, self.collect_script).decode('ascii')

def __str__(self):
    # Human-readable engine name, used in log messages
    # (e.g. "Prediction feature is not supported for Fluka").
    return "Fluka"

@property
def input_files(self):
# TODO check if additional files are needed
Expand Down
6 changes: 6 additions & 0 deletions mcpartools/mcengine/mcengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,9 @@ def write_collect_script(self, output_dir):
def find_external_files(self, run_input_dir):
    """Return paths to external files referenced by the engine input.

    Abstract hook: concrete engines must override; the base implementation
    always raises NotImplementedError.
    """
    raise NotImplementedError()

def predict_best(self, particle_no, collect_type):
    """Return the predicted optimal number of jobs for a given total particle
    count and collect method.

    Abstract hook: engines supporting the prediction feature (-P) override
    this; the base implementation always raises NotImplementedError.
    """
    # Instantiate the exception for consistency with the other stubs
    # in this class (e.g. find_external_files raises NotImplementedError()).
    raise NotImplementedError()

def calculation_time(self, total_particles_no, jobs_no, collect_type):
    """Return the estimated run time for the given particle count, job count
    and collect method.

    Abstract hook: engines supporting the prediction feature (-P) override
    this; the base implementation always raises NotImplementedError.
    NOTE(review): the ShieldHit override names its first parameter
    particles_no_per_job — confirm which semantics callers expect.
    """
    # Instantiate the exception for consistency with the other stubs
    # in this class (e.g. find_external_files raises NotImplementedError()).
    raise NotImplementedError()
156 changes: 153 additions & 3 deletions mcpartools/mcengine/shieldhit.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import os
from configparser import ConfigParser
from pkg_resources import resource_string

from mcpartools.mcengine.mcengine import Engine
Expand All @@ -8,9 +9,11 @@


class ShieldHit(Engine):

default_run_script_path = os.path.join('data', 'run_shieldhit.sh')
regression_cfg_path = os.path.join('data', 'regression.ini')
output_wildcard = "*.bdo"
max_predicted_job_number = 750
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extract to some configuration file

smallCollectFileCoef = (3 * 15 / 125000000.0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also should go to configuration


def __init__(self, input_path, mc_run_script, collect_method, mc_engine_options):
Engine.__init__(self, input_path, mc_run_script, collect_method, mc_engine_options)
Expand All @@ -26,18 +29,60 @@ def __init__(self, input_path, mc_run_script, collect_method, mc_engine_options)
tpl_fd.close()
logger.debug("Using user run script: " + self.run_script_path)

self.config = self.regression_config
if self.config is None:
logger.warning("Could not properly parse configuration file for prediction feature")
else:
try:
self.jobs_and_particles_regression = float(self.config["JOBS_AND_PARTICLES"])
self.jobs_and_size_regression = [float(self.config["JOBS_AND_SIZE_A"]),
float(self.config["JOBS_AND_SIZE_B"])]
self.files_and_size_regression = [float(self.config["FILES_AND_SIZE_A"]), float(self.config["FILES_AND_SIZE_B"]),
float(self.config["FILES_AND_SIZE_C"])]
self.density_and_size_regression = float(self.config["DENSITY_AND_SIZE"])
self.collect_std_deviation = float(self.config['COLLECT_STANDARD_DEVIATION'])
self.calculation_std_deviation = float(self.config['CALCULATION_STANDARD_DEVIATION'])
logger.debug("Regressions from config file:")
logger.debug("JOBS_AND_PARTICLES = {0}".format(self.jobs_and_particles_regression))
logger.debug("JOBS_AND_SIZE = {0}".format(self.jobs_and_size_regression))
logger.debug("DENSITY_AND_SIZE = {0}".format(self.density_and_size_regression))
except ValueError:
logger.warning("Config file could not be read properly! Probably coefficients are not floats")
except KeyError:
logger.warning("Config file could not be read properly! Probably missing some variables")

self.collect_script_content = resource_string(__name__, self.collect_script).decode('ascii')

self.files_size = self.calculate_size()
self.files_no_multiplier = 1 if self.files_size[0] == 0 else (self.files_size[1] / 10.0) * \
self.files_and_size_regression[0] * \
(self.files_size[0] - self.files_and_size_regression[1]) ** 2 + \
self.files_and_size_regression[2] * ((self.files_size[1] + 10) / 10.0)

self.particle_no = 1
self.rng_seed = 1

def __str__(self):
    # Human-readable engine name, used in log messages
    # (e.g. "Prediction feature is not supported for ShieldHit").
    return "ShieldHit"

@property
def input_files(self):
base = os.path.abspath(self.input_path)
files = ('beam.dat', 'geo.dat', 'mat.dat', 'detect.dat')
result = (os.path.join(base, f) for f in files)
return result

@property
def regression_config(self):
    """Load the bundled regression.ini and return its [SHIELDHIT] section.

    Returns None when the section is missing, so callers can detect a
    broken configuration file and disable the prediction feature.
    """
    parser = ConfigParser()
    raw_config = resource_string(__name__, self.regression_cfg_path).decode('ascii')
    parser.read_string(raw_config)
    try:
        return parser["SHIELDHIT"]
    except KeyError:
        # section absent from the packaged config file
        return None

def randomize(self, new_seed, output_dir=None):
self.rng_seed = new_seed

Expand Down Expand Up @@ -118,12 +163,12 @@ def _parse_beam_file(self, file_path, run_input_dir):
# line length checking to prevent IndexError
if len(split_line) > 2 and split_line[0] == "USEBMOD":
logger.debug("Found reference to external file in BEAM file: {0} {1}".format(
split_line[0], split_line[2]))
split_line[0], split_line[2]))
external_files.append(split_line[2])
paths_to_replace.append(split_line[2])
elif len(split_line) > 1 and split_line[0] == "USECBEAM":
logger.debug("Found reference to external file in BEAM file: {0} {1}".format(
split_line[0], split_line[1]))
split_line[0], split_line[1]))
external_files.append(split_line[1])
paths_to_replace.append(split_line[1])
if paths_to_replace:
Expand Down Expand Up @@ -242,3 +287,108 @@ def _rewrite_paths_in_file(config_file, paths_to_replace):
with open(config_file, 'w') as outfile:
for line in lines:
outfile.write(line)

def predict_best(self, total_particle_no, collect_type):
try:
import numpy as np
except ImportError as e:
logger.error("Numpy not found. Please install numpy or avoid -P option")
raise e

try:

# This type of collect almost not affect calculation time
if collect_type == "mv":
return self.max_predicted_job_number

# The coefficients correspond to the derivative function. That function was found experimentally
# For small output file, collect behave differently than for big ones
elif self.files_size[0] < 10:
coeff = [self.collect_std_deviation * self.files_no_multiplier * self.collect_coefficient(collect_type) * self.smallCollectFileCoef,
0, 0, 0, -self.jobs_and_particles_regression * total_particle_no * self.calculation_std_deviation]
else:
coeff = [self.collect_std_deviation * self.files_no_multiplier * self.collect_coefficient(collect_type) *
(self.jobs_and_size_regression[1] * self.files_size[0] ** 2 +
self.jobs_and_size_regression[0] * self.files_size[0]), 0,
-self.jobs_and_particles_regression * total_particle_no * self.calculation_std_deviation]

# smallest, real solution
results = [int(x.real) for x in np.roots(coeff) if np.isreal(x) and x.real > 0]
result = sorted([(x, self._calculation_time(total_particle_no, x, collect_type)) for x in results],
key=lambda x: x[1])[0][0]

result = self.max_predicted_job_number if result > self.max_predicted_job_number else result
except ZeroDivisionError:
# output file is extremely small
result = self.max_predicted_job_number
except AttributeError:
logger.error("Could not predict configuration! Check correctness of config file for prediction feature")
return None
return result

def calculate_size(self):
    """Estimate the combined size of SHIELD-HIT output files by parsing the
    scorer definitions in detect.dat.

    Returns a (files_size, counter) tuple — the accumulated size estimate
    (scaled by the DENSITY_AND_SIZE regression coefficient) and the number
    of counted scorers — or None when regression attributes are missing
    because the prediction config file could not be parsed.
    """
    try:
        beam_file, geo_file, mat_file, detect_file = self.input_files
        count = True
        # regression coefficient mapping mesh cell count to output size
        a = self.density_and_size_regression
        files_size = 0
        i = 0
        counter = 0
        with open(detect_file, 'r') as detect:
            for line in detect:
                # NOTE(review): assumes each detect.dat entry spans 4 lines,
                # with the scoring keyword on line 2 (i % 4 == 1) and the
                # mesh dimensions on line 3 (i % 4 == 2); a '*' line starts
                # a new entry — confirm against the SHIELD-HIT input format.
                if line[0] == "*":
                    i = 0
                if i % 4 == 1:
                    count = True
                    scoring = line.split()[0]
                    logger.debug("Found {0} in detect.dat".format(scoring))
                    if scoring == "GEOMAP":
                        # GEOMAP scorers are excluded from the size estimate
                        count = False
                if i % 4 == 2 and count:
                    # mesh dimensions: number of cells along x, y, z
                    x, y, z = [int(j) for j in line.split()[0:3]]
                    files_size += a * (x * y * z) / 1000000
                    counter += 1
                    logger.debug("x = {0}, y = {1}, z = {2}, files_size = {3} ".format(x, y, z, files_size))
                i += 1
        return files_size, counter
    except AttributeError:
        logger.error("Could not calculate size of files! Check correctness of config file for prediction feature")
        return None

def calculation_time(self, particles_no_per_job, jobs_no, collect_type):
return self._calculation_time(particles_no_per_job * jobs_no, jobs_no, collect_type)

def _calculation_time(self, total_particles_no, jobs_no, collect_type):
    """Estimate total run time (collect + calculation) for a total particle
    count spread over jobs_no parallel jobs.

    Returns the estimated time (seconds, per the minute/second formatting
    done by the caller), or None when regression attributes are missing
    because the prediction config file could not be parsed.  Raises
    ZeroDivisionError when jobs_no is 0 (caller relies on this).
    """
    try:
        # This type of collect has constant execution time
        if collect_type == "mv":
            collect_time = float(self.config['MV_COLLECT_TIME'])
        elif self.files_size[0] < 10:
            # NOTE(review): empirically fitted cubic for small output files;
            # the constants 5, 15 and 125000000 are unexplained magic numbers
            # (cf. smallCollectFileCoef) — consider moving to regression.ini.
            collect_time = 5 + 15 * (jobs_no ** 3) / 125000000
        else:
            # linear + quadratic terms in file size, fitted experimentally
            collect_time = self.jobs_and_size_regression[0] * self.files_size[0] * jobs_no + \
                           self.jobs_and_size_regression[1] * jobs_no * self.files_size[0] ** 2

        # calculation time scales with particles per job (total / jobs_no)
        calc_time = self.jobs_and_particles_regression * (1 / float(jobs_no)) * total_particles_no
        collect_time *= self.files_no_multiplier * self.collect_std_deviation
        collect_coef = self.collect_coefficient(collect_type)
        # a zero coefficient (e.g. 'mv') leaves collect_time unscaled
        if collect_coef > 0:
            collect_time *= collect_coef

        calc_time *= self.calculation_std_deviation

        return collect_time + calc_time
    except AttributeError:
        logger.error("Could not estimate calculation time! Check correctness of config file for prediction feature")
        return None

def collect_coefficient(self, collect_option):
collect_coef = {
'mv': self.config['MV_COLLECT_COEF'],
'cp': self.config['CP_COLLECT_COEF'],
'plotdata': self.config['PLOTDATA_COLLECT_COEF'],
'image': self.config['IMAGE_COLLECT_COEF'],
'custom': self.config['CUSTOM_COLLECT_COEF'],
}[collect_option]

return float(collect_coef)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def get_cmdclass():
version=get_version(),
packages=setuptools.find_packages(where='.', exclude=("*.tests", "*.tests.*", "tests.*", "tests")),
package_data={
'mcpartools.mcengine.data': ['*.sh', '*.json'],
'mcpartools.mcengine.data': ['*.sh', '*.json', '*.ini'],
'mcpartools.scheduler.data': ['*.sh'],
},
url='https://github.com/DataMedSci/mcpartools',
Expand Down