Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored docstrings to obey to PEP. Patch for #56 #57

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 19 additions & 14 deletions sciluigi/audit.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'''
"""
This module contains functionality for the audit-trail logging functionality
'''
"""

import logging
import luigi
Expand All @@ -15,20 +15,22 @@

# ==============================================================================


class AuditTrailHelpers(object):
'''
"""
Mixin for luigi.Task:s, with functionality for writing audit logs of running tasks
'''
"""
def add_auditinfo(self, infotype, infoval):
'''
"""
Alias to _add_auditinfo(), that can be overridden.
'''
"""
return self._add_auditinfo(self.instance_name, infotype, infoval)


def _add_auditinfo(self, instance_name, infotype, infoval):
'''
"""
Save audit information in a designated file, specific for this task.
'''
"""
dirpath = self.workflow_task.get_auditdirpath()
if not os.path.isdir(dirpath):
time.sleep(random.random())
Expand All @@ -42,32 +44,35 @@ def _add_auditinfo(self, instance_name, infotype, infoval):
with open(auditfile, 'a') as afile:
afile.write('%s: %s\n' % (infotype, infoval))


def get_instance_name(self):
'''
"""
Return the luigi instance_name
'''
"""
instance_name = None
if self.instance_name is not None:
instance_name = self.instance_name
else:
instance_name = self.task_id
return instance_name


@luigi.Task.event_handler(luigi.Event.START)
def save_start_time(self):
'''
"""
Log start of execution of task.
'''
"""
if hasattr(self, 'workflow_task') and self.workflow_task is not None:
msg = 'Task {task} started'.format(
task=self.get_instance_name())
log.info(msg)


@luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
def save_end_time(self, task_exectime_sec):
'''
"""
Log end of execution of task, with execution time.
'''
"""
if hasattr(self, 'workflow_task') and self.workflow_task is not None:
msg = 'Task {task} finished after {proctime:.3f}s'.format(
task=self.get_instance_name(),
Expand Down
46 changes: 26 additions & 20 deletions sciluigi/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'''
"""
This module contains functionality for dependency resolution for constructing
the dependency graph of workflows.
'''
"""

import luigi
from luigi.contrib.postgres import PostgresTarget
Expand All @@ -10,11 +10,12 @@

# ==============================================================================


class TargetInfo(object):
'''
"""
Class to be used for sending specification of which target, from which
task, to use, when stitching workflow tasks' outputs and inputs together.
'''
"""
task = None
path = None
target = None
Expand All @@ -25,21 +26,24 @@ def __init__(self, task, path, format=None, is_tmp=False):
self.target = luigi.LocalTarget(path, format, is_tmp)

def open(self, *args, **kwargs):
'''
"""
Forward open method, from luigi's target class
'''
"""
return self.target.open(*args, **kwargs)

# ==============================================================================


class S3TargetInfo(TargetInfo):

def __init__(self, task, path, format=None, client=None):
self.task = task
self.path = path
self.target = S3Target(path, format=format, client=client)

# ==============================================================================


class PostgresTargetInfo(TargetInfo):
def __init__(self, task, host, database, user, password, update_id, table=None, port=None):
self.task = task
Expand All @@ -54,28 +58,30 @@ def __init__(self, task, host, database, user, password, update_id, table=None,

# ==============================================================================


class DependencyHelpers(object):
'''

"""
Mixin implementing methods for supporting dynamic, and target-based
workflow definition, as opposed to the task-based one in vanilla luigi.
'''
"""

# --------------------------------------------------------
# Handle inputs
# --------------------------------------------------------

def requires(self):
'''
"""
Implement luigi API method by returning upstream tasks
'''
"""
return self._upstream_tasks()

def _upstream_tasks(self):
'''
"""
Extract upstream tasks from the TargetInfo objects
or functions returning those (or lists of both the earlier)
for use in luigi's requires() method.
'''
"""
upstream_tasks = []
for attrname, attrval in iteritems(self.__dict__):
if 'in_' == attrname[0:3]:
Expand All @@ -84,11 +90,11 @@ def _upstream_tasks(self):
return upstream_tasks

def _parse_inputitem(self, val, tasks):
'''
"""
Recursively loop through lists of TargetInfos, or
callables returning TargetInfos, or lists of ...
(repeat recursively) ... and return all tasks.
'''
"""
if callable(val):
val = val()
if isinstance(val, TargetInfo):
Expand All @@ -108,17 +114,17 @@ def _parse_inputitem(self, val, tasks):
# --------------------------------------------------------

def output(self):
'''
"""
Implement luigi API method
'''
"""
return self._output_targets()

def _output_targets(self):
'''
"""
Extract output targets from the TargetInfo objects
or functions returning those (or lists of both the earlier)
for use in luigi's output() method.
'''
"""
output_targets = []
for attrname in dir(self):
attrval = getattr(self, attrname)
Expand All @@ -128,11 +134,11 @@ def _output_targets(self):
return output_targets

def _parse_outputitem(self, val, targets):
'''
"""
Recursively loop through lists of TargetInfos, or
callables returning TargetInfos, or lists of ...
(repeat recursively) ... and return all targets.
'''
"""
if callable(val):
val = val()
if isinstance(val, TargetInfo):
Expand Down
20 changes: 12 additions & 8 deletions sciluigi/interface.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'''
"""
This module contains mappings of methods that are part of the sciluigi API
'''
"""

import luigi
import logging
Expand All @@ -11,10 +11,11 @@
LOGFMT_SCILUIGI = '%(asctime)s %(levelname)8s SCILUIGI %(message)s'
DATEFMT = '%Y-%m-%d %H:%M:%S'


def setup_logging():
'''
"""
Set up SciLuigi specific logging
'''
"""
sciluigi.util.ensuredir('log')
log_path = 'log/sciluigi_run_%s_detailed.log' % sciluigi.util.timepath()

Expand Down Expand Up @@ -49,16 +50,19 @@ def setup_logging():
sciluigi_logger.addHandler(sciluigi_file_handler)
sciluigi_logger.setLevel(logging.DEBUG)


setup_logging()


def run(*args, **kwargs):
'''
"""
Forwarding luigi's run method
'''
"""
luigi.run(*args, **kwargs)


def run_local(*args, **kwargs):
'''
"""
Forwarding luigi's run method, with local scheduler
'''
"""
run(local_scheduler=True, *args, **kwargs)
9 changes: 5 additions & 4 deletions sciluigi/parameter.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
'''
"""
This module contains a sciluigi subclass of luigi's Parameter, where
custom functionality might be added in the future.
'''
"""

import luigi


class Parameter(luigi.Parameter):
'''
"""
Subclass of luigi's Parameter, where custom functionality might be added in the future.
'''
"""
pass
Loading