From ec77d2f24e361a99442c56195d81d4342fe3e1dd Mon Sep 17 00:00:00 2001 From: Luca Venturini Date: Tue, 22 May 2018 17:07:15 +0100 Subject: [PATCH] Refactored docstrings to obey to PEP. Patch for #56 --- sciluigi/audit.py | 33 ++++++++------ sciluigi/dependencies.py | 46 +++++++++++--------- sciluigi/interface.py | 20 +++++---- sciluigi/parameter.py | 9 ++-- sciluigi/slurm.py | 92 +++++++++++++++++++++++----------------- sciluigi/task.py | 28 ++++++------ sciluigi/util.py | 35 +++++++++------ sciluigi/workflow.py | 69 ++++++++++++++++-------------- tools/init_projdir.py | 57 ++++++++++++++----------- 9 files changed, 221 insertions(+), 168 deletions(-) diff --git a/sciluigi/audit.py b/sciluigi/audit.py index 0fe6216..451404a 100644 --- a/sciluigi/audit.py +++ b/sciluigi/audit.py @@ -1,6 +1,6 @@ -''' +""" This module contains functionality for the audit-trail logging functionality -''' +""" import logging import luigi @@ -15,20 +15,22 @@ # ============================================================================== + class AuditTrailHelpers(object): - ''' + """ Mixin for luigi.Task:s, with functionality for writing audit logs of running tasks - ''' + """ def add_auditinfo(self, infotype, infoval): - ''' + """ Alias to _add_auditinfo(), that can be overridden. - ''' + """ return self._add_auditinfo(self.instance_name, infotype, infoval) + def _add_auditinfo(self, instance_name, infotype, infoval): - ''' + """ Save audit information in a designated file, specific for this task. - ''' + """ dirpath = self.workflow_task.get_auditdirpath() if not os.path.isdir(dirpath): time.sleep(random.random()) @@ -42,10 +44,11 @@ def _add_auditinfo(self, instance_name, infotype, infoval): with open(auditfile, 'a') as afile: afile.write('%s: %s\n' % (infotype, infoval)) + def get_instance_name(self): - ''' + """ Return the luigi instance_name - ''' + """ instance_name = None if self.instance_name is not None: instance_name = self.instance_name @@ -53,21 +56,23 @@ def get_instance_name(self): instance_name = self.task_id return instance_name + @luigi.Task.event_handler(luigi.Event.START) def save_start_time(self): - ''' + """ Log start of execution of task. - ''' + """ if hasattr(self, 'workflow_task') and self.workflow_task is not None: msg = 'Task {task} started'.format( task=self.get_instance_name()) log.info(msg) + @luigi.Task.event_handler(luigi.Event.PROCESSING_TIME) def save_end_time(self, task_exectime_sec): - ''' + """ Log end of execution of task, with execution time. - ''' + """ if hasattr(self, 'workflow_task') and self.workflow_task is not None: msg = 'Task {task} finished after {proctime:.3f}s'.format( task=self.get_instance_name(), diff --git a/sciluigi/dependencies.py b/sciluigi/dependencies.py index 2a83454..393a236 100644 --- a/sciluigi/dependencies.py +++ b/sciluigi/dependencies.py @@ -1,7 +1,7 @@ -''' +""" This module contains functionality for dependency resolution for constructing the dependency graph of workflows. -''' +""" import luigi from luigi.contrib.postgres import PostgresTarget @@ -10,11 +10,12 @@ # ============================================================================== + class TargetInfo(object): - ''' + """ Class to be used for sending specification of which target, from which task, to use, when stitching workflow tasks' outputs and inputs together. - ''' + """ task = None path = None target = None @@ -25,14 +26,16 @@ def __init__(self, task, path, format=None, is_tmp=False): self.target = luigi.LocalTarget(path, format, is_tmp) def open(self, *args, **kwargs): - ''' + """ Forward open method, from luigi's target class - ''' + """ return self.target.open(*args, **kwargs) # ============================================================================== + class S3TargetInfo(TargetInfo): + def __init__(self, task, path, format=None, client=None): self.task = task self.path = path @@ -40,6 +43,7 @@ def __init__(self, task, path, format=None, client=None): # ============================================================================== + class PostgresTargetInfo(TargetInfo): def __init__(self, task, host, database, user, password, update_id, table=None, port=None): self.task = task @@ -54,28 +58,30 @@ def __init__(self, task, host, database, user, password, update_id, table=None, # ============================================================================== + class DependencyHelpers(object): - ''' + + """ Mixin implementing methods for supporting dynamic, and target-based workflow definition, as opposed to the task-based one in vanilla luigi. - ''' + """ # -------------------------------------------------------- # Handle inputs # -------------------------------------------------------- def requires(self): - ''' + """ Implement luigi API method by returning upstream tasks - ''' + """ return self._upstream_tasks() def _upstream_tasks(self): - ''' + """ Extract upstream tasks from the TargetInfo objects or functions returning those (or lists of both the earlier) for use in luigi's requires() method. - ''' + """ upstream_tasks = [] for attrname, attrval in iteritems(self.__dict__): if 'in_' == attrname[0:3]: @@ -84,11 +90,11 @@ def _upstream_tasks(self): return upstream_tasks def _parse_inputitem(self, val, tasks): - ''' + """ Recursively loop through lists of TargetInfos, or callables returning TargetInfos, or lists of ... (repeat recursively) ... and return all tasks. - ''' + """ if callable(val): val = val() if isinstance(val, TargetInfo): @@ -108,17 +114,17 @@ def _parse_inputitem(self, val, tasks): # -------------------------------------------------------- def output(self): - ''' + """ Implement luigi API method - ''' + """ return self._output_targets() def _output_targets(self): - ''' + """ Extract output targets from the TargetInfo objects or functions returning those (or lists of both the earlier) for use in luigi's output() method. - ''' + """ output_targets = [] for attrname in dir(self): attrval = getattr(self, attrname) @@ -128,11 +134,11 @@ def _output_targets(self): return output_targets def _parse_outputitem(self, val, targets): - ''' + """ Recursively loop through lists of TargetInfos, or callables returning TargetInfos, or lists of ... (repeat recursively) ... and return all targets. - ''' + """ if callable(val): val = val() if isinstance(val, TargetInfo): diff --git a/sciluigi/interface.py b/sciluigi/interface.py index cca0532..36c81cf 100644 --- a/sciluigi/interface.py +++ b/sciluigi/interface.py @@ -1,6 +1,6 @@ -''' +""" This module contains mappings of methods that are part of the sciluigi API -''' +""" import luigi import logging @@ -11,10 +11,11 @@ LOGFMT_SCILUIGI = '%(asctime)s %(levelname)8s SCILUIGI %(message)s' DATEFMT = '%Y-%m-%d %H:%M:%S' + def setup_logging(): - ''' + """ Set up SciLuigi specific logging - ''' + """ sciluigi.util.ensuredir('log') log_path = 'log/sciluigi_run_%s_detailed.log' % sciluigi.util.timepath() @@ -49,16 +50,19 @@ def setup_logging(): sciluigi_logger.addHandler(sciluigi_file_handler) sciluigi_logger.setLevel(logging.DEBUG) + setup_logging() + def run(*args, **kwargs): - ''' + """ Forwarding luigi's run method - ''' + """ luigi.run(*args, **kwargs) + def run_local(*args, **kwargs): - ''' + """ Forwarding luigi's run method, with local scheduler - ''' + """ run(local_scheduler=True, *args, **kwargs) diff --git a/sciluigi/parameter.py b/sciluigi/parameter.py index 9ca063d..fe08fe0 100644 --- a/sciluigi/parameter.py +++ b/sciluigi/parameter.py @@ -1,12 +1,13 @@ -''' +""" This module contains a sciluigi subclass of luigi's Parameter, where custom functionality might be added in the future. -''' +""" import luigi + class Parameter(luigi.Parameter): - ''' + """ Subclass of luigi's Parameter, where custom functionality might be added in the future. - ''' + """ pass diff --git a/sciluigi/slurm.py b/sciluigi/slurm.py index 85d475d..abab6fe 100644 --- a/sciluigi/slurm.py +++ b/sciluigi/slurm.py @@ -1,7 +1,7 @@ -''' +""" This module contains functionality related to integration with the SLURM HPC resource manger. -''' +""" import datetime import logging @@ -10,6 +10,7 @@ import sciluigi.parameter import sciluigi.task import subprocess as sub +from multiprocessing import cpu_count # ================================================================================ @@ -23,10 +24,11 @@ # ================================================================================ -class SlurmInfo(): - ''' + +class SlurmInfo: + """ A data object for keeping slurm run parameters. - ''' + """ runmode = None # One of RUNMODE_LOCAL|RUNMODE_HPC|RUNMODE_MPI project = None partition = None @@ -36,10 +38,10 @@ class SlurmInfo(): threads = None def __init__(self, runmode, project, partition, cores, time, jobname, threads): - ''' + """ Init a SlurmInfo object, from string data. Time is on format: [[[d-]HH:]MM:]SS - ''' + """ self.runmode = runmode self.project = project self.partition = partition @@ -49,9 +51,9 @@ def __init__(self, runmode, project, partition, cores, time, jobname, threads): self.threads = threads def __str__(self): - ''' + """ Return a readable string representation of the info stored - ''' + """ strrepr = ('(time: {t}, ' 'partition: {pt}, ' 'cores: {c}, ' @@ -67,10 +69,10 @@ def __str__(self): return strrepr def get_argstr_hpc(self): - ''' + """ Return a formatted string with arguments and option flags to SLURM commands such as salloc and sbatch, for non-MPI, HPC jobs. - ''' + """ argstr = ' -A {pr} -p {pt} -n {c} -t {t} -J {j} srun -n 1 -c {thr} '.format( pr=self.project, pt=self.partition, @@ -81,10 +83,10 @@ def get_argstr_hpc(self): return argstr def get_argstr_mpi(self): - ''' + """ Return a formatted string with arguments and option flags to SLURM commands such as salloc and sbatch, for MPI jobs. - ''' + """ argstr = ' -A {pr} -p {pt} -n {c1} -t {t} -J {j} mpirun -v -np {c2} '.format( pr=self.project, pt=self.partition, @@ -96,10 +98,16 @@ def get_argstr_mpi(self): # ================================================================================ + +default_info = SlurmInfo(RUNMODE_LOCAL, + "sciluigi", None, cpu_count(), + "1-00:00:00", "sciluigi", cpu_count()) + + class SlurmInfoParameter(sciluigi.parameter.Parameter): - ''' + """ A specialized luigi parameter, taking SlurmInfo objects. - ''' + """ def parse(self, x): if isinstance(x, SlurmInfo): return x @@ -108,18 +116,19 @@ def parse(self, x): # ================================================================================ + class SlurmHelpers(): - ''' + """ Mixin with various convenience methods for executing jobs via SLURM - ''' + """ # Other class-fields - slurminfo = SlurmInfoParameter(default=None) # Class: SlurmInfo + slurminfo = SlurmInfoParameter(default=default_info) # Class: SlurmInfo # Main Execution methods def ex(self, command): - ''' + """ Execute either locally or via SLURM, depending on config - ''' + """ if isinstance(command, list): command = ' '.join(command) @@ -133,25 +142,23 @@ def ex(self, command): log.info('Executing command in MPI mode: %s', command) self.ex_mpi(command) - def ex_hpc(self, command): - ''' + """ Execute command in HPC mode - ''' + """ if isinstance(command, list): command = sub.list2cmdline(command) - fullcommand = 'salloc %s %s' % (self.slurminfo.get_argstr_hpc(), command) + fullcommand = 'salloc {} {}'.format(self.slurminfo.get_argstr_hpc(), command) (retcode, stdout, stderr) = self.ex_local(fullcommand) self.log_slurm_info(stderr) return (retcode, stdout, stderr) - def ex_mpi(self, command): - ''' + """ Execute command in HPC mode with MPI support (multi-node, message passing interface). - ''' + """ if isinstance(command, list): command = sub.list2cmdline(command) @@ -161,35 +168,39 @@ def ex_mpi(self, command): self.log_slurm_info(stderr) return (retcode, stdout, stderr) - # Various convenience methods - def assert_matches_character_class(self, char_class, a_string): - ''' + @staticmethod + def assert_matches_character_class(char_class, a_string): + """ Helper method, that tests whether a string matches a regex character class. - ''' + """ if not bool(re.match('^{c}+$'.format(c=char_class), a_string)): raise Exception('String {s} does not match character class {cc}'.format( s=a_string, cc=char_class)) - def clean_filename(self, filename): - ''' + @staticmethod + def clean_filename(filename): + """ Clean up a string to make it suitable for use as file name. - ''' + """ return re.sub('[^A-Za-z0-9\_\ ]', '_', str(filename)).replace(' ', '_') #def get_task_config(self, name): # return luigi.configuration.get_config().get(self.task_family, name) def log_slurm_info(self, slurm_stderr): - ''' + """ Parse information of the following example form: salloc: Granted job allocation 5836263 srun: Job step created salloc: Relinquishing job allocation 5836263 salloc: Job allocation 5836263 has been revoked. - ''' + """ + + if isinstance(slurm_stderr, bytes): + slurm_stderr = slurm_stderr.decode() matches = re.search('[0-9]+', str(slurm_stderr)) if matches: @@ -198,10 +209,12 @@ def log_slurm_info(self, slurm_stderr): # Write slurm execution time to audit log cmd = 'sacct -j {jobid} --noheader --format=elapsed'.format(jobid=jobid) (_, jobinfo_stdout, _) = self.ex_local(cmd) + if isinstance(jobinfo_stdout, bytes): + jobinfo_stdout = jobinfo_stdout.decode() sacct_matches = re.findall('([0-9\:\-]+)', str(jobinfo_stdout)) if len(sacct_matches) < 2: - log.warn('Not enough matches from sacct for task %s: %s', + log.warning('Not enough matches from sacct for task %s: %s', self.instance_name, ', '.join(['Match: %s' % m for m in sacct_matches]) ) else: @@ -237,8 +250,9 @@ def log_slurm_info(self, slurm_stderr): # ================================================================================ + class SlurmTask(SlurmHelpers, sciluigi.task.Task): - ''' + """ luigi task that includes the SlurmHelpers mixin. - ''' + """ pass diff --git a/sciluigi/task.py b/sciluigi/task.py index 6a9e4f6..27031df 100644 --- a/sciluigi/task.py +++ b/sciluigi/task.py @@ -1,6 +1,6 @@ -''' +""" This module contains sciluigi's subclasses of luigi's Task class. -''' +""" import json import luigi from luigi.six import iteritems, string_types @@ -15,11 +15,12 @@ # ============================================================================== + def new_task(name, cls, workflow_task, **kwargs): - ''' + """ Instantiate a new task. Not supposed to be used by the end-user (use WorkflowTask.new_task() instead). - ''' + """ slurminfo = None for key, val in [(key, val) for key, val in iteritems(kwargs)]: # Handle non-string keys @@ -42,18 +43,19 @@ def new_task(name, cls, workflow_task, **kwargs): newtask.slurminfo = slurminfo return newtask + class Task(sciluigi.audit.AuditTrailHelpers, sciluigi.dependencies.DependencyHelpers, luigi.Task): - ''' + """ SciLuigi Task, implementing SciLuigi specific functionality for dependency resolution and audit trail logging. - ''' + """ workflow_task = luigi.Parameter() instance_name = luigi.Parameter() def ex_local(self, command): - ''' + """ Execute command locally (not through resource manager). - ''' + """ # If list, convert to string if isinstance(command, list): command = sub.list2cmdline(command) @@ -80,21 +82,23 @@ def ex_local(self, command): return (retcode, stdout, stderr) def ex(self, command): - ''' + """ Execute command. This is a short-hand function, to be overridden e.g. if supporting execution via SLURM - ''' + """ return self.ex_local(command) + # ============================================================================== + class ExternalTask( sciluigi.audit.AuditTrailHelpers, sciluigi.dependencies.DependencyHelpers, luigi.ExternalTask): - ''' + """ SviLuigi specific implementation of luigi.ExternalTask, representing existing files. - ''' + """ workflow_task = luigi.Parameter() instance_name = luigi.Parameter() diff --git a/sciluigi/util.py b/sciluigi/util.py index e9ec58a..96deb83 100644 --- a/sciluigi/util.py +++ b/sciluigi/util.py @@ -1,54 +1,61 @@ -''' +""" This module contains utility methods that are used in various places across the sciluigi library -''' +""" import csv import os import time from luigi.six import iteritems + def timestamp(datefmt='%Y-%m-%d, %H:%M:%S'): - ''' + """ Create timestamp as a formatted string. - ''' + """ return time.strftime(datefmt, time.localtime()) + def timepath(sep='_'): - ''' + """ Create timestmap, formatted for use in file names. - ''' + """ return timestamp('%Y%m%d{sep}%H%M%S'.format(sep=sep)) + def timelog(): - ''' + """ Create time stamp for use in log files. - ''' + """ return timestamp('[%Y-%m-%d %H:%M:%S]') + def ensuredir(dirpath): - ''' + """ Ensure directory exists. - ''' + """ if not os.path.exists(dirpath): os.makedirs(dirpath) + RECORDFILE_DELIMITER = ':' + def recordfile_to_dict(filehandle): - ''' + """ Convert a record file to a dictionary. - ''' + """ csvrd = csv.reader(filehandle, delimiter=RECORDFILE_DELIMITER, skipinitialspace=True) records = {} for row in csvrd: records[row[0]] = row[1] return records + def dict_to_recordfile(filehandle, records): - ''' + """ Convert a dictionary to a recordfile. - ''' + """ csvwt = csv.writer(filehandle, delimiter=RECORDFILE_DELIMITER, skipinitialspace=True) rows = [] for key, val in iteritems(records): diff --git a/sciluigi/workflow.py b/sciluigi/workflow.py index 63f9738..160803f 100644 --- a/sciluigi/workflow.py +++ b/sciluigi/workflow.py @@ -1,6 +1,6 @@ -''' +""" This module contains sciluigi's subclasses of luigi's Task class. -''' +""" import datetime import luigi @@ -12,15 +12,28 @@ import sciluigi.dependencies import sciluigi.slurm + log = logging.getLogger('sciluigi-interface') + # ============================================================================== + +class WorkflowNotImplementedException(Exception): + """ + Exception to throw if the workflow() SciLuigi API method is not implemented. + """ + pass + + +# ================================================================================ + + class WorkflowTask(sciluigi.audit.AuditTrailHelpers, luigi.Task): - ''' + """ SciLuigi-specific task, that has a method for implementing a (dynamic) workflow definition (workflow()). - ''' + """ instance_name = luigi.Parameter(default='sciluigi_workflow') @@ -32,16 +45,16 @@ class WorkflowTask(sciluigi.audit.AuditTrailHelpers, luigi.Task): _hasaddedhandler = False def _ensure_timestamp(self): - ''' + """ Make sure that there is a time stamp for when the workflow started. - ''' + """ if self._wfstart == '': self._wfstart = datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S_%f') def get_wflogpath(self): - ''' + """ Get the path to the workflow-speicfic log file. - ''' + """ if self._wflogpath == '': self._ensure_timestamp() clsname = self.__class__.__name__.lower() @@ -50,41 +63,41 @@ def get_wflogpath(self): return self._wflogpath def get_auditdirpath(self): - ''' + """ Get the path to the workflow-speicfic audit trail directory. - ''' + """ self._ensure_timestamp() clsname = self.__class__.__name__.lower() audit_dirpath = 'audit/.audit_%s_%s' % (clsname, self._wfstart) return audit_dirpath def get_auditlogpath(self): - ''' + """ Get the path to the workflow-speicfic audit trail file. - ''' + """ self._ensure_timestamp() clsname = self.__class__.__name__.lower() audit_dirpath = 'audit/workflow_%s_started_%s.audit' % (clsname, self._wfstart) return audit_dirpath def add_auditinfo(self, infotype, infolog): - ''' + """ Add audit information to the audit log. - ''' + """ return self._add_auditinfo(self.__class__.__name__.lower(), infotype, infolog) def workflow(self): - ''' + """ SciLuigi API methoed. Implement your workflow here, and return the last task(s) of the dependency graph. - ''' + """ raise WorkflowNotImplementedException( 'workflow() method is not implemented, for ' + str(self)) def requires(self): - ''' + """ Implementation of Luigi API method. - ''' + """ if not self._hasaddedhandler: wflog_formatter = logging.Formatter( sciluigi.interface.LOGFMT_STREAM, @@ -110,16 +123,16 @@ def requires(self): return workflow_output def output(self): - ''' + """ Implementation of Luigi API method - ''' + """ return {'log': luigi.LocalTarget(self.get_wflogpath()), 'audit': luigi.LocalTarget(self.get_auditlogpath())} def run(self): - ''' + """ Implementation of Luigi API method - ''' + """ if self.output()['audit'].exists(): errmsg = ('Audit file already exists, ' 'when trying to create it: %s') % self.output()['audit'].path @@ -139,17 +152,9 @@ def run(self): self._hasloggedfinish = True def new_task(self, instance_name, cls, **kwargs): - ''' + """ Create new task instance, and link it to the current workflow. - ''' + """ newtask = sciluigi.new_task(instance_name, cls, self, **kwargs) self._tasks[instance_name] = newtask return newtask - -# ================================================================================ - -class WorkflowNotImplementedException(Exception): - ''' - Exception to throw if the workflow() SciLuigi API method is not implemented. - ''' - pass diff --git a/tools/init_projdir.py b/tools/init_projdir.py index 7fccf14..b559db7 100644 --- a/tools/init_projdir.py +++ b/tools/init_projdir.py @@ -2,42 +2,46 @@ import os import shutil import luigi +import six + projdir_struct = { - 'bin':None, - 'conf':None, - 'doc' : - { 'paper': None }, - 'experiments' : - { '2000-01-01-example' : - { 'audit':None, - 'bin':None, - 'conf':None, - 'data':None, - 'doc':None, - 'lib':None, - 'log':None, - 'raw':None, - 'results':None, - 'run':None, - 'tmp':None }}, - 'lib':None, - 'raw':None, - 'results':None, - 'src':None } + 'bin': None, + 'conf': None, + 'doc': {'paper': None}, + 'experiments': + {'2000-01-01-example': + {'audit': None, + 'bin': None, + 'conf': None, + 'data': None, + 'doc': None, + 'lib': None, + 'log': None, + 'raw': None, + 'results': None, + 'run': None, + 'tmp': None}}, + 'lib': None, + 'raw': None, + 'results': None, + 'src': None} + def get_file_dir(): return os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) + def print_dirs(dir_structure, padding, padstep): if type(dir_structure) is dict: - for k,v in dir_structure.iteritems(): - print str(' ' * padding) + k - print_dirs(v, padding+padstep, padstep) + for k,v in six.iteritems(dir_structure): + six.print_(str(' ' * padding) + k) + six.print_(print_dirs(v, padding+padstep, padstep)) + def create_dirs(dirtree): if type(dirtree) is dict: - for dir,subtree in dirtree.iteritems(): + for dir,subtree in six.iteritems(dirtree): print('Creating ' + dir + ' ...') os.makedirs(dir) if subtree is not None: @@ -45,6 +49,7 @@ def create_dirs(dirtree): create_dirs(subtree) os.chdir('..') + def print_and_create_projdirs(): print('Now creating the following directory structure:') print('-'*80) @@ -55,6 +60,7 @@ def print_and_create_projdirs(): class InitProj(luigi.Task): + projname = luigi.Parameter() def output(self): @@ -63,6 +69,7 @@ def output(self): def run(self): shutil.copytree(get_file_dir() + '/../.projtpl', self.projname) + if __name__ == '__main__': luigi.run() #print get_file_dir()