From 3ce0b79e43c06833031d0f30cba04295c7a96872 Mon Sep 17 00:00:00 2001 From: rjawesome Date: Tue, 25 Jun 2024 14:51:15 -0700 Subject: [PATCH] run creative mode templates in parallel --- src/inferred_mode/inferred_mode.ts | 116 ++++++++++++++--------------- 1 file changed, 54 insertions(+), 62 deletions(-) diff --git a/src/inferred_mode/inferred_mode.ts b/src/inferred_mode/inferred_mode.ts index bc6f3dd7..bc297b73 100644 --- a/src/inferred_mode/inferred_mode.ts +++ b/src/inferred_mode/inferred_mode.ts @@ -1,5 +1,5 @@ import Debug from 'debug'; -import { LogEntry, StampedLog, Telemetry } from '@biothings-explorer/utils'; +import { LogEntry, StampedLog, Telemetry, timeoutPromise } from '@biothings-explorer/utils'; import * as utils from '../utils'; import async from 'async'; import biolink from '../biolink'; @@ -38,7 +38,6 @@ export interface CombinedResponseReport { querySuccess: number; queryHadResults: boolean; mergedResults: { [resultID: string]: number }; - creativeLimitHit: boolean | number; } // MatchedTemplate, but with IDs, etc. filled in @@ -263,7 +262,6 @@ export default class InferredQueryHandler { querySuccess: 0, queryHadResults: false, mergedResults: {}, - creativeLimitHit: false, }; let mergedThisTemplate = 0; const resultIDsFromPrevious = new Set(Object.keys(combinedResponse.message.results)); @@ -428,9 +426,6 @@ export default class InferredQueryHandler { } report.querySuccess = 1; - if (Object.keys(combinedResponse.message.results).length >= this.CREATIVE_LIMIT && !report.creativeLimitHit) { - report.creativeLimitHit = Object.keys(newResponse.message.results).length; - } span.finish(); return report; } @@ -523,66 +518,49 @@ export default class InferredQueryHandler { [resultID: string]: number; } = {}; - await async.eachOfSeries(subQueries, async ({ template, queryGraph }, i) => { - const span = Telemetry.startSpan({ description: 'creativeTemplate' }); - span.setData('template', (i as number) + 1); - i = i as number; - if (stop) { - span.finish(); - return; - } - if (global.queryInformation?.queryGraph) { - global.queryInformation.isCreativeMode = true; - global.queryInformation.creativeTemplate = template; - } - const handler = new TRAPIQueryHandler(this.options, this.path, this.predicatePath, this.includeReasoner); - try { - // make query and combine results/kg/logs/etc + const QUERY_TIMEOUT = 4.8 * 60 * 1000; // 4.5 minutes + + const completedHandlers = await Promise.all( + subQueries.map(async ({ template, queryGraph }, i) => { + const span = Telemetry.startSpan({ description: 'creativeTemplate' }); + span.setData('template', i + 1); + const handler = new TRAPIQueryHandler(this.options, this.path, this.predicatePath, this.includeReasoner); handler.setQueryGraph(queryGraph); - await handler.query(); - const { querySuccess, queryHadResults, mergedResults, creativeLimitHit } = this.combineResponse( - i, - handler, - qEdgeID, - qEdge, - combinedResponse, - ); - // update values used in logging - successfulQueries += querySuccess; - if (queryHadResults) resultQueries.push(i); - Object.entries(mergedResults).forEach(([result, countMerged]) => { - mergedResultsCount[result] = - result in mergedResultsCount ? mergedResultsCount[result] + countMerged : countMerged; - }); - // log to user if we should stop - if (creativeLimitHit) { - stop = true; - const message = [ - `Addition of ${creativeLimitHit} results from Template ${i + 1}`, - Object.keys(combinedResponse.message.results).length === this.CREATIVE_LIMIT ? ' meets ' : ' exceeds ', - `creative result maximum of ${this.CREATIVE_LIMIT} (reaching ${ - Object.keys(combinedResponse.message.results).length - } merged). `, - `Response will be truncated to top-scoring ${this.CREATIVE_LIMIT} results. Skipping remaining ${ - subQueries.length - (i + 1) - } `, - subQueries.length - (i + 1) === 1 ? `template.` : `templates.`, - ].join(''); + try { + await timeoutPromise(handler.query(AbortSignal.timeout(QUERY_TIMEOUT)), QUERY_TIMEOUT); + } catch (error) { + handler.logs.forEach((log) => { + combinedResponse.logs.push(log); + }); + const message = `ERROR: Template-${i + 1} failed due to error ${error}`; debug(message); - combinedResponse.logs.push(new LogEntry(`INFO`, null, message).getLog()); + combinedResponse.logs.push(new LogEntry(`ERROR`, null, message).getLog()); + span.finish(); + return undefined; } span.finish(); - } catch (error) { - handler.logs.forEach((log) => { - combinedResponse.logs.push(log); - }); - const message = `ERROR: Template-${i + 1} failed due to error ${error}`; - debug(message); - combinedResponse.logs.push(new LogEntry(`ERROR`, null, message).getLog()); - span.finish(); - return; - } - }); + return { i, handler }; + }) + ); + + for (const handlerInfo of completedHandlers) { + if (handlerInfo === undefined) continue; + const { i, handler } = handlerInfo; + const { querySuccess, queryHadResults, mergedResults } = this.combineResponse( + i, + handler, + qEdgeID, + qEdge, + combinedResponse, + ); + successfulQueries += querySuccess; + if (queryHadResults) resultQueries.push(i); + Object.entries(mergedResults).forEach(([result, countMerged]) => { + mergedResultsCount[result] = + result in mergedResultsCount ? mergedResultsCount[result] + countMerged : countMerged; + }); + } + // log about merged Results if (Object.keys(mergedResultsCount).length) { // Add 1 for first instance of result (not counted during merging) @@ -613,6 +591,20 @@ export default class InferredQueryHandler { response.message.results = Object.values(combinedResponse.message.results).sort((a, b) => { return b.analyses[0].score - a.analyses[0].score ? b.analyses[0].score - a.analyses[0].score : 0; }); + + // log about trimming results + if (response.message.results.length > this.CREATIVE_LIMIT) { + const message = [ + `Number of results exceeds`, + `creative result maximum of ${this.CREATIVE_LIMIT} (reaching ${ + Object.keys(response.message.results).length + } merged). `, + `Response will be truncated to top-scoring ${this.CREATIVE_LIMIT} results.` + ].join(''); + debug(message); + combinedResponse.logs.push(new LogEntry(`INFO`, null, message).getLog()); + } + // trim extra results and prune kg response.message.results = response.message.results.slice(0, this.CREATIVE_LIMIT); response.description = `Query processed successfully, retrieved ${response.message.results.length} results.`;