Skip to content

Commit

Permalink
Merge branch 'cache-clear' of https://github.com/biothings/bte_trapi_…
Browse files Browse the repository at this point in the history
  • Loading branch information
tokebe committed Dec 11, 2023
2 parents 9021e74 + 97b0476 commit cfeb7a7
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 10 deletions.
6 changes: 3 additions & 3 deletions src/cache_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ export default class CacheHandler {
caching === false
? false
: process.env.RESULT_CACHING !== 'false'
? !(process.env.REDIS_HOST === undefined) && !(process.env.REDIS_PORT === undefined)
: false;
? !(process.env.REDIS_HOST === undefined) && !(process.env.REDIS_PORT === undefined)
: false;
this.recordConfig = recordConfig;
this.logs.push(
new LogEntry('DEBUG', null, `REDIS cache is ${this.cacheEnabled === true ? '' : 'not'} enabled.`).getLog(),
Expand Down Expand Up @@ -231,7 +231,7 @@ export default class CacheHandler {
if (global.parentPort) {
global.parentPort.postMessage({ threadId, addCacheKey: redisID });
}
await redisClient.client.usingLock([`redisLock:${redisID}`], 600000, async () => {
await redisClient.client.usingLock([`redisLock:${redisID}`, 'redisLock:EdgeCaching'], 600000, async () => {
try {
await redisClient.client.delTimeout(redisID); // prevents weird overwrite edge cases
await new Promise<void>((resolve, reject) => {
Expand Down
94 changes: 87 additions & 7 deletions src/redis-client.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import Redis, { Callback, Cluster, RedisKey } from 'ioredis';
import Redis, { Callback, Cluster, RedisKey, ScanStream } from 'ioredis';
import Debug from 'debug';
const debug = Debug('bte:biothings-explorer-trapi:redis-client');
import Redlock, { RedlockAbortSignal } from 'redlock';

const prefix = `{BTEHashSlotPrefix}`;
const prefix = `{BTEHashSlotPrefix}:`;

type AsyncFunction = (...args: unknown[]) => Promise<unknown>;
type DropFirst<T extends unknown[]> = T extends [unknown, ...infer U] ? U : never;
Expand Down Expand Up @@ -33,7 +33,7 @@ function timeoutFunc<F extends AsyncFunction>(func: F, timeoutms = 0) {
function addPrefix<F extends AsyncFunction>(func: F) {
return (arg0: Parameters<F>[0], ...args: DropFirst<Parameters<F>>): ReturnType<F> => {
if (args.length > 0) {
arg0 = `${prefix}:${arg0}`;
arg0 = `${prefix}${arg0}`;
}
return func(arg0, ...args) as ReturnType<F>;
};
Expand All @@ -44,7 +44,7 @@ function addPrefix<F extends AsyncFunction>(func: F) {
*/
function addPrefixToAll<F extends AsyncFunction>(func: F) {
return (...args: Parameters<F>): ReturnType<F> => {
return func(...args.map((arg) => `${prefix}:${arg}`)) as ReturnType<F>;
return func(...args.map((arg) => `${prefix}${arg}`)) as ReturnType<F>;
};
}

Expand All @@ -54,19 +54,20 @@ function addPrefixToAll<F extends AsyncFunction>(func: F) {
function lockPrefix<F extends AsyncFunction>(func: F) {
return async (locks: Parameters<F>[0], ...args: DropFirst<Parameters<F>>): Promise<Awaited<ReturnType<F>>> => {
return (await func(
(locks as string[]).map((lockName: string) => `${prefix}:${lockName}`),
(locks as string[]).map((lockName: string) => `${prefix}${lockName}`),
...args,
)) as Awaited<ReturnType<F>>;
};
}

interface RedisClientInterface {
clearEdgeCache: () => void;
getTimeout: (key: RedisKey) => Promise<string>;
setTimeout: (key: RedisKey, value: string | number | Buffer) => Promise<'OK'>;
hsetTimeout: (...args: [key: RedisKey, ...fieldValues: (string | Buffer | number)[]]) => Promise<number>;
hgetallTimeout: (key: RedisKey) => Promise<Record<string, string>>;
expireTimeout: (key: RedisKey, seconds: string | number) => Promise<number>;
delTimeout: (...args: RedisKey[]) => Promise<number>;
delTimeout: (key: RedisKey | RedisKey[]) => Promise<number>;
usingLock: (
resources: string[],
duration: number,
Expand All @@ -90,7 +91,7 @@ function addClientFuncs(client: Redis | Cluster, redlock: Redlock): RedisClientI
return wrapped;
}
return {
...client,
clearEdgeCache: () => null,
getTimeout: decorate((key: RedisKey) => client.get(key)),
setTimeout: decorate((key: RedisKey, value: string | number | Buffer) => client.set(key, value)),
hsetTimeout: decorate((...args: [key: RedisKey, ...fieldValues: (string | Buffer | number)[]]) =>
Expand Down Expand Up @@ -178,6 +179,51 @@ class RedisClient {

this.client = addClientFuncs(cluster, redlock);

this.client.clearEdgeCache = () => {
let count = 0;
const nodes = (this.internalClient as Cluster).nodes('master');
let completedNodes = 0;
nodes.forEach((node, i) => {
const stream = node.scanStream({
match: '*bte:edgeCache:*',
count: 50,
});

stream
.on('data', (foundKeys: string[]) => {
if (!foundKeys.length) return;
count += foundKeys.length;
try {
node.del(...foundKeys.map((key) => key.replace(`${prefix} `, ''))).then(
() => null,
(error) => {
debug(`Cache clear: error deleting ${foundKeys.length} keys`);
debug(error);
},
);
} catch (error) {
debug('Cache clearing failure:');
debug(error);
}
})
.on('error', (error) => {
debug(`Cache clearing failure on node ${i}:`);
debug(error);
completedNodes += 1;
if (completedNodes >= nodes.length) {
debug(`Cache clearing completes, cleared ${count} keys.`);
}
})
.on('end', () => {
debug(`Cache clearing completes on cluster node ${i}`);
completedNodes += 1;
if (completedNodes >= nodes.length) {
debug(`Cache clearing completes, cleared ${count} keys.`);
}
});
});
};

debug('Initialized redis client (cluster-mode)');
} else {
interface RedisDetails {
Expand Down Expand Up @@ -214,6 +260,40 @@ class RedisClient {

this.client = addClientFuncs(client, redlock);

this.client.clearEdgeCache = () => {
const stream = (redisClient.internalClient as Redis).scanStream({
match: '*bte:edgeCache:*',
count: 50,
});

let count = 0;

stream
.on('data', (foundKeys: string[]) => {
if (!foundKeys.length) return;
count += foundKeys.length;
try {
redisClient.internalClient.del(...foundKeys.map((key) => key.replace(`${prefix} `, ''))).then(
() => null,
(error) => {
debug(`Cache clear: error deleting ${foundKeys.length} keys`);
debug(error);
},
);
} catch (error) {
debug('Cache clearing failure:');
debug(error);
}
})
.on('error', (error) => {
debug('Cache clearing failure:');
debug(error);
})
.on('end', () => {
debug(`Cache clearing completes, cleared ${count} keys.`);
});
};

debug('Initialized redis client (non-cluster-mode)');
}
this.clientEnabled = true;
Expand Down

0 comments on commit cfeb7a7

Please sign in to comment.