Skip to content

Commit

Permalink
code reorganization for pathfinder
Browse files Browse the repository at this point in the history
  • Loading branch information
rjawesome committed Mar 29, 2024
1 parent 24eb56f commit f2832f0
Show file tree
Hide file tree
Showing 3 changed files with 280 additions and 212 deletions.
229 changes: 18 additions & 211 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import path from 'path';
import QueryGraph from './query_graph';
import KnowledgeGraph from './graph/knowledge_graph';
import TrapiResultsAssembler from './results_assembly/query_results';
import { scaled_sigmoid, inverse_scaled_sigmoid } from './results_assembly/score';
import InvalidQueryGraphError from './exceptions/invalid_query_graph_error';
import Debug from 'debug';
const debug = Debug('bte:biothings-explorer-trapi:main');
Expand All @@ -16,6 +15,7 @@ import { promises as fs } from 'fs';
import { getDescendants } from '@biothings-explorer/node-expansion';
import { resolveSRI, SRINodeNormFailure } from 'biomedical_id_resolver';
import InferredQueryHandler from './inferred_mode/inferred_mode';
import PathfinderQueryHandler from './inferred_mode/pathfinder';
import KGNode from './graph/kg_node';
import KGEdge from './graph/kg_edge';
import {
Expand Down Expand Up @@ -449,13 +449,11 @@ export default class TRAPIQueryHandler {

let log_msg: string;
if (currentQEdge.reverse) {
log_msg = `qEdge ${currentQEdge.id} (reversed): ${currentQEdge.object.categories} > ${
currentQEdge.predicate ? `${currentQEdge.predicate} > ` : ''
}${currentQEdge.subject.categories}`;
log_msg = `qEdge ${currentQEdge.id} (reversed): ${currentQEdge.object.categories} > ${currentQEdge.predicate ? `${currentQEdge.predicate} > ` : ''
}${currentQEdge.subject.categories}`;
} else {
log_msg = `qEdge ${currentQEdge.id}: ${currentQEdge.subject.categories} > ${
currentQEdge.predicate ? `${currentQEdge.predicate} > ` : ''
}${currentQEdge.object.categories}`;
log_msg = `qEdge ${currentQEdge.id}: ${currentQEdge.subject.categories} > ${currentQEdge.predicate ? `${currentQEdge.predicate} > ` : ''
}${currentQEdge.object.categories}`;
}
this.logs.push(new LogEntry('INFO', null, log_msg).getLog());

Expand Down Expand Up @@ -496,9 +494,8 @@ export default class TRAPIQueryHandler {
});
const qEdgesLogStr = qEdgesToLog.length > 1 ? `[${qEdgesToLog.join(', ')}]` : `${qEdgesToLog.join(', ')}`;
if (len > 0) {
const terminateLog = `Query Edge${len !== 1 ? 's' : ''} ${qEdgesLogStr} ${
len !== 1 ? 'have' : 'has'
} no MetaKG edges. Your query terminates.`;
const terminateLog = `Query Edge${len !== 1 ? 's' : ''} ${qEdgesLogStr} ${len !== 1 ? 'have' : 'has'
} no MetaKG edges. Your query terminates.`;
debug(terminateLog);
this.logs.push(new LogEntry('WARNING', null, terminateLog).getLog());
return false;
Expand Down Expand Up @@ -527,203 +524,15 @@ export default class TRAPIQueryHandler {
}

async _handlePathfinder(): Promise<void> {
const [unpinnedNodeId, unpinnedNode] = Object.entries(this.queryGraph.nodes).find(([_, node]) => !node.ids);
// remove unpinned node & all edges involving unpinned node for now
delete this.queryGraph.nodes[unpinnedNodeId];
const intermediateEdges = Object.entries(this.queryGraph.edges).filter(([_, edge]) => edge.subject === unpinnedNodeId || edge.object === unpinnedNodeId);
const [mainEdgeID, mainEdge] = Object.entries(this.queryGraph.edges).find(([_, edge]) => edge.subject !== unpinnedNodeId && edge.object !== unpinnedNodeId);

// intermediateEdges should be in order of n0 -> un & un -> n1
if (intermediateEdges[0][1].subject === unpinnedNodeId) {
let temp = intermediateEdges[0];
intermediateEdges[0] = intermediateEdges[1];
intermediateEdges[1] = temp;
}

// remove intermediates for creative execution
intermediateEdges.forEach(([edgeId, _]) => delete this.queryGraph.edges[edgeId]);

if (Object.keys(this.queryGraph.edges).length !== 1) {
const message = 'Pathfinder Mode needs exactly one edge between nodes with IDs. Your query terminates.';
debug(message);
this.logs.push(new LogEntry('WARNING', null, message).getLog());
return;
}

if (intermediateEdges[0][1].subject !== mainEdge.subject || intermediateEdges[1][1].object !== mainEdge.object || intermediateEdges[0][1].object !== unpinnedNodeId || intermediateEdges[1][1].subject !== unpinnedNodeId) {
const message = 'Intermediate edges for Pathfinder are incorrect. Your query terminates.';
debug(message);
this.logs.push(new LogEntry('WARNING', null, message).getLog());
return;
}

// run creative mode
await this._handleInferredEdges(true);
const creativeResponse = this.getResponse();
const originalAnalyses = (creativeResponse as any).original_analyses;
delete (creativeResponse as any).original_analyses;

// restore query graph
this.queryGraph.nodes[unpinnedNodeId] = unpinnedNode;
intermediateEdges.forEach(([edgeId, edge]) => this.queryGraph.edges[edgeId] = edge);
creativeResponse.message.query_graph = this.queryGraph;

// if no results then we are done
if (creativeResponse.message.results.length === 0) {
this.getResponse = () => creativeResponse;
return;
}


// set up a graph structure
const kgEdge = creativeResponse.message.results[0].analyses[0].edge_bindings[mainEdgeID][0].id;
const kgSrc = creativeResponse.message.results[0].node_bindings[mainEdge.subject][0].id;
const kgDst = creativeResponse.message.results[0].node_bindings[mainEdge.object][0].id;
const dfsNodes: {[node: string]: {[dst: string]: string[]}} = {};
const supportGraphsPerNode: {[node: string]: Set<string>} = {};
for (const supportGraph of creativeResponse.message.knowledge_graph.edges[kgEdge].attributes.find(attr => attr.attribute_type_id === 'biolink:support_graphs').value as string[]) {
const auxGraph = creativeResponse.message.auxiliary_graphs[supportGraph];
for (const subEdge of auxGraph.edges) {
const kgSubEdge = creativeResponse.message.knowledge_graph.edges[subEdge];
if (!dfsNodes[kgSubEdge.subject]) {
dfsNodes[kgSubEdge.subject] = {};
}
if (!dfsNodes[kgSubEdge.subject][kgSubEdge.object]) {
dfsNodes[kgSubEdge.subject][kgSubEdge.object] = [];
}
dfsNodes[kgSubEdge.subject][kgSubEdge.object].push(subEdge);

if (!supportGraphsPerNode[kgSubEdge.subject]) {
supportGraphsPerNode[kgSubEdge.subject] = new Set();
}
supportGraphsPerNode[kgSubEdge.subject].add(supportGraph);
}
}

const message1 = '[Pathfinder]: Performing serach for intermediate nodes.';
debug(message1);
this.logs.push(new LogEntry('INFO', null, message1).getLog());

// perform dfs
const stack = [{ node: kgSrc, path: [kgSrc] }];
const newResultObject: {[id: string]: TrapiResult} = {};
const newAuxGraphs: {[id: string]: {edges: Set<string>}} = {};
while (stack.length !== 0) {
const { node, path } = stack.pop();
if (node === kgDst) {
if (path.length > 2) {
// loop through all intermediate nodes
for (let i = 1; i < path.length - 1; i++) {
const intermediateNode = path[i];
const firstEdges = [];
const secondEdges = [];
for (let j = 0; j < i; j++) {
for (let edge of dfsNodes[path[j]][path[j+1]]) {
firstEdges.push(edge);
}
}
for (let j = i; j < path.length - 1; j++) {
for (let edge of dfsNodes[path[j]][path[j+1]]) {
secondEdges.push(edge);
}
}

if (!(`pathfinder-${kgSrc}-${intermediateNode}-${kgDst}` in newResultObject)) {
newResultObject[`pathfinder-${kgSrc}-${intermediateNode}-${kgDst}`] = {
node_bindings: {
[mainEdge.subject]: [{ id: kgSrc }],
[mainEdge.object]: [{ id: kgDst }],
[unpinnedNodeId]: [{ id: intermediateNode }]
},
analyses: [{
resource_id: "infores:biothings-explorer",
edge_bindings: {
[mainEdgeID]: [{ id: kgEdge }],
[intermediateEdges[0][0]]: [{ id: `pathfinder-${kgSrc}-${intermediateNode}` }],
[intermediateEdges[1][0]]: [{ id: `pathfinder-${intermediateNode}-${kgDst}` }],
},
score: undefined
}],
};
creativeResponse.message.knowledge_graph.edges[`pathfinder-${kgSrc}-${intermediateNode}`] = {
predicate: 'biolink:related_to',
subject: kgSrc,
object: intermediateNode,
sources: [
{
resource_id: this.options.provenanceUsesServiceProvider
? 'infores:service-provider-trapi'
: 'infores:biothings-explorer',
resource_role: 'primary_knowledge_source',
},
],
attributes: [{ attribute_type_id: 'biolink:support_graphs', value: [`pathfinder-${kgSrc}-${intermediateNode}-support`] }],
};
creativeResponse.message.knowledge_graph.edges[`pathfinder-${intermediateNode}-${kgDst}`] = {
predicate: 'biolink:related_to',
subject: intermediateNode,
object: kgDst,
sources: [
{
resource_id: this.options.provenanceUsesServiceProvider
? 'infores:service-provider-trapi'
: 'infores:biothings-explorer',
resource_role: 'primary_knowledge_source',
},
],
attributes: [{ attribute_type_id: 'biolink:support_graphs', value: [`pathfinder-${intermediateNode}-${kgDst}-support`] }],
};
newAuxGraphs[`pathfinder-${kgSrc}-${intermediateNode}-support`] = { edges: new Set(firstEdges) };
newAuxGraphs[`pathfinder-${intermediateNode}-${kgDst}-support`] = { edges: new Set(secondEdges) };

// calculate score
if (supportGraphsPerNode[intermediateNode]?.size > 0) {
let score = undefined;
for (const supportGraph of supportGraphsPerNode[intermediateNode]) {
score = scaled_sigmoid(
inverse_scaled_sigmoid(score ?? 0) +
inverse_scaled_sigmoid(originalAnalyses[supportGraph].score),
);
}
newResultObject[`pathfinder-${kgSrc}-${intermediateNode}-${kgDst}`].analyses[0].score = score;
}

} else {
firstEdges.forEach(edge => newAuxGraphs[`pathfinder-${kgSrc}-${intermediateNode}-support`].edges.add(edge));
secondEdges.forEach(edge => newAuxGraphs[`pathfinder-${intermediateNode}-${kgDst}-support`].edges.add(edge));
}
}
}
} else {
for (const neighbor in dfsNodes[node]) {
if (!path.includes(neighbor)) {
stack.push({ node: neighbor, path: [...path, neighbor ] });
}
}
}
}

creativeResponse.message.results = Object.values(newResultObject).sort((a, b) => (b.analyses[0].score ?? 0) - (a.analyses[0].score ?? 0)).slice(0, process.env.CREATIVE_LIMIT ? parseInt(process.env.CREATIVE_LIMIT) : 500);
creativeResponse.description = `Query processed successfully, retrieved ${creativeResponse.message.results.length} results.`

const finalNewAuxGraphs: {[id: string]: {edges: string[]}} = newAuxGraphs as any;
for (const auxGraph in finalNewAuxGraphs) {
finalNewAuxGraphs[auxGraph].edges = Array.from(finalNewAuxGraphs[auxGraph].edges);
}
Object.assign(creativeResponse.message.auxiliary_graphs, finalNewAuxGraphs);

const message2 = `[Pathfinder]: Pathfinder found ${creativeResponse.message.results.length} intermediate nodes and created ${Object.keys(finalNewAuxGraphs).length} support graphs.`;
debug(message2);
this.logs.push(new LogEntry('INFO', null, message2).getLog());

// TODO: formatting
// TODO: move to a separate file if this gets too big?
// TODO: make unit tests
// TODO: add spans in the class
const pathfinderHandler = new PathfinderQueryHandler(this.logs, this.queryGraph, this);
const pathfinderResponse = await pathfinderHandler.query();

this.getResponse = () => creativeResponse;
this.getResponse = () => pathfinderResponse;
}

async _handleInferredEdges(pathfinder = false): Promise<void> {
async _handleInferredEdges(): Promise<void> {
if (!this._queryIsOneHop()) {
const message = 'Inferred Mode edges are only supported in single-edge queries. Your query terminates.';
debug(message);
Expand All @@ -737,8 +546,7 @@ export default class TRAPIQueryHandler {
this.options,
this.path,
this.predicatePath,
this.includeReasoner,
pathfinder
this.includeReasoner
);
const inferredQueryResponse = await inferredQueryHandler.query();
if (inferredQueryResponse) {
Expand Down Expand Up @@ -816,8 +624,7 @@ export default class TRAPIQueryHandler {
new LogEntry(
'INFO',
null,
`Execution Summary: (${KGNodes}) nodes / (${kgEdges}) edges / (${results}) results; (${resultQueries}/${queries}) queries${
cached ? ` (${cached} cached qEdges)` : ''
`Execution Summary: (${KGNodes}) nodes / (${kgEdges}) edges / (${results}) results; (${resultQueries}/${queries}) queries${cached ? ` (${cached} cached qEdges)` : ''
} returned results from(${sources.length}) unique API${sources.length === 1 ? 's' : ''}`,
).getLog(),
new LogEntry('INFO', null, `APIs: ${sources.join(', ')} `).getLog(),
Expand Down Expand Up @@ -874,10 +681,10 @@ export default class TRAPIQueryHandler {
debug(`(3) All edges created ${JSON.stringify(queryEdges)} `);

if (this._queryIsPathfinder()) {
const span2 = Telemetry.startSpan({ description: 'pathfinderExecution' });
await this._handlePathfinder();
span2?.finish();
return;
const span2 = Telemetry.startSpan({ description: 'pathfinderExecution' });
await this._handlePathfinder();
span2?.finish();
return;
}

if (this._queryUsesInferredMode()) {
Expand Down
2 changes: 1 addition & 1 deletion src/inferred_mode/inferred_mode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export default class InferredQueryHandler {
path: string,
predicatePath: string,
includeReasoner: boolean,
pathfinder: boolean
pathfinder = false
) {
this.parent = parent;
this.queryGraph = queryGraph;
Expand Down
Loading

0 comments on commit f2832f0

Please sign in to comment.