From ab436938d895125635aef0393ae2fb5c77c16c1f Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Sun, 17 Mar 2024 07:52:18 -0600 Subject: [PATCH] fix(worker): validate drainDelay must be greater than 0 (#2477) --- src/classes/worker.ts | 9 +++++++-- tests/test_worker.ts | 14 +++++++++++++- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/classes/worker.ts b/src/classes/worker.ts index da45dadc96..61cdfa7528 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -38,6 +38,7 @@ import { // 10 seconds is the maximum time a BRPOPLPUSH can block. const maximumBlockTimeout = 10; +const minimumBlockTimeout = 0.001; // note: sandboxed processors would also like to define concurrency per process // for better resource utilization. @@ -225,6 +226,10 @@ export class Worker< throw new Error('stalledInterval must be greater than 0'); } + if (this.opts.drainDelay <= 0) { + throw new Error('drainDelay must be greater than 0'); + } + this.concurrency = this.opts.concurrency; this.opts.lockRenewTime = @@ -642,12 +647,12 @@ export class Worker< const blockDelay = blockUntil - Date.now(); // when we reach the time to get new jobs if (blockDelay < 1) { - return 0.001; + return minimumBlockTimeout; } else { return blockDelay / 1000; } } else { - return Math.max(opts.drainDelay, 0); + return Math.max(opts.drainDelay, minimumBlockTimeout); } } diff --git a/tests/test_worker.ts b/tests/test_worker.ts index ded040f315..8448985455 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -1904,7 +1904,7 @@ describe('workers', function () { }); it('stalled interval cannot be zero', function () { - this.timeout(8000); + this.timeout(4000); expect( () => new Worker(queueName, async () => {}, { @@ -1915,6 +1915,18 @@ describe('workers', function () { ).to.throw('stalledInterval must be greater than 0'); }); + it('drain delay cannot be zero', function () { + this.timeout(4000); + expect( + () => + new Worker(queueName, async () => {}, { + connection, + prefix, + drainDelay: 0, + }), + ).to.throw('drainDelay must be greater than 0'); + }); + it('lock extender continues to run until all active jobs are completed when closing a worker', async function () { this.timeout(4000); let worker;