From 8643147e5151dc89864bb7d1c118e8787e4bacd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20G=C3=B3=C5=BAd=C5=BA?= Date: Tue, 12 Nov 2024 13:52:42 +0100 Subject: [PATCH] fix(docs): provide connection details in getting started --- docs/gitbook/README (1).md | 5 +- docs/gitbook/bullmq-pro/changelog.md | 62 ++++- docs/gitbook/changelog.md | 69 ++++++ docs/gitbook/guide/connections.md | 57 +++-- docs/gitbook/guide/rate-limiting.md | 17 ++ .../guide/redis-tm-compatibility/README.md | 2 +- docs/gitbook/python/changelog.md | 13 +- package.json | 2 +- python/bullmq/__init__.py | 2 +- python/bullmq/queue.py | 3 + python/pyproject.toml | 2 +- python/tests/queue_tests.py | 14 +- src/classes/index.ts | 1 + src/classes/job-scheduler.ts | 88 ++++--- src/classes/queue-base.ts | 2 +- src/classes/queue-getters.ts | 65 +----- src/classes/queue.ts | 220 +++++++++++++----- src/classes/worker.ts | 7 +- .../includes/prepareJobForProcessing.lua | 7 +- src/commands/moveToActive-11.lua | 4 +- src/commands/moveToFinished-14.lua | 9 +- src/enums/telemetry-attributes.ts | 2 + src/interfaces/repeat-options.ts | 3 - src/interfaces/worker-options.ts | 2 - src/utils.ts | 2 +- src/version.ts | 2 +- tests/test_bulk.ts | 47 +++- tests/test_concurrency.ts | 16 +- tests/test_flow.ts | 96 ++++++++ tests/test_job_scheduler.ts | 20 ++ tests/test_obliterate.ts | 13 ++ tests/test_rate_limiter.ts | 58 +++++ tests/test_telemetry_interface.ts | 56 ++++- tests/test_worker.ts | 38 +-- 34 files changed, 786 insertions(+), 220 deletions(-) diff --git a/docs/gitbook/README (1).md b/docs/gitbook/README (1).md index 95f85fffbf..e849e2ce07 100644 --- a/docs/gitbook/README (1).md +++ b/docs/gitbook/README (1).md @@ -45,12 +45,15 @@ Jobs are added to the queue and can be processed at any time, with at least one ```typescript import { Worker } from 'bullmq'; +import IORedis from 'ioredis'; + +const connection = new IORedis({ maxRetriesPerRequest: null }); const worker = new Worker('foo', async job => { // Will print { foo: 'bar'} for the first job // and { qux: 'baz' } for the second. console.log(job.data); -}); +}, { connection }); ``` {% hint style="info" %} diff --git a/docs/gitbook/bullmq-pro/changelog.md b/docs/gitbook/bullmq-pro/changelog.md index 237925bb50..e23c49afa7 100644 --- a/docs/gitbook/bullmq-pro/changelog.md +++ b/docs/gitbook/bullmq-pro/changelog.md @@ -1,9 +1,58 @@ +## [7.20.1](https://github.com/taskforcesh/bullmq-pro/compare/v7.20.0...v7.20.1) (2024-11-10) + + +### Bug Fixes + +* **repeatable:** only apply immediately in the first iteration ([f69cfbc](https://github.com/taskforcesh/bullmq/commit/f69cfbcbc5516a854adbbc29b259d08e65a19705)) + +# [7.20.0](https://github.com/taskforcesh/bullmq-pro/compare/v7.19.0...v7.20.0) (2024-11-09) + + +### Bug Fixes + +* **scripts:** set package version by default for extension ([#2887](https://github.com/taskforcesh/bullmq/issues/2887)) ([b955340](https://github.com/taskforcesh/bullmq/commit/b955340b940e4c1e330445526cd572e0ab25daa9)) +* **worker:** allow retrieving concurrency value ([#2883](https://github.com/taskforcesh/bullmq/issues/2883)) fixes [#2880](https://github.com/taskforcesh/bullmq/issues/2880) ([52f6317](https://github.com/taskforcesh/bullmq/commit/52f6317ecd2080a5c9684a4fe384e20d86f21de4)) +* **connection:** set packageVersion as protected attribute for extension ([#2884](https://github.com/taskforcesh/bullmq/issues/2884)) ([411ccae](https://github.com/taskforcesh/bullmq/commit/411ccae9419e008d916be6cf71c4d57dd2a07b2b)) + + +### Features + +* **queue-events:** add QueueEventsProducer for publishing custom events ([#2844](https://github.com/taskforcesh/bullmq/issues/2844)) ([5eb03cd](https://github.com/taskforcesh/bullmq/commit/5eb03cd7f27027191eb4bc4ed7386755fd9be1fb)) +* **flows:** add telemetry support ([#2879](https://github.com/taskforcesh/bullmq/issues/2879)) ([5ed154b](https://github.com/taskforcesh/bullmq/commit/5ed154ba240dbe9eb5c22e27ad02e851c0f3cf69)) + +# [7.19.0](https://github.com/taskforcesh/bullmq-pro/compare/v7.18.0...v7.19.0) (2024-11-08) + + +### Bug Fixes + +* **deps:** bump msgpackr to 1.1.2 to resolve ERR_BUFFER_OUT_OF_BOUNDS error ([#2882](https://github.com/taskforcesh/bullmq/issues/2882)) ref [#2747](https://github.com/taskforcesh/bullmq/issues/2747) ([4d2136c](https://github.com/taskforcesh/bullmq/commit/4d2136cc6ba340e511a539c130c9a739fe1055d0)) + + +### Features + +* **scheduler:** add getJobScheduler method ([#2877](https://github.com/taskforcesh/bullmq/issues/2877)) ref [#2875](https://github.com/taskforcesh/bullmq/issues/2875) ([956d98c](https://github.com/taskforcesh/bullmq/commit/956d98c6890484742bb080919c70692234f28c69)) +* **queue:** add a telemetry interface ([#2721](https://github.com/taskforcesh/bullmq/issues/2721)) ([273b574](https://github.com/taskforcesh/bullmq/commit/273b574e6b5628680990eb02e1930809c9cba5bb)) + +# [7.18.0](https://github.com/taskforcesh/bullmq-pro/compare/v7.17.2...v7.18.0) (2024-11-07) + + +### Bug Fixes + +* proper way to get version ([b4e25c1](https://github.com/taskforcesh/bullmq/commit/b4e25c13cafc001748ee6eb590133feb8ee24d7b)) +* **scripts:** add missing wait in isJobInList ([9ef865c](https://github.com/taskforcesh/bullmq/commit/9ef865c7de6086cb3c906721fd046aeed1e0d27f)) +* **redis:** use version for naming loaded lua scripts ([fe73f6d](https://github.com/taskforcesh/bullmq/commit/fe73f6d4d776dc9f99ad3a094e5c59c5fafc96f1)) + + +### Features + +* **queue:** add option to skip metas update ([b7dd925](https://github.com/taskforcesh/bullmq/commit/b7dd925e7f2a4468c98a05f3a3ca1a476482b6c0)) +* **queue:** add queue version support ([#2822](https://github.com/taskforcesh/bullmq/issues/2822)) ([3a4781b](https://github.com/taskforcesh/bullmq/commit/3a4781bf7cadf04f6a324871654eed8f01cdadae)) + ## [7.17.2](https://github.com/taskforcesh/bullmq-pro/compare/v7.17.1...v7.17.2) (2024-10-23) ### Bug Fixes -* **repeatable:** export RepeatOptions ([#261](https://github.com/taskforcesh/bullmq-pro/issues/261)) ([b620bdf](https://github.com/taskforcesh/bullmq-pro/commit/b620bdf4f7449ad20f0ffd07786880115ec77fd9)) * **sandbox:** fix serialization of error with circular references are present ([#2815](https://github.com/taskforcesh/bullmq/issues/2815)) fix [#2813](https://github.com/taskforcesh/bullmq/issues/2813) ([a384d92](https://github.com/taskforcesh/bullmq/commit/a384d926bee15bffa84178a8fad7b94a6a08b572)) ## [7.17.1](https://github.com/taskforcesh/bullmq-pro/compare/v7.17.0...v7.17.1) (2024-10-18) @@ -16,6 +65,12 @@ # [7.17.0](https://github.com/taskforcesh/bullmq-pro/compare/v7.16.0...v7.17.0) (2024-10-12) +### Bug Fixes + +* **repeat:** also consider startDate when using "every" ([25bbaa8](https://github.com/taskforcesh/bullmq/commit/25bbaa81af87f9944a64bc4fb7e0c76ef223ada4)) +* **sandbox:** catch exit errors ([#2800](https://github.com/taskforcesh/bullmq/issues/2800)) ([6babb9e](https://github.com/taskforcesh/bullmq/commit/6babb9e2f355feaf9bd1a8ed229c1001e6de7144)) + + ### Features * **repeat:** deprecate immediately on job scheduler ([ed047f7](https://github.com/taskforcesh/bullmq/commit/ed047f7ab69ebdb445343b6cb325e90b95ee9dc5)) @@ -25,11 +80,6 @@ * **worker-fork:** allow passing fork options ([#2795](https://github.com/taskforcesh/bullmq/issues/2795)) ([f7a4292](https://github.com/taskforcesh/bullmq/commit/f7a4292e064b41236f4489b3d7785a4c599a6435)) * **worker-thread:** allow passing Worker options ([#2791](https://github.com/taskforcesh/bullmq/issues/2791)) ref [#1555](https://github.com/taskforcesh/bullmq/issues/1555) ([6a1f7a9](https://github.com/taskforcesh/bullmq/commit/6a1f7a9f0303561d6ec7b2005ba0227132b89e07)) -### Bug Fixes - -* **repeat:** also consider startDate when using "every" ([25bbaa8](https://github.com/taskforcesh/bullmq/commit/25bbaa81af87f9944a64bc4fb7e0c76ef223ada4)) -* **sandbox:** catch exit errors ([#2800](https://github.com/taskforcesh/bullmq/issues/2800)) ([6babb9e](https://github.com/taskforcesh/bullmq/commit/6babb9e2f355feaf9bd1a8ed229c1001e6de7144)) - # [7.16.0](https://github.com/taskforcesh/bullmq-pro/compare/v7.15.4...v7.16.0) (2024-09-24) diff --git a/docs/gitbook/changelog.md b/docs/gitbook/changelog.md index aff547727a..2b5ab07162 100644 --- a/docs/gitbook/changelog.md +++ b/docs/gitbook/changelog.md @@ -1,3 +1,72 @@ +## [5.29.1](https://github.com/taskforcesh/bullmq/compare/v5.29.0...v5.29.1) (2024-11-23) + + +### Bug Fixes + +* **scheduler:** remove deprecation warning on immediately option ([#2923](https://github.com/taskforcesh/bullmq/issues/2923)) ([14ca7f4](https://github.com/taskforcesh/bullmq/commit/14ca7f44f31a393a8b6d0ce4ed244e0063198879)) + +# [5.29.0](https://github.com/taskforcesh/bullmq/compare/v5.28.2...v5.29.0) (2024-11-22) + + +### Features + +* **queue:** refactor a protected addJob method allowing telemetry extensions ([09f2571](https://github.com/taskforcesh/bullmq/commit/09f257196f6d5a6690edbf55f12d585cec86ee8f)) + +## [5.28.2](https://github.com/taskforcesh/bullmq/compare/v5.28.1...v5.28.2) (2024-11-22) + + +### Bug Fixes + +* **queue:** change _jobScheduler from private to protected for extension ([#2920](https://github.com/taskforcesh/bullmq/issues/2920)) ([34c2348](https://github.com/taskforcesh/bullmq/commit/34c23485bcb32b3c69046b2fb37e5db8927561ce)) + +## [5.28.1](https://github.com/taskforcesh/bullmq/compare/v5.28.0...v5.28.1) (2024-11-20) + + +### Bug Fixes + +* **scheduler:** use Job class from getter for extension ([#2917](https://github.com/taskforcesh/bullmq/issues/2917)) ([5fbb075](https://github.com/taskforcesh/bullmq/commit/5fbb075dd4abd51cc84a59575261de84e56633d8)) + +# [5.28.0](https://github.com/taskforcesh/bullmq/compare/v5.27.0...v5.28.0) (2024-11-19) + + +### Features + +* **job-scheduler:** add telemetry support to the job scheduler ([72ea950](https://github.com/taskforcesh/bullmq/commit/72ea950ea251aa12f879ba19c0b5dfeb6a093da2)) + +# [5.27.0](https://github.com/taskforcesh/bullmq/compare/v5.26.2...v5.27.0) (2024-11-19) + + +### Features + +* **queue:** add rateLimit method ([#2896](https://github.com/taskforcesh/bullmq/issues/2896)) ([db84ad5](https://github.com/taskforcesh/bullmq/commit/db84ad51a945c754c3cd03e5e718cd8d0341a8b4)) +* **queue:** add removeRateLimitKey method ([#2806](https://github.com/taskforcesh/bullmq/issues/2806)) ([ff70613](https://github.com/taskforcesh/bullmq/commit/ff706131bf642fb7544b9d15994d75b1edcb27dc)) + + +### Performance Improvements + +* **marker:** add base markers while consuming jobs to get workers busy ([#2904](https://github.com/taskforcesh/bullmq/issues/2904)) fixes [#2842](https://github.com/taskforcesh/bullmq/issues/2842) ([1759c8b](https://github.com/taskforcesh/bullmq/commit/1759c8bc111cab9e43d5fccb4d8d2dccc9c39fb4)) + +## [5.26.2](https://github.com/taskforcesh/bullmq/compare/v5.26.1...v5.26.2) (2024-11-15) + + +### Bug Fixes + +* **telemetry:** do not set span on parent context if undefined ([c417a23](https://github.com/taskforcesh/bullmq/commit/c417a23bb28d9effa42115e954b18cc41c1fc043)) + +## [5.26.1](https://github.com/taskforcesh/bullmq/compare/v5.26.0...v5.26.1) (2024-11-14) + + +### Bug Fixes + +* **queue:** fix generics to be able to properly be extended ([f2495e5](https://github.com/taskforcesh/bullmq/commit/f2495e5ee9ecdb26492da510dc38730718cb28c5)) + +# [5.26.0](https://github.com/taskforcesh/bullmq/compare/v5.25.6...v5.26.0) (2024-11-14) + + +### Features + +* improve queue getters to use generic job type ([#2905](https://github.com/taskforcesh/bullmq/issues/2905)) ([c9531ec](https://github.com/taskforcesh/bullmq/commit/c9531ec7a49126a017611eb2fd2eaea8fcb5ada5)) + ## [5.25.6](https://github.com/taskforcesh/bullmq/compare/v5.25.5...v5.25.6) (2024-11-11) diff --git a/docs/gitbook/guide/connections.md b/docs/gitbook/guide/connections.md index 54e1a2bd53..40adadbc77 100644 --- a/docs/gitbook/guide/connections.md +++ b/docs/gitbook/guide/connections.md @@ -7,32 +7,59 @@ Every class will consume at least one Redis connection, but it is also possible Some examples: ```typescript -import { Queue, Worker } from 'bullmq' +import { Queue, Worker } from 'bullmq'; // Create a new connection in every instance -const myQueue = new Queue('myqueue', { connection: { - host: "myredis.taskforce.run", - port: 32856 -}}); - -const myWorker = new Worker('myqueue', async (job)=>{}, { connection: { - host: "myredis.taskforce.run", - port: 32856 -}}); +const myQueue = new Queue('myqueue', { + connection: { + host: 'myredis.taskforce.run', + port: 32856, + }, +}); + +const myWorker = new Worker('myqueue', async job => {}, { + connection: { + host: 'myredis.taskforce.run', + port: 32856, + }, +}); ``` ```typescript -import { Queue, Worker } from 'bullmq'; +import { Queue } from 'bullmq'; import IORedis from 'ioredis'; const connection = new IORedis(); -// Reuse the ioredis instance -const myQueue = new Queue('myqueue', { connection }); -const myWorker = new Worker('myqueue', async (job)=>{}, { connection }); +// Reuse the ioredis instance in 2 different producers +const myFirstQueue = new Queue('myFirstQueue', { connection }); +const mySecondQueue = new Queue('mySecondQueue', { connection }); ``` -Note that in the second example, even though the ioredis instance is being reused, the worker will create a duplicated connection that it needs internally to make blocking connections. Consult the [ioredis](https://github.com/luin/ioredis/blob/master/API.md) documentation to learn how to properly create an instance of `IORedis.` +```typescript +import { Worker } from 'bullmq'; +import IORedis from 'ioredis'; + +const connection = new IORedis({ maxRetriesPerRequest: null }); + +// Reuse the ioredis instance in 2 different consumers +const myFirstWorker = new Worker('myFirstWorker', async job => {}, { + connection, +}); +const mySecondWorker = new Worker('mySecondWorker', async job => {}, { + connection, +}); +``` + +Note that in the third example, even though the ioredis instance is being reused, the worker will create a duplicated connection that it needs internally to make blocking connections. Consult the [ioredis](https://github.com/luin/ioredis/blob/master/API.md) documentation to learn how to properly create an instance of `IORedis`. + +Also note that simple Queue instance used for managing the queue such as adding jobs, pausing, using getters, etc. usually has different requirements from the worker. + +For example, say that you are adding jobs to a queue as the result of a call to an HTTP endpoint - producer service. The caller of this endpoint cannot wait forever if the connection to Redis happens to be down when this call is made. Therefore the `maxRetriesPerRequest` setting should either be left at its default (which currently is 20) or set it to another value, maybe 1 so that the user gets an error quickly and can retry later. + +On the other hand, if you are adding jobs inside a Worker processor, this process is expected to happen in the background - consumer service. In this case you can share the same connection. + +For more details, refer to the [persistent connections](https://docs.bullmq.io/bull/patterns/persistent-connections) page. {% hint style="danger" %} When using ioredis connections, be careful not to use the "keyPrefix" option in [ioredis](https://redis.github.io/ioredis/interfaces/CommonRedisOptions.html#keyPrefix) as this option is not compatible with BullMQ, which provides its own key prefixing mechanism. diff --git a/docs/gitbook/guide/rate-limiting.md b/docs/gitbook/guide/rate-limiting.md index a06d98997e..49e155e528 100644 --- a/docs/gitbook/guide/rate-limiting.md +++ b/docs/gitbook/guide/rate-limiting.md @@ -112,7 +112,24 @@ if (ttl > 0) { } ``` +### Remove Rate Limit Key + +Sometimes is useful to stop a rate limit delay. + +For this purpose, you can use the **`removeRateLimitKey`** method like this: + +```typescript +import { Queue } from 'bullmq'; + +const queue = new Queue('myQueue', { connection }); + +await queue.removeRateLimitKey(); +``` + +By removing rate limit key, workers will be able to pick jobs again and your rate limit counter is reset to zero. + ## Read more: - 💡 [Rate Limit API Reference](https://api.docs.bullmq.io/classes/v5.Worker.html#rateLimit) - 💡 [Get Rate Limit Ttl API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#getRateLimitTtl) +- 💡 [Remove Rate Limit Key API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#removeRateLimitKey) diff --git a/docs/gitbook/guide/redis-tm-compatibility/README.md b/docs/gitbook/guide/redis-tm-compatibility/README.md index 4c64f8522c..0317062547 100644 --- a/docs/gitbook/guide/redis-tm-compatibility/README.md +++ b/docs/gitbook/guide/redis-tm-compatibility/README.md @@ -1,4 +1,4 @@ # Redis™ Compatibility -There are several alternatives for Redis and even though BullMQ is full Redis™ compliant, not all the alternatives are going to work properly. In this section we present the vendors that officially support BullMQ and that we regularly test to verify they keep staying compatible. +There are several alternatives for Redis and even though BullMQ is full Redis™ compliant with version 6.2.0 or newer, not all the alternatives are going to work properly. In this section we present the vendors that officially support BullMQ and that we regularly test to verify they keep staying compatible. diff --git a/docs/gitbook/python/changelog.md b/docs/gitbook/python/changelog.md index 6bdea9577d..ec2a2753ea 100644 --- a/docs/gitbook/python/changelog.md +++ b/docs/gitbook/python/changelog.md @@ -2,23 +2,24 @@ +## v2.11.0 (2024-11-26) +### Feature +* **queue:** Add getDelayedCount method [python] ([#2934](https://github.com/taskforcesh/bullmq/issues/2934)) ([`71ce75c`](https://github.com/taskforcesh/bullmq/commit/71ce75c04b096b5593da0986c41a771add1a81ce)) + +### Performance +* **marker:** Add base markers while consuming jobs to get workers busy (#2904) fixes #2842 ([`1759c8b`](https://github.com/taskforcesh/bullmq/commit/1759c8bc111cab9e43d5fccb4d8d2dccc9c39fb4)) + ## v2.10.1 (2024-10-26) ### Fix * **commands:** Add missing build statement when releasing [python] (#2869) fixes #2868 ([`ff2a47b`](https://github.com/taskforcesh/bullmq/commit/ff2a47b37c6b36ee1a725f91de2c6e4bcf8b011a)) -### Documentation -* **job:** Clarify per-queue scoping of job ids ([#2864](https://github.com/taskforcesh/bullmq/issues/2864)) ([`6c2b80f`](https://github.com/taskforcesh/bullmq/commit/6c2b80f490a0ab4afe502fc8415d6549e0022367)) -* **v4:** Update changelog with v4.18.2 ([#2867](https://github.com/taskforcesh/bullmq/issues/2867)) ([`7ba452e`](https://github.com/taskforcesh/bullmq/commit/7ba452ec3e6d4658e357b2ca810893172c1e0b25)) - ## v2.10.0 (2024-10-24) ### Feature * **job:** Add getChildrenValues method [python] ([#2853](https://github.com/taskforcesh/bullmq/issues/2853)) ([`0f25213`](https://github.com/taskforcesh/bullmq/commit/0f25213b28900a1c35922bd33611701629d83184)) * **queue:** Add option to skip metas update ([`b7dd925`](https://github.com/taskforcesh/bullmq/commit/b7dd925e7f2a4468c98a05f3a3ca1a476482b6c0)) * **queue:** Add queue version support ([#2822](https://github.com/taskforcesh/bullmq/issues/2822)) ([`3a4781b`](https://github.com/taskforcesh/bullmq/commit/3a4781bf7cadf04f6a324871654eed8f01cdadae)) -* **repeat:** Deprecate immediately on job scheduler ([`ed047f7`](https://github.com/taskforcesh/bullmq/commit/ed047f7ab69ebdb445343b6cb325e90b95ee9dc5)) * **job:** Expose priority value ([#2804](https://github.com/taskforcesh/bullmq/issues/2804)) ([`9abec3d`](https://github.com/taskforcesh/bullmq/commit/9abec3dbc4c69f2496c5ff6b5d724f4d1a5ca62f)) * **job:** Add deduplication logic ([#2796](https://github.com/taskforcesh/bullmq/issues/2796)) ([`0a4982d`](https://github.com/taskforcesh/bullmq/commit/0a4982d05d27c066248290ab9f59349b802d02d5)) -* **queue:** Add new upsertJobScheduler, getJobSchedulers and removeJobSchedulers methods ([`dd6b6b2`](https://github.com/taskforcesh/bullmq/commit/dd6b6b2263badd8f29db65d1fa6bcdf5a1e9f6e2)) * **queue:** Add getDebounceJobId method ([#2717](https://github.com/taskforcesh/bullmq/issues/2717)) ([`a68ead9`](https://github.com/taskforcesh/bullmq/commit/a68ead95f32a7d9dabba602895d05c22794b2c02)) ### Fix diff --git a/package.json b/package.json index 769bfc069c..37e30fa43a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bullmq", - "version": "5.25.6", + "version": "5.29.1", "description": "Queue for messages and jobs based on Redis", "homepage": "https://bullmq.io/", "main": "./dist/cjs/index.js", diff --git a/python/bullmq/__init__.py b/python/bullmq/__init__.py index 1db7674593..8a8ee15cf7 100644 --- a/python/bullmq/__init__.py +++ b/python/bullmq/__init__.py @@ -3,7 +3,7 @@ A background job processor and message queue for Python based on Redis. """ -__version__ = "2.10.1" +__version__ = "2.11.0" __author__ = 'Taskforce.sh Inc.' __credits__ = 'Taskforce.sh Inc.' diff --git a/python/bullmq/queue.py b/python/bullmq/queue.py index 0bcffc1fae..2672750ad6 100644 --- a/python/bullmq/queue.py +++ b/python/bullmq/queue.py @@ -254,6 +254,9 @@ def getJobState(self, job_id: str): def getCompletedCount(self): return self.getJobCountByTypes('completed') + def getDelayedCount(self): + return self.getJobCountByTypes('delayed') + def getFailedCount(self): return self.getJobCountByTypes('failed') diff --git a/python/pyproject.toml b/python/pyproject.toml index 68ad2bc0da..601b202b37 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "bullmq" -version = "2.10.1" +version = "2.11.0" description='BullMQ for Python' readme="README.md" authors = [ diff --git a/python/tests/queue_tests.py b/python/tests/queue_tests.py index 94bdb7da6a..5ffeaf5f38 100644 --- a/python/tests/queue_tests.py +++ b/python/tests/queue_tests.py @@ -54,7 +54,7 @@ async def test_get_job_state(self): async def test_add_job_with_options(self): queue = Queue(queueName) data = {"foo": "bar"} - attempts = 3, + attempts = 3 delay = 1000 job = await queue.add("test-job", data=data, opts={"attempts": attempts, "delay": delay}) @@ -133,6 +133,18 @@ async def test_trim_events_manually_with_custom_prefix(self): await queue.obliterate() await queue.close() + async def test_get_delayed_count(self): + queue = Queue(queueName) + data = {"foo": "bar"} + delay = 1000 + await queue.add("test-job", data=data, opts={"delay": delay}) + await queue.add("test-job", data=data, opts={"delay": delay * 2}) + + count = await queue.getDelayedCount() + self.assertEqual(count, 2) + + await queue.close() + async def test_retry_failed_jobs(self): queue = Queue(queueName) job_count = 8 diff --git a/src/classes/index.ts b/src/classes/index.ts index ad2158913c..88c348d845 100644 --- a/src/classes/index.ts +++ b/src/classes/index.ts @@ -5,6 +5,7 @@ export * from './child-processor'; export * from './errors'; export * from './flow-producer'; export * from './job'; +export * from './job-scheduler'; // export * from './main'; this file must not be exported // export * from './main-worker'; this file must not be exported export * from './queue-base'; diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index c974d4a631..63c0f8199c 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -4,6 +4,7 @@ import { JobsOptions, RepeatStrategy } from '../types'; import { Job } from './job'; import { QueueBase } from './queue-base'; import { RedisConnection } from './redis-connection'; +import { SpanKind, TelemetryAttributes } from '../enums'; export interface JobSchedulerJson { key: string; // key is actually the job scheduler id @@ -46,6 +47,12 @@ export class JobScheduler extends QueueBase { ); } + if (!pattern && !every) { + throw new Error( + 'Either .pattern or .every options must be defined for this repeatable job', + ); + } + if (repeatOpts.immediately && repeatOpts.startDate) { throw new Error( 'Both .immediately and .startDate options are defined for this repeatable job', @@ -77,7 +84,7 @@ export class JobScheduler extends QueueBase { now = startMillis > now ? startMillis : now; } - let nextMillis; + let nextMillis: number; if (every) { nextMillis = prevMillis + every; @@ -92,7 +99,7 @@ export class JobScheduler extends QueueBase { const multi = (await this.client).multi(); if (nextMillis) { if (override) { - await this.scripts.addJobScheduler( + this.scripts.addJobScheduler( (multi) as RedisClient, jobSchedulerId, nextMillis, @@ -105,37 +112,54 @@ export class JobScheduler extends QueueBase { }, ); } else { - await this.scripts.updateJobSchedulerNextMillis( + this.scripts.updateJobSchedulerNextMillis( (multi) as RedisClient, jobSchedulerId, nextMillis, ); } - const job = this.createNextJob( - (multi) as RedisClient, - jobName, - nextMillis, - jobSchedulerId, - { ...opts, repeat: filteredRepeatOpts }, - jobData, - iterationCount, + return this.trace>( + SpanKind.PRODUCER, + 'add', + `${this.name}.${jobName}`, + async (span, srcPropagationMedatada) => { + const job = this.createNextJob( + (multi) as RedisClient, + jobName, + nextMillis, + jobSchedulerId, + { + ...opts, + repeat: filteredRepeatOpts, + telemetryMetadata: srcPropagationMedatada, + }, + jobData, + iterationCount, + ); + + const results = await multi.exec(); // multi.exec returns an array of results [ err, result ][] + + // Check if there are any errors + const erroredResult = results.find(result => result[0]); + if (erroredResult) { + throw new Error( + `Error upserting job scheduler ${jobSchedulerId} - ${erroredResult[0]}`, + ); + } + + // Get last result with the job id + const lastResult = results.pop(); + job.id = lastResult[1] as string; + + span?.setAttributes({ + [TelemetryAttributes.JobSchedulerId]: jobSchedulerId, + [TelemetryAttributes.JobId]: job.id, + }); + + return job; + }, ); - - const results = await multi.exec(); // multi.exec returns an array of results [ err, result ][] - - // Check if there are any errors - const erroredResult = results.find(result => result[0]); - if (erroredResult) { - throw new Error( - `Error upserting job scheduler ${jobSchedulerId} - ${erroredResult[0]}`, - ); - } - - // Get last result with the job id - const lastResult = results.pop(); - job.id = lastResult[1] as string; - return job; } } @@ -170,7 +194,7 @@ export class JobScheduler extends QueueBase { mergedOpts.repeat = { ...opts.repeat, count: currentCount }; - const job = new Job(this, name, data, mergedOpts, jobId); + const job = new this.Job(this, name, data, mergedOpts, jobId); job.addJob(client); return job; @@ -295,13 +319,3 @@ export const defaultRepeatStrategy = ( // Ignore error } }; - -function removeUndefinedFields(obj: Record) { - const newObj: Record = {}; - for (const key in obj) { - if (obj[key] !== undefined) { - newObj[key] = obj[key]; - } - } - return newObj; -} diff --git a/src/classes/queue-base.ts b/src/classes/queue-base.ts index 2760a7b108..9e2008f958 100644 --- a/src/classes/queue-base.ts +++ b/src/classes/queue-base.ts @@ -12,7 +12,7 @@ import { RedisConnection } from './redis-connection'; import { Job } from './job'; import { KeysMap, QueueKeys } from './queue-keys'; import { Scripts } from './scripts'; -import { TelemetryAttributes, SpanKind } from '../enums'; +import { SpanKind } from '../enums'; /** * @class QueueBase diff --git a/src/classes/queue-getters.ts b/src/classes/queue-getters.ts index edaf192812..29e0e9d1c3 100644 --- a/src/classes/queue-getters.ts +++ b/src/classes/queue-getters.ts @@ -14,17 +14,9 @@ import { JobJsonRaw, Metrics } from '../interfaces'; * * @description Provides different getters for different aspects of a queue. */ -export class QueueGetters< - DataType, - ResultType, - NameType extends string, -> extends QueueBase { - getJob( - jobId: string, - ): Promise | undefined> { - return this.Job.fromId(this, jobId) as Promise< - Job - >; +export class QueueGetters extends QueueBase { + getJob(jobId: string): Promise { + return this.Job.fromId(this, jobId) as Promise; } private commandByType( @@ -53,13 +45,6 @@ export class QueueGetters< }); } - /** - * Helper to easily extend Job class calls. - */ - protected get Job(): typeof Job { - return Job; - } - private sanitizeJobTypes(types: JobType[] | JobType | undefined): JobType[] { const currentTypes = typeof types === 'string' ? [types] : types; @@ -251,10 +236,7 @@ export class QueueGetters< * @param start - zero based index from where to start returning jobs. * @param end - zero based index where to stop returning jobs. */ - getWaiting( - start = 0, - end = -1, - ): Promise[]> { + getWaiting(start = 0, end = -1): Promise { return this.getJobs(['waiting'], start, end, true); } @@ -264,10 +246,7 @@ export class QueueGetters< * @param start - zero based index from where to start returning jobs. * @param end - zero based index where to stop returning jobs. */ - getWaitingChildren( - start = 0, - end = -1, - ): Promise[]> { + getWaitingChildren(start = 0, end = -1): Promise { return this.getJobs(['waiting-children'], start, end, true); } @@ -276,10 +255,7 @@ export class QueueGetters< * @param start - zero based index from where to start returning jobs. * @param end - zero based index where to stop returning jobs. */ - getActive( - start = 0, - end = -1, - ): Promise[]> { + getActive(start = 0, end = -1): Promise { return this.getJobs(['active'], start, end, true); } @@ -288,10 +264,7 @@ export class QueueGetters< * @param start - zero based index from where to start returning jobs. * @param end - zero based index where to stop returning jobs. */ - getDelayed( - start = 0, - end = -1, - ): Promise[]> { + getDelayed(start = 0, end = -1): Promise { return this.getJobs(['delayed'], start, end, true); } @@ -300,10 +273,7 @@ export class QueueGetters< * @param start - zero based index from where to start returning jobs. * @param end - zero based index where to stop returning jobs. */ - getPrioritized( - start = 0, - end = -1, - ): Promise[]> { + getPrioritized(start = 0, end = -1): Promise { return this.getJobs(['prioritized'], start, end, true); } @@ -312,10 +282,7 @@ export class QueueGetters< * @param start - zero based index from where to start returning jobs. * @param end - zero based index where to stop returning jobs. */ - getCompleted( - start = 0, - end = -1, - ): Promise[]> { + getCompleted(start = 0, end = -1): Promise { return this.getJobs(['completed'], start, end, false); } @@ -324,10 +291,7 @@ export class QueueGetters< * @param start - zero based index from where to start returning jobs. * @param end - zero based index where to stop returning jobs. */ - getFailed( - start = 0, - end = -1, - ): Promise[]> { + getFailed(start = 0, end = -1): Promise { return this.getJobs(['failed'], start, end, false); } @@ -422,18 +386,13 @@ export class QueueGetters< start = 0, end = -1, asc = false, - ): Promise[]> { + ): Promise { const currentTypes = this.sanitizeJobTypes(types); const jobIds = await this.getRanges(currentTypes, start, end, asc); return Promise.all( - jobIds.map( - jobId => - this.Job.fromId(this, jobId) as Promise< - Job - >, - ), + jobIds.map(jobId => this.Job.fromId(this, jobId) as Promise), ); } diff --git a/src/classes/queue.ts b/src/classes/queue.ts index bb1dae7ae5..89373010ae 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -29,7 +29,7 @@ export interface ObliterateOpts { count?: number; } -export interface QueueListener +export interface QueueListener extends IoredisListener { /** * Listen to 'cleaned' event. @@ -57,17 +57,14 @@ export interface QueueListener * * This event is triggered when the job updates its progress. */ - progress: ( - job: Job, - progress: number | object, - ) => void; + progress: (job: JobBase, progress: number | object) => void; /** * Listen to 'removed' event. * * This event is triggered when a job is removed. */ - removed: (job: Job) => void; + removed: (job: JobBase) => void; /** * Listen to 'resumed' event. @@ -81,21 +78,74 @@ export interface QueueListener * * This event is triggered when the queue creates a new job. */ - waiting: (job: Job) => void; + waiting: (job: JobBase) => void; } +// Helper for JobBase type +type JobBase = T extends Job< + any, + any, + any +> + ? T + : Job; + +// Helper types to extract DataType, ResultType, and NameType +type ExtractDataType = DataTypeOrJob extends Job< + infer D, + any, + any +> + ? D + : Default; + +type ExtractResultType = DataTypeOrJob extends Job< + any, + infer R, + any +> + ? R + : Default; + +type ExtractNameType< + DataTypeOrJob, + Default extends string, +> = DataTypeOrJob extends Job ? N : Default; + /** * Queue * * This class provides methods to add jobs to a queue and some other high-level * administration such as pausing or deleting queues. * + * @template DataType - The type of the data that the job will process. + * @template ResultType - The type of the result of the job. + * @template NameType - The type of the name of the job. + * + * @example + * + * ```typescript + * import { Queue } from 'bullmq'; + * + * interface MyDataType { + * foo: string; + * } + * + * interface MyResultType { + * bar: string; + * } + * + * const queue = new Queue('myQueue'); + * ``` */ export class Queue< - DataType = any, - ResultType = any, - NameType extends string = string, -> extends QueueGetters { + DataTypeOrJob = any, + DefaultResultType = any, + DefaultNameType extends string = string, + DataType = ExtractDataType, + ResultType = ExtractResultType, + NameType extends string = ExtractNameType, +> extends QueueGetters> { token = v4(); jobsOpts: BaseJobOptions; opts: QueueOptions; @@ -103,7 +153,7 @@ export class Queue< protected libName = 'bullmq'; private _repeat?: Repeat; // To be deprecated in v6 in favor of JobScheduler - private _jobScheduler?: JobScheduler; + protected _jobScheduler?: JobScheduler; constructor( name: string, @@ -132,32 +182,34 @@ export class Queue< }); } - emit>( + emit>>( event: U, - ...args: Parameters[U]> + ...args: Parameters< + QueueListener>[U] + > ): boolean { return super.emit(event, ...args); } - off>( + off>>( eventName: U, - listener: QueueListener[U], + listener: QueueListener>[U], ): this { super.off(eventName, listener); return this; } - on>( + on>>( event: U, - listener: QueueListener[U], + listener: QueueListener>[U], ): this { super.on(event, listener); return this; } - once>( + once>>( event: U, - listener: QueueListener[U], + listener: QueueListener>[U], ): this { super.once(event, listener); return this; @@ -259,47 +311,66 @@ export class Queue< opts = { ...opts, telemetryMetadata: srcPropagationMedatada }; } - if (opts && opts.repeat) { - if (opts.repeat.endDate) { - if (+new Date(opts.repeat.endDate) < Date.now()) { - throw new Error( - 'End date must be greater than current timestamp', - ); - } - } + const job = await this.addJob(name, data, opts); - return (await this.repeat).updateRepeatableJob< - DataType, - ResultType, - NameType - >(name, data, { ...this.jobsOpts, ...opts }, { override: true }); - } else { - const jobId = opts?.jobId; + span?.setAttributes({ + [TelemetryAttributes.JobName]: name, + [TelemetryAttributes.JobId]: job.id, + }); - if (jobId == '0' || jobId?.startsWith('0:')) { - throw new Error("JobId cannot be '0' or start with 0:"); - } + return job; + }, + ); + } - const job = await this.Job.create( - this as MinimalQueue, - name, - data, - { - ...this.jobsOpts, - ...opts, - jobId, - }, - ); - this.emit('waiting', job); + /** + * addJob is a telemetry free version of the add method, useful in order to wrap it + * with custom telemetry on subclasses. + * + * @param name + * @param data + * @param opts + * + * @returns Job + */ + protected async addJob( + name: NameType, + data: DataType, + opts?: JobsOptions, + ): Promise> { + if (opts && opts.repeat) { + if (opts.repeat.endDate) { + if (+new Date(opts.repeat.endDate) < Date.now()) { + throw new Error('End date must be greater than current timestamp'); + } + } - span?.setAttributes({ - [TelemetryAttributes.JobId]: job.id, - }); + return (await this.repeat).updateRepeatableJob< + DataType, + ResultType, + NameType + >(name, data, { ...this.jobsOpts, ...opts }, { override: true }); + } else { + const jobId = opts?.jobId; - return job; - } - }, - ); + if (jobId == '0' || jobId?.startsWith('0:')) { + throw new Error("JobId cannot be '0' or start with 0:"); + } + + const job = await this.Job.create( + this as MinimalQueue, + name, + data, + { + ...this.jobsOpts, + ...opts, + jobId, + }, + ); + this.emit('waiting', job as JobBase); + + return job; + } } /** @@ -418,6 +489,34 @@ export class Queue< await super.close(); }); } + + /** + * Overrides the rate limit to be active for the next jobs. + * + * @param expireTimeMs - expire time in ms of this rate limit. + */ + async rateLimit(expireTimeMs: number): Promise { + await this.trace( + SpanKind.INTERNAL, + 'rateLimit', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.QueueRateLimit]: expireTimeMs, + }); + + await this.client.then(client => + client.set( + this.keys.limiter, + Number.MAX_SAFE_INTEGER, + 'PX', + expireTimeMs, + ), + ); + }, + ); + } + /** * Resumes the processing of this queue globally. * @@ -589,6 +688,15 @@ export class Queue< ); } + /** + * Removes rate limit key. + */ + async removeRateLimitKey(): Promise { + const client = await this.client; + + return client.del(this.keys.limiter); + } + /** * Removes a repeatable job by its key. Note that the key is the one used * to store the repeatable job metadata and not one of the job iterations diff --git a/src/classes/worker.ts b/src/classes/worker.ts index e6984e046f..acc6f1d533 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -191,7 +191,7 @@ export class Worker< private waiting: Promise | null = null; private _repeat: Repeat; // To be deprecated in v6 in favor of Job Scheduler - private _jobScheduler: JobScheduler; + protected _jobScheduler: JobScheduler; protected paused: Promise; protected processFn: Processor; @@ -626,7 +626,7 @@ export class Worker< /** * Overrides the rate limit to be active for the next jobs. - * + * @deprecated This method is deprecated and will be removed in v6. Use queue.rateLimit method instead. * @param expireTimeMs - expire time in ms of this rate limit. */ async rateLimit(expireTimeMs: number): Promise { @@ -995,7 +995,8 @@ will never work with more accuracy than 1ms. */ * This method waits for current jobs to finalize before returning. * * @param force - Use force boolean parameter if you do not want to wait for - * current jobs to be processed. + * current jobs to be processed. When using telemetry, be mindful that it can + * interfere with the proper closure of spans, potentially preventing them from being exported. * * @returns Promise that resolves when the worker has been closed. */ diff --git a/src/commands/includes/prepareJobForProcessing.lua b/src/commands/includes/prepareJobForProcessing.lua index b1e96ffb49..7bceac146e 100644 --- a/src/commands/includes/prepareJobForProcessing.lua +++ b/src/commands/includes/prepareJobForProcessing.lua @@ -7,8 +7,11 @@ opts - limiter ]] +-- Includes +--- @include "addBaseMarkerIfNeeded" + local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey, - jobId, processedOn, maxJobs, opts) + jobId, processedOn, maxJobs, markerKey, opts) local jobKey = keyPrefix .. jobId -- Check if we need to perform rate limiting. @@ -41,5 +44,7 @@ local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey rcall("HMSET", jobKey, "processedOn", processedOn, unpack(optionalValues)) rcall("HINCRBY", jobKey, "ats", 1) + addBaseMarkerIfNeeded(markerKey, false) + return {rcall("HGETALL", jobKey), jobId, 0, 0} -- get job data end diff --git a/src/commands/moveToActive-11.lua b/src/commands/moveToActive-11.lua index 946091e84a..064f3fecef 100644 --- a/src/commands/moveToActive-11.lua +++ b/src/commands/moveToActive-11.lua @@ -77,12 +77,12 @@ end if jobId then return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2], - maxJobs, opts) + maxJobs, markerKey, opts) else jobId = moveJobFromPriorityToActive(KEYS[3], activeKey, KEYS[10]) if jobId then return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2], - maxJobs, opts) + maxJobs, markerKey, opts) end end diff --git a/src/commands/moveToFinished-14.lua b/src/commands/moveToFinished-14.lua index 06631b52d9..616ee5dada 100644 --- a/src/commands/moveToFinished-14.lua +++ b/src/commands/moveToFinished-14.lua @@ -207,8 +207,9 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[2], KEYS[1], KEYS[8]) + local markerKey = KEYS[14] -- Check if there are delayed jobs that can be promoted - promoteDelayedJobs(KEYS[7], KEYS[14], target, KEYS[3], eventStreamKey, prefix, + promoteDelayedJobs(KEYS[7], markerKey, target, KEYS[3], eventStreamKey, prefix, timestamp, KEYS[10], isPausedOrMaxed) local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max']) @@ -233,19 +234,19 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10]) return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId, - timestamp, maxJobs, + timestamp, maxJobs, markerKey, opts) end else return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId, - timestamp, maxJobs, + timestamp, maxJobs, markerKey, opts) end else jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10]) if jobId then return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId, - timestamp, maxJobs, + timestamp, maxJobs, markerKey, opts) end end diff --git a/src/enums/telemetry-attributes.ts b/src/enums/telemetry-attributes.ts index 112a8a3c9c..c28853df99 100644 --- a/src/enums/telemetry-attributes.ts +++ b/src/enums/telemetry-attributes.ts @@ -13,6 +13,7 @@ export enum TelemetryAttributes { QueueDrainDelay = 'bullmq.queue.drain.delay', QueueGrace = 'bullmq.queue.grace', QueueCleanLimit = 'bullmq.queue.clean.limit', + QueueRateLimit = 'bullmq.queue.rate.limit', JobType = 'bullmq.job.type', QueueOptions = 'bullmq.queue.options', QueueEventMaxLength = 'bullmq.queue.event.max.length', @@ -30,6 +31,7 @@ export enum TelemetryAttributes { JobResult = 'bullmq.job.result', JobFailedReason = 'bullmq.job.failed.reason', FlowName = 'bullmq.flow.name', + JobSchedulerId = 'bullmq.job.scheduler.id', } export enum SpanKind { diff --git a/src/interfaces/repeat-options.ts b/src/interfaces/repeat-options.ts index 67d9450a96..fe71d27d12 100644 --- a/src/interfaces/repeat-options.ts +++ b/src/interfaces/repeat-options.ts @@ -33,9 +33,6 @@ export interface RepeatOptions extends Omit { /** * Repeated job should start right now * ( work only with every settings) - * - * @deprecated - * */ immediately?: boolean; diff --git a/src/interfaces/worker-options.ts b/src/interfaces/worker-options.ts index 13ea4047a4..8cf2d36c8f 100644 --- a/src/interfaces/worker-options.ts +++ b/src/interfaces/worker-options.ts @@ -100,7 +100,6 @@ export interface WorkerOptions extends QueueBaseOptions, SandboxedOptions { skipLockRenewal?: boolean; /** - * * Number of seconds to long poll for jobs when the queue is empty. * * @default 5 @@ -108,7 +107,6 @@ export interface WorkerOptions extends QueueBaseOptions, SandboxedOptions { drainDelay?: number; /** - * * Duration of the lock for the job in milliseconds. The lock represents that * a worker is processing the job. If the lock is lost, the job will be eventually * be picked up by the stalled checker and move back to wait so that another worker diff --git a/src/utils.ts b/src/utils.ts index fda5c356c1..4efa2e3467 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -341,7 +341,7 @@ export async function trace( let messageContext; let dstPropagationMetadata: undefined | string; - if (spanKind === SpanKind.CONSUMER) { + if (spanKind === SpanKind.CONSUMER && parentContext) { messageContext = span.setSpanOnContext(parentContext); } else { messageContext = span.setSpanOnContext(currentContext); diff --git a/src/version.ts b/src/version.ts index 709d8c2f36..f7e6775c35 100644 --- a/src/version.ts +++ b/src/version.ts @@ -1 +1 @@ -export const version = '5.25.6'; +export const version = '5.29.1'; diff --git a/tests/test_bulk.ts b/tests/test_bulk.ts index 5cff0f2122..1d6b8eb4ec 100644 --- a/tests/test_bulk.ts +++ b/tests/test_bulk.ts @@ -1,9 +1,10 @@ import { expect } from 'chai'; import { default as IORedis } from 'ioredis'; +import { after as afterNumExecutions } from 'lodash'; import { after, beforeEach, describe, it, before } from 'mocha'; import { v4 } from 'uuid'; -import { Queue, Worker, Job } from '../src/classes'; -import { removeAllQueueData } from '../src/utils'; +import { Queue, QueueEvents, Worker, Job } from '../src/classes'; +import { removeAllQueueData, delay } from '../src/utils'; describe('bulk jobs', () => { const redisHost = process.env.REDIS_HOST || 'localhost'; @@ -119,6 +120,48 @@ describe('bulk jobs', () => { await removeAllQueueData(new IORedis(redisHost), parentQueueName); }); + it('should keep workers busy', async () => { + const numJobs = 6; + const queue2 = new Queue(queueName, { connection, markerCount: 2, prefix }); + + const queueEvents = new QueueEvents(queueName, { connection, prefix }); + await queueEvents.waitUntilReady(); + + const worker = new Worker( + queueName, + async () => { + await delay(1000); + }, + { connection, prefix }, + ); + const worker2 = new Worker( + queueName, + async () => { + await delay(1000); + }, + { connection, prefix }, + ); + await worker.waitUntilReady(); + await worker2.waitUntilReady(); + + const completed = new Promise(resolve => { + queueEvents.on('completed', afterNumExecutions(numJobs, resolve)); + }); + + const jobs = Array.from(Array(numJobs).keys()).map(index => ({ + name: 'test', + data: { index }, + })); + + await queue2.addBulk(jobs); + + await completed; + await queue2.close(); + await worker.close(); + await worker2.close(); + await queueEvents.close(); + }); + it('should process jobs with custom ids', async () => { const name = 'test'; let processor; diff --git a/tests/test_concurrency.ts b/tests/test_concurrency.ts index 4834aa4bf5..400ae7bbcc 100644 --- a/tests/test_concurrency.ts +++ b/tests/test_concurrency.ts @@ -1,5 +1,11 @@ import { default as IORedis } from 'ioredis'; -import { FlowProducer, QueueEvents, Queue, Worker } from '../src/classes'; +import { + FlowProducer, + QueueEvents, + Queue, + Worker, + RateLimitError, +} from '../src/classes'; import { delay, removeAllQueueData } from '../src/utils'; import { beforeEach, describe, it, after as afterAll } from 'mocha'; import { v4 } from 'uuid'; @@ -156,8 +162,8 @@ describe('Concurrency', () => { queueName, async job => { if (job.attemptsStarted === 1) { - await worker.rateLimit(dynamicLimit); - throw Worker.RateLimitError(); + await queue.rateLimit(dynamicLimit); + throw new RateLimitError(); } }, { @@ -240,8 +246,8 @@ describe('Concurrency', () => { queueName, async job => { if (job.attemptsStarted === 1) { - await worker.rateLimit(dynamicLimit); - throw Worker.RateLimitError(); + await queue.rateLimit(dynamicLimit); + throw new RateLimitError(); } }, { diff --git a/tests/test_flow.ts b/tests/test_flow.ts index 8736904bb6..0369a39988 100644 --- a/tests/test_flow.ts +++ b/tests/test_flow.ts @@ -3286,6 +3286,102 @@ describe('flows', () => { }); }); + describe('when children have delay', () => { + it('moves children to delayed', async () => { + const name = 'child-job'; + const values = [{ idx: 0, bar: 'something' }]; + + const topQueueName = `top-queue-${v4()}`; + + let parentProcessor; + const childrenWorker = new Worker( + queueName, + async (job: Job) => { + await delay(500); + return values[job.data.idx]; + }, + { + connection, + prefix, + }, + ); + + const completed = new Promise((resolve, reject) => { + childrenWorker.on('completed', async function () { + resolve(); + }); + }); + + const processingTop = new Promise((resolve, reject) => [ + (parentProcessor = async (job: Job) => { + try { + const { processed } = await job.getDependencies(); + expect(Object.keys(processed)).to.have.length(1); + + const childrenValues = await job.getChildrenValues(); + + const jobKey = queue.toKey(tree.children[0].job.id); + expect(childrenValues[jobKey]).to.be.deep.equal(values[0]); + expect(processed[jobKey]).to.be.deep.equal(values[0]); + + resolve(); + } catch (err) { + console.error(err); + reject(err); + } + }), + ]); + + const parentWorker = new Worker(topQueueName, parentProcessor, { + connection, + prefix, + }); + + const flow = new FlowProducer({ connection, prefix }); + const tree = await flow.add({ + name: 'root-job', + queueName: topQueueName, + data: {}, + children: [ + { + name, + data: { idx: 0, foo: 'bar' }, + queueName, + opts: { + delay: 2000, + }, + }, + ], + }); + + expect(tree).to.have.property('job'); + expect(tree).to.have.property('children'); + + const { children, job } = tree; + const isWaitingChildren = await job.isWaitingChildren(); + + expect(isWaitingChildren).to.be.true; + expect(children).to.have.length(1); + + expect(children[0].job.id).to.be.ok; + expect(children[0].job.data.foo).to.be.eql('bar'); + + const isDelayed = await children![0].job.isDelayed(); + + expect(isDelayed).to.be.true; + + await completed; + + await childrenWorker.close(); + + await processingTop; + await parentWorker.close(); + await flow.close(); + + await removeAllQueueData(new IORedis(redisHost), topQueueName); + }); + }); + it('should not process parent if child fails', async () => { const name = 'child-job'; diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index 270ad20bdc..f0f8ad5e4d 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -1809,6 +1809,12 @@ describe('Job Scheduler', function () { ); }); + it('should throw an error when not specifying .pattern or .every', async function () { + await expect(queue.upsertJobScheduler('repeat', {})).to.be.rejectedWith( + 'Either .pattern or .every options must be defined for this repeatable job', + ); + }); + it('should throw an error when using .immediately and .startDate simultaneously', async function () { await expect( queue.upsertJobScheduler('repeat', { @@ -1821,6 +1827,20 @@ describe('Job Scheduler', function () { ); }); + it("should return a valid job with the job's options and data passed as the job template", async function () { + const repeatOpts = { + every: 1000, + }; + + const job = await queue.upsertJobScheduler('test', repeatOpts, { + data: { foo: 'bar' }, + }); + + expect(job).to.be.ok; + expect(job!.data.foo).to.be.eql('bar'); + expect(job!.opts.repeat!.every).to.be.eql(1000); + }); + it('should emit a waiting event when adding a repeatable job to the waiting list', async function () { const date = new Date('2017-02-07 9:24:00'); this.clock.setSystemTime(date); diff --git a/tests/test_obliterate.ts b/tests/test_obliterate.ts index 48967bd5df..2358bcf790 100644 --- a/tests/test_obliterate.ts +++ b/tests/test_obliterate.ts @@ -47,6 +47,19 @@ describe('Obliterate', function () { expect(keys.length).to.be.eql(0); }); + it('should obliterate a queue which is empty but has had jobs in the past', async () => { + await queue.waitUntilReady(); + + const job = await queue.add('test', { foo: 'bar' }); + await job.remove(); + + await queue.obliterate(); + + const client = await queue.client; + const keys = await client.keys(`${prefix}:${queue.name}:*`); + expect(keys.length).to.be.eql(0); + }); + it('should obliterate a queue with jobs in different statuses', async () => { await queue.waitUntilReady(); diff --git a/tests/test_rate_limiter.ts b/tests/test_rate_limiter.ts index bb25a9d53c..28e4c5c9ad 100644 --- a/tests/test_rate_limiter.ts +++ b/tests/test_rate_limiter.ts @@ -845,6 +845,64 @@ describe('Rate Limiter', function () { await worker.close(); }); }); + + describe('when removing rate limit', () => { + it('should process jobs normally', async function () { + this.timeout(5000); + + const numJobs = 2; + const dynamicLimit = 10000; + const duration = 1000; + + const ttl = await queue.getRateLimitTtl(); + expect(ttl).to.be.equal(-2); + + const worker = new Worker(queueName, async () => {}, { + autorun: false, + connection, + prefix, + limiter: { + max: 1, + duration, + }, + }); + + await worker.rateLimit(dynamicLimit); + + await queue.removeRateLimitKey(); + const result = new Promise((resolve, reject) => { + queueEvents.on( + 'completed', + // after every job has been completed + after(numJobs, async () => { + try { + const timeDiff = new Date().getTime() - startTime; + expect(timeDiff).to.be.gte((numJobs - 1) * duration); + expect(timeDiff).to.be.lte(numJobs * duration); + resolve(); + } catch (err) { + reject(err); + } + }), + ); + + queueEvents.on('failed', async err => { + reject(err); + }); + }); + + const startTime = new Date().getTime(); + const jobs = Array.from(Array(numJobs).keys()).map(() => ({ + name: 'rate test', + data: {}, + })); + await queue.addBulk(jobs); + + worker.run(); + await result; + await worker.close(); + }); + }); }); describe('when there are more added jobs than max limiter', () => { diff --git a/tests/test_telemetry_interface.ts b/tests/test_telemetry_interface.ts index 13e48cc77d..b598475d86 100644 --- a/tests/test_telemetry_interface.ts +++ b/tests/test_telemetry_interface.ts @@ -2,7 +2,7 @@ import { expect, assert } from 'chai'; import { default as IORedis } from 'ioredis'; import { after, beforeEach, describe, it, before } from 'mocha'; import { v4 } from 'uuid'; -import { FlowProducer, Queue, Worker } from '../src/classes'; +import { FlowProducer, JobScheduler, Queue, Worker } from '../src/classes'; import { removeAllQueueData } from '../src/utils'; import { Telemetry, @@ -234,6 +234,60 @@ describe('Telemetry', () => { }); }); + describe('Queue.upsertJobScheduler', async () => { + it('should correctly interact with telemetry when adding a job scheduler', async () => { + const jobSchedulerId = 'testJobScheduler'; + const data = { foo: 'bar' }; + + await queue.upsertJobScheduler( + jobSchedulerId, + { every: 1000, endDate: Date.now() + 1000 }, + { name: 'repeatable-job', data }, + ); + + const activeContext = telemetryClient.contextManager.active(); + const span = activeContext.getSpan?.() as MockSpan; + expect(span).to.be.an.instanceOf(MockSpan); + expect(span.name).to.equal(`add ${queueName}.repeatable-job`); + expect(span.options?.kind).to.equal(SpanKind.PRODUCER); + expect(span.attributes[TelemetryAttributes.JobSchedulerId]).to.equal( + jobSchedulerId, + ); + expect(span.attributes[TelemetryAttributes.JobId]).to.be.a('string'); + expect(span.attributes[TelemetryAttributes.JobId]).to.include( + `repeat:${jobSchedulerId}:`, + ); + }); + + it('should correctly handle errors and record them in telemetry for upsertJobScheduler', async () => { + const recordExceptionSpy = sinon.spy( + MockSpan.prototype, + 'recordException', + ); + + const errMessage = 'Error creating job'; + + // Force an exception on the job schedulers private method createNextJob + (JobScheduler).prototype.createNextJob = () => { + throw new Error(errMessage); + }; + + try { + await queue.upsertJobScheduler( + 'testJobScheduler', + { every: 1000 }, + { data: { foo: 'bar' } }, + ); + } catch (e) { + assert(recordExceptionSpy.calledOnce); + const recordedError = recordExceptionSpy.firstCall.args[0]; + assert.equal(recordedError.message, errMessage); + } finally { + recordExceptionSpy.restore(); + } + }); + }); + describe('Worker.processJob', async () => { it('should correctly interact with telemetry when processing a job', async () => { const job = await queue.add('testJob', { foo: 'bar' }); diff --git a/tests/test_worker.ts b/tests/test_worker.ts index 75539d0c5b..ad9e8354a4 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -482,16 +482,20 @@ describe('workers', function () { // Add spy to worker.moveToActive const spy = sinon.spy(worker, 'moveToActive'); const bclientSpy = sinon.spy( - await worker.blockingConnection.client, + await (worker as any).blockingConnection.client, 'bzpopmin', ); - for (let i = 0; i < numJobs; i++) { - const job = await queue.add('test', { foo: 'bar' }); - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.eql('bar'); + const jobsData: { name: string; data: any }[] = []; + for (let j = 0; j < numJobs; j++) { + jobsData.push({ + name: 'test', + data: { foo: 'bar' }, + }); } + await queue.addBulk(jobsData); + expect(bclientSpy.callCount).to.be.equal(1); await new Promise((resolve, reject) => { @@ -510,9 +514,17 @@ describe('workers', function () { await worker.close(); }); - it('do not call moveToActive more than number of jobs + 1', async () => { + it('do not call moveToActive more than number of jobs + 2', async () => { const numJobs = 50; let completedJobs = 0; + + const jobs: Promise[] = []; + for (let i = 0; i < numJobs; i++) { + jobs.push(queue.add('test', { foo: 'bar' })); + } + + await Promise.all(jobs); + const worker = new Worker( queueName, async job => { @@ -521,7 +533,6 @@ describe('workers', function () { }, { connection, prefix, concurrency: 100 }, ); - await worker.waitUntilReady(); // Add spy to worker.moveToActive const spy = sinon.spy(worker, 'moveToActive'); @@ -529,14 +540,9 @@ describe('workers', function () { await worker.blockingConnection.client, 'bzpopmin', ); + await worker.waitUntilReady(); - for (let i = 0; i < numJobs; i++) { - const job = await queue.add('test', { foo: 'bar' }); - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.eql('bar'); - } - - expect(bclientSpy.callCount).to.be.equal(1); + expect(bclientSpy.callCount).to.be.equal(0); await new Promise((resolve, reject) => { worker.on('completed', (job: Job, result: any) => { @@ -547,9 +553,11 @@ describe('workers', function () { }); }); + expect(completedJobs).to.be.equal(numJobs); + expect(bclientSpy.callCount).to.be.equal(2); + // Check moveToActive was called numJobs + 2 times expect(spy.callCount).to.be.equal(numJobs + 2); - expect(bclientSpy.callCount).to.be.equal(3); await worker.close(); });