From 6281477b039b536ce3ba89b9ff50c5389883c642 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Dec 2024 20:09:20 +0300 Subject: [PATCH 01/17] add redis helper for notifier worker - added redis helper class and some methods for digest and threshold storing - added env.sample for redis connections - added dependencies related to redis --- package.json | 5 +- workers/notifier/.env.sample | 2 + workers/notifier/package.json | 6 ++- workers/notifier/src/redisHelper.ts | 83 +++++++++++++++++++++++++++++ 4 files changed, 93 insertions(+), 3 deletions(-) create mode 100644 workers/notifier/.env.sample create mode 100644 workers/notifier/src/redisHelper.ts diff --git a/package.json b/package.json index 776c85c3..3d85f82d 100644 --- a/package.json +++ b/package.json @@ -64,7 +64,9 @@ "typescript": "^3.8.3", "uuid": "^8.3.0", "winston": "^3.2.1", - "yup": "^0.28.5" + "yup": "^0.28.5", + "@types/redis": "^2.8.28", + "redis": "^4.7.0" }, "devDependencies": { "@shelf/jest-mongodb": "^1.2.3", @@ -73,7 +75,6 @@ "jest": "25.5.4", "nodemon": "^2.0.3", "random-words": "^1.1.1", - "redis-mock": "^0.56.3", "ts-jest": "25.4.0", "wait-for-expect": "^3.0.2", "webpack": "^4.43.0", diff --git a/workers/notifier/.env.sample b/workers/notifier/.env.sample new file mode 100644 index 00000000..85889136 --- /dev/null +++ b/workers/notifier/.env.sample @@ -0,0 +1,2 @@ +# Url for connecting to Redis +REDIS_URL=redis://redis:6379 diff --git a/workers/notifier/package.json b/workers/notifier/package.json index fba08a39..4c746b88 100644 --- a/workers/notifier/package.json +++ b/workers/notifier/package.json @@ -3,5 +3,9 @@ "version": "1.0.0", "main": "src/index.ts", "license": "MIT", - "workerType": "notifier" + "workerType": "notifier", + "dependencies": { + "@types/redis": "^2.8.28", + "redis": "^4.7.0" +} } diff --git a/workers/notifier/src/redisHelper.ts b/workers/notifier/src/redisHelper.ts new file mode 100644 index 00000000..5e1bcc3c --- /dev/null +++ b/workers/notifier/src/redisHelper.ts @@ -0,0 +1,83 @@ +import { createClient } from 'redis'; +import createLogger from '../../../lib/logger'; + +/** + * Class with helper functions for working with Redis + */ +export default class RedisHelper { + /** + * Redis client for making queries + */ + private readonly redisClient = createClient({ url: process.env.REDIS_URL }); + + /** + * Logger instance + * (default level='info') + */ + private logger = createLogger(); + + public async addEventToDigest(projectId: string, groupHash: string): Promise { + this.logger.debug('Stored in Redis digest'); + + const script = ` + local structure_key = KEYS[1] + local group_hash = ARGV[1] + + -- Check if current project has a digest + if redis.call("EXISTS", structure_key) == 0 then + -- If there is no digest for current project, create it + redis.call("ZADD", structure_key, 1, group_hash) + return 1 + else + -- If there is a digest for current project, increment the score + local current_score = redis.call("ZSCORE", structure_key, group_hash) + if current_score then + redis.call("ZINCRBY", structure_key, 1, group_hash) + return current_score + 1 + else + redis.call("ZADD", structure_key, 1, group_hash) + return 1 + end + end + `; + + const digestKey = `digest:${projectId}`; + + await this.redisClient.eval(script, { + keys: [digestKey], + arguments: [groupHash], + }); + } + + /** + * Method that get repetitions event repetitions count from todays digest of the project + * @param projectId - id of the project to get the repetitions count from + * @param groupHash - hash of the event group + * @returns + */ + public async getEventRepetitionsFromDigest(projectId: string, groupHash: string): Promise { + const digestRepetitionCount = await this.redisClient.get(`digest:${projectId}:${groupHash}`) + + return (digestRepetitionCount !== null) ? parseInt(digestRepetitionCount) : null + } + + /** + * Method that sets the event threshold in redis storage + * @param projectId - id of the project to set the threshold for + * @param threshold - threshold value to set + */ + public setProjectNotificationTreshold(projectId: string, threshold: number): void { + this.redisClient.set(`threshold:${projectId}`, threshold.toString()); + } + + /** + * Method that gets the event threshold from redis storage + * @param projectId - id of the project to get the threshold from + * @returns threshold value for the project + */ + public async getProjectNotificationThreshold(projectId: string): Promise { + const threshold = await this.redisClient.get(`threshold:${projectId}`); + + return (threshold !== null) ? parseInt(threshold) : null; + } +} From 6a060873275cc18773e5fc65e91043bc82bce324 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Dec 2024 20:12:10 +0300 Subject: [PATCH 02/17] add notifier methods for threshold evaluation - add notifier method to get threshold from redis or from db - add notifier method to check if event is critical - updated lock --- workers/notifier/src/index.ts | 158 ++++++++++++++++++++++++++++++---- yarn.lock | 71 ++++++++++++--- 2 files changed, 200 insertions(+), 29 deletions(-) diff --git a/workers/notifier/src/index.ts b/workers/notifier/src/index.ts index f6cc4ce8..357f5f2c 100644 --- a/workers/notifier/src/index.ts +++ b/workers/notifier/src/index.ts @@ -10,8 +10,8 @@ import { Rule } from '../types/rule'; import { SenderWorkerTask } from 'hawk-worker-sender/types/sender-task'; import Buffer, { BufferData, ChannelKey, EventKey } from './buffer'; import RuleValidator from './validator'; -import { MS_IN_SEC } from '../../../lib/utils/consts'; import Time from '../../../lib/utils/time'; +import RedisHelper from './redisHelper'; /** * Worker to buffer events before sending notifications about them @@ -23,10 +23,11 @@ export default class NotifierWorker extends Worker { public readonly type: string = pkg.workerType; /** - * Database Controller + * Database Controllers */ - private db: DatabaseController = new DatabaseController(process.env.MONGO_ACCOUNTS_DATABASE_URI); - + private accountsDb: DatabaseController = new DatabaseController(process.env.MONGO_ACCOUNTS_DATABASE_URI); + private eventsDb: DatabaseController = new DatabaseController(process.env.MONGO_EVENTS_DATABASE_URI); + /** * Received events buffer */ @@ -37,12 +38,20 @@ export default class NotifierWorker extends Worker { */ private readonly DEFAULT_MIN_PERIOD = 60; + /** + * Redis helper instance for modifying data through redis + */ + private redis = new RedisHelper(); + /** * Start consuming messages */ public async start(): Promise { - await this.db.connect(); + await this.accountsDb.connect(); + await this.eventsDb.connect(); await super.start(); + + await this.handle(); } /** @@ -50,7 +59,8 @@ export default class NotifierWorker extends Worker { */ public async finish(): Promise { await super.finish(); - await this.db.close(); + await this.accountsDb.connect(); + await this.eventsDb.connect(); } /** @@ -76,19 +86,129 @@ export default class NotifierWorker extends Worker { * * @param {NotifierWorkerTask} task — task to handle */ - public async handle(task: NotifierWorkerTask): Promise { + public async handle(task?: NotifierWorkerTask): Promise { try { - const { projectId, event } = task; + // const { projectId, event } = task; + const projectId = '673d8c2c1df5f6e57de2b269'; + const event = { + title: 'asdfasdf', + groupHash: 'asdfasdf', + isNew: true, + } + const rules = await this.getFittedRules(projectId, event); - + rules.forEach((rule) => { this.addEventToChannels(projectId, rule, event); }); + + if (await this.isEventCritical(projectId, event)) { + console.log('critical'); + } else { + console.log('not critical'); + } + + // const connection = this.db.getConnection(); + // const repetitions = connection.collection('dailyEvents:673d8c2c1df5f6e57de2b269'); + // const repetitions = connection.collection('adsfasdf'); + + // console.log(await repetitions.find({_id: new ObjectID('673d8e2c316c4c0b85c08d9a')}).toArray()); + // console.log(await repetitions.find({}).toArray()); + // await this.redis.addEventToDigest(task.projectId, task.event.groupHash); + } catch (e) { this.logger.error('Failed to handle message because of ', e); } } + /** + * Method that returns threshold for current project + * Used to check if event is critical or not + * @param projectId - if of the project, to get notification threshold for + */ + private async getNotificationThreshold(projectId: string): Promise { + const storedEventsCount = this.redis.getProjectNotificationThreshold(projectId); + + /** + * If redis has no threshold stored, then get it from the database + */ + if (storedEventsCount === null) { + const connection = this.eventsDb.getConnection(); + + const currentTime = Date.now(); + const twoDaysAgo = currentTime - 48 * 60 * 60 * 1000; + const oneDayAgo = currentTime - 24 * 60 * 60 * 1000; + + const events = connection.collection(`events:${projectId}`); + const repetitions = connection.collection(`repetitions:${projectId}`); + + /** + * Get ten events of the current project + */ + const eventsToEvaluate = await events.find({}).limit(10).toArray(); + + let averageProjectRepetitionsADay = 0 + + /** + * For each event get repetitions since two days to one day ago + */ + eventsToEvaluate.forEach(async (event) => { + const repetitionsCount = await repetitions.countDocuments({ + 'payload.timestamp': {$gte: twoDaysAgo, $le: oneDayAgo}, + 'groupHash': event.groupHash, + }); + + averageProjectRepetitionsADay += repetitionsCount; + }); + + /** + * Set counted repetitions count into redis storage + */ + this.redis.setProjectNotificationTreshold(projectId, averageProjectRepetitionsADay); + + /** + * Return floored average repetitions count + */ + return Math.floor(averageProjectRepetitionsADay / 10); + } + } + + /** + * Check if event is critical + * @param {NotifierEvent} event — received event + * @returns {boolean} + */ + private async isEventCritical(projectId: string, event: NotifierEvent): Promise { + /** + * Increment event repetitions count in digest + */ + this.redis.addEventToDigest(projectId, event.groupHash); + + /** + * Get current event repetitions from digest + */ + const eventRepetitionsToday = await this.redis.getEventRepetitionsFromDigest(projectId, event.groupHash); + + const projectThreshold = await this.getNotificationThreshold(projectId); + + /** + * Check if event repetitions are equal to threshold + */ + if (eventRepetitionsToday !== null && eventRepetitionsToday === projectThreshold) { + return true; + } + /** + * Check if event is new + */ + else if (event.isNew) { + return true; + } + + /** + * Event is not critical in other cases + */ + return false; + } /** * Get project notifications rules that matches received event * @@ -150,17 +270,17 @@ export default class NotifierWorker extends Worker { return; } - const minPeriod = (options.minPeriod || this.DEFAULT_MIN_PERIOD) * MS_IN_SEC; + // const minPeriod = (options.minPeriod || this.DEFAULT_MIN_PERIOD) * MS_IN_SEC; - /** - * Set timer to send events after min period of time is passed - */ - this.buffer.setTimer(channelKey, minPeriod, this.sendEvents); + // /** + // * Set timer to send events after min period of time is passed + // */ + // this.buffer.setTimer(channelKey, minPeriod, this.sendEvents); - await this.sendToSenderWorker(channelKey, [ { - key: event.groupHash, - count: 1, - } ]); + // await this.sendToSenderWorker(channelKey, [ { + // key: event.groupHash, + // count: 1, + // } ]); }); } @@ -208,7 +328,7 @@ export default class NotifierWorker extends Worker { * @returns {Promise} - project notification rules */ private async getProjectNotificationRules(projectId: string): Promise { - const connection = this.db.getConnection(); + const connection = this.accountsDb.getConnection(); const projects = connection.collection('projects'); const project = await projects.findOne({ _id: new ObjectID(projectId) }); diff --git a/yarn.lock b/yarn.lock index a2d8bd88..7fbaf0c6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -569,6 +569,40 @@ "@nodelib/fs.scandir" "2.1.5" fastq "^1.6.0" +"@redis/bloom@1.2.0": + version "1.2.0" + resolved "https://registry.yarnpkg.com/@redis/bloom/-/bloom-1.2.0.tgz#d3fd6d3c0af3ef92f26767b56414a370c7b63b71" + integrity sha512-HG2DFjYKbpNmVXsa0keLHp/3leGJz1mjh09f2RLGGLQZzSHpkmZWuwJbAvo3QcRY8p80m5+ZdXZdYOSBLlp7Cg== + +"@redis/client@1.6.0": + version "1.6.0" + resolved "https://registry.yarnpkg.com/@redis/client/-/client-1.6.0.tgz#dcf4ae1319763db6fdddd6de7f0af68a352c30ea" + integrity sha512-aR0uffYI700OEEH4gYnitAnv3vzVGXCFvYfdpu/CJKvk4pHfLPEy/JSZyrpQ+15WhXe1yJRXLtfQ84s4mEXnPg== + dependencies: + cluster-key-slot "1.1.2" + generic-pool "3.9.0" + yallist "4.0.0" + +"@redis/graph@1.1.1": + version "1.1.1" + resolved "https://registry.yarnpkg.com/@redis/graph/-/graph-1.1.1.tgz#8c10df2df7f7d02741866751764031a957a170ea" + integrity sha512-FEMTcTHZozZciLRl6GiiIB4zGm5z5F3F6a6FZCyrfxdKOhFlGkiAqlexWMBzCi4DcRoyiOsuLfW+cjlGWyExOw== + +"@redis/json@1.0.7": + version "1.0.7" + resolved "https://registry.yarnpkg.com/@redis/json/-/json-1.0.7.tgz#016257fcd933c4cbcb9c49cde8a0961375c6893b" + integrity sha512-6UyXfjVaTBTJtKNG4/9Z8PSpKE6XgSyEb8iwaqDcy+uKrd/DGYHTWkUdnQDyzm727V7p21WUMhsqz5oy65kPcQ== + +"@redis/search@1.2.0": + version "1.2.0" + resolved "https://registry.yarnpkg.com/@redis/search/-/search-1.2.0.tgz#50976fd3f31168f585666f7922dde111c74567b8" + integrity sha512-tYoDBbtqOVigEDMAcTGsRlMycIIjwMCgD8eR2t0NANeQmgK/lvxNAvYyb6bZDD4frHRhIHkJu2TBRvB0ERkOmw== + +"@redis/time-series@1.1.0": + version "1.1.0" + resolved "https://registry.yarnpkg.com/@redis/time-series/-/time-series-1.1.0.tgz#cba454c05ec201bd5547aaf55286d44682ac8eb5" + integrity sha512-c1Q99M5ljsIuc4YdaCwfUEXsofakb9c8+Zse2qxTadu8TalLXuAESzLvFAvNVbkmSlvlzIQOLpBCmWI9wTOt+g== + "@shelf/jest-mongodb@^1.2.3": version "1.3.4" resolved "https://registry.yarnpkg.com/@shelf/jest-mongodb/-/jest-mongodb-1.3.4.tgz#200bac386cf513bed2d41952b1857689f0b88f31" @@ -1960,6 +1994,11 @@ clone@2.x: resolved "https://registry.yarnpkg.com/clone/-/clone-2.1.2.tgz#1b7f4b9f591f1e8f83670401600345a02887435f" integrity sha1-G39Ln1kfHo+DZwQBYANFoCiHQ18= +cluster-key-slot@1.1.2: + version "1.1.2" + resolved "https://registry.yarnpkg.com/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz#88ddaa46906e303b5de30d3153b7d9fe0a0c19ac" + integrity sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA== + co@^4.6.0: version "4.6.0" resolved "https://registry.yarnpkg.com/co/-/co-4.6.0.tgz#6ea6bdf3d853ae54ccb8e47bfa0bf3f9031fb184" @@ -3297,6 +3336,11 @@ gc-stats@^1.4.0: nan "^2.13.2" node-pre-gyp "^0.13.0" +generic-pool@3.9.0: + version "3.9.0" + resolved "https://registry.yarnpkg.com/generic-pool/-/generic-pool-3.9.0.tgz#36f4a678e963f4fdb8707eab050823abc4e8f5e4" + integrity sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g== + gensync@^1.0.0-beta.2: version "1.0.0-beta.2" resolved "https://registry.yarnpkg.com/gensync/-/gensync-1.0.0-beta.2.tgz#32a6ee76c3d7f52d46b2b1ae5d93fea8580a25e0" @@ -6273,11 +6317,6 @@ redis-errors@^1.0.0, redis-errors@^1.2.0: resolved "https://registry.yarnpkg.com/redis-errors/-/redis-errors-1.2.0.tgz#eb62d2adb15e4eaf4610c04afe1529384250abad" integrity sha1-62LSrbFeTq9GEMBK/hUpOEJQq60= -redis-mock@^0.56.3: - version "0.56.3" - resolved "https://registry.yarnpkg.com/redis-mock/-/redis-mock-0.56.3.tgz#e96471bcc774ddc514c2fc49cdd03cab2baecd89" - integrity sha512-ynaJhqk0Qf3Qajnwvy4aOjS4Mdf9IBkELWtjd+NYhpiqu4QCNq6Vf3Q7c++XRPGiKiwRj9HWr0crcwy7EiPjYQ== - redis-parser@^3.0.0: version "3.0.0" resolved "https://registry.yarnpkg.com/redis-parser/-/redis-parser-3.0.0.tgz#b66d828cdcafe6b4b8a428a7def4c6bcac31c8b4" @@ -6295,6 +6334,18 @@ redis@^3.1.1: redis-errors "^1.2.0" redis-parser "^3.0.0" +redis@^4.7.0: + version "4.7.0" + resolved "https://registry.yarnpkg.com/redis/-/redis-4.7.0.tgz#b401787514d25dd0cfc22406d767937ba3be55d6" + integrity sha512-zvmkHEAdGMn+hMRXuMBtu4Vo5P6rHQjLoHftu+lBqq8ZTA3RCVC/WzD790bkKKiNFp7d5/9PcSD19fJyyRvOdQ== + dependencies: + "@redis/bloom" "1.2.0" + "@redis/client" "1.6.0" + "@redis/graph" "1.1.1" + "@redis/json" "1.0.7" + "@redis/search" "1.2.0" + "@redis/time-series" "1.1.0" + regenerator-runtime@^0.13.4: version "0.13.9" resolved "https://registry.yarnpkg.com/regenerator-runtime/-/regenerator-runtime-0.13.9.tgz#8925742a98ffd90814988d7566ad30ca3b263b52" @@ -7939,6 +7990,11 @@ y18n@^4.0.0: resolved "https://registry.yarnpkg.com/y18n/-/y18n-4.0.3.tgz#b5f259c82cd6e336921efd7bfd8bf560de9eeedf" integrity sha512-JKhqTOwSrqNA1NY5lSztJ1GrBiUodLMmIZuLiDaMRJ+itFd+ABVE8XBjOvIWL+rSqNDC74LCSFmlb/U4UZ4hJQ== +yallist@4.0.0, yallist@^4.0.0: + version "4.0.0" + resolved "https://registry.yarnpkg.com/yallist/-/yallist-4.0.0.tgz#9bb92790d9c0effec63be73519e11a35019a3a72" + integrity sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A== + yallist@^2.1.2: version "2.1.2" resolved "https://registry.yarnpkg.com/yallist/-/yallist-2.1.2.tgz#1c11f9218f076089a47dd512f93c6699a6a81d52" @@ -7949,11 +8005,6 @@ yallist@^3.0.0, yallist@^3.0.2, yallist@^3.1.1: resolved "https://registry.yarnpkg.com/yallist/-/yallist-3.1.1.tgz#dbb7daf9bfd8bac9ab45ebf602b8cbad0d5d08fd" integrity sha512-a4UGQaWPH59mOXUYnAG2ewncQS4i4F43Tv3JoAM+s2VDAmS9NsK8GpDMLrCHPksFT7h3K6TOoUNn2pb7RoXx4g== -yallist@^4.0.0: - version "4.0.0" - resolved "https://registry.yarnpkg.com/yallist/-/yallist-4.0.0.tgz#9bb92790d9c0effec63be73519e11a35019a3a72" - integrity sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A== - yargs-parser@13.1.2, yargs-parser@^13.1.2: version "13.1.2" resolved "https://registry.yarnpkg.com/yargs-parser/-/yargs-parser-13.1.2.tgz#130f09702ebaeef2650d54ce6e3e5706f7a4fb38" From ced3bef6ac8df233fe3ed28e68c8cec650bff6d2 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Dec 2024 20:31:58 +0300 Subject: [PATCH 03/17] clean up in notifier --- workers/notifier/src/index.ts | 63 +++++++++++------------------------ 1 file changed, 19 insertions(+), 44 deletions(-) diff --git a/workers/notifier/src/index.ts b/workers/notifier/src/index.ts index 357f5f2c..3441f2c3 100644 --- a/workers/notifier/src/index.ts +++ b/workers/notifier/src/index.ts @@ -50,8 +50,6 @@ export default class NotifierWorker extends Worker { await this.accountsDb.connect(); await this.eventsDb.connect(); await super.start(); - - await this.handle(); } /** @@ -86,36 +84,25 @@ export default class NotifierWorker extends Worker { * * @param {NotifierWorkerTask} task — task to handle */ - public async handle(task?: NotifierWorkerTask): Promise { + public async handle(task: NotifierWorkerTask): Promise { try { - // const { projectId, event } = task; - const projectId = '673d8c2c1df5f6e57de2b269'; - const event = { - title: 'asdfasdf', - groupHash: 'asdfasdf', - isNew: true, - } - - const rules = await this.getFittedRules(projectId, event); - - rules.forEach((rule) => { - this.addEventToChannels(projectId, rule, event); - }); + const { projectId, event } = task; + + /** + * Increment event repetitions count in digest + */ + this.redis.addEventToDigest(projectId, event.groupHash); + /** + * If event is critical, then send it to the channels + */ if (await this.isEventCritical(projectId, event)) { - console.log('critical'); - } else { - console.log('not critical'); - } - - // const connection = this.db.getConnection(); - // const repetitions = connection.collection('dailyEvents:673d8c2c1df5f6e57de2b269'); - // const repetitions = connection.collection('adsfasdf'); - - // console.log(await repetitions.find({_id: new ObjectID('673d8e2c316c4c0b85c08d9a')}).toArray()); - // console.log(await repetitions.find({}).toArray()); - // await this.redis.addEventToDigest(task.projectId, task.event.groupHash); + const rules = await this.getFittedRules(projectId, event); + rules.forEach((rule) => { + this.addEventToChannels(projectId, rule, event); + }); + } } catch (e) { this.logger.error('Failed to handle message because of ', e); } @@ -179,11 +166,6 @@ export default class NotifierWorker extends Worker { * @returns {boolean} */ private async isEventCritical(projectId: string, event: NotifierEvent): Promise { - /** - * Increment event repetitions count in digest - */ - this.redis.addEventToDigest(projectId, event.groupHash); - /** * Get current event repetitions from digest */ @@ -270,17 +252,10 @@ export default class NotifierWorker extends Worker { return; } - // const minPeriod = (options.minPeriod || this.DEFAULT_MIN_PERIOD) * MS_IN_SEC; - - // /** - // * Set timer to send events after min period of time is passed - // */ - // this.buffer.setTimer(channelKey, minPeriod, this.sendEvents); - - // await this.sendToSenderWorker(channelKey, [ { - // key: event.groupHash, - // count: 1, - // } ]); + await this.sendToSenderWorker(channelKey, [ { + key: event.groupHash, + count: 1, + } ]); }); } From a348328de93e2e160b94c673fc2eb35e979fd3aa Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Dec 2024 20:47:53 +0300 Subject: [PATCH 04/17] get rid of the buffer - get rid of the buffer - move types to types directory - improve old comments --- workers/notifier/src/buffer.ts | 270 ---------------------------- workers/notifier/src/index.ts | 72 +++----- workers/notifier/src/redisHelper.ts | 23 ++- workers/notifier/types/channel.ts | 29 +++ 4 files changed, 66 insertions(+), 328 deletions(-) delete mode 100644 workers/notifier/src/buffer.ts diff --git a/workers/notifier/src/buffer.ts b/workers/notifier/src/buffer.ts deleted file mode 100644 index f6d097de..00000000 --- a/workers/notifier/src/buffer.ts +++ /dev/null @@ -1,270 +0,0 @@ -'use strict'; - -import Timeout = NodeJS.Timeout; - -/** - * Index signature generic helper - */ -interface Index { - [key: string]: T; -} - -/** - * Buffer channel object schema - */ -interface ChannelSchema { - /** - * Object which keys are events' group hash - * and values are number of events received for minPeriod - */ - payload: Index; - - /** - * Channel timer - */ - timer: Timeout | null; -} - -/** - * Events data schema - */ -export interface BufferData { - /** - * Group hash - */ - key: string; - - /** - * Number of events received for minPeriod - */ - count: number; -} - -/** - * Schema for notification rule: Object - */ -type RuleSchema = Index; - -/** - * Schema for project: Object - */ -type ProjectSchema = Index; - -/** - * Buffer schema: Object - */ -type BufferSchema = Index; - -/** - * Composed key to identify channel - * - * [projectId, ruleId, channelName] - */ -export type ChannelKey = [string, string, string]; - -/** - * Composed key to identify event - * - * [projectId, ruleId, channelName, eventGroupHash] - */ -export type EventKey = [string, string, string, string]; - -/** - * Channels' buffer to store number of received events - */ -export default class Buffer { - /** - * Store - */ - private projects: BufferSchema = {}; - - /** - * Add event to channel's buffer - * - * @param key - key of event to increment - */ - public push(key: EventKey): void { - const eventKey = key[3]; - const channel = this.getChannel(key.slice(0, -1) as ChannelKey); - - this.getField(channel.payload, eventKey, 0); - - channel.payload[eventKey]++; - } - - /** - * Get channel data - * - * @param key - key of channel to retrieve - */ - public get(key: ChannelKey): BufferData[]; - - /** - * Get event data - * - * @param key - key of event to get - * - * @returns {number} - number of events received for minPeriod - */ - // eslint-disable-next-line no-dupe-class-members - public get(key: EventKey): number; - - /** - * Implementation of two methods above - * - * @param arg - Channel or Event key - */ - // eslint-disable-next-line no-dupe-class-members - public get(arg: ChannelKey | EventKey): BufferData[] | number { - const [projectId, ruleId, channelName, key] = arg; - - const channel = this.getChannel([projectId, ruleId, channelName]); - - if (key) { - return channel.payload[key]; - } - - return Object - .entries(channel.payload) - .map(([k, count]) => ({ - key: k, - count, - })); - } - - /** - * Return size of channel's buffer - * - * @param key - key of channel to get size of - * - * @returns {number} - */ - public size(key: ChannelKey): number { - return this.get(key).length; - } - - /** - * Set timer for channel - * - * @param key - key of channel to set timer to - * @param timeout - timer timeout time in ms - * @param callback - callback to call on timeout - */ - // eslint-disable-next-line @typescript-eslint/no-explicit-any - public setTimer(key: ChannelKey, timeout: number, callback: (...args: any[]) => void): Timeout { - const channel = this.getChannel(key); - - channel.timer = setTimeout( - callback, - timeout, - key - ); - - return channel.timer; - } - - /** - * Get channel timer - * - * @param key - key of channel to get timer - * - * @returns {Timeout} - */ - public getTimer(key: ChannelKey): Timeout | null { - const channel = this.getChannel(key); - - return channel.timer; - } - - /** - * Clear channel timer - * - * @param key - key of channel to clear timer - */ - public clearTimer(key: ChannelKey): void { - const channel = this.getChannel(key); - - if (channel.timer) { - clearTimeout(channel.timer); - } - - channel.timer = null; - } - - /** - * Flush channel buffer and return it's data - * - * @param key - key of channel to flush - */ - public flush(key: ChannelKey): BufferData[] { - const channel = this.getChannel(key); - - const data = this.get(key); - - channel.payload = {}; - - return data; - } - - /** - * Flush project buffer or whole buffer - * - * @param {string} [projectId] - project to flush, if not set whole buffer is flushed - */ - public flushAll(projectId?: string): void { - if (projectId) { - this.projects[projectId] = {}; - - return; - } - - this.projects = {}; - } - - /** - * Get channel buffer - * - * @param channelKey - key for getting channel - */ - private getChannel(channelKey: ChannelKey): ChannelSchema { - const [projectId, ruleId, channelName] = channelKey; - const project = this.getField( - this.projects, - projectId, - {} - ); - const rule = this.getField( - project, - ruleId, - {} - ); - - return this.getField( - rule, - channelName, - { - payload: {}, - timer: null, - } - ); - } - - /** - * Helper method to get object field and set default value if one doesn't exist - * - * @param obj — any object - * @param field — field to get - * @param defaultValue - default value to set if field doesn't exist - */ - private getField( - obj: {[key: string]: V}, - field: string, - defaultValue: V - ): V { - if (!(field in obj)) { - obj[field] = defaultValue; - } - - return obj[field]; - } -} diff --git a/workers/notifier/src/index.ts b/workers/notifier/src/index.ts index 3441f2c3..6e35ba70 100644 --- a/workers/notifier/src/index.ts +++ b/workers/notifier/src/index.ts @@ -4,11 +4,10 @@ import { ObjectID } from 'mongodb'; import { DatabaseController } from '../../../lib/db/controller'; import { Worker } from '../../../lib/worker'; import * as pkg from '../package.json'; -import { Channel } from '../types/channel'; +import { Channel, ChannelKey, SenderData } from '../types/channel'; import { NotifierEvent, NotifierWorkerTask } from '../types/notifier-task'; import { Rule } from '../types/rule'; import { SenderWorkerTask } from 'hawk-worker-sender/types/sender-task'; -import Buffer, { BufferData, ChannelKey, EventKey } from './buffer'; import RuleValidator from './validator'; import Time from '../../../lib/utils/time'; import RedisHelper from './redisHelper'; @@ -27,16 +26,6 @@ export default class NotifierWorker extends Worker { */ private accountsDb: DatabaseController = new DatabaseController(process.env.MONGO_ACCOUNTS_DATABASE_URI); private eventsDb: DatabaseController = new DatabaseController(process.env.MONGO_EVENTS_DATABASE_URI); - - /** - * Received events buffer - */ - private buffer: Buffer = new Buffer(); - - /** - * Default period between messages in seconds - */ - private readonly DEFAULT_MIN_PERIOD = 60; /** * Redis helper instance for modifying data through redis @@ -87,7 +76,7 @@ export default class NotifierWorker extends Worker { public async handle(task: NotifierWorkerTask): Promise { try { const { projectId, event } = task; - + /** * Increment event repetitions count in digest */ @@ -111,11 +100,12 @@ export default class NotifierWorker extends Worker { /** * Method that returns threshold for current project * Used to check if event is critical or not + * * @param projectId - if of the project, to get notification threshold for */ private async getNotificationThreshold(projectId: string): Promise { const storedEventsCount = this.redis.getProjectNotificationThreshold(projectId); - + /** * If redis has no threshold stored, then get it from the database */ @@ -132,17 +122,21 @@ export default class NotifierWorker extends Worker { /** * Get ten events of the current project */ - const eventsToEvaluate = await events.find({}).limit(10).toArray(); + const eventsToEvaluate = await events.find({}).limit(10) + .toArray(); - let averageProjectRepetitionsADay = 0 + let averageProjectRepetitionsADay = 0; /** - * For each event get repetitions since two days to one day ago + * For each event get repetitions since two days to one day ago */ eventsToEvaluate.forEach(async (event) => { const repetitionsCount = await repetitions.countDocuments({ - 'payload.timestamp': {$gte: twoDaysAgo, $le: oneDayAgo}, - 'groupHash': event.groupHash, + 'payload.timestamp': { + $gte: twoDaysAgo, + $le: oneDayAgo, + }, + groupHash: event.groupHash, }); averageProjectRepetitionsADay += repetitionsCount; @@ -162,6 +156,8 @@ export default class NotifierWorker extends Worker { /** * Check if event is critical + * + * @param projectId - id of the project to of the event * @param {NotifierEvent} event — received event * @returns {boolean} */ @@ -178,19 +174,19 @@ export default class NotifierWorker extends Worker { */ if (eventRepetitionsToday !== null && eventRepetitionsToday === projectThreshold) { return true; - } /** * Check if event is new */ - else if (event.isNew) { + } else if (event.isNew) { return true; - } - + } + /** * Event is not critical in other cases */ return false; } + /** * Get project notifications rules that matches received event * @@ -244,14 +240,7 @@ export default class NotifierWorker extends Worker { } const channelKey: ChannelKey = [projectId, rule._id.toString(), name]; - const eventKey: EventKey = [projectId, rule._id.toString(), name, event.groupHash]; - - if (this.buffer.getTimer(channelKey)) { - this.buffer.push(eventKey); - - return; - } - + await this.sendToSenderWorker(channelKey, [ { key: event.groupHash, count: 1, @@ -259,30 +248,13 @@ export default class NotifierWorker extends Worker { }); } - /** - * Get events from buffer, flush buffer and send event to sender workers - * - * @param {ChannelKey} channelKey — buffer key - */ - private sendEvents = async (channelKey: ChannelKey): Promise => { - this.buffer.clearTimer(channelKey); - - const events = this.buffer.flush(channelKey); - - if (!events.length) { - return; - } - - await this.sendToSenderWorker(channelKey, events); - }; - /** * Send task to sender workers * * @param {ChannelKey} key — buffer key - * @param {BufferData[]} events - events to send + * @param {SenderData[]} events - events to send */ - private async sendToSenderWorker(key: ChannelKey, events: BufferData[]): Promise { + private async sendToSenderWorker(key: ChannelKey, events: SenderData[]): Promise { const [projectId, ruleId, channelName] = key; await this.addTask(`sender/${channelName}`, { diff --git a/workers/notifier/src/redisHelper.ts b/workers/notifier/src/redisHelper.ts index 5e1bcc3c..3d2a0af4 100644 --- a/workers/notifier/src/redisHelper.ts +++ b/workers/notifier/src/redisHelper.ts @@ -16,9 +16,13 @@ export default class RedisHelper { */ private logger = createLogger(); + /** + * @param projectId + * @param groupHash + */ public async addEventToDigest(projectId: string, groupHash: string): Promise { this.logger.debug('Stored in Redis digest'); - + const script = ` local structure_key = KEYS[1] local group_hash = ARGV[1] @@ -40,29 +44,31 @@ export default class RedisHelper { end end `; - + const digestKey = `digest:${projectId}`; - + await this.redisClient.eval(script, { - keys: [digestKey], - arguments: [groupHash], + keys: [ digestKey ], + arguments: [ groupHash ], }); } /** * Method that get repetitions event repetitions count from todays digest of the project + * * @param projectId - id of the project to get the repetitions count from * @param groupHash - hash of the event group - * @returns + * @returns */ public async getEventRepetitionsFromDigest(projectId: string, groupHash: string): Promise { - const digestRepetitionCount = await this.redisClient.get(`digest:${projectId}:${groupHash}`) + const digestRepetitionCount = await this.redisClient.get(`digest:${projectId}:${groupHash}`); - return (digestRepetitionCount !== null) ? parseInt(digestRepetitionCount) : null + return (digestRepetitionCount !== null) ? parseInt(digestRepetitionCount) : null; } /** * Method that sets the event threshold in redis storage + * * @param projectId - id of the project to set the threshold for * @param threshold - threshold value to set */ @@ -72,6 +78,7 @@ export default class RedisHelper { /** * Method that gets the event threshold from redis storage + * * @param projectId - id of the project to get the threshold from * @returns threshold value for the project */ diff --git a/workers/notifier/types/channel.ts b/workers/notifier/types/channel.ts index 855e0e5b..c6b1bc70 100644 --- a/workers/notifier/types/channel.ts +++ b/workers/notifier/types/channel.ts @@ -7,6 +7,35 @@ export enum ChannelType { Slack = 'slack', } +/** + * Composed key to identify channel + * + * [projectId, ruleId, channelName] + */ +export type ChannelKey = [string, string, string]; + +/** + * Composed key to identify event + * + * [projectId, ruleId, channelName, eventGroupHash] + */ +export type EventKey = [string, string, string, string]; + +/** + * Interface that represents data, that notifier passes to sender worker + */ +export interface SenderData { + /** + * Group hash of the event + */ + key: string; + + /** + * Number of events received + */ + count: number; +} + /** * Notification channel object */ From a77b393279688dfcf4afb6a8abbdb28e23316ae0 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Dec 2024 20:53:34 +0300 Subject: [PATCH 05/17] fix lint issues --- workers/notifier/src/index.ts | 6 +++++- workers/notifier/src/redisHelper.ts | 8 ++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/workers/notifier/src/index.ts b/workers/notifier/src/index.ts index 6e35ba70..078ae7aa 100644 --- a/workers/notifier/src/index.ts +++ b/workers/notifier/src/index.ts @@ -113,7 +113,9 @@ export default class NotifierWorker extends Worker { const connection = this.eventsDb.getConnection(); const currentTime = Date.now(); + /* eslint-disable-next-line @typescript-eslint/no-magic-numbers */ const twoDaysAgo = currentTime - 48 * 60 * 60 * 1000; + /* eslint-disable-next-line @typescript-eslint/no-magic-numbers */ const oneDayAgo = currentTime - 24 * 60 * 60 * 1000; const events = connection.collection(`events:${projectId}`); @@ -122,6 +124,7 @@ export default class NotifierWorker extends Worker { /** * Get ten events of the current project */ + /* eslint-disable-next-line @typescript-eslint/no-magic-numbers */ const eventsToEvaluate = await events.find({}).limit(10) .toArray(); @@ -150,6 +153,7 @@ export default class NotifierWorker extends Worker { /** * Return floored average repetitions count */ + /* eslint-disable-next-line @typescript-eslint/no-magic-numbers */ return Math.floor(averageProjectRepetitionsADay / 10); } } @@ -240,7 +244,7 @@ export default class NotifierWorker extends Worker { } const channelKey: ChannelKey = [projectId, rule._id.toString(), name]; - + await this.sendToSenderWorker(channelKey, [ { key: event.groupHash, count: 1, diff --git a/workers/notifier/src/redisHelper.ts b/workers/notifier/src/redisHelper.ts index 3d2a0af4..ea5af2a7 100644 --- a/workers/notifier/src/redisHelper.ts +++ b/workers/notifier/src/redisHelper.ts @@ -17,8 +17,8 @@ export default class RedisHelper { private logger = createLogger(); /** - * @param projectId - * @param groupHash + * @param projectId - id of the project to add the event to digest + * @param groupHash - hash of the event group */ public async addEventToDigest(projectId: string, groupHash: string): Promise { this.logger.debug('Stored in Redis digest'); @@ -58,7 +58,7 @@ export default class RedisHelper { * * @param projectId - id of the project to get the repetitions count from * @param groupHash - hash of the event group - * @returns + * @returns {number | null} event repetitions count from the digest */ public async getEventRepetitionsFromDigest(projectId: string, groupHash: string): Promise { const digestRepetitionCount = await this.redisClient.get(`digest:${projectId}:${groupHash}`); @@ -80,7 +80,7 @@ export default class RedisHelper { * Method that gets the event threshold from redis storage * * @param projectId - id of the project to get the threshold from - * @returns threshold value for the project + * @returns {number | null} threshold value for the project, or null if it is not stored in redis */ public async getProjectNotificationThreshold(projectId: string): Promise { const threshold = await this.redisClient.get(`threshold:${projectId}`); From e6b521b87f36c4c2cd2e2cdc92b19d4315c4e2df Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Wed, 18 Dec 2024 21:33:34 +0300 Subject: [PATCH 06/17] refactor notifier and tests --- workers/notifier/src/index.ts | 68 ++++++++++---------- workers/notifier/src/redisHelper.ts | 91 ++++++++------------------- workers/notifier/tests/worker.test.ts | 83 ++++++++++++++++++++++++ workers/notifier/types/rule.ts | 10 +++ 4 files changed, 152 insertions(+), 100 deletions(-) diff --git a/workers/notifier/src/index.ts b/workers/notifier/src/index.ts index 078ae7aa..075094b3 100644 --- a/workers/notifier/src/index.ts +++ b/workers/notifier/src/index.ts @@ -52,52 +52,50 @@ export default class NotifierWorker extends Worker { /** * Task handling function - * - * Handling scheme: - * - * 1) On task received - * -> receive task - * -> get project notification rules - * -> filter rules - * -> check channel timer - * a) if timer doesn't exist - * -> send tasks to sender workers - * -> set timeout for minPeriod - * b) if timer exists - * -> push event to channel's buffer - * - * 2) On timeout - * -> get events from channel's buffer - * -> flush channel's buffer - * -> send tasks to sender workers - * - * @param {NotifierWorkerTask} task — task to handle + * Checks if event count is equal to the threshold and sends event to channels if it is + * Otherwise, increments the event count + * @param task — notifier task to handle */ public async handle(task: NotifierWorkerTask): Promise { try { const { projectId, event } = task; /** - * Increment event repetitions count in digest + * Get fitter rules for received event */ - this.redis.addEventToDigest(projectId, event.groupHash); - - /** - * If event is critical, then send it to the channels - */ - if (await this.isEventCritical(projectId, event)) { - const rules = await this.getFittedRules(projectId, event); + const rules = await this.getFittedRules(projectId, event); + + for (const rule of rules) { + /** + * If rule is enabled no need to store data in redis + */ + if (rule.isEnabled === false) { + return; + } +<<<<<<< Updated upstream rules.forEach((rule) => { this.addEventToChannels(projectId, rule, event); }); } +======= + const currentEventCount = await this.redis.getCurrentEventCount(rule._id.toString(), event.groupHash, rule.eventThresholdPeriod); + + /** + * If threshold reached, then send event to channels + */ + if (rule.threshold === currentEventCount) { + await this.addEventToChannels(projectId, rule, event); + } + } +>>>>>>> Stashed changes } catch (e) { this.logger.error('Failed to handle message because of ', e); } } /** +<<<<<<< Updated upstream * Method that returns threshold for current project * Used to check if event is critical or not * @@ -192,6 +190,8 @@ export default class NotifierWorker extends Worker { } /** +======= +>>>>>>> Stashed changes * Get project notifications rules that matches received event * * @param {string} projectId — project id event is related to @@ -232,24 +232,24 @@ export default class NotifierWorker extends Worker { * @param {Rule} rule - notification rule * @param {NotifierEvent} event - received event */ - private addEventToChannels(projectId: string, rule: Rule, event: NotifierEvent): void { + private async addEventToChannels(projectId: string, rule: Rule, event: NotifierEvent): Promise { const channels: Array<[string, Channel]> = Object.entries(rule.channels as { [name: string]: Channel }); - channels.forEach(async ([name, options]) => { + for (const [name, options] of channels) { /** * If channel is disabled by user, do not add event to it */ if (!options.isEnabled) { return; } - + const channelKey: ChannelKey = [projectId, rule._id.toString(), name]; - + await this.sendToSenderWorker(channelKey, [ { key: event.groupHash, count: 1, - } ]); - }); + }]); + }; } /** diff --git a/workers/notifier/src/redisHelper.ts b/workers/notifier/src/redisHelper.ts index ea5af2a7..e16b6354 100644 --- a/workers/notifier/src/redisHelper.ts +++ b/workers/notifier/src/redisHelper.ts @@ -1,5 +1,6 @@ import { createClient } from 'redis'; -import createLogger from '../../../lib/logger'; +import { Rule } from '../types/rule'; +import { NotifierEvent } from '../types/notifier-task'; /** * Class with helper functions for working with Redis @@ -11,80 +12,38 @@ export default class RedisHelper { private readonly redisClient = createClient({ url: process.env.REDIS_URL }); /** - * Logger instance - * (default level='info') + * Method that updates the event count respectfully to the threshold reset period + * @param ruleId - id of the rule used as a part of structure key + * @param groupHash - event group hash used as a part of structure key + * @param thresholdPeriod - period of time used to reset the event count + * @returns current event count */ - private logger = createLogger(); + public async getCurrentEventCount(ruleId: string, groupHash: NotifierEvent['groupHash'], thresholdPeriod: Rule['eventThresholdPeriod']): Promise { + const script = ` + local key = KEYS[1] + local currentTimestamp = tonumber(ARGV[1]) + local expirationPeriod = tonumber(ARGV[2]) - /** - * @param projectId - id of the project to add the event to digest - * @param groupHash - hash of the event group - */ - public async addEventToDigest(projectId: string, groupHash: string): Promise { - this.logger.debug('Stored in Redis digest'); + local startPeriodTimestamp = tonumber(redis.call("HGET", key, "timestamp")) - const script = ` - local structure_key = KEYS[1] - local group_hash = ARGV[1] - - -- Check if current project has a digest - if redis.call("EXISTS", structure_key) == 0 then - -- If there is no digest for current project, create it - redis.call("ZADD", structure_key, 1, group_hash) + if (startPeriodTimestamp > expirationPeriod or startPeriodTimestamp == nil) then + redis.call("HSET", key, "timestamp", currentTimestamp) + + // return 1 if period has been reset return 1 else - -- If there is a digest for current project, increment the score - local current_score = redis.call("ZSCORE", structure_key, group_hash) - if current_score then - redis.call("ZINCRBY", structure_key, 1, group_hash) - return current_score + 1 - else - redis.call("ZADD", structure_key, 1, group_hash) - return 1 - end + local newCounter = redis.call("HINCRBY", key, "counter", 1) + return newCounter end `; - const digestKey = `digest:${projectId}`; - - await this.redisClient.eval(script, { - keys: [ digestKey ], - arguments: [ groupHash ], - }); - } - - /** - * Method that get repetitions event repetitions count from todays digest of the project - * - * @param projectId - id of the project to get the repetitions count from - * @param groupHash - hash of the event group - * @returns {number | null} event repetitions count from the digest - */ - public async getEventRepetitionsFromDigest(projectId: string, groupHash: string): Promise { - const digestRepetitionCount = await this.redisClient.get(`digest:${projectId}:${groupHash}`); - - return (digestRepetitionCount !== null) ? parseInt(digestRepetitionCount) : null; - } + const key = `${ruleId}:${groupHash}:${thresholdPeriod}:times`; - /** - * Method that sets the event threshold in redis storage - * - * @param projectId - id of the project to set the threshold for - * @param threshold - threshold value to set - */ - public setProjectNotificationTreshold(projectId: string, threshold: number): void { - this.redisClient.set(`threshold:${projectId}`, threshold.toString()); - } - - /** - * Method that gets the event threshold from redis storage - * - * @param projectId - id of the project to get the threshold from - * @returns {number | null} threshold value for the project, or null if it is not stored in redis - */ - public async getProjectNotificationThreshold(projectId: string): Promise { - const threshold = await this.redisClient.get(`threshold:${projectId}`); + const currentEventCount = await this.redisClient.eval(script, { + keys: [key], + arguments: [Date.now().toString(), (Date.now() + thresholdPeriod).toString()], + }) as number; - return (threshold !== null) ? parseInt(threshold) : null; + return (currentEventCount !== null) ? currentEventCount : 0; } } diff --git a/workers/notifier/tests/worker.test.ts b/workers/notifier/tests/worker.test.ts index d8f9d2e1..a4a05492 100644 --- a/workers/notifier/tests/worker.test.ts +++ b/workers/notifier/tests/worker.test.ts @@ -11,6 +11,8 @@ const rule = { isEnabled: true, uidAdded: 'userid', whatToReceive: WhatToReceive.All, + threshold: 100, + thresholdPeriod: 60 * 1000, including: [], excluding: [], channels: { @@ -200,10 +202,32 @@ describe('NotifierWorker', () => { }); describe('handling', () => { +<<<<<<< Updated upstream it('should correctly handle first message', async () => { const worker = new NotifierWorker(); await worker.start(); +======= + it('should send task if event threshold reached', async () => { + const worker = new NotifierWorker(); + + jest.mock('../src/redisHelper'); + + worker.redis.redisClient.eval = jest.fn(async () => { + return Promise.resolve(); + }); + + await worker.start(); + + const message = { ...messageMock }; + + /** + * Current event count is equal to rule threshold + */ + RedisHelper.prototype.getCurrentEventCount = jest.fn(async () => { + return Promise.resolve(rule.threshold); + }); +>>>>>>> Stashed changes worker.sendToSenderWorker = jest.fn(); @@ -220,6 +244,7 @@ describe('NotifierWorker', () => { await worker.handle(message); +<<<<<<< Updated upstream expect(worker.buffer.setTimer).toBeCalledTimes(2); expect(worker.buffer.push).not.toBeCalled(); @@ -236,14 +261,39 @@ describe('NotifierWorker', () => { events ); }); +======= + expect(worker.sendToSenderWorker).toHaveBeenCalled(); +>>>>>>> Stashed changes await worker.finish(); }); +<<<<<<< Updated upstream it('should correctly handle messages after first one', async () => { const worker = new NotifierWorker(); await worker.start(); +======= + it('should not send task if event repetitions number is less than threshold', async () => { + const worker = new NotifierWorker(); + + jest.mock('../src/redisHelper'); + + worker.redis.redisClient.eval = jest.fn(async () => { + return Promise.resolve(); + }); + + await worker.start(); + + const message = { ...messageMock }; + + /** + * Current event count is equal to rule threshold + */ + RedisHelper.prototype.getCurrentEventCount = jest.fn(async () => { + return Promise.resolve(rule.threshold - 1); + }); +>>>>>>> Stashed changes worker.sendToSenderWorker = jest.fn(); @@ -263,6 +313,7 @@ describe('NotifierWorker', () => { await worker.handle(message); await worker.handle(message); +<<<<<<< Updated upstream expect(worker.buffer.getTimer).toBeCalledTimes(4); expect(worker.buffer.push).toBeCalledTimes(2); expect(worker.sendToSenderWorker).toBeCalledTimes(2); @@ -297,10 +348,39 @@ describe('NotifierWorker', () => { const message = { ...messageMock }; const channels = ['telegram', 'slack']; const channelKeyPart = [message.projectId, rule._id]; +======= + expect(worker.sendToSenderWorker).not.toHaveBeenCalled(); + + await worker.finish(); + }); + + it('should not send task if event repetitions number is more than threshold', async () => { + const worker = new NotifierWorker(); + + jest.mock('../src/redisHelper'); + + worker.redis.redisClient.eval = jest.fn(async () => { + return Promise.resolve(); + }); + + await worker.start(); + + const message = { ...messageMock }; + + /** + * Current event count is equal to rule threshold + */ + RedisHelper.prototype.getCurrentEventCount = jest.fn(async () => { + return Promise.resolve(rule.threshold + 1); + }); + + worker.sendToSenderWorker = jest.fn(); +>>>>>>> Stashed changes await worker.handle(message); await worker.handle(message); +<<<<<<< Updated upstream await new Promise((resolve) => setTimeout(() => { expect(worker.sendEvents).toBeCalledTimes(2); expect(worker.buffer.flush).toBeCalledTimes(2); @@ -340,6 +420,9 @@ describe('NotifierWorker', () => { expect(worker.addEventsToChannels).not.toBeCalled(); dbQueryMock = oldMock; +======= + expect(worker.sendToSenderWorker).not.toHaveBeenCalled(); +>>>>>>> Stashed changes await worker.finish(); }); diff --git a/workers/notifier/types/rule.ts b/workers/notifier/types/rule.ts index 57e72b5a..15545bbf 100644 --- a/workers/notifier/types/rule.ts +++ b/workers/notifier/types/rule.ts @@ -42,6 +42,16 @@ export interface Rule { */ readonly including: string[]; + /** + * If this number of events is reached in the eventThresholdPeriod, the rule will be triggered + */ + readonly threshold: number; + + /** + * Size of period (in milliseconds) to count events to compare to rule threshold + */ + readonly eventThresholdPeriod: number; + /** * Words event title must not include */ From 5c4d9268ec7ae37279c78e38e38fec350f80b5ba Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Wed, 18 Dec 2024 21:36:33 +0300 Subject: [PATCH 07/17] refactor notifier and tests --- workers/notifier/tests/worker.test.ts | 112 -------------------------- 1 file changed, 112 deletions(-) diff --git a/workers/notifier/tests/worker.test.ts b/workers/notifier/tests/worker.test.ts index a4a05492..d283a86b 100644 --- a/workers/notifier/tests/worker.test.ts +++ b/workers/notifier/tests/worker.test.ts @@ -202,12 +202,6 @@ describe('NotifierWorker', () => { }); describe('handling', () => { -<<<<<<< Updated upstream - it('should correctly handle first message', async () => { - const worker = new NotifierWorker(); - - await worker.start(); -======= it('should send task if event threshold reached', async () => { const worker = new NotifierWorker(); @@ -227,7 +221,6 @@ describe('NotifierWorker', () => { RedisHelper.prototype.getCurrentEventCount = jest.fn(async () => { return Promise.resolve(rule.threshold); }); ->>>>>>> Stashed changes worker.sendToSenderWorker = jest.fn(); @@ -244,36 +237,11 @@ describe('NotifierWorker', () => { await worker.handle(message); -<<<<<<< Updated upstream - expect(worker.buffer.setTimer).toBeCalledTimes(2); - expect(worker.buffer.push).not.toBeCalled(); - - channels.forEach((channel, i) => { - expect(worker.buffer.setTimer).toHaveBeenNthCalledWith( - i + 1, - [...channelKeyPart, channel], - rule.channels[channel].minPeriod * 1000, - worker.sendEvents - ); - expect(worker.sendToSenderWorker).toHaveBeenNthCalledWith( - i + 1, - [...channelKeyPart, channels[i]], - events - ); - }); -======= expect(worker.sendToSenderWorker).toHaveBeenCalled(); ->>>>>>> Stashed changes await worker.finish(); }); -<<<<<<< Updated upstream - it('should correctly handle messages after first one', async () => { - const worker = new NotifierWorker(); - - await worker.start(); -======= it('should not send task if event repetitions number is less than threshold', async () => { const worker = new NotifierWorker(); @@ -293,7 +261,6 @@ describe('NotifierWorker', () => { RedisHelper.prototype.getCurrentEventCount = jest.fn(async () => { return Promise.resolve(rule.threshold - 1); }); ->>>>>>> Stashed changes worker.sendToSenderWorker = jest.fn(); @@ -313,42 +280,6 @@ describe('NotifierWorker', () => { await worker.handle(message); await worker.handle(message); -<<<<<<< Updated upstream - expect(worker.buffer.getTimer).toBeCalledTimes(4); - expect(worker.buffer.push).toBeCalledTimes(2); - expect(worker.sendToSenderWorker).toBeCalledTimes(2); - - channels.forEach((channel, i) => { - expect(worker.buffer.push).toHaveBeenNthCalledWith( - i + 1, - [...channelKeyPart, channel, messageMock.event.groupHash] - ); - }); - - jest.useRealTimers(); - - await worker.finish(); - }); - - it('should send events after timeout', async () => { - const worker = new NotifierWorker(); - - await worker.start(); - - worker.sendToSenderWorker = jest.fn(); - - const realSendEvents = worker.sendEvents; - - worker.sendEvents = jest.fn((...args) => realSendEvents.apply(worker, args)); - - const realFlush = worker.buffer.flush; - - worker.buffer.flush = jest.fn((...args) => realFlush.apply(worker.buffer, args)); - - const message = { ...messageMock }; - const channels = ['telegram', 'slack']; - const channelKeyPart = [message.projectId, rule._id]; -======= expect(worker.sendToSenderWorker).not.toHaveBeenCalled(); await worker.finish(); @@ -375,54 +306,11 @@ describe('NotifierWorker', () => { }); worker.sendToSenderWorker = jest.fn(); ->>>>>>> Stashed changes await worker.handle(message); await worker.handle(message); -<<<<<<< Updated upstream - await new Promise((resolve) => setTimeout(() => { - expect(worker.sendEvents).toBeCalledTimes(2); - expect(worker.buffer.flush).toBeCalledTimes(2); - - channels.forEach((channel, i) => { - expect(worker.buffer.flush).toHaveBeenNthCalledWith( - i + 1, - [...channelKeyPart, channel] - ); - expect(worker.sendEvents).toHaveBeenNthCalledWith( - i + 1, - [...channelKeyPart, channel] - ); - }); - - resolve(); - }, 1000)); - - await worker.finish(); - }); - - it('should do nothing if project doesn\'t exist', async () => { - const worker = new NotifierWorker(); - - await worker.start(); - - worker.addEventsToChannels = jest.fn(); - - const message = { ...messageMock }; - - const oldMock = dbQueryMock; - - dbQueryMock = jest.fn(() => null); - - await worker.handle(message); - - expect(worker.addEventsToChannels).not.toBeCalled(); - - dbQueryMock = oldMock; -======= expect(worker.sendToSenderWorker).not.toHaveBeenCalled(); ->>>>>>> Stashed changes await worker.finish(); }); From 2f3661a3f75856c41f89d23727aa1a5d8619ea12 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Wed, 18 Dec 2024 21:50:53 +0300 Subject: [PATCH 08/17] lint fix and remove redundant --- workers/notifier/src/index.ts | 119 +----------- workers/notifier/src/redisHelper.ts | 5 +- workers/notifier/tests/buffer.test.ts | 254 -------------------------- workers/notifier/tests/worker.test.ts | 111 +++-------- workers/notifier/types/rule.ts | 2 +- 5 files changed, 37 insertions(+), 454 deletions(-) delete mode 100644 workers/notifier/tests/buffer.test.ts diff --git a/workers/notifier/src/index.ts b/workers/notifier/src/index.ts index 075094b3..0a0d3d5d 100644 --- a/workers/notifier/src/index.ts +++ b/workers/notifier/src/index.ts @@ -46,14 +46,15 @@ export default class NotifierWorker extends Worker { */ public async finish(): Promise { await super.finish(); - await this.accountsDb.connect(); - await this.eventsDb.connect(); + await this.accountsDb.close(); + await this.eventsDb.close(); } /** * Task handling function * Checks if event count is equal to the threshold and sends event to channels if it is * Otherwise, increments the event count + * * @param task — notifier task to handle */ public async handle(task: NotifierWorkerTask): Promise { @@ -73,12 +74,6 @@ export default class NotifierWorker extends Worker { return; } -<<<<<<< Updated upstream - rules.forEach((rule) => { - this.addEventToChannels(projectId, rule, event); - }); - } -======= const currentEventCount = await this.redis.getCurrentEventCount(rule._id.toString(), event.groupHash, rule.eventThresholdPeriod); /** @@ -87,111 +82,13 @@ export default class NotifierWorker extends Worker { if (rule.threshold === currentEventCount) { await this.addEventToChannels(projectId, rule, event); } - } ->>>>>>> Stashed changes + } } catch (e) { this.logger.error('Failed to handle message because of ', e); } } /** -<<<<<<< Updated upstream - * Method that returns threshold for current project - * Used to check if event is critical or not - * - * @param projectId - if of the project, to get notification threshold for - */ - private async getNotificationThreshold(projectId: string): Promise { - const storedEventsCount = this.redis.getProjectNotificationThreshold(projectId); - - /** - * If redis has no threshold stored, then get it from the database - */ - if (storedEventsCount === null) { - const connection = this.eventsDb.getConnection(); - - const currentTime = Date.now(); - /* eslint-disable-next-line @typescript-eslint/no-magic-numbers */ - const twoDaysAgo = currentTime - 48 * 60 * 60 * 1000; - /* eslint-disable-next-line @typescript-eslint/no-magic-numbers */ - const oneDayAgo = currentTime - 24 * 60 * 60 * 1000; - - const events = connection.collection(`events:${projectId}`); - const repetitions = connection.collection(`repetitions:${projectId}`); - - /** - * Get ten events of the current project - */ - /* eslint-disable-next-line @typescript-eslint/no-magic-numbers */ - const eventsToEvaluate = await events.find({}).limit(10) - .toArray(); - - let averageProjectRepetitionsADay = 0; - - /** - * For each event get repetitions since two days to one day ago - */ - eventsToEvaluate.forEach(async (event) => { - const repetitionsCount = await repetitions.countDocuments({ - 'payload.timestamp': { - $gte: twoDaysAgo, - $le: oneDayAgo, - }, - groupHash: event.groupHash, - }); - - averageProjectRepetitionsADay += repetitionsCount; - }); - - /** - * Set counted repetitions count into redis storage - */ - this.redis.setProjectNotificationTreshold(projectId, averageProjectRepetitionsADay); - - /** - * Return floored average repetitions count - */ - /* eslint-disable-next-line @typescript-eslint/no-magic-numbers */ - return Math.floor(averageProjectRepetitionsADay / 10); - } - } - - /** - * Check if event is critical - * - * @param projectId - id of the project to of the event - * @param {NotifierEvent} event — received event - * @returns {boolean} - */ - private async isEventCritical(projectId: string, event: NotifierEvent): Promise { - /** - * Get current event repetitions from digest - */ - const eventRepetitionsToday = await this.redis.getEventRepetitionsFromDigest(projectId, event.groupHash); - - const projectThreshold = await this.getNotificationThreshold(projectId); - - /** - * Check if event repetitions are equal to threshold - */ - if (eventRepetitionsToday !== null && eventRepetitionsToday === projectThreshold) { - return true; - /** - * Check if event is new - */ - } else if (event.isNew) { - return true; - } - - /** - * Event is not critical in other cases - */ - return false; - } - - /** -======= ->>>>>>> Stashed changes * Get project notifications rules that matches received event * * @param {string} projectId — project id event is related to @@ -242,14 +139,14 @@ export default class NotifierWorker extends Worker { if (!options.isEnabled) { return; } - + const channelKey: ChannelKey = [projectId, rule._id.toString(), name]; - + await this.sendToSenderWorker(channelKey, [ { key: event.groupHash, count: 1, - }]); - }; + } ]); + } } /** diff --git a/workers/notifier/src/redisHelper.ts b/workers/notifier/src/redisHelper.ts index e16b6354..354f9f59 100644 --- a/workers/notifier/src/redisHelper.ts +++ b/workers/notifier/src/redisHelper.ts @@ -13,10 +13,11 @@ export default class RedisHelper { /** * Method that updates the event count respectfully to the threshold reset period + * * @param ruleId - id of the rule used as a part of structure key * @param groupHash - event group hash used as a part of structure key * @param thresholdPeriod - period of time used to reset the event count - * @returns current event count + * @returns {number} current event count */ public async getCurrentEventCount(ruleId: string, groupHash: NotifierEvent['groupHash'], thresholdPeriod: Rule['eventThresholdPeriod']): Promise { const script = ` @@ -40,7 +41,7 @@ export default class RedisHelper { const key = `${ruleId}:${groupHash}:${thresholdPeriod}:times`; const currentEventCount = await this.redisClient.eval(script, { - keys: [key], + keys: [ key ], arguments: [Date.now().toString(), (Date.now() + thresholdPeriod).toString()], }) as number; diff --git a/workers/notifier/tests/buffer.test.ts b/workers/notifier/tests/buffer.test.ts deleted file mode 100644 index 40c5e26d..00000000 --- a/workers/notifier/tests/buffer.test.ts +++ /dev/null @@ -1,254 +0,0 @@ -/* eslint-disable @typescript-eslint/ban-ts-comment */ -import Buffer from '../src/buffer'; -import '../../../env-test'; - -describe('Buffer', () => { - describe('getField', () => { - const field = 'field'; - const defaultValue = 1; - - it('should set field if not exists', () => { - const obj = {}; - const buffer = new Buffer(); - - // @ts-ignore - buffer.getField(obj, field, defaultValue); - - expect(field in obj).toBeTruthy(); - }); - - it('should return default value if field doesn\'t exist', () => { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const obj = {} as any; - const buffer = new Buffer(); - - // @ts-ignore - buffer.getField(obj, field, defaultValue); - - expect(obj[field]).toEqual(defaultValue); - }); - - it('should return existing value if field exists', () => { - const value = 5; - const obj = { - [field]: value, - }; - const buffer = new Buffer(); - - // @ts-ignore - buffer.getField(obj, field, defaultValue); - - expect(obj[field]).toEqual(value); - }); - }); - - const projectId = 'project'; - const ruleId = 'rule'; - const channelName = 'channel'; - const key = 'event'; - - describe('push', () => { - it('should add new event entity with 1 as default value', () => { - const buffer = new Buffer(); - - buffer.push([projectId, ruleId, channelName, key]); - - // @ts-ignore - expect(buffer.projects[projectId][ruleId][channelName].payload[key]).toBeDefined(); - - // @ts-ignore - expect(buffer.projects[projectId][ruleId][channelName].payload[key]).toEqual(1); - }); - - it('should increment value by key', () => { - const buffer = new Buffer(); - - buffer.push([projectId, ruleId, channelName, key]); - buffer.push([projectId, ruleId, channelName, key]); - - // @ts-ignore - expect(buffer.projects[projectId][ruleId][channelName].payload[key]).toEqual(2); - }); - }); - - describe('get', () => { - it('should return value by key', () => { - const buffer = new Buffer(); - - buffer.push([projectId, ruleId, channelName, key]); - - const data = buffer.get([projectId, ruleId, channelName, key]); - - // @ts-ignore - expect(data).toEqual(buffer.projects[projectId][ruleId][channelName].payload[key]); - }); - - it('should return array of tuples if key not specified', () => { - const buffer = new Buffer(); - - buffer.push([projectId, ruleId, channelName, key]); - - // @ts-ignore - const tuples = Object.entries(buffer.projects[projectId][ruleId][channelName].payload) - .map(([k, count]) => ({ - key: k, - count, - })); - - expect(buffer.get([projectId, ruleId, channelName])).toStrictEqual(tuples); - }); - - it('should return undefined if key doesn\'t exist', () => { - const buffer = new Buffer(); - - expect(buffer.get([projectId, ruleId, channelName, key])).toBeUndefined(); - }); - }); - - describe('size', () => { - it('should return 0 if buffer is empty', () => { - const buffer = new Buffer(); - - expect(buffer.size([projectId, ruleId, channelName])).toEqual(0); - }); - - it('should return buffer size', () => { - const buffer = new Buffer(); - - buffer.push([projectId, ruleId, channelName, 'event1']); - - expect(buffer.size([projectId, ruleId, channelName])).toEqual(1); - - buffer.push([projectId, ruleId, channelName, 'event2']); - - expect(buffer.size([projectId, ruleId, channelName])).toEqual(2); - }); - }); - - describe('setTimer', () => { - it('should set timer', () => { - const buffer = new Buffer(); - - buffer.setTimer([projectId, ruleId, channelName], 10, jest.fn()); - - // @ts-ignore - const timer = buffer.projects[projectId][ruleId][channelName].timer; - - expect(timer).not.toEqual(0); - }); - - it('should call callback', (done) => { - const buffer = new Buffer(); - - const callback = jest.fn(() => { - expect(callback).toBeCalled(); - done(); - }); - - buffer.setTimer([projectId, ruleId, channelName], 500, callback); - }); - - it('should call callback with arguments', (done) => { - const buffer = new Buffer(); - - const callback = jest.fn(() => { - expect(callback).toBeCalledWith([projectId, ruleId, channelName]); - done(); - }); - - buffer.setTimer([projectId, ruleId, channelName], 1000, callback); - }); - }); - - describe('getTimer', () => { - it('should return null timer doesn\'t exist', () => { - const buffer = new Buffer(); - - expect(buffer.getTimer([projectId, ruleId, channelName])).toBeNull(); - }); - - it('should return timer', () => { - const buffer = new Buffer(); - - buffer.setTimer([projectId, ruleId, channelName], 10, jest.fn()); - - // @ts-ignore - const timer = buffer.projects[projectId][ruleId][channelName].timer; - - expect(buffer.getTimer([projectId, ruleId, channelName])).toEqual(timer); - }); - }); - - describe('clearTimer', () => { - it('should clear timer', () => { - const buffer = new Buffer(); - - jest.useFakeTimers(); - - buffer.setTimer([projectId, ruleId, channelName], 10000, jest.fn()); - - buffer.clearTimer([projectId, ruleId, channelName]); - - // @ts-ignore - const timer = buffer.projects[projectId][ruleId][channelName].timer; - - expect(timer).toBeNull(); - expect(clearTimeout).toBeCalled(); - - jest.useRealTimers(); - }); - }); - - describe('flush', () => { - it('should clear payload', () => { - const buffer = new Buffer(); - - buffer.push([projectId, ruleId, channelName, key]); - - expect(buffer.get([projectId, ruleId, channelName, key])).toEqual(1); - - buffer.flush([projectId, ruleId, channelName]); - - // @ts-ignore - expect(buffer.projects[projectId][ruleId][channelName].payload).toEqual({}); - }); - - it('should return payload after flush', () => { - const buffer = new Buffer(); - - buffer.push([projectId, ruleId, channelName, key]); - - const data = buffer.get([projectId, ruleId, channelName]); - - expect(buffer.flush([projectId, ruleId, channelName])).toStrictEqual(data); - }); - }); - - describe('flushAll', () => { - it('should clear project payload', () => { - const buffer = new Buffer(); - - buffer.push([projectId, ruleId, channelName, key]); - - expect(buffer.get([projectId, ruleId, channelName, key])).toEqual(1); - - buffer.flushAll(projectId); - - // @ts-ignore - expect(buffer.projects[projectId]).toEqual({}); - }); - - it('should clear all projects', () => { - const buffer = new Buffer(); - - buffer.push([projectId, ruleId, channelName, key]); - - expect(buffer.get([projectId, ruleId, channelName, key])).toEqual(1); - - buffer.flushAll(); - - // @ts-ignore - expect(buffer.projects).toEqual({}); - }); - }); -}); diff --git a/workers/notifier/tests/worker.test.ts b/workers/notifier/tests/worker.test.ts index d283a86b..ccd4852e 100644 --- a/workers/notifier/tests/worker.test.ts +++ b/workers/notifier/tests/worker.test.ts @@ -2,7 +2,7 @@ import { ObjectID } from 'mongodb'; import { WhatToReceive } from '../src/validator'; import * as messageMock from './mock.json'; import '../../../env-test'; -import waitForExpect from 'wait-for-expect'; +import RedisHelper from '../src/redisHelper'; /* eslint-disable @typescript-eslint/no-explicit-any */ @@ -34,7 +34,7 @@ const rule = { }, } as any; -let dbQueryMock = jest.fn(() => ({ +const dbQueryMock = jest.fn(() => ({ notifications: [ rule ], })) as any; @@ -123,6 +123,10 @@ describe('NotifierWorker', () => { it('should get db connection on message handle', async () => { const worker = new NotifierWorker(); + worker.redis.redisClient.eval = jest.fn(async () => { + return Promise.resolve(); + }); + await worker.start(); const message = { ...messageMock }; @@ -139,6 +143,10 @@ describe('NotifierWorker', () => { it('should get db connection on message handle and cache result', async () => { const worker = new NotifierWorker(); + worker.redis.redisClient.eval = jest.fn(async () => { + return Promise.resolve(); + }); + await worker.start(); const message = { ...messageMock }; @@ -160,6 +168,10 @@ describe('NotifierWorker', () => { const worker = new NotifierWorker(); const message = { ...messageMock }; + worker.redis.redisClient.eval = jest.fn(async () => { + return Promise.resolve(); + }); + await worker.start(); worker.sendToSenderWorker = jest.fn(); @@ -175,6 +187,10 @@ describe('NotifierWorker', () => { const worker = new NotifierWorker(); const message = { ...messageMock }; + worker.redis.redisClient.eval = jest.fn(async () => { + return Promise.resolve(); + }); + await worker.start(); worker.sendToSenderWorker = jest.fn(); @@ -212,9 +228,9 @@ describe('NotifierWorker', () => { }); await worker.start(); - + const message = { ...messageMock }; - + /** * Current event count is equal to rule threshold */ @@ -224,17 +240,6 @@ describe('NotifierWorker', () => { worker.sendToSenderWorker = jest.fn(); - worker.buffer.push = jest.fn(); - worker.buffer.setTimer = jest.fn(); - - const message = { ...messageMock }; - const channels = ['telegram', 'slack']; - const channelKeyPart = [message.projectId, rule._id]; - const events = [ { - key: message.event.groupHash, - count: 1, - } ]; - await worker.handle(message); expect(worker.sendToSenderWorker).toHaveBeenCalled(); @@ -252,9 +257,9 @@ describe('NotifierWorker', () => { }); await worker.start(); - + const message = { ...messageMock }; - + /** * Current event count is equal to rule threshold */ @@ -264,27 +269,13 @@ describe('NotifierWorker', () => { worker.sendToSenderWorker = jest.fn(); - jest.useFakeTimers(); - - const realGetTimer = worker.buffer.getTimer; - const realSetTimer = worker.buffer.setTimer; - - worker.buffer.getTimer = jest.fn((...args) => realGetTimer.apply(worker.buffer, args)); - worker.buffer.setTimer = jest.fn((...args) => realSetTimer.apply(worker.buffer, args)); - worker.buffer.push = jest.fn(); - - const message = { ...messageMock }; - const channels = ['telegram', 'slack']; - const channelKeyPart = [message.projectId, rule._id]; - - await worker.handle(message); await worker.handle(message); expect(worker.sendToSenderWorker).not.toHaveBeenCalled(); await worker.finish(); }); - + it('should not send task if event repetitions number is more than threshold', async () => { const worker = new NotifierWorker(); @@ -295,9 +286,9 @@ describe('NotifierWorker', () => { }); await worker.start(); - + const message = { ...messageMock }; - + /** * Current event count is equal to rule threshold */ @@ -315,56 +306,4 @@ describe('NotifierWorker', () => { await worker.finish(); }); }); - - it('should send task to sender workers', async () => { - const worker = new NotifierWorker(); - - await worker.start(); - - await worker.start(); - - worker.addTask = jest.fn(); - - const message = { ...messageMock }; - - await worker.handle(message); - - await waitForExpect(() => { - expect(worker.addTask).toHaveBeenNthCalledWith( - 1, - `sender/telegram`, - { - type: 'event', - payload: { - projectId: message.projectId, - ruleId: rule._id, - events: [ { - key: message.event.groupHash, - count: 1, - } ], - }, - } - ); - }, 2000); - - await waitForExpect(() => { - expect(worker.addTask).toHaveBeenNthCalledWith( - 2, - `sender/slack`, - { - type: 'event', - payload: { - projectId: message.projectId, - ruleId: rule._id, - events: [ { - key: message.event.groupHash, - count: 1, - } ], - }, - } - ); - }, 2000); - - await worker.finish(); - }); }); diff --git a/workers/notifier/types/rule.ts b/workers/notifier/types/rule.ts index 15545bbf..6b48fc20 100644 --- a/workers/notifier/types/rule.ts +++ b/workers/notifier/types/rule.ts @@ -48,7 +48,7 @@ export interface Rule { readonly threshold: number; /** - * Size of period (in milliseconds) to count events to compare to rule threshold + * Size of period (in milliseconds) to count events to compare to rule threshold */ readonly eventThresholdPeriod: number; From 3184566a3fc39e66980e5d11b3f2d2d934ee33d9 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Wed, 18 Dec 2024 22:06:09 +0300 Subject: [PATCH 09/17] remove unneeded database connection --- workers/notifier/src/index.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/workers/notifier/src/index.ts b/workers/notifier/src/index.ts index 0a0d3d5d..d6a06795 100644 --- a/workers/notifier/src/index.ts +++ b/workers/notifier/src/index.ts @@ -25,7 +25,6 @@ export default class NotifierWorker extends Worker { * Database Controllers */ private accountsDb: DatabaseController = new DatabaseController(process.env.MONGO_ACCOUNTS_DATABASE_URI); - private eventsDb: DatabaseController = new DatabaseController(process.env.MONGO_EVENTS_DATABASE_URI); /** * Redis helper instance for modifying data through redis @@ -37,7 +36,6 @@ export default class NotifierWorker extends Worker { */ public async start(): Promise { await this.accountsDb.connect(); - await this.eventsDb.connect(); await super.start(); } @@ -47,7 +45,6 @@ export default class NotifierWorker extends Worker { public async finish(): Promise { await super.finish(); await this.accountsDb.close(); - await this.eventsDb.close(); } /** From a740acf5a3e4f0dfa0ba84bc19e59b7c378261b0 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Wed, 18 Dec 2024 22:31:25 +0300 Subject: [PATCH 10/17] improve naming and lua logic --- workers/notifier/src/index.ts | 2 +- workers/notifier/src/redisHelper.ts | 12 +++++------- workers/notifier/tests/worker.test.ts | 6 +++--- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/workers/notifier/src/index.ts b/workers/notifier/src/index.ts index d6a06795..774c30b7 100644 --- a/workers/notifier/src/index.ts +++ b/workers/notifier/src/index.ts @@ -71,7 +71,7 @@ export default class NotifierWorker extends Worker { return; } - const currentEventCount = await this.redis.getCurrentEventCount(rule._id.toString(), event.groupHash, rule.eventThresholdPeriod); + const currentEventCount = await this.redis.computeEventCountForPeriod(rule._id.toString(), event.groupHash, rule.eventThresholdPeriod); /** * If threshold reached, then send event to channels diff --git a/workers/notifier/src/redisHelper.ts b/workers/notifier/src/redisHelper.ts index 354f9f59..e6ed6a28 100644 --- a/workers/notifier/src/redisHelper.ts +++ b/workers/notifier/src/redisHelper.ts @@ -19,7 +19,7 @@ export default class RedisHelper { * @param thresholdPeriod - period of time used to reset the event count * @returns {number} current event count */ - public async getCurrentEventCount(ruleId: string, groupHash: NotifierEvent['groupHash'], thresholdPeriod: Rule['eventThresholdPeriod']): Promise { + public async computeEventCountForPeriod(ruleId: string, groupHash: NotifierEvent['groupHash'], thresholdPeriod: Rule['eventThresholdPeriod']): Promise { const script = ` local key = KEYS[1] local currentTimestamp = tonumber(ARGV[1]) @@ -29,13 +29,11 @@ export default class RedisHelper { if (startPeriodTimestamp > expirationPeriod or startPeriodTimestamp == nil) then redis.call("HSET", key, "timestamp", currentTimestamp) - - // return 1 if period has been reset - return 1 - else - local newCounter = redis.call("HINCRBY", key, "counter", 1) - return newCounter + redis.call("HSET", key, "counter", 0) end + + local newCounter = redis.call("HINCRBY", key, "counter", 1) + return newCounter `; const key = `${ruleId}:${groupHash}:${thresholdPeriod}:times`; diff --git a/workers/notifier/tests/worker.test.ts b/workers/notifier/tests/worker.test.ts index ccd4852e..800e9e53 100644 --- a/workers/notifier/tests/worker.test.ts +++ b/workers/notifier/tests/worker.test.ts @@ -234,7 +234,7 @@ describe('NotifierWorker', () => { /** * Current event count is equal to rule threshold */ - RedisHelper.prototype.getCurrentEventCount = jest.fn(async () => { + RedisHelper.prototype.computeEventCountForPeriod = jest.fn(async () => { return Promise.resolve(rule.threshold); }); @@ -263,7 +263,7 @@ describe('NotifierWorker', () => { /** * Current event count is equal to rule threshold */ - RedisHelper.prototype.getCurrentEventCount = jest.fn(async () => { + RedisHelper.prototype.computeEventCountForPeriod = jest.fn(async () => { return Promise.resolve(rule.threshold - 1); }); @@ -292,7 +292,7 @@ describe('NotifierWorker', () => { /** * Current event count is equal to rule threshold */ - RedisHelper.prototype.getCurrentEventCount = jest.fn(async () => { + RedisHelper.prototype.computeEventCountForPeriod = jest.fn(async () => { return Promise.resolve(rule.threshold + 1); }); From 63447b2d2bfc502f5531ddc7a9116611b1b0e462 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sat, 21 Dec 2024 20:33:08 +0300 Subject: [PATCH 11/17] implement redis testcontainer --- jest.setup.redis-mock.js | 25 +++- package.json | 1 + workers/notifier/src/index.ts | 2 + workers/notifier/src/redisHelper.ts | 34 +++++- yarn.lock | 181 +++++++++++++++++++++++++++- 5 files changed, 233 insertions(+), 10 deletions(-) diff --git a/jest.setup.redis-mock.js b/jest.setup.redis-mock.js index 9af7d052..0ad2a8ca 100644 --- a/jest.setup.redis-mock.js +++ b/jest.setup.redis-mock.js @@ -1,4 +1,25 @@ +const { GenericContainer } = require('testcontainers'); + +let redisTestContainer; + /** - * Mock redis library + * Create test container with Redis, which could be used in tests */ -jest.mock('redis', () => jest.requireActual('redis-mock')); +beforeAll(async () => { + redisTestContainer = await new GenericContainer('redis') + .withExposedPorts(6379) + .start(); + + const port = redisTestContainer.getMappedPort(6379); + const host = redisTestContainer.getContainerIpAddress(); + + /** + * Set environment variable for redisHelper to connect to redis container + */ + process.env.REDIS_URL = `redis://${host}:${port}`; + }, +); + +afterAll(async () => { + await redisTestContainer.stop(); +}); diff --git a/package.json b/package.json index 3d85f82d..63c52d24 100644 --- a/package.json +++ b/package.json @@ -70,6 +70,7 @@ }, "devDependencies": { "@shelf/jest-mongodb": "^1.2.3", + "testcontainers": "^3.0.0", "eslint": "^7.14.0", "eslint-config-codex": "^1.6.1", "jest": "25.5.4", diff --git a/workers/notifier/src/index.ts b/workers/notifier/src/index.ts index 774c30b7..efe90fab 100644 --- a/workers/notifier/src/index.ts +++ b/workers/notifier/src/index.ts @@ -36,6 +36,7 @@ export default class NotifierWorker extends Worker { */ public async start(): Promise { await this.accountsDb.connect(); + await this.redis.initialize(); await super.start(); } @@ -45,6 +46,7 @@ export default class NotifierWorker extends Worker { public async finish(): Promise { await super.finish(); await this.accountsDb.close(); + await this.redis.close(); } /** diff --git a/workers/notifier/src/redisHelper.ts b/workers/notifier/src/redisHelper.ts index e6ed6a28..8854c661 100644 --- a/workers/notifier/src/redisHelper.ts +++ b/workers/notifier/src/redisHelper.ts @@ -1,4 +1,4 @@ -import { createClient } from 'redis'; +import { createClient, RedisClientType } from 'redis'; import { Rule } from '../types/rule'; import { NotifierEvent } from '../types/notifier-task'; @@ -9,7 +9,31 @@ export default class RedisHelper { /** * Redis client for making queries */ - private readonly redisClient = createClient({ url: process.env.REDIS_URL }); + private readonly redisClient: RedisClientType; + + constructor() { + this.redisClient = createClient({ url: process.env.REDIS_URL }); + + this.redisClient.on('error', (error) => { + console.error(error); + }); + } + + /** + * Connect to redis client + */ + public async initialize() { + await this.redisClient.connect(); + } + + /** + * Close redis client + */ + public async close() { + if (this.redisClient.isOpen) { + await this.redisClient.quit(); + } + } /** * Method that updates the event count respectfully to the threshold reset period @@ -23,11 +47,11 @@ export default class RedisHelper { const script = ` local key = KEYS[1] local currentTimestamp = tonumber(ARGV[1]) - local expirationPeriod = tonumber(ARGV[2]) + local thresholdExpirationPeriod = tonumber(ARGV[2]) local startPeriodTimestamp = tonumber(redis.call("HGET", key, "timestamp")) - if (startPeriodTimestamp > expirationPeriod or startPeriodTimestamp == nil) then + if ((startPeriodTimestamp == nil) or (currentTimestamp <= startPeriodTimestamp + thresholdExpirationPeriod)) then redis.call("HSET", key, "timestamp", currentTimestamp) redis.call("HSET", key, "counter", 0) end @@ -37,7 +61,7 @@ export default class RedisHelper { `; const key = `${ruleId}:${groupHash}:${thresholdPeriod}:times`; - + const currentEventCount = await this.redisClient.eval(script, { keys: [ key ], arguments: [Date.now().toString(), (Date.now() + thresholdPeriod).toString()], diff --git a/yarn.lock b/yarn.lock index 7fbaf0c6..86e71156 100644 --- a/yarn.lock +++ b/yarn.lock @@ -297,6 +297,11 @@ "@babel/helper-validator-identifier" "^7.14.9" to-fast-properties "^2.0.0" +"@balena/dockerignore@^1.0.2": + version "1.0.2" + resolved "https://registry.yarnpkg.com/@balena/dockerignore/-/dockerignore-1.0.2.tgz#9ffe4726915251e8eb69f44ef3547e0da2c03e0d" + integrity sha512-wMue2Sy4GAVTk6Ic4tJVcnfdau+gx2EnG7S+uAEe+TWJFqE4YoWN4/H8MSLj4eYJKxGg26lZwboEniNiNwZQ6Q== + "@bcoe/v8-coverage@^0.2.3": version "0.2.3" resolved "https://registry.yarnpkg.com/@bcoe/v8-coverage/-/v8-coverage-0.2.3.tgz#75a2e8b51cb758a7553d6804a5932d7aace75c39" @@ -698,6 +703,13 @@ dependencies: bson "*" +"@types/dockerode@^2.5.34": + version "2.5.34" + resolved "https://registry.yarnpkg.com/@types/dockerode/-/dockerode-2.5.34.tgz#9adb884f7cc6c012a6eb4b2ad794cc5d01439959" + integrity sha512-LcbLGcvcBwBAvjH9UrUI+4qotY+A5WCer5r43DR5XHv2ZIEByNXFdPLo1XxR+v/BjkGjlggW8qUiXuVEhqfkpA== + dependencies: + "@types/node" "*" + "@types/dotenv@^8.2.0": version "8.2.0" resolved "https://registry.yarnpkg.com/@types/dotenv/-/dotenv-8.2.0.tgz#5cd64710c3c98e82d9d15844375a33bf1b45d053" @@ -1204,6 +1216,11 @@ ansi-styles@^4.0.0, ansi-styles@^4.1.0: dependencies: color-convert "^2.0.1" +any-promise@^1.1.0: + version "1.3.0" + resolved "https://registry.yarnpkg.com/any-promise/-/any-promise-1.3.0.tgz#abc6afeedcea52e809cdc0376aed3ce39635d17f" + integrity sha512-7UvmKalWRt1wgjL1RrGxoSJW/0QZFIegpeGvZG9kjp8vrRu55XTHbwnqq2GpXm9uLbcuhxm3IqX9OB4MZR1b2A== + anymatch@^2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/anymatch/-/anymatch-2.0.0.tgz#bcb24b4f37934d9aa7ac17b4adaf89e7c76ef2eb" @@ -1305,6 +1322,13 @@ asn1.js@^5.2.0: minimalistic-assert "^1.0.0" safer-buffer "^2.1.0" +asn1@^0.2.6: + version "0.2.6" + resolved "https://registry.yarnpkg.com/asn1/-/asn1-0.2.6.tgz#0d3a7bb6e64e02a90c0303b31f292868ea09a08d" + integrity sha512-ix/FxPn0MDjeyJ7i/yoHGFt/EX6LyNbxSEhPPXODPL+KB0VPk86UYfL0lMdy+KCnv+fmvIzySwaK5COwqVbWTQ== + dependencies: + safer-buffer "~2.1.0" + asn1@~0.2.3: version "0.2.4" resolved "https://registry.yarnpkg.com/asn1/-/asn1-0.2.4.tgz#8d2475dfab553bb33e77b54e59e880bb8ce23136" @@ -1471,10 +1495,10 @@ base@^0.11.1: mixin-deep "^1.2.0" pascalcase "^0.1.1" -bcrypt-pbkdf@^1.0.0: +bcrypt-pbkdf@^1.0.0, bcrypt-pbkdf@^1.0.2: version "1.0.2" resolved "https://registry.yarnpkg.com/bcrypt-pbkdf/-/bcrypt-pbkdf-1.0.2.tgz#a4301d389b6a43f9b67ff3ca11a3f6637e360e9e" - integrity sha1-pDAdOJtqQ/m2f/PKEaP2Y342Dp4= + integrity sha512-qeFIXtP4MSoi6NLqO12WfqARWWuCKi2Rn/9hJLEmtB5yTNr9DqFWkJRCf2qShWzPeAMRnOgCrq0sg/KLv5ES9w== dependencies: tweetnacl "^0.14.3" @@ -1748,11 +1772,21 @@ buffer@^5.5.0, buffer@^5.6.0: base64-js "^1.3.1" ieee754 "^1.1.13" +buildcheck@~0.0.6: + version "0.0.6" + resolved "https://registry.yarnpkg.com/buildcheck/-/buildcheck-0.0.6.tgz#89aa6e417cfd1e2196e3f8fe915eb709d2fe4238" + integrity sha512-8f9ZJCUXyT1M35Jx7MkBgmBMo3oHTTBIPLiY9xyL0pl3T5RwcPEY8cUHr5LBNfu/fk6c2T4DJZuVM/8ZZT2D2A== + builtin-status-codes@^3.0.0: version "3.0.0" resolved "https://registry.yarnpkg.com/builtin-status-codes/-/builtin-status-codes-3.0.0.tgz#85982878e21b98e1c66425e03d0174788f569ee8" integrity sha1-hZgoeOIbmOHGZCXgPQF0eI9Wnug= +byline@^5.0.0: + version "5.0.0" + resolved "https://registry.yarnpkg.com/byline/-/byline-5.0.0.tgz#741c5216468eadc457b03410118ad77de8c1ddb1" + integrity sha512-s6webAy+R4SR8XVuJWt2V2rGvhnrhxN+9S15GNuTK3wKPOXFF6RNc+8ug2XhH+2s4f+uudG4kUVYmYOQWL2g0Q== + cacache@^12.0.2: version "12.0.4" resolved "https://registry.yarnpkg.com/cacache/-/cacache-12.0.4.tgz#668bcbd105aeb5f1d92fe25570ec9525c8faa40c" @@ -2196,6 +2230,14 @@ core-util-is@~1.0.0: resolved "https://registry.yarnpkg.com/core-util-is/-/core-util-is-1.0.3.tgz#a6042d3634c2b27e9328f837b965fac83808db85" integrity sha512-ZQBvi1DcpJ4GDqanjucZ2Hj3wEO5pZDS89BWbkcrvdxksJorwUDDZamX9ldFkp9aw2lmBDLgkObEA4DWNJ9FYQ== +cpu-features@~0.0.10: + version "0.0.10" + resolved "https://registry.yarnpkg.com/cpu-features/-/cpu-features-0.0.10.tgz#9aae536db2710c7254d7ed67cb3cbc7d29ad79c5" + integrity sha512-9IkYqtX3YHPCzoVg1Py+o9057a3i0fp7S530UWokCSaFVTc7CwXPRiOjRjBQQ18ZCNafx78YfnG+HALxtVmOGA== + dependencies: + buildcheck "~0.0.6" + nan "^2.19.0" + create-ecdh@^4.0.0: version "4.0.4" resolved "https://registry.yarnpkg.com/create-ecdh/-/create-ecdh-4.0.4.tgz#d6e7f4bffa66736085a0762fd3a632684dabcc4e" @@ -2470,6 +2512,32 @@ dir-glob@^3.0.1: dependencies: path-type "^4.0.0" +docker-compose@^0.23.5: + version "0.23.19" + resolved "https://registry.yarnpkg.com/docker-compose/-/docker-compose-0.23.19.tgz#9947726e2fe67bdfa9e8efe1ff15aa0de2e10eb8" + integrity sha512-v5vNLIdUqwj4my80wxFDkNH+4S85zsRuH29SO7dCWVWPCMt/ohZBsGN6g6KXWifT0pzQ7uOxqEKCYCDPJ8Vz4g== + dependencies: + yaml "^1.10.2" + +docker-modem@^3.0.0: + version "3.0.8" + resolved "https://registry.yarnpkg.com/docker-modem/-/docker-modem-3.0.8.tgz#ef62c8bdff6e8a7d12f0160988c295ea8705e77a" + integrity sha512-f0ReSURdM3pcKPNS30mxOHSbaFLcknGmQjwSfmbcdOw1XWKXVhukM3NJHhr7NpY9BIyyWQb0EBo3KQvvuU5egQ== + dependencies: + debug "^4.1.1" + readable-stream "^3.5.0" + split-ca "^1.0.1" + ssh2 "^1.11.0" + +dockerode@^3.2.1: + version "3.3.5" + resolved "https://registry.yarnpkg.com/dockerode/-/dockerode-3.3.5.tgz#7ae3f40f2bec53ae5e9a741ce655fff459745629" + integrity sha512-/0YNa3ZDNeLr/tSckmD69+Gq+qVNhvKfAHNeZJBnp7EOP6RGKV8ORrJHkUn20So5wU+xxT7+1n5u8PjHbfjbSA== + dependencies: + "@balena/dockerignore" "^1.0.2" + docker-modem "^3.0.0" + tar-fs "~2.0.1" + doctrine@1.5.0: version "1.5.0" resolved "https://registry.yarnpkg.com/doctrine/-/doctrine-1.5.0.tgz#379dce730f6166f76cefa4e6707a159b02c5a6fa" @@ -3443,6 +3511,18 @@ glob@^7.1.1, glob@^7.1.2, glob@^7.1.3, glob@^7.1.4: once "^1.3.0" path-is-absolute "^1.0.0" +glob@^7.1.6: + version "7.2.3" + resolved "https://registry.yarnpkg.com/glob/-/glob-7.2.3.tgz#b8df0fb802bbfa8e89bd1d938b4e16578ed44f2b" + integrity sha512-nFR0zLpU2YCaRxwoCJvL6UvCH2JFyFVIvwTLsIf21AuHlMskA1hhTdk+LlYJtOlYt9v6dvszD2BGRqBL+iQK9Q== + dependencies: + fs.realpath "^1.0.0" + inflight "^1.0.4" + inherits "2" + minimatch "^3.1.1" + once "^1.3.0" + path-is-absolute "^1.0.0" + global-dirs@^3.0.0: version "3.0.0" resolved "https://registry.yarnpkg.com/global-dirs/-/global-dirs-3.0.0.tgz#70a76fe84ea315ab37b1f5576cbde7d48ef72686" @@ -5125,6 +5205,13 @@ minimatch@3.0.4, minimatch@3.0.x, minimatch@^3.0.4: dependencies: brace-expansion "^1.1.7" +minimatch@^3.1.1: + version "3.1.2" + resolved "https://registry.yarnpkg.com/minimatch/-/minimatch-3.1.2.tgz#19cd194bfd3e428f049a70817c038d89ab4be35b" + integrity sha512-J7p63hRiAjw1NDEww1W7i37+ByIrOWO5XQQAzZ3VOcL0PNybwpfmV/N05zFAzwQ9USyEcX6t3UO+K5aqBQOIHw== + dependencies: + brace-expansion "^1.1.7" + minimist@^1.1.1, minimist@^1.2.0, minimist@^1.2.5: version "1.2.5" resolved "https://registry.yarnpkg.com/minimist/-/minimist-1.2.5.tgz#67d66014b66a6a8aaa0c083c5fd58df4e4e97602" @@ -5169,6 +5256,11 @@ mixin-deep@^1.2.0: for-in "^1.0.2" is-extendable "^1.0.1" +mkdirp-classic@^0.5.2: + version "0.5.3" + resolved "https://registry.yarnpkg.com/mkdirp-classic/-/mkdirp-classic-0.5.3.tgz#fa10c9115cc6d8865be221ba47ee9bed78601113" + integrity sha512-gKLcREMhtuZRwRAfqP3RFW+TK4JqApVBtOIftVgjuABpAtpxhPGaDcfvbhNvD0B8iD1oUr/txX35NjcaY6Ns/A== + mkdirp@0.5.5, mkdirp@^0.5.1, mkdirp@^0.5.3, mkdirp@^0.5.5: version "0.5.5" resolved "https://registry.yarnpkg.com/mkdirp/-/mkdirp-0.5.5.tgz#d91cefd62d1436ca0f41620e251288d420099def" @@ -5310,6 +5402,11 @@ nan@^2.12.1, nan@^2.13.2: resolved "https://registry.yarnpkg.com/nan/-/nan-2.15.0.tgz#3f34a473ff18e15c1b5626b62903b5ad6e665fee" integrity sha512-8ZtvEnA2c5aYCZYd1cvgdnU6cqwixRoYg70xPLWUws5ORTa/lnw+u4amixRS/Ac5U5mQVgp9pnlSUnbNWFaWZQ== +nan@^2.19.0, nan@^2.20.0: + version "2.22.0" + resolved "https://registry.yarnpkg.com/nan/-/nan-2.22.0.tgz#31bc433fc33213c97bad36404bb68063de604de3" + integrity sha512-nbajikzWTMwsW+eSsNm3QwlOs7het9gGJU5dDZzRTQGk03vyBOauxgI4VakDzE0PtsGTmXPsXTbbjVhRwR5mpw== + nanoid@^3.1.31: version "3.1.31" resolved "https://registry.yarnpkg.com/nanoid/-/nanoid-3.1.31.tgz#f5b58a1ce1b7604da5f0605757840598d8974dc6" @@ -5363,6 +5460,11 @@ node-cache@^5.1.2: dependencies: clone "2.x" +node-duration@^1.0.4: + version "1.0.4" + resolved "https://registry.yarnpkg.com/node-duration/-/node-duration-1.0.4.tgz#3e94ecc0e473691c89c4560074503362071cecac" + integrity sha512-eUXYNSY7DL53vqfTosggWkvyIW3bhAcqBDIlolgNYlZhianXTrCL50rlUJWD1eRqkIxMppXTfiFbp+9SjpPrgA== + node-environment-flags@1.0.6: version "1.0.6" resolved "https://registry.yarnpkg.com/node-environment-flags/-/node-environment-flags-1.0.6.tgz#a30ac13621f6f7d674260a54dede048c3982c088" @@ -6279,6 +6381,15 @@ readable-stream@^3.1.1, readable-stream@^3.4.0, readable-stream@^3.6.0: string_decoder "^1.1.1" util-deprecate "^1.0.1" +readable-stream@^3.5.0: + version "3.6.2" + resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-3.6.2.tgz#56a9b36ea965c00c5a93ef31eb111a0f11056967" + integrity sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA== + dependencies: + inherits "^2.0.3" + string_decoder "^1.1.1" + util-deprecate "^1.0.1" + readdirp@^2.2.1: version "2.2.1" resolved "https://registry.yarnpkg.com/readdirp/-/readdirp-2.2.1.tgz#0e87622a3325aa33e892285caf8b4e846529a525" @@ -6894,6 +7005,11 @@ spdx-license-ids@^3.0.0: resolved "https://registry.yarnpkg.com/spdx-license-ids/-/spdx-license-ids-3.0.10.tgz#0d9becccde7003d6c658d487dd48a32f0bf3014b" integrity sha512-oie3/+gKf7QtpitB0LYLETe+k8SifzsX4KixvpOsbI6S0kRiRQ5MKOio8eMSAKQ17N06+wdEOXRiId+zOxo0hA== +split-ca@^1.0.1: + version "1.0.1" + resolved "https://registry.yarnpkg.com/split-ca/-/split-ca-1.0.1.tgz#6c83aff3692fa61256e0cd197e05e9de157691a6" + integrity sha512-Q5thBSxp5t8WPTTJQS59LrGqOZqOsrhDGDVm8azCqIBjSBd7nd9o2PM+mDulQQkh8h//4U6hFZnc/mul8t5pWQ== + split-string@^3.0.1, split-string@^3.0.2: version "3.1.0" resolved "https://registry.yarnpkg.com/split-string/-/split-string-3.1.0.tgz#7cb09dda3a86585705c64b39a6466038682e8fe2" @@ -6906,6 +7022,17 @@ sprintf-js@~1.0.2: resolved "https://registry.yarnpkg.com/sprintf-js/-/sprintf-js-1.0.3.tgz#04e6926f662895354f3dd015203633b857297e2c" integrity sha1-BOaSb2YolTVPPdAVIDYzuFcpfiw= +ssh2@^1.11.0: + version "1.16.0" + resolved "https://registry.yarnpkg.com/ssh2/-/ssh2-1.16.0.tgz#79221d40cbf4d03d07fe881149de0a9de928c9f0" + integrity sha512-r1X4KsBGedJqo7h8F5c4Ybpcr5RjyP+aWIG007uBPRjmdQWfEiVLzSK71Zji1B9sKxwaCvD8y8cwSkYrlLiRRg== + dependencies: + asn1 "^0.2.6" + bcrypt-pbkdf "^1.0.2" + optionalDependencies: + cpu-features "~0.0.10" + nan "^2.20.0" + sshpk@^1.7.0: version "1.16.1" resolved "https://registry.yarnpkg.com/sshpk/-/sshpk-1.16.1.tgz#fb661c0bef29b39db40769ee39fa70093d6f6877" @@ -6985,6 +7112,13 @@ stream-shift@^1.0.0: resolved "https://registry.yarnpkg.com/stream-shift/-/stream-shift-1.0.1.tgz#d7088281559ab2778424279b0877da3c392d5a3d" integrity sha512-AiisoFqQ0vbGcZgQPY1cdP2I76glaVA/RauYR4G4thNFgkTqr90yXTo4LYX60Jl+sIlPNHHdGSwo01AvbKUSVQ== +stream-to-array@^2.3.0: + version "2.3.0" + resolved "https://registry.yarnpkg.com/stream-to-array/-/stream-to-array-2.3.0.tgz#bbf6b39f5f43ec30bc71babcb37557acecf34353" + integrity sha512-UsZtOYEn4tWU2RGLOXr/o/xjRBftZRlG3dEWoaHr8j4GuypJ3isitGbVyjQKAuMu+xbiop8q224TjiZWc4XTZA== + dependencies: + any-promise "^1.1.0" + string-length@^3.1.0: version "3.1.0" resolved "https://registry.yarnpkg.com/string-length/-/string-length-3.1.0.tgz#107ef8c23456e187a8abd4a61162ff4ac6e25837" @@ -7177,7 +7311,27 @@ tapable@^1.0.0, tapable@^1.1.3: resolved "https://registry.yarnpkg.com/tapable/-/tapable-1.1.3.tgz#a1fccc06b58db61fd7a45da2da44f5f3a3e67ba2" integrity sha512-4WK/bYZmj8xLr+HUCODHGF1ZFzsYffasLUgEiMBY4fgtltdO6B4WJtlSbPaDTLpYTcGVwM2qLnFTICEcNxs3kA== -tar-stream@^2.1.4: +tar-fs@^2.1.0: + version "2.1.1" + resolved "https://registry.yarnpkg.com/tar-fs/-/tar-fs-2.1.1.tgz#489a15ab85f1f0befabb370b7de4f9eb5cbe8784" + integrity sha512-V0r2Y9scmbDRLCNex/+hYzvp/zyYjvFbHPNgVTKfQvVrb6guiE/fxP+XblDNR011utopbkex2nM4dHNV6GDsng== + dependencies: + chownr "^1.1.1" + mkdirp-classic "^0.5.2" + pump "^3.0.0" + tar-stream "^2.1.4" + +tar-fs@~2.0.1: + version "2.0.1" + resolved "https://registry.yarnpkg.com/tar-fs/-/tar-fs-2.0.1.tgz#e44086c1c60d31a4f0cf893b1c4e155dabfae9e2" + integrity sha512-6tzWDMeroL87uF/+lin46k+Q+46rAJ0SyPGz7OW7wTgblI273hsBqk2C1j0/xNadNLKDTUL9BukSjB7cwgmlPA== + dependencies: + chownr "^1.1.1" + mkdirp-classic "^0.5.2" + pump "^3.0.0" + tar-stream "^2.0.0" + +tar-stream@^2.0.0, tar-stream@^2.1.4: version "2.2.0" resolved "https://registry.yarnpkg.com/tar-stream/-/tar-stream-2.2.0.tgz#acad84c284136b060dc3faa64474aa9aebd77287" integrity sha512-ujeqbceABgwMZxEJnk2HDY2DlnUZ+9oEcb1KzTVfYHio0UE6dG71n60d8D2I4qNvleWrrXpmjpt7vZeF1LnMZQ== @@ -7249,6 +7403,22 @@ test-exclude@^6.0.0: glob "^7.1.4" minimatch "^3.0.4" +testcontainers@^3.0.0: + version "3.5.0" + resolved "https://registry.yarnpkg.com/testcontainers/-/testcontainers-3.5.0.tgz#da8a3fa53f1c6d106ccff7e0f71bf86d358c2589" + integrity sha512-iigGuzBDzOmTFlxfaTo1JO7EGMs4Dgf+XHd98XeEwp8J+WzOdkJfsSL5A8fdLSGQA5GvVAdxLcUfT4a32z4rbg== + dependencies: + "@types/dockerode" "^2.5.34" + byline "^5.0.0" + debug "^4.1.1" + docker-compose "^0.23.5" + dockerode "^3.2.1" + get-port "^5.1.1" + glob "^7.1.6" + node-duration "^1.0.4" + stream-to-array "^2.3.0" + tar-fs "^2.1.0" + text-hex@1.0.x: version "1.0.0" resolved "https://registry.yarnpkg.com/text-hex/-/text-hex-1.0.0.tgz#69dc9c1b17446ee79a92bf5b884bb4b9127506f5" @@ -8005,6 +8175,11 @@ yallist@^3.0.0, yallist@^3.0.2, yallist@^3.1.1: resolved "https://registry.yarnpkg.com/yallist/-/yallist-3.1.1.tgz#dbb7daf9bfd8bac9ab45ebf602b8cbad0d5d08fd" integrity sha512-a4UGQaWPH59mOXUYnAG2ewncQS4i4F43Tv3JoAM+s2VDAmS9NsK8GpDMLrCHPksFT7h3K6TOoUNn2pb7RoXx4g== +yaml@^1.10.2: + version "1.10.2" + resolved "https://registry.yarnpkg.com/yaml/-/yaml-1.10.2.tgz#2301c5ffbf12b467de8da2333a459e29e7920e4b" + integrity sha512-r3vXyErRCYJ7wg28yvBY5VSoAF8ZvlcW9/BwUzEtUsjvX/DKs24dIkuwjtuprwJJHsbyUbLApepYTR1BN4uHrg== + yargs-parser@13.1.2, yargs-parser@^13.1.2: version "13.1.2" resolved "https://registry.yarnpkg.com/yargs-parser/-/yargs-parser-13.1.2.tgz#130f09702ebaeef2650d54ce6e3e5706f7a4fb38" From 0a66ae0b5cb244fedc6be27d3a93ecb3fbe401b6 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sat, 21 Dec 2024 20:33:19 +0300 Subject: [PATCH 12/17] refactor notifier tests with testcontainer --- workers/notifier/tests/worker.test.ts | 111 ++++---------------------- 1 file changed, 14 insertions(+), 97 deletions(-) diff --git a/workers/notifier/tests/worker.test.ts b/workers/notifier/tests/worker.test.ts index 800e9e53..147c6b0b 100644 --- a/workers/notifier/tests/worker.test.ts +++ b/workers/notifier/tests/worker.test.ts @@ -82,6 +82,9 @@ class MockDBController { } describe('NotifierWorker', () => { + // eslint-disable-next-line @typescript-eslint/no-var-requires + const NotifierWorker = require('../src').default; + jest.mock('../../../lib/db/controller', () => ({ DatabaseController: MockDBController, })); @@ -89,46 +92,28 @@ describe('NotifierWorker', () => { /** * Reset calls number after each test */ - afterEach(() => { + afterEach(async () => { jest.clearAllMocks(); + await worker.finish(); }); - // eslint-disable-next-line @typescript-eslint/no-var-requires - const NotifierWorker = require('../src').default; - - it('should have correct worker type', () => { - const worker = new NotifierWorker(); - - expect(worker.type).toEqual('notifier'); - }); - - it('should start and finish without errors', async () => { - const worker = new NotifierWorker(); + /** + * Before each test create an instance of the worker and start it + */ + beforeEach(async () => { + worker = new NotifierWorker(); await worker.start(); - await worker.finish(); - }); + }) + + let worker: typeof NotifierWorker; describe('db calls', () => { it('should connect to db on start', async () => { - const worker = new NotifierWorker(); - - await worker.start(); - expect(dbConnectMock).toBeCalled(); - - await worker.finish(); }); it('should get db connection on message handle', async () => { - const worker = new NotifierWorker(); - - worker.redis.redisClient.eval = jest.fn(async () => { - return Promise.resolve(); - }); - - await worker.start(); - const message = { ...messageMock }; worker.sendToSenderWorker = jest.fn(); @@ -136,19 +121,9 @@ describe('NotifierWorker', () => { await worker.handle(message); expect(dbConnectionMock).toBeCalled(); - - await worker.finish(); }); it('should get db connection on message handle and cache result', async () => { - const worker = new NotifierWorker(); - - worker.redis.redisClient.eval = jest.fn(async () => { - return Promise.resolve(); - }); - - await worker.start(); - const message = { ...messageMock }; worker.sendToSenderWorker = jest.fn(); @@ -160,75 +135,39 @@ describe('NotifierWorker', () => { await worker.handle(message); expect(dbConnectionMock).toBeCalledTimes(1); - - await worker.finish(); }); it('should query correct collection on message handle', async () => { - const worker = new NotifierWorker(); const message = { ...messageMock }; - worker.redis.redisClient.eval = jest.fn(async () => { - return Promise.resolve(); - }); - - await worker.start(); - worker.sendToSenderWorker = jest.fn(); await worker.handle(message); expect(dbCollectionMock).toBeCalledWith('projects'); - - await worker.finish(); }); it('should query correct project on message handle', async () => { - const worker = new NotifierWorker(); const message = { ...messageMock }; - worker.redis.redisClient.eval = jest.fn(async () => { - return Promise.resolve(); - }); - - await worker.start(); - worker.sendToSenderWorker = jest.fn(); await worker.handle(message); expect(dbQueryMock).toBeCalledWith({ _id: new ObjectID(message.projectId) }); - - await worker.finish(); }); it('should close db connection on finish', async () => { - const worker = new NotifierWorker(); - - await worker.start(); - worker.sendToSenderWorker = jest.fn(); await worker.finish(); - + expect(dbCloseMock).toBeCalled(); - - await worker.finish(); }); }); describe('handling', () => { it('should send task if event threshold reached', async () => { - const worker = new NotifierWorker(); - - jest.mock('../src/redisHelper'); - - worker.redis.redisClient.eval = jest.fn(async () => { - return Promise.resolve(); - }); - - await worker.start(); - const message = { ...messageMock }; /** @@ -243,21 +182,11 @@ describe('NotifierWorker', () => { await worker.handle(message); expect(worker.sendToSenderWorker).toHaveBeenCalled(); - - await worker.finish(); }); it('should not send task if event repetitions number is less than threshold', async () => { - const worker = new NotifierWorker(); - jest.mock('../src/redisHelper'); - worker.redis.redisClient.eval = jest.fn(async () => { - return Promise.resolve(); - }); - - await worker.start(); - const message = { ...messageMock }; /** @@ -272,21 +201,11 @@ describe('NotifierWorker', () => { await worker.handle(message); expect(worker.sendToSenderWorker).not.toHaveBeenCalled(); - - await worker.finish(); }); it('should not send task if event repetitions number is more than threshold', async () => { - const worker = new NotifierWorker(); - jest.mock('../src/redisHelper'); - worker.redis.redisClient.eval = jest.fn(async () => { - return Promise.resolve(); - }); - - await worker.start(); - const message = { ...messageMock }; /** @@ -302,8 +221,6 @@ describe('NotifierWorker', () => { await worker.handle(message); expect(worker.sendToSenderWorker).not.toHaveBeenCalled(); - - await worker.finish(); }); }); }); From cf9545f1d252bbcf0f05e8de6982de7935fb8feb Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sat, 21 Dec 2024 20:37:08 +0300 Subject: [PATCH 13/17] lint fix --- jest.setup.redis-mock.js | 14 +++++++------- workers/notifier/src/redisHelper.ts | 9 ++++++--- workers/notifier/tests/worker.test.ts | 8 ++++---- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/jest.setup.redis-mock.js b/jest.setup.redis-mock.js index 0ad2a8ca..291ded59 100644 --- a/jest.setup.redis-mock.js +++ b/jest.setup.redis-mock.js @@ -10,14 +10,14 @@ beforeAll(async () => { .withExposedPorts(6379) .start(); - const port = redisTestContainer.getMappedPort(6379); - const host = redisTestContainer.getContainerIpAddress(); + const port = redisTestContainer.getMappedPort(6379); + const host = redisTestContainer.getContainerIpAddress(); - /** - * Set environment variable for redisHelper to connect to redis container - */ - process.env.REDIS_URL = `redis://${host}:${port}`; - }, + /** + * Set environment variable for redisHelper to connect to redis container + */ + process.env.REDIS_URL = `redis://${host}:${port}`; +} ); afterAll(async () => { diff --git a/workers/notifier/src/redisHelper.ts b/workers/notifier/src/redisHelper.ts index 8854c661..4201fb7d 100644 --- a/workers/notifier/src/redisHelper.ts +++ b/workers/notifier/src/redisHelper.ts @@ -11,6 +11,9 @@ export default class RedisHelper { */ private readonly redisClient: RedisClientType; + /** + * + */ constructor() { this.redisClient = createClient({ url: process.env.REDIS_URL }); @@ -22,14 +25,14 @@ export default class RedisHelper { /** * Connect to redis client */ - public async initialize() { + public async initialize(): Promise { await this.redisClient.connect(); } /** * Close redis client */ - public async close() { + public async close(): Promise { if (this.redisClient.isOpen) { await this.redisClient.quit(); } @@ -61,7 +64,7 @@ export default class RedisHelper { `; const key = `${ruleId}:${groupHash}:${thresholdPeriod}:times`; - + const currentEventCount = await this.redisClient.eval(script, { keys: [ key ], arguments: [Date.now().toString(), (Date.now() + thresholdPeriod).toString()], diff --git a/workers/notifier/tests/worker.test.ts b/workers/notifier/tests/worker.test.ts index 147c6b0b..11bb7333 100644 --- a/workers/notifier/tests/worker.test.ts +++ b/workers/notifier/tests/worker.test.ts @@ -84,6 +84,8 @@ class MockDBController { describe('NotifierWorker', () => { // eslint-disable-next-line @typescript-eslint/no-var-requires const NotifierWorker = require('../src').default; + + let worker: typeof NotifierWorker; jest.mock('../../../lib/db/controller', () => ({ DatabaseController: MockDBController, @@ -104,9 +106,7 @@ describe('NotifierWorker', () => { worker = new NotifierWorker(); await worker.start(); - }) - - let worker: typeof NotifierWorker; + }); describe('db calls', () => { it('should connect to db on start', async () => { @@ -161,7 +161,7 @@ describe('NotifierWorker', () => { worker.sendToSenderWorker = jest.fn(); await worker.finish(); - + expect(dbCloseMock).toBeCalled(); }); }); From 0c593789b1a13f6178a5eb506e8bc0b1f0408687 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sat, 21 Dec 2024 20:37:22 +0300 Subject: [PATCH 14/17] typo fix --- workers/notifier/tests/worker.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workers/notifier/tests/worker.test.ts b/workers/notifier/tests/worker.test.ts index 11bb7333..ccdf98c0 100644 --- a/workers/notifier/tests/worker.test.ts +++ b/workers/notifier/tests/worker.test.ts @@ -84,7 +84,7 @@ class MockDBController { describe('NotifierWorker', () => { // eslint-disable-next-line @typescript-eslint/no-var-requires const NotifierWorker = require('../src').default; - + let worker: typeof NotifierWorker; jest.mock('../../../lib/db/controller', () => ({ From fb62731d2bcf18687e33b3975f937a61a1be6c3d Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sat, 21 Dec 2024 20:41:49 +0300 Subject: [PATCH 15/17] update lock file --- yarn.lock | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/yarn.lock b/yarn.lock index bca134cc..fe73d119 100644 --- a/yarn.lock +++ b/yarn.lock @@ -608,6 +608,11 @@ resolved "https://registry.yarnpkg.com/@redis/time-series/-/time-series-1.1.0.tgz#cba454c05ec201bd5547aaf55286d44682ac8eb5" integrity sha512-c1Q99M5ljsIuc4YdaCwfUEXsofakb9c8+Zse2qxTadu8TalLXuAESzLvFAvNVbkmSlvlzIQOLpBCmWI9wTOt+g== +"@sentry/core@^8.45.1": + version "8.47.0" + resolved "https://registry.yarnpkg.com/@sentry/core/-/core-8.47.0.tgz#e811444552f7a91b5de573875a318a6cd4e802f8" + integrity sha512-iSEJZMe3DOcqBFZQAqgA3NB2lCWBc4Gv5x/SCri/TVg96wAlss4VrUunSI2Mp0J4jJ5nJcJ2ChqHSBAU48k3FA== + "@shelf/jest-mongodb@^1.2.3": version "1.3.4" resolved "https://registry.yarnpkg.com/@shelf/jest-mongodb/-/jest-mongodb-1.3.4.tgz#200bac386cf513bed2d41952b1857689f0b88f31" @@ -6406,7 +6411,7 @@ read-pkg@^5.2.0: isarray "0.0.1" string_decoder "~0.10.x" -readable-stream@^3.1.1: +readable-stream@^3.1.1, readable-stream@^3.5.0: version "3.6.2" resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-3.6.2.tgz#56a9b36ea965c00c5a93ef31eb111a0f11056967" integrity sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA== @@ -6424,15 +6429,6 @@ readable-stream@^3.4.0, readable-stream@^3.6.0: string_decoder "^1.1.1" util-deprecate "^1.0.1" -readable-stream@^3.5.0: - version "3.6.2" - resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-3.6.2.tgz#56a9b36ea965c00c5a93ef31eb111a0f11056967" - integrity sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA== - dependencies: - inherits "^2.0.3" - string_decoder "^1.1.1" - util-deprecate "^1.0.1" - readdirp@^2.2.1: version "2.2.1" resolved "https://registry.yarnpkg.com/readdirp/-/readdirp-2.2.1.tgz#0e87622a3325aa33e892285caf8b4e846529a525" From 9f6b401dcabea9dba702f8ef89030393bf4e9ae4 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sat, 21 Dec 2024 20:46:44 +0300 Subject: [PATCH 16/17] fix lua script and naming --- workers/notifier/src/redisHelper.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/workers/notifier/src/redisHelper.ts b/workers/notifier/src/redisHelper.ts index 4201fb7d..d9ec9bec 100644 --- a/workers/notifier/src/redisHelper.ts +++ b/workers/notifier/src/redisHelper.ts @@ -54,12 +54,12 @@ export default class RedisHelper { local startPeriodTimestamp = tonumber(redis.call("HGET", key, "timestamp")) - if ((startPeriodTimestamp == nil) or (currentTimestamp <= startPeriodTimestamp + thresholdExpirationPeriod)) then + if ((startPeriodTimestamp == nil) or (currentTimestamp >= startPeriodTimestamp + thresholdExpirationPeriod)) then redis.call("HSET", key, "timestamp", currentTimestamp) - redis.call("HSET", key, "counter", 0) + redis.call("HSET", key, "eventsCount", 0) end - local newCounter = redis.call("HINCRBY", key, "counter", 1) + local newCounter = redis.call("HINCRBY", key, "eventsCount", 1) return newCounter `; From f7fed4669db3ab56c0b4a65959995db6ae717341 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sat, 21 Dec 2024 20:52:02 +0300 Subject: [PATCH 17/17] move Date.now() to variables --- workers/notifier/src/redisHelper.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/workers/notifier/src/redisHelper.ts b/workers/notifier/src/redisHelper.ts index d9ec9bec..b8b151f2 100644 --- a/workers/notifier/src/redisHelper.ts +++ b/workers/notifier/src/redisHelper.ts @@ -65,9 +65,11 @@ export default class RedisHelper { const key = `${ruleId}:${groupHash}:${thresholdPeriod}:times`; + const currentTimestamp = Date.now(); + const currentEventCount = await this.redisClient.eval(script, { keys: [ key ], - arguments: [Date.now().toString(), (Date.now() + thresholdPeriod).toString()], + arguments: [currentTimestamp.toString(), (currentTimestamp + thresholdPeriod).toString()], }) as number; return (currentEventCount !== null) ? currentEventCount : 0;