feat: initial implementation endpoint based workers #7
Conversation
LGTM
"@taskforcesh/message-broker": "https://github.com/taskforcesh/message-broker.git#master", | ||
"bullmq": "latest", | ||
"bullmq": "^5.3.2", | ||
"chalk": "^5.3.0", |
Any reason to use chalk? Faster options are available.
Chalk is well maintained, so that's a plus. Usually I pick "pino" though, as a fast and versatile alternative to console.log; maybe it is the best alternative here as well... 🤔
+1 on pino, it is awesome. It doesn't work in browsers, but from my understanding you only need it on the server.
We can change it in a separate PR, as it won't be a breaking change.
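For reference, a pino swap would be tiny. A minimal sketch, assuming a hypothetical shared `logger.ts` module (not part of this PR):

```ts
// logger.ts — hypothetical module, just to show the shape of the change.
import pino from 'pino';

export const logger = pino({ level: process.env.LOG_LEVEL ?? 'info' });

// Usage: structured, levelled logging instead of console.log + chalk.
logger.info({ queue: 'paint' }, 'worker started');
logger.error(new Error('endpoint unreachable'), 'job failed');
```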
```ts
get(key: string) {
  if (this.cache.has(key)) {
    const value = this.cache.get(key);
    if (value) {
```
Delete and re-set on every get is not great from a perf perspective.
If you want a faster implementation, you can check out how it's done in https://github.com/kibertoad/toad-cache/blob/main/src/LruMap.js
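The core idea there is to keep entries in a doubly linked list and bump a key on `get` by relinking pointers instead of `delete` + `set` on the Map. A rough sketch of that approach (not toad-cache's actual code):

```ts
// Pointer-bump LRU "get": move the node to the tail (most recently
// used) by relinking, with no Map mutation involved.
interface Node<V> { key: string; value: V; prev: Node<V> | null; next: Node<V> | null }

class LinkedLru<V> {
  private map = new Map<string, Node<V>>();
  private head: Node<V> | null = null; // least recently used
  private tail: Node<V> | null = null; // most recently used

  get(key: string): V | undefined {
    const node = this.map.get(key);
    if (!node) return undefined;
    if (node !== this.tail) {
      // Unlink the node from its current position…
      if (node.prev) node.prev.next = node.next;
      if (node.next) node.next.prev = node.prev;
      if (node === this.head) this.head = node.next;
      // …and append it at the tail.
      node.prev = this.tail;
      node.next = null;
      if (this.tail) this.tail.next = node;
      this.tail = node;
    }
    return node.value;
  }
}
```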
Yeah, it seems like delete + set would be slow, but I have been surprised so many times in the past where the seemingly inefficient solution turns out to be fast due to JS runtime internals... I would rather run some benchmarks before changing this to a more complex solution. It could also be the case that this solution allows 1M get calls per second, meaning that a faster "get" will not lead to a noticeably faster "addJob".
I've measured the difference between toad-cache and tinylru. tinylru does an explicit `set` for LRU bumping on every `get`, while toad-cache doesn't. The perf difference is 3.8K ops/sec vs 4.2K ops/sec. I expect that an explicit `delete` on top of that would make it significantly slower; delete is a pretty expensive operation in JavaScript: https://stackoverflow.com/questions/27397034/why-is-delete-slow-in-javascript
Maybe "bun" is super fast, but when testing this:
import { LRUCache } from './src/cache';
const cache = new LRUCache<string>(3);
const start = Date.now();
for (let i = 0; i < 1_000_000; i++) {
cache.put(`key${i}`, `value${i}`);
}
console.log(`Put: ${Date.now() - start}ms`);
for (let i = 0; i < 1_000_000; i++) {
cache.get(`key${i}`);
}
console.log(`Get: ${Date.now() - start}ms`);
I get these results:

```
$ bun bench-cache.ts
Put: 284ms
Get: 428ms
```

So around 2.3M ops/sec (1,000,000 gets / 0.428 s).
On an Intel i7 from 2018.
If I increase the cache size to 3000:

```
$ bun bench-cache.ts
Put: 291ms
Get: 482ms
```
Nice! Yeah, looks like bun is super optimized for this type of use case.
```ts
// Gracefully close all workers
process.on('exit', async () => {
  for (const queueName in workers) {
    await workers[queueName].close();
```
Would that work fine if all workers share the same connection?
Yes. The shared connection will not be closed, but all workers also have a dedicated connection for blocking calls, and this dedicated connection needs to be closed by calling worker.close().
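Roughly, the lifecycle looks like this. A sketch, assuming ioredis and two made-up queue names:

```ts
import { Job, Worker } from 'bullmq';
import IORedis from 'ioredis';

// One shared ioredis connection; BullMQ requires maxRetriesPerRequest: null
// on connections used by workers.
const connection = new IORedis({ maxRetriesPerRequest: null });

const processor = async (job: Job) => {
  // ...process job.data...
};

// Hypothetical queue names, just for the sketch.
const workers: Record<string, Worker> = {
  mail: new Worker('mail', processor, { connection }),
  video: new Worker('video', processor, { connection }),
};

// close() tears down each worker's dedicated blocking connection;
// the shared `connection` stays open and must be quit separately.
for (const queueName of Object.keys(workers)) {
  await workers[queueName].close();
}
await connection.quit();
```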
How come? `worker.close()` does this:

```ts
.finally(() => client.disconnect())
.finally(() => this.connection.close())
```

`client` here seems to be the shared connection.

(That's actually something that I addressed in taskforcesh/bullmq#2449, having encountered exactly this in our app: workers closing the shared connection.)
But `client` in this context is actually a duplicate of the client that you pass to the constructor, so it is safe to close it:

```ts
const client =
  this.blockingConnection.status == 'ready'
    ? await this.blockingConnection.client
    : null;
```
```ts
this.blockingConnection = new RedisConnection(
  isRedisInstance(opts.connection)
    ? (<Redis>opts.connection).duplicate({ connectionName })
    : { ...opts.connection, connectionName },
  false,
  true,
  opts.skipVersionCheck,
);
```
In fact, it's not just safe: you must close it to avoid a leak.
Oh, I see, so it effectively always creates a new connection for each worker.
Is there a reason for that? Are there limitations of shared connections that wouldn't work for BullMQ?
Because we need connections that block, and a blocked connection cannot be shared, as no commands will be sent until the connection is unblocked.
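To illustrate with plain ioredis (made-up key names), any command issued on a connection that is sitting in a blocking call has to wait for it to finish:

```ts
import IORedis from 'ioredis';

const conn = new IORedis();

// BRPOPLPUSH blocks this connection for up to 5 seconds waiting for data…
const blocked = conn.brpoplpush('src-list', 'dst-list', 5);

// …so this PING is queued behind it and gets no reply until the blocking
// call returns or times out. Hence one dedicated blocking connection per
// worker instead of sharing.
const ping = conn.ping();

await blocked;
console.log(await ping);
```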
```ts
}, workerEndpoint.timeout || 3000)

try {
  const response = await fetch(workerEndpoint.url, {
```
Are you sure you don't want to use a lightweight wrapper like wretch to write less boilerplate? Or even undici, which is also faster.
Not really. I actually prefer fewer dependencies, unless the savings in code are really meaningful.
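For reference, the plain-fetch version stays compact. A sketch of the pattern from the diff above, assuming an AbortController-based timeout and the `workerEndpoint` shape shown there:

```ts
// Sketch: POST a job to a worker endpoint with a timeout, built-ins only.
// The `workerEndpoint` fields are assumed from the diff above.
async function callWorkerEndpoint(
  workerEndpoint: { url: string; timeout?: number },
  job: unknown,
) {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), workerEndpoint.timeout || 3000);
  try {
    const response = await fetch(workerEndpoint.url, {
      method: 'POST',
      headers: { 'content-type': 'application/json' },
      body: JSON.stringify(job),
      signal: controller.signal,
    });
    if (!response.ok) throw new Error(`Endpoint replied ${response.status}`);
    return await response.json();
  } finally {
    clearTimeout(timer);
  }
}
```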
```ts
init: (redisClient: Redis | Cluster) => {
  // Load workers from Redis and start them
  debugEnabled && debug('Loading workers from Redis...');
  const stream = redisClient.hscanStream(workerMetadataKey, { count: 10 });
```
Maybe something similar to what we used for queue discovery could be helpful here too:

```ts
public static async getActiveQueueIds(redis: Redis): Promise<string[]> {
  await redis.zremrangebyscore(
    QUEUE_IDS_KEY,
    '-inf',
    Date.now() - daysToMilliseconds(RETENTION_QUEUE_IDS_IN_DAYS),
  )
  const queueIds = await redis.zrange(QUEUE_IDS_KEY, 0, -1)
  return queueIds.sort()
}

public async start(): Promise<void> {
  await this.redis.zadd(QUEUE_IDS_KEY, Date.now(), this.config.queueId)
}
```
For the proxy we need to store a complete JSON object for every queue, so I don't know if a ZSET is a good structure for this; it is a nice property of hashes that you can update a worker's options without needing to iterate through all the ZSET items.
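That is, with a hash, updating a single worker is one O(1) command. A sketch with illustrative key and field names:

```ts
// Update one worker's options in the metadata hash: a single HSET on its
// field, no scan over the other entries. Names here are illustrative.
await redisClient.hset(
  workerMetadataKey,
  queueName,
  JSON.stringify({ url: 'https://example.com/jobs', timeout: 5000 }),
);
```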
Would hscan work well on a big Redis store? Wouldn't it iterate over the whole thing? Maybe some key lookup map would be helpful?
It would only iterate over the given hash, and since it scans incrementally it does not keep Redis busy. Also, this operation is only needed when restarting the proxy, and the number of workers should not be very big either...
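For completeness, a sketch of consuming that stream, assuming the hash maps queue names to JSON metadata (the field layout is an assumption):

```ts
// hscanStream emits batches of [field, value, field, value, …] pairs
// from the metadata hash, without blocking Redis.
const stream = redisClient.hscanStream(workerMetadataKey, { count: 10 });

stream.on('data', (entries: string[]) => {
  for (let i = 0; i < entries.length; i += 2) {
    const queueName = entries[i];
    const workerMetadata = JSON.parse(entries[i + 1]);
    // …start a worker for queueName using workerMetadata…
  }
});
stream.on('end', () => debug('All workers loaded'));
```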
This PR implements a new approach for handling queues and workers, based on a standard HTTP RESTful API plus a webhook-based API for processing jobs. More documentation will be available at https://docs.bullmq.net/ in the coming days.
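To give a flavor of the webhook side: the proxy POSTs each job to the registered endpoint and treats the HTTP response as the job's result. A hypothetical endpoint follows; the payload and response shapes are assumptions, not the documented contract:

```ts
import http from 'node:http';

// Hypothetical webhook worker. Payload/response shapes are assumed —
// see https://docs.bullmq.net/ for the actual contract.
http
  .createServer((req, res) => {
    let body = '';
    req.on('data', (chunk) => (body += chunk));
    req.on('end', () => {
      const job = JSON.parse(body);
      // …process job.data here…
      res.setHeader('content-type', 'application/json');
      res.end(JSON.stringify({ result: `processed ${job.name ?? 'job'}` }));
    });
  })
  .listen(3000);
```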