Skip to content

Commit

Permalink
add an option to raise within tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
oliche committed Nov 29, 2024
1 parent faecc64 commit 76c4de0
Showing 1 changed file with 17 additions and 2 deletions.
19 changes: 17 additions & 2 deletions ibllib/pipes/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,10 @@ class Task(abc.ABC):
force = False # whether to re-download missing input files on local server if not present
job_size = 'small' # either 'small' or 'large', defines whether task should be run as part of the large or small job services
env = None # the environment name within which to run the task (NB: the env is not activated automatically!)
on_error = 'continue' # whether to raise an exception on error ('raise') or report the error and continue ('continue')

def __init__(self, session_path, parents=None, taskid=None, one=None,
machine=None, clobber=True, location='server', scratch_folder=None, **kwargs):
machine=None, clobber=True, location='server', scratch_folder=None, on_error='continue', **kwargs):
"""
Base task class
:param session_path: session path
Expand All @@ -129,6 +130,18 @@ def __init__(self, session_path, parents=None, taskid=None, one=None,
:param scratch_folder: optional: Path where to write intermediate temporary data
:param args: running arguments
"""
self.on_error = on_error
self.log = '' # placeholder to keep the log of the task for registration
self.cpu = kwargs.get('cpu', 1)
self.gpu = kwargs.get('gpu', 0)
self.io_charge = kwargs.get('io_charge', 5)
self.priority = kwargs.get('priority', 30)
self.ram = kwargs.get('ram', 4)
self.level = 0 # level in the pipeline hierarchy: level 0 means there is no parent task
self.outputs = [] # placeholder for a list of Path containing output files
self.time_elapsed_secs = None
self.time_out_secs = 3600 * 2 # time-out after which a task is considered dead
self.version = ibllib.__version__
self.taskid = taskid
self.one = one
self.session_path = session_path
Expand Down Expand Up @@ -263,10 +276,12 @@ def run(self, **kwargs):
self.outputs = outputs if not self.outputs else self.outputs # ensure None if no inputs registered
else:
self.outputs.extend(ensure_list(outputs)) # Add output files to list of inputs to register
except Exception:
except Exception as e:
_logger.error(traceback.format_exc())
_logger.info(f'Job {self.__class__} errored')
self.status = -1
if self.on_error == 'raise':
raise e

self.time_elapsed_secs = time.time() - start_time
# log the outputs
Expand Down

0 comments on commit 76c4de0

Please sign in to comment.