From 022f7b7d0a0ce14387ed2b9fed791e1f56e34770 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Mon, 11 Nov 2024 16:28:19 +0100 Subject: [PATCH] fix(job-scheculer): avoid hazards when upserting job schedulers concurrently --- .github/workflows/test.yml | 2 +- docs/gitbook/SUMMARY.md | 236 +++++++++++++++++------------------ src/classes/job-scheduler.ts | 57 +++++++-- src/classes/job.ts | 7 +- src/classes/scripts.ts | 5 +- src/utils.ts | 12 ++ tests/test_job_scheduler.ts | 41 ++++++ tests/test_worker.ts | 5 +- 8 files changed, 229 insertions(+), 136 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 95fd97bfbb..92b0945f25 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -123,7 +123,7 @@ jobs: services: dragonflydb: - image: docker.dragonflydb.io/dragonflydb/dragonfly + image: docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0 env: DFLY_cluster_mode: emulated DFLY_lock_on_hashtags: true diff --git a/docs/gitbook/SUMMARY.md b/docs/gitbook/SUMMARY.md index 58dbb9c43d..32944bac16 100644 --- a/docs/gitbook/SUMMARY.md +++ b/docs/gitbook/SUMMARY.md @@ -1,135 +1,135 @@ # Table of contents -* [What is BullMQ](README.md) -* [Quick Start]() -* [API Reference](https://api.docs.bullmq.io) -* [Changelogs](changelog.md) - * [v4](changelogs/changelog-v4.md) - * [v3](changelogs/changelog-v3.md) - * [v2](changelogs/changelog-v2.md) - * [v1](changelogs/changelog-v1.md) +- [What is BullMQ](README.md) +- [Quick Start]() +- [API Reference](https://api.docs.bullmq.io) +- [Changelogs](changelog.md) + - [v4](changelogs/changelog-v4.md) + - [v3](changelogs/changelog-v3.md) + - [v2](changelogs/changelog-v2.md) + - [v1](changelogs/changelog-v1.md) ## Guide -* [Introduction](guide/introduction.md) -* [Connections](guide/connections.md) -* [Queues](guide/queues/README.md) - * [Auto-removal of jobs](guide/queues/auto-removal-of-jobs.md) - * [Adding jobs in bulk](guide/queues/adding-bulks.md) - * [Global Concurrency](guide/queues/global-concurrency.md) - * [Removing Jobs](guide/queues/removing-jobs.md) -* [Workers](guide/workers/README.md) - * [Auto-removal of jobs](guide/workers/auto-removal-of-jobs.md) - * [Concurrency](guide/workers/concurrency.md) - * [Graceful shutdown](guide/workers/graceful-shutdown.md) - * [Stalled Jobs](guide/workers/stalled-jobs.md) - * [Sandboxed processors](guide/workers/sandboxed-processors.md) - * [Pausing queues](guide/workers/pausing-queues.md) -* [Jobs](guide/jobs/README.md) - * [FIFO](guide/jobs/fifo.md) - * [LIFO](guide/jobs/lifo.md) - * [Job Ids](guide/jobs/job-ids.md) - * [Job Data](guide/jobs/job-data.md) - * [Deduplication](guide/jobs/deduplication.md) - * [Delayed](guide/jobs/delayed.md) - * [Repeatable](guide/jobs/repeatable.md) - * [Prioritized](guide/jobs/prioritized.md) - * [Removing jobs](guide/jobs/removing-job.md) - * [Stalled](guide/jobs/stalled.md) - * [Getters](guide/jobs/getters.md) -* [Job Schedulers](guide/job-schedulers/README.md) - * [Repeat Strategies](guide/job-schedulers/repeat-strategies.md) - * [Repeat options](guide/job-schedulers/repeat-options.md) - * [Manage Job Schedulers](guide/job-schedulers/manage-job-schedulers.md) -* [Flows](guide/flows/README.md) - * [Adding flows in bulk](guide/flows/adding-bulks.md) - * [Get Flow Tree](guide/flows/get-flow-tree.md) - * [Fail Parent](guide/flows/fail-parent.md) - * [Remove Dependency](guide/flows/remove-dependency.md) - * [Ignore Dependency](guide/flows/ignore-dependency.md) - * [Remove Child Dependency](guide/flows/remove-child-dependency.md) -* [Metrics](guide/metrics/metrics.md) -* [Rate limiting](guide/rate-limiting.md) -* [Parallelism and Concurrency](guide/parallelism-and-concurrency.md) -* [Retrying failing jobs](guide/retrying-failing-jobs.md) -* [Returning job data](guide/returning-job-data.md) -* [Events](guide/events/README.md) - * [Create Custom Events](guide/events/create-custom-events.md) -* [Telemetry](guide/telemetry/README.md) - * [Getting started](guide/telemetry/getting-started.md) - * [Running Jaeger](guide/telemetry/running-jaeger.md) - * [Running a simple example](guide/telemetry/running-a-simple-example.md) -* [QueueScheduler](guide/queuescheduler.md) -* [Redis™ Compatibility](guide/redis-tm-compatibility/README.md) - * [Dragonfly](guide/redis-tm-compatibility/dragonfly.md) -* [Redis™ hosting](guide/redis-tm-hosting/README.md) - * [AWS MemoryDB](guide/redis-tm-hosting/aws-memorydb.md) - * [AWS Elasticache](guide/redis-tm-hosting/aws-elasticache.md) -* [Architecture](guide/architecture.md) -* [NestJs](guide/nestjs/README.md) - * [Producers](guide/nestjs/producers.md) - * [Queue Events Listeners](guide/nestjs/queue-events-listeners.md) -* [Going to production](guide/going-to-production.md) -* [Migration to newer versions](guide/migration-to-newer-versions.md) -* [Troubleshooting](guide/troubleshooting.md) +- [Introduction](guide/introduction.md) +- [Connections](guide/connections.md) +- [Queues](guide/queues/README.md) + - [Auto-removal of jobs](guide/queues/auto-removal-of-jobs.md) + - [Adding jobs in bulk](guide/queues/adding-bulks.md) + - [Global Concurrency](guide/queues/global-concurrency.md) + - [Removing Jobs](guide/queues/removing-jobs.md) +- [Workers](guide/workers/README.md) + - [Auto-removal of jobs](guide/workers/auto-removal-of-jobs.md) + - [Concurrency](guide/workers/concurrency.md) + - [Graceful shutdown](guide/workers/graceful-shutdown.md) + - [Stalled Jobs](guide/workers/stalled-jobs.md) + - [Sandboxed processors](guide/workers/sandboxed-processors.md) + - [Pausing queues](guide/workers/pausing-queues.md) +- [Jobs](guide/jobs/README.md) + - [FIFO](guide/jobs/fifo.md) + - [LIFO](guide/jobs/lifo.md) + - [Job Ids](guide/jobs/job-ids.md) + - [Job Data](guide/jobs/job-data.md) + - [Deduplication](guide/jobs/deduplication.md) + - [Delayed](guide/jobs/delayed.md) + - [Repeatable](guide/jobs/repeatable.md) + - [Prioritized](guide/jobs/prioritized.md) + - [Removing jobs](guide/jobs/removing-job.md) + - [Stalled](guide/jobs/stalled.md) + - [Getters](guide/jobs/getters.md) +- [Job Schedulers](guide/job-schedulers/README.md) + - [Repeat Strategies](guide/job-schedulers/repeat-strategies.md) + - [Repeat options](guide/job-schedulers/repeat-options.md) + - [Manage Job Schedulers](guide/job-schedulers/manage-job-schedulers.md) +- [Flows](guide/flows/README.md) + - [Adding flows in bulk](guide/flows/adding-bulks.md) + - [Get Flow Tree](guide/flows/get-flow-tree.md) + - [Fail Parent](guide/flows/fail-parent.md) + - [Remove Dependency](guide/flows/remove-dependency.md) + - [Ignore Dependency](guide/flows/ignore-dependency.md) + - [Remove Child Dependency](guide/flows/remove-child-dependency.md) +- [Metrics](guide/metrics/metrics.md) +- [Rate limiting](guide/rate-limiting.md) +- [Parallelism and Concurrency](guide/parallelism-and-concurrency.md) +- [Retrying failing jobs](guide/retrying-failing-jobs.md) +- [Returning job data](guide/returning-job-data.md) +- [Events](guide/events/README.md) + - [Create Custom Events](guide/events/create-custom-events.md) +- [Telemetry](guide/telemetry/README.md) + - [Getting started](guide/telemetry/getting-started.md) + - [Running Jaeger](guide/telemetry/running-jaeger.md) + - [Running a simple example](guide/telemetry/running-a-simple-example.md) +- [QueueScheduler](guide/queuescheduler.md) +- [Redis™ Compatibility](guide/redis-tm-compatibility/README.md) + - [Dragonfly](guide/redis-tm-compatibility/dragonfly.md) +- [Redis™ hosting](guide/redis-tm-hosting/README.md) + - [AWS MemoryDB](guide/redis-tm-hosting/aws-memorydb.md) + - [AWS Elasticache](guide/redis-tm-hosting/aws-elasticache.md) +- [Architecture](guide/architecture.md) +- [NestJs](guide/nestjs/README.md) + - [Producers](guide/nestjs/producers.md) + - [Queue Events Listeners](guide/nestjs/queue-events-listeners.md) +- [Going to production](guide/going-to-production.md) +- [Migration to newer versions](guide/migration-to-newer-versions.md) +- [Troubleshooting](guide/troubleshooting.md) ## Patterns -* [Adding jobs in bulk across different queues](patterns/adding-bulks.md) -* [Manually processing jobs](patterns/manually-fetching-jobs.md) -* [Named Processor](patterns/named-processor.md) -* [Flows](patterns/flows.md) -* [Idempotent jobs](patterns/idempotent-jobs.md) -* [Throttle jobs](patterns/throttle-jobs.md) -* [Process Step Jobs](patterns/process-step-jobs.md) -* [Failing fast when Redis is down](patterns/failing-fast-when-redis-is-down.md) -* [Stop retrying jobs](patterns/stop-retrying-jobs.md) -* [Timeout jobs](patterns/timeout-jobs.md) -* [Redis Cluster](patterns/redis-cluster.md) +- [Adding jobs in bulk across different queues](patterns/adding-bulks.md) +- [Manually processing jobs](patterns/manually-fetching-jobs.md) +- [Named Processor](patterns/named-processor.md) +- [Flows](patterns/flows.md) +- [Idempotent jobs](patterns/idempotent-jobs.md) +- [Throttle jobs](patterns/throttle-jobs.md) +- [Process Step Jobs](patterns/process-step-jobs.md) +- [Failing fast when Redis is down](patterns/failing-fast-when-redis-is-down.md) +- [Stop retrying jobs](patterns/stop-retrying-jobs.md) +- [Timeout jobs](patterns/timeout-jobs.md) +- [Redis Cluster](patterns/redis-cluster.md) ## BullMQ Pro -* [Introduction](bullmq-pro/introduction.md) -* [Install](bullmq-pro/install.md) -* [Observables](bullmq-pro/observables/README.md) - * [Cancelation](bullmq-pro/observables/cancelation.md) -* [Groups](bullmq-pro/groups/README.md) - * [Getters](bullmq-pro/groups/getters.md) - * [Rate limiting](bullmq-pro/groups/rate-limiting.md) - * [Concurrency](bullmq-pro/groups/concurrency.md) - * [Local group concurrency](bullmq-pro/groups/local-group-concurrency.md) - * [Max group size](bullmq-pro/groups/max-group-size.md) - * [Pausing groups](bullmq-pro/groups/pausing-groups.md) - * [Prioritized intra-groups](bullmq-pro/groups/prioritized.md) - * [Sandboxes for groups](bullmq-pro/groups/sandboxes-for-groups.md) -* [Batches](bullmq-pro/batches.md) -* [NestJs](bullmq-pro/nestjs/README.md) - * [Producers](bullmq-pro/nestjs/producers.md) - * [Queue Events Listeners](bullmq-pro/nestjs/queue-events-listeners.md) - * [API Reference](https://nestjs.bullmq.pro/) - * [Changelog](bullmq-pro/nestjs/changelog.md) -* [API Reference](https://api.bullmq.pro) -* [Changelog](bullmq-pro/changelog.md) -* [Support](bullmq-pro/support.md) +- [Introduction](bullmq-pro/introduction.md) +- [Install](bullmq-pro/install.md) +- [Observables](bullmq-pro/observables/README.md) + - [Cancelation](bullmq-pro/observables/cancelation.md) +- [Groups](bullmq-pro/groups/README.md) + - [Getters](bullmq-pro/groups/getters.md) + - [Rate limiting](bullmq-pro/groups/rate-limiting.md) + - [Concurrency](bullmq-pro/groups/concurrency.md) + - [Local group concurrency](bullmq-pro/groups/local-group-concurrency.md) + - [Max group size](bullmq-pro/groups/max-group-size.md) + - [Pausing groups](bullmq-pro/groups/pausing-groups.md) + - [Prioritized intra-groups](bullmq-pro/groups/prioritized.md) + - [Sandboxes for groups](bullmq-pro/groups/sandboxes-for-groups.md) +- [Batches](bullmq-pro/batches.md) +- [NestJs](bullmq-pro/nestjs/README.md) + - [Producers](bullmq-pro/nestjs/producers.md) + - [Queue Events Listeners](bullmq-pro/nestjs/queue-events-listeners.md) + - [API Reference](https://nestjs.bullmq.pro/) + - [Changelog](bullmq-pro/nestjs/changelog.md) +- [API Reference](https://api.bullmq.pro) +- [Changelog](bullmq-pro/changelog.md) +- [Support](bullmq-pro/support.md) ## Bull -* [Introduction](bull/introduction.md) -* [Install](bull/install.md) -* [Quick Guide](bull/quick-guide.md) -* [Important Notes](bull/important-notes.md) -* [Reference](https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md) -* [Patterns](bull/patterns/README.md) - * [Persistent connections](bull/patterns/persistent-connections.md) - * [Message queue](bull/patterns/message-queue.md) - * [Returning Job Completions](bull/patterns/returning-job-completions.md) - * [Reusing Redis Connections](bull/patterns/reusing-redis-connections.md) - * [Redis cluster](bull/patterns/redis-cluster.md) - * [Custom backoff strategy](bull/patterns/custom-backoff-strategy.md) - * [Debugging](bull/patterns/debugging.md) - * [Manually fetching jobs](bull/patterns/manually-fetching-jobs.md) +- [Introduction](bull/introduction.md) +- [Install](bull/install.md) +- [Quick Guide](bull/quick-guide.md) +- [Important Notes](bull/important-notes.md) +- [Reference](https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md) +- [Patterns](bull/patterns/README.md) + - [Persistent connections](bull/patterns/persistent-connections.md) + - [Message queue](bull/patterns/message-queue.md) + - [Returning Job Completions](bull/patterns/returning-job-completions.md) + - [Reusing Redis Connections](bull/patterns/reusing-redis-connections.md) + - [Redis cluster](bull/patterns/redis-cluster.md) + - [Custom backoff strategy](bull/patterns/custom-backoff-strategy.md) + - [Debugging](bull/patterns/debugging.md) + - [Manually fetching jobs](bull/patterns/manually-fetching-jobs.md) ## Python -* [Introduction](python/introduction.md) -* [Changelog](python/changelog.md) +- [Introduction](python/introduction.md) +- [Changelog](python/changelog.md) diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 7c0e6596b7..c974d4a631 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -89,23 +89,31 @@ export class JobScheduler extends QueueBase { nextMillis = await this.repeatStrategy(now, repeatOpts, jobName); } + const multi = (await this.client).multi(); if (nextMillis) { if (override) { - await this.scripts.addJobScheduler(jobSchedulerId, nextMillis, { - name: jobName, - endDate: endDate ? new Date(endDate).getTime() : undefined, - tz: repeatOpts.tz, - pattern, - every, - }); + await this.scripts.addJobScheduler( + (multi) as RedisClient, + jobSchedulerId, + nextMillis, + { + name: jobName, + endDate: endDate ? new Date(endDate).getTime() : undefined, + tz: repeatOpts.tz, + pattern, + every, + }, + ); } else { await this.scripts.updateJobSchedulerNextMillis( + (multi) as RedisClient, jobSchedulerId, nextMillis, ); } - return this.createNextJob( + const job = this.createNextJob( + (multi) as RedisClient, jobName, nextMillis, jobSchedulerId, @@ -113,10 +121,26 @@ export class JobScheduler extends QueueBase { jobData, iterationCount, ); + + const results = await multi.exec(); // multi.exec returns an array of results [ err, result ][] + + // Check if there are any errors + const erroredResult = results.find(result => result[0]); + if (erroredResult) { + throw new Error( + `Error upserting job scheduler ${jobSchedulerId} - ${erroredResult[0]}`, + ); + } + + // Get last result with the job id + const lastResult = results.pop(); + job.id = lastResult[1] as string; + return job; } } - private async createNextJob( + private createNextJob( + client: RedisClient, name: N, nextMillis: number, jobSchedulerId: string, @@ -146,7 +170,10 @@ export class JobScheduler extends QueueBase { mergedOpts.repeat = { ...opts.repeat, count: currentCount }; - return this.Job.create(this, name, data, mergedOpts); + const job = new Job(this, name, data, mergedOpts, jobId); + job.addJob(client); + + return job; } async removeJobScheduler(jobSchedulerId: string): Promise { @@ -268,3 +295,13 @@ export const defaultRepeatStrategy = ( // Ignore error } }; + +function removeUndefinedFields(obj: Record) { + const newObj: Record = {}; + for (const key in obj) { + if (obj[key] !== undefined) { + newObj[key] = obj[key]; + } + } + return newObj; +} diff --git a/src/classes/job.ts b/src/classes/job.ts index 313bc283d0..b63e5cdf49 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -29,6 +29,7 @@ import { lengthInUtf8Bytes, parseObjectValues, tryCatch, + removeUndefinedFields, } from '../utils'; import { Backoffs } from './backoffs'; import { Scripts, raw2NextJobData } from './scripts'; @@ -464,11 +465,11 @@ export class Job< * @returns */ asJSON(): JobJson { - return { + return removeUndefinedFields({ id: this.id, name: this.name, data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data), - opts: this.optsAsJSON(this.opts), + opts: removeUndefinedFields(this.optsAsJSON(this.opts)), parent: this.parent ? { ...this.parent } : undefined, parentKey: this.parentKey, progress: this.progress, @@ -483,7 +484,7 @@ export class Job< deduplicationId: this.deduplicationId, repeatJobKey: this.repeatJobKey, returnvalue: JSON.stringify(this.returnvalue), - }; + }); } private optsAsJSON(opts: JobsOptions = {}): RedisJobOptions { diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 7e42f15ed6..c9156b291d 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -308,12 +308,12 @@ export class Scripts { } async addJobScheduler( + client: RedisClient, jobSchedulerId: string, nextMillis: number, opts: RepeatableOptions, ): Promise { const queueKeys = this.queue.keys; - const client = await this.queue.client; const keys: (string | number | Buffer)[] = [ queueKeys.repeat, @@ -339,11 +339,10 @@ export class Scripts { } async updateJobSchedulerNextMillis( + client: RedisClient, jobSchedulerId: string, nextMillis: number, ): Promise { - const client = await this.queue.client; - return client.zadd(this.queue.keys.repeat, nextMillis, jobSchedulerId); } diff --git a/src/utils.ts b/src/utils.ts index a8b0ff071b..fda5c356c1 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -270,6 +270,18 @@ export const toString = (value: any): string => { export const QUEUE_EVENT_SUFFIX = ':qe'; +export function removeUndefinedFields>( + obj: Record, +) { + const newObj: any = {}; + for (const key in obj) { + if (obj[key] !== undefined) { + newObj[key] = obj[key]; + } + } + return newObj as T; +} + /** * Wraps the code with telemetry and provides a span for configuration. * diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index eeda764ead..270ad20bdc 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -199,6 +199,47 @@ describe('Job Scheduler', function () { }); }); + describe('when clocks are slightly out of sync', function () { + it('should create only one delayed job', async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + + const scheduler1 = queue.upsertJobScheduler( + 'test-scheduler1', + { + every: 100, + }, + { opts: { prevMillis: Date.now() } }, + ); + + this.clock.tick(1); + const scheduler2 = queue.upsertJobScheduler( + 'test-scheduler1', + { + every: 100, + }, + { opts: { prevMillis: Date.now() } }, + ); + + this.clock.tick(1); + const scheduler3 = queue.upsertJobScheduler( + 'test-scheduler1', + { + every: 100, + }, + { opts: { prevMillis: Date.now() } }, + ); + + await Promise.all([scheduler1, scheduler2, scheduler3]); + + const repeatableJobs = await queue.getJobSchedulers(); + expect(repeatableJobs.length).to.be.eql(1); + + const delayed = await queue.getDelayed(); + expect(delayed).to.have.length(1); + }); + }); + it('should create job schedulers with different cron patterns', async function () { const crons = [ '10 * * * * *', diff --git a/tests/test_worker.ts b/tests/test_worker.ts index d8542039a9..75539d0c5b 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -4499,7 +4499,10 @@ describe('workers', function () { }); it('should retrieve concurrency from getter', async () => { - const worker = new Worker(queueName, async () => {}, { connection, concurrency: 100 }); + const worker = new Worker(queueName, async () => {}, { + connection, + concurrency: 100, + }); worker.concurrency = 10; expect(worker.concurrency).to.equal(10);