Skip to content

Commit

Permalink
Merge branch 'concurrency' of https://github.com/biothings/bte_trapi_…
Browse files Browse the repository at this point in the history
  • Loading branch information
tokebe committed Nov 13, 2023
2 parents 283cf44 + c437261 commit cbbc03d
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 26 deletions.
33 changes: 11 additions & 22 deletions src/edge_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,29 +203,14 @@ export default class QueryEdgeManager {
const execObjectCuries = qEdge.reverse ? subjectCuries : objectCuries;

records.forEach((record) => {
//check sub curies against $input ids
const subjectIDs: Set<string> = new Set();
const objectIDs: Set<string> = new Set();
let objectMatch = 0;
let subjectMatch = 0;
// check against original, primaryID, and equivalent ids
const subjectIDs = [record.subject.original, record.subject.curie, ...record.subject.equivalentCuries];
const objectIDs = [record.object.original, record.object.curie, ...record.object.equivalentCuries];

// there must be at least a minimal intersection
const subjectMatch = subjectIDs.some((curie) => execSubjectCuries.includes(curie));
const objectMatch = objectIDs.some((curie) => execObjectCuries.includes(curie));

//compare record I/O ids against edge node ids
// #1 check equivalent ids
record.subject.equivalentCuries.forEach((curie) => {
subjectIDs.add(curie);
});
record.object.equivalentCuries.forEach((curie) => {
objectIDs.add(curie);
});
// #2 ensure we have the primaryID
subjectIDs.add(record.subject.curie);
objectIDs.add(record.object.curie);
// #3 make sure we at least have the original
subjectIDs.add(record.subject.original);
objectIDs.add(record.object.original);
// check ids
subjectMatch = _.intersection([...subjectIDs], execSubjectCuries).length;
objectMatch = _.intersection([...objectIDs], execObjectCuries).length;
//if both ends match then keep record
if (subjectMatch && objectMatch) {
keep.push(record);
Expand Down Expand Up @@ -397,6 +382,7 @@ export default class QueryEdgeManager {
async executeEdges(): Promise<boolean> {
const unavailableAPIs: UnavailableAPITracker = {};
while (this.getEdgesNotExecuted()) {
const span = Telemetry.startSpan({ description: 'edgeExecution' });
//next available/most efficient edge
const currentQEdge = this.getNext();
//crate queries from edge
Expand Down Expand Up @@ -446,6 +432,7 @@ export default class QueryEdgeManager {
`qEdge (${currentQEdge.getID()}) got 0 records. Your query terminates.`,
).getLog(),
);
span.finish();
return;
}
// storing records will trigger a node entity count update
Expand Down Expand Up @@ -473,6 +460,7 @@ export default class QueryEdgeManager {
`qEdge (${currentQEdge.getID()}) kept 0 records. Your query terminates.`,
).getLog(),
);
span.finish();
return;
}
// edge all done
Expand All @@ -486,6 +474,7 @@ export default class QueryEdgeManager {
if (process.env.DUMP_RECORDS) {
await this.dumpRecords(this.getRecords());
}
span.finish();
return true;
}
}
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ export default class TRAPIQueryHandler {
* Set TRAPI Query Graph
* @param { object } queryGraph - TRAPI Query Graph Object
*/
async setQueryGraph(queryGraph: TrapiQueryGraph): Promise<void> {
setQueryGraph(queryGraph: TrapiQueryGraph): void {
this.originalQueryGraph = _.cloneDeep(queryGraph);
this.queryGraph = queryGraph;
for (const nodeId in queryGraph.nodes) {
Expand Down
9 changes: 8 additions & 1 deletion src/inferred_mode/inferred_mode.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Debug from 'debug';
import { LogEntry, StampedLog } from '@biothings-explorer/utils';
import { LogEntry, StampedLog, Telemetry } from '@biothings-explorer/utils';
import * as utils from '../utils';
import async from 'async';
import biolink from '../biolink';
Expand Down Expand Up @@ -254,6 +254,7 @@ export default class InferredQueryHandler {
qEdge: TrapiQEdge,
combinedResponse: CombinedResponse,
): CombinedResponseReport {
const span = Telemetry.startSpan({ description: 'creativeCombineResponse' });
const newResponse = handler.getResponse();
const report: CombinedResponseReport = {
querySuccess: 0,
Expand Down Expand Up @@ -422,6 +423,7 @@ export default class InferredQueryHandler {
if (Object.keys(combinedResponse.message.results).length >= this.CREATIVE_LIMIT && !report.creativeLimitHit) {
report.creativeLimitHit = Object.keys(newResponse.message.results).length;
}
span.finish();
return report;
}

Expand Down Expand Up @@ -514,8 +516,11 @@ export default class InferredQueryHandler {
} = {};

await async.eachOfSeries(subQueries, async ({ template, queryGraph }, i) => {
const span = Telemetry.startSpan({ description: 'creativeTemplate' });
span.setData('template', i + 1);
i = i as number;
if (stop) {
span.finish();
return;
}
if (global.queryInformation?.queryGraph) {
Expand Down Expand Up @@ -558,13 +563,15 @@ export default class InferredQueryHandler {
debug(message);
combinedResponse.logs.push(new LogEntry(`INFO`, null, message).getLog());
}
span.finish();
} catch (error) {
handler.logs.forEach((log) => {
combinedResponse.logs.push(log);
});
const message = `ERROR: Template-${i + 1} failed due to error ${error}`;
debug(message);
combinedResponse.logs.push(new LogEntry(`ERROR`, null, message).getLog());
span.finish();
return;
}
});
Expand Down
2 changes: 1 addition & 1 deletion src/query_node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ export default class QNode {
const keep: { [mainID: string]: string[] } = {};
// If a new entity has any alias intersection with an existing entity, keep it
Object.entries(newCuries).forEach(([newMainID, currentAliases]) => {
const someIntersection = Object.entries(this.expanded_curie).some(([, existingAliases]) => {
const someIntersection = Object.values(this.expanded_curie).some((existingAliases) => {
return currentAliases.some((currentAlias) =>
existingAliases.some((existingAlias) => currentAlias.toLowerCase() === existingAlias.toLowerCase()),
);
Expand Down
2 changes: 1 addition & 1 deletion src/results_assembly/score.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export interface ScoreCombos {
// create lookup table for ngd scores in the format: {inputUMLS-outputUMLS: ngd}
async function query(queryPairs: string[][]): Promise<ScoreCombos> {
const url = 'https://biothings.ncats.io/semmeddb/query/ngd';
const batchSize = 1000;
const batchSize = 500;

debug('Querying', queryPairs.length, 'combos.');

Expand Down

0 comments on commit cbbc03d

Please sign in to comment.