From 9f339cb744ebb87547a9e6fd80362bb04f753466 Mon Sep 17 00:00:00 2001
From: jyothishnt
Date: Fri, 9 Aug 2024 12:22:20 +0100
Subject: [PATCH 1/7] Add vep results endpoint

---
 app/vep/vep_resources.py | 42 +++++++++++++++++++++++++++++++++-------
 1 file changed, 35 insertions(+), 7 deletions(-)

diff --git a/app/vep/vep_resources.py b/app/vep/vep_resources.py
index 77b93a4..abc1c69 100644
--- a/app/vep/vep_resources.py
+++ b/app/vep/vep_resources.py
@@ -33,6 +33,7 @@
 )
 from vep.models.upload_vcf_files import Streamer, MaxBodySizeException
 from vep.utils.nextflow import launch_workflow, get_workflow_status
+from vep.utils.vcf_results import get_results_from_path
 import json
 from pydantic import FilePath
@@ -95,6 +96,10 @@ async def vep_status(request: Request, submission_id: str):
         logging.debug(e)
         return response_error_handler(result={"status": 500})
 
+def get_vep_results_file_path(input_vcf_file):
+    input_vcf_path = FilePath(input_vcf_file)
+    return input_vcf_path.with_name("_VEP").with_suffix(".vcf.gz")
+
 
 @router.get("/submissions/{submission_id}/download", name="download_results")
 async def download_results(request: Request, submission_id: str):
@@ -105,13 +110,9 @@ async def download_results(request: Request, submission_id: str):
         )
         if submission_status.status == "SUCCEEDED":
             input_vcf_file = workflow_status["workflow"]["params"]["vcf"]
-            input_vcf_path = FilePath(input_vcf_file)
-            results_file_path = input_vcf_path.with_name("_VEP").with_suffix(".vcf.gz")
-            return FileResponse(
-                results_file_path,
-                media_type="application/gzip",
-                filename=results_file_path.name,
-            )
+            results_file_path = get_vep_results_file_path(input_vcf_file)
+
+            return FileResponse(results_file_path, media_type="application/gzip", filename=results_file_path.name)
         else:
             response_msg = {
                 "details": f"A submission with id {submission_id} is not yet finished",
@@ -135,3 +136,30 @@ async def download_results(request: Request, submission_id: str):
     except Exception as e:
         logging.debug(e)
         return response_error_handler(result={"status": 500})
+
+@router.get("/submissions/{submission_id}/results", name="view_results")
+async def fetch_results(request: Request, submission_id: str, page: int, page_size: int):
+    try:
+        workflow_status = await get_workflow_status(submission_id)
+        submission_status = PipelineStatus(submission_id=submission_id, status=workflow_status['workflow']['status'])
+        if submission_status.status == "SUCCEEDED":
+            input_vcf_file = workflow_status["workflow"]["params"]["vcf"]
+            results_file_path = get_vep_results_file_path(input_vcf_file)
+            return get_results_from_path(vcf_path = results_file_path, page=page, page_size = page_size)
+
+    except HTTPError as http_error:
+        if http_error.response.status_code in [403,400]:
+            response_msg = json.dumps(
+                {
+                    "status_code": status.HTTP_404_NOT_FOUND,
+                    "details": f"A submission with id {submission_id} was not found",
+                }
+            )
+            return PlainTextResponse(
+                response_msg, status_code=status.HTTP_404_NOT_FOUND
+            )
+        return response_error_handler(result={"status": http_error.response.status_code})
+    except Exception as e:
+        logging.debug(e)
+        # return response_error_handler(result={"status": 500})
+        return PlainTextResponse(e, status_code=500)

From a0c60711205415b4a1d8d76137ee305e848dbd60 Mon Sep 17 00:00:00 2001
From: jyothishnt
Date: Mon, 12 Aug 2024 15:51:11 +0100
Subject: [PATCH 2/7] edit error responses

---
 app/vep/vep_resources.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/app/vep/vep_resources.py b/app/vep/vep_resources.py
index abc1c69..ea83005 100644
--- a/app/vep/vep_resources.py
+++ b/app/vep/vep_resources.py
@@ -155,11 +155,12 @@ async def fetch_results(request: Request, submission_id: str, page: int, page_si
                     "details": f"A submission with id {submission_id} was not found",
                 }
             )
-            return PlainTextResponse(
-                response_msg, status_code=status.HTTP_404_NOT_FOUND
+            return JSONResponse(
+                content=response_msg, status_code=status.HTTP_404_NOT_FOUND
             )
-        return response_error_handler(result={"status": http_error.response.status_code})
+        return response_error_handler(
+            result={"status": http_error.response.status_code}
+        )
     except Exception as e:
         logging.debug(e)
-        # return response_error_handler(result={"status": 500})
-        return PlainTextResponse(e, status_code=500)
+        return response_error_handler(result={"status": 500})

From d2fe1a07907ce8c88395b058128debaf2a4d0f6d Mon Sep 17 00:00:00 2001
From: Jon Keatley
Date: Tue, 13 Aug 2024 10:07:24 +0100
Subject: [PATCH 3/7] Updated vcf_results to support vcf.gz files and to support paging starting at 1 rather than 0

---
 app/tests/test_vep.py        | 11 ++++++---
 app/vep/utils/vcf_results.py | 47 +++++++++++++++++++++---------------
 2 files changed, 35 insertions(+), 23 deletions(-)

diff --git a/app/tests/test_vep.py b/app/tests/test_vep.py
index b6a9c50..215588e 100644
--- a/app/tests/test_vep.py
+++ b/app/tests/test_vep.py
@@ -150,7 +150,7 @@ def test_get_results_from_stream():
     assert len(results.variants) == 2
 
-    assert results.metadata.pagination.page == 0
+    assert results.metadata.pagination.page == 1
     assert results.metadata.pagination.per_page == 100
     assert results.metadata.pagination.total == 2
@@ -181,13 +181,16 @@ def test_get_results_from_stream():
 def test_get_results_with_file_and_dump():
     vcf_path = (
-        "/Users/jon/Programming/vep-vcf-results/vep-output-phase1-options-plus-con.vcf"
+        #"/Users/jon/Programming/vep-vcf-results/vep-output-phase1-options-plus-con.vcf"
+        "/Users/jon/Programming/ensembl-web-tools-api/test_VEP.vcf.gz"
     )
-    results = get_results_from_path(100, 2, vcf_path)
+    results = get_results_from_path(100, 1, vcf_path)
 
     expected_index = {TARGET_COLUMNS[x]: x for x in range(0, len(TARGET_COLUMNS))}
 
     with open("dump.json", "w") as test_dump:
         test_dump.write(results.json())
 
-    assert len(results.variants) == 100
+    #assert results.variants[0].name =="rs1405511870"
+    assert results.metadata.pagination.total == 1
+    assert len(results.variants) == 1
diff --git a/app/vep/utils/vcf_results.py b/app/vep/utils/vcf_results.py
index 40a148a..9331872 100644
--- a/app/vep/utils/vcf_results.py
+++ b/app/vep/utils/vcf_results.py
@@ -2,9 +2,9 @@
 object as defined in APISpecification"""
 
 import vcfpy
-from typing import List, Dict, Any
+from typing import List, Dict, Any, IO
 
-from vep.models import vcf_results_model as model
+from app.vep.models import vcf_results_model as model
 
 TARGET_COLUMNS = [
     "Allele",
@@ -51,9 +51,8 @@ def _set_allele_type(alt_one_bp: bool, ref_one_bp: bool, ref_alt_equal_bp: bool)
 
 
 def _get_prediction_index_map(
-    csq_header: str,
-    target_columns: List[str] = None
-    ) -> Dict:
+    csq_header: str, target_columns: List[str] = None
+) -> Dict:
     """Creates a dictionary of column indexes based on the CSQ info description"""
 
     if not target_columns:
@@ -101,7 +100,7 @@ def _get_alt_allele_details(
     if len(cons) == 0:
         cons = []
     else:
-        cons = cons.split('&')
+        cons = cons.split("&")
     if csq_values[index_map["Feature_type"]] == "Transcript":
         is_cononical = (
             _get_csq_value(csq_values, "CANONICAL", "NO", index_map) == "YES"
@@ -147,9 +146,9 @@ def get_results_from_path(
 ) -> model.VepResultsResponse:
     """Helper method that converts a file path to a stream for use with
     get_results_from_stream"""
-    # Check file file exists
-    with open(vcf_path, encoding="utf-8") as vcf_stream:
-        return get_results_from_stream(page_size, page, vcf_stream)
+    # Todo check file file exists
+    vcf_records = vcfpy.Reader.from_path(vcf_path)
+    return _get_results_from_vcfpy(page_size, page, vcf_records)
 
 
 def get_results_from_stream(
@@ -160,6 +159,14 @@ def get_results_from_stream(
 
     # Load vcf
     vcf_records = vcfpy.Reader.from_stream(vcf_stream)
+    return _get_results_from_vcfpy(page_size, page, vcf_records)
+
+
+def _get_results_from_vcfpy(
+    page_size: int, page: int, vcf_records: vcfpy.Reader
+) -> model.VepResultsResponse:
+    """Generates a page of VCF data in the format described in
+    APISpecification.yaml for a given VCFPY reader"""
 
     # Parse csq header
     prediction_index_map = _get_prediction_index_map(
@@ -168,7 +175,9 @@
 
     # handle offset
     count = 0
-    offset = page_size * page
+    if page < 1:
+        page = 1
+    offset = page_size * (page - 1)
 
     # This is very slow. We need to find a better way of handling this.
     # vcfpy __next__ might be the key as it reads lines
@@ -180,7 +189,7 @@
 
     # build page
     variants = []
-    count = 0
+    page_count = 0
 
     # populate page
     for record in vcf_records:
@@ -209,7 +218,9 @@
             model.Variant(
                 name=";".join(record.ID) if len(record.ID) > 0 else ".",
                 location=location,
-                reference_allele=model.ReferenceVariantAllele(allele_sequence=record.REF),
+                reference_allele=model.ReferenceVariantAllele(
+                    allele_sequence=record.REF
+                ),
                 alternative_alleles=alt_alleles,
                 allele_type=_set_allele_type(
                     longest_alt < 2, ref_len < 2, longest_alt == ref_len
@@ -217,22 +228,20 @@
             )
         )
 
-        count += 1
-        if count >= page_size:
+        page_count += 1
+        if page_count >= page_size:
             break
 
     # Also very slow. We could compute this and add it to the VCF header
-    total = offset + count
+    total = offset + page_count
     for _r in vcf_records:
         total += 1
 
     return model.VepResultsResponse(
         metadata=model.Metadata(
             pagination=model.PaginationMetadata(
-                page=page,
-                per_page=page_size,
-                total=total
+                page=page, per_page=page_size, total=total
             )
         ),
-        variants=variants
+        variants=variants,
     )

From 3288711f6262ff31ed657bd5c6ebe7ec0333795f Mon Sep 17 00:00:00 2001
From: Jon Keatley
Date: Tue, 13 Aug 2024 10:18:05 +0100
Subject: [PATCH 4/7] Added paging tests

---
 app/tests/test_vep.py | 56 +++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 54 insertions(+), 2 deletions(-)

diff --git a/app/tests/test_vep.py b/app/tests/test_vep.py
index 215588e..805d1f1 100644
--- a/app/tests/test_vep.py
+++ b/app/tests/test_vep.py
@@ -38,6 +38,33 @@
 """
 
+TEST_PAGING_VCF = f"""##fileformat=VCFv4.2
+##fileDate=20160824
+##INFO=
+#CHROM POS ID REF ALT QUAL FILTER INFO
+chr19 82664 id_01 C T 50 PASS CSQ={CSQ_1}
+chr19 82829 id_02 T A 50 PASS CSQ={CSQ_2}
+chr19 82829 id_03 T A 50 PASS CSQ={CSQ_2}
+chr19 82829 id_04 T A 50 PASS CSQ={CSQ_2}
+chr19 82829 id_05 T A 50 PASS CSQ={CSQ_2}
+chr19 82829 id_06 T A 50 PASS CSQ={CSQ_2}
+chr19 82829 id_07 T A 50 PASS CSQ={CSQ_2}
+chr19 82829 id_08 T A 50 PASS CSQ={CSQ_2}
+chr19 82829 id_09 T A 50 PASS CSQ={CSQ_2}
+chr19 82829 id_10 T A 50 PASS CSQ={CSQ_2}
+chr19 82829 id_11 T A 50 PASS CSQ={CSQ_2}
+chr19 82829 id_12 T A 50 PASS CSQ={CSQ_2}
+chr19 82829 id_13 T A 50 PASS CSQ={CSQ_2}
+chr19 82829 id_14 T A 50 PASS CSQ={CSQ_2}
+chr19 82829 id_15 T A 50 PASS CSQ={CSQ_2}
+chr19 82829 id_16 T A 50 PASS CSQ={CSQ_2}
+chr19 82829 id_17 T A 50 PASS CSQ={CSQ_2}
+chr19 82829 id_18 T A 50 PASS CSQ={CSQ_2}
+chr19 82829 id_19 T A 50 PASS CSQ={CSQ_2}
+chr19 82829 id_20 T A 50 PASS CSQ={CSQ_2}
+chr19 82829 id_21 T A 50 PASS CSQ={CSQ_2}
+"""
+
 
 def test_get_prediction_index_map():
@@ -141,8 +168,7 @@ def test_get_alt_allele_details_intergenic():
         results.predicted_molecular_consequences[0].consequences[0]
         == "intergenic_variant"
     )
-
-
+
 def test_get_results_from_stream():
     results = get_results_from_stream(100, 0, StringIO(TEST_VCF))
@@ -176,6 +202,32 @@ def test_get_results_from_stream():
         == None
     )
 
+def test_paging():
+    results = get_results_from_stream(5, 1, StringIO(TEST_PAGING_VCF))
+
+    assert(results.metadata.pagination.page == 1)
+    assert(results.metadata.pagination.per_page == 5)
+    assert(results.metadata.pagination.total == 21)
+
+    assert(results.variants[0].name == "id_01")
+    assert(results.variants[-1].name == "id_05")
+
+    results = get_results_from_stream(5, 2, StringIO(TEST_PAGING_VCF))
+    assert(results.variants[0].name == "id_06")
+    assert(results.variants[-1].name == "id_10")
+
+    results = get_results_from_stream(5, 3, StringIO(TEST_PAGING_VCF))
+    assert(results.variants[0].name == "id_11")
+    assert(results.variants[-1].name == "id_15")
+
+    results = get_results_from_stream(5, 4, StringIO(TEST_PAGING_VCF))
+    assert(results.variants[0].name == "id_16")
+    assert(results.variants[-1].name == "id_20")
+
+    results = get_results_from_stream(5, 5, StringIO(TEST_PAGING_VCF))
+    assert(results.variants[0].name == "id_21")
+    assert(len(results.variants) == 1)
+
 
 @pytest.mark.skip(reason="Used to test against a real VCF file")
 def test_get_results_with_file_and_dump():

From f1b208239c0cf7ae615e86d9c46f188c2c1b2f40 Mon Sep 17 00:00:00 2001
From: Jon Keatley
Date: Tue, 13 Aug 2024 10:33:05 +0100
Subject: [PATCH 5/7] Added negative test for paging

---
 app/tests/test_vep.py        |  5 +++++
 app/vep/utils/vcf_results.py | 11 +++++++++++
 2 files changed, 16 insertions(+)

diff --git a/app/tests/test_vep.py b/app/tests/test_vep.py
index 805d1f1..0b84f66 100644
--- a/app/tests/test_vep.py
+++ b/app/tests/test_vep.py
@@ -228,6 +228,11 @@ def test_paging():
     assert(results.variants[0].name == "id_21")
     assert(len(results.variants) == 1)
 
+def test_negative_paging():
+    results = get_results_from_stream(5, 6, StringIO(TEST_PAGING_VCF))
+    assert(len(results.variants) == 0)
+    assert(results.metadata.pagination.total == 21)
+
 
 @pytest.mark.skip(reason="Used to test against a real VCF file")
 def test_get_results_with_file_and_dump():
diff --git a/app/vep/utils/vcf_results.py b/app/vep/utils/vcf_results.py
index 9331872..c313506 100644
--- a/app/vep/utils/vcf_results.py
+++ b/app/vep/utils/vcf_results.py
@@ -186,6 +186,17 @@ def _get_results_from_vcfpy(
         count += 1
         if count >= offset:
             break
+
+    #user asked for a page out of range
+    if offset > count:
+        return model.VepResultsResponse(
+            metadata=model.Metadata(
+                pagination=model.PaginationMetadata(
+                    page=page, per_page=page_size, total=count
+                )
+            ),
+            variants=[],
+        )
 
     # build page
     variants = []

From 11736b7a98a99f23e181aafbadecc14750c8957e Mon Sep 17 00:00:00 2001
From: jyothishnt
Date: Tue, 13 Aug 2024 14:53:01 +0100
Subject: [PATCH 6/7] param rename to per_page as per API spec

---
 app/vep/vep_resources.py | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/app/vep/vep_resources.py b/app/vep/vep_resources.py
index ea83005..f67eb8a 100644
--- a/app/vep/vep_resources.py
+++ b/app/vep/vep_resources.py
@@ -21,6 +21,7 @@
 from requests import HTTPError
 from starlette.responses import JSONResponse, PlainTextResponse, FileResponse
 from fastapi import Request, status, APIRouter
+from enum import Enum
 
 from core.error_response import response_error_handler
 from core.logging import InterceptHandler
@@ -41,6 +42,12 @@
 
 router = APIRouter()
 
+class VepStatus(str, Enum):
+    submitted = "SUBMITTED"
+    running = "RUNNING"
+    succeeded = "SUCCEEDED"
+    failed = "FAILED"
+    cancelled = "CANCELLED"
 
 @router.post("/submissions", name="submit_vep")
 async def submit_vep(request: Request):
@@ -108,7 +115,7 @@ async def vep_status(request: Request, submission_id: str):
         submission_status = PipelineStatus(
             submission_id=submission_id, status=workflow_status["workflow"]["status"]
         )
-        if submission_status.status == "SUCCEEDED":
+        if submission_status.status == VepStatus.succeeded:
             input_vcf_file = workflow_status["workflow"]["params"]["vcf"]
             results_file_path = get_vep_results_file_path(input_vcf_file)
@@ -138,14 +145,14 @@ async def download_results(request: Request, submission_id: str):
         return response_error_handler(result={"status": 500})
 
 @router.get("/submissions/{submission_id}/results", name="view_results")
-async def fetch_results(request: Request, submission_id: str, page: int, page_size: int):
+async def fetch_results(request: Request, submission_id: str, page: int, per_page: int):
     try:
         workflow_status = await get_workflow_status(submission_id)
         submission_status = PipelineStatus(submission_id=submission_id, status=workflow_status['workflow']['status'])
-        if submission_status.status == "SUCCEEDED":
+        if submission_status.status == VepStatus.succeeded:
             input_vcf_file = workflow_status["workflow"]["params"]["vcf"]
             results_file_path = get_vep_results_file_path(input_vcf_file)
-            return get_results_from_path(vcf_path = results_file_path, page=page, page_size = page_size)
+            return get_results_from_path(vcf_path = results_file_path, page=page, page_size = per_page)
 
     except HTTPError as http_error:
         if http_error.response.status_code in [403,400]:

From 3f99a37539d44af30de6718782c642dad77d9c9e Mon Sep 17 00:00:00 2001
From: jyothishnt
Date: Wed, 14 Aug 2024 11:31:48 +0100
Subject: [PATCH 7/7] fix vcf file path

---
 app/vep/vep_resources.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/app/vep/vep_resources.py b/app/vep/vep_resources.py
index f67eb8a..6ea75f7 100644
--- a/app/vep/vep_resources.py
+++ b/app/vep/vep_resources.py
@@ -105,7 +105,7 @@ async def vep_status(request: Request, submission_id: str):
 
 def get_vep_results_file_path(input_vcf_file):
     input_vcf_path = FilePath(input_vcf_file)
-    return input_vcf_path.with_name("_VEP").with_suffix(".vcf.gz")
+    return input_vcf_path.with_name(input_vcf_path.stem + "_VEP").with_suffix(".vcf.gz")
 
 
 @router.get("/submissions/{submission_id}/download", name="download_results")
@@ -113,7 +113,7 @@ async def download_results(request: Request, submission_id: str):
     try:
         workflow_status = await get_workflow_status(submission_id)
         submission_status = PipelineStatus(
-            submission_id=submission_id, status=workflow_status["workflow"]["status"]
+            submission_id=submission_id, status=workflow_status
         )
         if submission_status.status == VepStatus.succeeded:
            input_vcf_file = workflow_status["workflow"]["params"]["vcf"]
@@ -148,7 +148,7 @@ async def fetch_results(request: Request, submission_id: str, page: int, per_page: int):
     try:
         workflow_status = await get_workflow_status(submission_id)
-        submission_status = PipelineStatus(submission_id=submission_id, status=workflow_status['workflow']['status'])
+        submission_status = PipelineStatus(submission_id=submission_id, status=workflow_status)
         if submission_status.status == VepStatus.succeeded:
             input_vcf_file = workflow_status["workflow"]["params"]["vcf"]
             results_file_path = get_vep_results_file_path(input_vcf_file)
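
Note on the paging introduced in patch 3: page numbers are 1-based, the record
offset works out as page_size * (page - 1), and pages below 1 are clamped to 1.
A minimal illustrative sketch of that offset calculation (standalone Python;
the helper name is hypothetical and not part of the patches):

    def page_offset(page: int, page_size: int) -> int:
        # Pages are 1-based; anything below 1 is treated as page 1,
        # mirroring the clamp in _get_results_from_vcfpy.
        if page < 1:
            page = 1
        return page_size * (page - 1)

    # Page 4 with 5 records per page skips the first 15 records, so it returns
    # id_16 to id_20 -- matching test_paging in app/tests/test_vep.py.
    assert page_offset(4, 5) == 15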