Skip to content

Commit

Permalink
Refactored docstrings to obey to PEP. Patch for pharmbio#56
Browse files Browse the repository at this point in the history
  • Loading branch information
Luca Venturini authored and Luca Venturini committed May 22, 2018
1 parent a98161d commit ec77d2f
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 168 deletions.
33 changes: 19 additions & 14 deletions sciluigi/audit.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'''
"""
This module contains functionality for the audit-trail logging functionality
'''
"""

import logging
import luigi
Expand All @@ -15,20 +15,22 @@

# ==============================================================================


class AuditTrailHelpers(object):
'''
"""
Mixin for luigi.Task:s, with functionality for writing audit logs of running tasks
'''
"""
def add_auditinfo(self, infotype, infoval):
'''
"""
Alias to _add_auditinfo(), that can be overridden.
'''
"""
return self._add_auditinfo(self.instance_name, infotype, infoval)


def _add_auditinfo(self, instance_name, infotype, infoval):
'''
"""
Save audit information in a designated file, specific for this task.
'''
"""
dirpath = self.workflow_task.get_auditdirpath()
if not os.path.isdir(dirpath):
time.sleep(random.random())
Expand All @@ -42,32 +44,35 @@ def _add_auditinfo(self, instance_name, infotype, infoval):
with open(auditfile, 'a') as afile:
afile.write('%s: %s\n' % (infotype, infoval))


def get_instance_name(self):
'''
"""
Return the luigi instance_name
'''
"""
instance_name = None
if self.instance_name is not None:
instance_name = self.instance_name
else:
instance_name = self.task_id
return instance_name


@luigi.Task.event_handler(luigi.Event.START)
def save_start_time(self):
'''
"""
Log start of execution of task.
'''
"""
if hasattr(self, 'workflow_task') and self.workflow_task is not None:
msg = 'Task {task} started'.format(
task=self.get_instance_name())
log.info(msg)


@luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
def save_end_time(self, task_exectime_sec):
'''
"""
Log end of execution of task, with execution time.
'''
"""
if hasattr(self, 'workflow_task') and self.workflow_task is not None:
msg = 'Task {task} finished after {proctime:.3f}s'.format(
task=self.get_instance_name(),
Expand Down
46 changes: 26 additions & 20 deletions sciluigi/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'''
"""
This module contains functionality for dependency resolution for constructing
the dependency graph of workflows.
'''
"""

import luigi
from luigi.contrib.postgres import PostgresTarget
Expand All @@ -10,11 +10,12 @@

# ==============================================================================


class TargetInfo(object):
'''
"""
Class to be used for sending specification of which target, from which
task, to use, when stitching workflow tasks' outputs and inputs together.
'''
"""
task = None
path = None
target = None
Expand All @@ -25,21 +26,24 @@ def __init__(self, task, path, format=None, is_tmp=False):
self.target = luigi.LocalTarget(path, format, is_tmp)

def open(self, *args, **kwargs):
'''
"""
Forward open method, from luigi's target class
'''
"""
return self.target.open(*args, **kwargs)

# ==============================================================================


class S3TargetInfo(TargetInfo):

def __init__(self, task, path, format=None, client=None):
self.task = task
self.path = path
self.target = S3Target(path, format=format, client=client)

# ==============================================================================


class PostgresTargetInfo(TargetInfo):
def __init__(self, task, host, database, user, password, update_id, table=None, port=None):
self.task = task
Expand All @@ -54,28 +58,30 @@ def __init__(self, task, host, database, user, password, update_id, table=None,

# ==============================================================================


class DependencyHelpers(object):
'''

"""
Mixin implementing methods for supporting dynamic, and target-based
workflow definition, as opposed to the task-based one in vanilla luigi.
'''
"""

# --------------------------------------------------------
# Handle inputs
# --------------------------------------------------------

def requires(self):
'''
"""
Implement luigi API method by returning upstream tasks
'''
"""
return self._upstream_tasks()

def _upstream_tasks(self):
'''
"""
Extract upstream tasks from the TargetInfo objects
or functions returning those (or lists of both the earlier)
for use in luigi's requires() method.
'''
"""
upstream_tasks = []
for attrname, attrval in iteritems(self.__dict__):
if 'in_' == attrname[0:3]:
Expand All @@ -84,11 +90,11 @@ def _upstream_tasks(self):
return upstream_tasks

def _parse_inputitem(self, val, tasks):
'''
"""
Recursively loop through lists of TargetInfos, or
callables returning TargetInfos, or lists of ...
(repeat recursively) ... and return all tasks.
'''
"""
if callable(val):
val = val()
if isinstance(val, TargetInfo):
Expand All @@ -108,17 +114,17 @@ def _parse_inputitem(self, val, tasks):
# --------------------------------------------------------

def output(self):
'''
"""
Implement luigi API method
'''
"""
return self._output_targets()

def _output_targets(self):
'''
"""
Extract output targets from the TargetInfo objects
or functions returning those (or lists of both the earlier)
for use in luigi's output() method.
'''
"""
output_targets = []
for attrname in dir(self):
attrval = getattr(self, attrname)
Expand All @@ -128,11 +134,11 @@ def _output_targets(self):
return output_targets

def _parse_outputitem(self, val, targets):
'''
"""
Recursively loop through lists of TargetInfos, or
callables returning TargetInfos, or lists of ...
(repeat recursively) ... and return all targets.
'''
"""
if callable(val):
val = val()
if isinstance(val, TargetInfo):
Expand Down
20 changes: 12 additions & 8 deletions sciluigi/interface.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'''
"""
This module contains mappings of methods that are part of the sciluigi API
'''
"""

import luigi
import logging
Expand All @@ -11,10 +11,11 @@
LOGFMT_SCILUIGI = '%(asctime)s %(levelname)8s SCILUIGI %(message)s'
DATEFMT = '%Y-%m-%d %H:%M:%S'


def setup_logging():
'''
"""
Set up SciLuigi specific logging
'''
"""
sciluigi.util.ensuredir('log')
log_path = 'log/sciluigi_run_%s_detailed.log' % sciluigi.util.timepath()

Expand Down Expand Up @@ -49,16 +50,19 @@ def setup_logging():
sciluigi_logger.addHandler(sciluigi_file_handler)
sciluigi_logger.setLevel(logging.DEBUG)


setup_logging()


def run(*args, **kwargs):
'''
"""
Forwarding luigi's run method
'''
"""
luigi.run(*args, **kwargs)


def run_local(*args, **kwargs):
'''
"""
Forwarding luigi's run method, with local scheduler
'''
"""
run(local_scheduler=True, *args, **kwargs)
9 changes: 5 additions & 4 deletions sciluigi/parameter.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
'''
"""
This module contains a sciluigi subclass of luigi's Parameter, where
custom functionality might be added in the future.
'''
"""

import luigi


class Parameter(luigi.Parameter):
'''
"""
Subclass of luigi's Parameter, where custom functionality might be added in the future.
'''
"""
pass
Loading

0 comments on commit ec77d2f

Please sign in to comment.