Skip to content

Commit

Permalink
update from main
Browse files Browse the repository at this point in the history
  • Loading branch information
e11sy committed Dec 25, 2024
2 parents f7fed46 + 4d1973c commit d50efc6
Show file tree
Hide file tree
Showing 10 changed files with 371 additions and 68 deletions.
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@
"typescript": "^3.8.3",
"uuid": "^8.3.0",
"winston": "^3.2.1",
"yup": "^0.28.5",
"@types/redis": "^2.8.28",
"yup": "^0.28.5",
"redis": "^4.7.0"
},
"devDependencies": {
Expand Down
3 changes: 1 addition & 2 deletions workers/grouper/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
"workerType": "grouper",
"dependencies": {
"@types/redis": "^2.8.28",
"js-levenshtein": "^1.1.6",
"redis": "^3.1.1"
"js-levenshtein": "^1.1.6"
}
}
3 changes: 3 additions & 0 deletions workers/grouper/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ export default class GrouperWorker extends Worker {
public async start(): Promise<void> {
await this.db.connect();
this.prepareCache();
await this.redis.initialize();

await super.start();
}

Expand All @@ -61,6 +63,7 @@ export default class GrouperWorker extends Worker {
await super.finish();
this.prepareCache();
await this.db.close();
await this.redis.close();
}

/**
Expand Down
57 changes: 50 additions & 7 deletions workers/grouper/src/redisHelper.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import HawkCatcher from '@hawk.so/nodejs';
import redis from 'redis';
import { createClient, RedisClientType } from 'redis';
import createLogger from '../../../lib/logger';

/**
Expand All @@ -14,27 +14,70 @@ export default class RedisHelper {
/**
* Redis client for making queries
*/
private readonly redisClient = redis.createClient({ url: process.env.REDIS_URL });
private readonly redisClient: RedisClientType;

/**
* Logger instance
* (default level='info')
*/
private logger = createLogger();

/**
* Constructor of the Redis helper class
* Initializes the Redis client and sets up error handling
*/
constructor() {
this.redisClient = createClient({ url: process.env.REDIS_URL });

this.redisClient.on('error', (error) => {
if (error) {
this.logger.error(error);
HawkCatcher.send(error);
}
});
}

/**
* Connect to redis client
*/
public async initialize(): Promise<void> {
try {
await this.redisClient.connect();
} catch (error) {
console.error('Error connecting to redis', error);
}
}

/**
* Close redis client
*/
public async close(): Promise<void> {
if (this.redisClient.isOpen) {
await this.redisClient.quit();
}
}

/**
* Checks if a lock exists on the given group hash and identifier pair. If it does not exist, creates a lock.
* Returns true if lock exists
*
* @param groupHash - event group hash
* @param userId - event user id
*/
public checkOrSetEventLock(groupHash: string, userId: string): Promise<boolean> {
return new Promise((resolve, reject) => {
const callback = this.createCallback(resolve, reject);
public async checkOrSetEventLock(groupHash: string, userId: string): Promise<boolean> {
const result = await this.redisClient.set(
`${groupHash}:${userId}`,
'1',
{
EX: RedisHelper.LOCK_TTL,
NX: true,
} as const
);

this.redisClient.set(`${groupHash}:${userId}`, '1', 'EX', RedisHelper.LOCK_TTL, 'NX', callback);
});
/**
* Result would be null if lock already exists, false otherwise
*/
return result === null;
}

/**
Expand Down
18 changes: 12 additions & 6 deletions workers/grouper/tests/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import '../../../env-test';
import GrouperWorker from '../src';
import { GroupWorkerTask } from '../types/group-worker-task';
import redis from 'redis';
import { createClient, RedisClientType } from 'redis';
import { Collection, MongoClient } from 'mongodb';
import { EventAddons, EventDataAccepted } from '@hawk.so/types';

Expand Down Expand Up @@ -74,14 +74,16 @@ function generateTask(event: Partial<EventDataAccepted<EventAddons>> = undefined
}

describe('GrouperWorker', () => {
const worker = new GrouperWorker();
let connection: MongoClient;
let eventsCollection: Collection;
let dailyEventsCollection: Collection;
let repetitionsCollection: Collection;
let redisClient;
let redisClient: RedisClientType;
let worker: GrouperWorker;

beforeAll(async () => {
worker = new GrouperWorker();

await worker.start();
connection = await MongoClient.connect(process.env.MONGO_EVENTS_DATABASE_URI, {
useNewUrlParser: true,
Expand All @@ -90,7 +92,10 @@ describe('GrouperWorker', () => {
eventsCollection = connection.db().collection('events:' + projectIdMock);
dailyEventsCollection = connection.db().collection('dailyEvents:' + projectIdMock);
repetitionsCollection = connection.db().collection('repetitions:' + projectIdMock);
redisClient = redis.createClient({ url: process.env.REDIS_URL });

redisClient = createClient({ url: process.env.REDIS_URL });
await redisClient.connect();

jest.resetAllMocks();
});

Expand All @@ -104,8 +109,8 @@ describe('GrouperWorker', () => {
await repetitionsCollection.deleteMany({});
});

afterEach((done) => {
redisClient.flushall(done);
afterEach(async () => {
await redisClient.flushAll();
});

describe('Saving events', () => {
Expand Down Expand Up @@ -299,6 +304,7 @@ describe('GrouperWorker', () => {
});

afterAll(async () => {
await redisClient.quit();
await worker.finish();
await connection.close();
});
Expand Down
3 changes: 1 addition & 2 deletions workers/limiter/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
"workerType": "cron-tasks/limiter",
"dependencies": {
"@types/redis": "^2.8.28",
"axios": "^0.21.2",
"redis": "^3.1.1"
"axios": "^0.21.2"
}
}
4 changes: 4 additions & 0 deletions workers/limiter/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ export default class LimiterWorker extends Worker {

this.projectsCollection = accountDbConnection.collection<ProjectDBScheme>('projects');
this.workspacesCollection = accountDbConnection.collection<WorkspaceDBScheme>('workspaces');

await this.redis.initialize();

await super.start();
}

Expand All @@ -91,6 +94,7 @@ export default class LimiterWorker extends Worker {
await super.finish();
await this.eventsDb.close();
await this.accountsDb.close();
await this.redis.close();
}

/**
Expand Down
66 changes: 57 additions & 9 deletions workers/limiter/src/redisHelper.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import HawkCatcher from '@hawk.so/nodejs';
import redis from 'redis';
import { createClient, RedisClientType } from 'redis';
import createLogger from '../../../lib/logger';

/**
Expand All @@ -9,7 +9,8 @@ export default class RedisHelper {
/**
* Redis client for making queries
*/
private readonly redisClient = redis.createClient({ url: process.env.REDIS_URL });
// private readonly redisClient = redis.createClient({ url: process.env.REDIS_URL });
private readonly redisClient: RedisClientType;

/**
* Logger instance
Expand All @@ -22,6 +23,35 @@ export default class RedisHelper {
*/
private readonly redisDisabledProjectsKey = 'DisabledProjectsSet';

/**
* Constructor of the Redis helper class
* Initializes the Redis client and sets up error handling
*/
constructor() {
this.redisClient = createClient({ url: process.env.REDIS_URL });

this.redisClient.on('error', (error) => {
this.logger.error(error);
HawkCatcher.send(error);
});
}

/**
* Connect to redis client
*/
public async initialize(): Promise<void> {
await this.redisClient.connect();
}

/**
* Close redis client
*/
public async close(): Promise<void> {
if (this.redisClient.isOpen) {
await this.redisClient.quit();
}
}

/**
* Saves banned project ids to redis
* If there is no projects, then previous data in Redis will be erased
Expand All @@ -33,12 +63,26 @@ export default class RedisHelper {
const callback = this.createCallback(resolve, reject);

if (projectIdsToBan.length) {
this.redisClient.multi()
.del(this.redisDisabledProjectsKey)
.sadd(this.redisDisabledProjectsKey, projectIdsToBan)
.exec(callback);
const pipeline = this.redisClient.multi();

pipeline.del(this.redisDisabledProjectsKey);

pipeline.sAdd(this.redisDisabledProjectsKey, projectIdsToBan);

try {
pipeline.exec();
callback(null);
} catch (err) {
callback(err);
}
} else {
this.redisClient.del(this.redisDisabledProjectsKey, callback);
this.redisClient.del(this.redisDisabledProjectsKey)
.then(() => {
callback(null);
})
.catch((err) => {
callback(err);
});
}
});
}
Expand All @@ -53,7 +97,9 @@ export default class RedisHelper {
const callback = this.createCallback(resolve, reject);

if (projectIds.length) {
this.redisClient.sadd(this.redisDisabledProjectsKey, projectIds, callback);
this.redisClient.sAdd(this.redisDisabledProjectsKey, projectIds)
.then(() => callback(null))
.catch((err) => callback(err));
} else {
resolve();
}
Expand All @@ -70,7 +116,9 @@ export default class RedisHelper {
const callback = this.createCallback(resolve, reject);

if (projectIds.length) {
this.redisClient.srem(this.redisDisabledProjectsKey, projectIds, callback);
this.redisClient.sRem(this.redisDisabledProjectsKey, projectIds)
.then(() => callback(null))
.catch((err) => callback(err));
} else {
resolve();
}
Expand Down
Loading

0 comments on commit d50efc6

Please sign in to comment.