Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(job): add removeChildDependency method #2435

Merged
merged 11 commits into from
Feb 27, 2024
1 change: 1 addition & 0 deletions docs/gitbook/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
* [Fail Parent](guide/flows/fail-parent.md)
* [Remove Dependency](guide/flows/remove-dependency.md)
* [Ignore Dependency](guide/flows/ignore-dependency.md)
* [Remove Child Dependency](guide/flows/remove-child-dependency.md)
* [Metrics](guide/metrics/metrics.md)
* [Rate limiting](guide/rate-limiting.md)
* [Retrying failing jobs](guide/retrying-failing-jobs.md)
Expand Down
31 changes: 31 additions & 0 deletions docs/gitbook/guide/flows/remove-child-dependency.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Remove Child Dependency

In some situations, you may have a parent job and need to remove the dependency of one of its children.

The pattern to solve this requirement consists on using the **removeChildDependency** method. It will make sure that if the job is the last pending child, to move its parent to _waiting_ and it won't be listed in unprocessed list of the parent.

```typescript
const flow = new FlowProducer({ connection });

const originalTree = await flow.add({
name: 'root-job',
queueName: 'topQueueName',
data: {},
children: [
{
name,
data: { idx: 0, foo: 'bar' },
queueName: 'childrenQueueName',
opts: {},
},
],
});

await originalTree.children[0].job.removeChildDependency();
```

{% hint style="waring" %}
As soon as a **child** calls this method, it will verify if it has an existing parent, if not, it'll throw an error.
{% endhint %}

Failed or completed children using this option won't generate any removal as they won't be part of unprocessed list:
26 changes: 25 additions & 1 deletion src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,25 @@ export class Job<
return Job.addJobLog(this.queue, this.id, logRow, this.opts.keepLogs);
}

/**
* Removes child dependency from parent when child is not yet finished
*
* @returns True if the relationship existed and if it was removed.
*/
async removeChildDependency(): Promise<boolean> {
const childDependencyIsRemoved = await this.scripts.removeChildDependency(
this.id,
this.parentKey,
);
if (childDependencyIsRemoved) {
this.parent = undefined;
this.parentKey = undefined;
return true;
}

return false;
}

