Skip to content

Commit

Permalink
add timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
VitoMakarevich committed Mar 29, 2021
1 parent fe0ac58 commit 6f088c4
Show file tree
Hide file tree
Showing 5 changed files with 3,167 additions and 2,710 deletions.
4 changes: 3 additions & 1 deletion bin/qless-js-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ commander
.option('-v, --verbose', 'Increase logging level', increaseVerbosity, 0)
.option('-a, --allow-paths', 'Allow paths for job class names')
.option('-m, --max-memory <max>', 'Maximum memory each process can consume', 'Infinity')
.option('-t, --set-tmpdir', 'Set tmpdir to be qless worker process workdir');
.option('-t, --set-tmpdir', 'Set tmpdir to be qless worker process workdir')
.option('--timeout <ms>', 'Set max job lifetime');

const options = commander.parse(process.argv);

Expand Down Expand Up @@ -88,6 +89,7 @@ const config = {
max: getMaxMemory(),
},
setTmpdir: options.setTmpdir,
timeout: options.timeout !== undefined ? parseInt(options.timeout, 10) : undefined,
};

const worker = options.processes === 1
Expand Down
16 changes: 12 additions & 4 deletions lib/workers/forking.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Worker {
});
this.config = config;
this.stopped = false;
this.timeout = config.timeout;
this.memory = {
max: Infinity,
interval: 60000,
Expand Down Expand Up @@ -96,13 +97,20 @@ class Worker {
return id;
}

return this.pool.exec('work', [config])
const workerExecution = this.pool.exec('work', [config]);
if (this.timeout !== undefined) {
workerExecution.timeout(this.timeout);
}
return workerExecution
.then(() => {
logger.info('Worker %s exited gracefully', id);
return id;
})
.catch((error) => {
logger.error('Worker %s died...', id, error);
}).catch((error) => {
if (error instanceof workerpool.Promise.TimeoutError) {
logger.error('Worker %s killed because of timeout', id, error);
} else {
logger.error('Worker %s died...', id, error);
}
return workForever(id);
});
};
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "qless-js",
"version": "0.4.9",
"version": "0.4.10",
"private": false,
"description": "Qless JavaScript Bindings",
"main": "index.js",
Expand Down
30 changes: 30 additions & 0 deletions test/workers/forking.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ const Promise = require('bluebird').Promise;
const expect = require('expect.js');

const helper = require('../helper.js');
const { Job } = require('../../lib/job.js');
const sinon = require('sinon');
const Client = require('../../lib/client.js');
const Worker = require('../../lib/workers/forking.js');

Expand Down Expand Up @@ -75,6 +77,34 @@ describe('Forking Worker', () => {
]);
});

it('kills worker which runs longer than timeout', async () => {
worker.timeout = 5000;
const klass = {
queue: async job => {
// analogue of endless job, so only 1 way to finish it - by timeout
await Promise.delay(10000000);
return job.complete();
},
};
const disposer = helper.stubDisposer(Job, 'import', sinon.stub());
Promise.using(disposer, (stub) => {
stub.returns(klass);
return queue.put({ klass: 'Klass', jid: 'jid' })
.then(() => worker.run());
});

while (worker.pool.workers.length < worker.config.processes) {
// eslint-disable-next-line no-await-in-loop
await Promise.delay(50);
}
const mocks = worker.pool.workers.map((child) => jest.spyOn(child.worker, 'kill'));

await Promise.delay(6000);
for (const mock of mocks) {
expect(mock.mock.calls.length).to.be.equal(1);
}
}, 15000);

it('can quit workers that consume too much memory', async () => {
worker.memory.max = 0;
const promise = worker.run();
Expand Down
Loading

0 comments on commit 6f088c4

Please sign in to comment.