From 25bbaa81af87f9944a64bc4fb7e0c76ef223ada4 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Mon, 7 Oct 2024 15:41:32 +0200 Subject: [PATCH] fix(repeat): also consider startDate when using "every" --- src/classes/job-scheduler.ts | 42 ++++++++++++-------- tests/test_job_scheduler.ts | 77 +++++++++++++++++++++++++++++++++++- 2 files changed, 100 insertions(+), 19 deletions(-) diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 75819c6223..24e7466d02 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -2,7 +2,6 @@ import { parseExpression } from 'cron-parser'; import { RedisClient, RepeatBaseOptions, RepeatOptions } from '../interfaces'; import { JobsOptions, RepeatStrategy } from '../types'; import { Job } from './job'; -import { Scripts } from './scripts'; import { QueueBase } from './queue-base'; import { RedisConnection } from './redis-connection'; @@ -39,6 +38,20 @@ export class JobScheduler extends QueueBase { opts: Omit, { override }: { override: boolean }, ): Promise | undefined> { + const { every, pattern } = repeatOpts; + + if (pattern && every) { + throw new Error( + 'Both .pattern and .every options are defined for this repeatable job', + ); + } + + if (repeatOpts.immediately && repeatOpts.startDate) { + throw new Error( + 'Both .immediately and .startDate options are defined for this repeatable job', + ); + } + // Check if we reached the limit of the repeatable job's iterations const iterationCount = repeatOpts.count ? repeatOpts.count + 1 : 1; if ( @@ -58,8 +71,14 @@ export class JobScheduler extends QueueBase { const prevMillis = opts.prevMillis || 0; now = prevMillis < now ? now : prevMillis; + // Check if we have a start date for the repeatable job + const { startDate } = repeatOpts; + if (startDate) { + const startMillis = new Date(startDate).getTime(); + now = startMillis > now ? startMillis : now; + } + const nextMillis = await this.repeatStrategy(now, repeatOpts, jobName); - const { every, pattern } = repeatOpts; const hasImmediately = Boolean( (every || pattern) && repeatOpts.immediately, @@ -215,24 +234,13 @@ export const getNextMillis = ( millis: number, opts: RepeatOptions, ): number | undefined => { - const pattern = opts.pattern; - if (pattern && opts.every) { - throw new Error( - 'Both .pattern and .every options are defined for this repeatable job', - ); - } + const { every, pattern } = opts; - if (opts.every) { - return ( - Math.floor(millis / opts.every) * opts.every + - (opts.immediately ? 0 : opts.every) - ); + if (every) { + return Math.floor(millis / every) * every + (opts.immediately ? 0 : every); } - const currentDate = - opts.startDate && new Date(opts.startDate) > new Date(millis) - ? new Date(opts.startDate) - : new Date(millis); + const currentDate = new Date(millis); const interval = parseExpression(pattern, { ...opts, currentDate, diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index f22f80f253..b851cfaf31 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -1365,8 +1365,6 @@ describe('Job Scheduler', function () { const job = delayed[0]; try { await job.remove(); - const delayed = await queue.getDelayed(); - console.log({ delayed }); expect.fail( 'Should not be able to remove a delayed job that belongs to a repeatable job', ); @@ -1546,6 +1544,69 @@ describe('Job Scheduler', function () { delayStub.restore(); }); + it('should repeat every 2 seconds with a startDate in the future', async function () { + this.timeout(10000); + + // Set the initial system time + const initialDate = new Date('2024-01-01 10:00:00'); + this.clock.setSystemTime(initialDate); + + // Set the next tick (repeat interval) and the startDate in the future + const nextTick = 2 * ONE_SECOND + 500; + const startDate = new Date('2024-01-01 10:00:10'); // 10 seconds in the future + + const worker = new Worker( + queueName, + async () => { + this.clock.tick(nextTick); + }, + { autorun: false, connection, prefix }, + ); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); + + // Schedule the job with the 'every' interval and a future startDate + await queue.upsertJobScheduler( + 'test', + { + every: 2000, // every 2 seconds + startDate, + }, + { data: { foo: 'bar' } }, + ); + + // Simulate the passage of time up to the startDate + const startDateDelay = startDate.getTime() - initialDate.getTime(); + this.clock.tick(startDateDelay + nextTick); + + let prev: Job; + let counter = 0; + + // Promise to resolve when 5 iterations of the job are completed + const completing = new Promise((resolve, reject) => { + worker.on('completed', async job => { + try { + if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(2000); // Ensure it's repeating every 2 seconds + } + prev = job; + counter++; + if (counter == 5) { + resolve(); + } + } catch (err) { + reject(err); + } + }); + }); + + worker.run(); + + await completing; + await worker.close(); + delayStub.restore(); + }); + it('should throw an error when using .pattern and .every simultaneously', async function () { await expect( queue.upsertJobScheduler('repeat', { @@ -1557,6 +1618,18 @@ describe('Job Scheduler', function () { ); }); + it('should throw an error when using .immediately and .startDate simultaneously', async function () { + await expect( + queue.upsertJobScheduler('repeat', { + every: 5000, + immediately: true, + startDate: new Date(), + }), + ).to.be.rejectedWith( + 'Both .immediately and .startDate options are defined for this repeatable job', + ); + }); + it('should emit a waiting event when adding a repeatable job to the waiting list', async function () { const date = new Date('2017-02-07 9:24:00'); this.clock.setSystemTime(date);