Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sandbox): add getChildrenValues method #2600

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions src/classes/child-processor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { ParentCommand } from '../enums';
import { SandboxedJob } from '../interfaces';
import { JobJsonSandbox } from '../types';
import {
ChildrenValues,
JobJsonSandbox,
JobJsonSandBoxWithChildrenValues,
} from '../types';
import { errorToJSON } from '../utils';

enum ChildStatus {
Expand Down Expand Up @@ -62,7 +66,10 @@ export class ChildProcessor {
});
}

public async start(jobJson: JobJsonSandbox, token?: string): Promise<void> {
public async start(
jobJson: JobJsonSandBoxWithChildrenValues,
token?: string,
): Promise<void> {
if (this.status !== ChildStatus.Idle) {
return this.send({
cmd: ParentCommand.Error,
Expand All @@ -73,6 +80,7 @@ export class ChildProcessor {
this.currentJobPromise = (async () => {
try {
const job = this.wrapJob(jobJson, this.send);

const result = await this.processor(job, token);
await this.send({
cmd: ParentCommand.Completed,
Expand Down Expand Up @@ -111,7 +119,7 @@ export class ChildProcessor {
* The wrapped job adds back some of those original functions.
*/
protected wrapJob(
job: JobJsonSandbox,
job: JobJsonSandBoxWithChildrenValues,
send: (msg: any) => Promise<void>,
): SandboxedJob {
return {
Expand Down Expand Up @@ -159,6 +167,12 @@ export class ChildProcessor {
value: data,
});
},
/*
* Emulate the real job `getChildrenValue` function.
*/
getChildrenValues: <CT = any>(): ChildrenValues<CT> => {
return job.childrenValues;
},
};
}
}
5 changes: 3 additions & 2 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
JobJsonSandbox,
MinimalQueue,
RedisJobOptions,
ChildrenValues,
} from '../types';
import {
errorObject,
Expand Down Expand Up @@ -826,7 +827,7 @@ export class Job<
*
* @returns Object mapping children job keys with their values.
*/
async getChildrenValues<CT = any>(): Promise<{ [jobKey: string]: CT }> {
async getChildrenValues<CT = any>(): Promise<ChildrenValues<CT>> {
const client = await this.queue.client;

const result = (await client.hgetall(
Expand All @@ -843,7 +844,7 @@ export class Job<
*
* @returns Object mapping children job keys with their failure values.
*/
async getFailedChildrenValues(): Promise<{ [jobKey: string]: string }> {
async getFailedChildrenValues(): Promise<ChildrenValues<string>> {
const client = await this.queue.client;

return client.hgetall(this.toKey(`${this.id}:failed`));
Expand Down
3 changes: 2 additions & 1 deletion src/classes/sandbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ const sandbox = <T, R, N extends string>(
let msgHandler: any;
let exitHandler: any;

const childrenValues = await job.getChildrenValues();
await child.send({
cmd: ChildCommand.Start,
job: job.asJSONSandbox(),
job: { ...job.asJSONSandbox(), childrenValues },
Comment on lines +15 to +18
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be ok, considering parent only starts after all of its children are done. Also, ig its probably ok to assume that children values don't need to be serialized. 🤔

If bullmq ever introduces new functionalities around running parent job before child jobs finishes, this approach will probably produce unexpected behavior since it will only return this last known values (based on when it was started).

token,
});

Expand Down
3 changes: 2 additions & 1 deletion src/interfaces/sandboxed-job.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { JobJsonSandbox, JobsOptions } from '../types';
import { ChildrenValues, JobJsonSandbox, JobsOptions } from '../types';

/**
* @see {@link https://docs.bullmq.io/guide/workers/sandboxed-processors}
Expand All @@ -11,5 +11,6 @@ export interface SandboxedJob<T = any, R = any>
log: (row: any) => void;
updateData: (data: any) => Promise<void>;
updateProgress: (value: object | number) => Promise<void>;
getChildrenValues: <CT = any>() => ChildrenValues<CT>;
returnValue: R;
}
1 change: 1 addition & 0 deletions src/types/children-values.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export type ChildrenValues<T = any> = { [jobKey: string]: T };
1 change: 1 addition & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export * from './job-json-sandbox';
export * from './job-options';
export * from './job-type';
export * from './repeat-strategy';
export * from './children-values';
8 changes: 7 additions & 1 deletion src/types/job-json-sandbox.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { JobJson, ParentKeys } from '../interfaces';
import { JobJson } from '../interfaces';
import { ChildrenValues } from './children-values';

export type JobJsonSandbox = JobJson & {
queueName: string;
prefix: string;
};

// Maybe this is unnecessary? - Also, this is not a good name :(
export type JobJsonSandBoxWithChildrenValues<T = any> = JobJsonSandbox & {
childrenValues: ChildrenValues<T>;
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/**
* A processor file to be used in tests.
*
*/
'use strict';

module.exports = function (job) {
return { bar: 'oof' };
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/**
* A processor file to be used in tests.
*
*/
'use strict';

module.exports = function (job) {
return { foo: 'bar' };
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/**
* A processor file to be used in tests.
*
*/
'use strict';

module.exports = async function (job) {
return job.getChildrenValues();
};
75 changes: 75 additions & 0 deletions tests/test_sandboxed_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,81 @@ function sandboxProcessTests(
await flow.close();
});

it('should get children values in parent job', async () => {
const parentProcessFile =
__dirname +
'/fixtures/fixture_processor_sandbox_getChildrenValues_parent.js';
const child1ProcessFile =
__dirname +
'/fixtures/fixture_processor_sandbox_getChildrenValues_child_1.js';
const child2ProcessFile =
__dirname +
'/fixtures/fixture_processor_sandbox_getChildrenValues_child_2.js';

const parentQueueName = `parent-queue-${v4()}`;
const child1QueueName = `child-1-queue-${v4()}`;
const child2QueueName = `child-2-queue-${v4()}`;
const parentJobId = `parent-job-${v4()}`;
const child1JobId = `child-1-job-${v4()}`;
const child2JobId = `child-2-job-${v4()}`;

const parentWorker = new Worker(parentQueueName, parentProcessFile, {
connection,
drainDelay: 1,
});
const child1Worker = new Worker(child1QueueName, child1ProcessFile, {
connection,
drainDelay: 1,
});
const child2Worker = new Worker(child2QueueName, child2ProcessFile, {
connection,
drainDelay: 1,
});

const completing = new Promise<void>((resolve, reject) => {
parentWorker.on('completed', async (_job: Job, value: any) => {
try {
expect(value).to.be.eql({
[`bull:${child1QueueName}:${child1JobId}`]: { bar: 'oof' },
[`bull:${child2QueueName}:${child2JobId}`]: { foo: 'bar' },
});
resolve();
} catch (err) {
reject(err);
}
});
});

const flow = new FlowProducer({ connection });
await flow.add({
name: 'parent-job',
queueName: parentQueueName,
data: {},
opts: { jobId: parentJobId },
children: [
{
name: 'child-1-job',
data: {},
queueName: child1QueueName,
opts: { jobId: child1JobId },
},
{
name: 'child-2-job',
data: {},
queueName: child2QueueName,
opts: { jobId: child2JobId },
},
],
});

await completing;

await parentWorker.close();
await child1Worker.close();
await child2Worker.close();
await flow.close();
});

it('should process and fail', async () => {
const processFile = __dirname + '/fixtures/fixture_processor_fail.js';

Expand Down
Loading