Skip to content

Commit

Permalink
test(job-scheduler): add tests covering job retries and stalls (#2941)
Browse files Browse the repository at this point in the history
  • Loading branch information
manast authored Dec 2, 2024
1 parent 9b1f30c commit c94e2bd
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1042,7 +1042,6 @@ will never work with more accuracy than 1ms. */
}

clearTimeout(this.extendLocksTimer);
//clearTimeout(this.stalledCheckTimer);
this.stalledCheckStopper?.();

this.closed = true;
Expand Down Expand Up @@ -1249,6 +1248,7 @@ will never work with more accuracy than 1ms. */
this.emit('stalled', jobId, 'active');
});

// Todo: check if there any listeners on failed event
const jobPromises: Promise<Job<DataType, ResultType, NameType>>[] = [];
for (let i = 0; i < failed.length; i++) {
jobPromises.push(
Expand Down
249 changes: 249 additions & 0 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1406,6 +1406,255 @@ describe('Job Scheduler', function () {
});
});

describe('when repeatable job fails', function () {
it('should continue repeating', async function () {
const repeatOpts = {
pattern: '0 * 1 * *',
};

const worker = new Worker(
queueName,
async () => {
throw new Error('failed');
},
{
connection,
prefix,
},
);

const failing = new Promise<void>(resolve => {
worker.on('failed', () => {
resolve();
});
});

const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts);
const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.equal(1);

await repeatableJob!.promote();
await failing;

const failedCount = await queue.getFailedCount();
expect(failedCount).to.be.equal(1);

const delayedCount2 = await queue.getDelayedCount();
expect(delayedCount2).to.be.equal(1);

const jobSchedulers = await queue.getJobSchedulers();

const count = await queue.count();
expect(count).to.be.equal(1);
expect(jobSchedulers).to.have.length(1);
await worker.close();
});

it('should not create a new delayed job if the failed job is retried with retryJobs', async function () {
const repeatOpts = {
every: 579,
};

let isFirstRun = true;

const worker = new Worker(
queueName,
async () => {
this.clock.tick(177);
if (isFirstRun) {
isFirstRun = false;
throw new Error('failed');
}
},
{
connection,
prefix,
},
);

const failing = new Promise<void>(resolve => {
worker.on('failed', async () => {
resolve();
});
});

const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts);
const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.equal(1);

await repeatableJob!.promote();
await failing;

const failedCount = await queue.getFailedCount();
expect(failedCount).to.be.equal(1);

// Retry the failed job
this.clock.tick(1143);
await queue.retryJobs({ state: 'failed' });
const failedCountAfterRetry = await queue.getFailedCount();
expect(failedCountAfterRetry).to.be.equal(0);

const delayedCount2 = await queue.getDelayedCount();
expect(delayedCount2).to.be.equal(1);
});

it('should not create a new delayed job if the failed job is retried with Job.retry()', async function () {
const repeatOpts = {
every: 477,
};

let isFirstRun = true;

const worker = new Worker(
queueName,
async () => {
this.clock.tick(177);

if (isFirstRun) {
isFirstRun = false;
throw new Error('failed');
}
},
{
connection,
prefix,
},
);

const failing = new Promise<void>(resolve => {
worker.on('failed', async () => {
resolve();
});
});

const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts);
const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.equal(1);

await repeatableJob!.promote();

this.clock.tick(177);

await failing;

this.clock.tick(177);

const failedJobs = await queue.getFailed();
expect(failedJobs.length).to.be.equal(1);

// Retry the failed job
const failedJob = await queue.getJob(failedJobs[0].id);
await failedJob!.retry();
const failedCountAfterRetry = await queue.getFailedCount();
expect(failedCountAfterRetry).to.be.equal(0);

const delayedCount2 = await queue.getDelayedCount();
expect(delayedCount2).to.be.equal(1);
});

it('should not create a new delayed job if the failed job is stalled and moved back to wait', async function () {
// Note, this test is expected to throw an exception like this:
// "Error: Missing lock for job repeat:test:1486455840000. moveToFinished"
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);

const repeatOpts = {
every: 2000,
};

const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts);
expect(repeatableJob).to.be.ok;

const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.equal(1);

await repeatableJob!.promote();

let resolveCompleting: () => void;
const complettingJob = new Promise<void>(resolve => {
resolveCompleting = resolve;
});

let worker: Worker;
const processing = new Promise<void>(resolve => {
worker = new Worker(
queueName,
async () => {
resolve();
return complettingJob;
},
{
connection,
prefix,
skipLockRenewal: true,
skipStalledCheck: true,
},
);
});

await processing;

// force remove the lock
const client = await queue.client;
const lockKey = `${prefix}:${queueName}:${repeatableJob!.id}:lock`;
await client.del(lockKey);

const stalledCheckerKey = `${prefix}:${queueName}:stalled-check`;
await client.del(stalledCheckerKey);

const scripts = (<any>worker!).scripts;
let [failed, stalled] = await scripts.moveStalledJobsToWait();

await client.del(stalledCheckerKey);

[failed, stalled] = await scripts.moveStalledJobsToWait();

const waitingJobs = await queue.getWaiting();
expect(waitingJobs.length).to.be.equal(1);

await this.clock.tick(500);

resolveCompleting!();
await worker!.close();

await this.clock.tick(500);

const delayedCount2 = await queue.getDelayedCount();
expect(delayedCount2).to.be.equal(1);

let completedJobs = await queue.getCompleted();
expect(completedJobs.length).to.be.equal(0);

const processing2 = new Promise<void>(resolve => {
worker = new Worker(
queueName,
async () => {
resolve();
},
{
connection,
prefix,
skipLockRenewal: true,
skipStalledCheck: true,
},
);
});

await processing2;

await worker!.close();

completedJobs = await queue.getCompleted();
expect(completedJobs.length).to.be.equal(1);

const waitingJobs2 = await queue.getWaiting();
expect(waitingJobs2.length).to.be.equal(0);

const delayedCount3 = await queue.getDelayedCount();
expect(delayedCount3).to.be.equal(1);
});
});

describe('when every option is provided', function () {
it('should keep only one delayed job if adding a new repeatable job with the same id', async function () {
const date = new Date('2017-02-07 9:24:00');
Expand Down
15 changes: 15 additions & 0 deletions tests/test_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ describe('queues', function () {
await connection.quit();
});

describe('use generics', function () {
it('should be able to use generics', async function () {
const queue = new Queue<{ foo: string; bar: number }>(queueName, {
prefix,
connection,
});

const job = await queue.add(queueName, { foo: 'bar', bar: 1 });
const job2 = await queue.getJob(job.id!);
expect(job2?.data.foo).to.be.eql('bar');
expect(job2?.data.bar).to.be.eql(1);
await queue.close();
});
});

it('should return the queue version', async () => {
const queue = new Queue(queueName, { connection });
const version = await queue.getVersion();
Expand Down

0 comments on commit c94e2bd

Please sign in to comment.