Skip to content

Commit

Permalink
feat: every and signal
Browse files Browse the repository at this point in the history
  • Loading branch information
jmendiara committed May 16, 2022
1 parent 434e493 commit b5ebce6
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 14 deletions.
101 changes: 90 additions & 11 deletions __tests__/taskqueue.spec.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
import { Task, TaskQueue, ConsoleSubscriber, QueueError, TaskError } from '../src';
import { AbortController, AbortSignal } from 'node-abort-controller';

interface GenericAbortSignal {
aborted: boolean;
onabort: ((...args: any[]) => any) | null;
addEventListener: (...args: any[]) => any;
removeEventListener: (...args: any[]) => any;
}

const delay = (title: string, ms: number, signal?: GenericAbortSignal) => {
const delay = (title: string, ms: number, signal?: AbortSignal) => {
if (signal?.aborted) {
return Promise.reject(new Error(`Aborted`));
}
Expand Down Expand Up @@ -189,7 +183,7 @@ describe('TaskQueue', () => {
}
});

it('should cancel the queue', async () => {
it('should cancel the queue on timeout', async () => {
const queue = new TaskQueue();

const t1 = createAsyncTask('t1', 10);
Expand All @@ -198,7 +192,8 @@ describe('TaskQueue', () => {

const events: string[] = [];
queue.on('taskStart', ({ task }) => events.push(`start-${task.title}`));
queue.on('taskCompleted', ({ task }) => events.push(`complete-${task.title}`));
queue.on('taskError', ({ task }) => events.push(`error-${task.title}`));
queue.on('taskSuccess', ({ task }) => events.push(`success-${task.title}`));

queue.push(t1, t2, t3);

Expand All @@ -212,7 +207,7 @@ describe('TaskQueue', () => {
error = err;
}

expect(events).toEqual(['start-t1', 'start-t2', 'complete-t1', 'start-t3']);
expect(events).toEqual(['start-t1', 'start-t2', 'success-t1', 'start-t3', 'error-t2', 'error-t3']);
expect(messages[0]?.trim()).toEqual('TaskQueue ended with 3 errors:');
expect(error).toBeInstanceOf(QueueError);
expect(error.errors).toHaveLength(3);
Expand All @@ -221,6 +216,90 @@ describe('TaskQueue', () => {
expect(error.errors[2].message).toMatch('t3: Aborted');
});

it('should cancel the queue on signal', async () => {
const queue = new TaskQueue();

const t1 = createAsyncTask('t1', 10);
const t2 = createAsyncTask('t2', 50);
const t3 = createAsyncTask('t3', 20);

const events: string[] = [];
queue.on('taskStart', ({ task }) => events.push(`start-${task.title}`));
queue.on('taskError', ({ task }) => events.push(`error-${task.title}`));
queue.on('taskSuccess', ({ task }) => events.push(`success-${task.title}`));

queue.push(t1, t2, t3);

let messages: string[] = [];
let error = null;

const controller = new AbortController();
const signal = controller.signal;

try {
setTimeout(() => controller.abort(), 20);
await queue.run({ concurrency: 2, signal });
} catch (err) {
messages = err.message.split('\n');
error = err;
}

expect(events).toEqual(['start-t1', 'start-t2', 'success-t1', 'start-t3', 'error-t2', 'error-t3']);
expect(messages[0]?.trim()).toEqual('TaskQueue ended with 3 errors:');
expect(error).toBeInstanceOf(QueueError);
expect(error.errors).toHaveLength(3);
expect(error.errors[0].message).toMatch('Queue Aborted');
expect(error.errors[1].message).toMatch('t2: Aborted');
expect(error.errors[2].message).toMatch('t3: Aborted');
});

it('should repeat the queue until signal', async () => {
const queue = new TaskQueue();

const t1 = createAsyncTask('t1', 50);
const t2 = createAsyncTask('t2', 50);
const t3 = createAsyncTask('t3', 50);

const events: string[] = [];
queue.on('start', () => events.push(`start`));
queue.on('complete', () => events.push(`complete`));
queue.on('taskStart', ({ task }) => events.push(`start-${task.title}`));
queue.on('taskCompleted', ({ task }) => events.push(`complete-${task.title}`));

queue.push(t1, t2, t3);

const controller = new AbortController();
const signal = controller.signal;

try {
setTimeout(() => controller.abort(), 75);
await queue.every(10, { signal });
// eslint-disable-next-line no-empty
} catch (err) {}

expect(events).toEqual([
// first iteration in the firts 50 ms
'start',
'start-t1',
'start-t2',
'start-t3',
'complete-t1',
'complete-t2',
'complete-t3',
'complete',

// second iteration. Cancelled by timeout
'start',
'start-t1',
'start-t2',
'start-t3',
'complete-t1',
'complete-t2',
'complete-t3',
'complete',
]);
});

it('should subscribe to events and print to console', async () => {
// TODO: think in a way with jest to no have flacky tests
console.info = jest.fn();
Expand Down
5 changes: 5 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"typescript": "^4.6.4"
},
"dependencies": {
"delay": "^5.0.0",
"node-abort-controller": "^3.0.1"
}
}
36 changes: 33 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { EventEmitter } from 'events';
import { AbortController, AbortSignal } from 'node-abort-controller';
import delay from 'delay';

/**
* Simple Task Queue to make notificable concurrent tasks, with continue-on-failure
Expand Down Expand Up @@ -30,7 +31,6 @@ export class TaskQueue<RESULT = unknown> extends EventEmitter {

this.runStart = 0;
this.emit('complete', event);
this.removeAllListeners();
}

/** consumes the queue runnin tasks */
Expand All @@ -49,8 +49,8 @@ export class TaskQueue<RESULT = unknown> extends EventEmitter {
const err = new TaskError(`${task.title ?? task.name}: ${error?.message ?? error}`, error);
this.errors.push(err);
}
this.emit('taskCompleted', { task, time: Date.now() - start });
this.running--;
this.emit('taskCompleted', { task, time: Date.now() - start });

// aborting does efectivelly complete the queue
if (!signal.aborted) {
Expand Down Expand Up @@ -96,6 +96,23 @@ export class TaskQueue<RESULT = unknown> extends EventEmitter {
return task;
}

/**
* Runs the queue every specified ms
*
* @param every time in ms to wait once the queue completes to rerun it
* @param options
*/
public async every(every: number, options: QueueOptions = {}): Promise<void> {
try {
await this.run(options);
// eslint-disable-next-line no-empty
} catch (err) {}

if (!options.signal?.aborted) {
await delay(every, { signal: options.signal });
return this.every(every, options);
}
}
/**
* Runs the queue.
*
Expand All @@ -107,6 +124,7 @@ export class TaskQueue<RESULT = unknown> extends EventEmitter {
public async run(options: QueueOptions = {}): Promise<RESULT[]> {
const concurrency = options.concurrency ?? this.options.concurrency ?? this.queue.length;
const timeoutMs = options.timeout ?? this.options.timeout;
const signal = options.signal ?? this.options.signal;
if ((concurrency as number) <= 0 && this.queue.length !== 0) {
throw new Error('Invalid concurrency');
}
Expand All @@ -117,7 +135,17 @@ export class TaskQueue<RESULT = unknown> extends EventEmitter {
return new Promise((resolve, reject) => {
let timeout: NodeJS.Timeout;

const onAbort = () => {
this.controller.abort();
this.errors.unshift(new Error('Queue Aborted'));
setImmediate(() => this.complete());
};

signal?.addEventListener('abort', onAbort);

this.once('complete', ({ errors }) => {
signal?.removeEventListener('abort', onAbort);

clearTimeout(timeout);
// Schedule resolving after I/O to allow Aborted Tasks to process the cancellation
setImmediate(() => {
Expand Down Expand Up @@ -146,7 +174,7 @@ export class TaskQueue<RESULT = unknown> extends EventEmitter {
}, timeoutMs);
}
// Copy queue to allow rehuse
this.pending = this.queue;
this.pending = [...this.queue];
this.pending.splice(0, concurrency).forEach((task) => this.runTask(task));
});
}
Expand Down Expand Up @@ -212,6 +240,8 @@ export interface QueueOptions {
concurrency?: number;
/** max time in ms allowed to run the queue. If it's not done in the provided time, will cancel the pending tasks and fail the queue execution */
timeout?: number;
/** AbortSignal signaling the Queue execution is cancelled. See https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal */
signal?: AbortSignal;
}

/**
Expand Down

0 comments on commit b5ebce6

Please sign in to comment.