diff --git a/simtbx/command_line/hopper_ensemble.py b/simtbx/command_line/hopper_ensemble.py index ca578a9214..cc83a710e1 100644 --- a/simtbx/command_line/hopper_ensemble.py +++ b/simtbx/command_line/hopper_ensemble.py @@ -22,6 +22,7 @@ import sys import logging import pandas +import time from simtbx.diffBragg.hopper_ensemble_utils import load_inputs from libtbx.mpi4py import MPI @@ -75,7 +76,13 @@ def write_commandline(params): else: mpi_logger.setup_logging_from_params(params) - df = pandas.read_pickle(args.input) + for i in range(3): + try: + df = pandas.read_pickle(args.input) + break + except FileNotFoundError: + time.sleep(60) + # wait for preimported files to be written if params.skip is not None: df = df.iloc[params.skip:] @@ -92,7 +99,9 @@ def write_commandline(params): os.makedirs(gather_dir) modelers = load_inputs(df, params, exper_key=args.exp, refls_key=args.refl, gather_dir=gather_dir) - # note, we only go beyond this point if perImport flag was not passed + # do not proceed beyond this point if we are only preimporting + if args.preImport: + sys.exit() modelers.cell_for_mtz = args.cell modelers.max_sigma = args.maxSigma modelers.outdir = args.outdir if args.outdir is not None else modelers.params.outdir diff --git a/simtbx/diffBragg/hopper_ensemble_utils.py b/simtbx/diffBragg/hopper_ensemble_utils.py index 061ef28d26..ba82ef948a 100644 --- a/simtbx/diffBragg/hopper_ensemble_utils.py +++ b/simtbx/diffBragg/hopper_ensemble_utils.py @@ -16,6 +16,7 @@ from dials.array_family import flex from dxtbx.model import ExperimentList from xfel.merging.application.utils.memory_usage import get_memory_usage +from libtbx.mpi4py import mpi_abort_on_exception COMM = MPI.COMM_WORLD @@ -577,6 +578,7 @@ def get_gather_name(exper_name, gather_dir): return os.path.abspath(gathered_name) +@mpi_abort_on_exception def load_inputs(pandas_table, params, exper_key="exp_name", refls_key='predictions', gather_dir=None): @@ -621,7 +623,11 @@ def load_inputs(pandas_table, params, exper_key="exp_name", refls_key='predictio exper_dataframe = pandas_table.query("%s=='%s'" % (exper_key, exper_name)) refl_name = exper_dataframe[refls_key].values[0] - refls = flex.reflection_table.from_file(refl_name) + try: + refls = flex.reflection_table.from_file(refl_name) + except IOError: + MAIN_LOGGER.critical("Unable to load reflection file %s -- skipping." % refl_name) + continue miller_inds = list( refls['miller_index']) is_not_000 = [h != (0, 0, 0) for h in miller_inds] @@ -641,16 +647,17 @@ def load_inputs(pandas_table, params, exper_key="exp_name", refls_key='predictio shot_modeler.exper_name = exper_name shot_modeler.refl_name = refl_name shot_modeler.rank = COMM.rank - if params.refiner.load_data_from_refl: - gathered = shot_modeler.GatherFromReflectionTable(expt, refls, sg_symbol=params.space_group) - MAIN_LOGGER.debug("tried loading from reflection table") - else: - gathered = shot_modeler.GatherFromExperiment(expt, refls, sg_symbol=params.space_group) - MAIN_LOGGER.debug("tried loading data from expt table") - if not gathered: - raise IOError("Failed to gather data from experiment %s", exper_name) - else: + try: + if params.refiner.load_data_from_refl: + gathered = shot_modeler.GatherFromReflectionTable(expt, refls, sg_symbol=params.space_group) + MAIN_LOGGER.debug("tried loading from reflection table") + else: + gathered = shot_modeler.GatherFromExperiment(expt, refls, sg_symbol=params.space_group) + MAIN_LOGGER.debug("tried loading data from expt table") MAIN_LOGGER.debug("successfully loaded data") + except RuntimeError: + MAIN_LOGGER.critical("Failed to gather data from experiment %s; skipping.", exper_name) + continue MAIN_LOGGER.info("EVENT: DONE LOADING ROI") if gather_dir is not None: @@ -696,7 +703,7 @@ def load_inputs(pandas_table, params, exper_key="exp_name", refls_key='predictio pandas_table.to_pickle(pd_name) print("Wrote file %s to be used to re-run ens.hopper . Use optional ens.hopper arg '--refl ens.hopper.imported', and the phil param load_data_from_refl=True to load the imported data" % pd_name) COMM.barrier() - sys.exit() + return # this must not be a sys.exit() which will be interpreted by the @mpi_abort_on_exception decorator as an exception shot_modelers.mpi_set_x_slices() assert shot_modelers.num_modelers > 0 diff --git a/simtbx/diffBragg/prep_stage2_input.py b/simtbx/diffBragg/prep_stage2_input.py index 205128234f..df7c9fbe62 100644 --- a/simtbx/diffBragg/prep_stage2_input.py +++ b/simtbx/diffBragg/prep_stage2_input.py @@ -32,20 +32,29 @@ def prep_dataframe(df, refls_key="predictions"): nshots = len(df) refls_names = df[refls_key] refls_per_shot = [] + shots_skipped = [] if COMM.rank==0: LOGGER.info("Loading nrefls per shot") for i_shot, name in enumerate(refls_names): if i_shot % COMM.size != COMM.rank: continue - R = flex.reflection_table.from_file(name) + try: + R = flex.reflection_table.from_file(name) + except IOError as e: + LOGGER.critical("Unable to load reflections from file %s -- skipping file." % name) + LOGGER.info("Attempting to load file produced error:\n%s" % e) + shots_skipped.append(i_shot) + refls_per_shot.append((i_shot, 0)) + continue if len(R)==0: LOGGER.critical("Reflection %s has 0 reflections !" % (name, len(R))) refls_per_shot.append((i_shot, len(R))) refls_per_shot = COMM.reduce(refls_per_shot, root=0) + shots_skipped = COMM.reduce(shots_skipped, root=0) work_distribution = None if COMM.rank==0: - print("Found %d shots" % nshots) + print("Able to read reflections tables for %d of %d shots" % ((nshots - len(shots_skipped)), nshots)) refls_per_shot = sorted(refls_per_shot) indices, weights = zip(*refls_per_shot) assert list(indices) == list(range(nshots)) # new sanity test diff --git a/simtbx/diffBragg/utils.py b/simtbx/diffBragg/utils.py index a88898a241..5617590dcd 100644 --- a/simtbx/diffBragg/utils.py +++ b/simtbx/diffBragg/utils.py @@ -567,6 +567,8 @@ def get_roi_background_and_selection_flags(refls, imgs, shoebox_sz=10, reject_ed MAIN_LOGGER.debug("Number of skipped ROI with negative BGs: %d / %d" % (num_roi_negative_bg, len(rois))) MAIN_LOGGER.debug("Number of skipped ROI with NAN in BGs: %d / %d" % (num_roi_nan_bg, len(rois))) MAIN_LOGGER.info("Number of ROIS that will proceed to refinement: %d/%d" % (np.sum(selection_flags), len(rois))) + if np.sum(selection_flags) == 0: + raise RuntimeError("Can't proceed with zero ROIs") if ret_cov: return kept_rois, panel_ids, tilt_abc, selection_flags, background, all_cov else: