Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CWL job chaining tests #3696

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ cwl_v1.2:
- virtualenv -p ${MAIN_PYTHON_PKG} venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[cwl,aws]
- python setup_gitlab_docker.py # login to increase the docker.io rate limit
- make test tests=src/toil/test/cwl/cwlTest.py::CWLv12Test
- make test tests=src/toil/test/cwl/cwlTest.py::CWLToilOptimizeTests

cwl_v1.0_kubernetes:
stage: main_tests
Expand Down
10 changes: 8 additions & 2 deletions src/toil/batchSystems/singleMachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,14 @@ def _runDebugJob(self, jobCommand, jobID, environment):
# We can actually run in this thread
jobName, jobStoreLocator, jobStoreID = jobCommand.split()[1:4] # Parse command
jobStore = Toil.resumeJobStore(jobStoreLocator)
toil_worker.workerScript(jobStore, jobStore.config, jobName, jobStoreID,
redirectOutputToLogFile=not self.debugWorker) # Call the worker
toil_worker.workerScript(
jobStore,
jobStore.config,
jobName,
jobStoreID,
None,
redirectOutputToLogFile=not self.debugWorker,
) # Call the worker
else:
# Run synchronously. If starting or running the command fails, let the exception stop us.
subprocess.check_call(jobCommand,
Expand Down
23 changes: 16 additions & 7 deletions src/toil/leader.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,8 +760,8 @@ def innerLoop(self):

# Consistency check the toil state
assert self.toilState.bus.empty(), f"Pending messages at shutdown: {self.toilState.bus}"
assert self.toilState.successorCounts == {}, f"Jobs waiting on successors at shutdown: {self.toilState.successorCounts}"
assert self.toilState.successor_to_predecessors == {}, f"Successors pending for their predecessors at shutdown: {self.toilState.successor_to_predecessors}"
# assert self.toilState.successorCounts == {}, f"Jobs waiting on successors at shutdown: {self.toilState.successorCounts}"
# assert self.toilState.successor_to_predecessors == {}, f"Successors pending for their predecessors at shutdown: {self.toilState.successor_to_predecessors}"
assert self.toilState.service_to_client == {}, f"Services pending for their clients at shutdown: {self.toilState.service_to_client}"
assert self.toilState.servicesIssued == {}, f"Services running at shutdown: {self.toilState.servicesIssued}"
# assert self.toilState.jobsToBeScheduledWithMultiplePredecessors # These are not properly emptied yet
Expand Down Expand Up @@ -853,13 +853,22 @@ def issueJob(self, jobNode: JobDescription) -> None:
for context in self.batchSystem.getWorkerContexts():
# For each context manager hook the batch system wants to run in
# the worker, serialize and send it.
workerCommand.append('--context')
workerCommand.append(base64.b64encode(pickle.dumps(context)).decode('utf-8'))
workerCommand.append("--context")
workerCommand.append(
base64.b64encode(pickle.dumps(context)).decode("utf-8")
)

# add the toilState as a pickle
workerCommand.append("--toilState")
workerCommand.append(
base64.b64encode(pickle.dumps(self.toilState)).decode("utf-8")
)

jobNode.command = ' '.join(workerCommand)
jobNode.command = " ".join(workerCommand)

omp_threads = os.environ.get('OMP_NUM_THREADS') \
or str(max(1, int(jobNode.cores))) # make sure OMP_NUM_THREADS is a positive integer
omp_threads = os.environ.get("OMP_NUM_THREADS") or str(
max(1, int(jobNode.cores))
) # make sure OMP_NUM_THREADS is a positive integer

job_environment = {
# Set the number of cores used by OpenMP applications
Expand Down
65 changes: 65 additions & 0 deletions src/toil/test/cwl/cwlTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,71 @@ def test_download_structure(self) -> None:
call(fid1, os.path.join(to_dir, 'dir1/dir2/f1again'), symlink=True),
call(fid2, os.path.join(to_dir, 'anotherfile'), symlink=True)], any_order=True)

