feat: initial implementation endpoint based workers #7

Merged: 12 commits, Feb 25, 2024

Conversation

@manast manast (Contributor) commented Feb 24, 2024

This PR implements a new approach for handling queues and workers, based on a standard RESTful HTTP API and a webhook-based API for processing jobs. More documentation will be available at https://docs.bullmq.net/ in the coming days.
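
As a rough illustration of the flow, registering an endpoint-based worker could look something like the sketch below. The /workers path, auth header, and payload shape are assumptions for illustration only, not the final API; see the docs linked above.

// Hypothetical sketch: registering a worker whose jobs are delivered to a webhook.
const res = await fetch('http://localhost:8080/workers', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    Authorization: 'Bearer <token>', // assumed auth scheme
  },
  body: JSON.stringify({
    queue: 'my-queue',
    endpoint: {
      url: 'https://myapp.example.com/process-job', // webhook that receives jobs
      method: 'POST',
      timeout: 3000,
    },
  }),
});
console.log(res.status);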

@roggervalf roggervalf (Collaborator) left a comment


LGTM

src/validators/workers.validators.ts (outdated, resolved)
README.md (outdated, resolved)
"@taskforcesh/message-broker": "https://github.com/taskforcesh/message-broker.git#master",
"bullmq": "latest",
"bullmq": "^5.3.2",
"chalk": "^5.3.0",


Any reason to use chalk? Faster options are available.

@manast (Contributor Author)

Chalk is well maintained, so that's a plus. Usually I pick "pino", though, as a fast and versatile alternative to console.log; maybe it is the best alternative here as well... 🤔


+1 on pino, it is awesome. It doesn't work in browsers, but from my understanding you only need it on the server.
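
For reference, a minimal setup using pino's standard API as a console.log replacement:

import pino from 'pino';

const logger = pino({ level: 'info' });

// Structured JSON logging, considerably faster than console.log
logger.info({ queue: 'my-queue' }, 'Worker started');
logger.error(new Error('boom'), 'Failed to process job');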

@manast (Contributor Author)

We can change it in a separate PR as it will not imply a breaking change.

get(key: string) {
  if (this.cache.has(key)) {
    const value = this.cache.get(key);
    if (value) {


Delete and re-set on every get is not great from a perf perspective.
If you want a faster implementation, you can check out how it's done in https://github.com/kibertoad/toad-cache/blob/main/src/LruMap.js
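
A condensed sketch of the idea (not toad-cache's actual code): recency bumps become pointer updates on a doubly linked list instead of a Map delete + set:

interface Node<V> {
  key: string;
  value: V;
  prev: Node<V> | null;
  next: Node<V> | null;
}

class LinkedLru<V> {
  private map = new Map<string, Node<V>>();
  private head: Node<V> | null = null; // most recently used
  private tail: Node<V> | null = null; // least recently used

  constructor(private max: number) {}

  get(key: string): V | undefined {
    const node = this.map.get(key);
    if (!node) return undefined;
    this.moveToHead(node); // pointer surgery only, no delete/set
    return node.value;
  }

  put(key: string, value: V): void {
    const existing = this.map.get(key);
    if (existing) {
      existing.value = value;
      this.moveToHead(existing);
      return;
    }
    if (this.map.size >= this.max && this.tail) {
      // Evict the least recently used entry
      const evicted = this.tail;
      this.map.delete(evicted.key);
      this.tail = evicted.prev;
      if (this.tail) this.tail.next = null;
      else this.head = null;
    }
    const node: Node<V> = { key, value, prev: null, next: this.head };
    if (this.head) this.head.prev = node;
    this.head = node;
    if (!this.tail) this.tail = node;
    this.map.set(key, node);
  }

  private moveToHead(node: Node<V>): void {
    if (node === this.head) return;
    // Unlink from current position
    if (node.prev) node.prev.next = node.next;
    if (node.next) node.next.prev = node.prev;
    if (node === this.tail) this.tail = node.prev;
    // Relink at the head
    node.prev = null;
    node.next = this.head;
    if (this.head) this.head.prev = node;
    this.head = node;
  }
}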

@manast manast (Contributor Author) commented Feb 25, 2024

Yeah, it seems like delete+set would be slow, but I have been surprised so many times in the past where the seemingly inefficient solution happens to be fast due to JS runtime internals... I would rather run some benchmarks first before changing this to a more complex solution. It could also be the case that this solution allows 1M get calls per second, meaning that a faster "get" would not lead to a noticeably faster "addJob".

@kibertoad kibertoad commented Feb 25, 2024

I've measured the difference between toad-cache and tinylru. tinylru does an explicit set for LRU bumping on every get, while toad-cache doesn't. The perf difference is 3.8K ops/sec vs 4.2K ops/sec. I expect that an explicit delete on top of that would make it significantly slower; delete is a pretty expensive operation in JavaScript: https://stackoverflow.com/questions/27397034/why-is-delete-slow-in-javascript

@manast (Contributor Author)

Maybe "bun" is super fast, but when testing this:

import { LRUCache } from './src/cache';

const cache = new LRUCache<string>(3);

const start = Date.now();
for (let i = 0; i < 1_000_000; i++) {
    cache.put(`key${i}`, `value${i}`);
}

console.log(`Put: ${Date.now() - start}ms`);

for (let i = 0; i < 1_000_000; i++) {
    cache.get(`key${i}`);
}

console.log(`Get: ${Date.now() - start}ms`);

I get this results:

$ bun bench-cache.ts
Put: 284ms
Get: 428ms

So around 2.3M ops/sec

@manast (Contributor Author)

On an Intel i7 from 2018.

@manast (Contributor Author)

If I increase the cache size to 3000:

$ bun bench-cache.ts
Put: 291ms
Get: 482ms


Nice! Yeah, looks like Bun is super-optimized for this type of use case.

// Gracefully close all workers
process.on('exit', async () => {
  for (const queueName in workers) {
    await workers[queueName].close();
  }
});


Would that work fine if all workers share the same connection?

@manast (Contributor Author)

Yes. The shared connection will not be closed, but all workers also have a dedicated connection for blocking calls; this dedicated connection needs to be closed by calling worker.close().
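
For illustration, a minimal sketch of this lifecycle, assuming a typical BullMQ setup (the queue names and the trivial processor are made up):

import { Worker } from 'bullmq';
import Redis from 'ioredis';

// One shared ioredis connection for regular commands; BullMQ requires
// maxRetriesPerRequest: null for worker connections.
const sharedConnection = new Redis({ maxRetriesPerRequest: null });

const workers = ['queue-a', 'queue-b'].map(
  (name) =>
    new Worker(name, async (job) => job.data, { connection: sharedConnection }),
);

// worker.close() tears down each worker's internally duplicated blocking
// connection; the shared connection stays open and is closed separately.
await Promise.all(workers.map((w) => w.close()));
await sharedConnection.quit();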


How come? worker.close() does this:

                .finally(() => client.disconnect())
                .finally(() => this.connection.close())

client here seems to be the shared connection.

(That's actually something that I addressed in taskforcesh/bullmq#2449, having encountered exactly this in our app: workers closing the shared connection.)

@manast (Contributor Author)

But client in this context is actually a duplicate of the client that you pass to the constructor, so it is safe to close it...

      const client =
        this.blockingConnection.status == 'ready'
          ? await this.blockingConnection.client
          : null;

@manast (Contributor Author)

this.blockingConnection = new RedisConnection(
  isRedisInstance(opts.connection)
    ? (<Redis>opts.connection).duplicate({ connectionName })
    : { ...opts.connection, connectionName },
  false,
  true,
  opts.skipVersionCheck,
);

@manast (Contributor Author)

In fact, not just safe: you must close it to avoid a leak.


Oh, I see, so it effectively always creates a new connection for each worker.

Is there a reason for that? Are there limitations of shared connections that wouldn't work for BullMQ?

@manast (Contributor Author)

Because we need connections that block, and a blocked connection cannot be shared, as no commands will be sent until the connection is unblocked.
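
A small illustration of why (the command is a real Redis blocking command; the list keys are made up): while a blocking call is in flight, that connection cannot serve anything else, so each worker needs its own.

import Redis from 'ioredis';

const blocking = new Redis();
const shared = new Redis();

// BRPOPLPUSH blocks this connection for up to 5 seconds; any command
// queued on `blocking` in the meantime simply waits behind it.
const popped = blocking.brpoplpush('source-list', 'dest-list', 5);

// The shared connection stays responsive for regular commands.
await shared.set('foo', 'bar');

await popped;
await Promise.all([blocking.quit(), shared.quit()]);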

}, workerEndpoint.timeout || 3000)

try {
  const response = await fetch(workerEndpoint.url, {


Are you sure you don't want to use a lightweight wrapper like wretch to write less boilerplate? Or even undici, which is also faster.
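
For comparison, a rough sketch of the same call with undici's request API, reusing workerEndpoint and job from the surrounding code (the timeout mapping is an assumption):

import { request } from 'undici';

const { statusCode, body } = await request(workerEndpoint.url, {
  method: 'POST',
  headers: { 'content-type': 'application/json' },
  body: JSON.stringify(job.data),
  // undici uses separate header/body timeouts instead of a single abort timer
  headersTimeout: workerEndpoint.timeout || 3000,
});
const result = await body.json();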

@manast (Contributor Author)

Not really. I actually prefer fewer dependencies, unless the savings in code are really meaningful.

init: (redisClient: Redis | Cluster) => {
  // Load workers from Redis and start them
  debugEnabled && debug('Loading workers from Redis...');
  const stream = redisClient.hscanStream(workerMetadataKey, { count: 10 });


Maybe something similar to what we used for queue discovery could be helpful here too:

public static async getActiveQueueIds(redis: Redis): Promise<string[]> {
  await redis.zremrangebyscore(
    QUEUE_IDS_KEY,
    '-inf',
    Date.now() - daysToMilliseconds(RETENTION_QUEUE_IDS_IN_DAYS),
  )
  const queueIds = await redis.zrange(QUEUE_IDS_KEY, 0, -1)
  return queueIds.sort()
}

public async start(): Promise<void> {
  await this.redis.zadd(QUEUE_IDS_KEY, Date.now(), this.config.queueId)
}

@manast manast (Contributor Author) commented Feb 25, 2024

For the proxy we need to store a complete JSON object for every queue, so I don't know if a ZSET is a good structure for this; a nice property of hashes is being able to update a worker's options without needing to iterate through all the ZSET items.
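
A quick illustration of that property, assuming an ioredis client named redis (the hash key and field names are hypothetical): updating one worker's metadata is a single O(1) HSET, with no iteration over the other entries.

// Hypothetical key/field names for illustration.
const workerMetadataKey = 'bullmq-proxy:workers';

// Create or update one worker's metadata in a single command:
await redis.hset(
  workerMetadataKey,
  'my-queue',
  JSON.stringify({ url: 'https://example.com/jobs', concurrency: 10 }),
);

// Reading one entry back is likewise a single command:
const raw = await redis.hget(workerMetadataKey, 'my-queue');
const meta = raw ? JSON.parse(raw) : null;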


Would HSCAN work well on a big Redis store? Wouldn't it iterate over the whole thing? Maybe some key lookup map would be helpful?

@manast (Contributor Author)

It would only iterate over the given hash, and since it is scanning incrementally it does not keep Redis busy. Also, this operation is only needed when restarting the proxy. The number of workers should not be very large either...
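
For reference, consuming the hscanStream from the snippet above could look like this (ioredis emits each batch as a flat [field, value, ...] array; the worker-starting part is only sketched):

const stream = redisClient.hscanStream(workerMetadataKey, { count: 10 });

stream.on('data', (entries: string[]) => {
  // Each batch alternates field and value: [field1, value1, field2, value2, ...]
  for (let i = 0; i < entries.length; i += 2) {
    const queueName = entries[i];
    const metadata = JSON.parse(entries[i + 1]);
    // ...start a worker for queueName using metadata
  }
});

stream.on('end', () => {
  debugEnabled && debug('All workers loaded');
});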
