diff --git a/fasp/runner/fasp_runner.py b/fasp/runner/fasp_runner.py index 57f6a0e..0ca2e0d 100644 --- a/fasp/runner/fasp_runner.py +++ b/fasp/runner/fasp_runner.py @@ -56,9 +56,13 @@ def runQuery(self, query, note): query_job = self.searchClient.runQuery(query) # Send the query creditor.creditClass(self.searchClient) # repeat steps 2 and 3 for each row of the query + runIds = [] for row in query_job: - print("subject={}, drsID={}".format(row[0], row[1])) + # To do - get the subject/sample name from the query + runKey = 'subject' + print("{}={}, drsID={}".format(runKey, row[0], row[1])) + # Step 2 - Use DRS to get the URL objInfo = self.drsClient.getObject(row[1]) @@ -72,6 +76,7 @@ def runQuery(self, query, note): if self.live: pipeline_id = self.workClient.runWorkflow(url, outfile) print('workflow submitted, run:{}'.format(pipeline_id)) + runIds.append({runKey:row[0], 'run_id':pipeline_id}) creditor.creditClass(self.workClient) via = 'sh' #pipeline_id = 'paste here' @@ -80,4 +85,5 @@ def runQuery(self, query, note): if self.live: self.pipelineLogger.logRun(time, via, note, pipeline_id, outfile, str(fileSize), self.searchClient, self.drsClient, self.workClient) - + + return runIds \ No newline at end of file diff --git a/fasp/workflow/dnastack_wesclient.py b/fasp/workflow/dnastack_wesclient.py index 686152f..5ef42e7 100644 --- a/fasp/workflow/dnastack_wesclient.py +++ b/fasp/workflow/dnastack_wesclient.py @@ -1,17 +1,17 @@ -''' -Client for DNAStack WES Service -''' - -import requests -import os import json +import os import tempfile + import pandas as pd -import sys +import requests from fasp.workflow import WESClient + class DNAStackWESClient(WESClient): + """ + Client for DNAStack WES Service + """ def __init__(self, client_credentials_path, debug=False): super(DNAStackWESClient, self).__init__('https://workspaces-wes.prod.dnastack.com/ga4gh/wes/v1/runs') @@ -19,13 +19,13 @@ def __init__(self, client_credentials_path, debug=False): full_credentials_path = os.path.expanduser(client_credentials_path) with open(full_credentials_path) as f: self.credentials = json.load(f) - self.__updateAccessToken__() - self.headers = {'Authorization': 'Bearer {}'.format(self.accessToken)} + self.__updateAccessToken() + self.headers['Authorization'] = 'Bearer {}'.format(self.accessToken) self.debug = debug self.modulePath = os.path.dirname(os.path.abspath(__file__)) self.wdlPath = self.modulePath + '/wes/gwas' - def __updateAccessToken__(self): + def __updateAccessToken(self): if 'id' not in self.credentials or 'secret' not in self.credentials: raise RuntimeError('Credentials file must have "id" and "secret" values') payload = [ @@ -48,50 +48,23 @@ def __updateAccessToken__(self): def runWorkflow(self, fileurl, outfile): # use a temporary file to write out the input file - inputJson = {"md5Sum.inputFile":fileurl} - with tempfile.TemporaryFile() as fp: - fp.write(json.dumps(inputJson).encode('utf-8')) - fp.seek(0) - payload = {'workflow_url': 'checksum.wdl'} - files = { - 'workflow_params': ('inputs.json', fp, 'application/json'), - 'workflow_attachment': ('checksum.wdl', open(self.modulePath+'/wes/checksum.wdl', 'rb'), 'text/plain') - } + inputs = {"md5Sum.inputFile": fileurl} + + return self.runGenericWorkflow( + workflow_url='checksum.wdl', + workflow_params=json.dumps(inputs), + workflow_attachment=('checksum.wdl', open(self.modulePath+'/wes/checksum.wdl', 'rb'), 'text/plain') + ) - - response = requests.request("POST", self.api_url_base, headers=self.headers, data = payload, files = files) - if self.debug: - print(response) - if response.status_code == 200: - return response.json()['run_id'] - elif response.status_code == 401: - print("WES server authentication failed") - sys.exit(1) - else: - print("WES run submission failed. Response status:{}".format(response.status_code)) - sys.exit(1) - - - def runGWASWorkflowTest(self): ''' run the GWAS workflow by submitting local files ''' - payload = {'workflow_url': 'gwas.wdl'} - files = { - 'workflow_params': ('inputs.gwas.json', open(self.wdlPath+'/inputs.gwas.json', 'rb'), 'application/json'), - 'workflow_attachment': ('gwas.wdl', open(self.wdlPath+'/gwas.wdl', 'rb'), 'text/plain') - } - response = requests.request("POST", self.api_url_base, headers=self.headers, data = payload, files = files) - if response.status_code == 200: - return response.json()['run_id'] - elif response.status_code == 401: - print("WES server authentication failed") - sys.exit(1) - else: - print("WES run submission failed. Response status:{}".format(response.status_code)) - sys.exit(1) + return self.runGenericWorkflow( + workflow_url='gwas.wdl', + workflow_params=open(self.wdlPath+'/inputs.gwas.json', 'rb'), + workflow_attachment=('gwas.wdl', open(self.wdlPath+'/gwas.wdl', 'rb'), 'text/plain') + ) - return response.json()['run_id'] def runGWASWorkflow(self, vcfFileurl, csvfileurl): ''' run the GWAS workflow by using files accessed by DRS''' @@ -100,27 +73,18 @@ def runGWASWorkflow(self, vcfFileurl, csvfileurl): with tempfile.TemporaryFile() as fp: fp.write(json.dumps(inputJson).encode('utf-8')) fp.seek(0) - payload = {'workflow_url': 'gwas.wdl'} - files = { + payload = { + 'workflow_url': 'gwas.wdl', 'workflow_params': ('inputs.gwas.json', fp, 'application/json'), 'workflow_attachment': ('gwas.wdl', open(self.wdlPath+'/gwas.wdl', 'rb'), 'text/plain') } - - response = requests.request("POST", self.api_url_base, headers=self.headers, data = payload, files = files) - if self.debug: - print(response) - if response.status_code == 200: - return response.json()['run_id'] - elif response.status_code == 401: - print("WES server authentication failed") - sys.exit(1) - else: - print("WES run submission failed. Response status:{}".format(response.status_code)) - sys.exit(1) + return self.runGenericWorkflow( + workflow_url='gwas.wdl', + workflow_params=fp, + workflow_attachment=('gwas.wdl', open(self.wdlPath+'/gwas.wdl', 'rb'), 'text/plain') + ) - return response.json()['run_id'] - def addRun(self, run_id, runsdf): runURL = "{}/{}".format(self.api_url_base, run_id) @@ -167,8 +131,7 @@ def getRuns(self): if __name__ == "__main__": myClient = DNAStackWESClient('~/.keys/dnastack_wes_credentials.json') - res = myClient.runGWASWorkflowTest() - #res = myClient.runWorkflow('gs://dnastack-public-bucket/thousand_genomes_meta.csv', '') - + # res = myClient.runGWASWorkflowTest() + res = myClient.runWorkflow('gs://dnastack-public-bucket/thousand_genomes_meta.csv', '') print(res) diff --git a/fasp/workflow/elixir_wesclient.py b/fasp/workflow/elixir_wesclient.py new file mode 100644 index 0000000..bce8840 --- /dev/null +++ b/fasp/workflow/elixir_wesclient.py @@ -0,0 +1,47 @@ +import json +import os + +from fasp.workflow import WESClient + + +class ElixirWESClient(WESClient): + """ + Client for ELIXIR WES Service + """ + + def __init__(self, client_credentials_path, debug=False): + super(ElixirWESClient, self).__init__('https://wes-eu.egci-endpoints.imsi.athenarc.gr/ga4gh/wes/v1/runs') + full_credentials_path = os.path.expanduser(client_credentials_path) + with open(full_credentials_path) as f: + self.credentials = json.load(f) + if 'access_token' not in self.credentials: + raise RuntimeError('Must define "access_token" in credentials file') + self.headers['Authorization'] = 'Bearer {}'.format(self.credentials['access_token']) + self.debug = debug + self.modulePath = os.path.dirname(os.path.abspath(__file__)) + self.wdlPath = self.modulePath + '/wes/gwas' + + def runWorkflow(self): + # use a temporary file to write out the input file + workflow_url = 'https://github.com/uniqueg/cwl-example-workflows/blob/master/hashsplitter-workflow.cwl' + params = { + 'input': { + 'class': 'File', + 'path': 'http://62.217.82.57/test.txt' + } + } + + return self.runGenericWorkflow( + workflow_url=workflow_url, + workflow_params=json.dumps(params), + workflow_type='CWL', + workflow_type_version='v1.0' + ) + + +if __name__ == "__main__": + myClient = ElixirWESClient('~/.keys/elixir_wes_credentials.json') + + res = myClient.runWorkflow() + + print(res) diff --git a/fasp/workflow/wesclient.py b/fasp/workflow/wesclient.py index ce675ae..6a8a7f7 100644 --- a/fasp/workflow/wesclient.py +++ b/fasp/workflow/wesclient.py @@ -1,12 +1,16 @@ -import requests import json -import sys +from typing import Dict, Optional, Union, IO + +import requests + ''' base class for a WES Client''' class WESClient: + api_url_base: str + headers: Dict[str, str] - def __init__(self, api_url_base): self.api_url_base = api_url_base + self.headers = {} def getTaskStatus(self, run_id, verbose=False): @@ -32,25 +36,41 @@ def GetRunLog(self, run_id, verbose=False): if runResp.status_code == 400: return 'task not found' print(runResp) - - def runGenericWorkflow(self, body, verbose=False): - if verbose: print("sending to {}".format( self.api_url_base)) - - response = requests.request('POST', self.api_url_base, headers=self.headers, files=body) + + def runGenericWorkflow( + self, + workflow_url: str, + workflow_params: Union[str, IO[bytes], None] = None, + workflow_engine_params: Union[str, IO[bytes], None] = None, + workflow_type: Optional[str] = None, + workflow_type_version: Optional[str] = None, + tags: Union[str, IO[bytes], None] = None, + workflow_attachment=None, + verbose=False + ): if verbose: + print("sending to {}".format( self.api_url_base)) + + attachments = { + 'workflow_url': (None, workflow_url,'text/plain'), + 'workflow_params': (None, workflow_params, 'application/json'), + 'workflow_engine_params': (None, workflow_engine_params, 'application/json'), + 'workflow_type': (None, workflow_type, 'text/plain'), + 'workflow_type_version': (None, workflow_type_version, 'text/plain'), + 'tags': (None, tags, 'text/plain'), + 'workflow_attachment': workflow_attachment, + } + + response = requests.request('POST', self.api_url_base, headers=self.headers, files=attachments) + if verbose: + print(response.request.body) print(response.text) print(response) if response.status_code == 200: return response.json()['run_id'] elif response.status_code == 401: - print("WES server authentication failed") - sys.exit(1) + raise RuntimeError("WES server authentication failed") else: print("Full response status:\n{}".format(response)) - print("WES run submission failed. Response status:{}".format(response.status_code)) - - sys.exit(1) - - - - + print("Full response content:\n{}".format(response.content)) + raise RuntimeError("WES run submission failed. Response status:{}".format(response.status_code)) diff --git a/notebooks/GTEXExample-DNAStack.ipynb b/notebooks/GTEXExample-DNAStack.ipynb new file mode 100644 index 0000000..0bfc23e --- /dev/null +++ b/notebooks/GTEXExample-DNAStack.ipynb @@ -0,0 +1,184 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This notebook computes on the freely available GTEX version 8 files on Amazon AWS using the Seven Bridges Cancer Genomics Cloud WES service. \n", + "\n", + "In this case the DRS ids of the GTEX files are available via a client which acccess a local manifest file. The manifest file is obtained from Anvil as follows.\n", + "https://anvilproject.org/learn/reference/gtex-v8-free-egress-instructions\n", + "\n", + "Rather than use the Gen3 Downloader described in the egress instructions, this example uses the Anvil DRS service to obtain URLs which can be passed to the Seven Bridges CGC WES Service.\n", + "\n", + "The following step demonstrates a query run against that file. " + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "from fasp.runner import FASPRunner\n", + "\n", + "# The implementations we're using\n", + "from fasp.workflow import DNAStackWESClient\n", + "from fasp.loc import anvilDRSClient\n", + "from fasp.search import Gen3ManifestClient\n", + "\n", + "faspRunner = FASPRunner(program='GTEXExample')\n", + "\n", + "# Step 1 - Discovery\n", + "# query for relevant DRS objects\n", + "searchClient = Gen3ManifestClient('../fasp/data/gtex/gtex-cram-manifest.json')\n", + "res = searchClient.runQuery(3)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'access_methods': [{'access_id': 'gs',\n", + " 'access_url': {'url': 'gs://fc-secure-ff8156a3-ddf3-42e4-9211-0fd89da62108/GTEx_Analysis_2017-06-05_v8_WGS_CRAM_files/GTEX-1B98T-0004-SM-7J38T.cram'},\n", + " 'region': '',\n", + " 'type': 'gs'},\n", + " {'access_id': 's3',\n", + " 'access_url': {'url': 's3://AnVIL/GTEx_Analysis_2017-06-05_v8_WGS_CRAM_files/GTEX-1B98T-0004-SM-7J38T.cram'},\n", + " 'region': '',\n", + " 'type': 's3'}],\n", + " 'aliases': [],\n", + " 'checksums': [{'checksum': 'cfd2f4cde4aa3e0cf9f726f0c1255fcd',\n", + " 'type': 'md5'}],\n", + " 'contents': [],\n", + " 'created_time': '2020-07-08T18:53:27.860156',\n", + " 'description': None,\n", + " 'form': 'object',\n", + " 'id': 'dg.ANV0/ed9ac9ae-02da-4e97-93da-ad86aa77d227',\n", + " 'mime_type': 'application/json',\n", + " 'name': 'GTEX-1B98T-0004-SM-7J38T.cram',\n", + " 'self_uri': 'drs://gen3.theanvil.io/dg.ANV0/ed9ac9ae-02da-4e97-93da-ad86aa77d227',\n", + " 'size': 38379493977,\n", + " 'updated_time': '2020-07-08T18:53:27.860163',\n", + " 'version': '64acd5a6'}" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "drsClient = anvilDRSClient('~/.keys/anvil_credentials.json', access_id='gs')\n", + "drsClient.getObject(res[2][1])" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "url = drsClient.getAccessURL(res[2][1])" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Running query\n", + "3\n", + "subject=GTEX-1GTWX-0001-SM-7J3A5.cram, drsID=dg.ANV0/76bb893d-12da-41ca-8828-ff89551d3e15\n", + "workflow submitted, run:2f73492e-d109-4e1a-b2be-9950deae9890\n", + "subject=GTEX-14PQA-0003-SM-7DLH4.cram, drsID=dg.ANV0/66352de8-4b50-4cae-881d-b76d03df5ac8\n", + "workflow submitted, run:6c63ef08-e61b-43a8-ab07-7975a98bdbb8\n", + "subject=GTEX-1B98T-0004-SM-7J38T.cram, drsID=dg.ANV0/ed9ac9ae-02da-4e97-93da-ad86aa77d227\n", + "workflow submitted, run:1d764abd-5c8f-4c89-ad2b-26a43b2025f1\n" + ] + } + ], + "source": [ + "settings = faspRunner.settings\n", + "wesClient = DNAStackWESClient('~/.keys/dnastack_wes_credentials.json')\n", + "\n", + "faspRunner.configure(searchClient, drsClient, wesClient)\n", + "\n", + "runList = faspRunner.runQuery(3, 'Anvil GTEX DRS to DNAStack WES')" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2f73492e-d109-4e1a-b2be-9950deae9890 RUNNING\n", + "6c63ef08-e61b-43a8-ab07-7975a98bdbb8 RUNNING\n", + "1d764abd-5c8f-4c89-ad2b-26a43b2025f1 RUNNING\n" + ] + } + ], + "source": [ + "for run in runList:\n", + " print(run['run_id'], wesClient.getTaskStatus(run['run_id']))" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[{'subject': 'GTEX-1GTWX-0001-SM-7J3A5.cram', 'run_id': '2f73492e-d109-4e1a-b2be-9950deae9890'}, {'subject': 'GTEX-14PQA-0003-SM-7DLH4.cram', 'run_id': '6c63ef08-e61b-43a8-ab07-7975a98bdbb8'}, {'subject': 'GTEX-1B98T-0004-SM-7J38T.cram', 'run_id': '1d764abd-5c8f-4c89-ad2b-26a43b2025f1'}]\n" + ] + } + ], + "source": [ + "print (runList)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/notebooks/wes/DNAStackWESTour.ipynb b/notebooks/wes/DNAStackWESTour.ipynb index cbe32ea..6db3c76 100644 --- a/notebooks/wes/DNAStackWESTour.ipynb +++ b/notebooks/wes/DNAStackWESTour.ipynb @@ -13,7 +13,7 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": 5, "metadata": {}, "outputs": [], "source": [ @@ -33,7 +33,7 @@ }, { "cell_type": "code", - "execution_count": 16, + "execution_count": 11, "metadata": {}, "outputs": [ { @@ -49,7 +49,8 @@ "source": [ "from fasp.workflow import DNAStackWESClient\n", "wesClient = DNAStackWESClient('~/.keys/dnastack_wes_credentials.json')\n", - "getMinimalRunLog('523abb68-44bd-4040-b7b0-bfc2164f8917')" + "run_id = '523abb68-44bd-4040-b7b0-bfc2164f8917'\n", + "getMinimalRunLog(wesClient, run_id)" ] }, { @@ -61,7 +62,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 8, "metadata": {}, "outputs": [ { @@ -76,6 +77,7 @@ } ], "source": [ + "rundetails = wesClient.GetRunLog(run_id)\n", "log = rundetails['task_logs'][0]\n", "for stdx in ['stdout','stderr']:\n", " print('{} is at {}\\n'.format(stdx, log[stdx]))\n" @@ -96,16 +98,16 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "'0fcdbc2e-9990-4d7c-916a-41d363985aeb'" + "'eaf4ff6a-e229-47b9-9e4c-558dbb2c16be'" ] }, - "execution_count": 3, + "execution_count": 9, "metadata": {}, "output_type": "execute_result" } @@ -116,21 +118,21 @@ }, { "cell_type": "code", - "execution_count": 17, + "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "0fcdbc2e-9990-4d7c-916a-41d363985aeb\n", - "COMPLETE\n", - "{'gwas.logistic': 'gs://workspaces-cromwell-execution/gwas/0fcdbc2e-9990-4d7c-916a-41d363985aeb/call-run_gwas/CCDG_13607_B01_GRM_WGS_2019-02-19_chr21.recalibrated_variants.assoc.logistic', 'gwas.manhattan_plot': 'gs://workspaces-cromwell-execution/gwas/0fcdbc2e-9990-4d7c-916a-41d363985aeb/call-create_plot/CCDG_13607_B01_GRM_WGS_2019-02-19_chr21.recalibrated_variants.png'}\n" + "eaf4ff6a-e229-47b9-9e4c-558dbb2c16be\n", + "RUNNING\n", + "{}\n" ] } ], "source": [ - "getMinimalRunLog(wesClient, '0fcdbc2e-9990-4d7c-916a-41d363985aeb')" + "getMinimalRunLog(wesClient, 'eaf4ff6a-e229-47b9-9e4c-558dbb2c16be')" ] }, {