From c29d22e12591ea111844bb9703511f10abc66dfb Mon Sep 17 00:00:00 2001 From: AlCalzone Date: Fri, 27 Sep 2024 14:43:24 +0200 Subject: [PATCH] feat: rework route rebuilding to use task scheduler (#7196) --- .../zwave-js/src/lib/controller/Controller.ts | 522 ++++++++++-------- packages/zwave-js/src/lib/controller/utils.ts | 8 + packages/zwave-js/src/lib/driver/Driver.ts | 15 +- packages/zwave-js/src/lib/driver/Task.test.ts | 225 ++++++++ packages/zwave-js/src/lib/driver/Task.ts | 73 ++- 5 files changed, 611 insertions(+), 232 deletions(-) diff --git a/packages/zwave-js/src/lib/controller/Controller.ts b/packages/zwave-js/src/lib/controller/Controller.ts index 0f905ab69dc3..6d9ce3070b53 100644 --- a/packages/zwave-js/src/lib/controller/Controller.ts +++ b/packages/zwave-js/src/lib/controller/Controller.ts @@ -143,6 +143,7 @@ import crypto from "node:crypto"; import type { Driver } from "../driver/Driver"; import { cacheKeyUtils, cacheKeys } from "../driver/NetworkCache"; import type { StatisticsEventCallbacks } from "../driver/Statistics"; +import { type TaskBuilder, TaskPriority } from "../driver/Task"; import { DeviceClass } from "../node/DeviceClass"; import { ZWaveNode } from "../node/Node"; import { VirtualNode } from "../node/VirtualNode"; @@ -432,6 +433,7 @@ import { } from "./_Types"; import { assertProvisioningEntry, + isRebuildRoutesTask, sdkVersionGt, sdkVersionGte, sdkVersionLt, @@ -933,10 +935,9 @@ export class ZWaveController */ public readonly indicatorValues = new Map(); - private _isRebuildingRoutes: boolean = false; /** Returns whether the routes are currently being rebuilt for one or more nodes. */ public get isRebuildingRoutes(): boolean { - return this._isRebuildingRoutes; + return !!this.driver.scheduler.findTask(isRebuildRoutesTask); } /** @@ -4558,7 +4559,7 @@ export class ZWaveController > | undefined { - if (!this._isRebuildingRoutes) return undefined; + if (!this.isRebuildingRoutes) return undefined; return new Map(this._rebuildRoutesProgress); } @@ -4572,8 +4573,10 @@ export class ZWaveController */ public beginRebuildingRoutes(options: RebuildRoutesOptions = {}): boolean { // Don't start the process twice - if (this._isRebuildingRoutes) return false; - this._isRebuildingRoutes = true; + const existingTask = this.driver.scheduler.findTask( + (t) => t.tag?.id === "rebuild-routes", + ); + if (existingTask) return false; options.includeSleeping ??= true; @@ -4609,9 +4612,7 @@ export class ZWaveController } // Rebuild routes in the background - void this.rebuildRoutes(options).catch(() => { - /* ignore errors */ - }); + void this.rebuildRoutesInternal(options).catch(noop); // And update the progress once at the start this.emit( @@ -4622,7 +4623,17 @@ export class ZWaveController return true; } - private async rebuildRoutes(options: RebuildRoutesOptions): Promise { + private rebuildRoutesInternal( + options: RebuildRoutesOptions, + ): Promise { + return this.driver.scheduler.queueTask( + this.getRebuildRoutesTask(options), + ); + } + + private getRebuildRoutesTask( + options: RebuildRoutesOptions, + ): TaskBuilder { const pendingNodes = new Set( [...this._rebuildRoutesProgress] .filter(([, status]) => status === "pending") @@ -4657,89 +4668,133 @@ export class ZWaveController } }; - // We work our way outwards from the controller and start with non-sleeping nodes, one by one - try { - const neighbors = await this.getNodeNeighbors(this._ownNodeId!); - neighbors.forEach((id) => addTodo(id)); - } catch { - // ignore - } + const self = this; - const doRebuildRoutes = async (nodeId: number) => { - // await the process for each node and convert errors to a non-successful result - const result = await this.rebuildNodeRoutesInternal(nodeId).catch( - () => false, - ); - if (!this._isRebuildingRoutes) return; + return { + priority: TaskPriority.Lower, + tag: { id: "rebuild-routes" }, + task: async function* rebuildRoutesTask() { + // We work our way outwards from the controller and start with non-sleeping nodes, one by one + try { + const neighbors = await self.getNodeNeighbors( + self._ownNodeId!, + ); + neighbors.forEach((id) => addTodo(id)); + } catch { + // ignore + } - // Track the success in a map - this._rebuildRoutesProgress.set(nodeId, result ? "done" : "failed"); - // Notify listeners about the progress - this.emit( - "rebuild routes progress", - new Map(this._rebuildRoutesProgress), - ); + yield; // Give the task scheduler time to do something else - // Figure out which nodes to do next - try { - const neighbors = await this.getNodeNeighbors(nodeId); - neighbors.forEach((id) => addTodo(id)); - } catch { - // ignore - } - }; + async function* doRebuildRoutes(nodeId: number) { + // Await the process for each node and convert errors to a non-successful result + const result: boolean = yield () => + self.rebuildNodeRoutesInternal(nodeId).catch(() => + false + ); - // First try to rebuild routes for as many nodes as possible one by one - while (todoListening.length > 0) { - const nodeId = todoListening.shift()!; - await doRebuildRoutes(nodeId); - if (!this._isRebuildingRoutes) return; - } + // Track the success in a map + self._rebuildRoutesProgress.set( + nodeId, + result ? "done" : "failed", + ); + // Notify listeners about the progress + self.emit( + "rebuild routes progress", + new Map(self._rebuildRoutesProgress), + ); - // We might end up with a few unconnected listening nodes, try to rebuild routes for them too - pendingNodes.forEach((nodeId) => addTodo(nodeId)); - while (todoListening.length > 0) { - const nodeId = todoListening.shift()!; - await doRebuildRoutes(nodeId); - if (!this._isRebuildingRoutes) return; - } + yield; // Give the task scheduler time to do something else - if (options.includeSleeping) { - // Now do all sleeping nodes at once - this.driver.controllerLog.print( - "Rebuilding routes for sleeping nodes when they wake up", - ); + // Figure out which nodes to do next + try { + const neighbors = await self.getNodeNeighbors(nodeId); + neighbors.forEach((id) => addTodo(id)); + } catch { + // ignore + } - const tasks = todoSleeping.map((nodeId) => doRebuildRoutes(nodeId)); - await Promise.all(tasks); - } + yield; // Give the task scheduler time to do something else + } - // Only emit the done event when the process wasn't stopped in the meantime - if (this._isRebuildingRoutes) { - this.driver.controllerLog.print("rebuilding routes completed"); + // First try to rebuild routes for as many nodes as possible one by one + while (todoListening.length > 0) { + const nodeId = todoListening.shift()!; + yield* doRebuildRoutes(nodeId); + } - this.emit( - "rebuild routes done", - new Map(this._rebuildRoutesProgress), - ); - } else { - this.driver.controllerLog.print("rebuilding routes aborted"); - } - // We're done! - this._isRebuildingRoutes = false; - this._rebuildRoutesProgress.clear(); + // We might end up with a few unconnected listening nodes, try to rebuild routes for them too + pendingNodes.forEach((nodeId) => addTodo(nodeId)); + while (todoListening.length > 0) { + const nodeId = todoListening.shift()!; + yield* doRebuildRoutes(nodeId); + } + + if (options.includeSleeping) { + // Now do all sleeping nodes at once + self.driver.controllerLog.print( + "Rebuilding routes for sleeping nodes when they wake up", + ); + + const tasks = todoSleeping.map((nodeId) => + self.nodes.get(nodeId) + ).filter((node) => node != undefined) + .map((node) => { + const sleepingNodeTask: TaskBuilder = { + priority: TaskPriority.Lower - 1, + tag: { + id: "rebuild-node-routes-wakeup", + nodeId: node.id, + }, + task: + async function* rebuildSleepingNodeRoutesTask() { + // Pause the task until the node wakes up + yield () => node.waitForWakeup(); + yield* doRebuildRoutes(node.id); + }, + }; + return self.driver.scheduler.queueTask( + sleepingNodeTask, + ); + }); + // Pause until all sleeping nodes have been processed + yield () => Promise.all(tasks); + } + + self.driver.controllerLog.print( + "rebuilding routes completed", + ); + + self.emit( + "rebuild routes done", + new Map(self._rebuildRoutesProgress), + ); + + // We're done! + self._rebuildRoutesProgress.clear(); + }, + }; } /** * Stops the route rebuilding process. Resolves false if the process was not active, true otherwise. */ public stopRebuildingRoutes(): boolean { + const hasTasks = !!this.driver.scheduler.findTask(isRebuildRoutesTask); + // don't stop it twice - if (!this._isRebuildingRoutes) return false; - this._isRebuildingRoutes = false; + if (!hasTasks) return false; this.driver.controllerLog.print(`stopping route rebuilding process...`); + // Stop all tasks that are part of the route rebuilding process + // FIXME: This should be an async function that waits for the task removal + void this.driver.scheduler.removeTasks(isRebuildRoutesTask).then(() => { + this.driver.controllerLog.print( + "rebuilding routes aborted", + ); + }); + // Cancel all transactions that were created by the route rebuilding process this.driver.rejectTransactions( (t) => @@ -4778,16 +4833,6 @@ export class ZWaveController ); } - // Don't start the process twice - if (this._isRebuildingRoutes) { - this.driver.controllerLog.logNode( - nodeId, - `Cannot rebuild routes because another rebuilding process is in progress.`, - ); - return false; - } - this._isRebuildingRoutes = true; - // Figure out if nodes are responsive before attempting to rebuild routes if ( // The node is known to be dead @@ -4806,173 +4851,216 @@ export class ZWaveController } } - try { - return await this.rebuildNodeRoutesInternal(nodeId); - } finally { - this._isRebuildingRoutes = false; - } + return this.rebuildNodeRoutesInternal(nodeId); } - private async rebuildNodeRoutesInternal(nodeId: number): Promise { + private rebuildNodeRoutesInternal( + nodeId: number, + ): Promise { + // Don't start the process twice + const existingTask = this.driver.scheduler.findTask((t) => + t.tag?.id === "rebuild-node-routes" && t.tag.nodeId === nodeId + ); + if (existingTask) return existingTask; + const node = this.nodes.getOrThrow(nodeId); + return this.driver.scheduler.queueTask( + this.getRebuildNodeRoutesTask(node), + ); + } - // Keep battery powered nodes awake during the process - // and make sure that the flag gets reset at the end - const keepAwake = node.keepAwake; - try { - node.keepAwake = true; + private getRebuildNodeRoutesTask( + node: ZWaveNode, + ): TaskBuilder { + let keepAwake: boolean; - this.driver.controllerLog.logNode(nodeId, { - message: `Rebuilding routes...`, - direction: "none", - }); + const self = this; - // The process consists of four steps, each step is tried up to 5 times before i is considered failed - const maxAttempts = 5; + return { + // This task is executed by users and by the network-wide route rebuilding process. + // Since it can possibly be spawned by a "wait for wakeup" task aswell, we need to + // increment the priority by 2 here to avoid blocking. + priority: TaskPriority.Lower - 2, + tag: { id: "rebuild-node-routes", nodeId: node.id }, + task: async function* rebuildNodeRoutesTask() { + // Keep battery powered nodes awake during the process + keepAwake = node.keepAwake; + node.keepAwake = true; + + self.driver.controllerLog.logNode(node.id, { + message: `Rebuilding routes...`, + direction: "none", + }); - // 1. command the node to refresh its neighbor list - for (let attempt = 1; attempt <= maxAttempts; attempt++) { - // If the process was stopped in the meantime, cancel - if (!this._isRebuildingRoutes) return false; + // The process consists of four steps, each step is tried up to 5 times before it is considered failed + const maxAttempts = 5; - this.driver.controllerLog.logNode(nodeId, { - message: `refreshing neighbor list (attempt ${attempt})...`, - direction: "outbound", - }); + // 1. command the node to refresh its neighbor list + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + yield; // Give the task scheduler time to do something else - try { - const result = await this.discoverNodeNeighbors(nodeId); - if (result) { - this.driver.controllerLog.logNode(nodeId, { - message: "neighbor list refreshed...", - direction: "inbound", - }); - // this step was successful, continue with the next - break; - } else { - this.driver.controllerLog.logNode(nodeId, { - message: "refreshing neighbor list failed...", - direction: "inbound", + self.driver.controllerLog.logNode(node.id, { + message: + `refreshing neighbor list (attempt ${attempt})...`, + direction: "outbound", + }); + + try { + const result = await self.discoverNodeNeighbors( + node.id, + ); + if (result) { + self.driver.controllerLog.logNode(node.id, { + message: "neighbor list refreshed...", + direction: "inbound", + }); + // this step was successful, continue with the next + break; + } else { + self.driver.controllerLog.logNode(node.id, { + message: "refreshing neighbor list failed...", + direction: "inbound", + level: "warn", + }); + } + } catch (e) { + self.driver.controllerLog.logNode( + node.id, + `refreshing neighbor list failed: ${ + getErrorMessage( + e, + ) + }`, + "warn", + ); + } + if (attempt === maxAttempts) { + self.driver.controllerLog.logNode(node.id, { + message: + `rebuilding routes failed: could not update the neighbor list after ${maxAttempts} attempts`, level: "warn", + direction: "none", }); + return false; } - } catch (e) { - this.driver.controllerLog.logNode( - nodeId, - `refreshing neighbor list failed: ${ - getErrorMessage( - e, - ) - }`, - "warn", - ); - } - if (attempt === maxAttempts) { - this.driver.controllerLog.logNode(nodeId, { - message: - `rebuilding routes failed: could not update the neighbor list after ${maxAttempts} attempts`, - level: "warn", - direction: "none", - }); - return false; } - } - // 2. re-create the SUC return route, just in case - node.hasSUCReturnRoute = await this.assignSUCReturnRoutes(nodeId); + yield; // Give the task scheduler time to do something else - // 3. delete all return routes to get rid of potential priority return routes - for (let attempt = 1; attempt <= maxAttempts; attempt++) { - this.driver.controllerLog.logNode(nodeId, { - message: `deleting return routes (attempt ${attempt})...`, - direction: "outbound", - }); + // 2. re-create the SUC return route, just in case + node.hasSUCReturnRoute = await self.assignSUCReturnRoutes( + node.id, + ); - if (await this.deleteReturnRoutes(nodeId)) { - break; - } + // 3. delete all return routes to get rid of potential priority return routes + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + yield; // Give the task scheduler time to do something else - if (attempt === maxAttempts) { - this.driver.controllerLog.logNode(nodeId, { + self.driver.controllerLog.logNode(node.id, { message: - `rebuilding routes failed: failed to delete return routes after ${maxAttempts} attempts`, - level: "warn", - direction: "none", + `deleting return routes (attempt ${attempt})...`, + direction: "outbound", }); - return false; - } - } - // 4. Assign return routes to all association destinations... - let associatedNodes: number[] = []; - try { - associatedNodes = distinct( - flatMap( - [...(this.getAssociations({ nodeId }).values() as any)], - (assocs: AssociationAddress[]) => - assocs.map((a) => a.nodeId), - ), - ) - // ...except the controller itself, which was handled by step 2 - .filter((id) => id !== this._ownNodeId!) - // ...and the node itself - .filter((id) => id !== nodeId) - .sort(); - } catch { - /* ignore */ - } + if (await self.deleteReturnRoutes(node.id)) { + break; + } - if (associatedNodes.length > 0) { - this.driver.controllerLog.logNode(nodeId, { - message: `assigning return routes to the following nodes: -${associatedNodes.join(", ")}`, - direction: "outbound", - }); - for (const destinationNodeId of associatedNodes) { - for (let attempt = 1; attempt <= maxAttempts; attempt++) { - this.driver.controllerLog.logNode(nodeId, { + if (attempt === maxAttempts) { + self.driver.controllerLog.logNode(node.id, { message: - `assigning return route to node ${destinationNodeId} (attempt ${attempt})...`, - direction: "outbound", + `rebuilding routes failed: failed to delete return routes after ${maxAttempts} attempts`, + level: "warn", + direction: "none", }); + return false; + } + } - if ( - await this.assignReturnRoutes( - nodeId, - destinationNodeId, - ) + // 4. Assign return routes to all association destinations... + let associatedNodes: number[] = []; + try { + associatedNodes = distinct( + flatMap( + [ + ...(self.getAssociations({ nodeId: node.id }) + .values() as any), + ], + (assocs: AssociationAddress[]) => + assocs.map((a) => a.nodeId), + ), + ) + // ...except the controller itself, which was handled by step 2 + .filter((id) => id !== self._ownNodeId!) + // ...and the node itself + .filter((id) => id !== node.id) + .sort(); + } catch { + // ignore + } + + if (associatedNodes.length > 0) { + self.driver.controllerLog.logNode(node.id, { + message: + `assigning return routes to the following nodes: + ${associatedNodes.join(", ")}`, + direction: "outbound", + }); + for (const destinationNodeId of associatedNodes) { + for ( + let attempt = 1; + attempt <= maxAttempts; + attempt++ ) { - // this step was successful, continue with the next - break; - } + yield; // Give the task scheduler time to do something else - if (attempt === maxAttempts) { - this.driver.controllerLog.logNode(nodeId, { + self.driver.controllerLog.logNode(node.id, { message: - `rebuilding routes failed: failed to assign return route after ${maxAttempts} attempts`, - level: "warn", - direction: "none", + `assigning return route to node ${destinationNodeId} (attempt ${attempt})...`, + direction: "outbound", }); - return false; + + if ( + await self.assignReturnRoutes( + node.id, + destinationNodeId, + ) + ) { + // this step was successful, continue with the next + break; + } + + if (attempt === maxAttempts) { + self.driver.controllerLog.logNode(node.id, { + message: + `rebuilding routes failed: failed to assign return route after ${maxAttempts} attempts`, + level: "warn", + direction: "none", + }); + return false; + } } } } - } - this.driver.controllerLog.logNode(nodeId, { - message: `rebuilt routes successfully`, - direction: "none", - }); - - return true; - } finally { - node.keepAwake = keepAwake; - if (!keepAwake) { - setImmediate(() => { - this.driver.debounceSendNodeToSleep(node); + self.driver.controllerLog.logNode(node.id, { + message: `rebuilt routes successfully`, + direction: "none", }); - } - } + + return true; + }, + cleanup: () => { + // Make sure that the keepAwake flag gets reset at the end + node.keepAwake = keepAwake; + if (!keepAwake) { + setImmediate(() => { + this.driver.debounceSendNodeToSleep(node); + }); + } + return Promise.resolve(); + }, + }; } /** Configures the given Node to be SUC/SIS or not */ diff --git a/packages/zwave-js/src/lib/controller/utils.ts b/packages/zwave-js/src/lib/controller/utils.ts index 22d59a27f907..a99032087072 100644 --- a/packages/zwave-js/src/lib/controller/utils.ts +++ b/packages/zwave-js/src/lib/controller/utils.ts @@ -9,6 +9,7 @@ import { import { padVersion } from "@zwave-js/shared/safe"; import { isArray, isObject } from "alcalzone-shared/typeguards"; import semver from "semver"; +import { type Task } from "../driver/Task"; import { type PlannedProvisioningEntry, ProvisioningEntryStatus, @@ -126,3 +127,10 @@ export function sdkVersionLte( } return semver.lte(padVersion(sdkVersion), padVersion(compareVersion)); } + +/** Checks if a task belongs to a route rebuilding process */ +export function isRebuildRoutesTask(t: Task): boolean { + return t.tag?.id === "rebuild-routes" + || t.tag?.id === "rebuild-node-routes" + || t.tag?.id === "rebuild-node-routes-wakeup"; +} diff --git a/packages/zwave-js/src/lib/driver/Driver.ts b/packages/zwave-js/src/lib/driver/Driver.ts index 7c6124857b0e..a687eacdcf0a 100644 --- a/packages/zwave-js/src/lib/driver/Driver.ts +++ b/packages/zwave-js/src/lib/driver/Driver.ts @@ -233,6 +233,7 @@ import { createMessageDroppedUnexpectedError, serialAPICommandErrorToZWaveError, } from "./StateMachineShared"; +import { TaskScheduler } from "./Task"; import { throttlePresets } from "./ThrottlePresets"; import { Transaction } from "./Transaction"; import { @@ -676,6 +677,8 @@ export class Driver extends TypedEventEmitter }); this.serialAPIQueue = new AsyncQueue(); this._queueIdle = false; + + this._scheduler = new TaskScheduler(); } /** The serial port instance */ @@ -694,6 +697,11 @@ export class Driver extends TypedEventEmitter return [this.immediateQueue, this.queue]; } + private _scheduler: TaskScheduler; + public get scheduler(): TaskScheduler { + return this._scheduler; + } + private queuePaused = false; /** The interpreter for the currently active Serial API command */ private serialAPIInterpreter: SerialAPICommandInterpreter | undefined; @@ -1236,6 +1244,9 @@ export class Driver extends TypedEventEmitter // Start the serial API queue void this.drainSerialAPIQueue(); + // Start the task scheduler + this._scheduler.start(); + if ( typeof this._options.testingHooks?.onSerialPortOpen === "function" @@ -3198,7 +3209,9 @@ export class Driver extends TypedEventEmitter this.driverLog.print("destroying driver instance..."); - // First stop all queues and close the serial port, so nothing happens anymore + // First stop the scheduler, all queues and close the serial port, so nothing happens anymore + await this._scheduler.stop(); + this.serialAPIQueue.abort(); for (const queue of this.queues) { queue.abort(); diff --git a/packages/zwave-js/src/lib/driver/Task.test.ts b/packages/zwave-js/src/lib/driver/Task.test.ts index d1fec273cc4f..8618734f42ed 100644 --- a/packages/zwave-js/src/lib/driver/Task.test.ts +++ b/packages/zwave-js/src/lib/driver/Task.test.ts @@ -1,4 +1,5 @@ import { ZWaveError, ZWaveErrorCodes, assertZWaveError } from "@zwave-js/core"; +import { noop } from "@zwave-js/shared"; import { wait } from "alcalzone-shared/async"; import { createDeferredPromise } from "alcalzone-shared/deferred-promise"; import test from "ava"; @@ -752,6 +753,98 @@ test.failing("Tasks cannot yield-queue lower-priority tasks", async (t) => { await outer; }); +test("Tasks receive the result of yielded tasks", async (t) => { + const scheduler = new TaskScheduler(); + scheduler.start(); + + const innerBuilder: TaskBuilder = { + priority: TaskPriority.High, + task: async function*() { + yield; + yield; + return "foo"; + }, + }; + + const outer = scheduler.queueTask({ + priority: TaskPriority.Normal, + task: async function*() { + const inner = scheduler.queueTask(innerBuilder); + const result = (yield () => inner) as Awaited; + return result; + }, + }); + + t.is(await outer, "foo"); +}); + +test("Tasks receive the result of yielded tasks, part 2", async (t) => { + const scheduler = new TaskScheduler(); + scheduler.start(); + + const inner1Builder: TaskBuilder = { + priority: TaskPriority.High, + task: async function*() { + yield; + yield; + return "foo"; + }, + }; + + const inner3Builder: TaskBuilder = { + priority: TaskPriority.High, + task: async function*() { + yield; + yield; + return "bar"; + }, + }; + + const outer = scheduler.queueTask({ + priority: TaskPriority.Normal, + task: async function*() { + const inner1 = scheduler.queueTask(inner1Builder); + const result1 = (yield () => inner1) as Awaited; + const result2 = (yield) as any; + const inner3 = scheduler.queueTask(inner3Builder); + const result3 = (yield () => inner3) as Awaited; + return result1 + (result2 ?? "") + result3; + }, + }); + + t.is(await outer, "foobar"); +}); + +test("Tasks receive the result of yielded tasks, part 3", async (t) => { + const scheduler = new TaskScheduler(); + scheduler.start(); + + const innerBuilder: TaskBuilder = { + priority: TaskPriority.High, + task: async function*() { + yield; + throw new Error("foo"); + }, + }; + + const outer = scheduler.queueTask({ + priority: TaskPriority.Normal, + task: async function*() { + const inner = scheduler.queueTask(innerBuilder); + try { + const ret = (yield () => inner) as any; + return ret; + } catch (e) { + return e; + } + }, + }); + + const result = await outer; + t.true(result instanceof Error); + t.is(result.message, "foo"); +}); + test("Tasks can queue lower-priority tasks without waiting for them", async (t) => { const scheduler = new TaskScheduler(); scheduler.start(); @@ -1072,3 +1165,135 @@ test("The task rejection uses the given error, if any", async (t) => { t.deepEqual(order, ["1a", "1c"]); }); + +test("Canceling nested tasks works", async (t) => { + const scheduler = new TaskScheduler(); + scheduler.start(); + + const order: string[] = []; + const yieldedPromise = createDeferredPromise(); + + const innerBuilder: TaskBuilder = { + priority: TaskPriority.High, + task: async function*() { + order.push("2a"); + yield () => yieldedPromise; + order.push("2b"); + }, + }; + + const outer = scheduler.queueTask({ + priority: TaskPriority.Normal, + task: async function*() { + order.push("1a"); + const inner = scheduler.queueTask(innerBuilder); + yield () => inner; + order.push("1b"); + }, + }).catch(noop); + + // Wait long enough that the task is definitely waiting for the promise + await wait(50); + t.deepEqual(order, ["1a", "2a"]); + + // Cancel all tasks + await scheduler.removeTasks(() => true); + + t.deepEqual(order, ["1a", "2a"]); +}); + +test("Canceling nested tasks works, part 2", async (t) => { + const scheduler = new TaskScheduler(); + scheduler.start(); + + const yieldedPromise = createDeferredPromise(); + + const innerBuilder: TaskBuilder = { + priority: TaskPriority.High, + name: "inner", + task: async function*() { + yield () => yieldedPromise; + }, + }; + + const outer = scheduler.queueTask({ + priority: TaskPriority.Normal, + task: async function*() { + const inner = scheduler.queueTask(innerBuilder); + try { + yield () => inner; + } catch (e) { + return "canceled"; + } + }, + }); + + // Wait long enough that the task is definitely waiting for the promise + await wait(10); + + // Cancel all tasks + await scheduler.removeTasks((t) => t.name === "inner"); + + t.is(await outer, "canceled"); +}); + +test("Splitting tasks into multiple generator functions works", async (t) => { + const scheduler = new TaskScheduler(); + scheduler.start(); + + const order: string[] = []; + + const task1 = scheduler.queueTask({ + priority: TaskPriority.Normal, + task: async function*() { + order.push("1a"); + + async function* inner() { + order.push("1b"); + yield; + order.push("1c"); + } + + yield* inner(); + yield; + order.push("1d"); + }, + }); + + await task1; + + t.deepEqual(order, ["1a", "1b", "1c", "1d"]); +}); + +test("Split tasks can be canceled", async (t) => { + const scheduler = new TaskScheduler(); + scheduler.start(); + + const order: string[] = []; + const yieldedPromise = createDeferredPromise(); + + const task1 = scheduler.queueTask({ + priority: TaskPriority.Normal, + task: async function*() { + order.push("1a"); + + async function* inner() { + order.push("1b"); + yield () => yieldedPromise; + order.push("1c"); + } + + yield* inner(); + yield; + order.push("1d"); + }, + }).catch(noop); + + // Wait long enough that the task is definitely waiting for the promise + await wait(10); + + // Cancel all tasks + await scheduler.removeTasks(() => true); + + t.deepEqual(order, ["1a", "1b"]); +}); diff --git a/packages/zwave-js/src/lib/driver/Task.ts b/packages/zwave-js/src/lib/driver/Task.ts index a337c429b6bd..49bcd7c081dc 100644 --- a/packages/zwave-js/src/lib/driver/Task.ts +++ b/packages/zwave-js/src/lib/driver/Task.ts @@ -1,5 +1,5 @@ import { ZWaveError, ZWaveErrorCodes, highResTimestamp } from "@zwave-js/core"; -import { createWrappingCounter } from "@zwave-js/shared"; +import { createWrappingCounter, noop } from "@zwave-js/shared"; import { type CompareResult } from "alcalzone-shared/comparable"; import { type DeferredPromise, @@ -15,6 +15,8 @@ export interface Task { /** A name to identify the task */ readonly name?: string; + /** A tag to identify the task programmatically */ + readonly tag?: TaskTag; /** The task's priority */ readonly priority: TaskPriority; /** How the task should behave when interrupted */ @@ -35,9 +37,11 @@ export interface Task { } /** Defines the necessary information for creating a task */ -export interface TaskBuilder { +export interface TaskBuilder { /** A name to identify the task */ name?: string; + /** A tag to identify the task programmatically */ + tag?: TaskTag; /** The task's priority */ priority: TaskPriority; /** How the task should behave when interrupted */ @@ -48,9 +52,9 @@ export interface TaskBuilder { * When the task wants to wait for something, it must yield a function returning a Promise that resolves when it can continue. */ task: () => AsyncGenerator< - (() => Promise) | undefined, + (() => Promise) | undefined, TReturn, - void + TInner >; /** A cleanup function that gets called when the task is dropped */ cleanup?: () => Promise; @@ -63,14 +67,16 @@ export interface TaskBuilder { * The recommended priority for application-initiated communication is `Normal`. * `Low` and `Lower` are recommended for internal long-running tasks that should not interfere with user-initiated tasks. * `Idle` is recommended for tasks that should only run when no other tasks are pending. + * + * When nesting multiple levels of tasks, the "inner" tasks should decrement the priority by 1. */ export enum TaskPriority { Highest, - High, - Normal, - Low, - Lower, - Idle, + High = 10, + Normal = 20, + Low = 30, + Lower = 40, + Idle = 50, } export enum TaskState { @@ -126,6 +132,22 @@ function compareTasks(a: Task, b: Task): CompareResult { return Math.sign(b.id - a.id) as CompareResult; } +export type TaskTag = + | { + // Rebuild routes for all nodes + id: "rebuild-routes"; + } + | { + // Rebuild routes for a single node + id: "rebuild-node-routes"; + nodeId: number; + } + | { + // Rebuild routes for a single node -> wait for node to wake up + id: "rebuild-node-routes-wakeup"; + nodeId: number; + }; + export class TaskScheduler { private _tasks = new SortedList>(undefined, compareTasks); private _currentTask: Task | undefined; @@ -142,10 +164,11 @@ export class TaskScheduler { return task.promise; } + /** Removes/stops tasks matching the given predicate. Returns `true` when a task was removed, `false` otherwise. */ public async removeTasks( predicate: (task: Task) => boolean, reason?: ZWaveError, - ): Promise { + ): Promise { // Collect tasks that should be removed, but in reverse order, // so that we handle the current task last. const tasksToRemove: Task[] = []; @@ -172,19 +195,29 @@ export class TaskScheduler { || task.state === TaskState.Waiting ) { // The task is running, clean it up - await task.reset(); + await task.reset().catch(noop); } task.reject(reason); } if (removeCurrentTask && this._currentTask) { this._tasks.remove(this._currentTask); - await this._currentTask.reset(); + await this._currentTask.reset().catch(noop); this._currentTask.reject(reason); this._currentTask = undefined; } if (this._continueSignal) this._continueSignal.resolve(); + + return tasksToRemove.length > 0 || removeCurrentTask; + } + + public findTask( + predicate: (task: Task) => boolean, + ): Promise | undefined { + return this._tasks.find((t: any) => predicate(t))?.promise as + | Promise + | undefined; } /** Creates a task that can be executed */ @@ -194,11 +227,15 @@ export class TaskScheduler { let waitFor: Promise | undefined; const promise = createDeferredPromise(); + let prevResult: unknown; + let waitError: unknown; + return { id: this._idGenerator(), timestamp: highResTimestamp(), builder, name: builder.name, + tag: builder.tag, priority: builder.priority, interrupt: builder.interrupt ?? TaskInterruptBehavior.Resume, promise, @@ -214,7 +251,11 @@ export class TaskScheduler { generator ??= builder.task(); state = TaskState.Active; - const { value, done } = await generator.next(); + const { value, done } = waitError + ? await generator.throw(waitError) + : await generator.next(prevResult); + prevResult = undefined; + waitError = undefined; if (done) { state = TaskState.Done; return { @@ -223,7 +264,11 @@ export class TaskScheduler { }; } else if (typeof value === "function") { state = TaskState.Waiting; - waitFor = value().then(() => { + waitFor = value().then((result) => { + prevResult = result; + }).catch((e) => { + waitError = e; + }).finally(() => { waitFor = undefined; if (state === TaskState.Waiting) { state = TaskState.Active;