From b86e66a9898586a74757dec3e752aed560647834 Mon Sep 17 00:00:00 2001
From: rjawesome
Date: Mon, 24 Jun 2024 15:11:28 -0700
Subject: [PATCH 1/5] allow queries to be aborted

---
 src/batch_edge_query.ts | 16 +++++++++-------
 src/cache_handler.ts    |  5 ++++-
 src/edge_manager.ts     |  6 ++++--
 src/index.ts            |  6 ++++--
 src/update_nodes.ts     |  8 ++++----
 5 files changed, 25 insertions(+), 16 deletions(-)

diff --git a/src/batch_edge_query.ts b/src/batch_edge_query.ts
index d5504b21..0d843985 100644
--- a/src/batch_edge_query.ts
+++ b/src/batch_edge_query.ts
@@ -61,9 +61,9 @@ export default class BatchEdgeQueryHandler {
   /**
    * @private
    */
-  async _queryAPIEdges(APIEdges: APIEdge[], unavailableAPIs: UnavailableAPITracker = {}): Promise<Record[]> {
+  async _queryAPIEdges(APIEdges: APIEdge[], unavailableAPIs: UnavailableAPITracker = {}, abortSignal?: AbortSignal): Promise<Record[]> {
     const executor = new call_api(APIEdges, this.options, redisClient);
-    const records: Record[] = await executor.query(this.resolveOutputIDs, unavailableAPIs);
+    const records: Record[] = await executor.query(this.resolveOutputIDs, unavailableAPIs, abortSignal);
     this.logs = [...this.logs, ...executor.logs];
     return records;
   }
@@ -123,18 +123,20 @@ export default class BatchEdgeQueryHandler {
     });
   }

-  async query(qEdges: QEdge | QEdge[], unavailableAPIs: UnavailableAPITracker = {}): Promise<Record[]> {
+  async query(qEdges: QEdge | QEdge[], unavailableAPIs: UnavailableAPITracker = {}, abortSignal?: AbortSignal): Promise<Record[]> {
     debug('Node Update Start');
     // it's now a single edge but convert to arr to simplify refactoring
     qEdges = Array.isArray(qEdges) ? qEdges : [qEdges];
     const nodeUpdate = new NodesUpdateHandler(qEdges);

     // difference is there is no previous edge info anymore
-    await nodeUpdate.setEquivalentIDs(qEdges);
+    await nodeUpdate.setEquivalentIDs(qEdges, abortSignal);
     await this._rmEquivalentDuplicates(qEdges);
     debug('Node Update Success');

+    if (abortSignal?.aborted) return [];
+
     const cacheHandler = new CacheHandler(this.caching, this.metaKG, this.options);
-    const { cachedRecords, nonCachedQEdges } = await cacheHandler.categorizeEdges(qEdges);
+    const { cachedRecords, nonCachedQEdges } = await cacheHandler.categorizeEdges(qEdges, abortSignal);
     this.logs = [...this.logs, ...cacheHandler.logs];
     let queryRecords: Record[];
@@ -154,8 +156,8 @@ export default class BatchEdgeQueryHandler {
     }
     const expanded_APIEdges = this._expandAPIEdges(APIEdges);
     debug('Start to query APIEdges....');
-    queryRecords = await this._queryAPIEdges(expanded_APIEdges, unavailableAPIs);
-    if (queryRecords === undefined) return;
+    queryRecords = await this._queryAPIEdges(expanded_APIEdges, unavailableAPIs, abortSignal);
+    if (queryRecords === undefined || abortSignal?.aborted) return;
     debug('APIEdges are successfully queried....');
     queryRecords = await this._postQueryFilter(queryRecords);
     debug(`Total number of records is (${queryRecords.length})`);
diff --git a/src/cache_handler.ts b/src/cache_handler.ts
index a52c6d1f..f62f3a84 100644
--- a/src/cache_handler.ts
+++ b/src/cache_handler.ts
@@ -112,7 +112,7 @@ export default class CacheHandler {
     );
   }

-  async categorizeEdges(qEdges: QEdge[]): Promise<{ cachedRecords: Record[]; nonCachedQEdges: QEdge[] }> {
+  async categorizeEdges(qEdges: QEdge[], abortSignal?: AbortSignal): Promise<{ cachedRecords: Record[]; nonCachedQEdges: QEdge[] }> {
     if (this.cacheEnabled === false || process.env.INTERNAL_DISABLE_REDIS === 'true') {
       return {
         cachedRecords: [],
@@ -123,6 +123,7 @@
     let cachedRecords: Record[] = [];
     debug('Begin edge cache lookup...');
     await async.eachSeries(qEdges, async (qEdge) => {
+      if (abortSignal?.aborted) return;
       const qEdgeMetaKGHash = this._hashEdgeByMetaKG(qEdge.getHashedEdgeRepresentation());
       const unpackedRecords: Record[] = await new Promise((resolve) => {
         const redisID = 'bte:edgeCache:' + qEdgeMetaKGHash;
@@ -130,6 +131,8 @@
           try {
             const compressedRecordPack = await redisClient.client.hgetallTimeout(redisID);

+            if (abortSignal?.aborted) resolve([]);
+
             if (compressedRecordPack && Object.keys(compressedRecordPack).length) {
               const recordPack = [];
diff --git a/src/edge_manager.ts b/src/edge_manager.ts
index 543a6ea5..a0db4dc2 100644
--- a/src/edge_manager.ts
+++ b/src/edge_manager.ts
@@ -384,9 +384,11 @@ export default class QueryEdgeManager {
     debug(logMessage);
   }

-  async executeEdges(): Promise<boolean> {
+  async executeEdges(abortSignal?: AbortSignal): Promise<boolean> {
     const unavailableAPIs: UnavailableAPITracker = {};
     while (this.getEdgesNotExecuted()) {
+      if (abortSignal?.aborted) return false;
+
       const span = Telemetry.startSpan({ description: 'edgeExecution' });
       //next available/most efficient edge
       const currentQEdge = this.getNext();
@@ -402,7 +404,7 @@ export default class QueryEdgeManager {
       );
       debug(`(5) Executing current edge >> "${currentQEdge.getID()}"`);
       //execute current edge query
-      const queryRecords = await queryBatchHandler.query(queryBatchHandler.qEdges, unavailableAPIs);
+      const queryRecords = await queryBatchHandler.query(queryBatchHandler.qEdges, unavailableAPIs, abortSignal);
       this.logs = [...this.logs, ...queryBatchHandler.logs];
       if (queryRecords === undefined) return;
       // create an edge execution summary
diff --git a/src/index.ts b/src/index.ts
index 59401b36..ff860d0e 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -620,7 +620,7 @@ export default class TRAPIQueryHandler {
     ];
   };

-  async query(): Promise<void> {
+  async query(abortSignal?: AbortSignal): Promise<void> {
     this._initializeResponse();

     await this.addQueryNodes();
@@ -681,12 +681,14 @@ export default class TRAPIQueryHandler {
     }

     const manager = new EdgeManager(queryEdges, metaKG, this.options);
-    const executionSuccess = await manager.executeEdges();
+    const executionSuccess = await manager.executeEdges(abortSignal);
     this.logs = [...this.logs, ...manager.logs];
     if (!executionSuccess) {
       return;
     }

+    if (abortSignal?.aborted) return;
+
     const span3 = Telemetry.startSpan({ description: 'resultsAssembly' });

     // update query graph
diff --git a/src/update_nodes.ts b/src/update_nodes.ts
index c9c30f93..2b41d549 100644
--- a/src/update_nodes.ts
+++ b/src/update_nodes.ts
@@ -39,17 +39,17 @@ export default class NodesUpdateHandler {
    * Resolve input ids
    * @param {object} curies - each key represents the category, e.g. gene, value is an array of curies.
    */
-  async _getEquivalentIDs(curies: ResolverInput): Promise<SRIResolverOutput> {
+  async _getEquivalentIDs(curies: ResolverInput, abortSignal?: AbortSignal): Promise<SRIResolverOutput> {
     // const resolver = new id_resolver.Resolver('biolink');
     // const equivalentIDs = await resolver.resolve(curies);
-    return await resolveSRI(curies);
+    return await resolveSRI(curies, abortSignal);
   }

-  async setEquivalentIDs(qEdges: QEdge[]): Promise<void> {
+  async setEquivalentIDs(qEdges: QEdge[], abortSignal?: AbortSignal): Promise<void> {
     debug(`Getting equivalent IDs...`);
     const curies = this._getCuries(this.qEdges);
     debug(`curies: ${JSON.stringify(curies)}`);
-    const equivalentIDs = await this._getEquivalentIDs(curies);
+    const equivalentIDs = await this._getEquivalentIDs(curies, abortSignal);
     qEdges.map((qEdge) => {
       const edgeEquivalentIDs = Object.keys(equivalentIDs)
         .filter((key) => qEdge.getInputCurie().includes(key))

From 3ce0b79e43c06833031d0f30cba04295c7a96872 Mon Sep 17 00:00:00 2001
From: rjawesome
Date: Tue, 25 Jun 2024 14:51:15 -0700
Subject: [PATCH 2/5] run creative mode templates in parallel

---
 src/inferred_mode/inferred_mode.ts | 116 ++++++++++++++---------------
 1 file changed, 54 insertions(+), 62 deletions(-)

diff --git a/src/inferred_mode/inferred_mode.ts b/src/inferred_mode/inferred_mode.ts
index bc6f3dd7..bc297b73 100644
--- a/src/inferred_mode/inferred_mode.ts
+++ b/src/inferred_mode/inferred_mode.ts
@@ -1,5 +1,5 @@
 import Debug from 'debug';
-import { LogEntry, StampedLog, Telemetry } from '@biothings-explorer/utils';
+import { LogEntry, StampedLog, Telemetry, timeoutPromise } from '@biothings-explorer/utils';
 import * as utils from '../utils';
 import async from 'async';
 import biolink from '../biolink';
@@ -38,7 +38,6 @@ export interface CombinedResponseReport {
   querySuccess: number;
   queryHadResults: boolean;
   mergedResults: { [resultID: string]: number };
-  creativeLimitHit: boolean | number;
 }

 // MatchedTemplate, but with IDs, etc. filled in
@@ -263,7 +262,6 @@
       querySuccess: 0,
       queryHadResults: false,
       mergedResults: {},
-      creativeLimitHit: false,
     };
     let mergedThisTemplate = 0;
     const resultIDsFromPrevious = new Set(Object.keys(combinedResponse.message.results));
@@ -428,9 +426,6 @@
     }
     report.querySuccess = 1;
-    if (Object.keys(combinedResponse.message.results).length >= this.CREATIVE_LIMIT && !report.creativeLimitHit) {
-      report.creativeLimitHit = Object.keys(newResponse.message.results).length;
-    }
     span.finish();
     return report;
   }

@@ -523,66 +518,49 @@
       [resultID: string]: number;
     } = {};

-    await async.eachOfSeries(subQueries, async ({ template, queryGraph }, i) => {
-      const span = Telemetry.startSpan({ description: 'creativeTemplate' });
-      span.setData('template', (i as number) + 1);
-      i = i as number;
-      if (stop) {
-        span.finish();
-        return;
-      }
-      if (global.queryInformation?.queryGraph) {
-        global.queryInformation.isCreativeMode = true;
-        global.queryInformation.creativeTemplate = template;
-      }
-      const handler = new TRAPIQueryHandler(this.options, this.path, this.predicatePath, this.includeReasoner);
-      try {
-        // make query and combine results/kg/logs/etc
+    const QUERY_TIMEOUT = 4.8 * 60 * 1000; // 4.8 minutes
+
+    const completedHandlers = await Promise.all(
+      subQueries.map(async ({ template, queryGraph }, i) => {
+        const span = Telemetry.startSpan({ description: 'creativeTemplate' });
+        span.setData('template', i + 1);
+        const handler = new TRAPIQueryHandler(this.options, this.path, this.predicatePath, this.includeReasoner);
         handler.setQueryGraph(queryGraph);
-        await handler.query();
-        const { querySuccess, queryHadResults, mergedResults, creativeLimitHit } = this.combineResponse(
-          i,
-          handler,
-          qEdgeID,
-          qEdge,
-          combinedResponse,
-        );
-        // update values used in logging
-        successfulQueries += querySuccess;
-        if (queryHadResults) resultQueries.push(i);
-        Object.entries(mergedResults).forEach(([result, countMerged]) => {
-          mergedResultsCount[result] =
-            result in mergedResultsCount ? mergedResultsCount[result] + countMerged : countMerged;
-        });
-        // log to user if we should stop
-        if (creativeLimitHit) {
-          stop = true;
-          const message = [
-            `Addition of ${creativeLimitHit} results from Template ${i + 1}`,
-            Object.keys(combinedResponse.message.results).length === this.CREATIVE_LIMIT ? ' meets ' : ' exceeds ',
-            `creative result maximum of ${this.CREATIVE_LIMIT} (reaching ${
-              Object.keys(combinedResponse.message.results).length
-            } merged). `,
-            `Response will be truncated to top-scoring ${this.CREATIVE_LIMIT} results. Skipping remaining ${
-              subQueries.length - (i + 1)
-            } `,
-            subQueries.length - (i + 1) === 1 ? `template.` : `templates.`,
-          ].join('');
+        try {
+          await timeoutPromise(handler.query(AbortSignal.timeout(QUERY_TIMEOUT)), QUERY_TIMEOUT);
+        } catch (error) {
+          handler.logs.forEach((log) => {
+            combinedResponse.logs.push(log);
+          });
+          const message = `ERROR: Template-${i + 1} failed due to error ${error}`;
           debug(message);
-          combinedResponse.logs.push(new LogEntry(`INFO`, null, message).getLog());
+          combinedResponse.logs.push(new LogEntry(`ERROR`, null, message).getLog());
+          span.finish();
+          return undefined;
         }
         span.finish();
-      } catch (error) {
-        handler.logs.forEach((log) => {
-          combinedResponse.logs.push(log);
-        });
-        const message = `ERROR: Template-${i + 1} failed due to error ${error}`;
-        debug(message);
-        combinedResponse.logs.push(new LogEntry(`ERROR`, null, message).getLog());
-        span.finish();
-        return;
-      }
-    });
+        return { i, handler };
+      })
+    );
+
+    for (const handlerInfo of completedHandlers) {
+      if (handlerInfo === undefined) continue;
+      const { i, handler } = handlerInfo;
+      const { querySuccess, queryHadResults, mergedResults } = this.combineResponse(
+        i,
+        handler,
+        qEdgeID,
+        qEdge,
+        combinedResponse,
+      );
+      successfulQueries += querySuccess;
+      if (queryHadResults) resultQueries.push(i);
+      Object.entries(mergedResults).forEach(([result, countMerged]) => {
+        mergedResultsCount[result] =
+          result in mergedResultsCount ? mergedResultsCount[result] + countMerged : countMerged;
+      });
+    }
+
     // log about merged Results
     if (Object.keys(mergedResultsCount).length) {
       // Add 1 for first instance of result (not counted during merging)
@@ -613,6 +591,20 @@
     response.message.results = Object.values(combinedResponse.message.results).sort((a, b) => {
       return b.analyses[0].score - a.analyses[0].score ? b.analyses[0].score - a.analyses[0].score : 0;
     });
+
+    // log about trimming results
+    if (response.message.results.length > this.CREATIVE_LIMIT) {
+      const message = [
+        `Number of results exceeds `,
+        `creative result maximum of ${this.CREATIVE_LIMIT} (reaching ${
+          Object.keys(response.message.results).length
+        } merged). `,
+        `Response will be truncated to top-scoring ${this.CREATIVE_LIMIT} results.`
+      ].join('');
+      debug(message);
+      combinedResponse.logs.push(new LogEntry(`INFO`, null, message).getLog());
+    }
+
     // trim extra results and prune kg
     response.message.results = response.message.results.slice(0, this.CREATIVE_LIMIT);
     response.description = `Query processed successfully, retrieved ${response.message.results.length} results.`;

From 1203b2c3c429c5b90e20b4143d479c2d8f59ffab Mon Sep 17 00:00:00 2001
From: rjawesome
Date: Wed, 26 Jun 2024 10:14:55 -0700
Subject: [PATCH 3/5] creative timeout environment variable

---
 src/inferred_mode/inferred_mode.ts | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/inferred_mode/inferred_mode.ts b/src/inferred_mode/inferred_mode.ts
index bc297b73..d5ec7a8d 100644
--- a/src/inferred_mode/inferred_mode.ts
+++ b/src/inferred_mode/inferred_mode.ts
@@ -52,6 +52,7 @@ export default class InferredQueryHandler {
   predicatePath: string;
   includeReasoner: boolean;
   CREATIVE_LIMIT: number;
+  CREATIVE_TIMEOUT: number;
   constructor(
     parent: TRAPIQueryHandler,
     queryGraph: TrapiQueryGraph,
@@ -69,6 +70,7 @@ export default class InferredQueryHandler {
     this.predicatePath = predicatePath;
     this.includeReasoner = includeReasoner;
     this.CREATIVE_LIMIT = process.env.CREATIVE_LIMIT ? parseInt(process.env.CREATIVE_LIMIT) : 500;
+    this.CREATIVE_TIMEOUT = process.env.CREATIVE_TIMEOUT_S ? parseInt(process.env.CREATIVE_TIMEOUT_S) * 1000 : 4.8 * 60 * 1000;
   }

   get queryIsValid(): boolean {
@@ -518,8 +520,6 @@
       [resultID: string]: number;
     } = {};

-    const QUERY_TIMEOUT = 4.8 * 60 * 1000; // 4.8 minutes
-
     const completedHandlers = await Promise.all(
       subQueries.map(async ({ template, queryGraph }, i) => {
         const span = Telemetry.startSpan({ description: 'creativeTemplate' });
@@ -527,7 +527,7 @@
         span.setData('template', i + 1);
         const handler = new TRAPIQueryHandler(this.options, this.path, this.predicatePath, this.includeReasoner);
         handler.setQueryGraph(queryGraph);
         try {
-          await timeoutPromise(handler.query(AbortSignal.timeout(QUERY_TIMEOUT)), QUERY_TIMEOUT);
+          await timeoutPromise(handler.query(AbortSignal.timeout(this.CREATIVE_TIMEOUT)), this.CREATIVE_TIMEOUT);
         } catch (error) {
           handler.logs.forEach((log) => {
             combinedResponse.logs.push(log);

From 0a9c6a112f43462b50dca871d4a37c85d4f76468 Mon Sep 17 00:00:00 2001
From: rjawesome
Date: Wed, 26 Jun 2024 11:17:25 -0700
Subject: [PATCH 4/5] debug cpu usage

---
 src/inferred_mode/inferred_mode.ts | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/src/inferred_mode/inferred_mode.ts b/src/inferred_mode/inferred_mode.ts
index d5ec7a8d..81323096 100644
--- a/src/inferred_mode/inferred_mode.ts
+++ b/src/inferred_mode/inferred_mode.ts
@@ -70,7 +70,7 @@ export default class InferredQueryHandler {
     this.predicatePath = predicatePath;
     this.includeReasoner = includeReasoner;
     this.CREATIVE_LIMIT = process.env.CREATIVE_LIMIT ? parseInt(process.env.CREATIVE_LIMIT) : 500;
-    this.CREATIVE_TIMEOUT = process.env.CREATIVE_TIMEOUT_S ? parseInt(process.env.CREATIVE_TIMEOUT_S) * 1000 : 4.8 * 60 * 1000;
+    this.CREATIVE_TIMEOUT = process.env.CREATIVE_TIMEOUT_S ? parseInt(process.env.CREATIVE_TIMEOUT_S) * 1000 : 4.75 * 60 * 1000;
   }

   get queryIsValid(): boolean {
@@ -520,6 +520,11 @@
       [resultID: string]: number;
     } = {};

+    // perf debugging
+    const startUsage = process.cpuUsage();
+    const startTime = new Date().getTime();
+    const ncpu = require('os').cpus().length;
+
     const completedHandlers = await Promise.all(
       subQueries.map(async ({ template, queryGraph }, i) => {
         const span = Telemetry.startSpan({ description: 'creativeTemplate' });
@@ -543,6 +548,12 @@
       })
     );

+    // perf debugging
+    const endTime = new Date().getTime();
+    const timeDelta = (endTime - startTime) * 10 * ncpu;
+    const { user, system } = process.cpuUsage(startUsage);
+    debug(`Average CPU Usage: ${(system + user) / timeDelta}%`);
+
     for (const handlerInfo of completedHandlers) {
       if (handlerInfo === undefined) continue;
       const { i, handler } = handlerInfo;

From 6aa562a588a15bbbd177f0e33bce06a38887f8d2 Mon Sep 17 00:00:00 2001
From: tokebe <43009413+tokebe@users.noreply.github.com>
Date: Fri, 11 Oct 2024 15:38:34 -0400
Subject: [PATCH 5/5] test: fix tests

---
 __test__/unittest/inferred_mode.test.ts | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/__test__/unittest/inferred_mode.test.ts b/__test__/unittest/inferred_mode.test.ts
index 3c95d449..f0cd9971 100644
--- a/__test__/unittest/inferred_mode.test.ts
+++ b/__test__/unittest/inferred_mode.test.ts
@@ -557,14 +557,12 @@ describe('Test InferredQueryHandler', () => {
     expect(report).toHaveProperty('querySuccess');
     expect(report).toHaveProperty('queryHadResults');
     expect(report).toHaveProperty('mergedResults');
-    expect(report).toHaveProperty('creativeLimitHit');

-    const { querySuccess, queryHadResults, mergedResults, creativeLimitHit } = report;
+    const { querySuccess, queryHadResults, mergedResults } = report;
     expect(querySuccess).toBeTruthy();
     expect(queryHadResults).toBeTruthy();
     expect(Object.keys(mergedResults)).toHaveLength(2);
     expect(Object.values(mergedResults)[0]).toEqual(1);
-    expect(creativeLimitHit).toBeTruthy();
     expect(Object.keys(combinedResponse.message.results)).toHaveLength(3);
     expect(combinedResponse.message.results['fakeCompound1-fakeDisease1'].analyses[0].score).toEqual(
       0.7836531040612146,
@@ -730,13 +728,11 @@ describe('Test InferredQueryHandler', () => {
       querySuccess: querySuccess1,
       queryHadResults: queryHadResults1,
       mergedResults: mergedResults1,
-      creativeLimitHit: creativeLimitHit1,
     } = inferredQueryHandler.combineResponse(2, trapiQueryHandler1, qEdgeID, qEdge, combinedResponse, auxGraphSuffixes);

     expect(querySuccess1).toBeTruthy();
     expect(queryHadResults1).toBeTruthy();
     expect(Object.keys(mergedResults1)).toHaveLength(1);
-    expect(creativeLimitHit1).toBeTruthy();
     expect(combinedResponse.message.results['fakeCompound1-fakeDisease1'].analyses[0].score).toEqual(
       0.7836531040612146,
     );
@@ -992,9 +988,6 @@ describe('Test InferredQueryHandler', () => {
     expect(response.message.knowledge_graph.nodes).toHaveProperty('creativeQueryObject');
     expect(response.message.results[0].node_bindings).toHaveProperty('creativeQuerySubject');
     expect(response.message.results[0].node_bindings).toHaveProperty('creativeQueryObject');
-    expect(response.logs.map((log) => log.message)).toContain(
-      'Addition of 1 results from Template 1 meets creative result maximum of 1 (reaching 1 merged). Response will be truncated to top-scoring 1 results. Skipping remaining 2 templates.',
-    );
   });

   test('supportedLookups', async () => {
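
Usage sketch (editor's addition, not part of the patch series): one way a caller might exercise the abort support from PATCH 1 together with the timeout environment variable from PATCHES 3 and 4. The constructor arguments, the getResponse() call, and the empty options object below are assumptions about the existing TRAPIQueryHandler API rather than anything these patches define; adjust them to the real interface.

// Hypothetical caller-side sketch -- assumes TRAPIQueryHandler is the default export
// of src/index.ts (as shown in PATCH 1) and that setQueryGraph()/getResponse() exist.
import TRAPIQueryHandler from './src/index';

// Optional: override the per-template creative-mode timeout, in seconds (PATCH 3/4).
process.env.CREATIVE_TIMEOUT_S = '120';

export async function queryWithAbort(queryGraph: any, timeoutMs: number) {
  // Placeholder constructor arguments; real call sites pass options, metaKG paths, etc.
  const handler = new TRAPIQueryHandler({}, undefined, undefined, true);
  handler.setQueryGraph(queryGraph);
  // AbortSignal.timeout() (Node >= 17.3) aborts after timeoutMs. With PATCH 1 applied,
  // query() checks signal.aborted between node updates, cache lookups, and edge
  // executions, so a timed-out query stops scheduling further sub-queries.
  await handler.query(AbortSignal.timeout(timeoutMs));
  return handler.getResponse();
}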