Skip to content

Commit

Permalink
~ replace Promise.race with embedded timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
Eric Prud'hommeaux committed May 31, 2018
1 parent 0711a78 commit b4426f8
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 36 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "timeout-promise-queue",
"version": "0.9.1",
"version": "0.9.2",
"description": "Promise queue with timeouts and promise cleanup after expiration.",
"repository": "https://github.com/ericprud/timeout-promise-queue.git",
"keywords": ["timeout", "promise", "queue", "processes"],
Expand Down
62 changes: 46 additions & 16 deletions test/flood-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@ Tests = [

{ processes: 1, sleep: 1000, ok: true },

{ processes: 5, sleep: 100, resolve: {exitCode: 0} , timeout: 200, ok: true },

{ processes: 1, sleep: 10000, timeout: 20, ok: false,
// no rejection parameter so expect timeout-promise-queue's default:
exceptionPattern: RegExp('^timeout of 20 exceeded$') },

{ processes: 5, sleep: 100, reject: Error('died') , timeout: 200, ok: false,
exceptionPattern: RegExp('^died$') },

{ processes: 1, sleep: 10000, timeout: 20, ok: false,
rejection: Error(TestTimeoutMessage),
exceptionPattern: RegExp('^' + TestTimeoutMessage + '$') },
Expand All @@ -37,27 +42,50 @@ Tests.forEach((test, idx) => {
const command = 'setTimeout(() => { process.exit(0); }, ' + test.sleep + ')'
// const Command = 'for (let i = 0; i < 2**28; ++i) ;' // The busy alternative.
SuiteTimeout += 2 * test.sleep * test.processes / QueueSize
startProcesses(idx, test.processes, command, test.timeout, test.ok, test.rejection)
startProcesses(idx, test, command)
ExpectedQueueSize += test.processes
})

function startProcesses (batch, processes, command, timeout, ok, rejection) {
for (let i = 0; i < processes; ++i) {
function startProcesses (batch, test, command) {
for (let i = 0; i < test.processes; ++i) {
const label = batch + '-' + i
AllTests.push({
AllTests.push(Object.assign({
label: label,
ok: ok,
start: new Date(),
exec: Queue.add(cancel => new Promise((resolve, reject) => {
let program = child_process.spawn('node', ['-e', command])
program.on('exit', exitCode => { resolve({exitCode:exitCode}) })
program.on('error', reject)
if (cancel)
cancel.on('timeout', err => {
program.kill()
reject()
})
}), timeout, rejection)
exec: Queue.add(
('resolve' in test || 'reject' in test
? makeThread(test)
: makeProcess(test)),
test.timeout, test.rejection)
}, test)) // copy guts of test template
}

function makeProcess (test) {
return cancel => new Promise((resolve, reject) => {
let program = child_process.spawn('node', ['-e', command])
program.on('exit', exitCode => { resolve({exitCode:exitCode}) })
program.on('error', reject)
if (cancel)
cancel.on('timeout', err => {
program.kill()
reject(err)
})
})
}

function makeThread (test) {
return cancel => new Promise((resolve, reject) => {
setTimeout(() => {
if ('reject' in test) {
reject(test.reject)
} else {
resolve(test.resolve)
}
}, test.sleep)
if (cancel)
cancel.on('timeout', err => {
reject(err)
})
})
}
}
Expand Down Expand Up @@ -86,7 +114,9 @@ describe('timeout-promise-queue', () => {
report(false, e.message)
} else {
if (test.exceptionPattern) {
if (e.message.match(test.exceptionPattern)) {
if (!e || !e.message) {
report(false, "expected structured error instead of " + e)
} else if (e.message.match(test.exceptionPattern)) {
report(true)
} else {
report(false, "expected \"" + e.message + "\" to match exceptionPattern")
Expand Down
39 changes: 20 additions & 19 deletions timeout-promise-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ function PromiseQueue (threshold) {
return {
/** queue (I/O) functions which return promises.
* @pfunc returns a promise
* If add is called with a @timeout, @pfunc will be called with an
* EventEmitter which will emit a 'timeout' if @timeout is exceeded.
*/
add: function (pfunc, timeout, rejection) {
if (++inPlay > threshold) {
Expand All @@ -21,34 +23,32 @@ function PromiseQueue (threshold) {
}

function makeTimeout () {
let timer = null
let clientCancellation = new EventEmitter();
let myCancellation = new EventEmitter();
let ret = timeout === undefined
? pfunc()
: Promise.race([
new Promise((resolve, reject) => {
timer = setTimeout(() => {
? pfunc()
: new Promise((resolve, reject) => {
let clientCancellation = new EventEmitter();

// Create a timer to send a cancellation to pfunc()'s promise.
let timer = setTimeout(() => {
let r = typeof rejection === 'undefined'
? Error('timeout of ' + timeout + ' exceeded')
: typeof rejection === 'function'
? rejection()
: rejection
clientCancellation.emit('timeout', r);
nextEntry()
reject(r)
}, timeout)
// pfunc().then emits 'clear' which resolves with pfunc's result.
myCancellation.on('clear', result => resolve(result))
return timer
}),
pfunc(clientCancellation).then(result => {
myCancellation.emit('clear', result);
clearTimeout(timer)
return result

// Delete timer after resolution.
resolve(pfunc(clientCancellation).then(result => {
clearTimeout(timer)
return result
}).catch(result => {
clearTimeout(timer)
throw result
}))
})
])
return ret.then(nextEntry)
return ret.then(nextEntry).catch(result => {throw nextEntry(result)})
}
},

Expand All @@ -59,8 +59,9 @@ function PromiseQueue (threshold) {
}
}

/** After each resolution or rejection, check the queue.
*/
function nextEntry (ret) {
// After each resolution, check the queue.
--inPlay
if (queue.length > 0) {
queue.pop()()
Expand Down

0 comments on commit b4426f8

Please sign in to comment.