Skip to content

Commit

Permalink
switching from bmt-lite to bmt, updating tests for new biolink model,…
Browse files Browse the repository at this point in the history
… improving qualifier query validation
  • Loading branch information
EvanDietzMorris committed Mar 7, 2024
1 parent 3213fec commit 0b00d6e
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 48 deletions.
10 changes: 10 additions & 0 deletions reasoner_transpiler/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,13 @@
class InvalidPredicateError(Exception):
    """Signal that a query used a predicate that is not valid.

    Args:
        error_message: Human-readable description of the offending predicate;
            it becomes the exception's message (``str(exc)``).
    """

    def __init__(self, error_message: str):
        super().__init__(error_message)


class InvalidQualifierError(Exception):
    """Signal that a query named a qualifier type that is not a known qualifier.

    Args:
        error_message: Human-readable description of the offending qualifier;
            it becomes the exception's message (``str(exc)``).
    """

    def __init__(self, error_message: str):
        super().__init__(error_message)


class InvalidQualifierValueError(Exception):
    """Signal that a query supplied a value that is not permissible for a qualifier.

    Args:
        error_message: Human-readable description of the qualifier and the
            rejected value; it becomes the exception's message (``str(exc)``).
    """

    def __init__(self, error_message: str):
        super().__init__(error_message)
35 changes: 25 additions & 10 deletions reasoner_transpiler/matching.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@

from bmt import Toolkit

from .exceptions import InvalidPredicateError
from .exceptions import InvalidPredicateError, InvalidQualifierError, InvalidQualifierValueError
from .nesting import Query
from .util import ensure_list, snake_case, space_case, pascal_case

# Shared Biolink Model Toolkit instance used by the matching code in this module.
bmt = Toolkit()

# Keys (enum names) of every enum reported by the toolkit's schema view.
# Computed once here so qualifier-value validation can iterate over them
# without re-querying the schema for each qualifier constraint.
ALL_BIOLINK_ENUMS = bmt.view.all_enums().keys()

def cypher_prop_string(value):
"""Convert property value to cypher string representation."""
Expand Down Expand Up @@ -291,15 +291,30 @@ def __qualifier_filters(self, edge , edge_id):
# also handle "qualifier_set": [{ }]
if not constraint_filter:
continue
all_enums = bmt.get_all_enums()
qualifiers_values = []
for enum in all_enums:
to_append = bmt.get_enum_value_descendants(enum, constraint_filter['qualifier_value'])
if to_append:
qualifiers_values += to_append

qualifier_type = constraint_filter['qualifier_type_id']
queried_qualifier_value = constraint_filter['qualifier_value']

if not bmt.is_qualifier(qualifier_type):
raise InvalidQualifierError(f'Invalid qualifier in query: {qualifier_type}')

# we should do something like this but it does not work without knowing the association type of the edge
# if not bmt.validate_qualifier(qualifier_type_id=qualifier_type, qualifier_value=queried_qualifier_value):
# raise InvalidQualifierError(f'Invalid qualifier requested, {qualifier_type}:{queried_qualifier_value}')

qualifier_value_plus_descendants = [queried_qualifier_value]
if qualifier_type != 'qualified_predicate': # qualified_predicate doesn't have an enum as values so the following does not apply
permissible_value = False
for enum_for_qualifier_values in ALL_BIOLINK_ENUMS:
if bmt.is_permissible_value_of_enum(enum_name=enum_for_qualifier_values, value=queried_qualifier_value):
permissible_value = True
qualifier_value_plus_descendants += bmt.get_permissible_value_descendants(permissible_value=queried_qualifier_value,
enum_name=enum_for_qualifier_values)
if not permissible_value:
raise InvalidQualifierValueError(f'Invalid value for qualifier {qualifier_type} in query: {queried_qualifier_value}')

