Skip to content

Commit

Permalink
feat(grouper): added daily affected users counter (#340)
Browse files Browse the repository at this point in the history
* feat: added daily affected users counter

* fix: lint

* feat: added daily affected to tests for afeccted users

* feat: add tests for daily affected

* refactor

* little fixes

* remove log

* feat: added separate method for locking daily events

* feat: added test case, fix tests mongo intiating
  • Loading branch information
slaveeks authored Jan 24, 2025
1 parent 6c8271e commit 55be3bd
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 24 deletions.
140 changes: 119 additions & 21 deletions workers/grouper/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ export default class GrouperWorker extends Worker {

let repetitionId = null;

let incrementDailyAffectedUsers = false;

/**
* Filter sensitive information
*/
Expand All @@ -121,6 +123,11 @@ export default class GrouperWorker extends Worker {
payload: task.event,
usersAffected: 1,
} as GroupedEventDBScheme);

/**
* Increment daily affected users for the first event
*/
incrementDailyAffectedUsers = true;
} catch (e) {
/**
* If we caught Database duplication error, then another worker thread has already saved it to the database
Expand All @@ -136,7 +143,9 @@ export default class GrouperWorker extends Worker {
}
}
} else {
const incrementAffectedUsers = await this.shouldIncrementAffectedUsers(task, existedEvent);
const [incrementAffectedUsers, shouldIncrementDailyAffectedUsers] = await this.shouldIncrementAffectedUsers(task, existedEvent);

incrementDailyAffectedUsers = shouldIncrementDailyAffectedUsers;

/**
* Increment existed task's counter
Expand Down Expand Up @@ -172,7 +181,7 @@ export default class GrouperWorker extends Worker {
/**
* Store events counter by days
*/
await this.saveDailyEvents(task.projectId, uniqueEventHash, task.event.timestamp, repetitionId);
await this.saveDailyEvents(task.projectId, uniqueEventHash, task.event.timestamp, repetitionId, incrementDailyAffectedUsers);

/**
* Add task for NotifierWorker
Expand Down Expand Up @@ -244,22 +253,43 @@ export default class GrouperWorker extends Worker {
}

/**
* Decides whether to increase the number of affected users.
* Decides whether to increase the number of affected users for the repetition and the daily aggregation
*
* @param task - worker task to process
* @param existedEvent - original event to get its user
* @returns {[boolean, boolean]} - whether to increment affected users for the repetition and the daily aggregation
*/
private async shouldIncrementAffectedUsers(task: GroupWorkerTask, existedEvent: GroupedEventDBScheme): Promise<boolean> {
private async shouldIncrementAffectedUsers(task: GroupWorkerTask, existedEvent: GroupedEventDBScheme): Promise<[boolean, boolean]> {
const eventUser = task.event.user;

/**
* In case of no user, we don't need to increment affected users
*/
if (!eventUser) {
return false;
return [false, false];
}


/**
* Default to true - we'll set to false if conditions are met
*/
let shouldIncrementRepetitionAffectedUsers = true;
let shouldIncrementDailyAffectedUsers = true;

/**
* Check if user is the same as original event
*/
const isUserFromOriginalEvent = existedEvent.payload.user?.id === eventUser.id;

/**
* If user is the same as original event, don't increment repetition affected users
*/
if (isUserFromOriginalEvent) {
return false;
shouldIncrementRepetitionAffectedUsers = false;
} else {
/**
* Check if repetition exists for the user, if so, don't increment affected users
*/
const repetitionCacheKey = `repetitions:${task.projectId}:${existedEvent.groupHash}:${eventUser.id}`;
const repetition = await this.cache.get(repetitionCacheKey, async () => {
return this.db.getConnection().collection(`repetitions:${task.projectId}`)
Expand All @@ -270,13 +300,61 @@ export default class GrouperWorker extends Worker {
});

if (repetition) {
return false;
shouldIncrementRepetitionAffectedUsers = false;
}
}

const isLocked = await this.redis.checkOrSetEventLock(existedEvent.groupHash, eventUser.id);
/**
* Get midnight timestamps for the event and the next day
*/
const eventMidnight = this.getMidnightByEventTimestamp(task.event.timestamp);
const eventNextMidnight = this.getMidnightByEventTimestamp(task.event.timestamp, true);

/**
* Check if incoming event has the same day as the original event
*/
const isSameDay = existedEvent.payload.timestamp > eventMidnight && existedEvent.payload.timestamp < eventNextMidnight;

/**
* If incoming event has the same day as the original event and the same user, don't increment daily affected users
*/
if (isSameDay && existedEvent.payload.user?.id === eventUser.id) {
shouldIncrementDailyAffectedUsers = false;
} else {
/**
* Check if daily repetition exists for the user, if so, don't increment affected users
*/
const repetitionDailyCacheKey = `repetitions:${task.projectId}:${existedEvent.groupHash}:${eventUser.id}:${eventMidnight}`;
const repetitionDaily = await this.cache.get(repetitionDailyCacheKey, async () => {
return this.db.getConnection().collection(`repetitions:${task.projectId}`)
.findOne({
groupHash: existedEvent.groupHash,
'payload.user.id': eventUser.id,
'payload.timestamp': {
$gte: eventMidnight,
$lt: eventNextMidnight,
},
});
});

return !isLocked;
/**
* If daily repetition exists, don't increment daily affected users
*/
if (repetitionDaily) {
shouldIncrementDailyAffectedUsers = false;
}
}

/**
* Check Redis lock - if locked, don't increment either counter
*/
const isEventLocked = await this.redis.checkOrSetlockEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id);
const isDailyEventLocked = await this.redis.checkOrSetlockDailyEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id, eventMidnight);

shouldIncrementRepetitionAffectedUsers = isEventLocked ? false : shouldIncrementRepetitionAffectedUsers;
shouldIncrementDailyAffectedUsers = isDailyEventLocked ? false : shouldIncrementDailyAffectedUsers;

return [shouldIncrementRepetitionAffectedUsers, shouldIncrementDailyAffectedUsers];
}

/**
Expand Down Expand Up @@ -389,29 +467,22 @@ export default class GrouperWorker extends Worker {
* @param {string} eventHash - event hash
* @param {string} eventTimestamp - timestamp of the last event
* @param {string|null} repetitionId - event's last repetition id
* @param {boolean} shouldIncrementAffectedUsers - whether to increment affected users
* @returns {Promise<void>}
*/
private async saveDailyEvents(
projectId: string,
eventHash: string,
eventTimestamp: number,
repetitionId: string | null
repetitionId: string | null,
shouldIncrementAffectedUsers: boolean
): Promise<void> {
if (!projectId || !mongodb.ObjectID.isValid(projectId)) {
throw new ValidationError('GrouperWorker.saveDailyEvents: Project ID is invalid or missed');
}

try {
/**
* Get JavaScript date from event unixtime to convert daily aggregation collection format
*
* Problem was issued due to the numerous events that could be occurred in the past
* but the date always was current
*/
const eventDate = new Date(eventTimestamp * MS_IN_SEC);

eventDate.setUTCHours(0, 0, 0, 0); // 00:00 UTC
const midnight = eventDate.getTime() / MS_IN_SEC;
const midnight = this.getMidnightByEventTimestamp(eventTimestamp);

await this.db.getConnection()
.collection(`dailyEvents:${projectId}`)
Expand All @@ -427,11 +498,38 @@ export default class GrouperWorker extends Worker {
lastRepetitionTime: eventTimestamp,
lastRepetitionId: repetitionId,
},
$inc: { count: 1 },
$inc: {
count: 1,
affectedUsers: shouldIncrementAffectedUsers ? 1 : 0,
},
},
{ upsert: true });
} catch (err) {
throw new DatabaseReadWriteError(err);
}
}

