Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: version 6 (#2682) #2707

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ Since there are a few job queue solutions, here is a table comparing them:
| Group Support | ✓ | | | | | |
| Batches Support | ✓ | | | | | |
| Parent/Child Dependencies | ✓ | ✓ | | | | |
| Debouncing | ✓ | ✓ | ✓ | | | |
| Deduplication | ✓ | ✓ | ✓ | | | |
| Priorities | ✓ | ✓ | ✓ | ✓ | | ✓ |
| Concurrency | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| Delayed jobs | ✓ | ✓ | ✓ | ✓ | | ✓ |
Expand Down
3 changes: 2 additions & 1 deletion docs/gitbook/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@
- [Producers](guide/nestjs/producers.md)
- [Queue Events Listeners](guide/nestjs/queue-events-listeners.md)
- [Going to production](guide/going-to-production.md)
- [Migration to newer versions](guide/migration-to-newer-versions.md)
- [Migration to newer versions](guide/migrations/migration-to-newer-versions.md)
- [Version 6](guide/migrations/v6.md)
- [Troubleshooting](guide/troubleshooting.md)

## Patterns
Expand Down
28 changes: 28 additions & 0 deletions docs/gitbook/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,31 @@
# [5.29.0](https://github.com/taskforcesh/bullmq/compare/v5.28.2...v5.29.0) (2024-11-22)


### Features

* **queue:** refactor a protected addJob method allowing telemetry extensions ([09f2571](https://github.com/taskforcesh/bullmq/commit/09f257196f6d5a6690edbf55f12d585cec86ee8f))

## [5.28.2](https://github.com/taskforcesh/bullmq/compare/v5.28.1...v5.28.2) (2024-11-22)


### Bug Fixes

* **queue:** change _jobScheduler from private to protected for extension ([#2920](https://github.com/taskforcesh/bullmq/issues/2920)) ([34c2348](https://github.com/taskforcesh/bullmq/commit/34c23485bcb32b3c69046b2fb37e5db8927561ce))

## [5.28.1](https://github.com/taskforcesh/bullmq/compare/v5.28.0...v5.28.1) (2024-11-20)


### Bug Fixes

* **scheduler:** use Job class from getter for extension ([#2917](https://github.com/taskforcesh/bullmq/issues/2917)) ([5fbb075](https://github.com/taskforcesh/bullmq/commit/5fbb075dd4abd51cc84a59575261de84e56633d8))

# [5.28.0](https://github.com/taskforcesh/bullmq/compare/v5.27.0...v5.28.0) (2024-11-19)


### Features

* **job-scheduler:** add telemetry support to the job scheduler ([72ea950](https://github.com/taskforcesh/bullmq/commit/72ea950ea251aa12f879ba19c0b5dfeb6a093da2))

# [5.27.0](https://github.com/taskforcesh/bullmq/compare/v5.26.2...v5.27.0) (2024-11-19)


Expand Down
6 changes: 3 additions & 3 deletions docs/gitbook/guide/flows/fail-parent.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

In some situations, you may need to fail a job when _one of its children_ fails.

The pattern to solve this requirement consists of using the **`failParentOnFailure`** option.
The pattern to solve this requirement consists of using the **`onChildFailure`** option as **fail**, this is also our default behavior.

```typescript
const flow = new FlowProducer({ connection });
Expand All @@ -16,13 +16,13 @@ const originalTree = await flow.add({
name,
data: { idx: 0, foo: 'bar' },
queueName: 'childrenQueueName',
opts: { failParentOnFailure: true },
opts: { onChildFailure: 'fail' },
children: [
{
name,
data: { idx: 1, foo: 'bah' },
queueName: 'grandChildrenQueueName',
opts: { failParentOnFailure: true },
opts: { onChildFailure: 'fail' },
},
{
name,
Expand Down
4 changes: 2 additions & 2 deletions docs/gitbook/guide/flows/ignore-dependency.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

In some situations, you may have a parent job and need to ignore when one of its children fail.

The pattern to solve this requirement consists on using the **ignoreDependencyOnFailure** option. This option will make sure that when a job fails, the dependency is ignored from the parent, so the parent will complete without waiting for the failed children.
The pattern to solve this requirement consists on using the **onChildFailure** option as **ignore**. This option will make sure that when a job fails, the dependency is ignored from the parent, so the parent will complete without waiting for the failed children.

```typescript
const flow = new FlowProducer({ connection });
Expand All @@ -16,7 +16,7 @@ const originalTree = await flow.add({
name,
data: { idx: 0, foo: 'bar' },
queueName: 'childrenQueueName',
opts: { ignoreDependencyOnFailure: true },
opts: { onChildFailure: 'ignore' },
children: [
{
name,
Expand Down
4 changes: 2 additions & 2 deletions docs/gitbook/guide/flows/remove-dependency.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

In some situations, you may have a parent job and need to remove the relationship when one of its children fail.

The pattern to solve this requirement consists on using the **removeDependencyOnFailure** option. This option will make sure that when a job fails, the dependency is removed from the parent, so the parent will complete without waiting for the failed children.
The pattern to solve this requirement consists on using the **onChildFailure** option as **remove**. This option will make sure that when a job fails, the dependency is removed from the parent, so the parent will complete without waiting for the failed children.

```typescript
const flow = new FlowProducer({ connection });
Expand All @@ -16,7 +16,7 @@ const originalTree = await flow.add({
name,
data: { idx: 0, foo: 'bar' },
queueName: 'childrenQueueName',
opts: { removeDependencyOnFailure: true },
opts: { onChildFailure: 'remove' },
children: [
{
name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,3 @@ Since BullMQ supports global pause, one possible strategy, if suitable for your
### Use new queues altogether

This drastic solution involves discontinuing use of older queues and creating new ones. You could rename older queues (e.g., "myQueueV2"), use a new Redis host, or maintain two versions of the service—one running an older BullMQ version with old queues, and a newer one with the latest BullMQ and a different set of queues. When the older version has no more jobs to process, it can be retired, leaving only the upgraded version.

25 changes: 25 additions & 0 deletions docs/gitbook/guide/migrations/v6.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
description: Tips and hints on how to migrate to v6.
---

# Migration to v6

Make sure to call **runMigrations** method from Queue class in order to execute all necessary changes when coming from an older version.

## Migration of deprecated paused key

If you have paused queues after upgrading to this version. These jobs will be moved to wait state when initializing any of our instances (Worker, Queue, QueueEvents or FlowProducer).

Paused key is not longer needed as this state is already represented inside meta key. It also improves the process of pausing or resuming a queue as we don't need to rename any key.

## Remove legacy markers

When migrating from versions before v5.
It's recommended to do this process:

1. Pause your queues.
2. Upgrade to v6.
3. Instantiate a Queue instance and execute runMigrations method where migrations will be executed.
4. Resume your queues.

This way you will prevent that your workers pick a legacy marker that is no longer used because new markers are added in a different structure.
12 changes: 7 additions & 5 deletions docs/gitbook/guide/rate-limiting.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,23 @@ await queue.add('rate limited paint', { customerId: 'my-customer-id' });

Sometimes is useful to rate-limit a queue manually instead of based on some static options. For example, you may have an API that returns `429 Too Many Requests`, and you want to rate-limit the queue based on that response.

For this purpose, you can use the worker method **`rateLimit`** like this:
For this purpose, you can use the queue method **`rateLimit`** like this:

```typescript
import { Worker } from 'bullmq';
import { Queue, RateLimitError, Worker } from 'bullmq';

const queue = new Queue('myQueue', { connection });

const worker = new Worker(
'myQueue',
async () => {
const [isRateLimited, duration] = await doExternalCall();
if (isRateLimited) {
await worker.rateLimit(duration);
await queue.rateLimit(duration);
// Do not forget to throw this special exception,
// since we must differentiate this case from a failure
// in order to move the job to wait again.
throw Worker.RateLimitError();
throw new RateLimitError();
}
},
{
Expand Down Expand Up @@ -130,6 +132,6 @@ By removing rate limit key, workers will be able to pick jobs again and your rat

## Read more:

- 💡 [Rate Limit API Reference](https://api.docs.bullmq.io/classes/v5.Worker.html#rateLimit)
- 💡 [Rate Limit API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#rateLimit)
- 💡 [Get Rate Limit Ttl API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#getRateLimitTtl)
- 💡 [Remove Rate Limit Key API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#removeRateLimitKey)
10 changes: 6 additions & 4 deletions docs/gitbook/patterns/stop-retrying-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,23 @@ await queue.add(
When a job is rate limited using `Worker.RateLimitError` and tried again, the `attempts` check is ignored, as rate limiting is not considered a real error. However, if you want to manually check the attempts and avoid retrying the job, you can do the following:

```typescript
import { Worker, UnrecoverableError } from 'bullmq';
import { Queue, RateLimitError, Worker, UnrecoverableError } from 'bullmq';

const queue = new Queue('myQueue', { connection });

const worker = new Worker(
'myQueue',
async job => {
const [isRateLimited, duration] = await doExternalCall();
if (isRateLimited) {
await worker.rateLimit(duration);
await queue.rateLimit(duration);
if (job.attemptsMade >= job.opts.attempts) {
throw new UnrecoverableError('Unrecoverable');
}
// Do not forget to throw this special exception,
// since we must differentiate this case from a failure
// in order to move the job to wait again.
throw Worker.RateLimitError();
throw new RateLimitError();
}
},
{
Expand All @@ -54,4 +56,4 @@ const worker = new Worker(

## Read more:

- 💡 [Rate Limit API Reference](https://api.docs.bullmq.io/classes/v5.Worker.html#rateLimit)
- 💡 [Rate Limit API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#rateLimit)
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bullmq",
"version": "5.27.0",
"version": "5.29.0",
"description": "Queue for messages and jobs based on Redis",
"homepage": "https://bullmq.io/",
"main": "./dist/cjs/index.js",
Expand Down
3 changes: 1 addition & 2 deletions python/bullmq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@


optsDecodeMap = {
'fpof': 'failParentOnFailure',
'idof': 'ignoreDependencyOnFailure',
'ocf': 'onChildFailure',
'kl': 'keepLogs',
}

Expand Down
2 changes: 1 addition & 1 deletion python/bullmq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ async def getJobLogs(self, job_id:str, start = 0, end = -1, asc = True):
"logs": result[0],
"count": result[1]
}

async def obliterate(self, force: bool = False):
"""
Completely destroys the queue and all of its contents irreversibly.
Expand Down
Loading