# Join qualifier value hierarchy with an or
qualifier_where_condition = " ( "+ " OR ".join([f"`{edge_id}`.{constraint_filter['qualifier_type_id']} = {cypher_prop_string(qualifier_value)}"
for qualifier_value in set(qualifiers_values)]) + " ) "
qualifier_where_condition = " ( " + " OR ".join([f"`{edge_id}`.{qualifier_type} = {cypher_prop_string(qualifier_value)}" for qualifier_value in set(qualifier_value_plus_descendants)]) + " ) "
ands.append(qualifier_where_condition)
# if qualifier set is empty ; loop to the next
if not len(ands):
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
bmt-lite-v3.6.0==2.3.0
bmt==1.2.1
2 changes: 1 addition & 1 deletion tests/neo4j_csv/edges.csv
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ carcinoma_associated_with_CASP8,MONDO:0004993,biolink:genetically_associated_wit
carcinoma_associated_with_BRCA1,NCBIGene:672,biolink:gene_associated_with_condition,MONDO:0004993,"{}"
t2d_invalid_predicate_albuminaria,MONDO:0005148,biolink:invalid_predicate,HP:0012592,"{}"
qualified_edge_single_qualifier,PUBCHEM.COMPOUND:5460341,biolink:affects,NCBIGene:283871,"{\"qualified_predicate\": \"biolink:causes\"}"
qualified_edge_multiple_qualifier,PUBCHEM.COMPOUND:5460341,biolink:affects,NCBIGene:283871,"{\"qualified_predicate\": \"biolink:causes\",\"object_aspect\": \"activity\",\"object_direction\": \"decreased\",\"biolink:primary_knowledge_source\": \"infores:ctd\"}}"
qualified_edge_multiple_qualifier,PUBCHEM.COMPOUND:5460341,biolink:affects,NCBIGene:283871,"{\"qualified_predicate\": \"biolink:causes\",\"object_aspect_qualifier\": \"activity\",\"object_direction_qualifier\": \"decreased\",\"biolink:primary_knowledge_source\": \"infores:ctd\"}"
60 changes: 34 additions & 26 deletions tests/test_edge_reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,42 +24,46 @@ def test_directed_canonical():
edge= {"subject": "s", "object": "o", "predicates": "biolink:affects"}
ref = EdgeReference("e0", edge, invert=True)
preds = ref.label.split('|')
assert len(preds) == 11
assert len(preds) == 10
assert ref.directed
assert len(ref.filters) == 0
assert not ref.cypher_invert

def test_noncanonical():
"""If we send in a non-canonical (and by definition directed) query, we expect that we'll have a directed
query with the reversed edge, the canonical predicate (and sub-predicates), no where clause."""
edge= {"subject": "s", "object": "o", "predicates": "biolink:is_ameliorated_by"}
edge= {"subject": "s", "object": "o", "predicates": "biolink:affected_by"}
ref = EdgeReference("e0", edge, invert=True)
preds = ref.label.split('|')
assert len(preds) == 2
assert "`biolink:treats`" in preds
assert "`biolink:ameliorates`" in preds
assert len(preds) == 10
assert "`biolink:has_adverse_event`" in preds
assert "`biolink:has_side_effect`" in preds
assert ref.directed
assert len(ref.filters) == 0
#assert that the edge was reversed
assert ref.cypher_invert

def test_multiple_canonical():
"""Make sure that the canonical logic is applied when there are multiple (canonical) predicates"""
edge= {"subject": "s", "object": "o", "predicates": ["biolink:ameliorates","biolink:affects_risk_for"]}
edge= {"subject": "s", "object": "o", "predicates": ["biolink:ameliorates_condition", "biolink:affects"]}
ref = EdgeReference("e0", edge, invert=True)
preds = ref.label.split('|')
assert len(preds) == 5 # ameliorates, treats, affects_risk for, predisposes, prevents
# affects, affects_response_to, ameliorates_condition, decreases_response_to, disrupts, exacerbates_condition,
# has_adverse_event, has_side_effect, increases_response_to, regulates
assert len(preds) == 10
assert ref.directed
assert len(ref.filters) == 0
assert not ref.cypher_invert

