Skip to content

Commit

Permalink
refactor: migrate from Node.js streams to Web Streams API (#7457)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCalzone authored Dec 9, 2024
1 parent 6afbab9 commit 863244e
Show file tree
Hide file tree
Showing 60 changed files with 2,038 additions and 1,523 deletions.
15 changes: 9 additions & 6 deletions packages/serial/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ export * from "./message/SuccessIndicator.js";
export * from "./message/ZnifferMessages.js";
export * from "./parsers/BootloaderParsers.js";
export * from "./parsers/SerialAPIParser.js";
export * from "./serialport/ZWaveSerialPort.js";
export * from "./serialport/ZWaveSerialPortBase.js";
export * from "./parsers/ZWaveSerialFrame.js";
export * from "./parsers/ZnifferSerialFrame.js";
export * from "./plumbing/Faucet.js";
export * from "./serialport/LegacyBindingWrapper.js";
export * from "./serialport/NodeSerialPort.js";
export * from "./serialport/NodeSocket.js";
export * from "./serialport/ZWaveSerialPortImplementation.js";
export * from "./serialport/ZWaveSocket.js";
export * from "./serialport/ZWaveSerialStream.js";
export * from "./serialport/ZWaveSocketOptions.js";
export * from "./zniffer/ZnifferSerialPort.js";
export * from "./zniffer/ZnifferSerialPortBase.js";
export * from "./zniffer/ZnifferSocket.js";
export * from "./serialport/definitions.js";
export * from "./zniffer/ZnifferSerialStream.js";

export * from "./index_serialapi.js";
2 changes: 1 addition & 1 deletion packages/serial/src/index_mock.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export * from "./mock/MockSerialPort.js";
export * from "./mock/MockPort.js";
export * from "./mock/SerialPortBindingMock.js";
export * from "./mock/SerialPortMock.js";
81 changes: 81 additions & 0 deletions packages/serial/src/mock/MockPort.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import { ZWaveLogContainer } from "@zwave-js/core";
import type { UnderlyingSink, UnderlyingSource } from "node:stream/web";
import {
type ZWaveSerialBindingFactory,
type ZWaveSerialStream,
ZWaveSerialStreamFactory,
} from "../serialport/ZWaveSerialStream.js";

export class MockPort {
public constructor() {
const { readable, writable: sink } = new TransformStream<Uint8Array>();
this.#sink = sink;
this.readable = readable;
}

// Remembers the last written data
public lastWrite: Uint8Array | undefined;

// Internal stream to allow emitting data from the port
#sourceController: ReadableStreamDefaultController<Uint8Array> | undefined;

// Public readable stream to allow handling the written data
#sink: WritableStream<Uint8Array>;
/** Exposes the data written by the host as a readable stream */
public readonly readable: ReadableStream<Uint8Array>;

public factory(): ZWaveSerialBindingFactory {
return () => {
const sink: UnderlyingSink<Uint8Array> = {
write: async (chunk, _controller) => {
// Remember the last written data
this.lastWrite = chunk;
// Only write to the sink if its readable side has a reader attached.
// Otherwise, we get backpressure on the writable side of the mock port
if (this.readable.locked) {
const writer = this.#sink.getWriter();
try {
await writer.write(chunk);
} finally {
writer.releaseLock();
}
}
},
};

const source: UnderlyingSource<Uint8Array> = {
start: (controller) => {
this.#sourceController = controller;
},
};

return Promise.resolve({ sink, source });
};
}

public emitData(data: Uint8Array): void {
this.#sourceController?.enqueue(data);
}

public destroy(): void {
try {
this.#sourceController?.close();
this.#sourceController = undefined;
} catch {
// Ignore - the controller might already be closed
}
}
}

export async function createAndOpenMockedZWaveSerialPort(): Promise<{
port: MockPort;
serial: ZWaveSerialStream;
}> {
const port = new MockPort();
const factory = new ZWaveSerialStreamFactory(
port.factory(),
new ZWaveLogContainer({ enabled: false }),
);
const serial = await factory.createStream();
return { port, serial };
}
File renamed without changes.
223 changes: 105 additions & 118 deletions packages/serial/src/parsers/BootloaderParsers.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,13 @@
import { Transform, type TransformCallback } from "node:stream";
import { Bytes } from "@zwave-js/shared";
import { type Transformer } from "node:stream/web";
import type { SerialLogger } from "../log/Logger.js";
import { XModemMessageHeaders } from "../message/MessageHeaders.js";

export enum BootloaderChunkType {
Error,
Menu,
Message,
FlowControl,
}

export type BootloaderChunk =
| {
type: BootloaderChunkType.Error;
error: string;
_raw: string;
}
| {
type: BootloaderChunkType.Menu;
version: string;
options: { num: number; option: string }[];
_raw: string;
}
| {
type: BootloaderChunkType.Message;
message: string;
_raw: string;
}
| {
type: BootloaderChunkType.FlowControl;
command:
| XModemMessageHeaders.ACK
| XModemMessageHeaders.NAK
| XModemMessageHeaders.CAN
| XModemMessageHeaders.C;
};
import {
type BootloaderChunk,
BootloaderChunkType,
type ZWaveSerialFrame,
ZWaveSerialFrameType,
} from "./ZWaveSerialFrame.js";

function isFlowControl(byte: number): boolean {
return (
Expand All @@ -44,27 +18,24 @@ function isFlowControl(byte: number): boolean {
);
}

/** Parses the screen output from the bootloader, either waiting for a NUL char or a timeout */
export class BootloaderScreenParser extends Transform {
constructor(private logger?: SerialLogger) {
// We read byte streams but emit messages
super({ readableObjectMode: true });
}
class BootloaderScreenParserTransformer
implements Transformer<Uint8Array, number | string>
{
constructor(private logger?: SerialLogger) {}

private receiveBuffer = "";
private flushTimeout: NodeJS.Timeout | undefined;

_transform(
chunk: any,
encoding: string,
callback: TransformCallback,
): void {
transform(
chunk: Uint8Array,
controller: TransformStreamDefaultController<number | string>,
) {
if (this.flushTimeout) {
clearTimeout(this.flushTimeout);
this.flushTimeout = undefined;
}

this.receiveBuffer += chunk.toString("utf8");
this.receiveBuffer += Bytes.view(chunk).toString("utf8");

// Correct buggy ordering of NUL char in error codes.
// The bootloader may send errors as "some error 0x\012" instead of "some error 0x12\0"
Expand All @@ -82,7 +53,7 @@ export class BootloaderScreenParser extends Transform {
if (screen === "") continue;

this.logger?.bootloaderScreen(screen);
this.push(screen);
controller.enqueue(screen);
}

// Emit single flow-control bytes
Expand All @@ -91,20 +62,29 @@ export class BootloaderScreenParser extends Transform {
if (!isFlowControl(charCode)) break;

this.logger?.data("inbound", Uint8Array.from([charCode]));
this.push(charCode);
controller.enqueue(charCode);
this.receiveBuffer = this.receiveBuffer.slice(1);
}

// If a partial output is kept for a certain amount of time, emit it aswell
if (this.receiveBuffer) {
this.flushTimeout = setTimeout(() => {
this.flushTimeout = undefined;
this.push(this.receiveBuffer);
controller.enqueue(this.receiveBuffer);
this.receiveBuffer = "";
}, 500);
}
}
}

callback();
/** Parses the screen output from the bootloader, either waiting for a NUL char or a timeout */
export class BootloaderScreenParser
extends TransformStream<Uint8Array, number | string>
{
constructor(
logger?: SerialLogger,
) {
super(new BootloaderScreenParserTransformer(logger));
}
}

Expand All @@ -116,77 +96,84 @@ const menuSuffix = "BL >";
const optionsRegex = /^(?<num>\d+)\. (?<option>.+)/gm;

/** Transforms the bootloader screen output into meaningful chunks */
export class BootloaderParser extends Transform {
export class BootloaderParser extends TransformStream<
number | string,
ZWaveSerialFrame & { type: ZWaveSerialFrameType.Bootloader }
> {
constructor() {
// We read strings and return objects
super({ objectMode: true });
}

_transform(
chunk: any,
encoding: string,
callback: TransformCallback,
): void {
// Flow control bytes come in as numbers
if (typeof chunk === "number") {
return callback(
null,
{
type: BootloaderChunkType.FlowControl,
command: chunk,
}, /* satisfies BootloaderChunk */
);
function wrapChunk(
chunk: BootloaderChunk,
): ZWaveSerialFrame & { type: ZWaveSerialFrameType.Bootloader } {
return {
type: ZWaveSerialFrameType.Bootloader,
data: chunk,
};
}

let screen = (chunk as string).trim();

// Apparently, the bootloader sometimes sends \0 in the wrong location.
// Therefore check if the screen contains the menu preamble, instead of forcing
// it to start with it
const menuPreambleIndex = screen.indexOf(bootloaderMenuPreamble);

if (menuPreambleIndex > -1 && screen.endsWith(menuSuffix)) {
screen = screen.slice(menuPreambleIndex);
const version = preambleRegex.exec(screen)?.groups?.version;
if (!version) {
return callback(
null,
{
type: BootloaderChunkType.Error,
error: "Could not determine bootloader version",
_raw: screen,
}, /* satisfies BootloaderChunk */
const transformer: Transformer<
number | string,
ZWaveSerialFrame & { type: ZWaveSerialFrameType.Bootloader }
> = {
transform(chunk, controller) {
// Flow control bytes come in as numbers
if (typeof chunk === "number") {
controller.enqueue(wrapChunk({
type: BootloaderChunkType.FlowControl,
command: chunk,
}));
return;
}

let screen = chunk.trim();

// Apparently, the bootloader sometimes sends \0 in the wrong location.
// Therefore check if the screen contains the menu preamble, instead of forcing
// it to start with it
const menuPreambleIndex = screen.indexOf(
bootloaderMenuPreamble,
);
}

const options: { num: number; option: string }[] = [];
let match: RegExpExecArray | null;
while ((match = optionsRegex.exec(screen)) !== null) {
options.push({
num: parseInt(match.groups!.num),
option: match.groups!.option,
});
}

this.push(
{
type: BootloaderChunkType.Menu,
_raw: screen,
version,
options,
}, /* satisfies BootloaderChunk */
);
} else {
// Some output
this.push(
{
type: BootloaderChunkType.Message,
_raw: screen,
message: screen,
}, /* satisfies BootloaderChunk */
);
}

callback();
if (menuPreambleIndex > -1 && screen.endsWith(menuSuffix)) {
screen = screen.slice(menuPreambleIndex);
const version = preambleRegex.exec(screen)?.groups?.version;
if (!version) {
controller.enqueue(wrapChunk({
type: BootloaderChunkType.Error,
error: "Could not determine bootloader version",
_raw: screen,
}) /* satisfies BootloaderChunk */);
return;
}

const options: { num: number; option: string }[] = [];
let match: RegExpExecArray | null;
while ((match = optionsRegex.exec(screen)) !== null) {
options.push({
num: parseInt(match.groups!.num),
option: match.groups!.option,
});
}

controller.enqueue(
wrapChunk({
type: BootloaderChunkType.Menu,
_raw: screen,
version,
options,
}), /* satisfies BootloaderChunk */
);
} else {
// Some output
controller.enqueue(
wrapChunk({
type: BootloaderChunkType.Message,
_raw: screen,
message: screen,
}), /* satisfies BootloaderChunk */
);
}
},
};

super(transformer);
}
}
Loading

0 comments on commit 863244e

Please sign in to comment.