diff --git a/docs/gitbook/python/changelog.md b/docs/gitbook/python/changelog.md index eaaf70df52..61c57b1e27 100644 --- a/docs/gitbook/python/changelog.md +++ b/docs/gitbook/python/changelog.md @@ -15,11 +15,6 @@ ## v2.2.1 (2024-01-16) ### Fix * **retry-jobs:** Add marker when needed ([#2374](https://github.com/taskforcesh/bullmq/issues/2374)) ([`1813d5f`](https://github.com/taskforcesh/bullmq/commit/1813d5fa12b7db69ee6c8c09273729cda8e3e3b5)) -* **security:** Upgrade msgpackr https://github.com/advisories/GHSA-7hpj-7hhx-2fgx ([`7ae0953`](https://github.com/taskforcesh/bullmq/commit/7ae095357fddbdaacc286cbe5782946b95160d55)) - -### Documentation -* **changelog:** Split changelog ([#2381](https://github.com/taskforcesh/bullmq/issues/2381)) ([`368b5a1`](https://github.com/taskforcesh/bullmq/commit/368b5a104b632fa181b2c19cc5e3530387f38ae4)) -* **summary:** Add remove dependency section ([#2378](https://github.com/taskforcesh/bullmq/issues/2378)) ([`03e1451`](https://github.com/taskforcesh/bullmq/commit/03e1451f54edf56f11f9e74f9b4095efe522bb97)) ## v2.2.0 (2024-01-14) ### Feature @@ -33,12 +28,6 @@ * **redis:** Upgrade to v5 [python] ([#2364](https://github.com/taskforcesh/bullmq/issues/2364)) ([`d5113c8`](https://github.com/taskforcesh/bullmq/commit/d5113c88ad108b281b292e2890e0eef3be41c8fb)) * **worker:** Worker can be closed if Redis is down ([#2350](https://github.com/taskforcesh/bullmq/issues/2350)) ([`888dcc2`](https://github.com/taskforcesh/bullmq/commit/888dcc2dd40571e05fe1f4a5c81161ed062f4542)) -### Documentation -* **sandbox:** Add URL support section (#2373) ref #2326 #2372 ([`3a38a47`](https://github.com/taskforcesh/bullmq/commit/3a38a471cbeda70ac9d4d9744b199090dc6f0a12)) -* **bullmq-pro:** Update changelog to v6.9.0 ([#2359](https://github.com/taskforcesh/bullmq/issues/2359)) ([`66d9469`](https://github.com/taskforcesh/bullmq/commit/66d9469b3b40b0b5b43601308fa063707cb41b91)) -* **workers:** Add auto removal jobs section ([#2355](https://github.com/taskforcesh/bullmq/issues/2355)) ([`dddd2c8`](https://github.com/taskforcesh/bullmq/commit/dddd2c89132c0b3216a8788e1672e4433b98a436)) -* **changelog:** Format v2 changes docs [python] ([#2353](https://github.com/taskforcesh/bullmq/issues/2353)) ([`97837f2`](https://github.com/taskforcesh/bullmq/commit/97837f22ec4abbe20c4026221801f51156f4861b)) - ## v2.0.0 (2023-12-23) ### Feature * **job:** Add isActive method [python] ([#2352](https://github.com/taskforcesh/bullmq/issues/2352)) ([`afb5e31`](https://github.com/taskforcesh/bullmq/commit/afb5e31484ed2e5a1c381c732321225c0a8b78ff)) diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 56ca709f48..347296df73 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -622,12 +622,19 @@ export class Scripts { timestamp = timestamp * 0x1000 + (+jobId & 0xfff); } - const keys: (string | number)[] = ['delayed', jobId].map(name => { - return this.queue.toKey(name); - }); - keys.push.apply(keys, [this.queue.keys.events]); + const keys: (string | number)[] = [ + this.queue.keys.delayed, + this.queue.keys.meta, + this.queue.keys.marker, + this.queue.keys.events, + ]; - return keys.concat([delay, JSON.stringify(timestamp), jobId]); + return keys.concat([ + delay, + JSON.stringify(timestamp), + jobId, + this.queue.toKey(jobId), + ]); } async changePriority( diff --git a/src/commands/changeDelay-3.lua b/src/commands/changeDelay-3.lua deleted file mode 100644 index 8e37a18051..0000000000 --- a/src/commands/changeDelay-3.lua +++ /dev/null @@ -1,41 +0,0 @@ ---[[ - Change job delay when it is in delayed set. - Input: - KEYS[1] delayed key - KEYS[2] job key - KEYS[3] events stream - - ARGV[1] delay - ARGV[2] delayedTimestamp - ARGV[3] the id of the job - Output: - 0 - OK - -1 - Missing job. - -3 - Job not in delayed set. - - Events: - - delayed key. -]] -local rcall = redis.call - -if rcall("EXISTS", KEYS[2]) == 1 then - local jobId = ARGV[3] - local score = tonumber(ARGV[2]) - local delayedTimestamp = (score / 0x1000) - - local numRemovedElements = rcall("ZREM", KEYS[1], jobId) - - if numRemovedElements < 1 then - return -3 - end - - rcall("HSET", KEYS[2], "delay", tonumber(ARGV[1])) - rcall("ZADD", KEYS[1], score, jobId) - -- TODO: check if we need to evaluate a new marker - - rcall("XADD", KEYS[3], "*", "event", "delayed", "jobId", jobId, "delay", delayedTimestamp) - - return 0 -else - return -1 -end \ No newline at end of file diff --git a/src/commands/changeDelay-4.lua b/src/commands/changeDelay-4.lua new file mode 100644 index 0000000000..39a85c53f4 --- /dev/null +++ b/src/commands/changeDelay-4.lua @@ -0,0 +1,57 @@ +--[[ + Change job delay when it is in delayed set. + Input: + KEYS[1] delayed key + KEYS[2] meta key + KEYS[3] marker key + KEYS[4] events stream + + ARGV[1] delay + ARGV[2] delayedTimestamp + ARGV[3] the id of the job + ARGV[4] job key + + Output: + 0 - OK + -1 - Missing job. + -3 - Job not in delayed set. + + Events: + - delayed key. +]] +local rcall = redis.call + +-- Includes +--- @include "includes/addDelayMarkerIfNeeded" +--- @include "includes/getOrSetMaxEvents" +--- @include "includes/isQueuePaused" + +if rcall("EXISTS", ARGV[4]) == 1 then + local jobId = ARGV[3] + local score = tonumber(ARGV[2]) + local delayedTimestamp = (score / 0x1000) + + local numRemovedElements = rcall("ZREM", KEYS[1], jobId) + + if numRemovedElements < 1 then + return -3 + end + + rcall("HSET", ARGV[4], "delay", tonumber(ARGV[1])) + rcall("ZADD", KEYS[1], score, jobId) + + local maxEvents = getOrSetMaxEvents(KEYS[2]) + + rcall("XADD", KEYS[4], "MAXLEN", "~", maxEvents, "*", "event", "delayed", + "jobId", jobId, "delay", delayedTimestamp) + + -- mark that a delayed job is available + local isPaused = isQueuePaused(KEYS[2]) + if not isPaused then + addDelayMarkerIfNeeded(KEYS[3], KEYS[1]) + end + + return 0 +else + return -1 +end \ No newline at end of file diff --git a/src/commands/includes/removeParentDependencyKey.lua b/src/commands/includes/removeParentDependencyKey.lua index 411b257298..63b7d5a871 100644 --- a/src/commands/includes/removeParentDependencyKey.lua +++ b/src/commands/includes/removeParentDependencyKey.lua @@ -10,7 +10,8 @@ --- @include "getTargetQueueList" local function moveParentToWait(parentPrefix, parentId, emitEvent) - local parentTarget, isPaused = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "wait", parentPrefix .. "paused") + local parentTarget, isPaused = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "wait", + parentPrefix .. "paused") addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPaused, parentId) if emitEvent then @@ -48,7 +49,8 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey) end else local missedParentKey = rcall("HGET", jobKey, "parentKey") - if( (type(missedParentKey) == "string") and missedParentKey ~= "" and (rcall("EXISTS", missedParentKey) == 1)) then + if( (type(missedParentKey) == "string") and missedParentKey ~= "" + and (rcall("EXISTS", missedParentKey) == 1)) then local parentDependenciesKey = missedParentKey .. ":dependencies" local result = rcall("SREM", parentDependenciesKey, jobKey) if result > 0 then diff --git a/tests/test_job.ts b/tests/test_job.ts index 63555e9db2..b2bf259d4a 100644 --- a/tests/test_job.ts +++ b/tests/test_job.ts @@ -847,7 +847,7 @@ describe('Job', function () { const completing = new Promise(resolve => { worker.on('completed', async () => { const timeDiff = new Date().getTime() - startTime; - expect(timeDiff).to.be.gte(4000); + expect(timeDiff).to.be.gte(2000); resolve(); }); }); @@ -856,17 +856,17 @@ describe('Job', function () { queue, 'test', { foo: 'bar' }, - { delay: 2000 }, + { delay: 8000 }, ); const isDelayed = await job.isDelayed(); expect(isDelayed).to.be.equal(true); - await job.changeDelay(4000); + await job.changeDelay(2000); const isDelayedAfterChangeDelay = await job.isDelayed(); expect(isDelayedAfterChangeDelay).to.be.equal(true); - expect(job.delay).to.be.equal(4000); + expect(job.delay).to.be.equal(2000); await completing; diff --git a/tests/test_worker.ts b/tests/test_worker.ts index 05f3b532a4..6b339ef546 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -1505,10 +1505,12 @@ describe('workers', function () { connection, prefix, }); + await worker1.waitUntilReady(); const worker2 = new Worker(queueName2, null, { connection, prefix, }); + await worker2.waitUntilReady(); try { // There is no point into checking the ready status after closing