diff --git a/mcpartools/generatemc.py b/mcpartools/generatemc.py
index 55ade53..6609d1a 100644
--- a/mcpartools/generatemc.py
+++ b/mcpartools/generatemc.py
@@ -65,6 +65,12 @@ def main(args=sys.argv[1:]):
                         type=int,
                         required=True,
                         help='number of parallel jobs')
+    parser.add_argument('-P', '--prediction',
+                        action='store_true',
+                        help='calculate best configuration')
+    parser.add_argument('-d', '--dry_run',
+                        action='store_true',
+                        help='dry run without creating a workspace')
     parser.add_argument('input',
                         type=str,
                         help='path to input configuration')
diff --git a/mcpartools/generator.py b/mcpartools/generator.py
index 8fcfc19..b26fe4b 100644
--- a/mcpartools/generator.py
+++ b/mcpartools/generator.py
@@ -17,7 +17,6 @@ class Options:
-
     collect_methods = ('mv', 'cp', 'plotdata', 'image', 'custom')
 
     def __init__(self, args):
@@ -92,6 +91,12 @@ def __init__(self, args):
         # no checks needed - argparse does it
         self.batch = args.batch
 
+        # no checks needed - argparse does it
+        self.prediction = args.prediction
+
+        # no checks needed - argparse does it
+        self.dry_run = args.dry_run
+
     @property
     def valid(self):
         return self._valid
@@ -115,6 +120,48 @@ def run(self):
             logger.error("Invalid options, aborting run")
             return None
 
+        logger.info("Given configuration:")
+        logger.info("Particles per job - {0}".format(self.options.particle_no))
+        logger.info("Number of jobs - {0}".format(self.options.jobs_no))
+        estimated_time = self.mc_engine.calculation_time(self.options.particle_no, self.options.jobs_no,
+                                                         self.options.collect)
+        if estimated_time:
+            m, s = divmod(estimated_time, 60)
+            logger.info("Calculation time in the best case: {} minute(s) {} second(s)\n".format(int(m), int(s)))
+
+        # predict jobs_no for particle_no if the option was chosen
+        if self.options.prediction:
+            try:
+                total_part_no = self.options.particle_no * self.options.jobs_no
+                predicted_jobs_no = self.mc_engine.predict_best(total_part_no, self.options.collect)
+                if predicted_jobs_no:
+                    self.options.jobs_no = predicted_jobs_no
+                    self.options.particle_no = total_part_no // self.options.jobs_no
+
+                    logger.info("Predicted configuration:")
+                    logger.info("Particles per job - {0}".format(self.options.particle_no))
+                    logger.info("Number of jobs - {0}".format(self.options.jobs_no))
+
+                    estimated_time = self.mc_engine.calculation_time(self.options.particle_no, self.options.jobs_no,
+                                                                     self.options.collect)
+                    if estimated_time:
+                        m, s = divmod(estimated_time, 60)
+                        logger.info("Calculation time in the best case: {} minute(s) {} second(s)\n".format(
+                            int(m), int(s)))
+
+                    if total_part_no - self.options.particle_no * self.options.jobs_no > 0:
+                        logger.warning("{0} is not divisible by {1}!".format(total_part_no, self.options.jobs_no))
+                        logger.warning("{0} particles will be calculated, NOT {1}!\n".format(
+                            self.options.particle_no * self.options.jobs_no, total_part_no))
+                else:
+                    return None
+
+            except NotImplementedError:
+                logger.error("Prediction feature is not supported for {0}".format(self.mc_engine))
+
+        if self.options.dry_run:
+            return None
+
         # generate main dir according to date
         self.generate_main_dir()
@@ -174,6 +221,13 @@ def generate_workspace(self):
         logger.debug("Generated workspace directory path: " + wspdir_path)
         os.mkdir(wspdir_path)
         self.workspace_dir = wspdir_path
+        bar_avail = False
+        try:
+            from progress.bar import ChargingBar
+            bar = ChargingBar("Creating workspace", max=self.options.jobs_no)
+            bar_avail = True
+        except ImportError:
+            logger.info("Progress bar not available. Please install progress for better user experience")
 
         for jobid in range(self.options.jobs_no):
             jobdir_name = "job_{0:04d}".format(jobid + 1)
@@ -187,9 +241,13 @@ def generate_workspace(self):
 
             self.mc_engine.save_input(jobdir_path)
             self.mc_engine.save_run_script(jobdir_path, jobid + 1)
+            if bar_avail:
+                bar.next()
 
         self.scheduler.write_main_run_script(jobs_no=self.options.jobs_no, output_dir=self.workspace_dir)
         self.mc_engine.write_collect_script(self.main_dir)
+        if bar_avail:
+            bar.finish()
 
     def generate_submit_script(self):
         script_path = os.path.join(self.main_dir, self.scheduler.submit_script)
diff --git a/mcpartools/mcengine/data/regression.ini b/mcpartools/mcengine/data/regression.ini
new file mode 100644
index 0000000..9b44279
--- /dev/null
+++ b/mcpartools/mcengine/data/regression.ini
@@ -0,0 +1,27 @@
+; Configuration file for the regression-based runtime prediction.
+
+[SHIELDHIT]
+
+; Experimentally found parameters of the functions estimating calculation time
+JOBS_AND_PARTICLES = 0.00795388
+JOBS_AND_SIZE_A = 0.00765724
+JOBS_AND_SIZE_B = 0.00001393
+FILES_AND_SIZE_A = 0.00066932
+FILES_AND_SIZE_B = 85.3317364
+FILES_AND_SIZE_C = 0.35205707
+DENSITY_AND_SIZE = 7.66666667
+SMALL_FILE_COEF = 0.00000012
+
+COLLECT_STANDARD_DEVIATION = 1.38
+CALCULATION_STANDARD_DEVIATION = 1.31
+COLLECT_SIZE_MIN = 10
+MAX_JOB_NUMBER = 750
+MIN_COLLECT_TIME = 5
+
+; Relative cost coefficients of the different collect types
+IMAGE_COLLECT_COEF = 1
+PLOTDATA_COLLECT_COEF = 1.2
+MV_COLLECT_COEF = 0
+CP_COLLECT_COEF = 0.5
+CUSTOM_COLLECT_COEF = 1
+MV_COLLECT_TIME = 1
diff --git a/mcpartools/mcengine/fluka.py b/mcpartools/mcengine/fluka.py
index d9f31d6..1e3e381 100644
--- a/mcpartools/mcengine/fluka.py
+++ b/mcpartools/mcengine/fluka.py
@@ -32,6 +32,9 @@ def __init__(self, input_path, mc_run_script, collect_method, mc_engine_options):
         self.collect_script_content = resource_string(__name__,
                                                       self.collect_script).decode('ascii')
 
+    def __str__(self):
+        return "Fluka"
+
     @property
     def input_files(self):
         # TODO check if additional files are needed
@@ -92,3 +95,11 @@ def save_run_script(self, output_dir, jobid):
 
     def find_external_files(self, run_input_dir):
         return None
+
+    def predict_best(self, particle_no, collect_type):
+        logger.error("Predict feature is not available for Fluka")
+        return None
+
+    def calculation_time(self, particles_no_per_job, jobs_no, collect_type):
+        logger.error("Estimated calculation time is not known for Fluka")
+        return None
diff --git a/mcpartools/mcengine/mcengine.py b/mcpartools/mcengine/mcengine.py
index bf55b75..74cf3bb 100644
--- a/mcpartools/mcengine/mcengine.py
+++ b/mcpartools/mcengine/mcengine.py
@@ -80,3 +80,9 @@ def write_collect_script(self, output_dir):
 
     def find_external_files(self, run_input_dir):
         """Returns paths to found external files"""
         raise NotImplementedError()
+
+    def predict_best(self, particle_no, collect_type):
+        raise NotImplementedError()
+
+    def calculation_time(self, total_particles_no, jobs_no, collect_type):
+        raise NotImplementedError()
diff --git a/mcpartools/mcengine/shieldhit.py b/mcpartools/mcengine/shieldhit.py
index 4323c33..de0cc36 100644
--- a/mcpartools/mcengine/shieldhit.py
+++ b/mcpartools/mcengine/shieldhit.py
@@ -1,15 +1,19 @@
 import logging
 import os
-from pkg_resources import resource_string
+import subprocess
+import time
+import re
+import shutil
 
+from pkg_resources import resource_string
 from mcpartools.mcengine.mcengine import Engine
 
 logger = logging.getLogger(__name__)
 
 
 class ShieldHit(Engine):
-
     default_run_script_path = os.path.join('data', 'run_shieldhit.sh')
+    regression_cfg_path = os.path.join('data', 'regression.ini')
     output_wildcard = "*.bdo"
 
     def __init__(self, input_path, mc_run_script, collect_method, mc_engine_options):
@@ -26,11 +30,45 @@ def __init__(self, input_path, mc_run_script, collect_method, mc_engine_options):
             tpl_fd.close()
             logger.debug("Using user run script: " + self.run_script_path)
 
+        self.config = self.regression_config
+        if self.config is None:
+            logger.warning("Could not properly parse the configuration file for the prediction feature")
+        else:
+            try:
+                self.jobs_and_particles_regression = float(self.config["JOBS_AND_PARTICLES"])
+                self.jobs_and_size_regression = [float(self.config["JOBS_AND_SIZE_A"]),
+                                                 float(self.config["JOBS_AND_SIZE_B"])]
+                self.files_and_size_regression = [float(self.config["FILES_AND_SIZE_A"]),
+                                                  float(self.config["FILES_AND_SIZE_B"]),
+                                                  float(self.config["FILES_AND_SIZE_C"])]
+                self.density_and_size_regression = float(self.config["DENSITY_AND_SIZE"])
+                self.collect_std_deviation = float(self.config['COLLECT_STANDARD_DEVIATION'])
+                self.calculation_std_deviation = float(self.config['CALCULATION_STANDARD_DEVIATION'])
+                self.max_predicted_job_number = int(self.config['MAX_JOB_NUMBER'])
+                self.small_collect_file_coef = float(self.config['SMALL_FILE_COEF'])
+                self.min_collect_time = float(self.config['MIN_COLLECT_TIME'])
+                self.mv_collect_time = float(self.config['MV_COLLECT_TIME'])
+            except ValueError:
+                logger.warning("Config file could not be read properly! Coefficients are probably not numbers")
+            except KeyError:
+                logger.warning("Config file could not be read properly! Some variables are probably missing")
+
         self.collect_script_content = resource_string(__name__, self.collect_script).decode('ascii')
 
+        self.files_size = self.calculate_size()
+        if self.files_size is not None:
+            self.files_no_multiplier = 1 if self.files_size[0] == 0 else (
+                self.files_size[1] / 10.0
+            ) * self.files_and_size_regression[0] * (
+                self.files_size[0] - self.files_and_size_regression[1]
+            ) ** 2 + self.files_and_size_regression[2] * ((self.files_size[1] + 10) / 10.0)
+
         self.particle_no = 1
         self.rng_seed = 1
 
+    def __str__(self):
+        return "ShieldHit"
+
     @property
     def input_files(self):
         base = os.path.abspath(self.input_path)
@@ -38,6 +76,20 @@ def input_files(self):
         result = (os.path.join(base, f) for f in files)
         return result
 
+    @property
+    def regression_config(self):
+        try:
+            from configparser import ConfigParser
+            config = ConfigParser()
+            cfg_rs = resource_string(__name__, self.regression_cfg_path)
+            config_string = cfg_rs.decode('ascii')
+            config.read_string(config_string)
+            if config.has_section("SHIELDHIT"):
+                return config["SHIELDHIT"]
+        except ImportError:
+            logger.error("configparser not found. Please install configparser or avoid the -P option")
+        return None
+
     def randomize(self, new_seed, output_dir=None):
         self.rng_seed = new_seed
@@ -118,12 +170,12 @@ def _parse_beam_file(self, file_path, run_input_dir):
             # line length checking to prevent IndexError
             if len(split_line) > 2 and split_line[0] == "USEBMOD":
                 logger.debug("Found reference to external file in BEAM file: {0} {1}".format(
-                    split_line[0], split_line[2]))
+                        split_line[0], split_line[2]))
                 external_files.append(split_line[2])
                 paths_to_replace.append(split_line[2])
             elif len(split_line) > 1 and split_line[0] == "USECBEAM":
                 logger.debug("Found reference to external file in BEAM file: {0} {1}".format(
-                    split_line[0], split_line[1]))
+                        split_line[0], split_line[1]))
                 external_files.append(split_line[1])
                 paths_to_replace.append(split_line[1])
         if paths_to_replace:
@@ -229,7 +281,6 @@ def _rewrite_paths_in_file(config_file, paths_to_replace):
         """
         lines = []
         # make a copy of config
-        import shutil
         shutil.copyfile(config_file, str(config_file + '_original'))
         with open(config_file) as infile:
             for line in infile:
@@ -242,3 +293,103 @@ def _rewrite_paths_in_file(config_file, paths_to_replace):
         with open(config_file, 'w') as outfile:
             for line in lines:
                 outfile.write(line)
+
+    def predict_best(self, total_particle_no, collect_type):
+        try:
+            import numpy as np
+            # this type of collect barely affects the calculation time
+            if collect_type == "mv":
+                return self.max_predicted_job_number
+
+            # the coefficients correspond to the derivative of a function found experimentally;
+            # for small output files collect behaves differently than for big ones
+            elif self.files_size[0] < 10:
+                coeff = [
+                    self.collect_std_deviation * self.files_no_multiplier * self.collect_coefficient(
+                        collect_type
+                    ) * 3 * self.small_collect_file_coef, 0, 0, 0,
+                    -self.jobs_and_particles_regression * total_particle_no * self.calculation_std_deviation
+                ]
+            else:
+                coeff = [
+                    self.collect_std_deviation * self.files_no_multiplier * self.collect_coefficient(collect_type) *
+                    (self.jobs_and_size_regression[1] * self.files_size[0] ** 2 +
+                     self.jobs_and_size_regression[0] * self.files_size[0]), 0,
+                    -self.jobs_and_particles_regression * total_particle_no * self.calculation_std_deviation
+                ]
+
+            # pick the positive real root with the smallest estimated calculation time
+            results = [int(x.real) for x in np.roots(coeff) if np.isreal(x) and x.real > 0]
+            result = sorted([(x, self._calculation_time(total_particle_no, x, collect_type)) for x in results],
+                            key=lambda x: x[1])[0][0]
+
+            result = self.max_predicted_job_number if result > self.max_predicted_job_number else result
+            return result
+        except AttributeError:
+            logger.error("Could not predict configuration! Check correctness of the config file "
+                         "for the prediction feature")
+        except ImportError as e:
+            logger.error("Numpy not found. Please install numpy or avoid the -P option")
+            raise e
+        return None
+
+    def calculate_size(self):
+        try:
+            beam_file, geo_file, mat_file, detect_file = self.input_files
+            dry_dir = os.path.join(self.input_path, time.strftime("dry_run_%Y%m%d_%H%M%S"))
+            os.mkdir(dry_dir)
+            args = ("shieldhit", "-b", beam_file, "-g", geo_file, "-m", mat_file, "-d", detect_file,
+                    "-n", "0", dry_dir)
+            popen = subprocess.Popen(args, stdout=subprocess.PIPE)
+            popen.wait()
+            output = popen.stdout.read().decode("utf-8")
+            a = self.density_and_size_regression
+            for line in output.splitlines():
+                if "Number of detectors" in line:
+                    no_files = int(re.findall(r"\d+", line)[0])
+                elif "Placeholders needed" in line:
+                    files_size = a * int(re.findall(r"\d+", line)[0]) / 1000000
+                    break
+            shutil.rmtree(dry_dir)
+            return files_size, no_files
+        except AttributeError:
+            logger.error("Could not calculate size of files! Check correctness of the config file "
+                         "for the prediction feature")
+            return None
+
+    def calculation_time(self, particles_no_per_job, jobs_no, collect_type):
+        return self._calculation_time(particles_no_per_job * jobs_no, jobs_no, collect_type)
+
+    def _calculation_time(self, total_particles_no, jobs_no, collect_type):
+        try:
+            # this type of collect has constant execution time
+            if collect_type == "mv":
+                collect_time = self.mv_collect_time
+            elif self.files_size[0] < 10:
+                collect_time = self.min_collect_time + self.small_collect_file_coef * (jobs_no ** 3)
+            else:
+                collect_time = (
+                    self.jobs_and_size_regression[0] * self.files_size[0] * jobs_no +
+                    self.jobs_and_size_regression[1] * jobs_no * self.files_size[0] ** 2
+                )
+
+            calc_time = self.jobs_and_particles_regression * (1 / float(jobs_no)) * total_particles_no
+            collect_time *= self.files_no_multiplier * self.collect_std_deviation
+            collect_coef = self.collect_coefficient(collect_type)
+            if collect_coef > 0:
+                collect_time *= collect_coef
+
+            calc_time *= self.calculation_std_deviation
+
+            return collect_time + calc_time
+        except (AttributeError, TypeError):
+            logger.error("Could not estimate calculation time! Check correctness of the config file "
+                         "for the prediction feature")
+            return None
+
+    def collect_coefficient(self, collect_option):
+        collect_coef = {
+            'mv': self.config['MV_COLLECT_COEF'],
+            'cp': self.config['CP_COLLECT_COEF'],
+            'plotdata': self.config['PLOTDATA_COLLECT_COEF'],
+            'image': self.config['IMAGE_COLLECT_COEF'],
+            'custom': self.config['CUSTOM_COLLECT_COEF'],
+        }[collect_option]
+
+        return float(collect_coef)
diff --git a/setup.py b/setup.py
index 083bdc5..8b6c82c 100644
--- a/setup.py
+++ b/setup.py
@@ -116,7 +116,7 @@ def get_cmdclass():
     version=get_version(),
     packages=setuptools.find_packages(where='.', exclude=("*.tests", "*.tests.*", "tests.*", "tests")),
     package_data={
-        'mcpartools.mcengine.data': ['*.sh', '*.json'],
+        'mcpartools.mcengine.data': ['*.sh', '*.json', '*.ini'],
        'mcpartools.scheduler.data': ['*.sh'],
     },
     url='https://github.com/DataMedSci/mcpartools',
diff --git a/tox.ini b/tox.ini
index 910cfda..ebc4b80 100644
--- a/tox.ini
+++ b/tox.ini
@@ -40,5 +40,6 @@ commands =
 
 [flake8]
 exclude = .tox,*.egg,build,_vendor,data,versioneer.py,*_version.py,docs/conf.py
+ignore = W503,W504
 select = E,W,F
 max-line-length = 120
\ No newline at end of file
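
Usage note: the two new flags combined give a "what-if" query. A minimal sketch, assuming the pre-existing jobs and particles options are spelled -j and -p (their definitions lie outside the hunks shown above, so those two names are a guess):

    # hypothetical invocation of the CLI entry point added to in this patch;
    # -P and -d are the options introduced by this diff
    from mcpartools.generatemc import main

    # log the predicted best jobs/particles split and its estimated calculation
    # time, then stop (-d) before any workspace directory is created
    main(['-j', '100', '-p', '10000', '-P', '-d', 'path/to/input'])

With -P alone the predicted configuration replaces the user-supplied one and workspace generation proceeds; -d makes run() return right after the prediction and time-estimation step.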