/**
* Gets the midnight timestamp for the event date or the next day
*
* @param eventTimestamp - Unix timestamp of the event
* @param getNext - If true, returns the next day's midnight timestamp
*/
private getMidnightByEventTimestamp(eventTimestamp: number, getNext = false): number {
/**
* Get JavaScript date from event unixtime to convert daily aggregation collection format
*
* Problem was issued due to the numerous events that could be occurred in the past
* but the date always was current
*/
const eventDate = new Date(eventTimestamp * MS_IN_SEC);

if (getNext) {
eventDate.setUTCDate(eventDate.getUTCDate() + 1);
}

eventDate.setUTCHours(0, 0, 0, 0);

return eventDate.getTime() / MS_IN_SEC;
}
}
26 changes: 25 additions & 1 deletion workers/grouper/src/redisHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export default class RedisHelper {
* @param groupHash - event group hash
* @param userId - event user id
*/
public async checkOrSetEventLock(groupHash: string, userId: string): Promise<boolean> {
public async checkOrSetlockEventForAffectedUsersIncrement(groupHash: string, userId: string): Promise<boolean> {
const result = await this.redisClient.set(
`${groupHash}:${userId}`,
'1',
Expand All @@ -80,6 +80,30 @@ export default class RedisHelper {
return result === null;
}

/**
* Checks if a lock exists on the given group hash, identifier and timestamp. If it does not exist, creates a lock.
* Returns true if lock exists
*
* @param groupHash - event group hash
* @param userId - event user id
* @param timestamp - event timestamp for daily events
*/
public async checkOrSetlockDailyEventForAffectedUsersIncrement(groupHash: string, userId: string, timestamp: number): Promise<boolean> {
const result = await this.redisClient.set(
`${groupHash}:${userId}:${timestamp}`,
'1',
{
EX: RedisHelper.LOCK_TTL,
NX: true,
} as const
);

/**
* Result would be null if lock already exists, false otherwise
*/
return result === null;
}

/**
* Creates callback function for Redis operations
*
Expand Down
Loading

0 comments on commit 55be3bd

Please sign in to comment.