From 6c10570399704592157b56f04a7331f6696712d0 Mon Sep 17 00:00:00 2001 From: AlCalzone Date: Mon, 9 Dec 2024 11:40:11 +0100 Subject: [PATCH 01/12] refactor: replace `xstate` with a simple built-in state machine (#7460) --- .github/dependabot.yml | 2 - packages/cc/src/cc/TransportServiceCC.ts | 28 +- packages/core/src/fsm/FSM.ts | 88 +++ packages/core/src/index.ts | 1 + packages/zwave-js/package.json | 4 +- packages/zwave-js/src/lib/driver/Driver.ts | 642 +++++++++++------- .../driver/SerialAPICommandMachine.test.ts | 509 -------------- .../src/lib/driver/SerialAPICommandMachine.ts | 617 +++++------------ .../src/lib/driver/StateMachineShared.ts | 98 +-- .../src/lib/driver/TransportServiceMachine.ts | 288 +++----- packages/zwave-js/src/lib/node/Node.ts | 12 +- .../src/lib/node/NodeReadyMachine.test.ts | 60 +- .../zwave-js/src/lib/node/NodeReadyMachine.ts | 138 ++-- .../src/lib/node/NodeStatusMachine.test.ts | 63 +- .../src/lib/node/NodeStatusMachine.ts | 172 +++-- .../zwave-js/src/lib/node/mixins/20_Status.ts | 86 ++- .../lib/test/driver/transportService.test.ts | 409 +++++++++++ yarn.lock | 30 - 18 files changed, 1424 insertions(+), 1823 deletions(-) create mode 100644 packages/core/src/fsm/FSM.ts delete mode 100644 packages/zwave-js/src/lib/driver/SerialAPICommandMachine.test.ts create mode 100644 packages/zwave-js/src/lib/test/driver/transportService.test.ts diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 45d3e0c980a1..2e32508f9cb6 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -18,8 +18,6 @@ updates: # so only apply patch and minor updates automatically - dependency-name: '@types/node' update-types: ['version-update:semver-major'] - # xstate usually requires manual intervention - - dependency-name: 'xstate' - package-ecosystem: github-actions directory: '/' diff --git a/packages/cc/src/cc/TransportServiceCC.ts b/packages/cc/src/cc/TransportServiceCC.ts index c142e676ff89..6c27a8e8bc6a 100644 --- a/packages/cc/src/cc/TransportServiceCC.ts +++ b/packages/cc/src/cc/TransportServiceCC.ts @@ -15,15 +15,10 @@ import type { } from "@zwave-js/host/safe"; import { Bytes } from "@zwave-js/shared/safe"; import { buffer2hex } from "@zwave-js/shared/safe"; -import { - type CCRaw, - type CCResponseRole, - CommandClass, -} from "../lib/CommandClass.js"; +import { type CCRaw, CommandClass } from "../lib/CommandClass.js"; import { CCCommand, commandClass, - expectedCCResponse, implementedVersion, } from "../lib/CommandClassDecorators.js"; import { TransportServiceCommand } from "../lib/_Types.js"; @@ -81,7 +76,7 @@ export function isTransportServiceEncapsulation( } @CCCommand(TransportServiceCommand.FirstSegment) -// @expectedCCResponse(TransportServiceCCReport) +// Handling expected responses is done by the RX state machine export class TransportServiceCCFirstSegment extends TransportServiceCC { public constructor( options: WithAddress, @@ -227,7 +222,7 @@ export interface TransportServiceCCSubsequentSegmentOptions } @CCCommand(TransportServiceCommand.SubsequentSegment) -// @expectedCCResponse(TransportServiceCCReport) +// Handling expected responses is done by the RX state machine export class TransportServiceCCSubsequentSegment extends TransportServiceCC { public constructor( options: WithAddress, @@ -465,23 +460,8 @@ export interface TransportServiceCCSegmentRequestOptions { datagramOffset: number; } -function testResponseForSegmentRequest( - sent: TransportServiceCCSegmentRequest, - received: TransportServiceCC, -): CCResponseRole { - return ( - (sent.datagramOffset === 0 - && received instanceof TransportServiceCCFirstSegment - && received.sessionId === sent.sessionId) - || (sent.datagramOffset > 0 - && received instanceof TransportServiceCCSubsequentSegment - && sent.datagramOffset === received.datagramOffset - && received.sessionId === sent.sessionId) - ); -} - @CCCommand(TransportServiceCommand.SegmentRequest) -@expectedCCResponse(TransportServiceCC, testResponseForSegmentRequest) +// Handling expected responses is done by the RX state machine export class TransportServiceCCSegmentRequest extends TransportServiceCC { public constructor( options: WithAddress, diff --git a/packages/core/src/fsm/FSM.ts b/packages/core/src/fsm/FSM.ts new file mode 100644 index 000000000000..82b40b5bee7e --- /dev/null +++ b/packages/core/src/fsm/FSM.ts @@ -0,0 +1,88 @@ +export interface StateMachineTransition< + State extends StateMachineState, + Effect = undefined, +> { + effect?: Effect; + newState: State; +} + +export interface StateMachineState { + value: number | string; + done?: boolean; +} + +export interface StateMachineInput { + value: number | string; +} + +export type StateMachineTransitionMap< + State extends StateMachineState, + Input extends StateMachineInput, + Effect = undefined, +> = ( + state: State, +) => ( + input: Input, +) => StateMachineTransition | undefined; + +export type InferStateMachineTransitions< + T extends StateMachine, +> = T extends StateMachine + ? StateMachineTransitionMap + : never; + +export class StateMachine< + State extends StateMachineState, + Input extends StateMachineInput, + Effect = undefined, +> { + public constructor( + initialState: State, + transitions: StateMachineTransitionMap< + State, + Input, + Effect | undefined + >, + ) { + this._initial = this._state = initialState; + this.transitions = transitions; + } + + protected transitions: StateMachineTransitionMap< + State, + Input, + Effect | undefined + >; + + /** Restarts the machine from the initial state */ + public restart(): void { + this._state = this._initial; + } + + /** Determines the next transition to take */ + public next( + input: Input, + ): StateMachineTransition | undefined { + if (this._state.done) return; + return this.transitions(this._state)(input); + } + + /** Transitions the machine to the next state. This does not execute effects */ + public transition(next?: State): void { + // Allow some convenience by passing the transition's next state directly + if (next == undefined) return; + this._state = next; + } + + private _initial: State; + private _state: State; + /** Returns the current state of the state machine */ + public get state(): State { + return this._state; + } + + /** Returns whether this state machine is done */ + public get done(): boolean { + return !!this._state.done; + } +} diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 668462c617b7..04e735becd0a 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -3,6 +3,7 @@ export * from "./crypto/index.node.js"; export * from "./definitions/index.js"; export * from "./dsk/index.js"; export * from "./error/ZWaveError.js"; +export * from "./fsm/FSM.js"; export * from "./log/Controller.js"; export * from "./log/shared.js"; export * from "./log/shared_safe.js"; diff --git a/packages/zwave-js/package.json b/packages/zwave-js/package.json index 2a54691272b2..6daab6362130 100644 --- a/packages/zwave-js/package.json +++ b/packages/zwave-js/package.json @@ -118,8 +118,7 @@ "semver": "^7.6.3", "serialport": "^12.0.0", "source-map-support": "^0.5.21", - "winston": "^3.15.0", - "xstate": "4.38.3" + "winston": "^3.15.0" }, "devDependencies": { "@alcalzone/esm2cjs": "^1.4.1", @@ -130,7 +129,6 @@ "@types/semver": "^7.5.8", "@types/sinon": "^17.0.3", "@types/source-map-support": "^0.5.10", - "@xstate/test": "^0.5.1", "@zwave-js/maintenance": "workspace:*", "@zwave-js/transformers": "workspace:*", "del-cli": "^6.0.0", diff --git a/packages/zwave-js/src/lib/driver/Driver.ts b/packages/zwave-js/src/lib/driver/Driver.ts index db86a9b6dc48..e5e06345cca9 100644 --- a/packages/zwave-js/src/lib/driver/Driver.ts +++ b/packages/zwave-js/src/lib/driver/Driver.ts @@ -90,7 +90,6 @@ import { extractRawECDHPrivateKeySync, generateECDHKeyPairSync, getCCName, - highResTimestamp, isEncapsulationCC, isLongRangeNodeId, isMissingControllerACK, @@ -191,7 +190,6 @@ import path from "node:path"; import { URL } from "node:url"; import * as util from "node:util"; import { SerialPort } from "serialport"; -import { InterpreterStatus, interpret } from "xstate"; import { ZWaveController } from "../controller/Controller.js"; import { InclusionState, RemoveNodeReason } from "../controller/Inclusion.js"; import { DriverLogger } from "../log/Driver.js"; @@ -237,8 +235,7 @@ import { } from "./NetworkCache.js"; import { type SerialAPIQueueItem, TransactionQueue } from "./Queue.js"; import { - type SerialAPICommandDoneData, - type SerialAPICommandInterpreter, + type SerialAPICommandMachineInput, createSerialAPICommandMachine, } from "./SerialAPICommandMachine.js"; import { @@ -251,7 +248,8 @@ import { TaskScheduler } from "./Task.js"; import { throttlePresets } from "./ThrottlePresets.js"; import { Transaction } from "./Transaction.js"; import { - type TransportServiceRXInterpreter, + type TransportServiceRXMachine, + type TransportServiceRXMachineInput, createTransportServiceRXMachine, } from "./TransportServiceMachine.js"; import { checkForConfigUpdates, installConfigUpdate } from "./UpdateConfig.js"; @@ -562,7 +560,8 @@ export type AwaitedBootloaderChunkEntry = AwaitedThing; interface TransportServiceSession { fragmentSize: number; - interpreter: TransportServiceRXInterpreter; + machine: TransportServiceRXMachine; + timeout?: NodeJS.Timeout; } interface Sessions { @@ -811,8 +810,8 @@ export class Driver extends TypedEventTarget } private queuePaused = false; - /** The interpreter for the currently active Serial API command */ - private serialAPIInterpreter: SerialAPICommandInterpreter | undefined; + /** Used to immediately abort ongoing Serial API commands */ + private abortSerialAPICommand: DeferredPromise | undefined; // Keep track of which queues are currently busy private _queuesBusyFlags = 0; @@ -3413,9 +3412,7 @@ export class Driver extends TypedEventTarget for (const queue of this.queues) { queue.abort(); } - if (this.serialAPIInterpreter?.status === InterpreterStatus.Running) { - this.serialAPIInterpreter.stop(); - } + if (this.serial != undefined) { // Avoid spewing errors if the port was in the middle of receiving something this.serial.removeAllListeners(); @@ -3496,36 +3493,49 @@ export class Driver extends TypedEventTarget ): Promise { if (typeof data === "number") { switch (data) { - // single-byte messages - just forward them to the send thread - case MessageHeaders.ACK: { - if ( - this.serialAPIInterpreter?.status - === InterpreterStatus.Running - ) { - this.serialAPIInterpreter.send("ACK"); - } - return; - } - case MessageHeaders.NAK: { - if ( - this.serialAPIInterpreter?.status - === InterpreterStatus.Running - ) { - this.serialAPIInterpreter.send("NAK"); - } - this._controller?.incrementStatistics("NAK"); - return; - } + case MessageHeaders.ACK: + case MessageHeaders.NAK: case MessageHeaders.CAN: { - if ( - this.serialAPIInterpreter?.status - === InterpreterStatus.Running - ) { - this.serialAPIInterpreter.send("CAN"); + // check if someone is waiting for this + for (const entry of this.awaitedMessageHeaders) { + if (entry.predicate(data)) { + entry.handler(data); + break; + } } - this._controller?.incrementStatistics("CAN"); return; } + + // // single-byte messages - just forward them to the send thread + // case MessageHeaders.ACK: { + // if ( + // this.serialAPIInterpreter?.status + // === InterpreterStatus.Running + // ) { + // this.serialAPIInterpreter.send("ACK"); + // } + // return; + // } + // case MessageHeaders.NAK: { + // if ( + // this.serialAPIInterpreter?.status + // === InterpreterStatus.Running + // ) { + // this.serialAPIInterpreter.send("NAK"); + // } + // this._controller?.incrementStatistics("NAK"); + // return; + // } + // case MessageHeaders.CAN: { + // if ( + // this.serialAPIInterpreter?.status + // === InterpreterStatus.Running + // ) { + // this.serialAPIInterpreter.send("CAN"); + // } + // this._controller?.incrementStatistics("CAN"); + // return; + // } } } @@ -3747,17 +3757,17 @@ export class Driver extends TypedEventTarget } } - // Check if this message is unsolicited by passing it to the Serial API command interpreter if possible - if ( - this.serialAPIInterpreter?.status === InterpreterStatus.Running - ) { - this.serialAPIInterpreter.send({ - type: "message", - message: msg, - }); - } else { - void this.handleUnsolicitedMessage(msg); - } + // // Check if this message is unsolicited by passing it to the Serial API command interpreter if possible + // if ( + // this.serialAPIInterpreter?.status === InterpreterStatus.Running + // ) { + // this.serialAPIInterpreter.send({ + // type: "message", + // message: msg, + // }); + // } else { + void this.handleUnsolicitedMessage(msg); + // } } } @@ -4571,18 +4581,103 @@ export class Driver extends TypedEventTarget | TransportServiceCCSubsequentSegment, ): Promise { const nodeSessions = this.ensureNodeSessions(command.nodeId); - // If this command belongs to an existing session, just forward it to the state machine - const transportSession = nodeSessions.transportService.get( - command.sessionId, - ); + + // TODO: Figure out how to know which timeout is the correct one. For now use the larger one + const missingSegmentTimeout = + TransportServiceTimeouts.requestMissingSegmentR2; + + const advanceTransportServiceSession = async ( + session: TransportServiceSession, + input: TransportServiceRXMachineInput, + ): Promise => { + const machine = session.machine; + + // Figure out what needs to be done for this input + const transition = machine.next(input); + if (transition) { + machine.transition(transition.newState); + + if (machine.state.value === "receive") { + // We received a segment in the normal flow. Restart the timer + startMissingSegmentTimeout(session); + } else if (machine.state.value === "requestMissing") { + // A segment is missing. Request it and restart the timeout + this.controllerLog.logNode(command.nodeId, { + message: + `Transport Service RX session #${command.sessionId}: Segment with offset ${machine.state.offset} missing - requesting it...`, + level: "debug", + direction: "outbound", + }); + const cc = new TransportServiceCCSegmentRequest({ + nodeId: command.nodeId, + sessionId: command.sessionId, + datagramOffset: machine.state.offset, + }); + await this.sendCommand(cc, { + maxSendAttempts: 1, + priority: MessagePriority.Immediate, + }); + + startMissingSegmentTimeout(session); + } else if (machine.state.value === "failure") { + this.controllerLog.logNode(command.nodeId, { + message: + `Transport Service RX session #${command.sessionId} failed`, + level: "error", + direction: "none", + }); + // TODO: Update statistics + nodeSessions.transportService.delete(command.sessionId); + if (session.timeout) { + clearTimeout(session.timeout); + } + } + } + + if (machine.state.value === "success") { + // This state may happen without a transition if we received the last segment before + // but the SegmentComplete message got lost + this.controllerLog.logNode(command.nodeId, { + message: + `Transport Service RX session #${command.sessionId} complete`, + level: "debug", + direction: "inbound", + }); + if (session.timeout) { + clearTimeout(session.timeout); + } + + const cc = new TransportServiceCCSegmentComplete({ + nodeId: command.nodeId, + sessionId: command.sessionId, + }); + await this.sendCommand(cc, { + maxSendAttempts: 1, + priority: MessagePriority.Immediate, + }).catch(noop); + } + }; + + function startMissingSegmentTimeout( + session: TransportServiceSession, + ): void { + if (session.timeout) { + clearTimeout(session.timeout); + } + + session.timeout = setTimeout(() => { + session.timeout = undefined; + void advanceTransportServiceSession(session, { + value: "timeout", + }); + }, missingSegmentTimeout); + } + if (command instanceof TransportServiceCCFirstSegment) { // This is the first message in a sequence. Create or re-initialize the session // We don't delete finished sessions when the last message is received in order to be able to // handle when the SegmentComplete message gets lost. As soon as the node initializes a new session, // we do know that the previous one is finished. - if (transportSession) { - transportSession.interpreter.stop(); - } nodeSessions.transportService.clear(); this.controllerLog.logNode(command.nodeId, { @@ -4592,75 +4687,27 @@ export class Driver extends TypedEventTarget direction: "inbound", }); - const RXStateMachine = createTransportServiceRXMachine( - { - requestMissingSegment: async (offset: number) => { - this.controllerLog.logNode(command.nodeId, { - message: - `Transport Service RX session #${command.sessionId}: Segment with offset ${offset} missing - requesting it...`, - level: "debug", - direction: "outbound", - }); - const cc = new TransportServiceCCSegmentRequest({ - nodeId: command.nodeId, - sessionId: command.sessionId, - datagramOffset: offset, - }); - await this.sendCommand(cc, { - maxSendAttempts: 1, - priority: MessagePriority.Immediate, - }); - }, - sendSegmentsComplete: async () => { - this.controllerLog.logNode(command.nodeId, { - message: - `Transport Service RX session #${command.sessionId} complete`, - level: "debug", - direction: "outbound", - }); - const cc = new TransportServiceCCSegmentComplete({ - nodeId: command.nodeId, - sessionId: command.sessionId, - }); - await this.sendCommand(cc, { - maxSendAttempts: 1, - priority: MessagePriority.Immediate, - }); - }, - }, - { - // TODO: Figure out how to know which timeout is the correct one. For now use the larger one - missingSegmentTimeout: - TransportServiceTimeouts.requestMissingSegmentR2, - datagramSize: command.datagramSize, - firstSegmentSize: command.partialDatagram.length, - }, + const machine = createTransportServiceRXMachine( + command.datagramSize, + command.partialDatagram.length, ); - const interpreter = interpret(RXStateMachine).start(); - nodeSessions.transportService.set(command.sessionId, { + const session: TransportServiceSession = { fragmentSize: command.partialDatagram.length, - interpreter, - }); + machine, + }; + nodeSessions.transportService.set(command.sessionId, session); - interpreter.onTransition((state) => { - if (state.changed && state.value === "failure") { - this.controllerLog.logNode(command.nodeId, { - message: - `Transport Service RX session #${command.sessionId} failed`, - level: "error", - direction: "none", - }); - // TODO: Update statistics - interpreter.stop(); - nodeSessions.transportService.delete(command.sessionId); - } - }); + // Time out waiting for subsequent segments + startMissingSegmentTimeout(session); } else { - // This is a subsequent message in a sequence. Just forward it to the state machine if there is one + // This is a subsequent message in a sequence. Continue executing the state machine + const transportSession = nodeSessions.transportService.get( + command.sessionId, + ); if (transportSession) { - transportSession.interpreter.send({ - type: "segment", + await advanceTransportServiceSession(transportSession, { + value: "segment", offset: command.datagramOffset, length: command.partialDatagram.length, }); @@ -4683,26 +4730,26 @@ export class Driver extends TypedEventTarget * @param msg The decoded message */ private async handleUnsolicitedMessage(msg: Message): Promise { - if (msg.type === MessageType.Request) { - // This is a request we might have registered handlers for - try { + // FIXME: Rename this - msg might not be unsolicited + // This is a message we might have registered handlers for + try { + if (msg.type === MessageType.Request) { await this.handleRequest(msg); - } catch (e) { - if ( - isZWaveError(e) - && e.code === ZWaveErrorCodes.Driver_NotReady - ) { - this.driverLog.print( - `Cannot handle message because the driver is not ready to handle it yet.`, - "warn", - ); - } else { - throw e; - } + } else { + await this.handleResponse(msg); + } + } catch (e) { + if ( + isZWaveError(e) + && e.code === ZWaveErrorCodes.Driver_NotReady + ) { + this.driverLog.print( + `Cannot handle message because the driver is not ready to handle it yet.`, + "warn", + ); + } else { + throw e; } - } else { - this.driverLog.transactionResponse(msg, undefined, "unexpected"); - this.driverLog.print("unexpected response, discarding...", "warn"); } } @@ -4730,19 +4777,13 @@ export class Driver extends TypedEventTarget ); // In this situation, we may be executing a Serial API command, which will never complete. - // Bail out of it, without rejecting the actual transaction - if ( - this.serialAPIInterpreter?.status - === InterpreterStatus.Running - ) { - this.serialAPIInterpreter.stop(); - } - if (this._currentSerialAPICommandPromise) { + // Abort it, so it can be retried + if (this.abortSerialAPICommand) { this.controllerLog.print( `Currently active command will be retried...`, "warn", ); - this._currentSerialAPICommandPromise.reject( + this.abortSerialAPICommand.reject( new ZWaveError( "The Serial API restarted unexpectedly", ZWaveErrorCodes.Controller_Reset, @@ -5022,6 +5063,25 @@ ${handlers.length} left`, return false; } + /** + * Is called when a Response-type message was received + */ + private handleResponse(msg: Message): Promise { + // Check if we have a dynamic handler waiting for this message + for (const entry of this.awaitedMessages) { + if (entry.predicate(msg)) { + // We do + entry.handler(msg); + return Promise.resolve(); + } + } + + this.driverLog.transactionResponse(msg, undefined, "unexpected"); + this.driverLog.print("unexpected response, discarding...", "warn"); + + return Promise.resolve(); + } + /** * Is called when a Request-type message was received */ @@ -5849,72 +5909,6 @@ ${handlers.length} left`, msg.callbackId = this.getNextCallbackId(); } - const machine = createSerialAPICommandMachine( - msg, - await msg.serializeAsync(this.getEncodingContext()), - { - sendData: (data) => this.writeSerial(data), - sendDataAbort: () => this.abortSendData(), - notifyUnsolicited: (msg) => { - void this.handleUnsolicitedMessage(msg); - }, - notifyRetry: ( - lastError, - message, - attempts, - maxAttempts, - delay, - ) => { - // Translate the error into a better one - let errorReason: string; - switch (lastError) { - case "response timeout": - errorReason = "No response from controller"; - this._controller?.incrementStatistics( - "timeoutResponse", - ); - break; - case "callback timeout": - errorReason = "No callback from controller"; - this._controller?.incrementStatistics( - "timeoutCallback", - ); - break; - case "response NOK": - errorReason = - "The controller response indicated failure"; - break; - case "callback NOK": - errorReason = - "The controller callback indicated failure"; - break; - case "ACK timeout": - this._controller?.incrementStatistics("timeoutACK"); - // fall through - case "CAN": - case "NAK": - default: - errorReason = - "Failed to execute controller command"; - break; - } - this.controllerLog.print( - `${errorReason} after ${attempts}/${maxAttempts} attempts. Scheduling next try in ${delay} ms.`, - "warn", - ); - }, - timestamp: highResTimestamp, - logOutgoingMessage: (msg: Message) => { - this.driverLog.logMessage(msg, { - direction: "outbound", - }); - }, - }, - pick(this._options, ["timeouts", "attempts"]), - ); - - const result = createDeferredPromise(); - // Work around an issue in the 700 series firmware where the ACK after a soft-reset has a random high nibble. // This was broken in 7.19, not fixed so far if ( @@ -5924,46 +5918,190 @@ ${handlers.length} left`, this.serial?.ignoreAckHighNibbleOnce(); } - this.serialAPIInterpreter = interpret(machine).onDone((evt) => { - this.serialAPIInterpreter?.stop(); - this.serialAPIInterpreter = undefined; + const machine = createSerialAPICommandMachine(msg); + this.abortSerialAPICommand = createDeferredPromise(); + const abortController = new AbortController(); - const cmdResult = evt.data as SerialAPICommandDoneData; - if (cmdResult.type === "success") { - result.resolve(cmdResult.result); - } else if ( - cmdResult.reason === "callback NOK" - && (isSendData(msg) || isTransmitReport(cmdResult.result)) - ) { - // For messages that were sent to a node, a NOK callback still contains useful info we need to evaluate - // ... so we treat it as a result - result.resolve(cmdResult.result); - } else { - // Convert to a Z-Wave error - result.reject( - serialAPICommandErrorToZWaveError( - cmdResult.reason, - msg, - cmdResult.result, - transactionSource, - ), - ); - } - }); + let nextInput: SerialAPICommandMachineInput | undefined = { + value: "start", + }; - // Uncomment this for debugging state machine transitions - // this.serialAPIInterpreter.onTransition((state) => { - // if (state.changed) { - // this.driverLog.print( - // `CMDMACHINE: ${JSON.stringify(state.toStrings())}`, - // ); - // } - // }); + try { + while (!machine.done) { + if (nextInput == undefined) { + // We should not be in a situation where we have no input for the state machine + throw new Error( + "Serial API Command machine is in an invalid state: no input provided", + ); + } + const transition = machine.next(nextInput); + if (transition == undefined) { + // We should not be in a situation where the state machine does not transition + throw new Error( + "Serial API Command machine is in an invalid state: no transition taken", + ); + } - this.serialAPIInterpreter.start(); + // The input was used + nextInput = undefined; - this._currentSerialAPICommandPromise = result; - return result; + // Transition to the new state + machine.transition(transition.newState); + + // Now check what needs to be done in the new state + switch (machine.state.value) { + case "initial": + // This should never happen + throw new Error( + "Serial API Command machine is in an invalid state: transitioned to initial state", + ); + + case "sending": { + this.driverLog.logMessage(msg, { + direction: "outbound", + }); + + // Mark the message as sent immediately before actually sending + msg.markAsSent(); + const data = await msg.serializeAsync( + this.getEncodingContext(), + ); + await this.writeSerial(data); + nextInput = { value: "message sent" }; + break; + } + + case "waitingForACK": { + const controlFlow = await this.waitForMessageHeader( + () => true, + this.options.timeouts.ack, + ).catch(() => "timeout" as const); + + if (controlFlow === "timeout") { + nextInput = { value: "timeout" }; + } else if (controlFlow === MessageHeaders.ACK) { + nextInput = { value: "ACK" }; + } else if (controlFlow === MessageHeaders.CAN) { + nextInput = { value: "CAN" }; + } else if (controlFlow === MessageHeaders.NAK) { + nextInput = { value: "NAK" }; + } + + break; + } + + case "waitingForResponse": { + const response = await Promise.race([ + this.abortSerialAPICommand?.catch((e) => + e as Error + ), + this.waitForMessage( + (resp) => msg.isExpectedResponse(resp), + msg.getResponseTimeout() + ?? this.options.timeouts.response, + undefined, + abortController.signal, + ).catch(() => "timeout" as const), + ]); + + if (response instanceof Error) { + // The command was aborted from the outside + // Remove the pending wait entry + abortController.abort(); + throw response; + } + + if (response === "timeout") { + if (isSendData(msg)) { + // Timed out SendData commands must be aborted + void this.abortSendData(); + } + + nextInput = { value: "timeout" }; + } else if ( + isSuccessIndicator(response) && !response.isOK() + ) { + nextInput = { value: "response NOK", response }; + } else { + nextInput = { value: "response", response }; + } + + break; + } + + case "waitingForCallback": { + let sendDataAbortTimeout: NodeJS.Timeout | undefined; + if (isSendData(msg)) { + // We abort timed out SendData commands before the callback times out + sendDataAbortTimeout = setTimeout(() => { + void this.abortSendData(); + }, this.options.timeouts.sendDataAbort).unref(); + } + + const callback = await Promise.race([ + this.abortSerialAPICommand?.catch((e) => + e as Error + ), + this.waitForMessage( + (resp) => msg.isExpectedCallback(resp), + msg.getCallbackTimeout() + ?? this.options.timeouts.sendDataCallback, + undefined, + abortController.signal, + ).catch(() => "timeout" as const), + ]); + + if (sendDataAbortTimeout) { + clearTimeout(sendDataAbortTimeout); + } + + if (callback instanceof Error) { + // The command was aborted from the outside + // Remove the pending wait entry + abortController.abort(); + throw callback; + } + + if (callback === "timeout") { + nextInput = { value: "timeout" }; + } else if ( + isSuccessIndicator(callback) && !callback.isOK() + ) { + nextInput = { value: "callback NOK", callback }; + } else { + nextInput = { value: "callback", callback }; + } + + break; + } + + case "success": { + return machine.state.result; + } + + case "failure": { + const { reason, result } = machine.state; + if ( + reason === "callback NOK" + && (isSendData(msg) || isTransmitReport(result)) + ) { + // For messages that were sent to a node, a NOK callback still contains useful info we need to evaluate + // ... so we treat it as a result + return result; + } else { + throw serialAPICommandErrorToZWaveError( + reason, + msg, + result, + transactionSource, + ); + } + } + } + } + } finally { + this.abortSerialAPICommand = undefined; + } } private getQueueForTransaction(t: Transaction): TransactionQueue { @@ -6497,6 +6635,7 @@ ${handlers.length} left`, predicate: (msg: Message) => boolean, timeout: number, refreshPredicate?: (msg: Message) => boolean, + abortSignal?: AbortSignal, ): Promise { return new Promise((resolve, reject) => { const promise = createDeferredPromise(); @@ -6527,6 +6666,10 @@ ${handlers.length} left`, removeEntry(); resolve(cc as T); }); + // When the abort signal is used, silently remove the wait entry + abortSignal?.addEventListener("abort", () => { + removeEntry(); + }); }); } @@ -6538,6 +6681,7 @@ ${handlers.length} left`, public waitForCommand( predicate: (cc: CCId) => boolean, timeout: number, + abortSignal?: AbortSignal, ): Promise { return new Promise((resolve, reject) => { const promise = createDeferredPromise(); @@ -6567,6 +6711,10 @@ ${handlers.length} left`, removeEntry(); resolve(cc as T); }); + // When the abort signal is used, silently remove the wait entry + abortSignal?.addEventListener("abort", () => { + removeEntry(); + }); }); } diff --git a/packages/zwave-js/src/lib/driver/SerialAPICommandMachine.test.ts b/packages/zwave-js/src/lib/driver/SerialAPICommandMachine.test.ts deleted file mode 100644 index fad0f9b29265..000000000000 --- a/packages/zwave-js/src/lib/driver/SerialAPICommandMachine.test.ts +++ /dev/null @@ -1,509 +0,0 @@ -import { createModel } from "@xstate/test"; -import type { Message } from "@zwave-js/serial"; -import { - type DeferredPromise, - createDeferredPromise, -} from "alcalzone-shared/deferred-promise"; -import sinon from "sinon"; -import { type ExpectStatic, afterAll, beforeAll, test, vi } from "vitest"; -import { Machine, type State, assign, interpret } from "xstate"; -import { - dummyCallbackNOK, - dummyCallbackOK, - dummyCallbackPartial, - dummyMessageNoResponseNoCallback, - dummyMessageNoResponseWithCallback, - dummyMessageUnrelated, - dummyMessageWithResponseNoCallback, - dummyMessageWithResponseWithCallback, - dummyResponseNOK, - dummyResponseOK, -} from "../test/messages.js"; -import { - type SerialAPICommandDoneData, - type SerialAPICommandInterpreter, - type SerialAPICommandMachineParams, - type SerialAPICommandServiceImplementations, - createSerialAPICommandMachine, -} from "./SerialAPICommandMachine.js"; - -interface TestMachineStateSchema { - states: { - init: object; - sending: object; - waitForACK: object; - waitForResponse: object; - waitForCallback: object; - // FIXME: This is relevant for SendData, but we're not using SendData messages in this test - // waitForCallbackAfterTimeout: {}; - unsolicited: object; - success: object; - failure: object; - }; -} - -interface TestMachineContext { - resp: boolean; - cb: boolean; - gotPartialCB?: boolean; - attempt: number; -} - -type TestMachineEvents = - | { - type: "CREATE"; - expectsResponse: boolean; - expectsCallback: boolean; - } - | { type: "SEND_SUCCESS" } - | { type: "SEND_FAILURE" } - | { type: "ACK" } - | { type: "ACK_TIMEOUT" } - | { type: "CAN" } - | { type: "NAK" } - | { type: "RESPONSE_OK" } - | { type: "RESPONSE_NOK" } - | { type: "RESPONSE_TIMEOUT" } - | { type: "CALLBACK_OK" } - | { type: "CALLBACK_NOK" } - | { type: "CALLBACK_PARTIAL" } - | { type: "CALLBACK_TIMEOUT" } - | { type: "UNSOLICITED" }; - -const messages = { - false: { - false: dummyMessageNoResponseNoCallback, - true: dummyMessageNoResponseWithCallback, - }, - true: { - false: dummyMessageWithResponseNoCallback, - true: dummyMessageWithResponseWithCallback, - }, -}; - -interface MockImplementations { - sendData: sinon.SinonStub; - notifyRetry: sinon.SinonStub; -} - -interface TestContext { - expect: ExpectStatic; - interpreter: SerialAPICommandInterpreter; - implementations: MockImplementations; - sendDataPromise?: DeferredPromise | undefined; - machineResult?: any; - expectedFailureReason?: (SerialAPICommandDoneData & { - type: "failure"; - })["reason"]; - expectedResult?: Message; - respondedUnsolicited?: boolean; -} - -const machineParams: SerialAPICommandMachineParams = { - timeouts: { - ack: 1000, - response: 1600, - sendDataCallback: 65000, - }, - attempts: { - controller: 3, - }, -}; - -beforeAll((t) => { - vi.useFakeTimers(); -}); - -afterAll((t) => { - vi.useRealTimers(); -}); - -const testMachine = Machine< - TestMachineContext, - TestMachineStateSchema, - TestMachineEvents ->( - { - id: "SerialAPICommandTest", - initial: "init", - context: { - attempt: 0, - resp: false, - cb: false, - }, - states: { - init: { - on: { - CREATE: { - target: "sending", - actions: assign((_, evt) => ({ - resp: evt.resp, - cb: evt.cb, - })), - }, - }, - }, - sending: { - entry: assign({ - attempt: (ctx) => ctx.attempt + 1, - }), - on: { - SEND_SUCCESS: "waitForACK", - SEND_FAILURE: [ - { target: "sending", cond: "maySendAgain" }, - { target: "failure" }, - ], - UNSOLICITED: "unsolicited", - }, - meta: { - test: async ( - { - expect, - interpreter, - implementations: { sendData }, - }: TestContext, - state: State, - ) => { - // Skip the wait time if there should be any - const att = state.context.attempt; - if (att > 1) { - expect(interpreter.getSnapshot().value).toBe( - "retryWait", - ); - vi.advanceTimersByTime(100 + (att - 1) * 1000); - } - - // Assert that sendData was called - expect(interpreter.getSnapshot().value).toBe("sending"); - // but not more than 3 times - expect(att).toBeLessThanOrEqual(3); - expect(sendData.callCount).toBe(att); - }, - }, - }, - waitForACK: { - on: { - ACK: [ - { - target: "waitForResponse", - cond: "expectsResponse", - }, - { - target: "waitForCallback", - cond: "expectsCallback", - }, - { target: "success" }, - ], - NAK: [ - { target: "sending", cond: "maySendAgain" }, - { target: "failure" }, - ], - CAN: [ - { target: "sending", cond: "maySendAgain" }, - { target: "failure" }, - ], - ACK_TIMEOUT: [ - { target: "sending", cond: "maySendAgain" }, - { target: "failure" }, - ], - UNSOLICITED: "unsolicited", - }, - }, - waitForResponse: { - on: { - RESPONSE_OK: [ - { - target: "waitForCallback", - cond: "expectsCallback", - }, - { target: "success" }, - ], - RESPONSE_NOK: [ - { target: "sending", cond: "maySendAgain" }, - { target: "failure" }, - ], - RESPONSE_TIMEOUT: [ - // FIXME: This is relevant for SendData, but we're not using SendData messages in this test - // { - // target: "waitForCallbackAfterTimeout", - // cond: "expectsCallback", - // }, - { target: "failure" }, - ], - UNSOLICITED: "unsolicited", - }, - }, - waitForCallback: { - on: { - CALLBACK_PARTIAL: { - target: "waitForCallback", - actions: assign({ - gotPartialCB: true, - }), - }, - CALLBACK_OK: "success", - CALLBACK_NOK: "failure", - CALLBACK_TIMEOUT: "failure", - UNSOLICITED: "unsolicited", - }, - }, - // FIXME: This is relevant for SendData, but we're not using SendData messages in this test - // waitForCallbackAfterTimeout: { - // on: { - // CALLBACK_NOK: "failure", - // CALLBACK_TIMEOUT: "failure", - // }, - // }, - unsolicited: { - meta: { - test: ({ - expect, - respondedUnsolicited, - }: TestContext) => { - expect(respondedUnsolicited).toBe(true); - }, - }, - }, - success: { - meta: { - test: ({ - expect, - interpreter, - expectedResult, - machineResult, - }: TestContext) => { - // Ensure that the interpreter is in "success" state - expect(interpreter.getSnapshot().value).toBe("success"); - // with the correct reason - // expect(machineResult).toBeObject(); - expect(machineResult).toMatchObject({ - type: "success", - result: expectedResult, - }); - }, - }, - }, - failure: { - meta: { - test: ({ - expect, - interpreter, - expectedFailureReason, - machineResult, - implementations: { sendData }, - }: TestContext) => { - // Ensure that the interpreter is in "failure" state - expect(interpreter.getSnapshot().value).toBe("failure"); - // with the correct reason - // expect(machineResult).toBeObject(); - expect(machineResult).toMatchObject({ - type: "failure", - reason: expectedFailureReason, - }); - // and that at most three attempts were made - expect(sendData.callCount).toBeLessThanOrEqual(3); - }, - }, - }, - }, - }, - { - guards: { - maySendAgain: (ctx) => ctx.attempt < 3, - expectsResponse: (ctx) => ctx.resp, - expectsCallback: (ctx) => ctx.cb, - }, - }, -); - -const testModel = createModel( - testMachine, -).withEvents({ - CREATE: { - // test all possible combinations of message expectations - cases: [ - { resp: false, cb: false }, - { resp: false, cb: true }, - { resp: true, cb: false }, - { resp: true, cb: true }, - ], - }, - SEND_SUCCESS: { - exec: ({ sendDataPromise }) => { - sendDataPromise?.resolve(undefined as any); - }, - }, - SEND_FAILURE: { - exec: ({ sendDataPromise }) => { - sendDataPromise?.reject(); - }, - }, - ACK: ({ interpreter }) => { - interpreter.send("ACK"); - }, - NAK: ({ interpreter }) => { - interpreter.send("NAK"); - }, - CAN: ({ interpreter }) => { - interpreter.send("CAN"); - }, - RESPONSE_OK: ({ interpreter }) => { - interpreter.send({ type: "message", message: dummyResponseOK }); - }, - RESPONSE_NOK: ({ interpreter }) => { - interpreter.send({ type: "message", message: dummyResponseNOK }); - }, - CALLBACK_OK: ({ interpreter }) => { - interpreter.send({ type: "message", message: dummyCallbackOK }); - }, - CALLBACK_NOK: ({ interpreter }) => { - interpreter.send({ type: "message", message: dummyCallbackNOK }); - }, - CALLBACK_PARTIAL: ({ interpreter }) => { - interpreter.send({ - type: "message", - message: dummyCallbackPartial, - }); - }, - UNSOLICITED: ({ interpreter }) => { - interpreter.send({ - type: "message", - message: dummyMessageUnrelated, - }); - }, - ACK_TIMEOUT: ({ expect }) => { - vi.advanceTimersByTime(machineParams.timeouts.ack); - }, - RESPONSE_TIMEOUT: ({ expect }) => { - vi.advanceTimersByTime(machineParams.timeouts.response); - }, - CALLBACK_TIMEOUT: ({ expect }) => { - vi.advanceTimersByTime(machineParams.timeouts.sendDataCallback); - }, -}); - -const testPlans = testModel.getSimplePathPlans(); - -testPlans.forEach((plan) => { - if (plan.state.value === "idle") return; - - const planDescription = plan.description.replace( - ` (${JSON.stringify(plan.state.context)})`, - "", - ); - - plan.paths.forEach((path) => { - // Uncomment this and change the path description to only run a single test - // if ( - // !path.description.includes( - // `CREATE ({"resp":true,"cb":true}) → SEND_FAILURE → SEND_FAILURE → SEND_SUCCESS → ACK → RESPONSE_TIMEOUT → CALLBACK_NOK`, - // ) - // ) { - // return; - // } - test.sequential(`${planDescription} ${path.description}`, async (t) => { - // eslint-disable-next-line prefer-const - let context: TestContext; - const sendData = sinon.stub().callsFake(() => { - context.sendDataPromise = createDeferredPromise(); - return context.sendDataPromise; - }); - const sendDataAbort = sinon.stub(); - const notifyRetry = sinon.stub(); - const timestamp = () => 0; - const logOutgoingMessage = () => {}; - const notifyUnsolicited = () => { - context.respondedUnsolicited = true; - }; - - const implementations: SerialAPICommandServiceImplementations = { - sendData, - sendDataAbort, - notifyRetry, - timestamp, - logOutgoingMessage, - notifyUnsolicited, - }; - - // parse message from test description - // CREATE ({"expectsResponse":false,"expectsCallback":false}) - const createMachineRegex = /CREATE \((?[^\)]+)\)/; - const match = createMachineRegex.exec(path.description); - if (!match?.groups?.json) { - await path.test(undefined as any); - return; - } - - // And create a test machine with it - const msgSelector = JSON.parse(match.groups.json); - // @ts-ignore - const message: Message = messages[msgSelector.resp][msgSelector.cb]; - const machine = createSerialAPICommandMachine( - message, - await message.serializeAsync({} as any), - implementations, - machineParams, - ); - - context = { - expect: t.expect, - interpreter: interpret(machine), - implementations: implementations as any, - }; - context.interpreter.onDone((evt) => { - context.machineResult = evt.data; - }); - // context.interpreter.onTransition((state) => { - // if (state.changed) console.log(state.value); - // }); - context.interpreter.start(); - - if (plan.state.value === "failure") { - // parse expected failure reason from plan - const failureEvent = path.segments.slice(-1)?.[0]?.event.type; - context.expectedFailureReason = (() => { - switch (failureEvent) { - case "SEND_FAILURE": - return "send failure"; - case "RESPONSE_NOK": - return "response NOK"; - case "CALLBACK_NOK": - return "callback NOK"; - case "ACK_TIMEOUT": - return "ACK timeout"; - case "RESPONSE_TIMEOUT": - return "response timeout"; - case "CALLBACK_TIMEOUT": - return "callback timeout"; - case "NAK": - case "CAN": - return failureEvent; - } - })(); - } else if (plan.state.value === "success") { - // parse expected success event from plan - const successEvent = path.segments.slice(-1)?.[0]?.event.type; - context.expectedResult = (() => { - switch (successEvent) { - case "ACK": - return; - case "RESPONSE_OK": - return dummyResponseOK; - case "CALLBACK_OK": - return dummyCallbackOK; - } - })(); - } - - await path.test(context); - }); - }); -}); - -// test.serial("coverage", (t) => { -// testModel.testCoverage({ -// filter: (stateNode) => { -// return !!stateNode.meta; -// }, -// }); -// // }); diff --git a/packages/zwave-js/src/lib/driver/SerialAPICommandMachine.ts b/packages/zwave-js/src/lib/driver/SerialAPICommandMachine.ts index 1a83951f5ac9..b69dc0433fd7 100644 --- a/packages/zwave-js/src/lib/driver/SerialAPICommandMachine.ts +++ b/packages/zwave-js/src/lib/driver/SerialAPICommandMachine.ts @@ -1,466 +1,195 @@ -import type { Message } from "@zwave-js/serial"; import { - MessageType, + type InferStateMachineTransitions, + StateMachine, + type StateMachineTransition, +} from "@zwave-js/core"; +import { + type Message, isMultiStageCallback, + isSendData, isSuccessIndicator, } from "@zwave-js/serial"; -import { isSendData } from "@zwave-js/serial/serialapi"; -import { - type InterpreterFrom, - type MachineConfig, - type MachineOptions, - type StateMachine, - assign, - createMachine, - raise, -} from "xstate"; -import type { ZWaveOptions } from "./ZWaveOptions.js"; - -export interface SerialAPICommandStateSchema { - states: { - sending: object; - waitForACK: object; - waitForResponse: object; - waitForCallback: object; - retry: object; - retryWait: object; - failure: object; - success: object; - }; -} - -export type SerialAPICommandError = - | "send failure" - | "CAN" - | "NAK" - | "ACK timeout" - | "response timeout" - | "callback timeout" - | "response NOK" - | "callback NOK"; - -export interface SerialAPICommandContext { - msg: Message; - data: Uint8Array; - attempts: number; - maxAttempts: number; - lastError?: SerialAPICommandError; - result?: Message; - txTimestamp?: number; -} - -export type SerialAPICommandEvent = - | { type: "ACK" } - | { type: "CAN" } - | { type: "NAK" } - | { type: "message"; message: Message } // A message that might or might not be expected - | { type: "response"; message: Message } // Gets forwarded when a response-type message is expected - | { type: "callback"; message: Message }; // Gets forwarded when a callback-type message is expected -export type SerialAPICommandDoneData = +export type SerialAPICommandState = + | { value: "initial" } + | { value: "sending" } + | { value: "waitingForACK" } + | { value: "waitingForResponse" } | { - type: "success"; - // TODO: Can we get rid of this? - txTimestamp: number; + value: "waitingForCallback"; + responseTimedOut?: boolean; + } + | { + value: "success"; result?: Message; + done: true; } - | ( - & { - type: "failure"; - } - & ( - | { - reason: - | "send failure" - | "CAN" - | "NAK" - | "ACK timeout" - | "response timeout" - | "callback timeout"; - result?: undefined; - } - | { - reason: "response NOK" | "callback NOK"; - result: Message; - } - ) - ); + | ({ + value: "failure"; + } & SerialAPICommandMachineFailure); -export interface SerialAPICommandServiceImplementations { - timestamp: () => number; - sendData: (data: Uint8Array) => Promise; - sendDataAbort: () => Promise; - notifyRetry: ( - lastError: SerialAPICommandError | undefined, - message: Message, - attempts: number, - maxAttempts: number, - delay: number, - ) => void; - notifyUnsolicited: (message: Message) => void; - logOutgoingMessage: (message: Message) => void; -} - -function computeRetryDelay(ctx: SerialAPICommandContext): number { - return 100 + 1000 * (ctx.attempts - 1); -} +export type SerialAPICommandMachineFailure = + | { reason: "ACK timeout"; result?: undefined } + | { reason: "CAN"; result?: undefined } + | { reason: "NAK"; result?: undefined } + | { reason: "response timeout"; result?: undefined } + | { reason: "callback timeout"; result?: undefined } + | { reason: "response NOK"; result: Message } + | { reason: "callback NOK"; result: Message }; -const forwardMessage = raise((_, evt: SerialAPICommandEvent) => { - const msg = (evt as any).message as Message; - return { - type: msg.type === MessageType.Response ? "response" : "callback", - message: msg, - } as SerialAPICommandEvent; -}); +export type SerialAPICommandMachineInput = + | { value: "start" } + | { value: "message sent" } + | { value: "ACK" } + | { value: "CAN" } + | { value: "NAK" } + | { value: "timeout" } + | { value: "response" | "response NOK"; response: Message } + | { value: "callback" | "callback NOK"; callback: Message }; -export type SerialAPICommandMachineConfig = MachineConfig< - SerialAPICommandContext, - SerialAPICommandStateSchema, - SerialAPICommandEvent ->; export type SerialAPICommandMachine = StateMachine< - SerialAPICommandContext, - SerialAPICommandStateSchema, - SerialAPICommandEvent + SerialAPICommandState, + SerialAPICommandMachineInput >; -export type SerialAPICommandInterpreter = InterpreterFrom< - SerialAPICommandMachine ->; -export type SerialAPICommandMachineOptions = Partial< - MachineOptions ->; - -export type SerialAPICommandMachineParams = { - timeouts: Pick< - ZWaveOptions["timeouts"], - "ack" | "response" | "sendDataAbort" | "sendDataCallback" - >; - attempts: Pick; -}; - -export function getSerialAPICommandMachineConfig( - message: Message, - messageData: Uint8Array, - { - timestamp, - logOutgoingMessage, - notifyUnsolicited, - sendDataAbort, - }: Pick< - SerialAPICommandServiceImplementations, - | "timestamp" - | "logOutgoingMessage" - | "notifyUnsolicited" - | "sendDataAbort" - >, - attemptsConfig: SerialAPICommandMachineParams["attempts"], -): SerialAPICommandMachineConfig { - return { - predictableActionArguments: true, - id: "serialAPICommand", - initial: "sending", - context: { - msg: message, - data: messageData, - attempts: 0, - maxAttempts: attemptsConfig.controller, - }, - on: { - // The state machine accepts any message. If it is expected - // it will be forwarded to the correct states. If not, it - // will be returned with the "unsolicited" event. - message: [ - { - cond: "isExpectedMessage", - actions: forwardMessage as any, - }, - { - actions: (_: any, evt: any) => { - notifyUnsolicited(evt.message); - }, - }, - ], - }, - states: { - sending: { - // Every send attempt should increase the attempts by one - // and remember the timestamp of transmission - entry: [ - assign({ - attempts: (ctx) => ctx.attempts + 1, - txTimestamp: (_) => timestamp(), - }), - (ctx) => logOutgoingMessage(ctx.msg), - ], - invoke: { - id: "sendMessage", - src: "send", - onDone: "waitForACK", - onError: { - target: "retry", - actions: assign({ - lastError: (_) => "send failure", - }), - }, - }, - }, - waitForACK: { - always: [{ target: "success", cond: "expectsNoAck" }], - on: { - CAN: { - target: "retry", - actions: assign({ - lastError: (_) => "CAN", - }), - }, - NAK: { - target: "retry", - actions: assign({ - lastError: (_) => "NAK", - }), - }, - ACK: "waitForResponse", - }, - after: { - ACK_TIMEOUT: { - target: "retry", - actions: assign({ - lastError: (_) => "ACK timeout", - }), - }, - }, - }, - waitForResponse: { - always: [ - { - target: "waitForCallback", - cond: "expectsNoResponse", - }, - ], - on: { - response: [ - { - target: "retry", - cond: "responseIsNOK", - actions: assign({ - lastError: (_) => "response NOK", - result: (_, evt) => (evt as any).message, - }), - }, - { - target: "waitForCallback", - actions: assign({ - result: (_, evt) => (evt as any).message, - }), - }, - ], - }, - after: { - // Do not retry when a response times out - RESPONSE_TIMEOUT: [ - { - cond: "isSendData", - target: "waitForCallback", - actions: [ - () => sendDataAbort(), - assign({ - lastError: (_) => "response timeout", - }), - ], - }, - { - target: "failure", - actions: assign({ - lastError: (_) => "response timeout", - }), - }, - ], - }, - }, - waitForCallback: { - always: [{ target: "success", cond: "expectsNoCallback" }], - on: { - callback: [ - { - // Preserve "response timeout" errors - // A NOK callback afterwards is expected, but we're not interested in it - target: "failure", - cond: "callbackIsNOKAfterTimedOutResponse", - }, - { - target: "failure", - cond: "callbackIsNOK", - actions: assign({ - // Preserve "response timeout" errors - lastError: (_) => "callback NOK", - result: (_, evt) => (evt as any).message, - }), - }, - { - target: "success", - cond: "callbackIsFinal", - actions: assign({ - result: (_, evt) => (evt as any).message, - }), - }, - { target: "waitForCallback" }, - ], - }, - after: { - // Abort Send Data when it takes too long - SENDDATA_ABORT_TIMEOUT: { - cond: "isSendData", - actions: [ - () => sendDataAbort(), - ], - }, - CALLBACK_TIMEOUT: { - target: "failure", - actions: assign({ - lastError: (_) => "callback timeout", - }), - }, - }, - }, - retry: { - always: [ - { target: "retryWait", cond: "mayRetry" }, - { target: "failure" }, - ], - }, - retryWait: { - invoke: { - id: "notify", - src: "notifyRetry", - }, - after: { - RETRY_DELAY: "sending", - }, - }, - success: { - type: "final", - data: { - type: "success", - txTimestamp: (ctx: SerialAPICommandContext) => - ctx.txTimestamp!, - result: (ctx: SerialAPICommandContext) => ctx.result, - }, - }, - failure: { - type: "final", - data: { - type: "failure", - reason: (ctx: SerialAPICommandContext) => ctx.lastError, - result: (ctx: SerialAPICommandContext) => ctx.result!, - }, - }, - }, - }; +function to( + state: SerialAPICommandState, +): StateMachineTransition { + return { newState: state }; } -export function getSerialAPICommandMachineOptions( - { - sendData, - notifyRetry, - }: Pick< - SerialAPICommandServiceImplementations, - "sendData" | "sendDataAbort" | "notifyRetry" - >, - timeoutConfig: SerialAPICommandMachineParams["timeouts"], -): SerialAPICommandMachineOptions { - return { - services: { - send: (ctx) => { - // Mark the message as sent immediately before actually sending - ctx.msg.markAsSent(); - return sendData(ctx.data); - }, - notifyRetry: (ctx) => { - notifyRetry( - ctx.lastError, - ctx.msg, - ctx.attempts, - ctx.maxAttempts, - computeRetryDelay(ctx), - ); - return Promise.resolve(); - }, - }, - guards: { - mayRetry: (ctx) => ctx.attempts < ctx.maxAttempts, - isSendData: (ctx) => isSendData(ctx.msg), - expectsNoAck: (ctx) => !ctx.msg.expectsAck(), - expectsNoResponse: (ctx) => !ctx.msg.expectsResponse(), - expectsNoCallback: (ctx) => !ctx.msg.expectsCallback(), - isExpectedMessage: (ctx, evt, meta) => - meta.state.matches("waitForResponse") - ? ctx.msg.isExpectedResponse((evt as any).message) - : meta.state.matches("waitForCallback") - ? ctx.msg.isExpectedCallback((evt as any).message) - : false, - responseIsNOK: (ctx, evt) => - evt.type === "response" - // assume responses without success indication to be OK - && isSuccessIndicator(evt.message) - && !evt.message.isOK(), - callbackIsNOKAfterTimedOutResponse: (ctx, evt) => - evt.type === "callback" - // assume callbacks without success indication to be OK - && isSuccessIndicator(evt.message) - && !evt.message.isOK() - && isSendData(ctx.msg) - && ctx.lastError === "response timeout", - callbackIsNOK: (ctx, evt) => - evt.type === "callback" - // assume callbacks without success indication to be OK - && isSuccessIndicator(evt.message) - && !evt.message.isOK(), - callbackIsFinal: (ctx, evt) => - evt.type === "callback" - // assume callbacks without success indication to be OK - && (!isSuccessIndicator(evt.message) || evt.message.isOK()) - // assume callbacks without isFinal method to be final - && (!isMultiStageCallback(evt.message) - || evt.message.isFinal()), - }, - delays: { - RETRY_DELAY: (ctx) => computeRetryDelay(ctx), - RESPONSE_TIMEOUT: (ctx) => { - return ( - // Ask the message for its callback timeout - ctx.msg.getResponseTimeout() - // and fall back to default values - || timeoutConfig.response - ); - }, - CALLBACK_TIMEOUT: (ctx) => { - return ( - // Ask the message for its callback timeout - ctx.msg.getCallbackTimeout() - // and fall back to default values - || timeoutConfig.sendDataCallback - ); - }, - SENDDATA_ABORT_TIMEOUT: timeoutConfig.sendDataAbort, - ACK_TIMEOUT: timeoutConfig.ack, - }, - }; +function callbackIsFinal(callback: Message): boolean { + return ( + // assume callbacks without success indication to be OK + (!isSuccessIndicator(callback) || callback.isOK()) + // assume callbacks without isFinal method to be final + && (!isMultiStageCallback(callback) || callback.isFinal()) + ); } export function createSerialAPICommandMachine( message: Message, - messageData: Uint8Array, - implementations: SerialAPICommandServiceImplementations, - params: SerialAPICommandMachineParams, ): SerialAPICommandMachine { - return createMachine( - getSerialAPICommandMachineConfig( - message, - messageData, - implementations, - params.attempts, - ), - getSerialAPICommandMachineOptions(implementations, params.timeouts), - ); + const initialState: SerialAPICommandState = { + value: "initial", + }; + + const transitions: InferStateMachineTransitions = + (state) => (input) => { + switch (state.value) { + case "initial": + if (input.value === "start") { + return to({ value: "sending" }); + } + break; + case "sending": + if (input.value === "message sent") { + if (message.expectsAck()) { + return to({ value: "waitingForACK" }); + } else { + return to({ + value: "success", + result: undefined, + done: true, + }); + } + } + break; + case "waitingForACK": + switch (input.value) { + case "ACK": + if (message.expectsResponse()) { + return to({ value: "waitingForResponse" }); + } else if (message.expectsCallback()) { + return to({ value: "waitingForCallback" }); + } else { + return to({ + value: "success", + result: undefined, + done: true, + }); + } + case "CAN": + return to({ value: "failure", reason: "CAN" }); + case "NAK": + return to({ value: "failure", reason: "NAK" }); + case "timeout": + return to({ + value: "failure", + reason: "ACK timeout", + }); + } + break; + case "waitingForResponse": + switch (input.value) { + case "response": + if (message.expectsCallback()) { + return to({ value: "waitingForCallback" }); + } else { + return to({ + value: "success", + result: input.response, + done: true, + }); + } + case "response NOK": + return to({ + value: "failure", + reason: "response NOK", + result: input.response, + }); + case "timeout": + if (isSendData(message)) { + return { + newState: { + value: "waitingForCallback", + responseTimedOut: true, + }, + }; + } else { + return to({ + value: "failure", + reason: "response timeout", + }); + } + } + break; + case "waitingForCallback": + switch (input.value) { + case "callback": + if (callbackIsFinal(input.callback)) { + return to({ + value: "success", + result: input.callback, + done: true, + }); + } else { + return to({ value: "waitingForCallback" }); + } + case "callback NOK": + // Preserve "response timeout" errors + // A NOK callback afterwards is expected, but we're not interested in it + if (state.responseTimedOut) { + return to({ + value: "failure", + reason: "response timeout", + }); + } else { + return to({ + value: "failure", + reason: "callback NOK", + result: input.callback, + }); + } + case "timeout": + return to({ + value: "failure", + reason: "callback timeout", + }); + } + break; + } + }; + + return new StateMachine(initialState, transitions); } diff --git a/packages/zwave-js/src/lib/driver/StateMachineShared.ts b/packages/zwave-js/src/lib/driver/StateMachineShared.ts index 88a86387130f..adf57ac91ab9 100644 --- a/packages/zwave-js/src/lib/driver/StateMachineShared.ts +++ b/packages/zwave-js/src/lib/driver/StateMachineShared.ts @@ -3,7 +3,6 @@ import { TransmitStatus, ZWaveError, ZWaveErrorCodes, - isZWaveError, } from "@zwave-js/core"; import type { Message } from "@zwave-js/serial"; import { @@ -23,23 +22,16 @@ import { isSendDataTransmitReport, } from "@zwave-js/serial/serialapi"; import { getEnumMemberName } from "@zwave-js/shared"; -import { - type AnyStateMachine, - Interpreter, - type InterpreterFrom, - type InterpreterOptions, -} from "xstate"; -import type { SerialAPICommandDoneData } from "./SerialAPICommandMachine.js"; +import { type SerialAPICommandMachineFailure } from "./SerialAPICommandMachine.js"; import type { Transaction } from "./Transaction.js"; export function serialAPICommandErrorToZWaveError( - reason: (SerialAPICommandDoneData & { type: "failure" })["reason"], + reason: SerialAPICommandMachineFailure["reason"], sentMessage: Message, receivedMessage: Message | undefined, transactionSource: string | undefined, ): ZWaveError { switch (reason) { - case "send failure": case "CAN": case "NAK": return new ZWaveError( @@ -167,87 +159,6 @@ export function createMessageDroppedUnexpectedError( return ret; } -/** Tests whether the given error is one that was caused by the serial API execution */ -export function isSerialCommandError(error: unknown): boolean { - if (!isZWaveError(error)) return false; - switch (error.code) { - case ZWaveErrorCodes.Controller_Timeout: - case ZWaveErrorCodes.Controller_ResponseNOK: - case ZWaveErrorCodes.Controller_CallbackNOK: - case ZWaveErrorCodes.Controller_MessageDropped: - return true; - } - return false; -} - -export type ExtendedInterpreterFrom< - TMachine extends AnyStateMachine | ((...args: any[]) => AnyStateMachine), -> = Extended>; - -export type Extended< - TInterpreter extends Interpreter, -> = TInterpreter & { - restart(): TInterpreter; -}; - -/** Extends the default xstate interpreter with a restart function that re-attaches all event handlers */ -export function interpretEx( - machine: TMachine, - options?: Partial, -): ExtendedInterpreterFrom { - const interpreter = new Interpreter( - machine, - options, - ) as ExtendedInterpreterFrom; - - return new Proxy(interpreter, { - get(target, key) { - if (key === "restart") { - return () => { - const listeners = [...(target["listeners"] as Set)]; - const contextListeners = [ - ...(target["contextListeners"] as Set), - ]; - const stopListeners = [ - ...(target["stopListeners"] as Set), - ]; - const doneListeners = [ - ...(target["doneListeners"] as Set), - ]; - const eventListeners = [ - ...(target["eventListeners"] as Set), - ]; - const sendListeners = [ - ...(target["sendListeners"] as Set), - ]; - target.stop(); - for (const listener of listeners) { - target.onTransition(listener); - } - for (const listener of contextListeners) { - target.onChange(listener); - } - for (const listener of stopListeners) { - target.onStop(listener); - } - for (const listener of doneListeners) { - target.onDone(listener); - } - for (const listener of eventListeners) { - target.onEvent(listener); - } - for (const listener of sendListeners) { - target.onSend(listener); - } - return target.start(); - }; - } else { - return (target as any)[key]; - } - }, - }); -} - export type TransactionReducerResult = | { // Silently drop the transaction @@ -282,3 +193,8 @@ export type TransactionReducer = ( transaction: Transaction, source: "queue" | "active", ) => TransactionReducerResult; + +// TODO: Do we still need this? +// function computeRetryDelay(attempts: number): number { +// return 100 + 1000 * (attempts - 1); +// } diff --git a/packages/zwave-js/src/lib/driver/TransportServiceMachine.ts b/packages/zwave-js/src/lib/driver/TransportServiceMachine.ts index 6cdaa478a0b6..f1c9cbe1ca09 100644 --- a/packages/zwave-js/src/lib/driver/TransportServiceMachine.ts +++ b/packages/zwave-js/src/lib/driver/TransportServiceMachine.ts @@ -1,188 +1,126 @@ import { - type AssignAction, - type Interpreter, - Machine, - type StateMachine, - assign, -} from "xstate"; + type InferStateMachineTransitions, + StateMachine, + type StateMachineTransition, +} from "@zwave-js/core"; -/* - This state machine handles the receipt of Transport Service encapsulated commands from a node -*/ - -export interface TransportServiceRXStateSchema { - states: { - waitingForSegment: object; - segmentTimeout: object; - waitingForRequestedSegment: object; - segmentsComplete: object; - success: object; - failure: object; +export type TransportServiceRXState = + // We are passively listening for segments + | { value: "receive" } + // We have requested a missing segment + | { value: "requestMissing"; offset: number } + | { + value: "success" | "failure"; + done: true; }; -} - -export interface TransportServiceRXContext { - receivedBytes: boolean[]; -} -export type TransportServiceRXEvent = { - type: "segment"; - offset: number; - length: number; -}; +export type TransportServiceRXMachineInput = + | { + value: "segment"; + offset: number; + length: number; + } + | { value: "timeout" } + | { value: "abort" }; export type TransportServiceRXMachine = StateMachine< - TransportServiceRXContext, - TransportServiceRXStateSchema, - TransportServiceRXEvent, - any, - any, - any, - any ->; -export type TransportServiceRXInterpreter = Interpreter< - TransportServiceRXContext, - TransportServiceRXStateSchema, - TransportServiceRXEvent + TransportServiceRXState, + TransportServiceRXMachineInput >; -export type TransportServiceRXMachineParams = { - datagramSize: number; - firstSegmentSize: number; - missingSegmentTimeout: number; -}; - -const receiveSegment: AssignAction = assign( - (ctx, evt: TransportServiceRXEvent) => { - for (let i = evt.offset; i < evt.offset + evt.length; i++) { - ctx.receivedBytes[i] = true; - } - return ctx; - }, -); - -export interface TransportServiceRXServiceImplementations { - requestMissingSegment(offset: number): Promise; - sendSegmentsComplete(): Promise; +function to( + state: TransportServiceRXState, +): StateMachineTransition { + return { newState: state }; } export function createTransportServiceRXMachine( - implementations: TransportServiceRXServiceImplementations, - params: TransportServiceRXMachineParams, + datagramSize: number, + firstSegmentSize: number, ): TransportServiceRXMachine { - return Machine< - TransportServiceRXContext, - TransportServiceRXStateSchema, - TransportServiceRXEvent - >( - { - id: "TransportServiceRX", - initial: "waitingForSegment", - context: { - receivedBytes: [ - // When the machine is started, we've already received the first segment - ...(new Array(params.firstSegmentSize).fill( - true, - ) as boolean[]), - // The rest of the segments are still missing - ...(new Array( - params.datagramSize - params.firstSegmentSize, - ).fill(false) as boolean[]), - ], - }, - states: { - waitingForSegment: { - always: [ - { - cond: "isComplete", - target: "segmentsComplete", - }, - { - cond: "hasHole", - target: "segmentTimeout", - }, - ], - after: { - missingSegment: "segmentTimeout", - }, - on: { - segment: { - actions: receiveSegment, - target: "waitingForSegment", - internal: false, - }, - }, - }, - segmentTimeout: { - invoke: { - id: "requestMissing", - src: "requestMissingSegment", - onDone: { - target: "waitingForRequestedSegment", - }, - onError: { - target: "failure", - }, - }, - }, - waitingForRequestedSegment: { - after: { - missingSegment: "failure", - }, - on: { - segment: { - actions: receiveSegment, - target: "waitingForSegment", - internal: false, - }, - }, - }, - segmentsComplete: { - invoke: { - id: "segmentsComplete", - src: "sendSegmentsComplete", - onDone: { - target: "success", - }, - onError: { - // If sending the command fails, the node will send us the segment again - target: "success", - }, - }, - }, - success: { - type: "final", - on: { - segment: "segmentsComplete", - }, - }, - failure: { - type: "final", - }, - }, - }, - { - services: { - requestMissingSegment: (ctx) => { - return implementations.requestMissingSegment( - ctx.receivedBytes.indexOf(false), - ); - }, - sendSegmentsComplete: () => { - return implementations.sendSegmentsComplete(); - }, - }, - guards: { - isComplete: (ctx) => { - return ctx.receivedBytes.every(Boolean); - }, - hasHole: (ctx) => - ctx.receivedBytes.lastIndexOf(true) - > ctx.receivedBytes.indexOf(false), - }, - delays: { - missingSegment: params.missingSegmentTimeout, - }, - }, - ); + const initialState: TransportServiceRXState = { + value: "receive", + }; + + const receivedBytes: boolean[] = [ + // When the machine is started, we've already received the first segment + ...(new Array(firstSegmentSize).fill(true)), + // The rest of the segments are still missing + ...(new Array(datagramSize - firstSegmentSize).fill(false)), + ]; + + function markReceived(offset: number, length: number): void { + for (let i = offset; i < offset + length; i++) { + receivedBytes[i] = true; + } + } + + function isComplete(): boolean { + return receivedBytes.every(Boolean); + } + + function hasReceivedLastSegment(): boolean { + return receivedBytes.at(-1)!; + } + + function hasHole(): boolean { + return receivedBytes.lastIndexOf(true) + > receivedBytes.indexOf(false); + } + + const transitions: InferStateMachineTransitions< + TransportServiceRXMachine + > = (state) => (input) => { + if (input.value === "abort") { + if (state.value !== "success" && state.value !== "failure") { + return to({ value: "failure", done: true }); + } + return; + } + + switch (state.value) { + case "receive": { + if (input.value === "segment") { + markReceived(input.offset, input.length); + if (isComplete()) { + return to({ value: "success", done: true }); + } else if (hasReceivedLastSegment() && hasHole()) { + return to({ + value: "requestMissing", + offset: receivedBytes.indexOf(false), + }); + } else { + return to({ value: "receive" }); + } + } else if (input.value === "timeout") { + // One or more segments are missing, start requesting them + return to({ + value: "requestMissing", + offset: receivedBytes.indexOf(false), + }); + } + break; + } + + case "requestMissing": { + if (input.value === "segment") { + markReceived(input.offset, input.length); + if (isComplete()) { + return to({ value: "success", done: true }); + } else { + // still not complete, request the next missing segment + return to({ + value: "requestMissing", + offset: receivedBytes.indexOf(false), + }); + } + } else if (input.value === "timeout") { + // Give up + return to({ value: "failure", done: true }); + } + } + } + }; + + return new StateMachine(initialState, transitions); } diff --git a/packages/zwave-js/src/lib/node/Node.ts b/packages/zwave-js/src/lib/node/Node.ts index 63a458c4dacf..feb5fcee5434 100644 --- a/packages/zwave-js/src/lib/node/Node.ts +++ b/packages/zwave-js/src/lib/node/Node.ts @@ -313,10 +313,6 @@ export class ZWaveNode extends ZWaveNodeMixins implements QuerySecurityClasses { * Cleans up all resources used by this node */ public destroy(): void { - // Stop all state machines - this.statusMachine.stop(); - this.readyMachine.stop(); - // Remove all timeouts for ( const timeout of [ @@ -989,8 +985,8 @@ export class ZWaveNode extends ZWaveNodeMixins implements QuerySecurityClasses { super.reset(); // Restart all state machines - this.readyMachine.restart(); - this.statusMachine.restart(); + this.restartReadyMachine(); + this.restartStatusMachine(); // Remove queued polls that would interfere with the interview this.cancelAllScheduledPolls(); @@ -1115,7 +1111,7 @@ export class ZWaveNode extends ZWaveNodeMixins implements QuerySecurityClasses { this.cachedDeviceConfigHash = await this._deviceConfig?.getHash(); this.setInterviewStage(InterviewStage.Complete); - this.readyMachine.send("INTERVIEW_DONE"); + this.updateReadyMachine({ value: "INTERVIEW_DONE" }); // Tell listeners that the interview is completed // The driver will then send this node to sleep @@ -4924,7 +4920,7 @@ protocol version: ${this.protocolVersion}`; // Mark already-interviewed nodes as potentially ready if (this.interviewStage === InterviewStage.Complete) { - this.readyMachine.send("RESTART_FROM_CACHE"); + this.updateReadyMachine({ value: "RESTART_FROM_CACHE" }); } } diff --git a/packages/zwave-js/src/lib/node/NodeReadyMachine.test.ts b/packages/zwave-js/src/lib/node/NodeReadyMachine.test.ts index def13ea53e9d..2b82ca4213da 100644 --- a/packages/zwave-js/src/lib/node/NodeReadyMachine.test.ts +++ b/packages/zwave-js/src/lib/node/NodeReadyMachine.test.ts @@ -1,52 +1,34 @@ -import { type Interpreter, interpret } from "xstate"; -// import { SimulatedClock } from "xstate/lib/SimulatedClock"; -import { type TaskContext, type TestContext, test } from "vitest"; -import { - type NodeReadyContext, - type NodeReadyEvent, - type NodeReadyMachine, - type NodeReadyStateSchema, - createNodeReadyMachine, -} from "./NodeReadyMachine.js"; - -function startMachine( - t: TaskContext & TestContext, - machine: NodeReadyMachine, -): Interpreter { - const service = interpret(machine).start(); - t.onTestFinished(() => { - service.stop(); - }); - return service; -} +import { test } from "vitest"; +import { createNodeReadyMachine } from "./NodeReadyMachine.js"; test(`The node should start in the notReady state`, (t) => { - const testMachine = createNodeReadyMachine(undefined as any); + const machine = createNodeReadyMachine(); - const service = startMachine(t, testMachine); - t.expect(service.getSnapshot().value).toBe("notReady"); + t.expect(machine.state.value).toBe("notReady"); }); test("when the driver is restarted from cache, the node should be ready as soon as it is known NOT to be dead", (t) => { - const testMachine = createNodeReadyMachine(); - const service = startMachine(t, testMachine); - service.send("RESTART_FROM_CACHE"); - t.expect(service.getSnapshot().value).toBe("readyIfNotDead"); - service.send("NOT_DEAD"); - t.expect(service.getSnapshot().value).toBe("ready"); + const machine = createNodeReadyMachine(); + + machine.transition(machine.next({ value: "RESTART_FROM_CACHE" })?.newState); + t.expect(machine.state.value).toBe("readyIfNotDead"); + + machine.transition(machine.next({ value: "NOT_DEAD" })?.newState); + t.expect(machine.state.value).toBe("ready"); }); test("when the driver is restarted from cache and the node is known to be not dead, it should be ready immediately", (t) => { - const testMachine = createNodeReadyMachine(); - const service = startMachine(t, testMachine); - service.send("NOT_DEAD"); - service.send("RESTART_FROM_CACHE"); - t.expect(service.getSnapshot().value).toBe("ready"); + const machine = createNodeReadyMachine(); + + machine.transition(machine.next({ value: "NOT_DEAD" })?.newState); + machine.transition(machine.next({ value: "RESTART_FROM_CACHE" })?.newState); + + t.expect(machine.state.value).toBe("ready"); }); test("when the interview is done, the node should be marked as ready", (t) => { - const testMachine = createNodeReadyMachine(); - const service = startMachine(t, testMachine); - service.send("INTERVIEW_DONE"); - t.expect(service.getSnapshot().value).toBe("ready"); + const machine = createNodeReadyMachine(); + + machine.transition(machine.next({ value: "INTERVIEW_DONE" })?.newState); + t.expect(machine.state.value).toBe("ready"); }); diff --git a/packages/zwave-js/src/lib/node/NodeReadyMachine.ts b/packages/zwave-js/src/lib/node/NodeReadyMachine.ts index ded9e47896b2..1d21b52bd357 100644 --- a/packages/zwave-js/src/lib/node/NodeReadyMachine.ts +++ b/packages/zwave-js/src/lib/node/NodeReadyMachine.ts @@ -1,88 +1,66 @@ import { - type InterpreterFrom, - Machine, - type StateMachine, - assign, -} from "xstate"; + type InferStateMachineTransitions, + StateMachine, + type StateMachineTransition, +} from "@zwave-js/core"; -export interface NodeReadyStateSchema { - states: { - notReady: object; - readyIfNotDead: object; - ready: object; - }; -} +export type NodeReadyState = { + value: "notReady"; + maybeDead: boolean; +} | { + value: "readyIfNotDead"; +} | { + value: "ready"; + done: true; +}; -export interface NodeReadyContext { - isMaybeDead: boolean; -} - -export type NodeReadyEvent = - | { type: "NOT_DEAD" } - | { type: "MAYBE_DEAD" } - | { type: "RESTART_FROM_CACHE" } - | { type: "INTERVIEW_DONE" }; +export type NodeReadyMachineInput = { + value: "NOT_DEAD" | "MAYBE_DEAD" | "RESTART_FROM_CACHE" | "INTERVIEW_DONE"; +}; export type NodeReadyMachine = StateMachine< - NodeReadyContext, - NodeReadyStateSchema, - NodeReadyEvent, - any, - any, - any, - any + NodeReadyState, + NodeReadyMachineInput >; -export type NodeReadyInterpreter = InterpreterFrom; -export function createNodeReadyMachine( - initialContext: Partial = {}, -): NodeReadyMachine { - return Machine( - { - id: "nodeReady", - initial: "notReady", - context: { - isMaybeDead: true, - ...initialContext, - }, - on: { - MAYBE_DEAD: { - actions: assign({ isMaybeDead: true }) as any, - }, - NOT_DEAD: { - actions: assign({ isMaybeDead: false }) as any, - }, - INTERVIEW_DONE: { - target: "ready", - actions: assign({ isMaybeDead: false }) as any, - }, - }, - states: { - notReady: { - entry: assign({ isMaybeDead: true }) as any, - on: { - RESTART_FROM_CACHE: [{ target: "readyIfNotDead" }], - }, - }, - readyIfNotDead: { - always: [{ cond: "isDefinitelyNotDead", target: "ready" }], - on: { - NOT_DEAD: { - target: "ready", - actions: assign({ isMaybeDead: false }) as any, - }, - }, - }, - ready: { - // If this is final, we will get warnings in the log - // So don't :) - }, - }, - }, - { - guards: { - isDefinitelyNotDead: (ctx) => !ctx.isMaybeDead, - }, - }, - ); +function to(state: NodeReadyState): StateMachineTransition { + return { newState: state }; +} + +export function createNodeReadyMachine(): NodeReadyMachine { + const initialState: NodeReadyState = { + value: "notReady", + maybeDead: true, + }; + + const READY: NodeReadyState = { value: "ready", done: true }; + + const transitions: InferStateMachineTransitions = + (state) => (input) => { + switch (state.value) { + case "notReady": { + switch (input.value) { + case "NOT_DEAD": + return to({ ...state, maybeDead: false }); + case "MAYBE_DEAD": + return to({ ...state, maybeDead: true }); + case "RESTART_FROM_CACHE": + if (state.maybeDead) { + return to({ value: "readyIfNotDead" }); + } else { + return to(READY); + } + case "INTERVIEW_DONE": + return to(READY); + } + break; + } + case "readyIfNotDead": { + if (input.value === "NOT_DEAD") return to(READY); + break; + } + } + }; + + return new StateMachine(initialState, transitions); } diff --git a/packages/zwave-js/src/lib/node/NodeStatusMachine.test.ts b/packages/zwave-js/src/lib/node/NodeStatusMachine.test.ts index 099353f6f3c9..80c6610e90ed 100644 --- a/packages/zwave-js/src/lib/node/NodeStatusMachine.test.ts +++ b/packages/zwave-js/src/lib/node/NodeStatusMachine.test.ts @@ -1,49 +1,29 @@ -import { type Interpreter, interpret } from "xstate"; -// import { SimulatedClock } from "xstate/lib/SimulatedClock"; -import { type TaskContext, type TestContext, test } from "vitest"; +import { test } from "vitest"; import { - type NodeStatusEvent, - type NodeStatusMachine, - type NodeStatusStateSchema, + type NodeStatusMachineInput, + type NodeStatusState, createNodeStatusMachine, } from "./NodeStatusMachine.js"; const testNodeNonSleeping = { canSleep: false } as any; const testNodeSleeping = { canSleep: true } as any; -function startMachine( - t: TaskContext & TestContext, - machine: NodeStatusMachine, -): Interpreter { - const service = interpret(machine).start(); - t.onTestFinished(() => { - service.stop(); - }); - return service; -} - test(`The node should start in the unknown state if it maybe cannot sleep`, (t) => { - const testMachine = createNodeStatusMachine({ - canSleep: false, - } as any); + const machine = createNodeStatusMachine(testNodeNonSleeping); - const service = startMachine(t, testMachine); - t.expect(service.getSnapshot().value).toBe("unknown"); + t.expect(machine.state.value).toBe("unknown"); }); test(`The node should start in the unknown state if it can definitely sleep`, (t) => { - const testMachine = createNodeStatusMachine({ - canSleep: true, - } as any); + const machine = createNodeStatusMachine(testNodeSleeping); - const service = startMachine(t, testMachine); - t.expect(service.getSnapshot().value).toBe("unknown"); + t.expect(machine.state.value).toBe("unknown"); }); const transitions: { - start: keyof NodeStatusStateSchema["states"]; - event: NodeStatusEvent["type"]; - target: keyof NodeStatusStateSchema["states"]; + start: NodeStatusState["value"]; + event: NodeStatusMachineInput["value"]; + target: NodeStatusState["value"]; canSleep?: boolean; }[] = [ { @@ -181,27 +161,24 @@ for (const testCase of transitions) { ? testNodeSleeping : testNodeNonSleeping; - const testMachine = createNodeStatusMachine(testNode); - testMachine.initial = testCase.start; + const machine = createNodeStatusMachine(testNode); + machine["_state"] = { value: testCase.start }; - const service = startMachine(t, testMachine); - service.send(testCase.event); - t.expect(service.getSnapshot().value).toBe(testCase.target); + machine.transition(machine.next({ value: testCase.event })?.newState); + t.expect(machine.state.value).toBe(testCase.target); }); } test("A transition from unknown to awake should not happen if the node cannot sleep", (t) => { - const testMachine = createNodeStatusMachine(testNodeNonSleeping); + const machine = createNodeStatusMachine(testNodeNonSleeping); - const service = startMachine(t, testMachine); - service.send("AWAKE"); - t.expect(service.getSnapshot().value).toBe("unknown"); + machine.transition(machine.next({ value: "AWAKE" })?.newState); + t.expect(machine.state.value).toBe("unknown"); }); test("A transition from unknown to asleep should not happen if the node cannot sleep", (t) => { - const testMachine = createNodeStatusMachine(testNodeNonSleeping); + const machine = createNodeStatusMachine(testNodeNonSleeping); - const service = startMachine(t, testMachine); - service.send("ASLEEP"); - t.expect(service.getSnapshot().value).toBe("unknown"); + machine.transition(machine.next({ value: "ASLEEP" })?.newState); + t.expect(machine.state.value).toBe("unknown"); }); diff --git a/packages/zwave-js/src/lib/node/NodeStatusMachine.ts b/packages/zwave-js/src/lib/node/NodeStatusMachine.ts index 22cb31fb17e3..15c97addb36e 100644 --- a/packages/zwave-js/src/lib/node/NodeStatusMachine.ts +++ b/packages/zwave-js/src/lib/node/NodeStatusMachine.ts @@ -1,17 +1,26 @@ -import { type InterpreterFrom, Machine, type StateMachine } from "xstate"; -import { NodeStatus } from "./_Types.js"; +import { + type InferStateMachineTransitions, + NodeStatus, + StateMachine, + type StateMachineTransition, +} from "@zwave-js/core"; import { type NodeNetworkRole } from "./mixins/01_NetworkRole.js"; -export interface NodeStatusStateSchema { - states: { - unknown: object; - // non-sleeping nodes are either dead or alive - dead: object; - alive: object; - // sleeping nodes are asleep or awake - asleep: object; - awake: object; - }; +export type NodeStatusState = { + value: "unknown" | "dead" | "alive" | "asleep" | "awake"; +}; + +export type NodeStatusMachineInput = { + value: "DEAD" | "ALIVE" | "ASLEEP" | "AWAKE"; +}; + +export type NodeStatusMachine = StateMachine< + NodeStatusState, + NodeStatusMachineInput +>; + +function to(state: NodeStatusState): StateMachineTransition { + return { newState: state }; } const statusDict = { @@ -21,94 +30,73 @@ const statusDict = { asleep: NodeStatus.Asleep, awake: NodeStatus.Awake, } as const; + export function nodeStatusMachineStateToNodeStatus( - state: keyof NodeStatusStateSchema["states"], + state: NodeStatusState["value"], ): NodeStatus { return statusDict[state] ?? NodeStatus.Unknown; } -export type NodeStatusEvent = - | { type: "DEAD" } - | { type: "ALIVE" } - | { type: "ASLEEP" } - | { type: "AWAKE" }; - -export type NodeStatusMachine = StateMachine< - any, - NodeStatusStateSchema, - NodeStatusEvent, - any, - any, - any, - any ->; -export type NodeStatusInterpreter = InterpreterFrom; - export function createNodeStatusMachine( node: NodeNetworkRole, ): NodeStatusMachine { - return Machine( - { - id: "nodeStatus", - initial: "unknown", - states: { - unknown: { - on: { - DEAD: { - target: "dead", - cond: "cannotSleep", - }, - ALIVE: { - target: "alive", - cond: "cannotSleep", - }, - ASLEEP: { - target: "asleep", - cond: "canSleep", - }, - AWAKE: { - target: "awake", - cond: "canSleep", - }, - }, - }, - dead: { - on: { - ALIVE: "alive", - }, - }, - alive: { - on: { - DEAD: "dead", + const initialState: NodeStatusState = { + value: "unknown", + }; + + const transitions: InferStateMachineTransitions = + (state) => (input) => { + switch (state.value) { + case "unknown": { + switch (input.value) { + case "DEAD": + if (!node.canSleep) return to({ value: "dead" }); + break; + case "ALIVE": + if (!node.canSleep) return to({ value: "alive" }); + break; + case "ASLEEP": + if (node.canSleep) return to({ value: "asleep" }); + break; + case "AWAKE": + if (node.canSleep) return to({ value: "awake" }); + break; + } + break; + } + case "dead": { + if (input.value === "ALIVE") return to({ value: "alive" }); + break; + } + case "alive": { + switch (input.value) { + case "DEAD": + return to({ value: "dead" }); // GH#1054 we must have a way to send a node to sleep even if // it was previously detected as a non-sleeping device - ASLEEP: { - target: "asleep", - cond: "canSleep", - }, - AWAKE: { - target: "awake", - cond: "canSleep", - }, - }, - }, - asleep: { - on: { - AWAKE: "awake", - }, - }, - awake: { - on: { - ASLEEP: "asleep", - }, - }, - }, - }, - { - guards: { - canSleep: () => !!node.canSleep, - cannotSleep: () => !node.canSleep, - }, - }, - ); + case "ASLEEP": { + if (node.canSleep) return to({ value: "asleep" }); + break; + } + case "AWAKE": { + if (node.canSleep) return to({ value: "awake" }); + break; + } + } + break; + } + case "asleep": { + if (input.value === "AWAKE") return to({ value: "awake" }); + break; + } + case "awake": { + if (input.value === "ASLEEP") { + return to({ value: "asleep" }); + } + break; + } + } + }; + + return new StateMachine(initialState, transitions); } diff --git a/packages/zwave-js/src/lib/node/mixins/20_Status.ts b/packages/zwave-js/src/lib/node/mixins/20_Status.ts index 638d88c79d87..dc14c2361953 100644 --- a/packages/zwave-js/src/lib/node/mixins/20_Status.ts +++ b/packages/zwave-js/src/lib/node/mixins/20_Status.ts @@ -1,14 +1,15 @@ import { type CommandClasses, InterviewStage } from "@zwave-js/core"; import { type Driver } from "../../driver/Driver.js"; import { cacheKeys } from "../../driver/NetworkCache.js"; -import { type Extended, interpretEx } from "../../driver/StateMachineShared.js"; import { type DeviceClass } from "../DeviceClass.js"; import { - type NodeReadyInterpreter, + type NodeReadyMachine, + type NodeReadyMachineInput, createNodeReadyMachine, } from "../NodeReadyMachine.js"; import { - type NodeStatusInterpreter, + type NodeStatusMachine, + type NodeStatusMachineInput, createNodeStatusMachine, nodeStatusMachineStateToNodeStatus, } from "../NodeStatusMachine.js"; @@ -68,29 +69,12 @@ export abstract class NodeStatusMixin extends NodeEventsMixin ) { super(nodeId, driver, index, deviceClass, supportedCCs); - // Create and hook up the status machine - this.statusMachine = interpretEx( - createNodeStatusMachine(this), - ); - this.statusMachine.onTransition((state) => { - if (state.changed) { - this.onStatusChange( - nodeStatusMachineStateToNodeStatus(state.value as any), - ); - } - }); - this.statusMachine.start(); - - this.readyMachine = interpretEx(createNodeReadyMachine()); - this.readyMachine.onTransition((state) => { - if (state.changed) { - this.onReadyChange(state.value === "ready"); - } - }); - this.readyMachine.start(); - } - - protected statusMachine: Extended; + // Create the state machines + this.statusMachine = createNodeStatusMachine(this); + this.readyMachine = createNodeReadyMachine(); + } + + private statusMachine: NodeStatusMachine; private _status: NodeStatus = NodeStatus.Unknown; @@ -101,6 +85,21 @@ export abstract class NodeStatusMixin extends NodeEventsMixin return this._status; } + protected restartStatusMachine(): void { + this.statusMachine.restart(); + this.onStatusChange(NodeStatus.Unknown); + } + + protected updateStatusMachine(input: NodeStatusMachineInput): void { + const newState = this.statusMachine.next(input)?.newState; + if (newState) { + this.statusMachine.transition(newState); + this.onStatusChange( + nodeStatusMachineStateToNodeStatus(newState.value), + ); + } + } + private onStatusChange(newStatus: NodeStatus) { // Ignore duplicate events if (newStatus === this._status) return; @@ -120,11 +119,13 @@ export abstract class NodeStatusMixin extends NodeEventsMixin // To be marked ready, a node must be known to be not dead. // This means that listening nodes must have communicated with us and // sleeping nodes are assumed to be ready - this.readyMachine.send( - this._status !== NodeStatus.Unknown - && this._status !== NodeStatus.Dead - ? "NOT_DEAD" - : "MAYBE_DEAD", + this.updateReadyMachine( + { + value: this._status !== NodeStatus.Unknown + && this._status !== NodeStatus.Dead + ? "NOT_DEAD" + : "MAYBE_DEAD", + }, ); } @@ -133,7 +134,7 @@ export abstract class NodeStatusMixin extends NodeEventsMixin * Marks this node as dead (if applicable) */ public markAsDead(): void { - this.statusMachine.send("DEAD"); + this.updateStatusMachine({ value: "DEAD" }); } /** @@ -141,7 +142,7 @@ export abstract class NodeStatusMixin extends NodeEventsMixin * Marks this node as alive (if applicable) */ public markAsAlive(): void { - this.statusMachine.send("ALIVE"); + this.updateStatusMachine({ value: "ALIVE" }); } /** @@ -149,7 +150,7 @@ export abstract class NodeStatusMixin extends NodeEventsMixin * Marks this node as asleep (if applicable) */ public markAsAsleep(): void { - this.statusMachine.send("ASLEEP"); + this.updateStatusMachine({ value: "ASLEEP" }); } /** @@ -157,15 +158,28 @@ export abstract class NodeStatusMixin extends NodeEventsMixin * Marks this node as awake (if applicable) */ public markAsAwake(): void { - this.statusMachine.send("AWAKE"); + this.updateStatusMachine({ value: "AWAKE" }); } // The node is only ready when the interview has been completed // to a certain degree - protected readyMachine: Extended; + private readyMachine: NodeReadyMachine; private _ready: boolean = false; + protected restartReadyMachine(): void { + this.readyMachine.restart(); + this.onReadyChange(false); + } + + protected updateReadyMachine(input: NodeReadyMachineInput): void { + const newState = this.readyMachine.next(input)?.newState; + if (newState) { + this.readyMachine.transition(newState); + this.onReadyChange(newState.value === "ready"); + } + } + private onReadyChange(ready: boolean) { // Ignore duplicate events if (ready === this._ready) return; diff --git a/packages/zwave-js/src/lib/test/driver/transportService.test.ts b/packages/zwave-js/src/lib/test/driver/transportService.test.ts new file mode 100644 index 000000000000..aece173a5ba8 --- /dev/null +++ b/packages/zwave-js/src/lib/test/driver/transportService.test.ts @@ -0,0 +1,409 @@ +import { + ConfigurationCCInfoReport, + TransportServiceCCFirstSegment, + TransportServiceCCSegmentComplete, + TransportServiceCCSegmentRequest, + TransportServiceCCSegmentWait, + TransportServiceCCSubsequentSegment, +} from "@zwave-js/cc"; +import { + MockZWaveFrameType, + createMockZWaveRequestFrame, +} from "@zwave-js/testing"; +import { wait } from "alcalzone-shared/async"; +import { integrationTest } from "../integrationTestSuite.js"; + +integrationTest("Receiving Transport Service commands works (happy path)", { + // debug: true, + + async testBody(t, driver, node, mockController, mockNode) { + const cc = new ConfigurationCCInfoReport({ + nodeId: 2, + parameter: 1, + reportsToFollow: 0, + info: + "Loooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong text", + }); + const ccBuffer = await cc.serializeAsync(mockNode.encodingContext); + const part1 = ccBuffer.slice(0, 39); + const part2 = ccBuffer.slice(39, 78); + const part3 = ccBuffer.slice(78); + + const awaitedCommand = driver.waitForCommand( + (cc) => cc instanceof ConfigurationCCInfoReport, + 1000, + ); + + const frame1 = new TransportServiceCCFirstSegment({ + nodeId: 2, + sessionId: 1, + datagramSize: part1.length + part2.length + part3.length, + partialDatagram: part1, + }); + + const frame2 = new TransportServiceCCSubsequentSegment({ + nodeId: 2, + sessionId: 1, + datagramSize: frame1.datagramSize, + datagramOffset: part1.length, + partialDatagram: part2, + }); + + const frame3 = new TransportServiceCCSubsequentSegment({ + nodeId: 2, + sessionId: 1, + datagramSize: frame1.datagramSize, + datagramOffset: part1.length + part2.length, + partialDatagram: part3, + }); + + await mockNode.sendToController(createMockZWaveRequestFrame(frame1)); + await wait(30); + await mockNode.sendToController(createMockZWaveRequestFrame(frame2)); + await wait(30); + await mockNode.sendToController(createMockZWaveRequestFrame(frame3)); + + await wait(100); + + // The node should have received the confirmation + mockNode.assertReceivedControllerFrame((f) => + f.type === MockZWaveFrameType.Request + && f.payload instanceof TransportServiceCCSegmentComplete + ); + + // And the ConfigurationCCInfoReport should have been assembled correctly + const received = await awaitedCommand; + t.expect(received).toBeInstanceOf(ConfigurationCCInfoReport); + t.expect((received as ConfigurationCCInfoReport).info).toBe(cc.info); + }, +}); + +integrationTest( + "Receiving Transport Service commands works (first segment missing)", + { + // debug: true, + + async testBody(t, driver, node, mockController, mockNode) { + const cc = new ConfigurationCCInfoReport({ + nodeId: 2, + parameter: 1, + reportsToFollow: 0, + info: + "Loooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong text", + }); + const ccBuffer = await cc.serializeAsync(mockNode.encodingContext); + const part1 = ccBuffer.slice(0, 39); + const part2 = ccBuffer.slice(39, 78); + const part3 = ccBuffer.slice(78); + + const frame1 = new TransportServiceCCFirstSegment({ + nodeId: 2, + sessionId: 1, + datagramSize: part1.length + part2.length + part3.length, + partialDatagram: part1, + }); + + const frame2 = new TransportServiceCCSubsequentSegment({ + nodeId: 2, + sessionId: 1, + datagramSize: frame1.datagramSize, + datagramOffset: part1.length, + partialDatagram: part2, + }); + + const frame3 = new TransportServiceCCSubsequentSegment({ + nodeId: 2, + sessionId: 1, + datagramSize: frame1.datagramSize, + datagramOffset: part1.length + part2.length, + partialDatagram: part3, + }); + + // Frame 1 went missing + await mockNode.sendToController( + createMockZWaveRequestFrame(frame2), + ); + + await wait(50); + + mockNode.assertReceivedControllerFrame((f) => + f.type === MockZWaveFrameType.Request + && f.payload instanceof TransportServiceCCSegmentWait + && f.payload.pendingSegments === 0 + ); + + mockNode.assertReceivedControllerFrame( + (f) => + f.type === MockZWaveFrameType.Request + && f.payload instanceof TransportServiceCCSegmentComplete, + { + noMatch: true, + }, + ); + }, + }, +); + +integrationTest( + "Receiving Transport Service commands works (subsequent segment missing)", + { + // debug: true, + + async testBody(t, driver, node, mockController, mockNode) { + const cc = new ConfigurationCCInfoReport({ + nodeId: 2, + parameter: 1, + reportsToFollow: 0, + info: + "Loooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong text", + }); + const ccBuffer = await cc.serializeAsync(mockNode.encodingContext); + const part1 = ccBuffer.slice(0, 39); + const part2 = ccBuffer.slice(39, 78); + const part3 = ccBuffer.slice(78); + + const awaitedCommand = driver.waitForCommand( + (cc) => cc instanceof ConfigurationCCInfoReport, + 1000, + ); + + const frame1 = new TransportServiceCCFirstSegment({ + nodeId: 2, + sessionId: 1, + datagramSize: part1.length + part2.length + part3.length, + partialDatagram: part1, + }); + + const frame2 = new TransportServiceCCSubsequentSegment({ + nodeId: 2, + sessionId: 1, + datagramSize: frame1.datagramSize, + datagramOffset: part1.length, + partialDatagram: part2, + }); + + const frame3 = new TransportServiceCCSubsequentSegment({ + nodeId: 2, + sessionId: 1, + datagramSize: frame1.datagramSize, + datagramOffset: part1.length + part2.length, + partialDatagram: part3, + }); + + await mockNode.sendToController( + createMockZWaveRequestFrame(frame1), + ); + await wait(30); + // await mockNode.sendToController(createMockZWaveRequestFrame(frame2)); + await wait(30); + await mockNode.sendToController( + createMockZWaveRequestFrame(frame3), + ); + + await wait(50); + + // The node should have received a request for the missing frame + mockNode.assertReceivedControllerFrame((f) => + f.type === MockZWaveFrameType.Request + && f.payload instanceof TransportServiceCCSegmentRequest + && f.payload.datagramOffset === part1.length + ); + + // Send the requested frame + await mockNode.sendToController( + createMockZWaveRequestFrame(frame2), + ); + await wait(50); + + // Now, the node should have received the confirmation + mockNode.assertReceivedControllerFrame((f) => + f.type === MockZWaveFrameType.Request + && f.payload instanceof TransportServiceCCSegmentComplete + ); + + // And the ConfigurationCCInfoReport should have been assembled correctly + const received = await awaitedCommand; + t.expect(received).toBeInstanceOf(ConfigurationCCInfoReport); + t.expect((received as ConfigurationCCInfoReport).info).toBe( + cc.info, + ); + }, + }, +); + +integrationTest( + "Receiving Transport Service commands works (last segment missing)", + { + // debug: true, + + async testBody(t, driver, node, mockController, mockNode) { + const cc = new ConfigurationCCInfoReport({ + nodeId: 2, + parameter: 1, + reportsToFollow: 0, + info: + "Loooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong text", + }); + const ccBuffer = await cc.serializeAsync(mockNode.encodingContext); + const part1 = ccBuffer.slice(0, 39); + const part2 = ccBuffer.slice(39, 78); + const part3 = ccBuffer.slice(78); + + const awaitedCommand = driver.waitForCommand( + (cc) => cc instanceof ConfigurationCCInfoReport, + 3000, + ); + + const frame1 = new TransportServiceCCFirstSegment({ + nodeId: 2, + sessionId: 1, + datagramSize: part1.length + part2.length + part3.length, + partialDatagram: part1, + }); + + const frame2 = new TransportServiceCCSubsequentSegment({ + nodeId: 2, + sessionId: 1, + datagramSize: frame1.datagramSize, + datagramOffset: part1.length, + partialDatagram: part2, + }); + + const frame3 = new TransportServiceCCSubsequentSegment({ + nodeId: 2, + sessionId: 1, + datagramSize: frame1.datagramSize, + datagramOffset: part1.length + part2.length, + partialDatagram: part3, + }); + + await mockNode.sendToController( + createMockZWaveRequestFrame(frame1), + ); + await wait(30); + await mockNode.sendToController( + createMockZWaveRequestFrame(frame2), + ); + await wait(1000); + // Last segment is missing + + // The node should have received a request for the missing frame + mockNode.assertReceivedControllerFrame((f) => + f.type === MockZWaveFrameType.Request + && f.payload instanceof TransportServiceCCSegmentRequest + && f.payload.datagramOffset === frame3.datagramOffset + ); + + // Send the requested frame + await mockNode.sendToController( + createMockZWaveRequestFrame(frame3), + ); + await wait(50); + + // Now, the node should have received the confirmation + mockNode.assertReceivedControllerFrame((f) => + f.type === MockZWaveFrameType.Request + && f.payload instanceof TransportServiceCCSegmentComplete + ); + + // And the ConfigurationCCInfoReport should have been assembled correctly + const received = await awaitedCommand; + t.expect(received).toBeInstanceOf(ConfigurationCCInfoReport); + t.expect((received as ConfigurationCCInfoReport).info).toBe( + cc.info, + ); + }, + }, +); + +integrationTest( + "Receiving Transport Service commands works (SegmentComplete missing)", + { + // debug: true, + + async testBody(t, driver, node, mockController, mockNode) { + const cc = new ConfigurationCCInfoReport({ + nodeId: 2, + parameter: 1, + reportsToFollow: 0, + info: + "Loooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong text", + }); + const ccBuffer = await cc.serializeAsync(mockNode.encodingContext); + const part1 = ccBuffer.slice(0, 39); + const part2 = ccBuffer.slice(39, 78); + const part3 = ccBuffer.slice(78); + + const awaitedCommand = driver.waitForCommand( + (cc) => cc instanceof ConfigurationCCInfoReport, + 1000, + ); + + const frame1 = new TransportServiceCCFirstSegment({ + nodeId: 2, + sessionId: 1, + datagramSize: part1.length + part2.length + part3.length, + partialDatagram: part1, + }); + + const frame2 = new TransportServiceCCSubsequentSegment({ + nodeId: 2, + sessionId: 1, + datagramSize: frame1.datagramSize, + datagramOffset: part1.length, + partialDatagram: part2, + }); + + const frame3 = new TransportServiceCCSubsequentSegment({ + nodeId: 2, + sessionId: 1, + datagramSize: frame1.datagramSize, + datagramOffset: part1.length + part2.length, + partialDatagram: part3, + }); + + await mockNode.sendToController( + createMockZWaveRequestFrame(frame1), + ); + await wait(30); + await mockNode.sendToController( + createMockZWaveRequestFrame(frame2), + ); + await wait(30); + await mockNode.sendToController( + createMockZWaveRequestFrame(frame3), + ); + + await wait(100); + + // The node should have received the confirmation + mockNode.assertReceivedControllerFrame((f) => + f.type === MockZWaveFrameType.Request + && f.payload instanceof TransportServiceCCSegmentComplete + ); + mockNode.clearReceivedControllerFrames(); + + // And the ConfigurationCCInfoReport should have been assembled correctly + const received = await awaitedCommand; + t.expect(received).toBeInstanceOf(ConfigurationCCInfoReport); + t.expect((received as ConfigurationCCInfoReport).info).toBe( + cc.info, + ); + + // Simulate the SegmentComplete being lost. The node should send the last segment again + + await mockNode.sendToController( + createMockZWaveRequestFrame(frame3), + ); + + await wait(100); + + // The node should have received the confirmation again + mockNode.assertReceivedControllerFrame((f) => + f.type === MockZWaveFrameType.Request + && f.payload instanceof TransportServiceCCSegmentComplete + ); + mockNode.clearReceivedControllerFrames(); + }, + }, +); diff --git a/yarn.lock b/yarn.lock index 5e0dd46aeb07..6a17fe15073b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2533,27 +2533,6 @@ __metadata: languageName: node linkType: hard -"@xstate/graph@npm:^1.4.1": - version: 1.4.1 - resolution: "@xstate/graph@npm:1.4.1" - peerDependencies: - xstate: ^4.29.0 - checksum: 10/67b5fc112d005adf709e227405bc6c23e5e31c826babe6b1a095c246acefbc9e353a9767ae700a800a572cc5cb7f3e934102bcfef8248a87fd5cfc5ef67e464f - languageName: node - linkType: hard - -"@xstate/test@npm:^0.5.1": - version: 0.5.1 - resolution: "@xstate/test@npm:0.5.1" - dependencies: - "@xstate/graph": "npm:^1.4.1" - chalk: "npm:^2.4.2" - peerDependencies: - xstate: ^4.29.0 - checksum: 10/39fb8aa08ea67e0211a200fe1795651a9421ee51fb299b73f471c3b8cec4a922f21d82506ed5b9caff4995472d91dfea77735f19ebd521781caf1efdb9caa2d4 - languageName: node - linkType: hard - "@zwave-js/cc@workspace:*, @zwave-js/cc@workspace:packages/cc": version: 0.0.0-use.local resolution: "@zwave-js/cc@workspace:packages/cc" @@ -9916,13 +9895,6 @@ __metadata: languageName: node linkType: hard -"xstate@npm:4.38.3": - version: 4.38.3 - resolution: "xstate@npm:4.38.3" - checksum: 10/82f30ed1d049d6be6274e54e34f46cad93fe773e4e333753acf363b8010f3685d256f154a91e5c1d615df654e14164dfc630c768af090925442b2c877cb9f11c - languageName: node - linkType: hard - "y18n@npm:^5.0.5": version: 5.0.8 resolution: "y18n@npm:5.0.8" @@ -10079,7 +10051,6 @@ __metadata: "@types/semver": "npm:^7.5.8" "@types/sinon": "npm:^17.0.3" "@types/source-map-support": "npm:^0.5.10" - "@xstate/test": "npm:^0.5.1" "@zwave-js/cc": "workspace:*" "@zwave-js/config": "workspace:*" "@zwave-js/core": "workspace:*" @@ -10109,7 +10080,6 @@ __metadata: typescript: "npm:5.6.2" vitest: "npm:^2.1.4" winston: "npm:^3.15.0" - xstate: "npm:4.38.3" bin: mock-server: bin/mock-server.cjs languageName: unknown From 69162a17df395fdf8d6275c2375fa5ae4619b78b Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 9 Dec 2024 10:47:24 +0000 Subject: [PATCH 02/12] chore(deps-dev): bump eslint-compat-utils from 0.5.1 to 0.6.4 (#7467) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- packages/eslint-plugin/package.json | 2 +- yarn.lock | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/packages/eslint-plugin/package.json b/packages/eslint-plugin/package.json index 084c122f418c..5639e83ab3f4 100644 --- a/packages/eslint-plugin/package.json +++ b/packages/eslint-plugin/package.json @@ -43,7 +43,7 @@ "@typescript-eslint/utils": "^8.8.1", "@zwave-js/core": "workspace:*", "eslint": "^9.12.0", - "eslint-compat-utils": "^0.5.1", + "eslint-compat-utils": "^0.6.4", "eslint-plugin-jsonc": "^2.16.0", "typescript": "5.6.2" } diff --git a/yarn.lock b/yarn.lock index 6a17fe15073b..53df3ff021fe 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2637,7 +2637,7 @@ __metadata: "@typescript-eslint/utils": "npm:^8.8.1" "@zwave-js/core": "workspace:*" eslint: "npm:^9.12.0" - eslint-compat-utils: "npm:^0.5.1" + eslint-compat-utils: "npm:^0.6.4" eslint-plugin-jsonc: "npm:^2.16.0" typescript: "npm:5.6.2" languageName: unknown @@ -4790,7 +4790,7 @@ __metadata: languageName: node linkType: hard -"eslint-compat-utils@npm:^0.5.0, eslint-compat-utils@npm:^0.5.1": +"eslint-compat-utils@npm:^0.5.0": version: 0.5.1 resolution: "eslint-compat-utils@npm:0.5.1" dependencies: @@ -4801,6 +4801,17 @@ __metadata: languageName: node linkType: hard +"eslint-compat-utils@npm:^0.6.4": + version: 0.6.4 + resolution: "eslint-compat-utils@npm:0.6.4" + dependencies: + semver: "npm:^7.5.4" + peerDependencies: + eslint: ">=6.0.0" + checksum: 10/97f08f4aa8d9a1bc1087aaeceab46a5fa65a6d70703c1a2f2cd533562381208fdd0a293ce0f63ad607f1e697ddb348ef1076b02f5afa83c70f4a07ca0dcec90e + languageName: node + linkType: hard + "eslint-plugin-jsonc@npm:^2.16.0": version: 2.16.0 resolution: "eslint-plugin-jsonc@npm:2.16.0" From d6aad246ae67f6f8d9402b4b55b35f72e54896e7 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 9 Dec 2024 10:48:15 +0000 Subject: [PATCH 03/12] chore(deps-dev): bump eslint-plugin-unicorn from 56.0.0 to 56.0.1 (#7468) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- package.json | 2 +- yarn.lock | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/package.json b/package.json index e53ffe027e97..a36ee1774b97 100644 --- a/package.json +++ b/package.json @@ -70,7 +70,7 @@ "del-cli": "^6.0.0", "dprint": "^0.47.5", "eslint": "^9.12.0", - "eslint-plugin-unicorn": "^56.0.0", + "eslint-plugin-unicorn": "^56.0.1", "eslint-plugin-unused-imports": "patch:eslint-plugin-unused-imports@npm%3A4.1.4#~/.yarn/patches/eslint-plugin-unused-imports-npm-4.1.4-a7d7c7cdf3.patch", "execa": "^5.1.1", "husky": "^9.1.6", diff --git a/yarn.lock b/yarn.lock index 53df3ff021fe..9cb64ca66db2 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2790,7 +2790,7 @@ __metadata: del-cli: "npm:^6.0.0" dprint: "npm:^0.47.5" eslint: "npm:^9.12.0" - eslint-plugin-unicorn: "npm:^56.0.0" + eslint-plugin-unicorn: "npm:^56.0.1" eslint-plugin-unused-imports: "patch:eslint-plugin-unused-imports@npm%3A4.1.4#~/.yarn/patches/eslint-plugin-unused-imports-npm-4.1.4-a7d7c7cdf3.patch" execa: "npm:^5.1.1" husky: "npm:^9.1.6" @@ -4829,9 +4829,9 @@ __metadata: languageName: node linkType: hard -"eslint-plugin-unicorn@npm:^56.0.0": - version: 56.0.0 - resolution: "eslint-plugin-unicorn@npm:56.0.0" +"eslint-plugin-unicorn@npm:^56.0.1": + version: 56.0.1 + resolution: "eslint-plugin-unicorn@npm:56.0.1" dependencies: "@babel/helper-validator-identifier": "npm:^7.24.7" "@eslint-community/eslint-utils": "npm:^4.4.0" @@ -4851,7 +4851,7 @@ __metadata: strip-indent: "npm:^3.0.0" peerDependencies: eslint: ">=8.56.0" - checksum: 10/142c66c65b2fd53136727a434b0fc77e9a9f9614aebe09330aeab83b021c842c3a5f9dafe3130c0f39fbd3562e91aadcc55a9de4312639e70fe7efb475cd358e + checksum: 10/59e13ded0e6bf4eff96018f3156829044d6edc37f66d25bc8ca99ed2b44e0cd3e9959587cf5a111498ff2d267a92bed49ade4f4dad98dcd8544e9edd9f6642b2 languageName: node linkType: hard From 6afbab9a664394a135785a2b3fb0b0bacb7023af Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 9 Dec 2024 10:55:45 +0000 Subject: [PATCH 04/12] chore(deps-dev): bump @commitlint/config-conventional from 19.5.0 to 19.6.0 (#7471) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- package.json | 2 +- yarn.lock | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/package.json b/package.json index a36ee1774b97..0cee8f664159 100644 --- a/package.json +++ b/package.json @@ -37,7 +37,7 @@ "@alcalzone/monopack": "^1.3.0", "@alcalzone/release-script": "~3.8.0", "@commitlint/cli": "^19.5.0", - "@commitlint/config-conventional": "^19.5.0", + "@commitlint/config-conventional": "^19.6.0", "@dprint/formatter": "^0.4.1", "@dprint/json": "^0.19.4", "@dprint/markdown": "^0.17.8", diff --git a/yarn.lock b/yarn.lock index 9cb64ca66db2..bb13517afcd8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -292,13 +292,13 @@ __metadata: languageName: node linkType: hard -"@commitlint/config-conventional@npm:^19.5.0": - version: 19.5.0 - resolution: "@commitlint/config-conventional@npm:19.5.0" +"@commitlint/config-conventional@npm:^19.6.0": + version: 19.6.0 + resolution: "@commitlint/config-conventional@npm:19.6.0" dependencies: "@commitlint/types": "npm:^19.5.0" conventional-changelog-conventionalcommits: "npm:^7.0.2" - checksum: 10/5844fb51347677dd28f970a50528fbc44b9b415a8a5fd6fea6c7f6a2a11357956748eda9d1d6ae499430659b0aa78bfa7dcf8abf599951e7f34a581e60bf57da + checksum: 10/f96f6706502edad60d275f62a4aac06993a5fd47c343ad6286d48f173941e4e0848dab6642559099cc6714b513197338ac24da4b07debead5371326eef87bcb8 languageName: node linkType: hard @@ -2757,7 +2757,7 @@ __metadata: "@alcalzone/monopack": "npm:^1.3.0" "@alcalzone/release-script": "npm:~3.8.0" "@commitlint/cli": "npm:^19.5.0" - "@commitlint/config-conventional": "npm:^19.5.0" + "@commitlint/config-conventional": "npm:^19.6.0" "@dprint/formatter": "npm:^0.4.1" "@dprint/json": "npm:^0.19.4" "@dprint/markdown": "npm:^0.17.8" From 863244ee66a9e9edf40d98c8f1035f2fa358e798 Mon Sep 17 00:00:00 2001 From: AlCalzone Date: Mon, 9 Dec 2024 14:27:54 +0100 Subject: [PATCH 05/12] refactor: migrate from Node.js streams to Web Streams API (#7457) --- packages/serial/src/index.ts | 15 +- packages/serial/src/index_mock.ts | 2 +- packages/serial/src/mock/MockPort.ts | 81 ++++++ ...ckSerialPort.ts => _MockSerialPort.ts.txt} | 0 .../serial/src/parsers/BootloaderParsers.ts | 223 +++++++-------- .../serial/src/parsers/SerialAPIParser.ts | 90 ++++-- .../serial/src/parsers/ZWaveSerialFrame.ts | 61 ++++ packages/serial/src/parsers/ZnifferParser.ts | 46 ++- .../serial/src/parsers/ZnifferSerialFrame.ts | 14 + packages/serial/src/plumbing/Faucet.ts | 78 +++++ packages/serial/src/plumbing/Merge.ts | 43 +++ .../serial/src/plumbing/SerialModeSwitch.ts | 98 +++++++ .../serial/src/plumbing/ZWaveSerialParser.ts | 72 +++++ .../src/serialport/LegacyBindingWrapper.ts | 67 +++++ .../serial/src/serialport/NodeSerialPort.ts | 123 ++++++++ packages/serial/src/serialport/NodeSocket.ts | 111 ++++++++ .../src/serialport/ZWaveSerialPort.test.ts | 245 ---------------- .../serial/src/serialport/ZWaveSerialPort.ts | 82 ------ .../src/serialport/ZWaveSerialPortBase.ts | 267 ------------------ .../ZWaveSerialPortImplementation.ts | 2 + .../src/serialport/ZWaveSerialStream.test.ts | 226 +++++++++++++++ .../src/serialport/ZWaveSerialStream.ts | 171 +++++++++++ packages/serial/src/serialport/ZWaveSocket.ts | 74 ----- packages/serial/src/serialport/definitions.ts | 12 + .../serial/src/zniffer/ZnifferSerialPort.ts | 84 ------ .../src/zniffer/ZnifferSerialPortBase.ts | 178 ------------ .../serial/src/zniffer/ZnifferSerialStream.ts | 131 +++++++++ packages/serial/src/zniffer/ZnifferSocket.ts | 76 ----- packages/shared/src/errors.ts | 4 + packages/testing/src/MockController.ts | 70 +++-- .../Controller.nodes.getOrThrow.test.ts | 5 +- packages/zwave-js/src/lib/driver/Driver.ts | 111 +++++--- .../zwave-js/src/lib/driver/DriverMock.ts | 59 ++-- .../zwave-js/src/lib/driver/ZWaveOptions.ts | 6 +- .../zwave-js/src/lib/node/Endpoint.test.ts | 7 +- .../src/lib/node/VirtualEndpoint.test.ts | 83 +++--- packages/zwave-js/src/lib/test/cc/API.test.ts | 7 +- .../cc/CommandClass.persistValues.test.ts | 7 +- .../test/driver/assemblePartialCCs.test.ts | 7 +- .../test/driver/bootloaderDetection.test.ts | 4 +- .../driver/computeNetCCPayloadSize.test.ts | 7 +- .../lib/test/driver/controllerJammed.test.ts | 207 +++++++------- .../test/driver/hasPendingMessages.test.ts | 7 +- .../lib/test/driver/receiveMessages.test.ts | 18 +- .../lib/test/driver/unresponsiveStick.test.ts | 16 +- .../src/lib/test/integrationTestSuite.ts | 9 +- .../src/lib/test/integrationTestSuiteMulti.ts | 9 +- .../lib/test/integrationTestSuiteShared.ts | 10 +- .../src/lib/test/node/Node.ccVersions.test.ts | 7 +- .../lib/test/node/Node.constructor.test.ts | 7 +- .../test/node/Node.createCCInstance.test.ts | 7 +- .../lib/test/node/Node.getEndpoint.test.ts | 7 +- .../lib/test/node/Node.getSetValue.test.ts | 7 +- .../src/lib/test/node/Node.status.test.ts | 7 +- .../lib/test/node/Node.updateNodeInfo.test.ts | 7 +- .../lib/test/node/Node.valueEvents.test.ts | 7 +- .../lib/test/node/Node.waitForWakeup.test.ts | 7 +- packages/zwave-js/src/lib/zniffer/Zniffer.ts | 97 +++++-- packages/zwave-js/src/mockServer.ts | 66 +++-- test/run.ts | 10 +- 60 files changed, 2038 insertions(+), 1523 deletions(-) create mode 100644 packages/serial/src/mock/MockPort.ts rename packages/serial/src/mock/{MockSerialPort.ts => _MockSerialPort.ts.txt} (100%) create mode 100644 packages/serial/src/parsers/ZWaveSerialFrame.ts create mode 100644 packages/serial/src/parsers/ZnifferSerialFrame.ts create mode 100644 packages/serial/src/plumbing/Faucet.ts create mode 100644 packages/serial/src/plumbing/Merge.ts create mode 100644 packages/serial/src/plumbing/SerialModeSwitch.ts create mode 100644 packages/serial/src/plumbing/ZWaveSerialParser.ts create mode 100644 packages/serial/src/serialport/LegacyBindingWrapper.ts create mode 100644 packages/serial/src/serialport/NodeSerialPort.ts create mode 100644 packages/serial/src/serialport/NodeSocket.ts delete mode 100644 packages/serial/src/serialport/ZWaveSerialPort.test.ts delete mode 100644 packages/serial/src/serialport/ZWaveSerialPort.ts delete mode 100644 packages/serial/src/serialport/ZWaveSerialPortBase.ts create mode 100644 packages/serial/src/serialport/ZWaveSerialStream.test.ts create mode 100644 packages/serial/src/serialport/ZWaveSerialStream.ts delete mode 100644 packages/serial/src/serialport/ZWaveSocket.ts create mode 100644 packages/serial/src/serialport/definitions.ts delete mode 100644 packages/serial/src/zniffer/ZnifferSerialPort.ts delete mode 100644 packages/serial/src/zniffer/ZnifferSerialPortBase.ts create mode 100644 packages/serial/src/zniffer/ZnifferSerialStream.ts delete mode 100644 packages/serial/src/zniffer/ZnifferSocket.ts diff --git a/packages/serial/src/index.ts b/packages/serial/src/index.ts index 9721ee83f0e4..295b05ed49d8 100644 --- a/packages/serial/src/index.ts +++ b/packages/serial/src/index.ts @@ -8,13 +8,16 @@ export * from "./message/SuccessIndicator.js"; export * from "./message/ZnifferMessages.js"; export * from "./parsers/BootloaderParsers.js"; export * from "./parsers/SerialAPIParser.js"; -export * from "./serialport/ZWaveSerialPort.js"; -export * from "./serialport/ZWaveSerialPortBase.js"; +export * from "./parsers/ZWaveSerialFrame.js"; +export * from "./parsers/ZnifferSerialFrame.js"; +export * from "./plumbing/Faucet.js"; +export * from "./serialport/LegacyBindingWrapper.js"; +export * from "./serialport/NodeSerialPort.js"; +export * from "./serialport/NodeSocket.js"; export * from "./serialport/ZWaveSerialPortImplementation.js"; -export * from "./serialport/ZWaveSocket.js"; +export * from "./serialport/ZWaveSerialStream.js"; export * from "./serialport/ZWaveSocketOptions.js"; -export * from "./zniffer/ZnifferSerialPort.js"; -export * from "./zniffer/ZnifferSerialPortBase.js"; -export * from "./zniffer/ZnifferSocket.js"; +export * from "./serialport/definitions.js"; +export * from "./zniffer/ZnifferSerialStream.js"; export * from "./index_serialapi.js"; diff --git a/packages/serial/src/index_mock.ts b/packages/serial/src/index_mock.ts index e077354e3e17..59e501aeb6cc 100644 --- a/packages/serial/src/index_mock.ts +++ b/packages/serial/src/index_mock.ts @@ -1,3 +1,3 @@ -export * from "./mock/MockSerialPort.js"; +export * from "./mock/MockPort.js"; export * from "./mock/SerialPortBindingMock.js"; export * from "./mock/SerialPortMock.js"; diff --git a/packages/serial/src/mock/MockPort.ts b/packages/serial/src/mock/MockPort.ts new file mode 100644 index 000000000000..17f2756acc7a --- /dev/null +++ b/packages/serial/src/mock/MockPort.ts @@ -0,0 +1,81 @@ +import { ZWaveLogContainer } from "@zwave-js/core"; +import type { UnderlyingSink, UnderlyingSource } from "node:stream/web"; +import { + type ZWaveSerialBindingFactory, + type ZWaveSerialStream, + ZWaveSerialStreamFactory, +} from "../serialport/ZWaveSerialStream.js"; + +export class MockPort { + public constructor() { + const { readable, writable: sink } = new TransformStream(); + this.#sink = sink; + this.readable = readable; + } + + // Remembers the last written data + public lastWrite: Uint8Array | undefined; + + // Internal stream to allow emitting data from the port + #sourceController: ReadableStreamDefaultController | undefined; + + // Public readable stream to allow handling the written data + #sink: WritableStream; + /** Exposes the data written by the host as a readable stream */ + public readonly readable: ReadableStream; + + public factory(): ZWaveSerialBindingFactory { + return () => { + const sink: UnderlyingSink = { + write: async (chunk, _controller) => { + // Remember the last written data + this.lastWrite = chunk; + // Only write to the sink if its readable side has a reader attached. + // Otherwise, we get backpressure on the writable side of the mock port + if (this.readable.locked) { + const writer = this.#sink.getWriter(); + try { + await writer.write(chunk); + } finally { + writer.releaseLock(); + } + } + }, + }; + + const source: UnderlyingSource = { + start: (controller) => { + this.#sourceController = controller; + }, + }; + + return Promise.resolve({ sink, source }); + }; + } + + public emitData(data: Uint8Array): void { + this.#sourceController?.enqueue(data); + } + + public destroy(): void { + try { + this.#sourceController?.close(); + this.#sourceController = undefined; + } catch { + // Ignore - the controller might already be closed + } + } +} + +export async function createAndOpenMockedZWaveSerialPort(): Promise<{ + port: MockPort; + serial: ZWaveSerialStream; +}> { + const port = new MockPort(); + const factory = new ZWaveSerialStreamFactory( + port.factory(), + new ZWaveLogContainer({ enabled: false }), + ); + const serial = await factory.createStream(); + return { port, serial }; +} diff --git a/packages/serial/src/mock/MockSerialPort.ts b/packages/serial/src/mock/_MockSerialPort.ts.txt similarity index 100% rename from packages/serial/src/mock/MockSerialPort.ts rename to packages/serial/src/mock/_MockSerialPort.ts.txt diff --git a/packages/serial/src/parsers/BootloaderParsers.ts b/packages/serial/src/parsers/BootloaderParsers.ts index e35a7d209584..6f36b74f7d1b 100644 --- a/packages/serial/src/parsers/BootloaderParsers.ts +++ b/packages/serial/src/parsers/BootloaderParsers.ts @@ -1,39 +1,13 @@ -import { Transform, type TransformCallback } from "node:stream"; +import { Bytes } from "@zwave-js/shared"; +import { type Transformer } from "node:stream/web"; import type { SerialLogger } from "../log/Logger.js"; import { XModemMessageHeaders } from "../message/MessageHeaders.js"; - -export enum BootloaderChunkType { - Error, - Menu, - Message, - FlowControl, -} - -export type BootloaderChunk = - | { - type: BootloaderChunkType.Error; - error: string; - _raw: string; - } - | { - type: BootloaderChunkType.Menu; - version: string; - options: { num: number; option: string }[]; - _raw: string; - } - | { - type: BootloaderChunkType.Message; - message: string; - _raw: string; - } - | { - type: BootloaderChunkType.FlowControl; - command: - | XModemMessageHeaders.ACK - | XModemMessageHeaders.NAK - | XModemMessageHeaders.CAN - | XModemMessageHeaders.C; - }; +import { + type BootloaderChunk, + BootloaderChunkType, + type ZWaveSerialFrame, + ZWaveSerialFrameType, +} from "./ZWaveSerialFrame.js"; function isFlowControl(byte: number): boolean { return ( @@ -44,27 +18,24 @@ function isFlowControl(byte: number): boolean { ); } -/** Parses the screen output from the bootloader, either waiting for a NUL char or a timeout */ -export class BootloaderScreenParser extends Transform { - constructor(private logger?: SerialLogger) { - // We read byte streams but emit messages - super({ readableObjectMode: true }); - } +class BootloaderScreenParserTransformer + implements Transformer +{ + constructor(private logger?: SerialLogger) {} private receiveBuffer = ""; private flushTimeout: NodeJS.Timeout | undefined; - _transform( - chunk: any, - encoding: string, - callback: TransformCallback, - ): void { + transform( + chunk: Uint8Array, + controller: TransformStreamDefaultController, + ) { if (this.flushTimeout) { clearTimeout(this.flushTimeout); this.flushTimeout = undefined; } - this.receiveBuffer += chunk.toString("utf8"); + this.receiveBuffer += Bytes.view(chunk).toString("utf8"); // Correct buggy ordering of NUL char in error codes. // The bootloader may send errors as "some error 0x\012" instead of "some error 0x12\0" @@ -82,7 +53,7 @@ export class BootloaderScreenParser extends Transform { if (screen === "") continue; this.logger?.bootloaderScreen(screen); - this.push(screen); + controller.enqueue(screen); } // Emit single flow-control bytes @@ -91,7 +62,7 @@ export class BootloaderScreenParser extends Transform { if (!isFlowControl(charCode)) break; this.logger?.data("inbound", Uint8Array.from([charCode])); - this.push(charCode); + controller.enqueue(charCode); this.receiveBuffer = this.receiveBuffer.slice(1); } @@ -99,12 +70,21 @@ export class BootloaderScreenParser extends Transform { if (this.receiveBuffer) { this.flushTimeout = setTimeout(() => { this.flushTimeout = undefined; - this.push(this.receiveBuffer); + controller.enqueue(this.receiveBuffer); this.receiveBuffer = ""; }, 500); } + } +} - callback(); +/** Parses the screen output from the bootloader, either waiting for a NUL char or a timeout */ +export class BootloaderScreenParser + extends TransformStream +{ + constructor( + logger?: SerialLogger, + ) { + super(new BootloaderScreenParserTransformer(logger)); } } @@ -116,77 +96,84 @@ const menuSuffix = "BL >"; const optionsRegex = /^(?\d+)\. (?