/**
* Clears job's logs
*
Expand Down Expand Up @@ -704,7 +723,12 @@ export class Job<

const code = results[results.length - 1][1] as number;
if (code < 0) {
throw this.scripts.finishedErrors(code, this.id, command, 'active');
throw this.scripts.finishedErrors({
code,
jobId: this.id,
command,
state: 'active',
});
}

if (finishedOn && typeof finishedOn === 'number') {
Expand Down
129 changes: 107 additions & 22 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,11 @@ export class Scripts {
}

if (<number>result < 0) {
throw this.finishedErrors(<number>result, parentOpts.parentKey, 'addJob');
throw this.finishedErrors({
code: <number>result,
parentKey: parentOpts.parentKey,
command: 'addJob',
});
}

return <string>result;
Expand Down Expand Up @@ -314,7 +318,11 @@ export class Scripts {
const result = await (<any>client).updateData(keys.concat([dataJson]));

if (result < 0) {
throw this.finishedErrors(result, job.id, 'updateData');
throw this.finishedErrors({
code: result,
jobId: job.id,
command: 'updateData',
});
}
}

Expand All @@ -336,7 +344,11 @@ export class Scripts {
);

if (result < 0) {
throw this.finishedErrors(result, jobId, 'updateProgress');
throw this.finishedErrors({
code: result,
jobId,
command: 'updateProgress',
});
}
}

Expand Down Expand Up @@ -414,20 +426,32 @@ export class Scripts {

const result = await (<any>client).moveToFinished(args);
if (result < 0) {
throw this.finishedErrors(result, jobId, 'moveToFinished', 'active');
throw this.finishedErrors({
code: result,
jobId,
command: 'moveToFinished',
state: 'active',
});
} else {
if (typeof result !== 'undefined') {
return raw2NextJobData(result);
}
}
}

finishedErrors(
code: number,
jobId: string,
command: string,
state?: string,
): Error {
finishedErrors({
code,
jobId,
parentKey,
command,
state,
}: {
code: number;
jobId?: string;
parentKey?: string;
command: string;
state?: string;
}): Error {
switch (code) {
case ErrorCode.JobNotExist:
return new Error(`Missing key for job ${jobId}. ${command}`);
Expand All @@ -440,14 +464,14 @@ export class Scripts {
case ErrorCode.JobPendingDependencies:
return new Error(`Job ${jobId} has pending dependencies. ${command}`);
case ErrorCode.ParentJobNotExist:
return new Error(`Missing key for parent job ${jobId}. ${command}`);
return new Error(`Missing key for parent job ${parentKey}. ${command}`);
case ErrorCode.JobLockMismatch:
return new Error(
`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`,
);
case ErrorCode.ParentJobCannotBeReplaced:
return new Error(
`The parent job ${jobId} cannot be replaced. ${command}`,
`The parent job ${parentKey} cannot be replaced. ${command}`,
);
default:
return new Error(`Unknown code ${code} error for ${jobId}. ${command}`);
Expand Down Expand Up @@ -476,6 +500,43 @@ export class Scripts {
return (<any>client).drain(args);
}

private removeChildDependencyArgs(
jobId: string,
parentKey: string,
): (string | number)[] {
const queueKeys = this.queue.keys;

const keys: string[] = [queueKeys['']];

const args = [this.queue.toKey(jobId), parentKey];

return keys.concat(args);
}

async removeChildDependency(
jobId: string,
parentKey: string,
): Promise<boolean> {
const client = await this.queue.client;
const args = this.removeChildDependencyArgs(jobId, parentKey);

const result = await (<any>client).removeChildDependency(args);

switch (result) {
case 0:
return true;
case 1:
return false;
default:
throw this.finishedErrors({
code: result,
jobId,
parentKey,
command: 'removeChildDependency',
});
}
}

private getRangesArgs(
types: JobType[],
start: number,
Expand Down Expand Up @@ -609,7 +670,12 @@ export class Scripts {
const args = this.changeDelayArgs(jobId, delay);
const result = await (<any>client).changeDelay(args);
if (result < 0) {
throw this.finishedErrors(result, jobId, 'changeDelay', 'delayed');
throw this.finishedErrors({
code: result,
jobId,
command: 'changeDelay',
state: 'delayed',
});
}
}

Expand Down Expand Up @@ -652,7 +718,11 @@ export class Scripts {
const args = this.changePriorityArgs(jobId, priority, lifo);
const result = await (<any>client).changePriority(args);
if (result < 0) {
throw this.finishedErrors(result, jobId, 'changePriority');
throw this.finishedErrors({
code: result,
jobId,
command: 'changePriority',
});
}
}

Expand Down Expand Up @@ -766,7 +836,12 @@ export class Scripts {
const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts);
const result = await (<any>client).moveToDelayed(args);
if (result < 0) {
throw this.finishedErrors(result, jobId, 'moveToDelayed', 'active');
throw this.finishedErrors({
code: result,
jobId,
command: 'moveToDelayed',
state: 'active',
});
}
}

Expand Down Expand Up @@ -797,12 +872,12 @@ export class Scripts {
case 1:
return false;
default:
throw this.finishedErrors(
result,
throw this.finishedErrors({
code: result,
jobId,
'moveToWaitingChildren',
'active',
);
command: 'moveToWaitingChildren',
state: 'active',
});
}
}

Expand Down Expand Up @@ -939,7 +1014,12 @@ export class Scripts {
case 1:
return;
default:
throw this.finishedErrors(result, job.id, 'reprocessJob', state);
throw this.finishedErrors({
code: result,
jobId: job.id,
command: 'reprocessJob',
state,
});
}
}

Expand Down Expand Up @@ -997,7 +1077,12 @@ export class Scripts {

const code = await (<any>client).promote(keys.concat(args));
if (code < 0) {
throw this.finishedErrors(code, jobId, 'promote', 'delayed');
throw this.finishedErrors({
code,
jobId,
command: 'promote',
state: 'delayed',
});
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/commands/includes/removeParentDependencyKey.lua
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey)
end
end
end
return true
end
else
local missedParentKey = rcall("HGET", jobKey, "parentKey")
Expand Down Expand Up @@ -74,7 +75,9 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey)
end
end
end
return true
end
end
end
return false
end
34 changes: 34 additions & 0 deletions src/commands/removeChildDependency-1.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
--[[
Break parent-child dependency by removing
child reference from parent

Input:
KEYS[1] 'key' prefix,

ARGV[1] job key
ARGV[2] parent key

Output:
0 - OK
1 - There is not relationship.
-1 - Missing job key
-5 - Missing parent key
]]
local rcall = redis.call
local jobKey = ARGV[1]
local parentKey = ARGV[2]

-- Includes
--- @include "includes/removeParentDependencyKey"

if rcall("EXISTS", jobKey) ~= 1 then return -1 end

if rcall("EXISTS", parentKey) ~= 1 then return -5 end

if removeParentDependencyKey(jobKey, false, parentKey, KEYS[1]) then
rcall("HDEL", jobKey, "parentKey", "parent")

return 0
else
return 1
end
Loading
Loading