diff --git a/src/cache_handler.ts b/src/cache_handler.ts index a42d0e23..bdba8b56 100644 --- a/src/cache_handler.ts +++ b/src/cache_handler.ts @@ -104,8 +104,8 @@ export default class CacheHandler { caching === false ? false : process.env.RESULT_CACHING !== 'false' - ? !(process.env.REDIS_HOST === undefined) && !(process.env.REDIS_PORT === undefined) - : false; + ? !(process.env.REDIS_HOST === undefined) && !(process.env.REDIS_PORT === undefined) + : false; this.recordConfig = recordConfig; this.logs.push( new LogEntry('DEBUG', null, `REDIS cache is ${this.cacheEnabled === true ? '' : 'not'} enabled.`).getLog(), @@ -231,7 +231,7 @@ export default class CacheHandler { if (global.parentPort) { global.parentPort.postMessage({ threadId, addCacheKey: redisID }); } - await redisClient.client.usingLock([`redisLock:${redisID}`], 600000, async () => { + await redisClient.client.usingLock([`redisLock:${redisID}`, 'redisLock:EdgeCaching'], 600000, async () => { try { await redisClient.client.delTimeout(redisID); // prevents weird overwrite edge cases await new Promise((resolve, reject) => { diff --git a/src/redis-client.ts b/src/redis-client.ts index 788e69a0..785a6090 100644 --- a/src/redis-client.ts +++ b/src/redis-client.ts @@ -1,9 +1,9 @@ -import Redis, { Callback, Cluster, RedisKey } from 'ioredis'; +import Redis, { Callback, Cluster, RedisKey, ScanStream } from 'ioredis'; import Debug from 'debug'; const debug = Debug('bte:biothings-explorer-trapi:redis-client'); import Redlock, { RedlockAbortSignal } from 'redlock'; -const prefix = `{BTEHashSlotPrefix}`; +const prefix = `{BTEHashSlotPrefix}:`; type AsyncFunction = (...args: unknown[]) => Promise; type DropFirst = T extends [unknown, ...infer U] ? U : never; @@ -33,7 +33,7 @@ function timeoutFunc(func: F, timeoutms = 0) { function addPrefix(func: F) { return (arg0: Parameters[0], ...args: DropFirst>): ReturnType => { if (args.length > 0) { - arg0 = `${prefix}:${arg0}`; + arg0 = `${prefix}${arg0}`; } return func(arg0, ...args) as ReturnType; }; @@ -44,7 +44,7 @@ function addPrefix(func: F) { */ function addPrefixToAll(func: F) { return (...args: Parameters): ReturnType => { - return func(...args.map((arg) => `${prefix}:${arg}`)) as ReturnType; + return func(...args.map((arg) => `${prefix}${arg}`)) as ReturnType; }; } @@ -54,19 +54,20 @@ function addPrefixToAll(func: F) { function lockPrefix(func: F) { return async (locks: Parameters[0], ...args: DropFirst>): Promise>> => { return (await func( - (locks as string[]).map((lockName: string) => `${prefix}:${lockName}`), + (locks as string[]).map((lockName: string) => `${prefix}${lockName}`), ...args, )) as Awaited>; }; } interface RedisClientInterface { + clearEdgeCache: () => void; getTimeout: (key: RedisKey) => Promise; setTimeout: (key: RedisKey, value: string | number | Buffer) => Promise<'OK'>; hsetTimeout: (...args: [key: RedisKey, ...fieldValues: (string | Buffer | number)[]]) => Promise; hgetallTimeout: (key: RedisKey) => Promise>; expireTimeout: (key: RedisKey, seconds: string | number) => Promise; - delTimeout: (...args: RedisKey[]) => Promise; + delTimeout: (key: RedisKey | RedisKey[]) => Promise; usingLock: ( resources: string[], duration: number, @@ -90,7 +91,7 @@ function addClientFuncs(client: Redis | Cluster, redlock: Redlock): RedisClientI return wrapped; } return { - ...client, + clearEdgeCache: () => null, getTimeout: decorate((key: RedisKey) => client.get(key)), setTimeout: decorate((key: RedisKey, value: string | number | Buffer) => client.set(key, value)), hsetTimeout: decorate((...args: [key: RedisKey, ...fieldValues: (string | Buffer | number)[]]) => @@ -178,6 +179,51 @@ class RedisClient { this.client = addClientFuncs(cluster, redlock); + this.client.clearEdgeCache = () => { + let count = 0; + const nodes = (this.internalClient as Cluster).nodes('master'); + let completedNodes = 0; + nodes.forEach((node, i) => { + const stream = node.scanStream({ + match: '*bte:edgeCache:*', + count: 50, + }); + + stream + .on('data', (foundKeys: string[]) => { + if (!foundKeys.length) return; + count += foundKeys.length; + try { + node.del(...foundKeys.map((key) => key.replace(`${prefix} `, ''))).then( + () => null, + (error) => { + debug(`Cache clear: error deleting ${foundKeys.length} keys`); + debug(error); + }, + ); + } catch (error) { + debug('Cache clearing failure:'); + debug(error); + } + }) + .on('error', (error) => { + debug(`Cache clearing failure on node ${i}:`); + debug(error); + completedNodes += 1; + if (completedNodes >= nodes.length) { + debug(`Cache clearing completes, cleared ${count} keys.`); + } + }) + .on('end', () => { + debug(`Cache clearing completes on cluster node ${i}`); + completedNodes += 1; + if (completedNodes >= nodes.length) { + debug(`Cache clearing completes, cleared ${count} keys.`); + } + }); + }); + }; + debug('Initialized redis client (cluster-mode)'); } else { interface RedisDetails { @@ -214,6 +260,40 @@ class RedisClient { this.client = addClientFuncs(client, redlock); + this.client.clearEdgeCache = () => { + const stream = (redisClient.internalClient as Redis).scanStream({ + match: '*bte:edgeCache:*', + count: 50, + }); + + let count = 0; + + stream + .on('data', (foundKeys: string[]) => { + if (!foundKeys.length) return; + count += foundKeys.length; + try { + redisClient.internalClient.del(...foundKeys.map((key) => key.replace(`${prefix} `, ''))).then( + () => null, + (error) => { + debug(`Cache clear: error deleting ${foundKeys.length} keys`); + debug(error); + }, + ); + } catch (error) { + debug('Cache clearing failure:'); + debug(error); + } + }) + .on('error', (error) => { + debug('Cache clearing failure:'); + debug(error); + }) + .on('end', () => { + debug(`Cache clearing completes, cleared ${count} keys.`); + }); + }; + debug('Initialized redis client (non-cluster-mode)'); } this.clientEnabled = true;