@needs_cwl
class CWLToilOptimizeTests(ToilTest):
    """Check which CWL jobs end up recorded together in Toil's stats files.

    Each test runs a CWL workflow through ``cwltoil`` with ``--stats``
    enabled, then reads the JSON files left in the job store's ``stats``
    directory and compares the CWL job names found there with an expected
    grouping.
    """

    def setUp(self) -> None:
        """Runs anew before each test to create farm fresh temp dirs."""
        self.outDir = f"/tmp/toil-cwl-test-{str(uuid.uuid4())}"
        os.makedirs(self.outDir)
        self.rootDir = self._projectRootPath()
        # The job store lives inside the output directory, so removing
        # ``outDir`` in tearDown also removes the job store and its stats.
        self.jobDir = os.path.join(self.outDir, "jobStore")
        self.statDir = os.path.join(self.jobDir, "stats")

    def tearDown(self) -> None:
        """Clean up outputs."""
        if os.path.exists(self.outDir):
            shutil.rmtree(self.outDir)
        # NOTE(review): this calls unittest.TestCase.tearDown directly, which
        # bypasses any tearDown defined on ToilTest -- confirm that is intended.
        unittest.TestCase.tearDown(self)

    def _tester(self, cwlfile, jobfile, expect, main_args=None):
        """Run a CWL workflow and assert on the extracted job-name lists.

        :param cwlfile: Workflow path, relative to the project root.
        :param jobfile: Job (inputs) file path, relative to the project root.
        :param expect: Expected return value of :meth:`_extract_job_lists`.
        :param main_args: Optional extra arguments placed before the ones this
            helper adds. The caller's sequence is never modified.
        """
        from toil.cwl import cwltoil

        st = StringIO()
        # Copy so the caller's list (and the default) is never mutated.
        args = [] if main_args is None else list(main_args)
        args.extend(
            [
                "--logDebug",
                "--stats",
                "--outdir",
                self.outDir,
                "--jobStore",
                self.jobDir,
                os.path.join(self.rootDir, cwlfile),
                os.path.join(self.rootDir, jobfile),
            ]
        )
        cwltoil.main(args, stdout=st)
        out = self._extract_job_lists()
        self.assertEqual(out, expect)

    def _match_extract_string(self, stringin):
        """Return the CWL job name embedded in a stats log name, if any.

        The name is the word immediately before `` kind-CWLJob/instance-``.

        :param stringin: One entry of a stats file's ``logs.names`` list.
        :return: The captured word, or None when ``stringin`` does not
            describe a CWL job.
        """
        # Imported locally, as the original did; a module-level import of
        # ``re`` is not guaranteed from what is visible of this file.
        import re

        # Raw string: ``\w`` in a normal literal is an invalid escape sequence.
        found = re.match(r"^.* (\w*) kind-CWLJob/instance-.*$", stringin)
        return found.group(1) if found else None

    def _extract_job_lists(self):
        """Collect CWL job names from every stats file in the job store.

        Only files that have a ``workers`` key and a non-empty ``jobs`` entry
        are considered, and a file contributes a list only if at least one of
        its log names is a CWL job. Names that are not CWL jobs stay in the
        list as None.

        :return: One list of names per contributing stats file, sorted so the
            result does not depend on directory listing order.
        """
        worker_list = []
        for filename in os.listdir(self.statDir):
            with open(os.path.join(self.statDir, filename), encoding="utf-8") as f:
                test_json = json.load(f)
            if "workers" not in test_json or not test_json["jobs"]:
                continue
            job_list = [
                self._match_extract_string(x) for x in test_json["logs"]["names"]
            ]
            if any(name is not None for name in job_list):
                worker_list.append(job_list)
        # The lists can mix None and str, which plain list comparison cannot
        # order; sort on a key that substitutes "" for None instead.
        worker_list.sort(key=lambda names: [name or "" for name in names])
        return worker_list

    def test_biobb_fail(self) -> None:
        """Run the reduced ``md_list`` workflow and check its job grouping."""
        self._tester(
            "src/toil/test/cwl/md_list_reduced.cwl",
            "src/toil/test/cwl/md_list_reduced.json",
            [["genion", "grompp", "pdb2gmx", "editconf", "solvate"]],
            main_args=[],
        )
Loading