[Bug]: Excessive Redis Connections and waitUntilFinished Timeout in High-Load Environments #2749

Open
1 task done
dbas-dn opened this issue Sep 2, 2024 · 6 comments
Labels
bug Something isn't working

Comments

@dbas-dn

dbas-dn commented Sep 2, 2024

Version

v5.12.12

Platform

NodeJS

What happened?

Environment:

Kubernetes: k3s with 3 nodes
Redis: Sentinel configuration with 3 nodes

Description:

Our system is designed to create a large number of queues. We noticed that while it's possible to pass an already established Redis connection for creating Queue, Job, and Worker instances, the QueueEvents class duplicates the passed connection using duplicate().

In our load tests we ran between 100 and 600 queues, each of which created jobs that returned results. Under low load, waitUntilFinished worked as expected and returned the job result. Under high load, however, waitUntilFinished did not return and hung until its TTL expired.

To debug this, I implemented a parallel polling mechanism that ran the scripts.isFinished script, which indicated that the job had indeed reached the completed status and I could retrieve its result, even though waitUntilFinished remained stuck.

Additionally, the excessive number of connections created by QueueEvents due to the duplicate() method led to us exceeding the maximum number of Redis connections, especially under high concurrency.
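
One way to keep the connection count described above in check is to create at most one QueueEvents instance per queue name and reuse it for every job on that queue, rather than creating one per job. The following is only a minimal sketch of such a cache. `PerQueueRegistry` and `Closable` are hypothetical names that are not part of BullMQ, and the factory passed to the constructor is where `new QueueEvents(name, { connection })` would go.

```typescript
// Hypothetical helper (not part of BullMQ): keeps one instance per queue name.
interface Closable {
  close(): Promise<void>;
}

class PerQueueRegistry<T extends Closable> {
  private readonly instances = new Map<string, T>();
  private readonly create: (queueName: string) => T;

  // `create` is invoked only the first time a given queue name is requested.
  constructor(create: (queueName: string) => T) {
    this.create = create;
  }

  // Return the cached instance for this queue, creating it on first use.
  get(queueName: string): T {
    let instance = this.instances.get(queueName);
    if (instance === undefined) {
      instance = this.create(queueName);
      this.instances.set(queueName, instance);
    }
    return instance;
  }

  // Number of instances currently held (one per distinct queue name).
  get size(): number {
    return this.instances.size;
  }

  // Close every cached instance and forget them all.
  async closeAll(): Promise<void> {
    const all = Array.from(this.instances.values());
    this.instances.clear();
    await Promise.all(all.map(instance => instance.close()));
  }
}

// Assumed usage with BullMQ (QueueEvents exposes close()):
//   const events = new PerQueueRegistry(name => new QueueEvents(name, { connection }));
//   const result = await job.waitUntilFinished(events.get(queueName), 10_000);
//   ...
//   await events.closeAll();
```

With 600 queues this caps the number of QueueEvents instances at 600 regardless of how many jobs are awaited. It does not remove the one duplicated connection that each instance holds.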

Workaround:

As a temporary workaround, I replaced waitUntilFinished with periodic execution of scripts.isFinished to check job completion.
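
The polling workaround can be sketched roughly as follows. This is a hedged illustration rather than the code used in our system. `pollUntilFinished`, `FinishedProbe`, and `ProbeResult` are hypothetical names, and the probe callback is where a check such as scripts.isFinished would be called.

```typescript
// Outcome of a single check: still running, or finished with a value.
type ProbeResult<T> = { finished: false } | { finished: true; value: T };
type FinishedProbe<T> = () => Promise<ProbeResult<T>>;

const delay = (ms: number) =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

// Hypothetical helper: calls `probe` every `intervalMs` until it reports
// completion, and rejects once `timeoutMs` would be exceeded.
async function pollUntilFinished<T>(
  probe: FinishedProbe<T>,
  intervalMs: number,
  timeoutMs: number,
): Promise<T> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const result = await probe();
    if (result.finished) {
      return result.value;
    }
    // Give up if the next check would fall after the deadline.
    if (Date.now() + intervalMs > deadline) {
      throw new Error(`Job did not finish within ${timeoutMs}ms`);
    }
    await delay(intervalMs);
  }
}

// Assumed usage with BullMQ's public API (queue.getJob, job.getState,
// job.returnvalue) instead of the private scripts object:
//   const value = await pollUntilFinished(async () => {
//     const fresh = await queue.getJob(jobId);
//     if (fresh !== undefined && (await fresh.getState()) === 'completed') {
//       return { finished: true, value: fresh.returnvalue };
//     }
//     return { finished: false };
//   }, 250, 10_000);
```

Polling needs no extra blocking connection per waiter, at the cost of additional Redis commands on every interval. The usage sketch above treats only the completed state as finished, so a failed job would run into the timeout.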

Issue Summary:

waitUntilFinished does not return in high-load scenarios, even when the job is completed according to scripts.isFinished.
QueueEvents creates redundant connections by duplicating the Redis connection, which leads to exceeding the maximum number of connections under high concurrency.

How to reproduce.

import { Queue, Worker, QueueEvents, Job } from 'bullmq';
import Redis, { RedisOptions } from 'ioredis';
import pQueue from 'p-queue';
import { setTimeout as sleep } from 'timers/promises';

jest.setTimeout(60000); // Set a longer timeout for high-load testing

describe('BullMQ waitUntilFinished bug reproduction', () => {
  let connectionOptions: RedisOptions;
  let queues: Queue[] = [];
  let workers: Worker[] = [];

  beforeAll(async () => {
    connectionOptions = {
      host: 'localhost', // Replace with your Redis host
      port: 6379,        // Replace with your Redis port
      password: '---', // Replace with your Redis password if any
      maxRetriesPerRequest: null,
      // Add any other Redis options here
    };
  });

  afterAll(async () => {
    await Promise.all([
      ...queues.map(queue => queue.obliterate({ force: true })),
      ...workers.map(worker => worker.close()),
    ]);
  });

  it('should reproduce the waitUntilFinished hang under high load', async () => {
    const concurrency = 20;
    const queueCount = 600;
    const jobCount = 1200;
    const connection = new Redis(connectionOptions);
    // Create multiple queues
    for (let i = 0; i < queueCount; i++) {
      const queue = new Queue(`test-queue-${i}`, { connection });
      const worker = new Worker(
        `test-queue-${i}`,
        async (job: Job) => {
          await sleep(Math.random() * 1000);
          // Simulate some work
          return `Result of ${job.name}`;
        },
        { connection }
      );
      queues.push(queue);
      workers.push(worker);
      await sleep(50);
    }

    const queue1 = new pQueue({ concurrency, autoStart: true });
    const queue2 = new pQueue({ concurrency, autoStart: true });

    // Add jobs to the queues
    const results = [];
    for (let i = 0; i < jobCount; i++) {
      void queue1.add(async () => {
        const queueIndex = i % queueCount;
        const queueEvent = new QueueEvents(`test-queue-${queueIndex}`, { connection });
        const job = await queues[queueIndex].add(`job-${i}`, { foo: 'bar' });
        try {
          const result = await job.waitUntilFinished(queueEvent, 10_000);
          await queueEvent.close();
          results.push(result);
        } catch (e) {
          job['scripts'].isFinished(job.id, true).then(v => {
            console.error(e);
            console.log(v);
          });
        }
      });
    }
    for (let i = 0; i < jobCount; i++) {
      void queue2.add(async () => {
        try {
          const queueIndex = i % queueCount;
          await queues[queueIndex].add(`job-${i}`, { foo: 'bar' });
        } catch (e) {
          console.error(e);
        }
      });
    }
    await Promise.all([queue1.onIdle(), queue2.onIdle()]);
    // Check if all jobs have results (if not, it indicates an issue)
    expect(results.length).toBe(jobCount);
  });
});

Relevant log output

  console.error
    Error: Job wait job-0 timed out before finishing, no finish notification arrived after 10000ms (id=1)
        at onFailed (/node_modules/.pnpm/[email protected]/node_modules/bullmq/src/classes/job.ts:1051:16)
        at /node_modules/.pnpm/[email protected]/node_modules/bullmq/src/classes/job.ts:1035:13
        at Timeout.task (/node_modules/.pnpm/[email protected]/node_modules/jsdom/lib/jsdom/browser/Window.js:520:19)
        at listOnTimeout (node:internal/timers:573:17)
        at processTimers (node:internal/timers:514:7)

      66 |         } catch (e) {
      67 |           job['scripts'].isFinished(job.id, true).then(v => {
    > 68 |             console.error(e);
         |                     ^
      69 |             console.log(v);
      70 |           });
      71 |         }

      at src/_tests_/bullmq-high-load.spec.ts:68:21

  console.log
    [ 1, '"Result of job-0"' ]

      at src/_tests_/bullmq-high-load.spec.ts:69:21

Code of Conduct

  • I agree to follow this project's Code of Conduct
@dbas-dn dbas-dn added the bug Something isn't working label Sep 2, 2024
@dbas-dn
Author

dbas-dn commented Sep 16, 2024

any reaction?

@manast
Contributor

manast commented Sep 16, 2024

waitUntilFinished is not a recommended API for production use. It does not scale, and it is not a proper way to design an architecture based on queues: https://blog.taskforce.sh/do-not-wait-for-your-jobs-to-complete/

@daimalou

@manast I am facing the same issue. I am developing a real-time AI and image conversion tool. The user uses Server-Sent Events to push images to my server, and I push them to a queue to be processed by a third-party API (which has a QPS limit); the converted result is returned to the user once the job has finished.

I think waitUntilFinished is very useful when using Server-Sent Events.

@manast
Contributor

manast commented Sep 25, 2024

@daimalou It may be useful, but it is not the proper way to use queues. As I see it, you would be better off just spawning a Node.js worker thread, running the job, and waiting for it to complete than using a queue.

@manast
Contributor

manast commented Sep 25, 2024

@daimalou By the way, SSE is used for sending data from the server to the client, so I am not sure what you mean by using it to send images to the server :/. In any case, whether you use SSE or WebSockets does not matter: you can easily tell the client when a job has completed without relying on waitUntilFinished. You may need to redesign your solution a bit, but that is the proper way to do it and to get a scalable, issue-free system that runs stably for a long time.
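
One possible shape for this, as a rough sketch only: a single long-lived listener receives every completed event for a queue and forwards the result to whichever client callback is registered for that job ID. `CompletionRouter`, `CompletedSource`, and `Notify` are hypothetical names. The sketch assumes that a BullMQ QueueEvents instance can be passed as the source, since it emits completed events carrying jobId and returnvalue.

```typescript
// Payload fields used here from a 'completed' event (jobId, returnvalue).
interface CompletedEvent {
  jobId: string;
  returnvalue: string;
}

// Minimal structural view of an event source; assumed to be satisfied by
// BullMQ's QueueEvents.
interface CompletedSource {
  on(event: 'completed', listener: (args: CompletedEvent) => void): unknown;
}

type Notify = (returnvalue: string) => void;

// Hypothetical helper: one listener per queue, many waiting clients.
class CompletionRouter {
  private readonly pending = new Map<string, Notify>();

  constructor(source: CompletedSource) {
    source.on('completed', ({ jobId, returnvalue }) => {
      const notify = this.pending.get(jobId);
      if (notify !== undefined) {
        // Each registration is one-shot: remove it before notifying.
        this.pending.delete(jobId);
        notify(returnvalue);
      }
    });
  }

  // Register a callback (for example, one that writes to an SSE response).
  register(jobId: string, notify: Notify): void {
    this.pending.set(jobId, notify);
  }

  // Number of jobs that still have a waiting callback.
  get pendingCount(): number {
    return this.pending.size;
  }
}

// Assumed usage:
//   const router = new CompletionRouter(new QueueEvents('images', { connection }));
//   const job = await queue.add('convert', payload);
//   router.register(job.id!, value => sseResponse.write(`data: ${value}\n\n`));
```

A job that completes before register is called would be missed in this sketch, so a real design would need to register before adding the job (for example with a custom job ID) or check the job state once after registering.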

@daimalou

@manast Yes, my description was a bit unclear. I use https://github.com/Azure/fetch-event-source, which can POST data to the server and receive the server's response as SSE.
Thank you. I think what you said is correct. I need to build more of the logic myself.


3 participants