Skip to content

Commit

Permalink
add timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
VitoMakarevich committed Mar 29, 2021
1 parent 514736e commit 87fcda6
Show file tree
Hide file tree
Showing 8 changed files with 3,205 additions and 2,714 deletions.
4 changes: 3 additions & 1 deletion bin/qless-js-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ commander
.option('-v, --verbose', 'Increase logging level', increaseVerbosity, 0)
.option('-a, --allow-paths', 'Allow paths for job class names')
.option('-m, --max-memory <max>', 'Maximum memory each process can consume', 'Infinity')
.option('-t, --set-tmpdir', 'Set tmpdir to be qless worker process workdir');
.option('-t, --set-tmpdir', 'Set tmpdir to be qless worker process workdir')
.option('--timeout <ms>', 'Set max job lifetime');

const options = commander.parse(process.argv);

Expand Down Expand Up @@ -88,6 +89,7 @@ const config = {
max: getMaxMemory(),
},
setTmpdir: options.setTmpdir,
timeout: options.timeout !== undefined ? parseInt(options.timeout, 10) : undefined,
};

const worker = options.processes === 1
Expand Down
16 changes: 12 additions & 4 deletions lib/workers/forking.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Worker {
});
this.config = config;
this.stopped = false;
this.timeout = config.timeout;
this.memory = {
max: Infinity,
interval: 60000,
Expand Down Expand Up @@ -96,13 +97,20 @@ class Worker {
return id;
}

return this.pool.exec('work', [config])
const workerExecution = this.pool.exec('work', [config]);
if (this.timeout !== undefined) {
workerExecution.timeout(this.timeout);
}
return workerExecution
.then(() => {
logger.info('Worker %s exited gracefully', id);
return id;
})
.catch((error) => {
logger.error('Worker %s died...', id, error);
}).catch((error) => {
if (error instanceof workerpool.Promise.TimeoutError) {
logger.error('Worker %s killed because of timeout', id, error);
} else {
logger.error('Worker %s died...', id, error);
}
return workForever(id);
});
};
Expand Down
10 changes: 7 additions & 3 deletions lib/workers/multi.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ class Worker {
this.setTmpdir = config.setTmpdir;
}

// eslint-disable-next-line class-methods-use-this
cleanWorkDir(directory) {
logger.warn(`Cleaning ${directory}...`);
rimraf.sync(this.workdir);
logger.info(`Cleaning ${directory}...`);
rimraf.sync(directory);
}

/**
Expand Down Expand Up @@ -71,7 +72,10 @@ class Worker {
*/
run() {
this.cleanWorkDir(this.workdir);
return this.doRun();
}

doRun() {
mkdirp.sync(this.workdir);

if (this.setTmpdir) {
Expand Down Expand Up @@ -100,7 +104,7 @@ class Worker {
});

// Try to run another job, waiting on the semaphore.
return this.run();
return this.doRun();
});
}

Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "qless-js",
"version": "0.4.8",
"version": "0.4.10",
"private": false,
"description": "Qless JavaScript Bindings",
"main": "index.js",
Expand Down
30 changes: 30 additions & 0 deletions test/workers/forking.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ const Promise = require('bluebird').Promise;
const expect = require('expect.js');

const helper = require('../helper.js');
const { Job } = require('../../lib/job.js');
const sinon = require('sinon');
const Client = require('../../lib/client.js');
const Worker = require('../../lib/workers/forking.js');

Expand Down Expand Up @@ -75,6 +77,34 @@ describe('Forking Worker', () => {
]);
});

it('kills worker which runs longer than timeout', async () => {
worker.timeout = 5000;
const klass = {
queue: async job => {
// analogue of endless job, so only 1 way to finish it - by timeout
await Promise.delay(10000000);
return job.complete();
},
};
const disposer = helper.stubDisposer(Job, 'import', sinon.stub());
Promise.using(disposer, (stub) => {
stub.returns(klass);
return queue.put({ klass: 'Klass', jid: 'jid' })
.then(() => worker.run());
});

while (worker.pool.workers.length < worker.config.processes) {
// eslint-disable-next-line no-await-in-loop
await Promise.delay(50);
}
const mocks = worker.pool.workers.map((child) => jest.spyOn(child.worker, 'kill'));

await Promise.delay(6000);
for (const mock of mocks) {
expect(mock.mock.calls.length).to.be.equal(1);
}
}, 15000);

it('can quit workers that consume too much memory', async () => {
worker.memory.max = 0;
const promise = worker.run();
Expand Down
30 changes: 30 additions & 0 deletions test/workers/multi.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,36 @@ describe('Multi Worker', () => {
});
});

it('only cleans the jobs working directory', async () => {
// In this scenario, two jobs need to run, then one job needs to finish
// before the other.
let count = 2;
const results = [];
const klass = {
queue: async (job) => {
await Promise.delay(interval * 5);
results.push(fs.existsSync(job.workdir));
await job.complete();
count -= 1;
if (count === 0) {
return worker.stop();
}
return count;
},
};
const disposer = helper.stubDisposer(Job, 'import', sinon.stub());
await Promise.using(disposer, (stub) => {
stub.returns(klass);
const promise = worker.run();
return queue.put({ klass: 'Klass', jid: 'jid' })
.then(() => Promise.delay(interval * 2))
.then(() => queue.put({ klass: 'Klass', jid: 'jid-2' }))
.then(() => promise);
});

return expect(results).to.eql([true, true]);
});

it('remains saturated with max-concurrency', () => {
// This test will put several jobs in the queue and
const jids = ['one', 'two', 'three', 'four', 'five'];
Expand Down
Loading

0 comments on commit 87fcda6

Please sign in to comment.