Dump results implementation #174

Open · wants to merge 2 commits into master
3 changes: 3 additions & 0 deletions mcpartools/generatemc.py
@@ -65,6 +65,9 @@ def main(args=sys.argv[1:]):
type=int,
required=True,
help='number of parallel jobs')
parser.add_argument('-D', '--dump',
action='store_true',
help='generate dump script')
parser.add_argument('input',
type=str,
help='path to input configuration')
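For context, the new flag is a plain store_true switch, so generating the dump script only requires adding -D (or --dump) to an otherwise normal invocation. A minimal sketch, assuming the parser above is exposed through a generatemc entry point and that the jobs argument shown above is spelled --jobs_no (neither name is confirmed by this hunk):

# hypothetical invocation; -D/--dump comes from the diff above,
# the entry-point name and the jobs option spelling are assumptions
generatemc --jobs_no 100 -D path/to/input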
26 changes: 23 additions & 3 deletions mcpartools/generator.py
@@ -92,6 +92,9 @@ def __init__(self, args):
# no checks needed - argparse does it
self.batch = args.batch

# no checks needed - argparse does it
self.dump = args.dump

@property
def valid(self):
return self._valid
@@ -103,7 +106,8 @@ def __init__(self, options):
self.mc_engine = EngineDiscover.get_mcengine(input_path=self.options.input_path,
mc_run_script=self.options.mc_run_template,
collect_method=self.options.collect,
mc_engine_options=self.options.mc_engine_options)
mc_engine_options=self.options.mc_engine_options,
dump_opt=self.options.dump)
# assigned in methods
self.scheduler = None
self.input_dir = None
@@ -120,14 +124,15 @@ def run(self):

# get scheduler and pass main dir for log file
if not self.options.batch:
self.scheduler = SchedulerDiscover.get_scheduler(self.options.scheduler_options, self.main_dir)
self.scheduler = SchedulerDiscover.get_scheduler(self.options.scheduler_options, self.options.dump,
self.main_dir)
else:
# get desired scheduler class and pass arguments
scheduler_class = [class_obj for class_obj in SchedulerDiscover.supported
if class_obj.id == self.options.batch]
if scheduler_class: # if not empty
# list should have only 1 element - that's why we call scheduler_class[0] (list is not callable)
self.scheduler = scheduler_class[0](self.options.scheduler_options)
self.scheduler = scheduler_class[0](self.options.scheduler_options, self.options.dump)
logger.info("Using: " + self.scheduler.id)
else:
logger.error("Given scheduler: \'%s\' is not on the list of supported batch systems: %s",
@@ -140,6 +145,10 @@ def run(self):
# generate submit script
self.generate_submit_script()

# generate dump script
if self.options.dump:
self.generate_dump_script()

# copy input files
self.copy_input()

@@ -201,6 +210,17 @@ def generate_submit_script(self):
jobs_no=self.options.jobs_no,
workspace_dir=self.workspace_dir)

def generate_dump_script(self):
script_path = os.path.join(self.main_dir, self.scheduler.dump_script)
logger.debug("Preparation to generate " + script_path)
self.scheduler.write_dump_script(
main_dir=self.main_dir,
script_basename=self.scheduler.dump_script,
jobs_no=self.options.jobs_no,
workspace_dir=self.workspace_dir,
dump_function=self.mc_engine.dump_function,
dump_signal=self.mc_engine.dump_signal)

def copy_input(self):
indir_name = 'input'
indir_path = os.path.join(self.main_dir, indir_name)
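To make the effect of the new generate_dump_script() step concrete, here is a rough sketch of what a generated run directory would contain when --dump is enabled; dump.sh and submit.sh are written into main_dir according to the code above, while the remaining entries and their exact placement are assumptions:

# hypothetical layout of a run directory generated with --dump
<run_dir>/
    submit.sh      # written by generate_submit_script()
    dump.sh        # new: written by generate_dump_script()
    input/         # input files copied by copy_input()
    workspace/     # job_* subdirectories, one per parallel job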
6 changes: 3 additions & 3 deletions mcpartools/mcengine/common.py
@@ -12,13 +12,13 @@ def __init__(self):
pass

@classmethod
def get_mcengine(cls, input_path, mc_run_script, collect_method, mc_engine_options):
def get_mcengine(cls, input_path, mc_run_script, collect_method, mc_engine_options, dump_opt):
if os.path.isfile(input_path) and input_path.endswith('.inp'):
logger.debug("Discovered MC engine FLUKA")
return Fluka(input_path, mc_run_script, collect_method, mc_engine_options)
return Fluka(input_path, mc_run_script, collect_method, mc_engine_options, dump_opt)
elif os.path.isdir(input_path):
logger.debug("Discovered MC engine SHIELDHIT")
return ShieldHit(input_path, mc_run_script, collect_method, mc_engine_options)
return ShieldHit(input_path, mc_run_script, collect_method, mc_engine_options, dump_opt)
else:
logger.error("Input file doesn't match available MC codes")
return None
33 changes: 31 additions & 2 deletions mcpartools/mcengine/data/collect.sh
@@ -3,11 +3,40 @@
# Exit immediately if a simple command exits with a non-zero status.
set -e

function usage () {{
cat <<EOF
Usage: $progname [-d <dir_name>]
where:
-d collect results from directory <dir_name>
EOF
exit 0
}}

INPUT_WILDCARD={output_dir:s}/workspace/job_*/{wildcard:s}
OUTPUT_DIRECTORY={output_dir:s}/output

# change working directory
cd {output_dir:s}
while getopts "d:" opt; do
case $opt in
d)
INPUT_WILDCARD="$OPTARG/job_*/*.bdo"
OUTPUT_DIRECTORY="$OPTARG/output"
;;
\?)
echo "Invalid option: -$OPTARG" >&2
usage
exit 1
;;
:)
echo "Option -$OPTARG requires an argument." >&2
usage
exit 1
;;
*)
usage
exit 1
;;
esac
done

# make output folder
mkdir -p $OUTPUT_DIRECTORY
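With the getopts block above, collect.sh keeps its default behaviour when called without arguments, but it can also be pointed at an alternative results directory, which is how dumped partial results are presumably collected. A hedged usage sketch (the dump directory path is hypothetical; note that the -d branch hard-codes the *.bdo wildcard instead of reusing {wildcard:s}):

# default: collect from {output_dir}/workspace as before
./collect.sh

# collect from a dumped results directory instead (path is an assumption);
# -d rewrites INPUT_WILDCARD to "$OPTARG/job_*/*.bdo" and OUTPUT_DIRECTORY to "$OPTARG/output"
./collect.sh -d /path/to/run_dir/dump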
2 changes: 2 additions & 0 deletions mcpartools/mcengine/data/dump_function_fluka.sh
@@ -0,0 +1,2 @@
echo "Not supported feature for fluka"
exit 1
22 changes: 22 additions & 0 deletions mcpartools/mcengine/data/dump_function_shieldhit.sh
@@ -0,0 +1,22 @@
function dump_function(){
echo -e "\n\t##############################\n"
echo "THIS FEATURE WILL ONLY WORK CORRECTLY WHEN SECOND ARGUMENT OF NSTAT"
echo "(Step of saving) IN FILE input/beam.dat IS SET TO -1"
echo -e "\n\t##############################\n"
echo "Waiting for results..."
sleep 10 # wait 10 seconds for results to be dumped; meanwhile the user can read the message above

for i in $WORKSPACE_DIR/* ; do # iterate over all job_* directories in the workspace
if [ -d "$i" ]; then
DUMP_SUBDIR=$DUMP_DIR`basename $i`
mkdir -p $DUMP_SUBDIR
find "$i" -name "*bdo" -exec cp -- "{}" $DUMP_SUBDIR \; # copy output files to dump directory
BDO_NUM=$(ls -l "$DUMP_SUBDIR" | grep ".*.bdo" | wc -l 2>/dev/null) # check number of bdo files copied
if [[ $BDO_NUM -eq 0 ]]; then
echo "Did not copied any files from `basename $i`. Most probably job has not started yet"
else
echo "Copied $BDO_NUM .bdo files from `basename $i` to dump dir..."
fi
fi
done
}
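dump_function expects WORKSPACE_DIR and DUMP_DIR to be defined by the surrounding dump script (they are not set in this file), and because DUMP_SUBDIR is built by plain string concatenation, DUMP_DIR has to end with a slash. A minimal sketch for exercising the function by hand, with assumed paths:

# assumed environment for a manual test of dump_function
WORKSPACE_DIR=/path/to/run_dir/workspace   # contains the job_* subdirectories
DUMP_DIR=/tmp/dump_test/                   # trailing slash needed: DUMP_SUBDIR=$DUMP_DIR`basename $i`
mkdir -p "$DUMP_DIR"
dump_function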
2 changes: 1 addition & 1 deletion mcpartools/mcengine/data/run_fluka.sh
@@ -10,7 +10,7 @@ FLUKA_BIN={fluka_bin:s}
cd {working_directory:s}

# run rfluka
$FLUKA_BIN -N0 -M1 {engine_options:s} {input_basename:s}
$FLUKA_BIN -N0 -M1 {engine_options:s} {input_basename:s}

# each Fluka run saves files with the same name; to distinguish output from multiple runs
# we rename the output files, appending a suffix with the job id to each of them
4 changes: 3 additions & 1 deletion mcpartools/mcengine/data/run_shieldhit.sh
@@ -25,5 +25,7 @@ DETECT_FILE={detect_file:s}
cd {working_directory:s}

# execute simulation
$SHIELDHIT_BIN --beamfile=$BEAM_FILE --geofile=$GEO_FILE --matfile=$MAT_FILE --detectfile=$DETECT_FILE -n $PARTICLE_NO -N $RNG_SEED {engine_options:s} $WORK_DIR
$SHIELDHIT_BIN --beamfile=$BEAM_FILE --geofile=$GEO_FILE --matfile=$MAT_FILE --detectfile=$DETECT_FILE -n $PARTICLE_NO -N $RNG_SEED {engine_options:s} $WORK_DIR {dumping:s}

# save the PID of the SHIELDHIT process so it can be read by the main_run script
PID=$!
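The {dumping:s} placeholder expands to a single & only when dumping is enabled (see save_run_script in shieldhit.py below), and $! holds a PID only when the preceding command was started in the background, so with dumping disabled PID stays empty and the check_running block in the main run script becomes a no-op. An illustrative sketch of the mechanism, not part of the template:

# stand-in for the $SHIELDHIT_BIN call with dumping enabled
sleep 60 &       # the trailing & is what {dumping:s} injects
PID=$!           # PID of the backgrounded process
echo "simulation running in background with PID $PID"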
6 changes: 5 additions & 1 deletion mcpartools/mcengine/fluka.py
@@ -10,9 +10,10 @@
class Fluka(Engine):

default_run_script_path = os.path.join('data', 'run_fluka.sh')
default_dump_function_path = os.path.join('data', 'dump_function_fluka.sh')
output_wildcard = "*_fort*"

def __init__(self, input_path, mc_run_script, collect_method, mc_engine_options):
def __init__(self, input_path, mc_run_script, collect_method, mc_engine_options, dump_opt):
Engine.__init__(self, input_path, mc_run_script, collect_method, mc_engine_options)

# user didn't provide a path to input scripts, use default
@@ -31,6 +32,9 @@ def __init__(self, input_path, mc_run_script, collect_method, mc_engine_options)
in_fd.close()

self.collect_script_content = resource_string(__name__, self.collect_script).decode('ascii')
self.dump_function = resource_string(__name__, self.default_dump_function_path).decode('ascii')
self.dump_signal = 'None'
self.dump_available = dump_opt

@property
def input_files(self):
9 changes: 7 additions & 2 deletions mcpartools/mcengine/shieldhit.py
@@ -10,9 +10,10 @@
class ShieldHit(Engine):

default_run_script_path = os.path.join('data', 'run_shieldhit.sh')
default_dump_function_path = os.path.join('data', 'dump_function_shieldhit.sh')
output_wildcard = "*.bdo"

def __init__(self, input_path, mc_run_script, collect_method, mc_engine_options):
def __init__(self, input_path, mc_run_script, collect_method, mc_engine_options, dump_opt):
Engine.__init__(self, input_path, mc_run_script, collect_method, mc_engine_options)

# user didn't provide a path to input scripts, use default
@@ -28,8 +29,11 @@ def __init__(self, input_path, mc_run_script, collect_method, mc_engine_options)

self.collect_script_content = resource_string(__name__, self.collect_script).decode('ascii')

self.dump_function = resource_string(__name__, self.default_dump_function_path).decode('ascii')
self.particle_no = 1
self.rng_seed = 1
self.dump_signal = 'USR1'
self.dump_available = dump_opt

@property
def input_files(self):
@@ -63,7 +67,8 @@ def save_run_script(self, output_dir, job_id):
beam_file=os.path.join(input_dir, os.path.basename(beam_file)),
geo_file=os.path.join(input_dir, os.path.basename(geo_file)),
mat_file=os.path.join(input_dir, os.path.basename(mat_file)),
detect_file=os.path.join(input_dir, os.path.basename(detect_file))
detect_file=os.path.join(input_dir, os.path.basename(detect_file)),
dumping="&" if self.dump_available else ""
)
out_file_name = "run.sh"
out_file_path = os.path.join(output_dir, out_file_name)
58 changes: 54 additions & 4 deletions mcpartools/scheduler/base.py
@@ -5,7 +5,7 @@


class JobScheduler:
def __init__(self, scheduler_options):
def __init__(self, scheduler_options, dump_opt):
# check if user provided path to options file
if scheduler_options is None:
self.options_header = "# no user options provided"
@@ -22,9 +22,27 @@ def __init__(self, scheduler_options):
self.options_header = "# no user options provided"
self.options_args = scheduler_options[1:-1]
logger.debug("Scheduler options argument:" + self.options_args)
self.dump_available = dump_opt

submit_script = 'submit.sh'
main_run_script = 'main_run.sh'
dump_script = 'dump.sh'
_dump_functions = {
'check_running': """# Check if executable is still running
if [[ ! -z $PID ]]; then
IS_RUNNING=`eval ps -p $PID | wc -l`
while [[ $IS_RUNNING -eq 2 ]]; do
IS_RUNNING=`eval ps -p $PID | wc -l`
sleep 0.5
done
fi""",
'trap_sig': """_term() {
echo Caught SIGUSR1 signal in main run script, resending!
kill -SIGUSR1 $PID 2>/dev/null
}

trap _term SIGUSR1"""
}

def submit_script_body(self, jobs_no, main_dir, workspace_dir):
from pkg_resources import resource_string
@@ -43,12 +61,34 @@ def submit_script_body(self, jobs_no, main_dir, workspace_dir):
main_dir=main_dir,
collect_script_name='collect.sh')

def dump_script_body(self, jobs_no, main_dir, workspace_dir, dump_function, dump_signal):
from pkg_resources import resource_string
tpl = resource_string(__name__, self.dump_script_template)
self.dump_script = tpl.decode('ascii')

return self.dump_script.format(options_args=self.options_args,
jobs_no=jobs_no,
workspace_dir=workspace_dir,
calculate_script_name='main_run.sh',
main_dir=main_dir,
collect_script_name='collect.sh',
dump_function=dump_function,
dump_signal=dump_signal)

def main_run_script_body(self, jobs_no, workspace_dir):
from pkg_resources import resource_string
tpl = resource_string(__name__, self.main_run_script_template)
self.main_run_script = tpl.decode('ascii').format(options_header=self.options_header,
workspace_dir=workspace_dir,
jobs_no=jobs_no)
if self.dump_available:
self.main_run_script = tpl.decode('ascii').format(options_header=self.options_header,
workspace_dir=workspace_dir,
jobs_no=jobs_no,
check_running=self._dump_functions["check_running"],
trap_sig=self._dump_functions["trap_sig"])
else:
self.main_run_script = tpl.decode('ascii').format(options_header=self.options_header,
workspace_dir=workspace_dir,
jobs_no=jobs_no,
check_running="",
trap_sig="")
return self.main_run_script

def write_submit_script(self, main_dir, script_basename, jobs_no, workspace_dir):
@@ -63,6 +103,16 @@ def write_submit_script(self, main_dir, script_basename, jobs_no, workspace_dir)
logger.debug("Jobs no " + str(jobs_no))
logger.debug("Workspace " + abs_path_workspace)

def write_dump_script(self, main_dir, script_basename, jobs_no, workspace_dir, dump_function, dump_signal):
script_path = os.path.join(main_dir, script_basename)
fd = open(script_path, 'w')
abs_path_workspace = os.path.abspath(workspace_dir)
abs_path_main_dir = os.path.abspath(main_dir)
fd.write(self.dump_script_body(jobs_no, abs_path_main_dir, abs_path_workspace, dump_function, dump_signal))
fd.close()
os.chmod(script_path, 0o750)
logger.debug("Saved dump script: " + script_path)

def write_main_run_script(self, jobs_no, output_dir):
output_dir_abspath = os.path.abspath(output_dir)
out_file_path = os.path.join(output_dir_abspath, self.main_run_script)
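The two fragments in _dump_functions are spliced into the main run script template: trap_sig installs a SIGUSR1 handler in the wrapper that forwards the signal to the simulation PID, and check_running polls ps until that PID disappears, so the wrapper does not exit while the backgrounded simulation is still running. A self-contained bash sketch of that mechanism (the long-running child is a placeholder for the MC binary):

#!/bin/bash
# forward SIGUSR1 received by the wrapper to the child process
_term() {
    echo "Caught SIGUSR1 in wrapper, resending to child"
    kill -SIGUSR1 $PID 2>/dev/null
}
trap _term SIGUSR1

sleep 300 &          # placeholder for the simulation started with a trailing &
PID=$!

# `ps -p $PID | wc -l` is 2 (header plus one row) while the process is alive
IS_RUNNING=$(ps -p $PID | wc -l)
while [[ $IS_RUNNING -eq 2 ]]; do
    sleep 0.5
    IS_RUNNING=$(ps -p $PID | wc -l)
done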
6 changes: 3 additions & 3 deletions mcpartools/scheduler/common.py
@@ -14,20 +14,20 @@ def __init__(self):
pass

@classmethod
def get_scheduler(cls, scheduler_options, log_location):
def get_scheduler(cls, scheduler_options, dump_opt, log_location):
file_logger = logging.getLogger('file_logger')
try:
srun_output = check_output(['srun --version'], shell=True)
file_logger.info("srun version: {}".format(srun_output[:-1]))
logger.debug("Discovered job scheduler SLURM")
return Slurm(scheduler_options)
return Slurm(scheduler_options, dump_opt)
except CalledProcessError as e:
logger.debug("Slurm not found: %s", e)
try:
qsub_output = check_output(['qsub --version'], shell=True)
file_logger.info("qsub version: {}".format(qsub_output[:-1]))
logger.debug("Discovered job scheduler Torque")
return Torque(scheduler_options)
return Torque(scheduler_options, dump_opt)
except CalledProcessError as e:
logger.debug("Torque not found: %s", e)
raise SystemError("No known batch system found!")
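SchedulerDiscover.get_scheduler probes for SLURM first and falls back to Torque by shelling out to the version commands shown above, and the new dump_opt flag is simply threaded through to whichever scheduler class gets constructed. A quick manual equivalent of that probe, using the same commands as the code:

# mirror of the detection order in get_scheduler()
if srun --version >/dev/null 2>&1; then
    echo "SLURM detected"
elif qsub --version >/dev/null 2>&1; then
    echo "Torque detected"
else
    echo "No known batch system found!"
fi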