From 222226db76183add8b3038fc25ddfdf026d3066a Mon Sep 17 00:00:00 2001 From: Chris Bizon Date: Thu, 11 Jul 2024 09:54:09 -0400 Subject: [PATCH 1/4] bumped version first --- openapi-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openapi-config.yaml b/openapi-config.yaml index da21132..136b755 100644 --- a/openapi-config.yaml +++ b/openapi-config.yaml @@ -11,7 +11,7 @@ servers: # url: http://127.0.0.1:5000 termsOfService: http://robokop.renci.org:7055/tos?service_long=ARAGORN&provider_long=RENCI title: ARAGORN -version: 2.7.5 +version: 2.8.0 tags: - name: translator - name: ARA From ff0b32e2da0775850be77f8a3a68c0681404afa1 Mon Sep 17 00:00:00 2001 From: Chris Bizon Date: Sat, 13 Jul 2024 16:20:26 -0400 Subject: [PATCH 2/4] added two rules for mcq --- src/results_cache.py | 15 ++++-- src/rules/MCQ.json | 100 ++++++++++++++++++++++++++++++++++++++ src/service_aggregator.py | 41 ++++++++++------ 3 files changed, 135 insertions(+), 21 deletions(-) create mode 100644 src/rules/MCQ.json diff --git a/src/results_cache.py b/src/results_cache.py index b2b3374..379a5b1 100644 --- a/src/results_cache.py +++ b/src/results_cache.py @@ -33,13 +33,18 @@ def __init__( password=redis_password, ) - def get_query_key(self, input_id, predicate, qualifiers, source_input, caller, workflow): + def get_query_key(self, input_id, predicate, qualifiers, source_input, caller, workflow, mcq, member_ids): keydict = {'predicate': predicate, 'source_input': source_input, 'input_id': input_id, 'caller': caller, 'workflow': workflow} keydict.update(qualifiers) + if mcq: + #because we already have a bunch of keys without mcq, we only want to add these if we are doing the new mcq. + member_ids.sort() + keydict['mcq'] = True + keydict['member_ids'] = member_ids return json.dumps(keydict, sort_keys=True) - def get_result(self, input_id, predicate, qualifiers, source_input, caller, workflow): - key = self.get_query_key(input_id, predicate, qualifiers, source_input, caller, workflow) + def get_result(self, input_id, predicate, qualifiers, source_input, caller, workflow, mcq, member_ids): + key = self.get_query_key(input_id, predicate, qualifiers, source_input, caller, workflow, mcq, member_ids) try: result = self.creative_redis.get(key) if result is not None: @@ -51,8 +56,8 @@ def get_result(self, input_id, predicate, qualifiers, source_input, caller, work return result - def set_result(self, input_id, predicate, qualifiers, source_input, caller, workflow, final_answer): - key = self.get_query_key(input_id, predicate, qualifiers, source_input, caller, workflow) + def set_result(self, input_id, predicate, qualifiers, source_input, caller, workflow, mcq, member_ids, final_answer): + key = self.get_query_key(input_id, predicate, qualifiers, source_input, caller, workflow, mcq, member_ids) try: self.creative_redis.set(key, gzip.compress(json.dumps(final_answer).encode())) diff --git a/src/rules/MCQ.json b/src/rules/MCQ.json new file mode 100644 index 0000000..22ba191 --- /dev/null +++ b/src/rules/MCQ.json @@ -0,0 +1,100 @@ +{ + "{\"mcq\": true, \"predicate\": \"biolink:genetically_associated_with\"}": [ + { + "Rule": "?a phenotype of ?b is genetically associated with ?c => ?a genetically associated with ?c", + "template": { + "query_graph": { + "nodes": { + "$source": { + "ids": [ + "$source_id" + ], + "categories": [ + "biolink:PhenotypicFeature" + ], + "set_interpretation": "MANY" + }, + "$target": { + "ids": [ + "$target_id" + ], + "categories": [ + "biolink:Gene" + ] + }, + "b": { + "categories": [ + "biolink:Disease" + ], + "set_interpretation": "MANY" + } + }, + "edges": { + "edge_0": { + "subject": "g", + "object": "$source", + "predicates": [ + "biolink:has_phenotype" + ] + }, + "edge_1": { + "subject": "g", + "object": "$target", + "predicates": [ + "biolink:genetically_associated_with" + ] + } + } + } + } + }, + { + "Rule": "?a contributed to by ?b affects ?c => ?a genetically associated with ?c", + "template": { + "query_graph": { + "nodes": { + "$source": { + "ids": [ + "$source_id" + ], + "categories": [ + "biolink:PhenotypicFeature" + ], + "set_interpretation": "MANY" + }, + "$target": { + "ids": [ + "$target_id" + ], + "categories": [ + "biolink:Gene" + ] + }, + "b": { + "categories": [ + "biolink:ChemicalEntity" + ], + "set_interpretation": "MANY" + } + }, + "edges": { + "edge_0": { + "subject": "g", + "object": "$source", + "predicates": [ + "biolink:contributes_to" + ] + }, + "edge_1": { + "subject": "g", + "object": "$target", + "predicates": [ + "biolink:affects" + ] + } + } + } + } + } + ] +} \ No newline at end of file diff --git a/src/service_aggregator.py b/src/service_aggregator.py index 25a7dd5..cd24c05 100644 --- a/src/service_aggregator.py +++ b/src/service_aggregator.py @@ -27,21 +27,19 @@ DUMPTRUCK = False -#from src.rules.rules import rules as AMIE_EXPANSIONS - logger = logging.getLogger(__name__) # declare the directory where the async data files will exist queue_file_dir = "./queue-files" -#Load in the AMIE rules. I'm not sure how this works wrt startup and workers. +#Load in the AMIE rules. thisdir = os.path.dirname(__file__) -#Temporarily point to a typed rules file. In the future, we will get types in the basic rules and use the config -# to generate "rules.json" in the "rules" directory. -#rulefile = os.path.join(thisdir,"rules","rules.json") -rulefile = os.path.join(thisdir,"rules","kara_typed_rules","rules_with_types_cleaned_finalized.json") -with open(rulefile,'r') as inf: - AMIE_EXPANSIONS = json.load(inf) +rulefiles = [os.path.join(thisdir,"rules","kara_typed_rules","rules_with_types_cleaned_finalized.json")] +rulefiles.append( os.path.join(thisdir, "rules", "MCQ.json")) +AMIE_EXPANSIONS = {} +for rulefile in rulefiles: + with open(rulefile,'r') as inf: + AMIE_EXPANSIONS.update(json.load(inf)) def examine_query(message): """Decides whether the input is an infer. Returns the grouping node""" @@ -86,7 +84,7 @@ def match_results_to_query(results, query_message, query_source, query_target, q # rewrite the results to match the query. #First, get the source, target, and qedge id's from the results - _, _, _, results_source, _, results_target, results_qedge_id = get_infer_parameters(results) + _, _, _, results_source, _, results_target, results_qedge_id, _, _ = get_infer_parameters(results) #Now replace the results query graph with the input query graph results["message"]["query_graph"] = query_message["message"]["query_graph"] @@ -184,9 +182,9 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int): if infer: # We're going to cache infer queries, and we need to do that even if we're overriding the cache # because we need these values to post to the cache at the end. - input_id, predicate, qualifiers, source, source_input, target, qedge_id = get_infer_parameters(message) + input_id, predicate, qualifiers, source, source_input, target, qedge_id, mcq, member_ids = get_infer_parameters(message) if read_from_cache: - results = results_cache.get_result(input_id, predicate, qualifiers, source_input, caller, workflow_def) + results = results_cache.get_result(input_id, predicate, qualifiers, source_input, caller, workflow_def, mcq, member_ids) if results is not None: logger.info(f"{guid}: Returning results cache lookup") # The results can't go verbatim. While the essense of the query is the same as the cached result, @@ -857,28 +855,39 @@ def get_infer_parameters(input_message): if ("ids" in input_message["message"]["query_graph"]["nodes"][source]) \ and (input_message["message"]["query_graph"]["nodes"][source]["ids"] is not None): input_id = input_message["message"]["query_graph"]["nodes"][source]["ids"][0] + member_ids = input_message["message"]["query_graph"]["nodes"][source].get("member_ids",[]) source_input = True else: input_id = input_message["message"]["query_graph"]["nodes"][target]["ids"][0] + member_ids = input_message["message"]["query_graph"]["nodes"][target].get("member_ids",[]) source_input = False + mcq = False + if ("set_interpretation" in input_message["message"]["query_graph"] and + input_message["message"]["query_graph"]["set_interpretation"] == "MANY"): + mcq = True #key = get_key(predicate, qualifiers) - return input_id, predicate, qualifiers, source, source_input, target, query_edge + return input_id, predicate, qualifiers, source, source_input, target, query_edge, mcq, member_ids -def get_rule_key(predicate, qualifiers): +def get_rule_key(predicate, qualifiers, mcq): keydict = {'predicate': predicate} keydict.update(qualifiers) + if mcq: + keydict["mcq"] = True return json.dumps(keydict,sort_keys=True) def expand_query(input_message, params, guid): #Contract: 1. there is a single edge in the query graph 2. The edge is marked inferred. 3. Either the source # or the target has IDs, but not both. 4. The number of ids on the query node is 1. - input_id, predicate, qualifiers, source, source_input, target, qedge_id = get_infer_parameters(input_message) - key = get_rule_key(predicate, qualifiers) + input_id, predicate, qualifiers, source, source_input, target, qedge_id, mcq, member_ids = get_infer_parameters(input_message) + key = get_rule_key(predicate, qualifiers, mcq) #We want to run the non-inferred version of the query as well qg = deepcopy(input_message["message"]["query_graph"]) for eid,edge in qg["edges"].items(): del edge["knowledge_type"] messages = [{"message": {"query_graph":qg}, "parameters": input_message.get("parameters") or {}}] + #If it's an MCQ, then we also copy the KG which has the member_of edges + if mcq: + messages[0]["message"]["knowledge_graph"] = deepcopy(input_message["message"]["knowledge_graph"]) #If we don't have any AMIE expansions, this will just generate the direct query for rule_def in AMIE_EXPANSIONS.get(key,[]): query_template = Template(json.dumps(rule_def["template"])) From 19666f61ddee55e91f83a5bebfbaf02b605459cc Mon Sep 17 00:00:00 2001 From: Chris Bizon Date: Mon, 15 Jul 2024 13:41:49 -0400 Subject: [PATCH 3/4] fixed cache writing --- src/service_aggregator.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/service_aggregator.py b/src/service_aggregator.py index cd24c05..2c042d8 100644 --- a/src/service_aggregator.py +++ b/src/service_aggregator.py @@ -194,6 +194,8 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int): else: logger.info(f"{guid}: Results cache miss") else: + mcq = False + member_ids = [] if read_from_cache: results = results_cache.get_lookup_result(workflow_def, query_graph) if results is not None: @@ -218,7 +220,7 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int): # so we want to write to the cache if bypass cache is false or overwrite_cache is true if overwrite_cache or (not bypass_cache): if infer: - results_cache.set_result(input_id, predicate, qualifiers, source_input, caller, workflow_def, final_answer) + results_cache.set_result(input_id, predicate, qualifiers, source_input, caller, workflow_def, mcq, member_ids, final_answer) elif {"id": "lookup"} in workflow_def: results_cache.set_lookup_result(workflow_def, query_graph, final_answer) From 10b082a02e55a2acddb990d5a1a154ba4cd8075e Mon Sep 17 00:00:00 2001 From: Chris Bizon Date: Tue, 16 Jul 2024 13:06:50 -0400 Subject: [PATCH 4/4] set interp on node --- src/service_aggregator.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/service_aggregator.py b/src/service_aggregator.py index 2c042d8..e60a045 100644 --- a/src/service_aggregator.py +++ b/src/service_aggregator.py @@ -854,19 +854,21 @@ def get_infer_parameters(input_message): qualifiers = {} else: qualifiers = {"qualifier_constraints": qc} - if ("ids" in input_message["message"]["query_graph"]["nodes"][source]) \ - and (input_message["message"]["query_graph"]["nodes"][source]["ids"] is not None): - input_id = input_message["message"]["query_graph"]["nodes"][source]["ids"][0] - member_ids = input_message["message"]["query_graph"]["nodes"][source].get("member_ids",[]) + mcq = False + snode = input_message["message"]["query_graph"]["nodes"][source] + tnode = input_message["message"]["query_graph"]["nodes"][target] + if ("ids" in snode) and (snode["ids"] is not None): + input_id = snode["ids"][0] + member_ids = snode.get("member_ids",[]) + if "set_interpretation" in snode and snode["set_interpretation"] == "MANY": + mcq = True source_input = True else: - input_id = input_message["message"]["query_graph"]["nodes"][target]["ids"][0] - member_ids = input_message["message"]["query_graph"]["nodes"][target].get("member_ids",[]) + input_id = tnode["ids"][0] + member_ids = tnode.get("member_ids",[]) + if "set_interpretation" in tnode and tnode["set_interpretation"] == "MANY": + mcq = True source_input = False - mcq = False - if ("set_interpretation" in input_message["message"]["query_graph"] and - input_message["message"]["query_graph"]["set_interpretation"] == "MANY"): - mcq = True #key = get_key(predicate, qualifiers) return input_id, predicate, qualifiers, source, source_input, target, query_edge, mcq, member_ids