From b4426f84976e5dbfe0f529f7bed8739ad0b09379 Mon Sep 17 00:00:00 2001 From: Eric Prud'hommeaux Date: Thu, 31 May 2018 07:24:05 -0400 Subject: [PATCH] ~ replace Promise.race with embedded timeout --- package.json | 2 +- test/flood-test.js | 62 +++++++++++++++++++++++++++++----------- timeout-promise-queue.js | 39 +++++++++++++------------ 3 files changed, 67 insertions(+), 36 deletions(-) diff --git a/package.json b/package.json index dc77486..83c9e28 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "timeout-promise-queue", - "version": "0.9.1", + "version": "0.9.2", "description": "Promise queue with timeouts and promise cleanup after expiration.", "repository": "https://github.com/ericprud/timeout-promise-queue.git", "keywords": ["timeout", "promise", "queue", "processes"], diff --git a/test/flood-test.js b/test/flood-test.js index ecf2bc1..fac8e7a 100644 --- a/test/flood-test.js +++ b/test/flood-test.js @@ -19,10 +19,15 @@ Tests = [ { processes: 1, sleep: 1000, ok: true }, + { processes: 5, sleep: 100, resolve: {exitCode: 0} , timeout: 200, ok: true }, + { processes: 1, sleep: 10000, timeout: 20, ok: false, // no rejection parameter so expect timeout-promise-queue's default: exceptionPattern: RegExp('^timeout of 20 exceeded$') }, + { processes: 5, sleep: 100, reject: Error('died') , timeout: 200, ok: false, + exceptionPattern: RegExp('^died$') }, + { processes: 1, sleep: 10000, timeout: 20, ok: false, rejection: Error(TestTimeoutMessage), exceptionPattern: RegExp('^' + TestTimeoutMessage + '$') }, @@ -37,27 +42,50 @@ Tests.forEach((test, idx) => { const command = 'setTimeout(() => { process.exit(0); }, ' + test.sleep + ')' // const Command = 'for (let i = 0; i < 2**28; ++i) ;' // The busy alternative. SuiteTimeout += 2 * test.sleep * test.processes / QueueSize - startProcesses(idx, test.processes, command, test.timeout, test.ok, test.rejection) + startProcesses(idx, test, command) ExpectedQueueSize += test.processes }) -function startProcesses (batch, processes, command, timeout, ok, rejection) { - for (let i = 0; i < processes; ++i) { +function startProcesses (batch, test, command) { + for (let i = 0; i < test.processes; ++i) { const label = batch + '-' + i - AllTests.push({ + AllTests.push(Object.assign({ label: label, - ok: ok, start: new Date(), - exec: Queue.add(cancel => new Promise((resolve, reject) => { - let program = child_process.spawn('node', ['-e', command]) - program.on('exit', exitCode => { resolve({exitCode:exitCode}) }) - program.on('error', reject) - if (cancel) - cancel.on('timeout', err => { - program.kill() - reject() - }) - }), timeout, rejection) + exec: Queue.add( + ('resolve' in test || 'reject' in test + ? makeThread(test) + : makeProcess(test)), + test.timeout, test.rejection) + }, test)) // copy guts of test template + } + + function makeProcess (test) { + return cancel => new Promise((resolve, reject) => { + let program = child_process.spawn('node', ['-e', command]) + program.on('exit', exitCode => { resolve({exitCode:exitCode}) }) + program.on('error', reject) + if (cancel) + cancel.on('timeout', err => { + program.kill() + reject(err) + }) + }) + } + + function makeThread (test) { + return cancel => new Promise((resolve, reject) => { + setTimeout(() => { + if ('reject' in test) { + reject(test.reject) + } else { + resolve(test.resolve) + } + }, test.sleep) + if (cancel) + cancel.on('timeout', err => { + reject(err) + }) }) } } @@ -86,7 +114,9 @@ describe('timeout-promise-queue', () => { report(false, e.message) } else { if (test.exceptionPattern) { - if (e.message.match(test.exceptionPattern)) { + if (!e || !e.message) { + report(false, "expected structured error instead of " + e) + } else if (e.message.match(test.exceptionPattern)) { report(true) } else { report(false, "expected \"" + e.message + "\" to match exceptionPattern") diff --git a/timeout-promise-queue.js b/timeout-promise-queue.js index 9e28820..42747bd 100644 --- a/timeout-promise-queue.js +++ b/timeout-promise-queue.js @@ -8,6 +8,8 @@ function PromiseQueue (threshold) { return { /** queue (I/O) functions which return promises. * @pfunc returns a promise + * If add is called with a @timeout, @pfunc will be called with an + * EventEmitter which will emit a 'timeout' if @timeout is exceeded. */ add: function (pfunc, timeout, rejection) { if (++inPlay > threshold) { @@ -21,34 +23,32 @@ function PromiseQueue (threshold) { } function makeTimeout () { - let timer = null - let clientCancellation = new EventEmitter(); - let myCancellation = new EventEmitter(); let ret = timeout === undefined - ? pfunc() - : Promise.race([ - new Promise((resolve, reject) => { - timer = setTimeout(() => { + ? pfunc() + : new Promise((resolve, reject) => { + let clientCancellation = new EventEmitter(); + + // Create a timer to send a cancellation to pfunc()'s promise. + let timer = setTimeout(() => { let r = typeof rejection === 'undefined' ? Error('timeout of ' + timeout + ' exceeded') : typeof rejection === 'function' ? rejection() : rejection clientCancellation.emit('timeout', r); - nextEntry() reject(r) }, timeout) - // pfunc().then emits 'clear' which resolves with pfunc's result. - myCancellation.on('clear', result => resolve(result)) - return timer - }), - pfunc(clientCancellation).then(result => { - myCancellation.emit('clear', result); - clearTimeout(timer) - return result + + // Delete timer after resolution. + resolve(pfunc(clientCancellation).then(result => { + clearTimeout(timer) + return result + }).catch(result => { + clearTimeout(timer) + throw result + })) }) - ]) - return ret.then(nextEntry) + return ret.then(nextEntry).catch(result => {throw nextEntry(result)}) } }, @@ -59,8 +59,9 @@ function PromiseQueue (threshold) { } } + /** After each resolution or rejection, check the queue. + */ function nextEntry (ret) { - // After each resolution, check the queue. --inPlay if (queue.length > 0) { queue.pop()()