From 91f1f036288509b0034eaa5ed4f26d203b32ac29 Mon Sep 17 00:00:00 2001 From: Javier Mendiara Date: Wed, 18 May 2022 14:34:11 +0200 Subject: [PATCH] fix: event unsubscription (#5) --- __tests__/taskqueue.spec.ts | 93 +++++++++++++++++++++++++++++-------- src/index.ts | 50 ++++++++++++-------- 2 files changed, 104 insertions(+), 39 deletions(-) diff --git a/__tests__/taskqueue.spec.ts b/__tests__/taskqueue.spec.ts index 92907d8..06ce805 100755 --- a/__tests__/taskqueue.spec.ts +++ b/__tests__/taskqueue.spec.ts @@ -90,14 +90,25 @@ describe('TaskQueue', () => { const t3 = createAsyncTask('t3', 20); const events: string[] = []; + queue.on('start', () => events.push(`start`)); queue.on('taskStart', ({ task }) => events.push(`start-${task.title}`)); queue.on('taskCompleted', ({ task }) => events.push(`complete-${task.title}`)); + queue.on('complete', () => events.push(`complete`)); queue.push(t1, t2, t3); const res = await queue.run(); expect(res).toEqual(['t1', 't3', 't2']); - expect(events).toEqual(['start-t1', 'start-t2', 'start-t3', 'complete-t1', 'complete-t3', 'complete-t2']); + expect(events).toEqual([ + 'start', + 'start-t1', + 'start-t2', + 'start-t3', + 'complete-t1', + 'complete-t3', + 'complete-t2', + 'complete', + ]); }); it('should run all async tasks with concurrency', async () => { @@ -107,14 +118,25 @@ describe('TaskQueue', () => { const t3 = createAsyncTask('t3', 20); const events: string[] = []; + queue.on('start', () => events.push(`start`)); queue.on('taskStart', ({ task }) => events.push(`start-${task.title}`)); queue.on('taskCompleted', ({ task }) => events.push(`complete-${task.title}`)); + queue.on('complete', () => events.push(`complete`)); queue.push(t1, t2, t3); const res = await queue.run({ concurrency: 2 }); expect(res).toEqual(['t1', 't3', 't2']); - expect(events).toEqual(['start-t1', 'start-t2', 'complete-t1', 'start-t3', 'complete-t3', 'complete-t2']); + expect(events).toEqual([ + 'start', + 'start-t1', + 'start-t2', + 'complete-t1', + 'start-t3', + 'complete-t3', + 'complete-t2', + 'complete', + ]); }); it('should be able to combine sync/async tasks', async () => { @@ -124,14 +146,25 @@ describe('TaskQueue', () => { const t3 = createAsyncTask('t3', 10); const events: string[] = []; + queue.on('start', () => events.push(`start`)); queue.on('taskStart', ({ task }) => events.push(`start-${task.title}`)); queue.on('taskCompleted', ({ task }) => events.push(`complete-${task.title}`)); + queue.on('complete', () => events.push(`complete`)); queue.push(t1, t2, t3); const res = await queue.run({ concurrency: 2 }); expect(res).toEqual(['t2', 't1', 't3']); - expect(events).toEqual(['start-t1', 'start-t2', 'complete-t2', 'start-t3', 'complete-t1', 'complete-t3']); + expect(events).toEqual([ + 'start', + 'start-t1', + 'start-t2', + 'complete-t2', + 'start-t3', + 'complete-t1', + 'complete-t3', + 'complete', + ]); }); it('should manage errors silently', async () => { @@ -191,9 +224,11 @@ describe('TaskQueue', () => { const t3 = createAsyncTask('t3', 20); const events: string[] = []; + queue.on('start', () => events.push(`start`)); queue.on('taskStart', ({ task }) => events.push(`start-${task.title}`)); queue.on('taskError', ({ task }) => events.push(`error-${task.title}`)); queue.on('taskSuccess', ({ task }) => events.push(`success-${task.title}`)); + queue.on('complete', () => events.push(`complete`)); queue.push(t1, t2, t3); @@ -207,7 +242,16 @@ describe('TaskQueue', () => { error = err; } - expect(events).toEqual(['start-t1', 'start-t2', 'success-t1', 'start-t3', 'error-t2', 'error-t3']); + expect(events).toEqual([ + 'start', + 'start-t1', + 'start-t2', + 'success-t1', + 'start-t3', + 'error-t2', + 'error-t3', + 'complete', + ]); expect(messages[0]?.trim()).toEqual('TaskQueue ended with 3 errors:'); expect(error).toBeInstanceOf(QueueError); expect(error.errors).toHaveLength(3); @@ -224,9 +268,11 @@ describe('TaskQueue', () => { const t3 = createAsyncTask('t3', 20); const events: string[] = []; + queue.on('start', () => events.push(`start`)); queue.on('taskStart', ({ task }) => events.push(`start-${task.title}`)); queue.on('taskError', ({ task }) => events.push(`error-${task.title}`)); queue.on('taskSuccess', ({ task }) => events.push(`success-${task.title}`)); + queue.on('complete', () => events.push(`complete`)); queue.push(t1, t2, t3); @@ -244,7 +290,16 @@ describe('TaskQueue', () => { error = err; } - expect(events).toEqual(['start-t1', 'start-t2', 'success-t1', 'start-t3', 'error-t2', 'error-t3']); + expect(events).toEqual([ + 'start', + 'start-t1', + 'start-t2', + 'success-t1', + 'start-t3', + 'error-t2', + 'error-t3', + 'complete', + ]); expect(messages[0]?.trim()).toEqual('TaskQueue ended with 3 errors:'); expect(error).toBeInstanceOf(QueueError); expect(error.errors).toHaveLength(3); @@ -262,40 +317,38 @@ describe('TaskQueue', () => { const events: string[] = []; queue.on('start', () => events.push(`start`)); - queue.on('complete', () => events.push(`complete`)); queue.on('taskStart', ({ task }) => events.push(`start-${task.title}`)); - queue.on('taskCompleted', ({ task }) => events.push(`complete-${task.title}`)); + queue.on('taskError', ({ task }) => events.push(`error-${task.title}`)); + queue.on('taskSuccess', ({ task }) => events.push(`success-${task.title}`)); + queue.on('complete', () => events.push(`complete`)); queue.push(t1, t2, t3); const controller = new AbortController(); const signal = controller.signal; - try { - setTimeout(() => controller.abort(), 75); - await queue.every(10, { signal }); - // eslint-disable-next-line no-empty - } catch (err) {} + setTimeout(() => controller.abort(), 75); + await queue.every(10, { signal }); expect(events).toEqual([ - // first iteration in the firts 50 ms + // first iteration in the firsts 50 ms 'start', 'start-t1', 'start-t2', 'start-t3', - 'complete-t1', - 'complete-t2', - 'complete-t3', + 'success-t1', + 'success-t2', + 'success-t3', 'complete', - // second iteration. Cancelled by timeout + // second iteration. Cancelled by signal 'start', 'start-t1', 'start-t2', 'start-t3', - 'complete-t1', - 'complete-t2', - 'complete-t3', + 'error-t1', + 'error-t2', + 'error-t3', 'complete', ]); }); diff --git a/src/index.ts b/src/index.ts index 9301c09..94999e9 100755 --- a/src/index.ts +++ b/src/index.ts @@ -17,6 +17,7 @@ export class TaskQueue extends EventEmitter { private running = 0; private runStart = 0; private controller: AbortController; + private removeListeners = true; constructor(public title = 'TaskQueue', public options: QueueOptions = {}) { super(); @@ -31,6 +32,15 @@ export class TaskQueue extends EventEmitter { this.runStart = 0; this.emit('complete', event); + if (this.removeListeners) { + this.removeAllListeners(); + } + } + + private abort(error: Error) { + this.controller.abort(); + this.errors.unshift(error); + setImmediate(() => this.complete()); } /** consumes the queue runnin tasks */ @@ -103,15 +113,25 @@ export class TaskQueue extends EventEmitter { * @param options */ public async every(every: number, options: QueueOptions = {}): Promise { - try { - await this.run(options); - // eslint-disable-next-line no-empty - } catch (err) {} + this.removeListeners = false; - if (!options.signal?.aborted) { - await delay(every, { signal: options.signal }); - return this.every(every, options); - } + // remove all listeners when the `once('complete')` in run has ended and cancelled all the tasks + // this way we can notify 'complete' events to userland + options.signal?.addEventListener('abort', () => setTimeout(() => this.removeAllListeners(), 0), { once: true }); + + const run = async (): Promise => { + try { + await this.run(options); + await delay(every, { signal: options.signal }); + // eslint-disable-next-line no-empty + } catch (err) {} + + if (!options.signal?.aborted) { + await run(); + } + }; + + return run(); } /** * Runs the queue. @@ -135,13 +155,9 @@ export class TaskQueue extends EventEmitter { return new Promise((resolve, reject) => { let timeout: NodeJS.Timeout; - const onAbort = () => { - this.controller.abort(); - this.errors.unshift(new Error('Queue Aborted')); - setImmediate(() => this.complete()); - }; + const onAbort = () => this.abort(new Error('Queue Aborted')); - signal?.addEventListener('abort', onAbort); + signal?.addEventListener('abort', onAbort, { once: true }); this.once('complete', ({ errors }) => { signal?.removeEventListener('abort', onAbort); @@ -167,11 +183,7 @@ export class TaskQueue extends EventEmitter { } if (timeoutMs) { - timeout = setTimeout(() => { - this.controller.abort(); - this.errors.unshift(new Error('Queue Timeout')); - this.complete(); - }, timeoutMs); + timeout = setTimeout(() => this.abort(new Error('Queue Timeout')), timeoutMs); } // Copy queue to allow rehuse this.pending = [...this.queue];