Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flow): support passing a chain option #2949

Draft
wants to merge 29 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7c46cd8
feat(flow): support passing a chain id
roggervalf Dec 4, 2024
c671708
refactor: save chainKey instead of just id
roggervalf Dec 4, 2024
53ea54c
chore: fix waiting-children key reference
roggervalf Dec 4, 2024
7bf0d96
chore: replace cid in favor of chkey
roggervalf Dec 4, 2024
e4e14aa
refactor: get job attributes ones
roggervalf Dec 4, 2024
8005194
refactor: consider parent opts
roggervalf Dec 4, 2024
62c9316
chore: attempt to fix flaky test
roggervalf Dec 4, 2024
ab04911
test: debug test case
roggervalf Dec 4, 2024
2a87dcf
test: second attempt
roggervalf Dec 4, 2024
4eab58b
fix: clear timeout using clearTimeout instead of clearInterval
roggervalf Dec 4, 2024
3b92897
chore: upgrade sinon
roggervalf Dec 4, 2024
3fc3813
chore: upgrade sinon
roggervalf Dec 4, 2024
41be2bc
chore: upgrade sinon
roggervalf Dec 4, 2024
772e277
chore: restore sinon version
roggervalf Dec 4, 2024
7687688
chore: 3rd attempt to fix tests
roggervalf Dec 4, 2024
aee5ded
chore: 4rd attempt to fix tests
roggervalf Dec 4, 2024
73b8e33
test: 5 attempt
roggervalf Dec 4, 2024
f7b2199
chore: skip test to verify
roggervalf Dec 4, 2024
6d75103
test: 6 attempt
roggervalf Dec 4, 2024
24d1140
test: 7 attempt
roggervalf Dec 5, 2024
c8e5f78
chore: debug
roggervalf Dec 5, 2024
9a92054
chore: debug2
roggervalf Dec 5, 2024
9bdde83
chore: 8 attempt
roggervalf Dec 5, 2024
60233cb
chore: 9 attempt
roggervalf Dec 5, 2024
152b2d6
chore: 9 attempt
roggervalf Dec 5, 2024
81d6c5e
chore: pass shouldClearNativeTimers as true
roggervalf Dec 5, 2024
51ba438
chore: 10 attempt
roggervalf Dec 5, 2024
8891c84
chore: 11 attempt
roggervalf Dec 5, 2024
37c202d
chore: fix tests with timing
roggervalf Dec 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
"@types/msgpack": "^0.0.31",
"@types/node": "^12.20.25",
"@types/semver": "^7.3.9",
"@types/sinon": "^7.5.2",
"@types/sinon": "^10.0.13",
"@types/uuid": "^3.4.10",
"@typescript-eslint/eslint-plugin": "^4.32.0",
"@typescript-eslint/parser": "^5.33.0",
Expand Down Expand Up @@ -112,7 +112,7 @@
"rimraf": "^3.0.2",
"rrule": "^2.6.9",
"semantic-release": "^19.0.3",
"sinon": "^15.1.0",
"sinon": "^18.0.1",
"test-console": "^2.0.0",
"ts-mocha": "^10.0.0",
"ts-node": "^10.7.0",
Expand Down
12 changes: 6 additions & 6 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
self.redisConnection = redisConnection
self.redisClient = redisConnection.conn
self.commands = {
"addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-8.lua")),
"addDelayedJob": self.redisClient.register_script(self.getScript("addDelayedJob-6.lua")),
"addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-9.lua")),
"addDelayedJob": self.redisClient.register_script(self.getScript("addDelayedJob-7.lua")),
"addParentJob": self.redisClient.register_script(self.getScript("addParentJob-4.lua")),
"addPrioritizedJob": self.redisClient.register_script(self.getScript("addPrioritizedJob-8.lua")),
"addPrioritizedJob": self.redisClient.register_script(self.getScript("addPrioritizedJob-9.lua")),
"changePriority": self.redisClient.register_script(self.getScript("changePriority-7.lua")),
"cleanJobsInSet": self.redisClient.register_script(self.getScript("cleanJobsInSet-3.lua")),
"extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")),
Expand Down Expand Up @@ -119,7 +119,7 @@ def addStandardJob(self, job: Job, timestamp: int, pipe = None):
Add a standard job to the queue
"""
keys = self.getKeys(['wait', 'paused', 'meta', 'id',
'completed', 'active', 'events', 'marker'])
'completed', 'active', 'events', 'marker', 'waiting-children'])
args = self.addJobArgs(job, None)
args.append(timestamp)

Expand All @@ -130,7 +130,7 @@ def addDelayedJob(self, job: Job, timestamp: int, pipe = None):
Add a delayed job to the queue
"""
keys = self.getKeys(['marker', 'meta', 'id',
'delayed', 'completed', 'events'])
'delayed', 'completed', 'events', 'waiting-children'])
args = self.addJobArgs(job, None)
args.append(timestamp)

