diff --git a/__test__/integration/BatchEdgeQueryHandler.test.ts b/__test__/integration/BatchEdgeQueryHandler.test.ts deleted file mode 100644 index 16795516..00000000 --- a/__test__/integration/BatchEdgeQueryHandler.test.ts +++ /dev/null @@ -1,19 +0,0 @@ -import BatchEdgeQueryHandler from '../../src/batch_edge_query'; -import MetaKG from '@biothings-explorer/smartapi-kg'; - -describe('Testing BatchEdgeQueryHandler Module', () => { - const kg = new MetaKG(); - kg.constructMetaKGSync(); - - describe('Testing query function', () => { - test('test subscribe and unsubscribe function', () => { - const batchHandler = new BatchEdgeQueryHandler(kg); - batchHandler.subscribe(1); - batchHandler.subscribe(2); - batchHandler.subscribe(3); - expect(batchHandler.subscribers).toContain(2); - batchHandler.unsubscribe(2); - expect(batchHandler.subscribers).not.toContain(2); - }); - }); -}); diff --git a/__test__/integration/TRAPIQueryHandler.test.ts b/__test__/integration/TRAPIQueryHandler.test.ts index b815ee4f..425fd3cc 100644 --- a/__test__/integration/TRAPIQueryHandler.test.ts +++ b/__test__/integration/TRAPIQueryHandler.test.ts @@ -1,5 +1,6 @@ -import axios from 'axios'; +import axios, { AxiosStatic } from 'axios'; jest.mock('axios'); +const mockedAxios = axios as jest.Mocked; import TRAPIQueryHandler from '../../src/index'; import path from 'path'; @@ -26,7 +27,7 @@ describe('Testing TRAPIQueryHandler Module', () => { }; describe('Testing query function', () => { test('test with one query edge', async () => { - (axios.get as jest.Mock).mockResolvedValue({ + (mockedAxios.get as jest.Mock).mockResolvedValue({ data: { message: { query_graph: { @@ -101,7 +102,7 @@ describe('Testing TRAPIQueryHandler Module', () => { workflow: [{ id: 'lookup' }], }, }); - (axios.post as jest.Mock).mockResolvedValue({ + (mockedAxios.post as jest.Mock).mockResolvedValue({ data: { 'MONDO:0005737': { id: { identifier: 'MONDO:0005737', label: 'Ebola hemorrhagic fever' }, @@ -139,6 +140,6 @@ describe('Testing TRAPIQueryHandler Module', () => { queryHandler.setQueryGraph(OneHopQuery); await queryHandler.query(); expect(queryHandler.knowledgeGraph.kg).toHaveProperty('nodes'); - }, 15000); + }, 30000); }); }); diff --git a/__test__/unittest/redisClient.test.ts b/__test__/unittest/redisClient.test.ts deleted file mode 100644 index 997126ed..00000000 --- a/__test__/unittest/redisClient.test.ts +++ /dev/null @@ -1,57 +0,0 @@ -import { redisClient as redisClientType } from '../../src/redis-client'; -import RedisMock from 'ioredis-mock'; - -describe('Test redis client', () => { - const OLD_ENV = process.env; - - beforeEach(() => { - jest.resetModules(); // Most important - it clears the cache - jest.clearAllMocks(); - jest.mock('ioredis', () => RedisMock); - process.env = { ...OLD_ENV }; // Make a copy - }); - - afterAll(() => { - process.env = OLD_ENV; // Restore old environment - }); - test('Client reports not enabled when REDIS_HOST is not set', () => { - const { redisClient } = require('../../src/redis-client'); - expect(redisClient.clientEnabled).toEqual(false); - }); - - test('will receive process.env variables', () => { - // Set the variables - process.env.REDIS_HOST = 'localhost'; - process.env.REDIS_PORT = '3367'; - const { redisClient } = require('../../src/redis-client'); - expect(redisClient).not.toEqual({}); - }); - - test('Test if record is correctly stored', async () => { - process.env.REDIS_HOST = 'localhost'; - process.env.REDIS_PORT = '3367'; - const { redisClient } = require('../../src/redis-client'); - await redisClient.client.setTimeout('record1', 'hello'); - const res = await redisClient.client.getTimeout('record1'); - expect(res).toEqual('hello'); - }); - - test('Test if record is correctly stored', async () => { - process.env.REDIS_HOST = 'localhost'; - process.env.REDIS_PORT = '3367'; - const { redisClient } = require('../../src/redis-client'); - await redisClient.client.setTimeout('record1', 'hello'); - const res = await redisClient.client.getTimeout('record1'); - expect(res).toEqual('hello'); - }); - - test('Test key should be removed after ttl', async () => { - process.env.REDIS_HOST = 'localhost'; - process.env.REDIS_PORT = '3367'; - const { redisClient } = require('../../src/redis-client'); - await redisClient.client.setTimeout('record1', 'hello', 'EX', 2); - await new Promise((r) => setTimeout(r, 3000)); - const res = await redisClient.client.getTimeout('record1'); - expect(res).not.toBeNull; - }); -}); diff --git a/package.json b/package.json index bf03aa25..6426eca2 100644 --- a/package.json +++ b/package.json @@ -58,10 +58,11 @@ "@biothings-explorer/node-expansion": "workspace:../node-expansion", "@biothings-explorer/smartapi-kg": "workspace:../smartapi-kg", "@biothings-explorer/utils": "workspace:../utils", - "@sentry/node": "^7.74.1", - "async": "^3.2.4", + "@biothings-explorer/types": "workspace:../types", "biolink-model": "workspace:../biolink-model", "biomedical_id_resolver": "workspace:../biomedical_id_resolver", + "@sentry/node": "^7.74.1", + "async": "^3.2.4", "chi-square-p-value": "^1.0.5", "debug": "^4.3.4", "ioredis": "^5.3.2", diff --git a/src/batch_edge_query.ts b/src/batch_edge_query.ts index b9cb0b99..d5504b21 100644 --- a/src/batch_edge_query.ts +++ b/src/batch_edge_query.ts @@ -1,4 +1,5 @@ -import call_api, { RedisClient } from '@biothings-explorer/call-apis'; +import call_api from '@biothings-explorer/call-apis'; +import { redisClient } from '@biothings-explorer/utils'; import QEdge2APIEdgeHandler, { APIEdge } from './qedge2apiedge'; import NodesUpdateHandler from './update_nodes'; import Debug from 'debug'; @@ -7,18 +8,18 @@ import CacheHandler from './cache_handler'; import { threadId } from 'worker_threads'; import MetaKG from '@biothings-explorer/smartapi-kg'; import { StampedLog } from '@biothings-explorer/utils'; -import { QueryHandlerOptions, redisClient } from '.'; +import { QueryHandlerOptions } from '@biothings-explorer/types'; import QEdge from './query_edge'; import { UnavailableAPITracker } from './types'; import { Record } from '@biothings-explorer/api-response-transform'; export interface BatchEdgeQueryOptions extends QueryHandlerOptions { recordHashEdgeAttributes: string[]; + caching: boolean; } export default class BatchEdgeQueryHandler { metaKG: MetaKG; - subscribers: any[]; logs: StampedLog[]; caching: boolean; options: QueryHandlerOptions; @@ -26,7 +27,6 @@ export default class BatchEdgeQueryHandler { qEdges: QEdge | QEdge[]; constructor(metaKG: MetaKG, resolveOutputIDs = true, options?: BatchEdgeQueryOptions) { this.metaKG = metaKG; - this.subscribers = []; this.logs = []; this.caching = options && options.caching; this.options = options; @@ -62,7 +62,7 @@ export default class BatchEdgeQueryHandler { * @private */ async _queryAPIEdges(APIEdges: APIEdge[], unavailableAPIs: UnavailableAPITracker = {}): Promise { - const executor = new call_api(APIEdges, this.options, redisClient as RedisClient); + const executor = new call_api(APIEdges, this.options, redisClient); const records: Record[] = await executor.query(this.resolveOutputIDs, unavailableAPIs); this.logs = [...this.logs, ...executor.logs]; return records; @@ -100,7 +100,7 @@ export default class BatchEdgeQueryHandler { const equivalentAlreadyIncluded = qEdge .getInputNode() .getEquivalentIDs() - [curie].equivalentIDs.some((equivalentCurie) => reducedCuries.includes(equivalentCurie)); + [curie].equivalentIDs.some((equivalentCurie) => reducedCuries.includes(equivalentCurie)); if (!equivalentAlreadyIncluded) { reducedCuries.push(curie); } else { @@ -112,8 +112,7 @@ export default class BatchEdgeQueryHandler { strippedCuries.push(...nodeStrippedCuries); if (nodeStrippedCuries.length > 0) { debug( - `stripped (${nodeStrippedCuries.length}) duplicate equivalent curies from ${ - node.id + `stripped (${nodeStrippedCuries.length}) duplicate equivalent curies from ${node.id }: ${nodeStrippedCuries.join(',')}`, ); } @@ -173,31 +172,4 @@ export default class BatchEdgeQueryHandler { debug('Update nodes completed!'); return queryRecords; } - - /** - * Register subscribers - * @param {object} subscriber - */ - subscribe(subscriber): void { - this.subscribers.push(subscriber); - } - - /** - * Unsubscribe a listener - * @param {object} subscriber - */ - unsubscribe(subscriber): void { - this.subscribers = this.subscribers.filter((fn) => { - if (fn != subscriber) return fn; - }); - } - - /** - * Nofity all listeners - */ - notify(res): void { - this.subscribers.map((subscriber) => { - subscriber.update(res); - }); - } } diff --git a/src/cache_handler.ts b/src/cache_handler.ts index d41d9a4a..5b3bbbc7 100644 --- a/src/cache_handler.ts +++ b/src/cache_handler.ts @@ -1,4 +1,4 @@ -import { redisClient } from './redis-client'; +import { redisClient } from '@biothings-explorer/utils'; import Debug from 'debug'; const debug = Debug('bte:biothings-explorer-trapi:cache_handler'); import { LogEntry, StampedLog } from '@biothings-explorer/utils'; @@ -10,8 +10,8 @@ import { Readable, Transform } from 'stream'; import { Record, RecordPackage } from '@biothings-explorer/api-response-transform'; import { threadId } from 'worker_threads'; import MetaKG from '../../smartapi-kg/built'; -import { QueryHandlerOptions } from '.'; import QEdge from './query_edge'; +import { QueryHandlerOptions } from '@biothings-explorer/types'; export interface RecordPacksByQedgeMetaKGHash { [QEdgeHash: string]: RecordPackage; @@ -113,7 +113,7 @@ export default class CacheHandler { } async categorizeEdges(qEdges: QEdge[]): Promise<{ cachedRecords: Record[]; nonCachedQEdges: QEdge[] }> { - if (this.cacheEnabled === false || process.env.INTERNAL_DISABLE_REDIS) { + if (this.cacheEnabled === false || process.env.INTERNAL_DISABLE_REDIS === "true") { return { cachedRecords: [], nonCachedQEdges: qEdges, @@ -210,7 +210,7 @@ export default class CacheHandler { } async cacheEdges(queryRecords: Record[]): Promise { - if (this.cacheEnabled === false || process.env.INTERNAL_DISABLE_REDIS) { + if (this.cacheEnabled === false || process.env.INTERNAL_DISABLE_REDIS === "true") { if (global.parentPort) { global.parentPort.postMessage({ threadId, cacheDone: true }); } diff --git a/src/edge_manager.ts b/src/edge_manager.ts index d62db752..543a6ea5 100644 --- a/src/edge_manager.ts +++ b/src/edge_manager.ts @@ -8,7 +8,7 @@ import BatchEdgeQueryHandler, { BatchEdgeQueryOptions } from './batch_edge_query import { Telemetry } from '@biothings-explorer/utils'; import QEdge from './query_edge'; import MetaKG from '@biothings-explorer/smartapi-kg'; -import { QueryHandlerOptions } from '.'; +import { QueryHandlerOptions } from '@biothings-explorer/types'; import { Record } from '@biothings-explorer/api-response-transform'; import { UnavailableAPITracker } from './types'; import { RecordsByQEdgeID } from './results_assembly/query_results'; @@ -108,7 +108,7 @@ export default class QueryEdgeManager { } debug( `(5) Sending next edge '${nextQEdge.getID()}' ` + - `WITH entity count...(${nextQEdge.subject.entity_count || nextQEdge.object.entity_count})`, + `WITH entity count...(${nextQEdge.subject.entity_count || nextQEdge.object.entity_count})`, ); return this.preSendOffCheck(nextQEdge); } @@ -117,9 +117,9 @@ export default class QueryEdgeManager { this._qEdges.forEach((qEdge) => { debug( `'${qEdge.getID()}'` + - ` : (${qEdge.subject.entity_count || 0}) ` + - `${qEdge.reverse ? '<--' : '-->'}` + - ` (${qEdge.object.entity_count || 0})`, + ` : (${qEdge.subject.entity_count || 0}) ` + + `${qEdge.reverse ? '<--' : '-->'}` + + ` (${qEdge.object.entity_count || 0})`, ); }); } @@ -127,9 +127,8 @@ export default class QueryEdgeManager { _logSkippedQueries(unavailableAPIs: UnavailableAPITracker): void { Object.entries(unavailableAPIs).forEach(([api, { skippedQueries }]) => { if (skippedQueries > 0) { - const skipMessage = `${skippedQueries} additional quer${skippedQueries > 1 ? 'ies' : 'y'} to ${api} ${ - skippedQueries > 1 ? 'were' : 'was' - } skipped as the API was unavailable.`; + const skipMessage = `${skippedQueries} additional quer${skippedQueries > 1 ? 'ies' : 'y'} to ${api} ${skippedQueries > 1 ? 'were' : 'was' + } skipped as the API was unavailable.`; debug(skipMessage); this.logs.push(new LogEntry('WARNING', null, skipMessage).getLog()); } @@ -195,7 +194,7 @@ export default class QueryEdgeManager { const objectCuries = qEdge.object.curie; debug( `'${qEdge.getID()}' Reversed[${qEdge.reverse}] (${JSON.stringify(subjectCuries.length || 0)})` + - `--(${JSON.stringify(objectCuries.length || 0)}) entities / (${records.length}) records.`, + `--(${JSON.stringify(objectCuries.length || 0)}) entities / (${records.length}) records.`, ); // debug(`IDS SUB ${JSON.stringify(sub_count)}`) // debug(`IDS OBJ ${JSON.stringify(obj_count)}`) @@ -397,8 +396,7 @@ export default class QueryEdgeManager { new LogEntry( 'INFO', null, - `Executing ${currentQEdge.getID()}${currentQEdge.isReversed() ? ' (reversed)' : ''}: ${ - currentQEdge.subject.id + `Executing ${currentQEdge.getID()}${currentQEdge.isReversed() ? ' (reversed)' : ''}: ${currentQEdge.subject.id } ${currentQEdge.isReversed() ? '<--' : '-->'} ${currentQEdge.object.id}`, ).getLog(), ); diff --git a/src/graph/knowledge_graph.ts b/src/graph/knowledge_graph.ts index 7bc8bdad..0ac539cd 100644 --- a/src/graph/knowledge_graph.ts +++ b/src/graph/knowledge_graph.ts @@ -1,7 +1,6 @@ import { toArray } from '../utils'; import Debug from 'debug'; import { - APIList, TrapiAttribute, TrapiKnowledgeGraph, TrapiKGEdge, @@ -10,11 +9,11 @@ import { TrapiKGNodes, TrapiQualifier, TrapiSource, - APIDefinition, } from '../types'; import KGNode from './kg_node'; import KGEdge from './kg_edge'; import { BTEGraphUpdate } from './graph'; +import { APIDefinition } from '@biothings-explorer/types'; const debug = Debug('bte:biothings-explorer-trapi:KnowledgeGraph'); diff --git a/src/index.ts b/src/index.ts index e0a029ad..cc4c6e01 100644 --- a/src/index.ts +++ b/src/index.ts @@ -18,7 +18,6 @@ import InferredQueryHandler from './inferred_mode/inferred_mode'; import KGNode from './graph/kg_node'; import KGEdge from './graph/kg_edge'; import { - APIList, TrapiAuxGraphCollection, TrapiAuxiliaryGraph, TrapiQNode, @@ -26,33 +25,19 @@ import { TrapiResponse, TrapiResult, } from './types'; +import { QueryHandlerOptions } from '@biothings-explorer/types'; import BTEGraph from './graph/graph'; import QEdge from './query_edge'; import { Telemetry } from '@biothings-explorer/utils'; // Exports for external availability export * from './types'; -export { redisClient, getNewRedisClient } from './redis-client'; export { getTemplates, supportedLookups } from './inferred_mode/template_lookup'; export { default as QEdge } from './query_edge'; export { default as QNode } from './query_node'; export { default as InvalidQueryGraphError } from './exceptions/invalid_query_graph_error'; export * from './qedge2apiedge'; -export interface QueryHandlerOptions { - provenanceUsesServiceProvider?: boolean; - smartAPIID?: string; - teamName?: string; - enableIDResolution?: boolean; - // TODO: type instances of `any` - apiList?: APIList; - schema?: any; // might be hard to type -- it's the entire TRAPI schema IIRC - dryrun?: boolean; - resolveOutputIDs?: boolean; - submitter?: string; - caching?: boolean; // from request url query values - EDGE_ATTRIBUTES_USED_IN_RECORD_HASH?: string[]; -} export default class TRAPIQueryHandler { logs: StampedLog[]; options: QueryHandlerOptions; @@ -448,13 +433,11 @@ export default class TRAPIQueryHandler { let log_msg: string; if (currentQEdge.reverse) { - log_msg = `qEdge ${currentQEdge.id} (reversed): ${currentQEdge.object.categories} > ${ - currentQEdge.predicate ? `${currentQEdge.predicate} > ` : '' - }${currentQEdge.subject.categories}`; + log_msg = `qEdge ${currentQEdge.id} (reversed): ${currentQEdge.object.categories} > ${currentQEdge.predicate ? `${currentQEdge.predicate} > ` : '' + }${currentQEdge.subject.categories}`; } else { - log_msg = `qEdge ${currentQEdge.id}: ${currentQEdge.subject.categories} > ${ - currentQEdge.predicate ? `${currentQEdge.predicate} > ` : '' - }${currentQEdge.object.categories}`; + log_msg = `qEdge ${currentQEdge.id}: ${currentQEdge.subject.categories} > ${currentQEdge.predicate ? `${currentQEdge.predicate} > ` : '' + }${currentQEdge.object.categories}`; } this.logs.push(new LogEntry('INFO', null, log_msg).getLog()); @@ -495,9 +478,8 @@ export default class TRAPIQueryHandler { }); const qEdgesLogStr = qEdgesToLog.length > 1 ? `[${qEdgesToLog.join(', ')}]` : `${qEdgesToLog.join(', ')}`; if (len > 0) { - const terminateLog = `Query Edge${len !== 1 ? 's' : ''} ${qEdgesLogStr} ${ - len !== 1 ? 'have' : 'has' - } no MetaKG edges. Your query terminates.`; + const terminateLog = `Query Edge${len !== 1 ? 's' : ''} ${qEdgesLogStr} ${len !== 1 ? 'have' : 'has' + } no MetaKG edges. Your query terminates.`; debug(terminateLog); this.logs.push(new LogEntry('WARNING', null, terminateLog).getLog()); return false; @@ -611,8 +593,7 @@ export default class TRAPIQueryHandler { new LogEntry( 'INFO', null, - `Execution Summary: (${KGNodes}) nodes / (${kgEdges}) edges / (${results}) results; (${resultQueries}/${queries}) queries${ - cached ? ` (${cached} cached qEdges)` : '' + `Execution Summary: (${KGNodes}) nodes / (${kgEdges}) edges / (${results}) results; (${resultQueries}/${queries}) queries${cached ? ` (${cached} cached qEdges)` : '' } returned results from(${sources.length}) unique API${sources.length === 1 ? 's' : ''}`, ).getLog(), new LogEntry('INFO', null, `APIs: ${sources.join(', ')} `).getLog(), diff --git a/src/inferred_mode/inferred_mode.ts b/src/inferred_mode/inferred_mode.ts index 31b2169f..cd360243 100644 --- a/src/inferred_mode/inferred_mode.ts +++ b/src/inferred_mode/inferred_mode.ts @@ -5,7 +5,8 @@ import async from 'async'; import biolink from '../biolink'; import { getTemplates, MatchedTemplate, TemplateLookup } from './template_lookup'; import { scaled_sigmoid, inverse_scaled_sigmoid } from '../results_assembly/score'; -import TRAPIQueryHandler, { QueryHandlerOptions } from '../index'; +import TRAPIQueryHandler from '../index'; +import { QueryHandlerOptions } from '@biothings-explorer/types'; import { CompactQualifiers, TrapiAuxGraphCollection, @@ -384,9 +385,9 @@ export default class InferredQueryHandler { if (typeof combinedResponse.message.results[resultID].analyses[0].score !== 'undefined') { combinedResponse.message.results[resultID].analyses[0].score = resScore ? scaled_sigmoid( - inverse_scaled_sigmoid(combinedResponse.message.results[resultID].analyses[0].score) + - inverse_scaled_sigmoid(resScore), - ) + inverse_scaled_sigmoid(combinedResponse.message.results[resultID].analyses[0].score) + + inverse_scaled_sigmoid(resScore), + ) : combinedResponse.message.results[resultID].analyses[0].score; } else { combinedResponse.message.results[resultID].analyses[0].score = resScore; @@ -554,11 +555,9 @@ export default class InferredQueryHandler { const message = [ `Addition of ${creativeLimitHit} results from Template ${i + 1}`, Object.keys(combinedResponse.message.results).length === this.CREATIVE_LIMIT ? ' meets ' : ' exceeds ', - `creative result maximum of ${this.CREATIVE_LIMIT} (reaching ${ - Object.keys(combinedResponse.message.results).length + `creative result maximum of ${this.CREATIVE_LIMIT} (reaching ${Object.keys(combinedResponse.message.results).length } merged). `, - `Response will be truncated to top-scoring ${this.CREATIVE_LIMIT} results. Skipping remaining ${ - subQueries.length - (i + 1) + `Response will be truncated to top-scoring ${this.CREATIVE_LIMIT} results. Skipping remaining ${subQueries.length - (i + 1) } `, subQueries.length - (i + 1) === 1 ? `template.` : `templates.`, ].join(''); @@ -583,9 +582,8 @@ export default class InferredQueryHandler { const total = Object.values(mergedResultsCount).reduce((sum, count) => sum + count, 0) + Object.keys(mergedResultsCount).length; - const message = `Merging Summary: (${total}) inferred-template results were merged into (${ - Object.keys(mergedResultsCount).length - }) final results, reducing result count by (${total - Object.keys(mergedResultsCount).length})`; + const message = `Merging Summary: (${total}) inferred-template results were merged into (${Object.keys(mergedResultsCount).length + }) final results, reducing result count by (${total - Object.keys(mergedResultsCount).length})`; debug(message); combinedResponse.logs.push(new LogEntry('INFO', null, message).getLog()); } diff --git a/src/redis-client.ts b/src/redis-client.ts deleted file mode 100644 index f48419a6..00000000 --- a/src/redis-client.ts +++ /dev/null @@ -1,309 +0,0 @@ -import Redis, { Callback, Cluster, RedisKey, ScanStream } from 'ioredis'; -import Debug from 'debug'; -const debug = Debug('bte:biothings-explorer-trapi:redis-client'); -import Redlock, { RedlockAbortSignal } from 'redlock'; - -const prefix = `{BTEHashSlotPrefix}:`; - -type AsyncFunction = (...args: unknown[]) => Promise; -type DropFirst = T extends [unknown, ...infer U] ? U : never; -type Awaited = T extends PromiseLike ? U : T; - -function timeoutFunc(func: F, timeoutms = 0) { - return (...args: Parameters): ReturnType => { - return new Promise((resolve, reject) => { - const timeout = timeoutms ? timeoutms : parseInt(process.env.REDIS_TIMEOUT || '60000'); - let done = false; - setTimeout(() => { - if (!done) { - reject(new Error(`redis call timed out, args: ${JSON.stringify([...args])}`)); - } - }, timeout); - func(...args).then((returnValue: ReturnType) => { - done = true; - resolve(returnValue); - }); - }) as ReturnType; - }; -} - -/** - * Decorate a function such that the first argument is given the module-defined prefix - */ -function addPrefix(func: F) { - return (arg0: Parameters[0], ...args: DropFirst>): ReturnType => { - if (arg0 && (arg0 as string).length > 0) { - arg0 = `${prefix}${arg0 as string}`; - } - return func(arg0, ...args) as ReturnType; - }; -} - -/** - * Decorate a function such that each argument is given the module-defined prefix - */ -function addPrefixToAll(func: F) { - return (...args: Parameters): ReturnType => { - return func(...args.map((arg) => `${prefix}${arg}`)) as ReturnType; - }; -} - -/** - * Decorate a Redlock function such that the locks are given the module-defined prefix - */ -function lockPrefix(func: F) { - return async (locks: Parameters[0], ...args: DropFirst>): Promise>> => { - return (await func( - (locks as string[]).map((lockName: string) => `${prefix}${lockName}`), - ...args, - )) as Awaited>; - }; -} - -interface RedisClientInterface { - clearEdgeCache: () => void; - getTimeout: (key: RedisKey) => Promise; - setTimeout: (key: RedisKey, value: string | number | Buffer) => Promise<'OK'>; - hsetTimeout: (...args: [key: RedisKey, ...fieldValues: (string | Buffer | number)[]]) => Promise; - hgetallTimeout: (key: RedisKey) => Promise>; - expireTimeout: (key: RedisKey, seconds: string | number) => Promise; - delTimeout: (key: RedisKey | RedisKey[]) => Promise; - usingLock: ( - resources: string[], - duration: number, - routine?: (signal: RedlockAbortSignal) => Promise, - ) => Promise; - incrTimeout: (key: string) => Promise; - decrTimeout: (key: string) => Promise; - existsTimeout: (...args: RedisKey[]) => Promise; - pingTimeout: () => Promise<'PONG'>; -} - -function addClientFuncs(client: Redis | Cluster, redlock: Redlock): RedisClientInterface { - function decorate(func: F, timeoutms?: number): (...args: Parameters) => ReturnType { - let wrapped = timeoutFunc(func, timeoutms); - if (client instanceof Cluster) { - // Dirty way to cast the function so that typescript doesn't complain - // But given the extremely limited use-case of this function, it's fine for now - wrapped = addPrefix(wrapped) as unknown as (...args: Parameters) => ReturnType; - } - - return wrapped; - } - return { - clearEdgeCache: () => null, - getTimeout: decorate((key: RedisKey) => client.get(key)), - setTimeout: decorate((key: RedisKey, value: string | number | Buffer) => client.set(key, value)), - hsetTimeout: decorate((...args: [key: RedisKey, ...fieldValues: (string | Buffer | number)[]]) => - client.hset(...args), - ), - hgetallTimeout: decorate((key: RedisKey) => client.hgetall(key)), - - expireTimeout: decorate((key: RedisKey, seconds: string | number) => client.expire(key, seconds)), - - delTimeout: - client instanceof Cluster - ? addPrefixToAll(timeoutFunc((...args: RedisKey[]) => client.del(...args))) - : timeoutFunc((...args: RedisKey[]) => client.del(...args)), - usingLock: lockPrefix( - (resources: string[], duration: number, routine?: (signal: RedlockAbortSignal) => Promise) => - redlock.using(resources, duration, routine), - ), - incrTimeout: decorate((key: string) => client.incr(key)), - decrTimeout: decorate((key: string) => client.decr(key)), - existsTimeout: decorate((...args: RedisKey[]) => client.exists(...args)), - pingTimeout: decorate(() => client.ping(), 10000), // for testing - // hmsetTimeout: decorate((...args) => client.hmset(...args)), - // keysTimeout: decorate((...args) => client.keys(...args)), - }; -} - -class RedisClient { - client: Record | ReturnType; - enableRedis: boolean; - clientEnabled: boolean; - internalClient: Redis | Cluster; - constructor() { - this.client; - this.enableRedis = !(process.env.REDIS_HOST === undefined) && !(process.env.REDIS_PORT === undefined); - - if (!this.enableRedis) { - this.client = {}; - this.clientEnabled = false; - return; - } - - interface RedisClusterDetails { - redisOptions: { - connectTimeout: number; - password?: string; - tls?: { - checkServerIdentity: () => undefined | Error; - }; - }; - // How long to wait given how many failed tries - clusterRetryStrategy?: (times: number) => number; - } - - if (process.env.REDIS_CLUSTER === 'true') { - const details = { - redisOptions: { - connectTimeout: 20000, - }, - clusterRetryStrategy(times: number) { - return Math.min(times * 100, 5000); - }, - } as RedisClusterDetails; - - if (process.env.REDIS_PASSWORD) { - details.redisOptions.password = process.env.REDIS_PASSWORD; - } - if (process.env.REDIS_TLS_ENABLED) { - details.redisOptions.tls = { checkServerIdentity: () => undefined }; - } - - const cluster = new Redis.Cluster( - [ - { - host: process.env.REDIS_HOST, - port: parseInt(process.env.REDIS_PORT), - }, - ], - details, - ); - - // allow up to 10 minutes to acquire lock (in case of large items being saved/retrieved) - const redlock = new Redlock([cluster], { retryDelay: 500, retryCount: 1200 }); - - this.internalClient = cluster; - - this.client = addClientFuncs(cluster, redlock); - - this.client.clearEdgeCache = () => { - let count = 0; - const nodes = (this.internalClient as Cluster).nodes('master'); - let completedNodes = 0; - nodes.forEach((node, i) => { - const stream = node.scanStream({ - match: '*bte:edgeCache:*', - count: 50, - }); - - stream - .on('data', (foundKeys: string[]) => { - if (!foundKeys.length) return; - count += foundKeys.length; - try { - node.del(...foundKeys.map((key) => key.replace(`${prefix} `, ''))).then( - () => null, - (error) => { - debug(`Cache clear: error deleting ${foundKeys.length} keys`); - debug(error); - }, - ); - } catch (error) { - debug('Cache clearing failure:'); - debug(error); - } - }) - .on('error', (error) => { - debug(`Cache clearing failure on node ${i}:`); - debug(error); - completedNodes += 1; - if (completedNodes >= nodes.length) { - debug(`Cache clearing completes, cleared ${count} keys.`); - } - }) - .on('end', () => { - debug(`Cache clearing completes on cluster node ${i}`); - completedNodes += 1; - if (completedNodes >= nodes.length) { - debug(`Cache clearing completes, cleared ${count} keys.`); - } - }); - }); - }; - - debug('Initialized redis client (cluster-mode)'); - } else { - interface RedisDetails { - host: string; - port: number; - connectTimeout: number; - retryStrategy: (times: number) => number; - password?: string; - tls?: { - checkServerIdentity: () => undefined | Error; - }; - } - - const details = { - host: process.env.REDIS_HOST, - port: parseInt(process.env.REDIS_PORT), - connectTimeout: 20000, - retryStrategy(times) { - return Math.min(times * 100, 5000); - }, - } as RedisDetails; - if (process.env.REDIS_PASSWORD) { - details.password = process.env.REDIS_PASSWORD; - } - if (process.env.REDIS_TLS_ENABLED) { - details.tls = { checkServerIdentity: () => undefined }; - } - const client = new Redis(details); - - // allow up to 10 minutes to acquire lock (in case of large items being saved/retrieved) - const redlock = new Redlock([client], { retryDelay: 500, retryCount: 1200 }); - - this.internalClient = client; - - this.client = addClientFuncs(client, redlock); - - this.client.clearEdgeCache = () => { - const stream = (redisClient.internalClient as Redis).scanStream({ - match: '*bte:edgeCache:*', - count: 50, - }); - - let count = 0; - - stream - .on('data', (foundKeys: string[]) => { - if (!foundKeys.length) return; - count += foundKeys.length; - try { - redisClient.internalClient.del(...foundKeys.map((key) => key.replace(`${prefix} `, ''))).then( - () => null, - (error) => { - debug(`Cache clear: error deleting ${foundKeys.length} keys`); - debug(error); - }, - ); - } catch (error) { - debug('Cache clearing failure:'); - debug(error); - } - }) - .on('error', (error) => { - debug('Cache clearing failure:'); - debug(error); - }) - .on('end', () => { - debug(`Cache clearing completes, cleared ${count} keys.`); - }); - }; - - debug('Initialized redis client (non-cluster-mode)'); - } - this.clientEnabled = true; - } -} - -const redisClient = new RedisClient(); - -function getNewRedisClient(): RedisClient { - return new RedisClient(); -} - -export { redisClient, getNewRedisClient }; diff --git a/src/types.ts b/src/types.ts index 5e02e4ad..77ac472e 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,26 +1,5 @@ -// TODO change how types are exported on smartapi-kg so this is a normal package import -import { SmartAPISpec } from '@biothings-explorer/smartapi-kg'; import { TrapiLog } from '@biothings-explorer/utils'; -declare global { - var missingAPIs: SmartAPISpec[]; - var BIOLINK_VERSION: string; - var SCHEMA_VERSION: string; - var parentPort: MessagePort; - var cachingTasks: Promise[]; - var queryInformation: { - queryGraph: TrapiQueryGraph; - isCreativeMode: boolean; - creativeTemplate?: string; - totalRecords: number; - jobID?: string; - callback_url?: string; - }; - var job: { - log: (logString: string) => void; - }; // TODO type as Piscina job -} - export interface TrapiQNode { ids?: string[]; categories?: string[]; @@ -168,19 +147,6 @@ export interface TrapiResponse { logs: TrapiLog[]; } -export type APIDefinition = { - // Must have one of id or infores - id?: string; // SmartAPI ID, takes priority over infores - name: string; // Must match name on SmartAPI registry - infores?: string; // infores of API - primarySource?: boolean; -} & ({ id: string } | { infores: string }); - -export interface APIList { - include: APIDefinition[]; - // takes priority over include, taking into account id/infores prioritization - exclude: APIDefinition[]; -} export interface UnavailableAPITracker { [server: string]: { skip: boolean; skippedQueries: number }; diff --git a/tsconfig.json b/tsconfig.json index 9a9535fe..b712fdc7 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -9,7 +9,8 @@ "biolink-model": ["../biolink-model"], "biomedical_id_resolver": ["../biomedical_id_resolver"], "@biothings-explorer/smartapi-kg": ["../smartapi-kg"], - "@biothings-explorer/utils": ["../utils"] + "@biothings-explorer/utils": ["../utils"], + "@biothings-explorer/types": ["../types"] } }, "include": ["./src/**/*", "./src/biolink.json", "./src/smartapi_specs.json", "./src/predicates.json"], @@ -29,6 +30,9 @@ }, { "path": "../utils" + }, + { + "path": "../types" } ] }