def test_multiple_noncanonical():
"""Make sure that the canonical logic is applied when there are multiple (noncanonical) predicates"""
edge = {"subject": "s", "object": "o", "predicates": ["biolink:is_ameliorated_by", "biolink:risk_affected_by"]}
edge = {"subject": "s", "object": "o", "predicates": ["biolink:condition_ameliorated_by",
"biolink:likelihood_affected_by"]}
ref = EdgeReference("e0", edge, invert=True)
preds = ref.label.split('|')
expected_preds = [ f"`biolink:{x}`" for x in ["ameliorates", "treats", "affects_risk_for", "predisposes", "prevents"]]
assert set(preds) == set(expected_preds) #order doesn't matter
expected_preds = [ f"`biolink:{x}`" for x in ["ameliorates_condition", "predisposes_to_condition",
"preventative_for_condition", "affects_likelihood_of"]]
assert set(preds) == set(expected_preds) # order doesn't matter
assert ref.directed
assert len(ref.filters) == 0
assert ref.cypher_invert
Expand All @@ -84,20 +88,24 @@ def parse_filter(filter):
return parsed_filter

def test_multiple_conflicting():
"""Suppose that there are two predicates, one canonical, one not. e.g. ameliorates(canonical) and risk_affected_by
(non canonical). In this case, we expect a non-directed cypher edge with all canonical predicates, and
a where clause separating out the ones going left to right from the ones going right to left."""
edge = {"subject": "s", "object": "o", "predicates": ["biolink:ameliorates", "biolink:risk_affected_by"]}
"""Suppose that there are two predicates, one canonical, one not. e.g. ameliorates_condition(canonical) and
condition_predisposed_by(non canonical). In this case, we expect a non-directed cypher edge with all canonical
predicates, and a where clause separating out the ones going left to right from the ones going right to left."""
edge = {"subject": "s", "object": "o", "predicates": ["biolink:ameliorates_condition",
"biolink:likelihood_affected_by"]}
ref = EdgeReference("e0", edge, invert=True)
preds = ref.label.split('|')
#Make sure that the label contains only canonical predicates
expected_preds = [ f"`biolink:{x}`" for x in ["ameliorates", "treats", "affects_risk_for", "predisposes", "prevents"]]
expected_preds = [ f"`biolink:{x}`" for x in ["ameliorates_condition", "predisposes_to_condition",
"preventative_for_condition", "affects_likelihood_of"]]
assert set(preds) == set(expected_preds) #order doesn't matter
assert not ref.directed
#filters should look like:
#['(type(`e0`) in ["biolink:treats", "biolink:ameliorates"] AND startNode(`e0`) = `s`) OR (type(`e0`) in ["biolink:predisposes", "biolink:prevents", "biolink:affects_risk_for"] AND startNode(`e0`) = `o`)']
expected_filter = { "s": set(["biolink:treats", "biolink:ameliorates"]),
"o": set(["biolink:affects_risk_for", "biolink:predisposes", "biolink:prevents"])}
expected_filter = {"s": set(["biolink:ameliorates_condition"]),
"o": set(["biolink:predisposes_to_condition",
"biolink:preventative_for_condition",
"biolink:affects_likelihood_of"])}
assert len(ref.filters) == 1
parsed_filter = parse_filter(ref.filters[0])
assert parsed_filter == expected_filter
Expand All @@ -107,18 +115,18 @@ def test_symmetric_canonical():
"""Two predicates. One symmetric, one canonical/directed. We would expect a non-directed, non-inverted cypher
And a where clause. the canonical should be in one of the subclauses, and all subclasses of the symmetric
(including any canonical/directed subclasses) should be in both."""
edge = {"subject": "s", "object": "o", "predicates": ["biolink:correlated_with", "biolink:ameliorates"]}
edge = {"subject": "s", "object": "o", "predicates": ["biolink:correlated_with", "biolink:affects_likelihood_of"]}
ref = EdgeReference("e0", edge, invert=True)
preds = ref.label.split('|')
# Make sure that the label contains only canonical predicates
correlated_sub_preds = ["correlated_with", "biomarker_for", "coexpressed_with", "negatively_correlated_with",
"positively_correlated_with", "occurs_together_in_literature_with"]
ameliorates_sub_preds = ["ameliorates", "treats"]
expected_preds = [f"`biolink:{x}`" for x in correlated_sub_preds + ameliorates_sub_preds ]
likelihood_of_sub_preds = ["predisposes_to_condition", "preventative_for_condition", "affects_likelihood_of"]
expected_preds = [f"`biolink:{x}`" for x in correlated_sub_preds + likelihood_of_sub_preds]
assert set(preds) == set(expected_preds) # order doesn't matter
assert not ref.directed
expected_filter = {"s": set([f"biolink:{x}" for x in correlated_sub_preds + ameliorates_sub_preds]),
"o": set([f"biolink:{x}" for x in correlated_sub_preds ])}
expected_filter = {"s": set([f"biolink:{x}" for x in correlated_sub_preds + likelihood_of_sub_preds]),
"o": set([f"biolink:{x}" for x in correlated_sub_preds])}
assert len(ref.filters) == 1
parsed_filter = parse_filter(ref.filters[0])
assert parsed_filter == expected_filter
Expand All @@ -129,18 +137,18 @@ def test_symmetric_noncanonical():
And a where clause. the canonical should be in one of the subclauses, and all subclasses of the symmetric
(including any canonical/directed subclasses) should be in both. Note that the ameliorates subpredicates
have the object as the starting node in this case, unlike the canonical case above"""
edge = {"subject": "s", "object": "o", "predicates": ["biolink:correlated_with", "biolink:is_ameliorated_by"]}
edge = {"subject": "s", "object": "o", "predicates": ["biolink:correlated_with", "biolink:likelihood_affected_by"]}
ref = EdgeReference("e0", edge, invert=True)
preds = ref.label.split('|')
# Make sure that the label contains only canonical predicates
correlated_sub_preds = ["correlated_with", "biomarker_for", "coexpressed_with", "negatively_correlated_with",
"positively_correlated_with", "occurs_together_in_literature_with"]
ameliorates_sub_preds = ["ameliorates", "treats"]
expected_preds = [f"`biolink:{x}`" for x in correlated_sub_preds + ameliorates_sub_preds ]
ameliorates_sub_preds = ["predisposes_to_condition", "preventative_for_condition", "affects_likelihood_of"]
expected_preds = [f"`biolink:{x}`" for x in correlated_sub_preds + ameliorates_sub_preds]
assert set(preds) == set(expected_preds) # order doesn't matter
assert not ref.directed
expected_filter = {"o": set([f"biolink:{x}" for x in correlated_sub_preds + ameliorates_sub_preds]),
"s": set([f"biolink:{x}" for x in correlated_sub_preds ])}
"s": set([f"biolink:{x}" for x in correlated_sub_preds])}
assert len(ref.filters) == 1
parsed_filter = parse_filter(ref.filters[0])
assert parsed_filter == expected_filter
Expand Down
63 changes: 62 additions & 1 deletion tests/test_invalid.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pytest

from reasoner_transpiler.cypher import get_query
from reasoner_transpiler.exceptions import InvalidPredicateError
from reasoner_transpiler.exceptions import InvalidPredicateError, InvalidQualifierError, InvalidQualifierValueError
from .fixtures import fixture_database


Expand Down Expand Up @@ -117,3 +117,64 @@ def test_invalid_predicate():
}
with pytest.raises(InvalidPredicateError):
query = get_query(qgraph)

def test_invalid_qualifier():
    """Test that an unrecognised qualifier type makes get_query raise InvalidQualifierError."""
    qgraph = {
        "nodes": {
            "n0": {},
            "n1": {
                "ids": "NCBIGene:283871"
            },
        },
        "edges": {
            "e0": {
                "subject": "n0",
                "object": "n1",
                "predicates": ["biolink:affects"],
                "qualifier_constraints": [
                    {
                        "qualifier_set": [
                            {
                                # deliberately not a real qualifier type
                                "qualifier_type_id": "bogus_qualifier_1",
                                "qualifier_value": "abundance"
                            },
                        ]
                    },
                ]
            },
        },
    }
    # Only the raised exception matters, so the return value is not bound.
    with pytest.raises(InvalidQualifierError):
        get_query(qgraph)


def test_invalid_qualifier_value():
    """Test that an unrecognised qualifier value makes get_query raise InvalidQualifierValueError."""
    qgraph = {
        "nodes": {
            "n0": {},
            "n1": {
                "ids": "NCBIGene:283871"
            },
        },
        "edges": {
            "e0": {
                "subject": "n0",
                "object": "n1",
                "predicates": ["biolink:affects"],
                "qualifier_constraints": [
                    {
                        "qualifier_set": [
                            {
                                # qualifier type is expected to pass validation;
                                # the value is deliberately bogus
                                "qualifier_type_id": "object_aspect_qualifier",
                                "qualifier_value": "bogus_value"
                            },
                        ]
                    },
                ]
            },
        },
    }
    # Only the raised exception matters, so the return value is not bound.
    with pytest.raises(InvalidQualifierValueError):
        get_query(qgraph)
17 changes: 8 additions & 9 deletions tests/test_qualifier_constraints.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def test_single_qualifier(database):
"qualifier_type_id": "qualified_predicate",
"qualifier_value": "biolink:causes"
}, {
"qualifier_type_id": "object_aspect",
"qualifier_type_id": "object_aspect_qualifier",
"qualifier_value": "activity"
}
]
Expand Down Expand Up @@ -60,7 +60,7 @@ def test_multi_qualifier(database):
{
"qualifier_set": [
{
"qualifier_type_id": "object_aspect",
"qualifier_type_id": "object_aspect_qualifier",
"qualifier_value": "activity"
},
]
Expand All @@ -84,8 +84,7 @@ def test_multi_qualifier(database):
assert len(record["results"][0]["analyses"][0]["edge_bindings"]["e10a"]) == 2


# skipping this test for now will need to make them once qualifier heirarchy is supported
def test_qualifier_heirarchy(database):
def test_qualifier_hierarchy(database):
"""Test if edges satifying all constraints are returned"""
qgraph = {
"nodes": {
Expand All @@ -103,7 +102,7 @@ def test_qualifier_heirarchy(database):
{
"qualifier_set": [
{
"qualifier_type_id": "object_aspect",
"qualifier_type_id": "object_aspect_qualifier",
"qualifier_value": "activity_or_abundance"
},
]
Expand All @@ -120,8 +119,8 @@ def test_qualifier_heirarchy(database):
assert len(record["results"][0]["analyses"][0]["edge_bindings"]["e10a"]) == 1


def test_phony_qualifier_value(database):
"""Test if edges satifying all constraints are returned"""
def test_incorrect_qualifier_value(database):
"""Test if an incorrect qualifier returns no result"""
qgraph = {
"nodes": {
"n0": {},
Expand All @@ -138,8 +137,8 @@ def test_phony_qualifier_value(database):
{
"qualifier_set": [
{
"qualifier_type_id": "object_aspect",
"qualifier_value": "some_non_existent"
"qualifier_type_id": "object_aspect_qualifier",
"qualifier_value": "abundance"
},
]
},
Expand Down

0 comments on commit 0b00d6e

Please sign in to comment.