Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow queries to be aborted #78

Merged
merged 4 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion __test__/unittest/template_query_builder.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import qb from "../../src/builder/template_query_builder";
import path from "path";
import fs from "fs";
import { QueryHandlerOptions } from "../../src/types";
import { QueryHandlerOptions } from "@biothings-explorer/types";

describe("test query builder class", () => {
describe("test _getUrl function", () => {
Expand Down
2 changes: 1 addition & 1 deletion __test__/unittest/trapi_query_builder.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
*/

import TRAPIQueryBuilder from "../../src/builder/trapi_query_builder";
import { QueryHandlerOptions } from "../../src/types";
import { QueryHandlerOptions } from "@biothings-explorer/types";

describe("test trapi query builder class", () => {
describe("test getConfig function", () => {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"@biothings-explorer/utils": "workspace:../utils",
"@biothings-explorer/types": "workspace:../types",
"@sentry/node": "^7.74.1",
"axios": "^0.21.4",
"axios": "^1.7.2",
"axios-retry": "^3.8.1",
"debug": "^4.3.4",
"husky": "^4.3.8",
Expand Down
3 changes: 2 additions & 1 deletion src/builder/base_query_builder.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { AxiosRequestConfig, Method } from "axios";
import { QueryParams, APIEdge, QueryHandlerOptions } from "../types";
import { QueryParams, APIEdge } from "../types";
import { QueryHandlerOptions } from "@biothings-explorer/types";

/**
* Build API queries serving as input for Axios library based on BTE Edge info
Expand Down
3 changes: 2 additions & 1 deletion src/builder/builder_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import TRAPIQueryBuilder from "./trapi_query_builder";
import TemplateQueryBuilder from "./template_query_builder";
import Debug from "debug";
const debug = Debug("bte:call-apis:query");
import type { APIEdge, QueryHandlerOptions } from "../types";
import type { APIEdge } from "../types";
import { QueryHandlerOptions } from "@biothings-explorer/types";

function builderFactory(
APIEdge: APIEdge,
Expand Down
29 changes: 18 additions & 11 deletions src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { Record } from "@biothings-explorer/api-response-transform";
import BaseQueryBuilder from "./builder/base_query_builder";
import APIQueryPool from "./query_pool";
import APIQueryQueue from "./query_queue";
import { QueryHandlerOptions, UnavailableAPITracker } from "./types";
import { UnavailableAPITracker } from "./types";
import { QueryHandlerOptions } from "@biothings-explorer/types";
import {
LogEntry,
StampedLog,
Expand Down Expand Up @@ -48,7 +49,7 @@ export default class SubQueryDispatcher {
this.logs = [];
}

async execute(): Promise<{ records: Record[]; logs: StampedLog[] }> {
async execute(abortSignal?: AbortSignal): Promise<{ records: Record[]; logs: StampedLog[] }> {
const promise: Promise<{ records: Record[]; logs: StampedLog[] }> =
new Promise(resolve => {
this.complete = ({
Expand All @@ -60,12 +61,12 @@ export default class SubQueryDispatcher {
}) => resolve({ records, logs });
});
for (let i = 0; i < this.pool.size; i++) {
await this.queryPool();
await this.queryPool(abortSignal);
}
return promise;
}

async queryPool(): Promise<void> {
async queryPool(abortSignal?: AbortSignal): Promise<void> {
const query = await this.queue.getNext();
if (!query) return;

Expand All @@ -74,16 +75,17 @@ export default class SubQueryDispatcher {
query,
this.unavailableAPIs,
async (logs, records, followUp) => {
await this.onQueryComplete(logs, records, followUp);
await this.onQueryComplete(logs, records, followUp, abortSignal);
},
abortSignal
);
}

checkMaxRecords(): boolean {
return (
(this.maxRecords > 0 && this.totalRecords >= this.maxRecords) ||
(this.globalMaxRecords > 0 &&
global.queryInformation?.totalRecords > this.globalMaxRecords)
global.queryInformation?.totalRecords?.[this.options.handlerIndex ?? 0] > this.globalMaxRecords)
);
}

Expand All @@ -96,15 +98,20 @@ export default class SubQueryDispatcher {
logs: StampedLog[],
records?: Record[],
followUp?: BaseQueryBuilder[],
abortSignal?: AbortSignal
): Promise<void> {
if (this.done) return;
if (logs) this.logs.push(...logs);
const globalRecordsIndex = this.options.handlerIndex ?? 0;
if (records) {
this.records.push(...records);

const globalRecords = global.queryInformation?.totalRecords;
const globalRecords = global.queryInformation?.totalRecords?.[globalRecordsIndex];
if (global.queryInformation) {
global.queryInformation.totalRecords = globalRecords
if (!global.queryInformation.totalRecords) {
global.queryInformation.totalRecords = {}
}
global.queryInformation.totalRecords[globalRecordsIndex] = globalRecords
? globalRecords + records.length
: records.length;
}
Expand All @@ -118,7 +125,7 @@ export default class SubQueryDispatcher {
if (this.checkMaxRecords()) {
const stoppedOnGlobalMax =
this.globalMaxRecords > 0 &&
global.queryInformation?.totalRecords >= this.globalMaxRecords;
global.queryInformation?.totalRecords?.[globalRecordsIndex] >= this.globalMaxRecords;
let message = [
`Qedge ${this.qEdgeID}`,
`obtained ${this.records.length} records,`,
Expand All @@ -133,7 +140,7 @@ export default class SubQueryDispatcher {
message = message.slice(0, 2);
message.push(
...[
`totalling ${global.queryInformation.totalRecords} for this query.`,
`totalling ${global.queryInformation.totalRecords?.[globalRecordsIndex]} for this query.`,
`This exceeds the per-query maximum of ${this.globalMaxRecords}.`,
`For stability purposes, this query is terminated.`,
`Please consider refining your query further.`,
Expand All @@ -158,6 +165,6 @@ export default class SubQueryDispatcher {
});
return;
}
await this.queryPool();
await this.queryPool(abortSignal);
}
}
15 changes: 10 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { LogEntry, StampedLog, RedisClient } from "@biothings-explorer/utils";
import { APIEdge, QueryHandlerOptions, UnavailableAPITracker } from "./types";
import { APIEdge, UnavailableAPITracker } from "./types";
import { QueryHandlerOptions } from "@biothings-explorer/types";
import Debug from "debug";
const debug = Debug("bte:call-apis:query");
import queryBuilder from "./builder/builder_factory";
Expand Down Expand Up @@ -81,15 +82,16 @@ export default class APIQueryDispatcher {
async _annotate(
records: Record[],
resolveOutputIDs = true,
abortSignal?: AbortSignal
): Promise<Record[]> {
const groupedCuries = this._groupCuriesBySemanticType(records);
let res: SRIResolverOutput | ResolverOutput;
let attributes: unknown;
if (resolveOutputIDs === false) {
res = generateInvalidBioentities(groupedCuries);
} else {
res = await resolveSRI(groupedCuries);
attributes = await getAttributes(groupedCuries);
res = await resolveSRI(groupedCuries, abortSignal);
attributes = await getAttributes(groupedCuries, abortSignal);
}
records.map(record => {
if (record && record !== undefined) {
Expand Down Expand Up @@ -128,6 +130,7 @@ export default class APIQueryDispatcher {
async query(
resolveOutputIDs = true,
unavailableAPIs: UnavailableAPITracker = {},
abortSignal?: AbortSignal
): Promise<Record[]> {
// Used for temporarily storing a message to log via both debug and TRAPI logs
let message: string;
Expand All @@ -150,7 +153,7 @@ export default class APIQueryDispatcher {
unavailableAPIs,
this.options,
);
const { records, logs } = await subQueryDispatcher.execute();
const { records, logs } = await subQueryDispatcher.execute(abortSignal);
this.logs.push(...logs);
// Occurs when globalMaxRecords hit, requiring query termination
if (!records) return undefined;
Expand All @@ -168,8 +171,10 @@ export default class APIQueryDispatcher {
debug(message);
this.logs.push(new LogEntry("DEBUG", null, message).getLog());

if (abortSignal?.aborted) return [];

debug("Start to use id resolver module to annotate output ids.");
const annotatedRecords = await this._annotate(records, resolveOutputIDs);
const annotatedRecords = await this._annotate(records, resolveOutputIDs, abortSignal);
debug("id annotation completes");
debug(`qEdge queries complete in ${timeElapsed}${timeUnits}`);
this.logs.push(
Expand Down
54 changes: 28 additions & 26 deletions src/query_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,14 @@ import Transformer, {
Record,
} from "@biothings-explorer/api-response-transform";
import BaseQueryBuilder from "./builder/base_query_builder";
import {
APIDefinition,
QueryHandlerOptions,
UnavailableAPITracker,
} from "./types";
import { UnavailableAPITracker } from "./types";
import os from "os";
import axios, { AxiosRequestConfig, AxiosResponse } from "axios";
import Debug from "debug";
const debug = Debug("bte:call-apis:query");
import { Telemetry, LogEntry, StampedLog } from "@biothings-explorer/utils";
import axiosRetry from "axios-retry";
import { QueryHandlerOptions, APIDefinition } from "@biothings-explorer/types";

const SUBQUERY_DEFAULT_TIMEOUT = parseInt(
process.env.SUBQUERY_DEFAULT_TIMEOUT ?? "50000",
Expand Down Expand Up @@ -57,6 +54,7 @@ export default class APIQueryPool {
query: BaseQueryBuilder,
unavailableAPIs: UnavailableAPITracker,
logs: StampedLog[],
abortSignal?: AbortSignal,
): {
queryConfig: AxiosRequestConfig;
nInputs: number;
Expand All @@ -71,9 +69,8 @@ export default class APIQueryPool {
queryConfig = query.getConfig();
// Skip if query API has been marked unavailable
if (unavailableAPIs[query.APIEdge.query_operation.server]?.skip) {
unavailableAPIs[
query.APIEdge.query_operation.server
].skippedQueries += 1;
unavailableAPIs[query.APIEdge.query_operation.server].skippedQueries +=
1;
return undefined;
}
if (Array.isArray(query.APIEdge.input)) {
Expand Down Expand Up @@ -105,6 +102,7 @@ export default class APIQueryPool {
queryConfig.timeout = this._getTimeout(
query.APIEdge.association.smartapi.id,
);
queryConfig.signal = abortSignal;

return {
queryConfig,
Expand All @@ -119,8 +117,8 @@ export default class APIQueryPool {
"ERROR",
null,
`${(
(error as Error).stack
).toString()} while configuring query. Query dump: ${JSON.stringify(
error as Error
).stack.toString()} while configuring query. Query dump: ${JSON.stringify(
query,
)}`,
).getLog(),
Expand All @@ -137,9 +135,10 @@ export default class APIQueryPool {
records?: Record[],
followUp?: BaseQueryBuilder[],
) => Promise<void>,
abortSignal?: AbortSignal,
) {
// Check if pool has been stopped due to limit hit (save some computation)
if (this.stop) {
if (this.stop || abortSignal?.aborted) {
await finish();
return;
}
Expand All @@ -154,6 +153,7 @@ export default class APIQueryPool {
query,
unavailableAPIs,
logs,
abortSignal,
);

if (!queryConfigAttempt) {
Expand Down Expand Up @@ -187,8 +187,9 @@ export default class APIQueryPool {
retryDelay: axiosRetry.exponentialDelay,
retryCondition: err => {
return (
axiosRetry.isNetworkOrIdempotentRequestError(err) ||
err.response?.status >= 500
(axiosRetry.isNetworkOrIdempotentRequestError(err) ||
err.response?.status >= 500) &&
!abortSignal?.aborted
);
},
});
Expand Down Expand Up @@ -223,20 +224,16 @@ export default class APIQueryPool {
edge: query.APIEdge,
};

const {paginationStart: queryNeedsPagination, paginationSize} = query.needPagination(
unTransformedHits.response,
);
const { paginationStart: queryNeedsPagination, paginationSize } =
query.needPagination(unTransformedHits.response);
if (queryNeedsPagination) {
const log = `Query requires pagination, will re-query to window ${queryNeedsPagination}-${
queryNeedsPagination + paginationSize
}: ${query.APIEdge.query_operation.server} (${nInputs} ID${
nInputs > 1 ? "s" : ""
})`;
const log = `Query requires pagination, will re-query to window ${queryNeedsPagination}-${queryNeedsPagination + paginationSize
}: ${query.APIEdge.query_operation.server} (${nInputs} ID${nInputs > 1 ? "s" : ""
})`;
debug(log);
if (queryNeedsPagination >= 9000) {
const log = `Biothings query reaches 10,000 max: ${
query.APIEdge.query_operation.server
} (${nInputs} ID${nInputs > 1 ? "s" : ""})`;
const log = `Biothings query reaches 10,000 max: ${query.APIEdge.query_operation.server
} (${nInputs} ID${nInputs > 1 ? "s" : ""})`;
debug(log);
logs.push(new LogEntry("WARNING", null, log).getLog());
}
Expand All @@ -259,8 +256,13 @@ export default class APIQueryPool {
transformSpan.finish();

if (global.queryInformation?.queryGraph) {
const globalRecords = global.queryInformation.totalRecords;
global.queryInformation.totalRecords = globalRecords
const globalRecordsIndex = this.options.handlerIndex ?? 0;
const globalRecords =
global.queryInformation?.totalRecords?.[globalRecordsIndex];
if (!global.queryInformation.totalRecords) {
global.queryInformation.totalRecords = {}
}
global.queryInformation.totalRecords[globalRecordsIndex] = globalRecords
? globalRecords + transformedRecords.length
: transformedRecords.length;
}
Expand Down
15 changes: 0 additions & 15 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,6 @@ export interface APIList {
exclude: APIDefinition[];
}

export interface QueryHandlerOptions {
provenanceUsesServiceProvider?: boolean;
smartAPIID?: string;
teamName?: string;
enableIDResolution?: boolean;
// TODO: type instances of `any`
apiList?: APIList;
schema?: any; // might be hard to type -- it's the entire TRAPI schema IIRC
dryrun?: boolean;
resolveOutputIDs?: boolean;
submitter?: string;
caching?: boolean; // from request url query values
EDGE_ATTRIBUTES_USED_IN_RECORD_HASH?: string[];
}

export interface UnavailableAPITracker {
[server: string]: { skip: boolean; skippedQueries: number };
}
Loading