From 5b61684f61fafb85a50dd09e289f130d96d9a51f Mon Sep 17 00:00:00 2001 From: Rohan Juneja Date: Mon, 5 Feb 2024 18:28:36 -0800 Subject: [PATCH] migrate redis client to utils --- __test__/unittest/redisClient.test.ts | 57 ----- src/batch_edge_query.ts | 4 +- src/cache_handler.ts | 2 +- src/index.ts | 36 ++- src/redis-client.ts | 309 -------------------------- 5 files changed, 31 insertions(+), 377 deletions(-) delete mode 100644 __test__/unittest/redisClient.test.ts delete mode 100644 src/redis-client.ts diff --git a/__test__/unittest/redisClient.test.ts b/__test__/unittest/redisClient.test.ts deleted file mode 100644 index 997126ed..00000000 --- a/__test__/unittest/redisClient.test.ts +++ /dev/null @@ -1,57 +0,0 @@ -import { redisClient as redisClientType } from '../../src/redis-client'; -import RedisMock from 'ioredis-mock'; - -describe('Test redis client', () => { - const OLD_ENV = process.env; - - beforeEach(() => { - jest.resetModules(); // Most important - it clears the cache - jest.clearAllMocks(); - jest.mock('ioredis', () => RedisMock); - process.env = { ...OLD_ENV }; // Make a copy - }); - - afterAll(() => { - process.env = OLD_ENV; // Restore old environment - }); - test('Client reports not enabled when REDIS_HOST is not set', () => { - const { redisClient } = require('../../src/redis-client'); - expect(redisClient.clientEnabled).toEqual(false); - }); - - test('will receive process.env variables', () => { - // Set the variables - process.env.REDIS_HOST = 'localhost'; - process.env.REDIS_PORT = '3367'; - const { redisClient } = require('../../src/redis-client'); - expect(redisClient).not.toEqual({}); - }); - - test('Test if record is correctly stored', async () => { - process.env.REDIS_HOST = 'localhost'; - process.env.REDIS_PORT = '3367'; - const { redisClient } = require('../../src/redis-client'); - await redisClient.client.setTimeout('record1', 'hello'); - const res = await redisClient.client.getTimeout('record1'); - expect(res).toEqual('hello'); - }); - - test('Test if record is correctly stored', async () => { - process.env.REDIS_HOST = 'localhost'; - process.env.REDIS_PORT = '3367'; - const { redisClient } = require('../../src/redis-client'); - await redisClient.client.setTimeout('record1', 'hello'); - const res = await redisClient.client.getTimeout('record1'); - expect(res).toEqual('hello'); - }); - - test('Test key should be removed after ttl', async () => { - process.env.REDIS_HOST = 'localhost'; - process.env.REDIS_PORT = '3367'; - const { redisClient } = require('../../src/redis-client'); - await redisClient.client.setTimeout('record1', 'hello', 'EX', 2); - await new Promise((r) => setTimeout(r, 3000)); - const res = await redisClient.client.getTimeout('record1'); - expect(res).not.toBeNull; - }); -}); diff --git a/src/batch_edge_query.ts b/src/batch_edge_query.ts index b9cb0b99..c70b3579 100644 --- a/src/batch_edge_query.ts +++ b/src/batch_edge_query.ts @@ -6,8 +6,8 @@ const debug = Debug('bte:biothings-explorer-trapi:batch_edge_query'); import CacheHandler from './cache_handler'; import { threadId } from 'worker_threads'; import MetaKG from '@biothings-explorer/smartapi-kg'; -import { StampedLog } from '@biothings-explorer/utils'; -import { QueryHandlerOptions, redisClient } from '.'; +import { StampedLog, redisClient } from '@biothings-explorer/utils'; +import { QueryHandlerOptions } from '.'; import QEdge from './query_edge'; import { UnavailableAPITracker } from './types'; import { Record } from '@biothings-explorer/api-response-transform'; diff --git a/src/cache_handler.ts b/src/cache_handler.ts index bdba8b56..69ba8021 100644 --- a/src/cache_handler.ts +++ b/src/cache_handler.ts @@ -1,4 +1,4 @@ -import { redisClient } from './redis-client'; +import { redisClient } from '@biothings-explorer/utils'; import Debug from 'debug'; const debug = Debug('bte:biothings-explorer-trapi:cache_handler'); import { LogEntry, StampedLog } from '@biothings-explorer/utils'; diff --git a/src/index.ts b/src/index.ts index e0a029ad..54d88dba 100644 --- a/src/index.ts +++ b/src/index.ts @@ -28,11 +28,10 @@ import { } from './types'; import BTEGraph from './graph/graph'; import QEdge from './query_edge'; -import { Telemetry } from '@biothings-explorer/utils'; +import { Telemetry, redisClient } from '@biothings-explorer/utils'; // Exports for external availability export * from './types'; -export { redisClient, getNewRedisClient } from './redis-client'; export { getTemplates, supportedLookups } from './inferred_mode/template_lookup'; export { default as QEdge } from './query_edge'; export { default as QNode } from './query_node'; @@ -87,14 +86,27 @@ export default class TRAPIQueryHandler { async findUnregisteredAPIs() { const configListAPIs = this.options.apiList['include']; - const smartapiRegistry = await fs.readFile(this.path, { encoding: 'utf8' }); + + let smartapiRegistry; + if (redisClient.clientEnabled) { + const redisData = await redisClient.client.getTimeout(`bte:smartapi:smartapi`).catch(console.log) + debug(redisData); + if (redisData) { + smartapiRegistry = (JSON.parse(redisData))?.smartapi; + } + } + + if (!smartapiRegistry) { + const file = await fs.readFile(this.path, "utf-8"); + smartapiRegistry = JSON.parse(file); + } const smartapiIds: string[] = []; const inforesIds: string[] = []; const unregisteredAPIs: string[] = []; // TODO typing for smartapiRegistration - JSON.parse(smartapiRegistry).hits.forEach((smartapiRegistration) => { + smartapiRegistry.hits.forEach((smartapiRegistration) => { smartapiIds.push(smartapiRegistration._id); inforesIds.push(smartapiRegistration.info?.['x-translator']?.infores); }); @@ -110,7 +122,7 @@ export default class TRAPIQueryHandler { return unregisteredAPIs; } - _loadMetaKG(): MetaKG { + async _loadMetaKG(): Promise { const metaKG = new MetaKG(this.path, this.predicatePath); debug( `Query options are: ${JSON.stringify({ @@ -119,7 +131,7 @@ export default class TRAPIQueryHandler { })}`, ); debug(`SmartAPI Specs read from path: ${this.path}`); - metaKG.constructMetaKGSync(this.includeReasoner, this.options); + await metaKG.constructMetaKGSync(this.includeReasoner, this.options); return metaKG; } @@ -313,6 +325,7 @@ export default class TRAPIQueryHandler { } async addQueryNodes(): Promise { + debug("c1"); const qNodeIDsByOriginalID: Map = new Map(); const curiesToResolve = [ ...Object.values(this.queryGraph.nodes).reduce((set: Set, qNode) => { @@ -323,7 +336,9 @@ export default class TRAPIQueryHandler { return set; }, new Set()), ] as string[]; - const resolvedCuries = await resolveSRI({ unknown: curiesToResolve }); + debug("d1"); + const resolvedCuries = {} as {[n: string]: any} /*await resolveSRI({ unknown: curiesToResolve })*/; + debug("e1"); Object.entries(resolvedCuries).forEach(([originalCurie, resolvedEntity]) => { if (!this.bteGraph.nodes[resolvedEntity.primaryID]) { const category = resolvedEntity.primaryTypes?.[0] @@ -340,6 +355,7 @@ export default class TRAPIQueryHandler { }); } }); + debug("f1"); } getResponse(): TrapiResponse { @@ -620,13 +636,17 @@ export default class TRAPIQueryHandler { }; async query(): Promise { + debug("a"); this._initializeResponse(); + debug("b"); await this.addQueryNodes(); + debug("c"); const span1 = Telemetry.startSpan({ description: 'loadMetaKG' }); debug('Start to load metakg.'); - const metaKG = this._loadMetaKG(); + let metaKG; + try { metaKG = await this._loadMetaKG(); } catch (err) { console.log("an error");console.log(err);console.log(err.stack);} if (!metaKG.ops.length) { let error: string; if (this.options.smartAPIID) { diff --git a/src/redis-client.ts b/src/redis-client.ts deleted file mode 100644 index f48419a6..00000000 --- a/src/redis-client.ts +++ /dev/null @@ -1,309 +0,0 @@ -import Redis, { Callback, Cluster, RedisKey, ScanStream } from 'ioredis'; -import Debug from 'debug'; -const debug = Debug('bte:biothings-explorer-trapi:redis-client'); -import Redlock, { RedlockAbortSignal } from 'redlock'; - -const prefix = `{BTEHashSlotPrefix}:`; - -type AsyncFunction = (...args: unknown[]) => Promise; -type DropFirst = T extends [unknown, ...infer U] ? U : never; -type Awaited = T extends PromiseLike ? U : T; - -function timeoutFunc(func: F, timeoutms = 0) { - return (...args: Parameters): ReturnType => { - return new Promise((resolve, reject) => { - const timeout = timeoutms ? timeoutms : parseInt(process.env.REDIS_TIMEOUT || '60000'); - let done = false; - setTimeout(() => { - if (!done) { - reject(new Error(`redis call timed out, args: ${JSON.stringify([...args])}`)); - } - }, timeout); - func(...args).then((returnValue: ReturnType) => { - done = true; - resolve(returnValue); - }); - }) as ReturnType; - }; -} - -/** - * Decorate a function such that the first argument is given the module-defined prefix - */ -function addPrefix(func: F) { - return (arg0: Parameters[0], ...args: DropFirst>): ReturnType => { - if (arg0 && (arg0 as string).length > 0) { - arg0 = `${prefix}${arg0 as string}`; - } - return func(arg0, ...args) as ReturnType; - }; -} - -/** - * Decorate a function such that each argument is given the module-defined prefix - */ -function addPrefixToAll(func: F) { - return (...args: Parameters): ReturnType => { - return func(...args.map((arg) => `${prefix}${arg}`)) as ReturnType; - }; -} - -/** - * Decorate a Redlock function such that the locks are given the module-defined prefix - */ -function lockPrefix(func: F) { - return async (locks: Parameters[0], ...args: DropFirst>): Promise>> => { - return (await func( - (locks as string[]).map((lockName: string) => `${prefix}${lockName}`), - ...args, - )) as Awaited>; - }; -} - -interface RedisClientInterface { - clearEdgeCache: () => void; - getTimeout: (key: RedisKey) => Promise; - setTimeout: (key: RedisKey, value: string | number | Buffer) => Promise<'OK'>; - hsetTimeout: (...args: [key: RedisKey, ...fieldValues: (string | Buffer | number)[]]) => Promise; - hgetallTimeout: (key: RedisKey) => Promise>; - expireTimeout: (key: RedisKey, seconds: string | number) => Promise; - delTimeout: (key: RedisKey | RedisKey[]) => Promise; - usingLock: ( - resources: string[], - duration: number, - routine?: (signal: RedlockAbortSignal) => Promise, - ) => Promise; - incrTimeout: (key: string) => Promise; - decrTimeout: (key: string) => Promise; - existsTimeout: (...args: RedisKey[]) => Promise; - pingTimeout: () => Promise<'PONG'>; -} - -function addClientFuncs(client: Redis | Cluster, redlock: Redlock): RedisClientInterface { - function decorate(func: F, timeoutms?: number): (...args: Parameters) => ReturnType { - let wrapped = timeoutFunc(func, timeoutms); - if (client instanceof Cluster) { - // Dirty way to cast the function so that typescript doesn't complain - // But given the extremely limited use-case of this function, it's fine for now - wrapped = addPrefix(wrapped) as unknown as (...args: Parameters) => ReturnType; - } - - return wrapped; - } - return { - clearEdgeCache: () => null, - getTimeout: decorate((key: RedisKey) => client.get(key)), - setTimeout: decorate((key: RedisKey, value: string | number | Buffer) => client.set(key, value)), - hsetTimeout: decorate((...args: [key: RedisKey, ...fieldValues: (string | Buffer | number)[]]) => - client.hset(...args), - ), - hgetallTimeout: decorate((key: RedisKey) => client.hgetall(key)), - - expireTimeout: decorate((key: RedisKey, seconds: string | number) => client.expire(key, seconds)), - - delTimeout: - client instanceof Cluster - ? addPrefixToAll(timeoutFunc((...args: RedisKey[]) => client.del(...args))) - : timeoutFunc((...args: RedisKey[]) => client.del(...args)), - usingLock: lockPrefix( - (resources: string[], duration: number, routine?: (signal: RedlockAbortSignal) => Promise) => - redlock.using(resources, duration, routine), - ), - incrTimeout: decorate((key: string) => client.incr(key)), - decrTimeout: decorate((key: string) => client.decr(key)), - existsTimeout: decorate((...args: RedisKey[]) => client.exists(...args)), - pingTimeout: decorate(() => client.ping(), 10000), // for testing - // hmsetTimeout: decorate((...args) => client.hmset(...args)), - // keysTimeout: decorate((...args) => client.keys(...args)), - }; -} - -class RedisClient { - client: Record | ReturnType; - enableRedis: boolean; - clientEnabled: boolean; - internalClient: Redis | Cluster; - constructor() { - this.client; - this.enableRedis = !(process.env.REDIS_HOST === undefined) && !(process.env.REDIS_PORT === undefined); - - if (!this.enableRedis) { - this.client = {}; - this.clientEnabled = false; - return; - } - - interface RedisClusterDetails { - redisOptions: { - connectTimeout: number; - password?: string; - tls?: { - checkServerIdentity: () => undefined | Error; - }; - }; - // How long to wait given how many failed tries - clusterRetryStrategy?: (times: number) => number; - } - - if (process.env.REDIS_CLUSTER === 'true') { - const details = { - redisOptions: { - connectTimeout: 20000, - }, - clusterRetryStrategy(times: number) { - return Math.min(times * 100, 5000); - }, - } as RedisClusterDetails; - - if (process.env.REDIS_PASSWORD) { - details.redisOptions.password = process.env.REDIS_PASSWORD; - } - if (process.env.REDIS_TLS_ENABLED) { - details.redisOptions.tls = { checkServerIdentity: () => undefined }; - } - - const cluster = new Redis.Cluster( - [ - { - host: process.env.REDIS_HOST, - port: parseInt(process.env.REDIS_PORT), - }, - ], - details, - ); - - // allow up to 10 minutes to acquire lock (in case of large items being saved/retrieved) - const redlock = new Redlock([cluster], { retryDelay: 500, retryCount: 1200 }); - - this.internalClient = cluster; - - this.client = addClientFuncs(cluster, redlock); - - this.client.clearEdgeCache = () => { - let count = 0; - const nodes = (this.internalClient as Cluster).nodes('master'); - let completedNodes = 0; - nodes.forEach((node, i) => { - const stream = node.scanStream({ - match: '*bte:edgeCache:*', - count: 50, - }); - - stream - .on('data', (foundKeys: string[]) => { - if (!foundKeys.length) return; - count += foundKeys.length; - try { - node.del(...foundKeys.map((key) => key.replace(`${prefix} `, ''))).then( - () => null, - (error) => { - debug(`Cache clear: error deleting ${foundKeys.length} keys`); - debug(error); - }, - ); - } catch (error) { - debug('Cache clearing failure:'); - debug(error); - } - }) - .on('error', (error) => { - debug(`Cache clearing failure on node ${i}:`); - debug(error); - completedNodes += 1; - if (completedNodes >= nodes.length) { - debug(`Cache clearing completes, cleared ${count} keys.`); - } - }) - .on('end', () => { - debug(`Cache clearing completes on cluster node ${i}`); - completedNodes += 1; - if (completedNodes >= nodes.length) { - debug(`Cache clearing completes, cleared ${count} keys.`); - } - }); - }); - }; - - debug('Initialized redis client (cluster-mode)'); - } else { - interface RedisDetails { - host: string; - port: number; - connectTimeout: number; - retryStrategy: (times: number) => number; - password?: string; - tls?: { - checkServerIdentity: () => undefined | Error; - }; - } - - const details = { - host: process.env.REDIS_HOST, - port: parseInt(process.env.REDIS_PORT), - connectTimeout: 20000, - retryStrategy(times) { - return Math.min(times * 100, 5000); - }, - } as RedisDetails; - if (process.env.REDIS_PASSWORD) { - details.password = process.env.REDIS_PASSWORD; - } - if (process.env.REDIS_TLS_ENABLED) { - details.tls = { checkServerIdentity: () => undefined }; - } - const client = new Redis(details); - - // allow up to 10 minutes to acquire lock (in case of large items being saved/retrieved) - const redlock = new Redlock([client], { retryDelay: 500, retryCount: 1200 }); - - this.internalClient = client; - - this.client = addClientFuncs(client, redlock); - - this.client.clearEdgeCache = () => { - const stream = (redisClient.internalClient as Redis).scanStream({ - match: '*bte:edgeCache:*', - count: 50, - }); - - let count = 0; - - stream - .on('data', (foundKeys: string[]) => { - if (!foundKeys.length) return; - count += foundKeys.length; - try { - redisClient.internalClient.del(...foundKeys.map((key) => key.replace(`${prefix} `, ''))).then( - () => null, - (error) => { - debug(`Cache clear: error deleting ${foundKeys.length} keys`); - debug(error); - }, - ); - } catch (error) { - debug('Cache clearing failure:'); - debug(error); - } - }) - .on('error', (error) => { - debug('Cache clearing failure:'); - debug(error); - }) - .on('end', () => { - debug(`Cache clearing completes, cleared ${count} keys.`); - }); - }; - - debug('Initialized redis client (non-cluster-mode)'); - } - this.clientEnabled = true; - } -} - -const redisClient = new RedisClient(); - -function getNewRedisClient(): RedisClient { - return new RedisClient(); -} - -export { redisClient, getNewRedisClient };