Skip to content

Commit

Permalink
types and cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
mr-c committed Aug 25, 2021
1 parent c932df0 commit e48e39a
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 72 deletions.
10 changes: 8 additions & 2 deletions src/toil/batchSystems/singleMachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,14 @@ def _runDebugJob(self, jobCommand, jobID, environment):
# We can actually run in this thread
jobName, jobStoreLocator, jobStoreID = jobCommand.split()[1:4] # Parse command
jobStore = Toil.resumeJobStore(jobStoreLocator)
toil_worker.workerScript(jobStore, jobStore.config, jobName, jobStoreID, None,
redirectOutputToLogFile=not self.debugWorker) # Call the worker
toil_worker.workerScript(
jobStore,
jobStore.config,
jobName,
jobStoreID,
None,
redirectOutputToLogFile=not self.debugWorker,
) # Call the worker
else:
# Run synchronously. If starting or running the command fails, let the exception stop us.
subprocess.check_call(jobCommand,
Expand Down
25 changes: 15 additions & 10 deletions src/toil/leader.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,8 +664,8 @@ def innerLoop(self):

# Consistency check the toil state
assert self.toilState.updatedJobs == {}
#assert self.toilState.successorCounts == {}
#assert self.toilState.successorJobStoreIDToPredecessorJobs == {}
# assert self.toilState.successorCounts == {}
# assert self.toilState.successorJobStoreIDToPredecessorJobs == {}
assert self.toilState.serviceJobStoreIDToPredecessorJob == {}
assert self.toilState.servicesIssued == {}
# assert self.toilState.jobsToBeScheduledWithMultiplePredecessors # These are not properly emptied yet
Expand Down Expand Up @@ -749,17 +749,22 @@ def issueJob(self, jobNode):
for context in self.batchSystem.getWorkerContexts():
# For each context manager hook the batch system wants to run in
# the worker, serialize and send it.
workerCommand.append('--context')
workerCommand.append(base64.b64encode(pickle.dumps(context)).decode('utf-8'))
workerCommand.append("--context")
workerCommand.append(
base64.b64encode(pickle.dumps(context)).decode("utf-8")
)

# add the toilState as a pickle
workerCommand.append('--toilState')
workerCommand.append(base64.b64encode(pickle.dumps(self.toilState)).decode('utf-8'))

jobNode.command = ' '.join(workerCommand)
workerCommand.append("--toilState")
workerCommand.append(
base64.b64encode(pickle.dumps(self.toilState)).decode("utf-8")
)

omp_threads = os.environ.get('OMP_NUM_THREADS') \
or str(max(1, int(jobNode.cores))) # make sure OMP_NUM_THREADS is a positive integer
jobNode.command = " ".join(workerCommand)

omp_threads = os.environ.get("OMP_NUM_THREADS") or str(
max(1, int(jobNode.cores))
) # make sure OMP_NUM_THREADS is a positive integer

job_environment = {
# Set the number of cores used by OpenMP applications
Expand Down
49 changes: 33 additions & 16 deletions src/toil/test/cwl/cwlTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -814,11 +814,11 @@ def test_download_structure(self) -> None:
class CWLToilOptimizeTests(ToilTest):
def setUp(self):
"""Runs anew before each test to create farm fresh temp dirs."""
self.outDir = f'/tmp/toil-cwl-test-{str(uuid.uuid4())}'
self.outDir = f"/tmp/toil-cwl-test-{str(uuid.uuid4())}"
os.makedirs(self.outDir)
self.rootDir = self._projectRootPath()
self.jobDir = os.path.join(self.outDir, 'jobStore')
self.statDir = os.path.join(self.jobDir, 'stats')
self.jobDir = os.path.join(self.outDir, "jobStore")
self.statDir = os.path.join(self.jobDir, "stats")

def tearDown(self):
"""Clean up outputs."""
Expand All @@ -828,36 +828,53 @@ def tearDown(self):

def _tester(self, cwlfile, jobfile, expect, main_args=[]):
from toil.cwl import cwltoil

st = StringIO()
main_args = main_args[:]
main_args.extend(['--logDebug','--stats','--outdir', self.outDir, '--jobStore', self.jobDir,
os.path.join(self.rootDir, cwlfile), os.path.join(self.rootDir, jobfile)])
main_args.extend(
[
"--logDebug",
"--stats",
"--outdir",
self.outDir,
"--jobStore",
self.jobDir,
os.path.join(self.rootDir, cwlfile),
os.path.join(self.rootDir, jobfile),
]
)
cwltoil.main(main_args, stdout=st)
out = self._extract_job_lists()
self.assertEqual(out, expect)

def _match_extract_string(self, stringin):
import re
search_pattern = re.compile('^.* (\w*) kind-CWLJob/instance-.*$')

search_pattern = re.compile("^.* (\w*) kind-CWLJob/instance-.*$")
if search_pattern.match(stringin):
return(search_pattern.sub(r'\1',stringin))
return search_pattern.sub(r"\1", stringin)
else:
return(None)
return None

def _extract_job_lists(self):
worker_list = []
for filename in os.listdir(self.statDir):
with open(os.path.join(self.statDir,filename)) as f:
with open(os.path.join(self.statDir, filename)) as f:
test_json = json.load(f)
if 'workers' in test_json.keys() and len(test_json['jobs']) > 0:
job_list = [self._match_extract_string(x) for x in test_json['logs']['names']]
if "workers" in test_json.keys() and len(test_json["jobs"]) > 0:
job_list = [
self._match_extract_string(x)
for x in test_json["logs"]["names"]
]
if not all(x == None for x in job_list):
worker_list.append(job_list)
worker_list.sort()
return(worker_list)
return worker_list

def test_biobb_fail(self):
self._tester('src/toil/test/cwl/md_list_reduced.cwl',
'src/toil/test/cwl/md_list_reduced.json',
[['genion', 'grompp', 'pdb2gmx', 'editconf', 'solvate']],
main_args=[])
self._tester(
"src/toil/test/cwl/md_list_reduced.cwl",
"src/toil/test/cwl/md_list_reduced.json",
[["genion", "grompp", "pdb2gmx", "editconf", "solvate"]],
main_args=[],
)
4 changes: 3 additions & 1 deletion src/toil/utils/toilDebugJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,7 @@ def main() -> None:

jobID = options.jobID[0]
logger.debug(f"Running the following job locally: {jobID}")
workerScript(jobStore, config, jobID, jobID, redirectOutputToLogFile=False)
workerScript(
jobStore, config, jobID, jobID, options.toilState, redirectOutputToLogFile=False
)
logger.debug(f"Finished running: {jobID}")
Loading

0 comments on commit e48e39a

Please sign in to comment.