Skip to content

Commit

Permalink
Merge pull request #174 from biothings/concurrency
Browse files Browse the repository at this point in the history
Concurrency improvements
  • Loading branch information
tokebe authored Dec 7, 2023
2 parents df854b3 + 0582a82 commit 598d506
Show file tree
Hide file tree
Showing 20 changed files with 80 additions and 157 deletions.
27 changes: 0 additions & 27 deletions __test__/unittest/LogEntry.test.ts

This file was deleted.

2 changes: 1 addition & 1 deletion __test__/unittest/inferred_mode.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import fs from 'fs';
const smartAPIPAth = path.resolve(__dirname, '../../../bte-trapi/data/smartapi_specs.json');
const predicatesPath = path.resolve(__dirname, '../../../bte-trapi/data/predicates.json');
import _ from 'lodash';
import { StampedLog, TrapiLog } from '../../src/log_entry';
import { StampedLog, TrapiLog } from '@biothings-explorer/utils';
import InferredQueryHandler, { CombinedResponse } from '../../src/inferred_mode/inferred_mode';
import { MatchedTemplate } from '../../src/inferred_mode/template_lookup';

Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
},
"homepage": "https://github.com/biothings/bte_trapi_query_graph_handler#readme",
"devDependencies": {
"@types/async": "^3.2.22",
"@types/debug": "^4.1.10",
"@types/jest": "^29.5.6",
"@types/lodash": "^4.14.200",
Expand All @@ -56,6 +57,7 @@
"@biothings-explorer/call-apis": "workspace:../call-apis",
"@biothings-explorer/node-expansion": "workspace:../node-expansion",
"@biothings-explorer/smartapi-kg": "workspace:../smartapi-kg",
"@biothings-explorer/utils": "workspace:../utils",
"@sentry/node": "^7.74.1",
"async": "^3.2.4",
"biolink-model": "workspace:../biolink-model",
Expand Down
8 changes: 4 additions & 4 deletions src/batch_edge_query.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import call_api from '@biothings-explorer/call-apis';
import call_api, { RedisClient } from '@biothings-explorer/call-apis';
import QEdge2APIEdgeHandler, { APIEdge } from './qedge2apiedge';
import NodesUpdateHandler from './update_nodes';
import Debug from 'debug';
const debug = Debug('bte:biothings-explorer-trapi:batch_edge_query');
import CacheHandler from './cache_handler';
import { threadId } from 'worker_threads';
import MetaKG from '@biothings-explorer/smartapi-kg';
import { StampedLog } from './log_entry';
import { QueryHandlerOptions } from '.';
import { StampedLog } from '@biothings-explorer/utils';
import { QueryHandlerOptions, redisClient } from '.';
import QEdge from './query_edge';
import { UnavailableAPITracker } from './types';
import { Record } from '@biothings-explorer/api-response-transform';
Expand Down Expand Up @@ -62,7 +62,7 @@ export default class BatchEdgeQueryHandler {
* @private
*/
async _queryAPIEdges(APIEdges: APIEdge[], unavailableAPIs: UnavailableAPITracker = {}): Promise<Record[]> {
const executor = new call_api(APIEdges, this.options);
const executor = new call_api(APIEdges, this.options, redisClient as RedisClient);
const records: Record[] = await executor.query(this.resolveOutputIDs, unavailableAPIs);
this.logs = [...this.logs, ...executor.logs];
return records;
Expand Down
2 changes: 1 addition & 1 deletion src/cache_handler.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { redisClient } from './redis-client';
import Debug from 'debug';
const debug = Debug('bte:biothings-explorer-trapi:cache_handler');
import LogEntry, { StampedLog } from './log_entry';
import { LogEntry, StampedLog } from '@biothings-explorer/utils';
import async from 'async';
import helper from './helper';
import lz4 from 'lz4';
Expand Down
4 changes: 2 additions & 2 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ const API_BATCH_SIZE = [
{
id: '1d288b3a3caf75d541ffaae3aab386c8',
name: 'BioThings SEMMEDDB API',
max: 100,
max: 50,
},
{
id: '978fe380a147a8641caf72320862697b',
name: 'Text Mining Targeted Association API',
max: 100,
max: 50,
},
{
id: '02af7d098ab304e80d6f4806c3527027',
Expand Down
48 changes: 17 additions & 31 deletions src/edge_manager.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import _ from 'lodash';
import LogEntry, { StampedLog } from './log_entry';
import { LogEntry, StampedLog } from '@biothings-explorer/utils';
import BTEError from './exceptions/bte_error';
import Debug from 'debug';
const debug = Debug('bte:biothings-explorer-trapi:edge-manager');
import * as config from './config';
import BatchEdgeQueryHandler, { BatchEdgeQueryOptions } from './batch_edge_query';
import * as Sentry from '@sentry/node';
import { Telemetry } from '@biothings-explorer/utils';
import QEdge from './query_edge';
import MetaKG from '@biothings-explorer/smartapi-kg';
import { QueryHandlerOptions } from '.';
Expand Down Expand Up @@ -203,33 +203,19 @@ export default class QueryEdgeManager {
const execObjectCuries = qEdge.reverse ? subjectCuries : objectCuries;

records.forEach((record) => {
//check sub curies against $input ids
const subjectIDs: Set<string> = new Set();
const objectIDs: Set<string> = new Set();
let objectMatch = 0;
let subjectMatch = 0;
// check against original, primaryID, and equivalent ids
const subjectIDs = [record.subject.original, record.subject.curie, ...record.subject.equivalentCuries];
const objectIDs = [record.object.original, record.object.curie, ...record.object.equivalentCuries];

// there must be at least a minimal intersection
const subjectMatch =
subjectIDs.some((curie) => execSubjectCuries.includes(curie)) || execSubjectCuries.length === 0;
const objectMatch = objectIDs.some((curie) => execObjectCuries.includes(curie)) || execObjectCuries.length === 0;

//compare record I/O ids against edge node ids
// #1 check equivalent ids
record.subject.equivalentCuries.forEach((curie) => {
subjectIDs.add(curie);
});
record.object.equivalentCuries.forEach((curie) => {
objectIDs.add(curie);
});
// #2 ensure we have the primaryID
subjectIDs.add(record.subject.curie);
objectIDs.add(record.object.curie);
// #3 make sure we at least have the original
subjectIDs.add(record.subject.original);
objectIDs.add(record.object.original);
// check ids
subjectMatch = _.intersection([...subjectIDs], execSubjectCuries).length;
objectMatch = _.intersection([...objectIDs], execObjectCuries).length;
//if both ends match then keep record

// Don't keep self-edges
const selfEdge = [...subjectIDs].some((curie) => objectIDs.has(curie));
const selfEdge = [...subjectIDs].some((curie) => objectIDs.includes(curie));
if (subjectMatch && objectMatch && !selfEdge) {
keep.push(record);
}
Expand Down Expand Up @@ -400,6 +386,7 @@ export default class QueryEdgeManager {
async executeEdges(): Promise<boolean> {
const unavailableAPIs: UnavailableAPITracker = {};
while (this.getEdgesNotExecuted()) {
const span = Telemetry.startSpan({ description: 'edgeExecution' });
//next available/most efficient edge
const currentQEdge = this.getNext();
//crate queries from edge
Expand Down Expand Up @@ -449,21 +436,18 @@ export default class QueryEdgeManager {
`qEdge (${currentQEdge.getID()}) got 0 records. Your query terminates.`,
).getLog(),
);
span.finish();
return;
}
// storing records will trigger a node entity count update
currentQEdge.storeRecords(queryRecords);

const span1 = Sentry?.getCurrentHub()?.getScope()?.getTransaction()?.startChild({
description: 'filteringRecords',
});
const span1 = Telemetry.startSpan({ description: 'filteringRecords' });
// filter records
this.updateEdgeRecords(currentQEdge);
span1?.finish();

const span2 = Sentry?.getCurrentHub()?.getScope()?.getTransaction()?.startChild({
description: 'updatingRecordEdges',
});
const span2 = Telemetry.startSpan({ description: 'updatingRecordEdges' });

// update and filter neighbors
this.updateAllOtherEdges(currentQEdge);
Expand All @@ -480,11 +464,13 @@ export default class QueryEdgeManager {
`qEdge (${currentQEdge.getID()}) kept 0 records. Your query terminates.`,
).getLog(),
);
span.finish();
return;
}
// edge all done
currentQEdge.executed = true;
debug(`(10) Edge successfully queried.`);
span.finish();
}
this._logSkippedQueries(unavailableAPIs);
// collect and organize records
Expand Down
2 changes: 1 addition & 1 deletion src/graph/graph.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import kg_edge from './kg_edge';
import kg_node from './kg_node';
import Debug from 'debug';
import LogEntry, { StampedLog } from '../log_entry';
import { LogEntry, StampedLog } from '@biothings-explorer/utils';
import KGNode from './kg_node';
import KGEdge from './kg_edge';
import { Record } from '../../../api-response-transform/built';
Expand Down
26 changes: 9 additions & 17 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ import Graph from './graph/graph';
import EdgeManager from './edge_manager';
import _ from 'lodash';
import QEdge2APIEdgeHandler from './qedge2apiedge';
import LogEntry, { StampedLog } from './log_entry';
import { LogEntry, StampedLog } from '@biothings-explorer/utils';
import { promises as fs } from 'fs';
import { getDescendants } from '@biothings-explorer/node-expansion';
import { resolveSRI, SRINodeNormFailure } from 'biomedical_id_resolver';
import InferredQueryHandler from './inferred_mode/inferred_mode';
import KGNode from './graph/kg_node';
import KGEdge from './graph/kg_edge';
import * as Sentry from '@sentry/node';
import {
APIList,
TrapiAuxGraphCollection,
Expand All @@ -29,6 +28,7 @@ import {
} from './types';
import BTEGraph from './graph/graph';
import QEdge from './query_edge';
import { Telemetry } from '@biothings-explorer/utils';

// Exports for external availability
export * from './types';
Expand All @@ -37,8 +37,6 @@ export { getTemplates, supportedLookups } from './inferred_mode/template_lookup'
export { default as QEdge } from './query_edge';
export { default as QNode } from './query_node';
export { default as InvalidQueryGraphError } from './exceptions/invalid_query_graph_error';
export * from './log_entry';
export { default as LogEntry } from './log_entry';
export * from './qedge2apiedge';

export interface QueryHandlerOptions {
Expand Down Expand Up @@ -317,14 +315,14 @@ export default class TRAPIQueryHandler {
async addQueryNodes(): Promise<void> {
const qNodeIDsByOriginalID: Map<string, TrapiQNode> = new Map();
const curiesToResolve = [
...Object.values(this.queryGraph.nodes).reduce((set, qNode) => {
...Object.values(this.queryGraph.nodes).reduce((set: Set<string>, qNode) => {
qNode.ids?.forEach((id) => {
set.add(id);
qNodeIDsByOriginalID.set(id, qNode);
});
return set;
}, new Set() as Set<string>),
];
}, new Set()),
] as string[];
const resolvedCuries = await resolveSRI({ unknown: curiesToResolve });
Object.entries(resolvedCuries).forEach(([originalCurie, resolvedEntity]) => {
if (!this.bteGraph.nodes[resolvedEntity.primaryID]) {
Expand Down Expand Up @@ -365,7 +363,7 @@ export default class TRAPIQueryHandler {
* Set TRAPI Query Graph
* @param { object } queryGraph - TRAPI Query Graph Object
*/
async setQueryGraph(queryGraph: TrapiQueryGraph): Promise<void> {
setQueryGraph(queryGraph: TrapiQueryGraph): void {
this.originalQueryGraph = _.cloneDeep(queryGraph);
this.queryGraph = queryGraph;
for (const nodeId in queryGraph.nodes) {
Expand Down Expand Up @@ -625,9 +623,7 @@ export default class TRAPIQueryHandler {
this._initializeResponse();
await this.addQueryNodes();

const span1 = Sentry?.getCurrentHub()?.getScope()?.getTransaction()?.startChild({
description: 'loadMetaKG',
});
const span1 = Telemetry.startSpan({ description: 'loadMetaKG' });

debug('Start to load metakg.');
const metaKG = this._loadMetaKG();
Expand Down Expand Up @@ -673,9 +669,7 @@ export default class TRAPIQueryHandler {
debug(`(3) All edges created ${JSON.stringify(queryEdges)} `);

if (this._queryUsesInferredMode()) {
const span2 = Sentry?.getCurrentHub()?.getScope()?.getTransaction()?.startChild({
description: 'creativeExecution',
});
const span2 = Telemetry.startSpan({ description: 'creativeExecution' });
await this._handleInferredEdges();
span2?.finish();
return;
Expand All @@ -692,9 +686,7 @@ export default class TRAPIQueryHandler {
return;
}

const span3 = Sentry?.getCurrentHub()?.getScope()?.getTransaction()?.startChild({
description: 'resultsAssembly',
});
const span3 = Telemetry.startSpan({ description: 'resultsAssembly' });

// update query graph
this.bteGraph.update(manager.getRecords());
Expand Down
9 changes: 8 additions & 1 deletion src/inferred_mode/inferred_mode.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Debug from 'debug';
import LogEntry, { StampedLog } from '../log_entry';
import { LogEntry, StampedLog, Telemetry } from '@biothings-explorer/utils';
import * as utils from '../utils';
import async from 'async';
import biolink from '../biolink';
Expand Down Expand Up @@ -256,6 +256,7 @@ export default class InferredQueryHandler {
qEdge: TrapiQEdge,
combinedResponse: CombinedResponse,
): CombinedResponseReport {
const span = Telemetry.startSpan({ description: 'creativeCombineResponse' });
const newResponse = handler.getResponse();
const report: CombinedResponseReport = {
querySuccess: 0,
Expand Down Expand Up @@ -424,6 +425,7 @@ export default class InferredQueryHandler {
if (Object.keys(combinedResponse.message.results).length >= this.CREATIVE_LIMIT && !report.creativeLimitHit) {
report.creativeLimitHit = Object.keys(newResponse.message.results).length;
}
span.finish();
return report;
}

Expand Down Expand Up @@ -516,8 +518,11 @@ export default class InferredQueryHandler {
} = {};

await async.eachOfSeries(subQueries, async ({ template, queryGraph }, i) => {
const span = Telemetry.startSpan({ description: 'creativeTemplate' });
span.setData('template', (i as number) + 1);
i = i as number;
if (stop) {
span.finish();
return;
}
if (global.queryInformation?.queryGraph) {
Expand Down Expand Up @@ -560,13 +565,15 @@ export default class InferredQueryHandler {
debug(message);
combinedResponse.logs.push(new LogEntry(`INFO`, null, message).getLog());
}
span.finish();
} catch (error) {
handler.logs.forEach((log) => {
combinedResponse.logs.push(log);
});
const message = `ERROR: Template-${i + 1} failed due to error ${error}`;
debug(message);
combinedResponse.logs.push(new LogEntry(`ERROR`, null, message).getLog());
span.finish();
return;
}
});
Expand Down
Loading

0 comments on commit 598d506

Please sign in to comment.