Skip to content

Commit

Permalink
Merge pull request #197 from biothings/parallel-creative
Browse files Browse the repository at this point in the history
Run creative mode templates in parallel
  • Loading branch information
tokebe authored Oct 11, 2024
2 parents 9f27ba7 + 6aa562a commit e70395c
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 114 deletions.
9 changes: 1 addition & 8 deletions __test__/unittest/inferred_mode.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -557,14 +557,12 @@ describe('Test InferredQueryHandler', () => {
expect(report).toHaveProperty('querySuccess');
expect(report).toHaveProperty('queryHadResults');
expect(report).toHaveProperty('mergedResults');
expect(report).toHaveProperty('creativeLimitHit');

const { querySuccess, queryHadResults, mergedResults, creativeLimitHit } = report;
const { querySuccess, queryHadResults, mergedResults } = report;
expect(querySuccess).toBeTruthy();
expect(queryHadResults).toBeTruthy();
expect(Object.keys(mergedResults)).toHaveLength(2);
expect(Object.values(mergedResults)[0]).toEqual(1);
expect(creativeLimitHit).toBeTruthy();
expect(Object.keys(combinedResponse.message.results)).toHaveLength(3);
expect(combinedResponse.message.results['fakeCompound1-fakeDisease1'].analyses[0].score).toEqual(
0.7836531040612146,
Expand Down Expand Up @@ -730,13 +728,11 @@ describe('Test InferredQueryHandler', () => {
querySuccess: querySuccess1,
queryHadResults: queryHadResults1,
mergedResults: mergedResults1,
creativeLimitHit: creativeLimitHit1,
} = inferredQueryHandler.combineResponse(2, trapiQueryHandler1, qEdgeID, qEdge, combinedResponse, auxGraphSuffixes);

expect(querySuccess1).toBeTruthy();
expect(queryHadResults1).toBeTruthy();
expect(Object.keys(mergedResults1)).toHaveLength(1);
expect(creativeLimitHit1).toBeTruthy();
expect(combinedResponse.message.results['fakeCompound1-fakeDisease1'].analyses[0].score).toEqual(
0.7836531040612146,
);
Expand Down Expand Up @@ -992,9 +988,6 @@ describe('Test InferredQueryHandler', () => {
expect(response.message.knowledge_graph.nodes).toHaveProperty('creativeQueryObject');
expect(response.message.results[0].node_bindings).toHaveProperty('creativeQuerySubject');
expect(response.message.results[0].node_bindings).toHaveProperty('creativeQueryObject');
expect(response.logs.map((log) => log.message)).toContain(
'Addition of 1 results from Template 1 meets creative result maximum of 1 (reaching 1 merged). Response will be truncated to top-scoring 1 results. Skipping remaining 2 templates.',
);
});

test('supportedLookups', async () => {
Expand Down
16 changes: 9 additions & 7 deletions src/batch_edge_query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ export default class BatchEdgeQueryHandler {
/**
* @private
*/
async _queryAPIEdges(APIEdges: APIEdge[], unavailableAPIs: UnavailableAPITracker = {}): Promise<Record[]> {
async _queryAPIEdges(APIEdges: APIEdge[], unavailableAPIs: UnavailableAPITracker = {}, abortSignal?: AbortSignal): Promise<Record[]> {
const executor = new call_api(APIEdges, this.options, redisClient);
const records: Record[] = await executor.query(this.resolveOutputIDs, unavailableAPIs);
const records: Record[] = await executor.query(this.resolveOutputIDs, unavailableAPIs, abortSignal);
this.logs = [...this.logs, ...executor.logs];
return records;
}
Expand Down Expand Up @@ -123,18 +123,20 @@ export default class BatchEdgeQueryHandler {
});
}

async query(qEdges: QEdge | QEdge[], unavailableAPIs: UnavailableAPITracker = {}): Promise<Record[]> {
async query(qEdges: QEdge | QEdge[], unavailableAPIs: UnavailableAPITracker = {}, abortSignal?: AbortSignal): Promise<Record[]> {
debug('Node Update Start');
// it's now a single edge but convert to arr to simplify refactoring
qEdges = Array.isArray(qEdges) ? qEdges : [qEdges];
const nodeUpdate = new NodesUpdateHandler(qEdges);
// difference is there is no previous edge info anymore
await nodeUpdate.setEquivalentIDs(qEdges);
await nodeUpdate.setEquivalentIDs(qEdges, abortSignal);
await this._rmEquivalentDuplicates(qEdges);
debug('Node Update Success');

if (abortSignal?.aborted) return [];

const cacheHandler = new CacheHandler(this.caching, this.metaKG, this.options);
const { cachedRecords, nonCachedQEdges } = await cacheHandler.categorizeEdges(qEdges);
const { cachedRecords, nonCachedQEdges } = await cacheHandler.categorizeEdges(qEdges, abortSignal);
this.logs = [...this.logs, ...cacheHandler.logs];
let queryRecords: Record[];

Expand All @@ -154,8 +156,8 @@ export default class BatchEdgeQueryHandler {
}
const expanded_APIEdges = this._expandAPIEdges(APIEdges);
debug('Start to query APIEdges....');
queryRecords = await this._queryAPIEdges(expanded_APIEdges, unavailableAPIs);
if (queryRecords === undefined) return;
queryRecords = await this._queryAPIEdges(expanded_APIEdges, unavailableAPIs, abortSignal);
if (queryRecords === undefined || abortSignal?.aborted) return;
debug('APIEdges are successfully queried....');
queryRecords = await this._postQueryFilter(queryRecords);
debug(`Total number of records is (${queryRecords.length})`);
Expand Down
5 changes: 4 additions & 1 deletion src/cache_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ export default class CacheHandler {
);
}

async categorizeEdges(qEdges: QEdge[]): Promise<{ cachedRecords: Record[]; nonCachedQEdges: QEdge[] }> {
async categorizeEdges(qEdges: QEdge[], abortSignal?: AbortSignal): Promise<{ cachedRecords: Record[]; nonCachedQEdges: QEdge[] }> {
if (this.cacheEnabled === false || process.env.INTERNAL_DISABLE_REDIS === 'true') {
return {
cachedRecords: [],
Expand All @@ -123,13 +123,16 @@ export default class CacheHandler {
let cachedRecords: Record[] = [];
debug('Begin edge cache lookup...');
await async.eachSeries(qEdges, async (qEdge) => {
if (abortSignal?.aborted) return;
const qEdgeMetaKGHash = this._hashEdgeByMetaKG(qEdge.getHashedEdgeRepresentation());
const unpackedRecords: Record[] = await new Promise((resolve) => {
const redisID = 'bte:edgeCache:' + qEdgeMetaKGHash;
redisClient.client.usingLock([`redisLock:${redisID}`], 600000, async () => {
try {
const compressedRecordPack = await redisClient.client.hgetallTimeout(redisID);

if (abortSignal?.aborted) resolve([]);

if (compressedRecordPack && Object.keys(compressedRecordPack).length) {
const recordPack = [];

Expand Down
6 changes: 4 additions & 2 deletions src/edge_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,11 @@ export default class QueryEdgeManager {
debug(logMessage);
}

async executeEdges(): Promise<boolean> {
async executeEdges(abortSignal?: AbortSignal): Promise<boolean> {
const unavailableAPIs: UnavailableAPITracker = {};
while (this.getEdgesNotExecuted()) {
if (abortSignal?.aborted) return false;

const span = Telemetry.startSpan({ description: 'edgeExecution' });
//next available/most efficient edge
const currentQEdge = this.getNext();
Expand All @@ -423,7 +425,7 @@ export default class QueryEdgeManager {
);
debug(`(5) Executing current edge >> "${currentQEdge.getID()}"`);
//execute current edge query
const queryRecords = await queryBatchHandler.query(queryBatchHandler.qEdges, unavailableAPIs);
const queryRecords = await queryBatchHandler.query(queryBatchHandler.qEdges, unavailableAPIs, abortSignal);
this.logs = [...this.logs, ...queryBatchHandler.logs];
if (queryRecords === undefined) return;
// create an edge execution summary
Expand Down
6 changes: 4 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ export default class TRAPIQueryHandler {
];
};

async query(): Promise<void> {
async query(abortSignal?: AbortSignal): Promise<void> {
this._initializeResponse();
await this.addQueryNodes();

Expand Down Expand Up @@ -769,12 +769,14 @@ export default class TRAPIQueryHandler {
}
const manager = new EdgeManager(queryEdges, metaKG, this.subclassEdges, this.options);

const executionSuccess = await manager.executeEdges();
const executionSuccess = await manager.executeEdges(abortSignal);
this.logs = [...this.logs, ...manager.logs];
if (!executionSuccess) {
return;
}

if (abortSignal?.aborted) return;

const span3 = Telemetry.startSpan({ description: 'resultsAssembly' });

// update query graph
Expand Down
Loading

0 comments on commit e70395c

Please sign in to comment.