Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run creative mode templates in parallel #197

Merged
merged 8 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions __test__/unittest/inferred_mode.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -557,14 +557,12 @@ describe('Test InferredQueryHandler', () => {
expect(report).toHaveProperty('querySuccess');
expect(report).toHaveProperty('queryHadResults');
expect(report).toHaveProperty('mergedResults');
expect(report).toHaveProperty('creativeLimitHit');

const { querySuccess, queryHadResults, mergedResults, creativeLimitHit } = report;
const { querySuccess, queryHadResults, mergedResults } = report;
expect(querySuccess).toBeTruthy();
expect(queryHadResults).toBeTruthy();
expect(Object.keys(mergedResults)).toHaveLength(2);
expect(Object.values(mergedResults)[0]).toEqual(1);
expect(creativeLimitHit).toBeTruthy();
expect(Object.keys(combinedResponse.message.results)).toHaveLength(3);
expect(combinedResponse.message.results['fakeCompound1-fakeDisease1'].analyses[0].score).toEqual(
0.7836531040612146,
Expand Down Expand Up @@ -730,13 +728,11 @@ describe('Test InferredQueryHandler', () => {
querySuccess: querySuccess1,
queryHadResults: queryHadResults1,
mergedResults: mergedResults1,
creativeLimitHit: creativeLimitHit1,
} = inferredQueryHandler.combineResponse(2, trapiQueryHandler1, qEdgeID, qEdge, combinedResponse, auxGraphSuffixes);

expect(querySuccess1).toBeTruthy();
expect(queryHadResults1).toBeTruthy();
expect(Object.keys(mergedResults1)).toHaveLength(1);
expect(creativeLimitHit1).toBeTruthy();
expect(combinedResponse.message.results['fakeCompound1-fakeDisease1'].analyses[0].score).toEqual(
0.7836531040612146,
);
Expand Down Expand Up @@ -992,9 +988,6 @@ describe('Test InferredQueryHandler', () => {
expect(response.message.knowledge_graph.nodes).toHaveProperty('creativeQueryObject');
expect(response.message.results[0].node_bindings).toHaveProperty('creativeQuerySubject');
expect(response.message.results[0].node_bindings).toHaveProperty('creativeQueryObject');
expect(response.logs.map((log) => log.message)).toContain(
'Addition of 1 results from Template 1 meets creative result maximum of 1 (reaching 1 merged). Response will be truncated to top-scoring 1 results. Skipping remaining 2 templates.',
);
});

test('supportedLookups', async () => {
Expand Down
16 changes: 9 additions & 7 deletions src/batch_edge_query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ export default class BatchEdgeQueryHandler {
/**
* @private
*/
async _queryAPIEdges(APIEdges: APIEdge[], unavailableAPIs: UnavailableAPITracker = {}): Promise<Record[]> {
async _queryAPIEdges(APIEdges: APIEdge[], unavailableAPIs: UnavailableAPITracker = {}, abortSignal?: AbortSignal): Promise<Record[]> {
const executor = new call_api(APIEdges, this.options, redisClient);
const records: Record[] = await executor.query(this.resolveOutputIDs, unavailableAPIs);
const records: Record[] = await executor.query(this.resolveOutputIDs, unavailableAPIs, abortSignal);
this.logs = [...this.logs, ...executor.logs];
return records;
}
Expand Down Expand Up @@ -123,18 +123,20 @@ export default class BatchEdgeQueryHandler {
});
}

async query(qEdges: QEdge | QEdge[], unavailableAPIs: UnavailableAPITracker = {}): Promise<Record[]> {
async query(qEdges: QEdge | QEdge[], unavailableAPIs: UnavailableAPITracker = {}, abortSignal?: AbortSignal): Promise<Record[]> {
debug('Node Update Start');
// it's now a single edge but convert to arr to simplify refactoring
qEdges = Array.isArray(qEdges) ? qEdges : [qEdges];
const nodeUpdate = new NodesUpdateHandler(qEdges);
// difference is there is no previous edge info anymore
await nodeUpdate.setEquivalentIDs(qEdges);
await nodeUpdate.setEquivalentIDs(qEdges, abortSignal);
await this._rmEquivalentDuplicates(qEdges);
debug('Node Update Success');

if (abortSignal?.aborted) return [];

const cacheHandler = new CacheHandler(this.caching, this.metaKG, this.options);
const { cachedRecords, nonCachedQEdges } = await cacheHandler.categorizeEdges(qEdges);
const { cachedRecords, nonCachedQEdges } = await cacheHandler.categorizeEdges(qEdges, abortSignal);
this.logs = [...this.logs, ...cacheHandler.logs];
let queryRecords: Record[];

Expand All @@ -154,8 +156,8 @@ export default class BatchEdgeQueryHandler {
}
const expanded_APIEdges = this._expandAPIEdges(APIEdges);
debug('Start to query APIEdges....');
queryRecords = await this._queryAPIEdges(expanded_APIEdges, unavailableAPIs);
if (queryRecords === undefined) return;
queryRecords = await this._queryAPIEdges(expanded_APIEdges, unavailableAPIs, abortSignal);
if (queryRecords === undefined || abortSignal?.aborted) return;
debug('APIEdges are successfully queried....');
queryRecords = await this._postQueryFilter(queryRecords);
debug(`Total number of records is (${queryRecords.length})`);
Expand Down
5 changes: 4 additions & 1 deletion src/cache_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ export default class CacheHandler {
);
}

async categorizeEdges(qEdges: QEdge[]): Promise<{ cachedRecords: Record[]; nonCachedQEdges: QEdge[] }> {
async categorizeEdges(qEdges: QEdge[], abortSignal?: AbortSignal): Promise<{ cachedRecords: Record[]; nonCachedQEdges: QEdge[] }> {
if (this.cacheEnabled === false || process.env.INTERNAL_DISABLE_REDIS === 'true') {
return {
cachedRecords: [],
Expand All @@ -123,13 +123,16 @@ export default class CacheHandler {
let cachedRecords: Record[] = [];
debug('Begin edge cache lookup...');
await async.eachSeries(qEdges, async (qEdge) => {
if (abortSignal?.aborted) return;
const qEdgeMetaKGHash = this._hashEdgeByMetaKG(qEdge.getHashedEdgeRepresentation());
const unpackedRecords: Record[] = await new Promise((resolve) => {
const redisID = 'bte:edgeCache:' + qEdgeMetaKGHash;
redisClient.client.usingLock([`redisLock:${redisID}`], 600000, async () => {
try {
const compressedRecordPack = await redisClient.client.hgetallTimeout(redisID);

if (abortSignal?.aborted) resolve([]);

if (compressedRecordPack && Object.keys(compressedRecordPack).length) {
const recordPack = [];

Expand Down
6 changes: 4 additions & 2 deletions src/edge_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,11 @@ export default class QueryEdgeManager {
debug(logMessage);
}

async executeEdges(): Promise<boolean> {
async executeEdges(abortSignal?: AbortSignal): Promise<boolean> {
const unavailableAPIs: UnavailableAPITracker = {};
while (this.getEdgesNotExecuted()) {
if (abortSignal?.aborted) return false;

const span = Telemetry.startSpan({ description: 'edgeExecution' });
//next available/most efficient edge
const currentQEdge = this.getNext();
Expand All @@ -423,7 +425,7 @@ export default class QueryEdgeManager {
);
debug(`(5) Executing current edge >> "${currentQEdge.getID()}"`);
//execute current edge query
const queryRecords = await queryBatchHandler.query(queryBatchHandler.qEdges, unavailableAPIs);
const queryRecords = await queryBatchHandler.query(queryBatchHandler.qEdges, unavailableAPIs, abortSignal);
this.logs = [...this.logs, ...queryBatchHandler.logs];
if (queryRecords === undefined) return;
// create an edge execution summary
Expand Down
6 changes: 4 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ export default class TRAPIQueryHandler {
];
};

async query(): Promise<void> {
async query(abortSignal?: AbortSignal): Promise<void> {
this._initializeResponse();
await this.addQueryNodes();

Expand Down Expand Up @@ -769,12 +769,14 @@ export default class TRAPIQueryHandler {
}
const manager = new EdgeManager(queryEdges, metaKG, this.subclassEdges, this.options);

const executionSuccess = await manager.executeEdges();
const executionSuccess = await manager.executeEdges(abortSignal);
this.logs = [...this.logs, ...manager.logs];
if (!executionSuccess) {
return;
}

if (abortSignal?.aborted) return;

const span3 = Telemetry.startSpan({ description: 'resultsAssembly' });

// update query graph
Expand Down
Loading
Loading