diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml index 1d0929c..0689107 100644 --- a/.idea/inspectionProfiles/Project_Default.xml +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -33,5 +33,6 @@ + \ No newline at end of file diff --git a/README.md b/README.md index a6a158a..46e700c 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,10 @@ to any number of different targets, such as databases or MQTT. * Supports arbitrary _**grouping of devices**_ * A group can be for example "Heating" or "Lights". This allows users to get a better overview of their energy consumption when many circuits and devices are involved. -* Can apply various _**filters**_ to the power sensor data (clamping, high-pass etc.) +* Can apply various _**filters**_ to the power sensor data, such as: + * Clamp values, useful for ignoring negative readings from bi-directional sensors + * High-pass filtering, useful for ignoring tiny power readings + * Scaling, often essential when dealing with Modbus sensors * Can _**measure unmetered power**_ too * You can have a current-transformer type sensor measuring a circuit, then a smart plug measuring some specific device on that circuit, then an unmetered type sensor which calculates the difference between the two, yielding the diff --git a/package-lock.json b/package-lock.json index a004a03..141233e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,6 +11,7 @@ "license": "GPL-3.0-or-later", "dependencies": { "@influxdata/influxdb-client": "^1.33.2", + "async-mutex": "^0.5.0", "modbus-serial": "^8.0.16", "mqtt": "^5.1.2", "set-interval-async": "^3.0.3", @@ -2094,6 +2095,15 @@ "resolved": "https://registry.npmjs.org/async/-/async-3.2.4.tgz", "integrity": "sha512-iAB+JbDEGXhyIUavoDl9WP/Jj106Kz9DEn1DPgYw5ruDn0e3Wgi3sKFm55sASdGBNOQB8F59d9qQ7deqrHA8wQ==" }, + "node_modules/async-mutex": { + "version": "0.5.0", + "resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.5.0.tgz", + "integrity": "sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==", + "license": "MIT", + "dependencies": { + "tslib": "^2.4.0" + } + }, "node_modules/babel-jest": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/babel-jest/-/babel-jest-29.7.0.tgz", @@ -5613,6 +5623,12 @@ "optional": true, "peer": true }, + "node_modules/tslib": { + "version": "2.7.0", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.7.0.tgz", + "integrity": "sha512-gLXCKdN1/j47AiHiOkJN69hJmcbGTHI0ImLmbYLHykhgeN0jVGola9yVjFgzCUklsZQMW55o+dW7IXv3RCXDzA==", + "license": "0BSD" + }, "node_modules/type-check": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/type-check/-/type-check-0.4.0.tgz", diff --git a/package.json b/package.json index e1ec2f2..cca457c 100644 --- a/package.json +++ b/package.json @@ -32,6 +32,7 @@ }, "dependencies": { "@influxdata/influxdb-client": "^1.33.2", + "async-mutex": "^0.5.0", "modbus-serial": "^8.0.16", "mqtt": "^5.1.2", "set-interval-async": "^3.0.3", diff --git a/src/circuit.ts b/src/circuit.ts index 907063e..657eef9 100644 --- a/src/circuit.ts +++ b/src/circuit.ts @@ -1,4 +1,5 @@ import { PowerSensor, PowerSensorData } from './sensor' +import { applyFilters } from './filter/filter' export enum CircuitType { Main = 'main', @@ -28,10 +29,24 @@ export const pollPowerSensors = async ( const promises = [] for (const circuit of circuits) { - const sensor = circuit.sensor - - promises.push(sensor.pollFunc(timestamp, circuit, existingSensorData)) + promises.push(pollPowerSensor(timestamp, circuit, existingSensorData)) } return Promise.all(promises) } + +const pollPowerSensor = async ( + timestamp: number, + circuit: Circuit, + existingSensorData?: PowerSensorData[], +): Promise => { + const sensor = circuit.sensor + + let data = await sensor.pollFunc(timestamp, circuit, existingSensorData) + + if (sensor.filters) { + data = applyFilters(sensor.filters, data) + } + + return data +} diff --git a/src/eachwatt.ts b/src/eachwatt.ts index 4872454..1f813b7 100644 --- a/src/eachwatt.ts +++ b/src/eachwatt.ts @@ -11,7 +11,6 @@ import { pollCharacteristicsSensors } from './characteristics' import { createLogger, LogLevel, setLogLevel } from './logger' import { setRequestTimeout as setHttpRequestTimeout } from './http/client' import { setRequestTimeout as setModbusRequestTimeout } from './modbus/client' -import { applyFilters } from './filter/filter' import { setIntervalAsync } from 'set-interval-async' // Set up a signal handler, so we can exit on Ctrl + C when run from Docker @@ -65,11 +64,6 @@ const mainPollerFunc = async (config: Config) => { // Post-process power sensor data powerSensorData = powerSensorData.map((data) => { - // Apply optional data filters - if (data.circuit.sensor.filters) { - data = applyFilters(data.circuit.sensor.filters, data) - } - if (data.power !== undefined) { // Round all numbers to one decimal point data.power = Number(data.power.toFixed(1)) diff --git a/src/filter/filter.ts b/src/filter/filter.ts index 1f1cb64..a7ecf1e 100644 --- a/src/filter/filter.ts +++ b/src/filter/filter.ts @@ -3,6 +3,7 @@ import { PowerSensorData } from '../sensor' export type PowerSensorFilters = { clamp?: 'positive' highPass?: number + scale?: number } export const applyFilters = (filters: PowerSensorFilters, data: PowerSensorData): PowerSensorData => { @@ -21,5 +22,11 @@ export const applyFilters = (filters: PowerSensorFilters, data: PowerSensorData) data.power = 0 } + // Scale + const scale = filters?.scale + if (scale !== undefined) { + data.power = data.power / scale + } + return data } diff --git a/src/modbus/client.ts b/src/modbus/client.ts index fda7106..8ee9fc7 100644 --- a/src/modbus/client.ts +++ b/src/modbus/client.ts @@ -11,14 +11,16 @@ export const setRequestTimeout = (timeoutMs: number) => { logger.info(`Using ${timeoutMs} millisecond timeout for Modbus operations`) } -// Keep track of clients, use one per address +// Keep track of clients, use one per address/port/unit combination const clients = new Map() -export const getClient = (address: string): ModbusRTU => { - if (!clients.has(address)) { +export const getClient = (address: string, port: number, unit: number): ModbusRTU => { + const key = `${address}_${port}_${unit}` + + if (!clients.has(key)) { const client = new ModbusRTU() - clients.set(address, client) + clients.set(key, client) } - return clients.get(address) as ModbusRTU + return clients.get(key) as ModbusRTU } diff --git a/src/sensor/modbus.ts b/src/sensor/modbus.ts index 2c53694..31db6d5 100644 --- a/src/sensor/modbus.ts +++ b/src/sensor/modbus.ts @@ -5,11 +5,13 @@ import { createLogger } from '../logger' import { getClient, requestTimeout } from '../modbus/client' import { getRegisterLength, ModbusRegister, RegisterType, stringify } from '../modbus/register' import ModbusRTU from 'modbus-serial' +import { Mutex } from 'async-mutex' export const DEFAULT_PORT = 502 export const DEFAULT_UNIT = 1 const logger = createLogger('sensor.modbus') +const mutex = new Mutex() export const getSensorData: PowerSensorPollFunction = async ( timestamp: number, @@ -18,10 +20,10 @@ export const getSensorData: PowerSensorPollFunction = async ( const sensor = circuit.sensor as ModbusSensor const sensorSettings = sensor.modbus - const client = getClient(sensorSettings.address) + const client = getClient(sensorSettings.address, sensorSettings.port, sensorSettings.unit) try { - // Connect if not connected yet + // Connect if not connected yet, skip if (!client.isOpen) { logger.info(`Connecting to ${sensorSettings.address}:${sensorSettings.port}...`) await client.connectTCP(sensorSettings.address, { @@ -32,6 +34,14 @@ export const getSensorData: PowerSensorPollFunction = async ( client.setID(sensorSettings.unit) // Request timeout client.setTimeout(requestTimeout) + + // Wait 100 ms for the port to open, if it's not open, give up and return empty data + await new Promise((resolve) => setTimeout(resolve, 100)) + + if (!client.isOpen) { + logger.warn(`Modbus TCP channel not open after 100ms, will not attempt to read values this tick`) + return emptySensorData(timestamp, circuit) + } } // Read the register and parse it accordingly @@ -58,16 +68,19 @@ const readRegisters = async ( const address = register.address const length = getRegisterLength(register) - switch (register.registerType) { - case RegisterType.HOLDING_REGISTER: - return client.readHoldingRegisters(address, length) - case RegisterType.INPUT_REGISTER: - return client.readInputRegisters(address, length) - case RegisterType.COIL: - return client.readCoils(address, length) - case RegisterType.DISCRETE_INPUT: - return client.readDiscreteInputs(address, length) - } + // Serialize access to the underlying Modbus client + return mutex.runExclusive(async () => { + switch (register.registerType) { + case RegisterType.HOLDING_REGISTER: + return client.readHoldingRegisters(address, length) + case RegisterType.INPUT_REGISTER: + return client.readInputRegisters(address, length) + case RegisterType.COIL: + return client.readCoils(address, length) + case RegisterType.DISCRETE_INPUT: + return client.readDiscreteInputs(address, length) + } + }) } const parseReadRegisterResult = (result: ReadRegisterResult | ReadCoilResult, register: ModbusRegister): number => { diff --git a/tests/filter/filter.test.ts b/tests/filter/filter.test.ts index 2db1735..405656c 100644 --- a/tests/filter/filter.test.ts +++ b/tests/filter/filter.test.ts @@ -2,7 +2,6 @@ import { emptySensorData, PowerSensorData, SensorType } from '../../src/sensor' import { applyFilters, PowerSensorFilters } from '../../src/filter/filter' import { Circuit } from '../../src/circuit' import { getSensorData as getDummySensorData } from '../../src/sensor/dummy' -import exp = require('node:constants') test('clamping works', () => { const filters: PowerSensorFilters = {} @@ -21,7 +20,7 @@ test('clamping works', () => { expect(data.power).toEqual(0) }) -test('high-pas works', () => { +test('high-pass works', () => { const filters: PowerSensorFilters = {} let data: PowerSensorData = dummySensorData() @@ -38,6 +37,20 @@ test('high-pas works', () => { expect(data.power).toEqual(3) }) +test('scale works', () => { + const filters: PowerSensorFilters = {} + let data: PowerSensorData = dummySensorData() + + data.power = 155 + data = applyFilters(filters, data) + expect(data.power).toEqual(155) + + data.power = 155 + filters.scale = 0.1 + data = applyFilters(filters, data) + expect(data.power).toEqual(1550) +}) + const dummySensorData = (): PowerSensorData => { const circuit: Circuit = { name: 'dummy',