From 9f3d72e5a1b0dd795099205752aefd0f219821b4 Mon Sep 17 00:00:00 2001 From: Iris Young Date: Wed, 15 Mar 2023 01:00:13 -0700 Subject: [PATCH 1/2] Wait if files aren't written yet (up to 3 minutes) --- simtbx/command_line/hopper_ensemble.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/simtbx/command_line/hopper_ensemble.py b/simtbx/command_line/hopper_ensemble.py index ca578a9214..a0370f48f7 100644 --- a/simtbx/command_line/hopper_ensemble.py +++ b/simtbx/command_line/hopper_ensemble.py @@ -22,6 +22,7 @@ import sys import logging import pandas +import time from simtbx.diffBragg.hopper_ensemble_utils import load_inputs from libtbx.mpi4py import MPI @@ -75,7 +76,13 @@ def write_commandline(params): else: mpi_logger.setup_logging_from_params(params) - df = pandas.read_pickle(args.input) + for i in range(3): + try: + df = pandas.read_pickle(args.input) + break + except FileNotFoundError: + time.sleep(60) + # wait for preimported files to be written if params.skip is not None: df = df.iloc[params.skip:] From 057f2e5fb6c1c1e554f75da837b57e1e831b8a34 Mon Sep 17 00:00:00 2001 From: Iris Young Date: Wed, 15 Mar 2023 01:01:21 -0700 Subject: [PATCH 2/2] Make diffBragg robust to individual bad images. Previously diffBragg would raise an Exception if any image failed to process for unexpected reasons. In future real data use cases we will want to log this issue but continue to process the rest of the dataset. The problems encountered for a single image in LZ08 are now moved to RuntimeErrors and caught by a try-except when looping over experiments/reflections/files. Design choices here are guided by the fact that previous Exceptions would kill a single rank and hang the slurm job. By adding the @mpi_abort_on_exception decorator to the function that loops over files, we correctly raise an Exception and kill the slurm job when anything except these RuntimeErrors triggers an Exception, and we preserve the most informative traceback (compared with decorating functions called in the loop). The problems caught as RuntimeErrors are logged at the "critical" level, with more information in the debug logs where possible. The logs also reflect the number of files that could be read, compared with all files found. Although it's not ideal that the files that couldn't be read are still included in the "work distribution" over ranks (marked as having zero reflections), there is an important sanity check taking stock of the expected images here that relies on those files being listed, and the alternative leaves us much more vulnerable to losing images and having no way to tell. A more invasive refactor could be a better choice at some future date. Finally, note that sys.exit() instances aren't safe to use in the functions decorated with @mpi_abort_on_exception. They either need to be replaced with things that aren't recognized as Exceptions or moved outside those functions. --- simtbx/command_line/hopper_ensemble.py | 4 +++- simtbx/diffBragg/hopper_ensemble_utils.py | 29 ++++++++++++++--------- simtbx/diffBragg/prep_stage2_input.py | 13 ++++++++-- simtbx/diffBragg/utils.py | 2 ++ 4 files changed, 34 insertions(+), 14 deletions(-) diff --git a/simtbx/command_line/hopper_ensemble.py b/simtbx/command_line/hopper_ensemble.py index a0370f48f7..cc83a710e1 100644 --- a/simtbx/command_line/hopper_ensemble.py +++ b/simtbx/command_line/hopper_ensemble.py @@ -99,7 +99,9 @@ def write_commandline(params): os.makedirs(gather_dir) modelers = load_inputs(df, params, exper_key=args.exp, refls_key=args.refl, gather_dir=gather_dir) - # note, we only go beyond this point if perImport flag was not passed + # do not proceed beyond this point if we are only preimporting + if args.preImport: + sys.exit() modelers.cell_for_mtz = args.cell modelers.max_sigma = args.maxSigma modelers.outdir = args.outdir if args.outdir is not None else modelers.params.outdir diff --git a/simtbx/diffBragg/hopper_ensemble_utils.py b/simtbx/diffBragg/hopper_ensemble_utils.py index 061ef28d26..ba82ef948a 100644 --- a/simtbx/diffBragg/hopper_ensemble_utils.py +++ b/simtbx/diffBragg/hopper_ensemble_utils.py @@ -16,6 +16,7 @@ from dials.array_family import flex from dxtbx.model import ExperimentList from xfel.merging.application.utils.memory_usage import get_memory_usage +from libtbx.mpi4py import mpi_abort_on_exception COMM = MPI.COMM_WORLD @@ -577,6 +578,7 @@ def get_gather_name(exper_name, gather_dir): return os.path.abspath(gathered_name) +@mpi_abort_on_exception def load_inputs(pandas_table, params, exper_key="exp_name", refls_key='predictions', gather_dir=None): @@ -621,7 +623,11 @@ def load_inputs(pandas_table, params, exper_key="exp_name", refls_key='predictio exper_dataframe = pandas_table.query("%s=='%s'" % (exper_key, exper_name)) refl_name = exper_dataframe[refls_key].values[0] - refls = flex.reflection_table.from_file(refl_name) + try: + refls = flex.reflection_table.from_file(refl_name) + except IOError: + MAIN_LOGGER.critical("Unable to load reflection file %s -- skipping." % refl_name) + continue miller_inds = list( refls['miller_index']) is_not_000 = [h != (0, 0, 0) for h in miller_inds] @@ -641,16 +647,17 @@ def load_inputs(pandas_table, params, exper_key="exp_name", refls_key='predictio shot_modeler.exper_name = exper_name shot_modeler.refl_name = refl_name shot_modeler.rank = COMM.rank - if params.refiner.load_data_from_refl: - gathered = shot_modeler.GatherFromReflectionTable(expt, refls, sg_symbol=params.space_group) - MAIN_LOGGER.debug("tried loading from reflection table") - else: - gathered = shot_modeler.GatherFromExperiment(expt, refls, sg_symbol=params.space_group) - MAIN_LOGGER.debug("tried loading data from expt table") - if not gathered: - raise IOError("Failed to gather data from experiment %s", exper_name) - else: + try: + if params.refiner.load_data_from_refl: + gathered = shot_modeler.GatherFromReflectionTable(expt, refls, sg_symbol=params.space_group) + MAIN_LOGGER.debug("tried loading from reflection table") + else: + gathered = shot_modeler.GatherFromExperiment(expt, refls, sg_symbol=params.space_group) + MAIN_LOGGER.debug("tried loading data from expt table") MAIN_LOGGER.debug("successfully loaded data") + except RuntimeError: + MAIN_LOGGER.critical("Failed to gather data from experiment %s; skipping.", exper_name) + continue MAIN_LOGGER.info("EVENT: DONE LOADING ROI") if gather_dir is not None: @@ -696,7 +703,7 @@ def load_inputs(pandas_table, params, exper_key="exp_name", refls_key='predictio pandas_table.to_pickle(pd_name) print("Wrote file %s to be used to re-run ens.hopper . Use optional ens.hopper arg '--refl ens.hopper.imported', and the phil param load_data_from_refl=True to load the imported data" % pd_name) COMM.barrier() - sys.exit() + return # this must not be a sys.exit() which will be interpreted by the @mpi_abort_on_exception decorator as an exception shot_modelers.mpi_set_x_slices() assert shot_modelers.num_modelers > 0 diff --git a/simtbx/diffBragg/prep_stage2_input.py b/simtbx/diffBragg/prep_stage2_input.py index 205128234f..df7c9fbe62 100644 --- a/simtbx/diffBragg/prep_stage2_input.py +++ b/simtbx/diffBragg/prep_stage2_input.py @@ -32,20 +32,29 @@ def prep_dataframe(df, refls_key="predictions"): nshots = len(df) refls_names = df[refls_key] refls_per_shot = [] + shots_skipped = [] if COMM.rank==0: LOGGER.info("Loading nrefls per shot") for i_shot, name in enumerate(refls_names): if i_shot % COMM.size != COMM.rank: continue - R = flex.reflection_table.from_file(name) + try: + R = flex.reflection_table.from_file(name) + except IOError as e: + LOGGER.critical("Unable to load reflections from file %s -- skipping file." % name) + LOGGER.info("Attempting to load file produced error:\n%s" % e) + shots_skipped.append(i_shot) + refls_per_shot.append((i_shot, 0)) + continue if len(R)==0: LOGGER.critical("Reflection %s has 0 reflections !" % (name, len(R))) refls_per_shot.append((i_shot, len(R))) refls_per_shot = COMM.reduce(refls_per_shot, root=0) + shots_skipped = COMM.reduce(shots_skipped, root=0) work_distribution = None if COMM.rank==0: - print("Found %d shots" % nshots) + print("Able to read reflections tables for %d of %d shots" % ((nshots - len(shots_skipped)), nshots)) refls_per_shot = sorted(refls_per_shot) indices, weights = zip(*refls_per_shot) assert list(indices) == list(range(nshots)) # new sanity test diff --git a/simtbx/diffBragg/utils.py b/simtbx/diffBragg/utils.py index a88898a241..5617590dcd 100644 --- a/simtbx/diffBragg/utils.py +++ b/simtbx/diffBragg/utils.py @@ -567,6 +567,8 @@ def get_roi_background_and_selection_flags(refls, imgs, shoebox_sz=10, reject_ed MAIN_LOGGER.debug("Number of skipped ROI with negative BGs: %d / %d" % (num_roi_negative_bg, len(rois))) MAIN_LOGGER.debug("Number of skipped ROI with NAN in BGs: %d / %d" % (num_roi_nan_bg, len(rois))) MAIN_LOGGER.info("Number of ROIS that will proceed to refinement: %d/%d" % (np.sum(selection_flags), len(rois))) + if np.sum(selection_flags) == 0: + raise RuntimeError("Can't proceed with zero ROIs") if ret_cov: return kept_rois, panel_ids, tilt_abc, selection_flags, background, all_cov else: