Skip to content

Commit

Permalink
Merge pull request #78 from biothings/parallel-creative
Browse files Browse the repository at this point in the history
allow queries to be aborted
  • Loading branch information
tokebe authored Oct 11, 2024
2 parents 273b866 + 5bfebea commit f02d561
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 62 deletions.
2 changes: 1 addition & 1 deletion __test__/unittest/template_query_builder.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import qb from "../../src/builder/template_query_builder";
import path from "path";
import fs from "fs";
import { QueryHandlerOptions } from "../../src/types";
import { QueryHandlerOptions } from "@biothings-explorer/types";

describe("test query builder class", () => {
describe("test _getUrl function", () => {
Expand Down
2 changes: 1 addition & 1 deletion __test__/unittest/trapi_query_builder.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
*/

import TRAPIQueryBuilder from "../../src/builder/trapi_query_builder";
import { QueryHandlerOptions } from "../../src/types";
import { QueryHandlerOptions } from "@biothings-explorer/types";

describe("test trapi query builder class", () => {
describe("test getConfig function", () => {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"@biothings-explorer/utils": "workspace:../utils",
"@biothings-explorer/types": "workspace:../types",
"@sentry/node": "^7.74.1",
"axios": "^0.21.4",
"axios": "^1.7.2",
"axios-retry": "^3.8.1",
"debug": "^4.3.4",
"husky": "^4.3.8",
Expand Down
3 changes: 2 additions & 1 deletion src/builder/base_query_builder.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { AxiosRequestConfig, Method } from "axios";
import { QueryParams, APIEdge, QueryHandlerOptions } from "../types";
import { QueryParams, APIEdge } from "../types";
import { QueryHandlerOptions } from "@biothings-explorer/types";

/**
* Build API queries serving as input for Axios library based on BTE Edge info
Expand Down
3 changes: 2 additions & 1 deletion src/builder/builder_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import TRAPIQueryBuilder from "./trapi_query_builder";
import TemplateQueryBuilder from "./template_query_builder";
import Debug from "debug";
const debug = Debug("bte:call-apis:query");
import type { APIEdge, QueryHandlerOptions } from "../types";
import type { APIEdge } from "../types";
import { QueryHandlerOptions } from "@biothings-explorer/types";

function builderFactory(
APIEdge: APIEdge,
Expand Down
29 changes: 18 additions & 11 deletions src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { Record } from "@biothings-explorer/api-response-transform";
import BaseQueryBuilder from "./builder/base_query_builder";
import APIQueryPool from "./query_pool";
import APIQueryQueue from "./query_queue";
import { QueryHandlerOptions, UnavailableAPITracker } from "./types";
import { UnavailableAPITracker } from "./types";
import { QueryHandlerOptions } from "@biothings-explorer/types";
import {
LogEntry,
StampedLog,
Expand Down Expand Up @@ -48,7 +49,7 @@ export default class SubQueryDispatcher {
this.logs = [];
}

async execute(): Promise<{ records: Record[]; logs: StampedLog[] }> {
async execute(abortSignal?: AbortSignal): Promise<{ records: Record[]; logs: StampedLog[] }> {
const promise: Promise<{ records: Record[]; logs: StampedLog[] }> =
new Promise(resolve => {
this.complete = ({
Expand All @@ -60,12 +61,12 @@ export default class SubQueryDispatcher {
}) => resolve({ records, logs });
});
for (let i = 0; i < this.pool.size; i++) {
await this.queryPool();
await this.queryPool(abortSignal);
}
return promise;
}

async queryPool(): Promise<void> {
async queryPool(abortSignal?: AbortSignal): Promise<void> {
const query = await this.queue.getNext();
if (!query) return;

Expand All @@ -74,16 +75,17 @@ export default class SubQueryDispatcher {
query,
this.unavailableAPIs,
async (logs, records, followUp) => {
await this.onQueryComplete(logs, records, followUp);
await this.onQueryComplete(logs, records, followUp, abortSignal);
},
abortSignal
);
}

checkMaxRecords(): boolean {
return (
(this.maxRecords > 0 && this.totalRecords >= this.maxRecords) ||
(this.globalMaxRecords > 0 &&
global.queryInformation?.totalRecords > this.globalMaxRecords)
global.queryInformation?.totalRecords?.[this.options.handlerIndex ?? 0] > this.globalMaxRecords)
);
}

Expand All @@ -96,15 +98,20 @@ export default class SubQueryDispatcher {
logs: StampedLog[],
records?: Record[],
followUp?: BaseQueryBuilder[],
abortSignal?: AbortSignal
): Promise<void> {
if (this.done) return;
if (logs) this.logs.push(...logs);
const globalRecordsIndex = this.options.handlerIndex ?? 0;
if (records) {
this.records.push(...records);

const globalRecords = global.queryInformation?.totalRecords;
const globalRecords = global.queryInformation?.totalRecords?.[globalRecordsIndex];
if (global.queryInformation) {
global.queryInformation.totalRecords = globalRecords
if (!global.queryInformation.totalRecords) {
global.queryInformation.totalRecords = {}
}
global.queryInformation.totalRecords[globalRecordsIndex] = globalRecords
? globalRecords + records.length
: records.length;
}
Expand All @@ -118,7 +125,7 @@ export default class SubQueryDispatcher {
if (this.checkMaxRecords()) {
const stoppedOnGlobalMax =
this.globalMaxRecords > 0 &&
global.queryInformation?.totalRecords >= this.globalMaxRecords;
global.queryInformation?.totalRecords?.[globalRecordsIndex] >= this.globalMaxRecords;
let message = [
`Qedge ${this.qEdgeID}`,
`obtained ${this.records.length} records,`,
Expand All @@ -133,7 +140,7 @@ export default class SubQueryDispatcher {
message = message.slice(0, 2);
message.push(
...[
`totalling ${global.queryInformation.totalRecords} for this query.`,
`totalling ${global.queryInformation.totalRecords?.[globalRecordsIndex]} for this query.`,
`This exceeds the per-query maximum of ${this.globalMaxRecords}.`,
`For stability purposes, this query is terminated.`,
`Please consider refining your query further.`,
Expand All @@ -158,6 +165,6 @@ export default class SubQueryDispatcher {
});
return;
}
await this.queryPool();
await this.queryPool(abortSignal);
}
}
15 changes: 10 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { LogEntry, StampedLog, RedisClient } from "@biothings-explorer/utils";
import { APIEdge, QueryHandlerOptions, UnavailableAPITracker } from "./types";
import { APIEdge, UnavailableAPITracker } from "./types";
import { QueryHandlerOptions } from "@biothings-explorer/types";
import Debug from "debug";
const debug = Debug("bte:call-apis:query");
import queryBuilder from "./builder/builder_factory";
Expand Down Expand Up @@ -81,15 +82,16 @@ export default class APIQueryDispatcher {
async _annotate(
records: Record[],
resolveOutputIDs = true,
abortSignal?: AbortSignal
): Promise<Record[]> {
const groupedCuries = this._groupCuriesBySemanticType(records);
let res: SRIResolverOutput | ResolverOutput;
let attributes: unknown;
if (resolveOutputIDs === false) {
res = generateInvalidBioentities(groupedCuries);
} else {
res = await resolveSRI(groupedCuries);
attributes = await getAttributes(groupedCuries);
res = await resolveSRI(groupedCuries, abortSignal);
attributes = await getAttributes(groupedCuries, abortSignal);
}
records.map(record => {
if (record && record !== undefined) {
Expand Down Expand Up @@ -128,6 +130,7 @@ export default class APIQueryDispatcher {
async query(
resolveOutputIDs = true,
unavailableAPIs: UnavailableAPITracker = {},
abortSignal?: AbortSignal
): Promise<Record[]> {
// Used for temporarily storing a message to log via both debug and TRAPI logs
let message: string;
Expand All @@ -150,7 +153,7 @@ export default class APIQueryDispatcher {
unavailableAPIs,
this.options,
);
const { records, logs } = await subQueryDispatcher.execute();
const { records, logs } = await subQueryDispatcher.execute(abortSignal);
this.logs.push(...logs);
// Occurs when globalMaxRecords hit, requiring query termination
if (!records) return undefined;
Expand All @@ -168,8 +171,10 @@ export default class APIQueryDispatcher {
debug(message);
this.logs.push(new LogEntry("DEBUG", null, message).getLog());

if (abortSignal?.aborted) return [];

debug("Start to use id resolver module to annotate output ids.");
const annotatedRecords = await this._annotate(records, resolveOutputIDs);
const annotatedRecords = await this._annotate(records, resolveOutputIDs, abortSignal);
debug("id annotation completes");
debug(`qEdge queries complete in ${timeElapsed}${timeUnits}`);
this.logs.push(
Expand Down
54 changes: 28 additions & 26 deletions src/query_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,14 @@ import Transformer, {
Record,
} from "@biothings-explorer/api-response-transform";
import BaseQueryBuilder from "./builder/base_query_builder";
import {
APIDefinition,
QueryHandlerOptions,
UnavailableAPITracker,
} from "./types";
import { UnavailableAPITracker } from "./types";
import os from "os";
import axios, { AxiosRequestConfig, AxiosResponse } from "axios";
import Debug from "debug";
const debug = Debug("bte:call-apis:query");
import { Telemetry, LogEntry, StampedLog } from "@biothings-explorer/utils";
import axiosRetry from "axios-retry";
import { QueryHandlerOptions, APIDefinition } from "@biothings-explorer/types";

const SUBQUERY_DEFAULT_TIMEOUT = parseInt(
process.env.SUBQUERY_DEFAULT_TIMEOUT ?? "50000",
Expand Down Expand Up @@ -57,6 +54,7 @@ export default class APIQueryPool {
query: BaseQueryBuilder,
unavailableAPIs: UnavailableAPITracker,
logs: StampedLog[],
abortSignal?: AbortSignal,
): {
queryConfig: AxiosRequestConfig;
nInputs: number;
Expand All @@ -71,9 +69,8 @@ export default class APIQueryPool {
queryConfig = query.getConfig();
// Skip if query API has been marked unavailable
if (unavailableAPIs[query.APIEdge.query_operation.server]?.skip) {
unavailableAPIs[
query.APIEdge.query_operation.server
].skippedQueries += 1;
unavailableAPIs[query.APIEdge.query_operation.server].skippedQueries +=
1;
return undefined;
}
if (Array.isArray(query.APIEdge.input)) {
Expand Down Expand Up @@ -105,6 +102,7 @@ export default class APIQueryPool {
queryConfig.timeout = this._getTimeout(
query.APIEdge.association.smartapi.id,
);
queryConfig.signal = abortSignal;

return {
queryConfig,
Expand All @@ -119,8 +117,8 @@ export default class APIQueryPool {
"ERROR",
null,
`${(
(error as Error).stack
).toString()} while configuring query. Query dump: ${JSON.stringify(
error as Error
).stack.toString()} while configuring query. Query dump: ${JSON.stringify(
query,
)}`,
).getLog(),
Expand All @@ -137,9 +135,10 @@ export default class APIQueryPool {
records?: Record[],
followUp?: BaseQueryBuilder[],
) => Promise<void>,
abortSignal?: AbortSignal,
) {
// Check if pool has been stopped due to limit hit (save some computation)
if (this.stop) {
if (this.stop || abortSignal?.aborted) {
await finish();
return;
}
Expand All @@ -154,6 +153,7 @@ export default class APIQueryPool {
query,
unavailableAPIs,
logs,
abortSignal,
);

if (!queryConfigAttempt) {
Expand Down Expand Up @@ -187,8 +187,9 @@ export default class APIQueryPool {
retryDelay: axiosRetry.exponentialDelay,
retryCondition: err => {
return (
axiosRetry.isNetworkOrIdempotentRequestError(err) ||
err.response?.status >= 500
(axiosRetry.isNetworkOrIdempotentRequestError(err) ||
err.response?.status >= 500) &&
!abortSignal?.aborted
);
},
});
Expand Down Expand Up @@ -223,20 +224,16 @@ export default class APIQueryPool {
edge: query.APIEdge,
};

const {paginationStart: queryNeedsPagination, paginationSize} = query.needPagination(
unTransformedHits.response,
);
const { paginationStart: queryNeedsPagination, paginationSize } =
query.needPagination(unTransformedHits.response);
if (queryNeedsPagination) {
const log = `Query requires pagination, will re-query to window ${queryNeedsPagination}-${
queryNeedsPagination + paginationSize
}: ${query.APIEdge.query_operation.server} (${nInputs} ID${
nInputs > 1 ? "s" : ""
})`;
const log = `Query requires pagination, will re-query to window ${queryNeedsPagination}-${queryNeedsPagination + paginationSize
}: ${query.APIEdge.query_operation.server} (${nInputs} ID${nInputs > 1 ? "s" : ""
})`;
debug(log);
if (queryNeedsPagination >= 9000) {
const log = `Biothings query reaches 10,000 max: ${
query.APIEdge.query_operation.server
} (${nInputs} ID${nInputs > 1 ? "s" : ""})`;
const log = `Biothings query reaches 10,000 max: ${query.APIEdge.query_operation.server
} (${nInputs} ID${nInputs > 1 ? "s" : ""})`;
debug(log);
logs.push(new LogEntry("WARNING", null, log).getLog());
}
Expand All @@ -259,8 +256,13 @@ export default class APIQueryPool {
transformSpan.finish();

if (global.queryInformation?.queryGraph) {
const globalRecords = global.queryInformation.totalRecords;
global.queryInformation.totalRecords = globalRecords
const globalRecordsIndex = this.options.handlerIndex ?? 0;
const globalRecords =
global.queryInformation?.totalRecords?.[globalRecordsIndex];
if (!global.queryInformation.totalRecords) {
global.queryInformation.totalRecords = {}
}
global.queryInformation.totalRecords[globalRecordsIndex] = globalRecords
? globalRecords + transformedRecords.length
: transformedRecords.length;
}
Expand Down
15 changes: 0 additions & 15 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,6 @@ export interface APIList {
exclude: APIDefinition[];
}

export interface QueryHandlerOptions {
provenanceUsesServiceProvider?: boolean;
smartAPIID?: string;
teamName?: string;
enableIDResolution?: boolean;
// TODO: type instances of `any`
apiList?: APIList;
schema?: any; // might be hard to type -- it's the entire TRAPI schema IIRC
dryrun?: boolean;
resolveOutputIDs?: boolean;
submitter?: string;
caching?: boolean; // from request url query values
EDGE_ATTRIBUTES_USED_IN_RECORD_HASH?: string[];
}

export interface UnavailableAPITracker {
[server: string]: { skip: boolean; skippedQueries: number };
}

0 comments on commit f02d561

Please sign in to comment.