Skip to content

Commit

Permalink
feat(job): add sizeLimit option when creating a job
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored May 11, 2021
1 parent 81bbf4a commit f10aeeb
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 37 deletions.
12 changes: 11 additions & 1 deletion src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Pipeline } from 'ioredis';
import { debuglog } from 'util';
import { RetryErrors } from '../enums';
import { BackoffOptions, JobsOptions, WorkerOptions } from '../interfaces';
import { errorObject, isEmpty, tryCatch } from '../utils';
import { errorObject, isEmpty, lengthInUtf8Bytes, tryCatch } from '../utils';
import { getParentKey } from './flow-producer';
import { QueueEvents } from './queue-events';
import { Backoffs } from './backoffs';
Expand Down Expand Up @@ -601,6 +601,16 @@ export class Job<T = any, R = any, N extends string = string> {

const jobData = this.asJSON();

const exceedLimit =
this.opts.sizeLimit &&
lengthInUtf8Bytes(jobData.data) > this.opts.sizeLimit;

if (exceedLimit) {
throw new Error(
`The size of job ${this.name} exceeds the limit ${this.opts.sizeLimit} bytes`,
);
}

return Scripts.addJob(
client,
queue,
Expand Down
5 changes: 5 additions & 0 deletions src/interfaces/jobs-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,9 @@ export interface JobsOptions {
* Internal property used by repeatable jobs.
*/
prevMillis?: number;

/**
* Limits the size in bytes of the job's data payload (as a JSON serialized string).
*/
sizeLimit?: number;
}
101 changes: 67 additions & 34 deletions src/test/test_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ const sinon = require('sinon');
chai.use(chaiAsPromised);
const expect = chai.expect;

describe('Job', function () {
describe('Job', function() {
let queue: Queue;
let queueName: string;

beforeEach(async function () {
beforeEach(async function() {
queueName = 'test-' + v4();
queue = new Queue(queueName);
});
Expand All @@ -33,21 +33,21 @@ describe('Job', function () {
await removeAllQueueData(new IORedis(), queueName);
});

describe('.create', function () {
describe('.create', function() {
const timestamp = 1234567890;
let job: Job;
let data: any;
let opts: JobsOptions;

beforeEach(async function () {
beforeEach(async function() {
data = { foo: 'bar' };
opts = { timestamp };

const createdJob = await Job.create(queue, 'test', data, opts);
job = createdJob;
});

it('saves the job in redis', async function () {
it('saves the job in redis', async function() {
const storedJob = await Job.fromId(queue, job.id);
expect(storedJob).to.have.property('id');
expect(storedJob).to.have.property('data');
Expand All @@ -57,13 +57,46 @@ describe('Job', function () {
expect(storedJob.opts.timestamp).to.be.equal(timestamp);
});

it('should use the custom jobId if one is provided', async function () {
it('should use the custom jobId if one is provided', async function() {
const customJobId = 'customjob';
const createdJob = await Job.create(queue, 'test', data, {
jobId: customJobId,
});
expect(createdJob.id).to.be.equal(customJobId);
});

it('should set default size limit and succeed in creating job', async () => {
const data = { foo: 'bar' }; // 13 bytes
const opts = { sizeLimit: 20 };
const createdJob = await Job.create(queue, 'test', data, opts);
expect(createdJob).to.not.be.null;
expect(createdJob).to.have.property('opts');
expect(createdJob.opts.sizeLimit).to.be.equal(20);
});

it('should set default size limit and fail due to size limit exception', async () => {
const data = { foo: 'bar' }; // 13 bytes
const opts = { sizeLimit: 12 };
await expect(Job.create(queue, 'test', data, opts)).to.be.rejectedWith(
`The size of job test exceeds the limit ${opts.sizeLimit} bytes`,
);
});

it('should set default size limit with non-ascii data and fail due to size limit exception', async () => {
const data = { foo: 'βÅ®' }; // 16 bytes
const opts = { sizeLimit: 15 };
await expect(Job.create(queue, 'test', data, opts)).to.be.rejectedWith(
`The size of job test exceeds the limit ${opts.sizeLimit} bytes`,
);
});

it('should set custom job id and default size limit and fail due to size limit exception', async () => {
const data = { foo: 'bar' }; // 13 bytes
const opts = { sizeLimit: 12, jobId: 'customJobId' };
await expect(Job.create(queue, 'test', data, opts)).to.be.rejectedWith(
`The size of job test exceeds the limit ${opts.sizeLimit} bytes`,
);
});
});

describe('JSON.stringify', () => {
Expand Down Expand Up @@ -118,8 +151,8 @@ describe('Job', function () {
});
});

describe('.update', function () {
it('should allow updating job data', async function () {
describe('.update', function() {
it('should allow updating job data', async function() {
const job = await Job.create<{ foo?: string; baz?: string }>(
queue,
'test',
Expand All @@ -132,8 +165,8 @@ describe('Job', function () {
});
});

describe('.remove', function () {
it('removes the job from redis', async function () {
describe('.remove', function() {
it('removes the job from redis', async function() {
const job = await Job.create(queue, 'test', { foo: 'bar' });
await job.remove();
const storedJob = await Job.fromId(queue, job.id);
Expand All @@ -143,15 +176,15 @@ describe('Job', function () {

// TODO: Add more remove tests

describe('.progress', function () {
it('can set and get progress as number', async function () {
describe('.progress', function() {
it('can set and get progress as number', async function() {
const job = await Job.create(queue, 'test', { foo: 'bar' });
await job.updateProgress(42);
const storedJob = await Job.fromId(queue, job.id);
expect(storedJob.progress).to.be.equal(42);
});

it('can set and get progress as object', async function () {
it('can set and get progress as object', async function() {
const job = await Job.create(queue, 'test', { foo: 'bar' });
await job.updateProgress({ total: 120, completed: 40 });
const storedJob = await Job.fromId(queue, job.id);
Expand All @@ -177,8 +210,8 @@ describe('Job', function () {
});
});

describe('.moveToCompleted', function () {
it('marks the job as completed and returns new job', async function () {
describe('.moveToCompleted', function() {
it('marks the job as completed and returns new job', async function() {
const worker = new Worker(queueName);
const token = 'my-token';
await Job.create(queue, 'test', { foo: 'bar' });
Expand All @@ -200,7 +233,7 @@ describe('Job', function () {
* Verify moveToFinished use default value for opts.maxLenEvents
* if it does not exist in meta key (or entire meta key is missing).
*/
it('should not fail if queue meta key is missing', async function () {
it('should not fail if queue meta key is missing', async function() {
const worker = new Worker(queueName);
const token = 'my-token';
await Job.create(queue, 'test', { color: 'red' });
Expand Down Expand Up @@ -271,8 +304,8 @@ describe('Job', function () {
});
});

describe('.moveToFailed', function () {
it('marks the job as failed', async function () {
describe('.moveToFailed', function() {
it('marks the job as failed', async function() {
const worker = new Worker(queueName);
const token = 'my-token';
await Job.create(queue, 'test', { foo: 'bar' });
Expand All @@ -287,7 +320,7 @@ describe('Job', function () {
await worker.close();
});

it('moves the job to wait for retry if attempts are given', async function () {
it('moves the job to wait for retry if attempts are given', async function() {
const queueEvents = new QueueEvents(queueName);
await queueEvents.waitUntilReady();

Expand Down Expand Up @@ -318,7 +351,7 @@ describe('Job', function () {
await queueEvents.close();
});

it('marks the job as failed when attempts made equal to attempts given', async function () {
it('marks the job as failed when attempts made equal to attempts given', async function() {
const worker = new Worker(queueName);
const token = 'my-token';
await Job.create(queue, 'test', { foo: 'bar' }, { attempts: 1 });
Expand All @@ -333,7 +366,7 @@ describe('Job', function () {
await worker.close();
});

it('moves the job to delayed for retry if attempts are given and backoff is non zero', async function () {
it('moves the job to delayed for retry if attempts are given and backoff is non zero', async function() {
const worker = new Worker(queueName);
const token = 'my-token';
await Job.create(
Expand All @@ -355,7 +388,7 @@ describe('Job', function () {
await worker.close();
});

it('applies stacktrace limit on failure', async function () {
it('applies stacktrace limit on failure', async function() {
const worker = new Worker(queueName);
const token = 'my-token';
const stackTraceLimit = 1;
Expand All @@ -376,7 +409,7 @@ describe('Job', function () {
await worker.close();
});

it('saves error stacktrace', async function () {
it('saves error stacktrace', async function() {
const worker = new Worker(queueName);
const token = 'my-token';
await Job.create(queue, 'test', { foo: 'bar' });
Expand Down Expand Up @@ -408,7 +441,7 @@ describe('Job', function () {
expect(isWaiting).to.be.equal(true);
});

it('should process a promoted job according to its priority', async function () {
it('should process a promoted job according to its priority', async function() {
const queueScheduler = new QueueScheduler(queueName);
await queueScheduler.waitUntilReady();

Expand Down Expand Up @@ -662,19 +695,19 @@ describe('Job', function () {
});
*/

describe('.finished', function () {
describe('.finished', function() {
let queueEvents: QueueEvents;

beforeEach(async function () {
beforeEach(async function() {
queueEvents = new QueueEvents(queueName);
await queueEvents.waitUntilReady();
});

afterEach(async function () {
afterEach(async function() {
await queueEvents.close();
});

it('should resolve when the job has been completed', async function () {
it('should resolve when the job has been completed', async function() {
const worker = new Worker(queueName, async job => 'qux');

const job = await queue.add('test', { foo: 'bar' });
Expand All @@ -686,7 +719,7 @@ describe('Job', function () {
await worker.close();
});

it('should resolve when the job has been completed and return object', async function () {
it('should resolve when the job has been completed and return object', async function() {
const worker = new Worker(queueName, async job => ({ resultFoo: 'bar' }));

const job = await queue.add('test', { foo: 'bar' });
Expand All @@ -699,7 +732,7 @@ describe('Job', function () {
await worker.close();
});

it('should resolve when the job has been delayed and completed and return object', async function () {
it('should resolve when the job has been delayed and completed and return object', async function() {
const worker = new Worker(queueName, async job => {
await delay(300);
return { resultFoo: 'bar' };
Expand All @@ -715,7 +748,7 @@ describe('Job', function () {
await worker.close();
});

it('should resolve when the job has been completed and return string', async function () {
it('should resolve when the job has been completed and return string', async function() {
const worker = new Worker(queueName, async job => 'a string');

const job = await queue.add('test', { foo: 'bar' });
Expand All @@ -728,7 +761,7 @@ describe('Job', function () {
await worker.close();
});

it('should reject when the job has been failed', async function () {
it('should reject when the job has been failed', async function() {
const worker = new Worker(queueName, async job => {
await delay(500);
throw new Error('test error');
Expand All @@ -746,7 +779,7 @@ describe('Job', function () {
await worker.close();
});

it('should resolve directly if already processed', async function () {
it('should resolve directly if already processed', async function() {
const worker = new Worker(queueName, async job => ({ resultFoo: 'bar' }));

const job = await queue.add('test', { foo: 'bar' });
Expand All @@ -760,7 +793,7 @@ describe('Job', function () {
await worker.close();
});

it('should reject directly if already processed', async function () {
it('should reject directly if already processed', async function() {
const worker = new Worker(queueName, async job => {
throw new Error('test error');
});
Expand Down
13 changes: 11 additions & 2 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,18 @@ export function tryCatch(fn: (...args: any) => any, ctx: any, args: any[]) {
}
}

export function isEmpty(obj: object) {
/**
* Checks the size of string for ascii/non-ascii characters
* (Reference: https://stackoverflow.com/a/23318053/1347170)
* @param {string} str
*/
export function lengthInUtf8Bytes(str: string): number {
return Buffer.byteLength(str, 'utf8');
}

export function isEmpty(obj: object): boolean {
for (const key in obj) {
if (obj.hasOwnProperty(key)) {
if (Object.prototype.hasOwnProperty.call(obj, key)) {
return false;
}
}
Expand Down

0 comments on commit f10aeeb

Please sign in to comment.