Skip to content

Commit

Permalink
Support throwing errors on waitForEmpty
Browse files Browse the repository at this point in the history
  • Loading branch information
perry-mitchell committed Nov 6, 2023
1 parent 17f0be9 commit a586f1e
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 11 deletions.
25 changes: 22 additions & 3 deletions source/Channel.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import EventEmitter from "eventemitter3";
import { Layerr } from "layerr";
import { Task } from "./Task";
import { Callable, TaskPriority } from "./types";

Expand Down Expand Up @@ -46,6 +47,7 @@ function compareTasks(taskA: Task, taskB: Task): number {
*/
export class Channel extends EventEmitter {
private _name: string;
private _taskErrors: Error[] = [];
private _tasks: Task[];
private _running: boolean;
private _autostart: boolean;
Expand Down Expand Up @@ -226,13 +228,21 @@ export class Channel extends EventEmitter {
* Wait for the queue to become empty
* @returns {Promise}
*/
async waitForEmpty(): Promise<void> {
return new Promise<void>(resolve => {
async waitForEmpty(
opts: {
throwForFailures?: boolean;
} = {}
): Promise<void> {
const { throwForFailures = false } = opts;
await new Promise<void>(resolve => {
if (this.isEmpty) return resolve();
this.once("stopped", () => {
resolve();
});
});
if (throwForFailures && this._taskErrors.length > 0) {
throw new Layerr(this._taskErrors[0], "Enqueued task failed");
}
}

_runNextItem() {
Expand All @@ -241,7 +251,16 @@ export class Channel extends EventEmitter {
this.isRunning = false;
this.emit("stopped");
} else {
item.execute().then(() => this._runNextItem());
item.execute()
.then(() => {
if (item.error) {
this._taskErrors.push(item.error);
}
})
.then(() => this._runNextItem())
.catch(err => {
console.error(err);
});
}
}
}
26 changes: 18 additions & 8 deletions source/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import { Callable, TaskPriority } from "./types";
* Internal Task class, for handling executions
*/
export class Task {
private _target: Callable<any>;
private _stack: string | null = null;
private _type: TaskPriority;
private _resolveFn: Function | null = null;
private _rejectFn: Function | null = null;
private _queuedPromise: Promise<any>;
private _created: number;
private _error: Error | null = null;
private _queuedPromise: Promise<any>;
private _rejectFn: Function | null = null;
private _resolveFn: Function | null = null;
private _stack: string | null = null;
private _target: Callable<any>;
private _timeLimit: number;
private _type: TaskPriority;

/**
* Constructor for a Task
Expand Down Expand Up @@ -51,6 +52,14 @@ export class Task {
return this._created;
}

/**
* Execution error, if one occurred
* @type {Error | null}
*/
get error(): Error | null {
return this._error;
}

/**
* Promise which resolves when work has completed
* @type {Promise}
Expand Down Expand Up @@ -99,9 +108,9 @@ export class Task {
* Execute the task
* @returns {Promise}
*/
execute(): Promise<any> {
async execute(): Promise<void> {
const fn = this.target;
let output;
let output: any;
try {
output = fn();
} catch (err) {
Expand All @@ -118,6 +127,7 @@ export class Task {
this._resolveFn?.(result);
})
.catch(err => {
this._error = err;
this._rejectFn?.(err);
});
}
Expand Down
43 changes: 43 additions & 0 deletions test/unit/Channel.spec.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const EventEmitter = require("eventemitter3");
const sleep = require("sleep-promise");
const { Layerr } = require("layerr");
const { Task, Channel, TaskPriority } = require("../../dist");

const NOOP = () => {};
Expand Down Expand Up @@ -154,6 +155,20 @@ describe("Channel", function() {
await expect(result).to.be.eventually.rejected;
clearTimeout(timer);
});

it("throws if the enqueued promise throws", async function() {
this.channel.autostart = true;
const fails = () => Promise.reject(new Error("Test"));
const resultPromise = this.channel.enqueue(fails);
let failureResult = null;
resultPromise.catch(err => {
failureResult = err;
});
await sleep(250);
expect(failureResult)
.to.be.an.instanceof(Error)
.that.satisfies(err => err.message === "Test");
});
});

describe("getStackedItems", function() {
Expand Down Expand Up @@ -263,5 +278,33 @@ describe("Channel", function() {
await sleep(50);
expect(spy.callCount).to.equal(1);
});

it("does not throw an error, when unconfigured, if one occurred during execution", async function() {
this.channel.autostart = true;
const fails = () => Promise.reject(new Error("Test"));
this.channel.enqueue(fails);
let failureResult = null;
this.channel.waitForEmpty().catch(err => {
failureResult = err;
});
await sleep(250);
await this.channel.waitForEmpty();
expect(failureResult).to.be.null;
});

it("throws an error, when configured, if one occurred during execution", async function() {
this.channel.autostart = true;
const fails = () => Promise.reject(new Error("Test"));
this.channel.enqueue(fails);
let failureResult = null;
this.channel.waitForEmpty({ throwForFailures: true }).catch(err => {
failureResult = err;
});
await sleep(250);
await this.channel.waitForEmpty();
expect(failureResult)
.to.be.an.instanceof(Layerr)
.that.satisfies(err => /Enqueued task failed.+Test/i.test(err.message));
});
});
});

0 comments on commit a586f1e

Please sign in to comment.