diff --git a/ibllib/pipes/tasks.py b/ibllib/pipes/tasks.py index d6f7fe1cd..bbb2d0abb 100644 --- a/ibllib/pipes/tasks.py +++ b/ibllib/pipes/tasks.py @@ -112,9 +112,10 @@ class Task(abc.ABC): force = False # whether to re-download missing input files on local server if not present job_size = 'small' # either 'small' or 'large', defines whether task should be run as part of the large or small job services env = None # the environment name within which to run the task (NB: the env is not activated automatically!) + on_error = 'continue' # whether to raise an exception on error ('raise') or report the error and continue ('continue') def __init__(self, session_path, parents=None, taskid=None, one=None, - machine=None, clobber=True, location='server', scratch_folder=None, **kwargs): + machine=None, clobber=True, location='server', scratch_folder=None, on_error='continue', **kwargs): """ Base task class :param session_path: session path @@ -129,6 +130,18 @@ def __init__(self, session_path, parents=None, taskid=None, one=None, :param scratch_folder: optional: Path where to write intermediate temporary data :param args: running arguments """ + self.on_error = on_error + self.log = '' # placeholder to keep the log of the task for registration + self.cpu = kwargs.get('cpu', 1) + self.gpu = kwargs.get('gpu', 0) + self.io_charge = kwargs.get('io_charge', 5) + self.priority = kwargs.get('priority', 30) + self.ram = kwargs.get('ram', 4) + self.level = 0 # level in the pipeline hierarchy: level 0 means there is no parent task + self.outputs = [] # placeholder for a list of Path containing output files + self.time_elapsed_secs = None + self.time_out_secs = 3600 * 2 # time-out after which a task is considered dead + self.version = ibllib.__version__ self.taskid = taskid self.one = one self.session_path = session_path @@ -263,10 +276,12 @@ def run(self, **kwargs): self.outputs = outputs if not self.outputs else self.outputs # ensure None if no inputs registered else: self.outputs.extend(ensure_list(outputs)) # Add output files to list of inputs to register - except Exception: + except Exception as e: _logger.error(traceback.format_exc()) _logger.info(f'Job {self.__class__} errored') self.status = -1 + if self.on_error == 'raise': + raise e self.time_elapsed_secs = time.time() - start_time # log the outputs