Expand All @@ -141,7 +141,7 @@ def addPrioritizedJob(self, job: Job, timestamp: int, pipe = None):
Add a prioritized job to the queue
"""
keys = self.getKeys(['marker', 'meta', 'id',
'prioritized', 'completed', 'active', 'events', 'pc'])
'prioritized', 'completed', 'active', 'events', 'pc', 'waiting-children'])
args = self.addJobArgs(job, None)
args.append(timestamp)

Expand Down
17 changes: 16 additions & 1 deletion src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
removeUndefinedFields,
optsAsJSON,
optsFromJSON,
getChainKey,
} from '../utils';
import { Backoffs } from './backoffs';
import { Scripts, raw2NextJobData } from './scripts';
Expand Down Expand Up @@ -148,6 +149,11 @@ export class Job<
*/
deduplicationId?: string;

/**
* Chain key.
*/
chainKey?: string;

/**
* Base repeat job key.
*/
Expand Down Expand Up @@ -217,6 +223,10 @@ export class Job<
? opts.deduplication.id
: this.debounceId;

this.chainKey = opts.chain
? getChainKey(opts.chain, queue.qualifiedName)
: undefined;

this.toKey = queue.toKey.bind(queue);
this.setScripts();

Expand Down Expand Up @@ -340,6 +350,10 @@ export class Job<
job.repeatJobKey = json.rjk;
}

if (json.chkey) {
job.chainKey = json.chkey;
}

if (json.deid) {
job.debounceId = json.deid;
job.deduplicationId = json.deid;
Expand Down Expand Up @@ -447,6 +461,7 @@ export class Job<
timestamp: this.timestamp,
failedReason: JSON.stringify(this.failedReason),
stacktrace: JSON.stringify(this.stacktrace),
chainKey: this.chainKey,
debounceId: this.debounceId,
deduplicationId: this.deduplicationId,
repeatJobKey: this.repeatJobKey,
Expand Down Expand Up @@ -1062,7 +1077,7 @@ export class Job<
this.queue.on('closing', onFailed);

const removeListeners = () => {
clearInterval(timeout);
clearTimeout(timeout);
queueEvents.removeListener(completedEvent, onCompleted);
queueEvents.removeListener(failedEvent, onFailed);
this.queue.removeListener('closing', onFailed);
Expand Down
1 change: 1 addition & 0 deletions src/classes/queue-keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export class QueueKeys {
'pc', // priority counter key
'marker', // marker key
'de', // deduplication key
'ch', // chains key
].forEach(key => {
keys[key] = this.toKey(name, key);
});
Expand Down
11 changes: 11 additions & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,17 @@ export class Queue<
return !removed;
}

/**
* Removes a chain key.
*
* @param id - identifier
*/
async removeChainKey(id: string): Promise<number> {
const client = await this.client;

return client.del(`${this.keys.ch}:${id}`);
}

/**
* Removes a debounce key.
* @deprecated use removeDeduplicationKey
Expand Down
16 changes: 13 additions & 3 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ export class Scripts {
queueKeys.delayed,
queueKeys.completed,
queueKeys.events,
queueKeys['waiting-children'],
];

keys.push(pack(args), job.data, encodedOpts);
Expand All @@ -121,6 +122,7 @@ export class Scripts {
queueKeys.active,
queueKeys.events,
queueKeys.pc,
queueKeys['waiting-children'],
];

keys.push(pack(args), job.data, encodedOpts);
Expand Down Expand Up @@ -161,6 +163,7 @@ export class Scripts {
queueKeys.active,
queueKeys.events,
queueKeys.marker,
queueKeys['waiting-children'],
];

keys.push(pack(args), job.data, encodedOpts);
Expand All @@ -176,9 +179,15 @@ export class Scripts {
): Promise<string> {
const queueKeys = this.queue.keys;

const parent: Record<string, any> = job.parent
? { ...job.parent, fpof: opts.fpof, rdof: opts.rdof, idof: opts.idof }
: null;
const parent: Record<string, any> =
job.parent || job.chainKey
? {
...(job.parent ? job.parent : {}),
fpof: opts.fpof,
rdof: opts.rdof,
idof: opts.idof,
}
: null;

const args = [
queueKeys[''],
Expand All @@ -191,6 +200,7 @@ export class Scripts {
parent,
job.repeatJobKey,
job.deduplicationId ? `${queueKeys.de}:${job.deduplicationId}` : null,
job.chainKey ? job.chainKey : null,
];

let encodedOpts;
Expand Down
14 changes: 12 additions & 2 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ export class Worker<
this.blockUntil = await this.waiting;

if (this.blockUntil <= 0 || this.blockUntil - Date.now() < 1) {
return this.moveToActive(client, token, this.opts.name);
return await this.moveToActive(client, token, this.opts.name);
}
} catch (err) {
// Swallow error if locally paused or closing since we did force a disconnection
Expand All @@ -620,7 +620,17 @@ export class Worker<
this.abortDelayController,
);
}
return this.moveToActive(client, token, this.opts.name);
try {
return await this.moveToActive(client, token, this.opts.name);
} catch (err) {
// Swallow error if locally paused or closing since we did force a disconnection
if (
!(this.paused || this.closing) &&
isNotConnectionError(<Error>err)
) {
throw err;
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
KEYS[4] 'delayed'
KEYS[5] 'completed'
KEYS[6] events stream key
KEYS[7] 'waiting-children'

ARGV[1] msgpacked arguments array
[1] key prefix,
Expand All @@ -26,6 +27,7 @@
[8] parent? {id, queueKey}
[9] repeat job key
[10] deduplication key
[11] chain key

ARGV[2] Json stringified job data
ARGV[3] msgpacked options
Expand Down Expand Up @@ -53,20 +55,26 @@ local parentKey = args[5]
local parent = args[8]
local repeatJobKey = args[9]
local deduplicationKey = args[10]
local chainKey = args[11]
local parentData

-- Includes
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/addJobInWaitingChildren"
--- @include "includes/deduplicateJob"
--- @include "includes/getDelayedScore"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/handleDuplicatedJob"
--- @include "includes/storeJob"
--- @include "includes/updateParentInLastJobInChain"
--- @include "includes/upsertChainKeyIfNeeded"

if parentKey ~= nil then
if rcall("EXISTS", parentKey) ~= 1 then return -5 end
if rcall("EXISTS", parentKey) ~= 1 then return -5 end
end

parentData = cjson.encode(parent)
if parent ~= nil then
parentData = cjson.encode(parent)
end

local jobCounter = rcall("INCR", idKey)
Expand Down Expand Up @@ -99,21 +107,32 @@ end
-- Store the job.
local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2],
opts, timestamp, parentKey, parentData,
repeatJobKey)
repeatJobKey, chainKey)

local lastJobKeyInChain = upsertChainKeyIfNeeded(chainKey, jobIdKey)

if lastJobKeyInChain then
updateParentInLastJobInChain(lastJobKeyInChain, jobIdKey, jobId)

local score, delayedTimestamp = getDelayedScore(delayedKey, timestamp, tonumber(delay))
local waitingChildrenKey = KEYS[7]

addJobInWaitingChildren(waitingChildrenKey, jobIdKey .. ":dependencies", lastJobKeyInChain, jobId,
timestamp, eventsKey, maxEvents)
else
local score, delayedTimestamp = getDelayedScore(delayedKey, timestamp, tonumber(delay))

rcall("ZADD", delayedKey, score, jobId)
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed",
"jobId", jobId, "delay", delayedTimestamp)
rcall("ZADD", delayedKey, score, jobId)
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed",
"jobId", jobId, "delay", delayedTimestamp)

-- mark that a delayed job is available
local markerKey = KEYS[1]
addDelayMarkerIfNeeded(markerKey, delayedKey)
-- mark that a delayed job is available
local markerKey = KEYS[1]
addDelayMarkerIfNeeded(markerKey, delayedKey)

-- Check if this job is a child of another job, if so add it to the parents dependencies
if parentDependenciesKey ~= nil then
-- Check if this job is a child of another job, if so add it to the parents dependencies
if parentDependenciesKey ~= nil then
rcall("SADD", parentDependenciesKey, jobIdKey)
end
end

return jobId .. "" -- convert to string
12 changes: 4 additions & 8 deletions src/commands/addParentJob-4.lua
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ local deduplicationKey = args[10]
local parentData

-- Includes
--- @include "includes/addJobInWaitingChildren"
--- @include "includes/deduplicateJob"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/handleDuplicatedJob"
Expand Down Expand Up @@ -91,14 +92,9 @@ end
storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp,
parentKey, parentData, repeatJobKey)

local waitChildrenKey = args[6]
rcall("ZADD", waitChildrenKey, timestamp, jobId)
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
"waiting-children", "jobId", jobId)
local waitingChildrenKey = args[6]

-- Check if this job is a child of another job, if so add it to the parents dependencies
if parentDependenciesKey ~= nil then
rcall("SADD", parentDependenciesKey, jobIdKey)
end
addJobInWaitingChildren(waitingChildrenKey, parentDependenciesKey, jobIdKey, jobId,
timestamp, eventsKey, maxEvents)

return jobId .. "" -- convert to string
Loading
Loading