diff --git a/setup.py b/setup.py index 08d540e..d6e0631 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ long_description = readmeFile.read() setup(name='wes-service', - version='2.9', + version='3.0', description='GA4GH Workflow Execution Service reference implementation', long_description=long_description, author='GA4GH Containers and Workflows task team', @@ -25,9 +25,9 @@ install_requires=[ 'future', 'connexion==1.4.2', - 'ruamel.yaml >= 0.12.4, < 0.15', + 'ruamel.yaml >= 0.12.4, <= 0.15.77', 'cwlref-runner==1.0', - 'schema-salad>=2.6, <3', + 'schema-salad >= 3.0, < 3.1', 'subprocess32==3.5.2' ], entry_points={ diff --git a/test/test_integration.py b/test/test_integration.py index da666cf..cfc5f86 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -23,7 +23,7 @@ class IntegrationTest(unittest.TestCase): def setUpClass(cls): # cwl cls.cwl_dockstore_url = 'https://dockstore.org:8443/api/ga4gh/v2/tools/quay.io%2Fbriandoconnor%2Fdockstore-tool-md5sum/versions/master/plain-CWL/descriptor/%2FDockstore.cwl' - cls.cwl_local_path = os.path.abspath('testdata/md5sum.cwl') + cls.cwl_local_path = "file://" + os.path.abspath('testdata/md5sum.cwl') cls.cwl_json_input = "file://" + os.path.abspath('testdata/md5sum.json') cls.cwl_attachments = ['file://' + os.path.abspath('testdata/md5sum.input'), 'file://' + os.path.abspath('testdata/dockstore-tool-md5sum.cwl')] @@ -52,15 +52,15 @@ def tearDown(self): time.sleep(3) except OSError as e: print(e) - if os.path.exists('workflows'): - shutil.rmtree('workflows') unittest.TestCase.tearDown(self) def test_dockstore_md5sum(self): """HTTP md5sum cwl (dockstore), run it on the wes-service server, and check for the correct output.""" - outfile_path, _ = self.run_md5sum(wf_input=self.cwl_dockstore_url, + outfile_path, run_id = self.run_md5sum(wf_input=self.cwl_dockstore_url, json_input=self.cwl_json_input, workflow_attachment=self.cwl_attachments) + state = self.wait_for_finish(run_id) + assert state == "COMPLETE" self.assertTrue(check_for_file(outfile_path), 'Output file was not found: ' + str(outfile_path)) def test_local_md5sum(self): @@ -68,6 +68,8 @@ def test_local_md5sum(self): outfile_path, run_id = self.run_md5sum(wf_input=self.cwl_local_path, json_input=self.cwl_json_input, workflow_attachment=self.cwl_attachments) + state = self.wait_for_finish(run_id) + assert state == "COMPLETE" self.assertTrue(check_for_file(outfile_path), 'Output file was not found: ' + str(outfile_path)) def test_run_attachments(self): @@ -76,6 +78,8 @@ def test_run_attachments(self): json_input=self.cwl_json_input, workflow_attachment=self.cwl_attachments) get_response = self.client.get_run_log(run_id)["request"] + state = self.wait_for_finish(run_id) + assert state == "COMPLETE" self.assertTrue(check_for_file(outfile_path), 'Output file was not found: ' + get_response["workflow_attachment"]) attachment_tool_path = get_response["workflow_attachment"][7:] + "/dockstore-tool-md5sum.cwl" self.assertTrue(check_for_file(attachment_tool_path), 'Attachment file was not found: ' + get_response["workflow_attachment"]) @@ -90,7 +94,7 @@ def test_get_service_info(self): assert 'workflow_type_versions' in r assert 'supported_wes_versions' in r assert 'supported_filesystem_protocols' in r - assert 'engine_versions' in r + assert 'workflow_engine_versions' in r def test_list_runs(self): """ @@ -121,6 +125,18 @@ def run_md5sum(self, wf_input, json_input, workflow_attachment=None): output_dir = os.path.abspath(os.path.join('workflows', response['run_id'], 'outdir')) return os.path.join(output_dir, 'md5sum.txt'), response['run_id'] + def wait_for_finish(self, run_id, seconds=120): + """Return True if a file exists within a certain amount of time.""" + wait_counter = 0 + r = self.client.get_run_status(run_id) + while r["state"] in ("QUEUED", "INITIALIZING", "RUNNING"): + time.sleep(1) + wait_counter += 1 + if wait_counter > seconds: + return None + r = self.client.get_run_status(run_id) + return r["state"] + def get_server_pids(): try: @@ -149,9 +165,13 @@ def setUp(self): Start a (local) wes-service server to make requests against. Use cwltool as the wes-service server 'backend'. """ + if os.path.exists('workflows'): + shutil.rmtree('workflows') self.wes_server_process = subprocess.Popen( - 'python {}'.format(os.path.abspath('wes_service/wes_service_main.py')), - shell=True) + ['python', os.path.abspath('wes_service/wes_service_main.py'), + '--backend=wes_service.cwl_runner', + '--port=8080', + '--debug']) time.sleep(5) diff --git a/testdata/md5sum.json b/testdata/md5sum.json index cbf99b2..547158f 100644 --- a/testdata/md5sum.json +++ b/testdata/md5sum.json @@ -1,2 +1 @@ -{"output_file": {"path": "/tmp/md5sum.txt", "class": "File"}, - "input_file": {"path": "md5sum.input", "class": "File"}} +{"input_file": {"path": "md5sum.input", "class": "File"}} diff --git a/wes_client/util.py b/wes_client/util.py index cc9c1e5..7eebc35 100644 --- a/wes_client/util.py +++ b/wes_client/util.py @@ -107,7 +107,9 @@ def build_wes_request(workflow_file, json_path, attachments=None): :return: A list of tuples formatted to be sent in a post to the wes-server (Swagger API). """ workflow_file = "file://" + workflow_file if ":" not in workflow_file else workflow_file + wfbase = None if json_path.startswith("file://"): + wfbase = os.path.dirname(json_path[7:]) json_path = json_path[7:] with open(json_path) as f: wf_params = json.dumps(json.load(f)) @@ -122,17 +124,21 @@ def build_wes_request(workflow_file, json_path, attachments=None): ("workflow_type_version", wf_version)] if workflow_file.startswith("file://"): + if wfbase is None: + wfbase = os.path.dirname(workflow_file[7:]) parts.append(("workflow_attachment", (os.path.basename(workflow_file[7:]), open(workflow_file[7:], "rb")))) parts.append(("workflow_url", os.path.basename(workflow_file[7:]))) else: parts.append(("workflow_url", workflow_file)) + if wfbase is None: + wfbase = os.getcwd() if attachments: for attachment in attachments: if attachment.startswith("file://"): attachment = attachment[7:] attach_f = open(attachment, "rb") - relpath = os.path.relpath(attachment, os.getcwd()) + relpath = os.path.relpath(attachment, wfbase) elif attachment.startswith("http"): attach_f = urlopen(attachment) relpath = os.path.basename(attach_f) diff --git a/wes_service/arvados_wes.py b/wes_service/arvados_wes.py index ab6348b..6121b71 100644 --- a/wes_service/arvados_wes.py +++ b/wes_service/arvados_wes.py @@ -75,7 +75,7 @@ def GetServiceInfo(self): "supported_wes_versions": ["0.3.0", "1.0.0"], "supported_filesystem_protocols": ["http", "https", "keep"], "workflow_engine_versions": { - "arvados-cwl-runner": stderr + "arvados-cwl-runner": str(stderr) }, "default_workflow_engine_parameters": [], "system_state_counts": {}, diff --git a/wes_service/cwl_runner.py b/wes_service/cwl_runner.py index b6c5681..483b994 100644 --- a/wes_service/cwl_runner.py +++ b/wes_service/cwl_runner.py @@ -162,18 +162,19 @@ class CWLRunnerBackend(WESBackend): def GetServiceInfo(self): runner = self.getopt("runner", default="cwl-runner") stdout, stderr = subprocess.Popen([runner, "--version"], stderr=subprocess.PIPE).communicate() - return { + r = { "workflow_type_versions": { "CWL": {"workflow_type_version": ["v1.0"]} }, "supported_wes_versions": ["0.3.0", "1.0.0"], "supported_filesystem_protocols": ["file", "http", "https"], "workflow_engine_versions": { - "cwl-runner": stderr + "cwl-runner": str(stderr) }, "system_state_counts": {}, "tags": {} } + return r def ListRuns(self, page_size=None, page_token=None, state_search=None): # FIXME #15 results don't page diff --git a/wes_service/toil_wes.py b/wes_service/toil_wes.py index b33d67d..6f244a2 100644 --- a/wes_service/toil_wes.py +++ b/wes_service/toil_wes.py @@ -291,7 +291,7 @@ def GetServiceInfo(self): }, 'supported_wes_versions': ['0.3.0', '1.0.0'], 'supported_filesystem_protocols': ['file', 'http', 'https'], - 'engine_versions': ['3.16.0'], + 'workflow_engine_versions': ['3.16.0'], 'system_state_counts': {}, 'key_values': {} } diff --git a/wes_service/util.py b/wes_service/util.py index f8fe86a..dd23687 100644 --- a/wes_service/util.py +++ b/wes_service/util.py @@ -65,7 +65,7 @@ def collect_attachments(self, run_id=None): body[k] = "file://%s" % tempdir # Reference to temp working dir. elif k in ("workflow_params", "tags", "workflow_engine_parameters"): content = v.read() - body[k] = json.loads(content) + body[k] = json.loads(content.decode("utf-8")) else: body[k] = v.read().decode()