From 6bfe78da3389b818047f692385310753d35e7991 Mon Sep 17 00:00:00 2001 From: Gordon Stein <7331488+gsteinLTU@users.noreply.github.com> Date: Mon, 5 Aug 2024 13:50:52 -0500 Subject: [PATCH] IoTScape Improvements (#205) * Copy over old changes * Fix code formatting * Fix typo and include other change * Fix code formatting * Fix missing methods * Fix code formatting * fix sending * Fix code formatting * Fix set key behavior * Setup to allow "set client rate" and "set total rate" * Fix code formatting * Improve comment * Sequence numbers working * Fix code formatting * Rate limiting * Fix code formatting * Send client IDs * Fix code formatting * Send service name with message * HTTP announce endpoint * Allow "lite" announce * Fix code formatting * Fix to function info * Fix code formatting * Send device messages * Take methods out of "basic" category * Route for HTTP responses to IoTScape messages * Fix code formatting * Symmetric --------- Co-authored-by: Format Bot --- src/community/device-service.js | 33 +- src/procedures/iotscape/iotscape-devices.js | 365 ++++++++++++ src/procedures/iotscape/iotscape-services.js | 590 +++++++++++++------ src/procedures/iotscape/iotscape.js | 321 +++++++--- src/procedures/iotscape/routes.js | 46 ++ test/procedures/iotscape.spec.js | 1 + 6 files changed, 1100 insertions(+), 256 deletions(-) create mode 100644 src/procedures/iotscape/iotscape-devices.js diff --git a/src/community/device-service.js b/src/community/device-service.js index e74f6504..d6b09036 100644 --- a/src/community/device-service.js +++ b/src/community/device-service.js @@ -1,5 +1,7 @@ const createLogger = require("../procedures/utils/logger"); const IoTScapeServices = require("../procedures/iotscape/iotscape-services"); +const IoTScapeDevices = require("../procedures/iotscape/iotscape-devices"); +const IoTScape = require("../procedures/iotscape/iotscape"); /** * Represents a service created for a IoTScape device @@ -24,12 +26,13 @@ class DeviceService { this._docs = { description: record.description, - categories: [["Community", "Device"]], + categories: [["Community", "Devices"], ["Devices", "Community"]], getDocFor: (method) => { let m = record.methods.find((val) => val.name == method); return { name: m.name, description: m.documentation, + categories: m.categories, args: m.arguments.map((argument) => ({ name: argument.name, optional: argument.optional, @@ -41,10 +44,10 @@ class DeviceService { } async _initializeRPC(methodSpec) { - // getDevices and listen have special implementations + // Default methods have special implementations if (methodSpec.name === "getDevices") { this[methodSpec.name] = async function () { - return IoTScapeServices.getDevices(this.serviceName); + return IoTScapeDevices.getDevices(this.serviceName); }; } else if (methodSpec.name === "listen") { this[methodSpec.name] = async function () { @@ -54,12 +57,30 @@ class DeviceService { ...arguments, ); }; + } else if (methodSpec.name === "send") { + this[methodSpec.name] = async function () { + return IoTScape._send( + this.serviceName, + arguments[0], + arguments[1], + this.caller, + ); + }; + } else if (methodSpec.name === "getMessageTypes") { + this[methodSpec.name] = async function () { + return IoTScapeServices.getMessageTypes(this.serviceName); + }; + } else if (methodSpec.name === "getMethods") { + this[methodSpec.name] = async function () { + return IoTScapeServices.getMethods(this.serviceName); + }; } else { this[methodSpec.name] = async function () { - return await IoTScapeServices.call( + return await IoTScape._send( this.serviceName, - methodSpec.name, - ...arguments, + arguments[0], + [methodSpec.name, ...Object.values(arguments).splice(1)].join(" "), + this.caller, ); }; } diff --git a/src/procedures/iotscape/iotscape-devices.js b/src/procedures/iotscape/iotscape-devices.js new file mode 100644 index 00000000..c60b4f0a --- /dev/null +++ b/src/procedures/iotscape/iotscape-devices.js @@ -0,0 +1,365 @@ +const logger = require("../utils/logger")("iotscape-devices"); +const ciphers = require("../roboscape/ciphers"); + +/** + * Stores information about registered devices, with a list of IDs and their respective hosts + */ +const IoTScapeDevices = {}; +IoTScapeDevices._services = {}; +IoTScapeDevices._encryptionStates = {}; + +/** + * Encrypt a string with a device's encryption settings + * @param {String} service Service device is contained in + * @param {String} id ID of device to use encryption settings for + * @param {String} plaintext Plaintext to encrypt + * @returns Plaintext encrypted with device's encryption settings + */ +IoTScapeDevices.deviceEncrypt = function (service, id, plaintext) { + let encryptionState = IoTScapeDevices.getEncryptionState(service, id); + return ciphers[encryptionState.cipher].encrypt( + plaintext, + encryptionState.key, + ); +}; + +/** + * Encrypt a string with a device's encryption settings + * @param {String} service Service device is contained in + * @param {String} id ID of device to use encryption settings for + * @param {String} ciphertext Ciphertext to decrypt + * @returns Ciphertext decrypted with device's encryption settings + */ +IoTScapeDevices.deviceDecrypt = function (service, id, ciphertext) { + let encryptionState = IoTScapeDevices.getEncryptionState(service, id); + return ciphers[encryptionState.cipher].decrypt( + ciphertext, + encryptionState.key, + ); +}; + +/** + * Get the remote host of a IoTScape device + * @param {String} service Name of service + * @param {String} id ID of device + */ +IoTScapeDevices.getInfo = function (service, id) { + return IoTScapeDevices._services[service][id]; +}; + +/** + * Get a device's encryption settings (or defaults if not set) + * @param {String} service Service device is contained in + * @param {String} id ID of device to get encryption settings for + */ +IoTScapeDevices.getEncryptionState = function (service, id) { + if (!IoTScapeDevices.deviceExists(service, id)) { + throw new Error("Device not found"); + } + + if (!Object.keys(IoTScapeDevices._encryptionStates).includes(service)) { + IoTScapeDevices._encryptionStates[service] = {}; + } + + if (!Object.keys(IoTScapeDevices._encryptionStates[service]).includes(id)) { + // Create entry with default + IoTScapeDevices._encryptionStates[service][id] = { + key: [0], + cipher: "plain", + lastSeqNum: -1, + totalRate: 0, // in messages per second + clientRate: 0, // in messages per second + clientPenalty: 0, // in seconds + totalCount: 0, + clientCounts: {}, + }; + } + + const state = IoTScapeDevices._encryptionStates[service][id]; + + if (state.cipher == "linked") { + return IoTScapeDevices.getEncryptionState(state.key.service, state.key.id); + } + + return state; +}; + +/** + * Determine if a message should be accepted from a device + * @param {String} service Service device is contained in + * @param {String} id ID of device to use encryption settings for + * @param {String} clientId ID of client + * @param {Number} seqNum Sequence number of message + * @returns {Boolean} If message should be accepted + */ +IoTScapeDevices.accepts = function (service, id, clientId, seqNum = -1) { + const state = IoTScapeDevices.getEncryptionState(service, id); + + // Check sequence number + if ( + state.lastSeqNum >= 0 && + (seqNum <= state.lastSeqNum || seqNum > state.lastSeqNum + 100) + ) { + return false; + } else if (seqNum > -1) { + state.lastSeqNum = seqNum; + } + + let client = state.clientCounts[clientId]; + if (!client) { + client = { + count: 0, + penalty: 0, + }; + state.clientCounts[clientId] = client; + } + + if (client.penalty > 0) { + return false; + } + + // Check rate limits + if (state.clientRate > 0 && client.count + 1 > state.clientRate) { + client.penalty = 1 + state.clientPenalty; + return false; + } + + if (state.totalRate > 0 && state.totalCount + 1 > state.totalRate) { + return false; + } + + state.totalCount += 1; + client.count += 1; + state.clientCounts[clientId] = client; + IoTScapeDevices._encryptionStates[service][id] = state; + + return true; +}; + +/** + * Updates encryption settings for a device + * @param {String} service Service device is contained in + * @param {String} id ID of device to update encryption settings for + * @param {String=} key Key to set + * @param {String=} cipher Cipher to set + * @param {Number=} clientRate Maximum messages per second from a single client + * @param {Number=} penalty Penalty in seconds for exceeding clientRate + * @param {Number=} totalRate Maximum messages per second from all clients + */ +IoTScapeDevices.updateEncryptionState = function ( + service, + id, + key = null, + cipher = null, + clientRate = null, + penalty = null, + totalRate = null, +) { + logger.log(`Updating encryption state for ${service}:${id}`); + if (!IoTScapeDevices.deviceExists(service, id)) { + throw new Error("Device not found"); + } + + if (!Object.keys(IoTScapeDevices._encryptionStates).includes(service)) { + IoTScapeDevices._encryptionStates[service] = {}; + } + + if (!Object.keys(IoTScapeDevices._encryptionStates[service]).includes(id)) { + // Create entry with default + IoTScapeDevices._encryptionStates[service][id] = { + key: [0], + cipher: "plain", + lastSeqNum: -1, + totalRate: 0, // in messages per second + clientRate: 0, // in messages per second + clientPenalty: 0, // in seconds + totalCount: 0, + clientCounts: {}, + }; + } + + // Update key if requested + if (key != null) { + IoTScapeDevices._setKey(service, id, key, cipher); + + if ( + key != [0] && + IoTScapeDevices._encryptionStates[service][id].cipher == "plain" + ) { + cipher = "caesar"; + } + } + + // Update cipher if requested + cipher = (cipher || "").toLowerCase(); + + if (["linked", ...Object.keys(ciphers)].includes(cipher)) { + IoTScapeDevices._encryptionStates[service][id].cipher = cipher; + } else if (cipher != "") { + // Prevent attempts to use ciphers with no implementation + throw new Error("Invalid cipher"); + } + + // Update rates if requested + if (clientRate != null) { + // Parse client rate + if (clientRate instanceof String || typeof clientRate === "string") { + clientRate = parseInt(clientRate); + } + + if (clientRate >= 0) { + IoTScapeDevices._encryptionStates[service][id].clientRate = clientRate; + } else { + throw new Error("Invalid client rate"); + } + } + + if (penalty != null) { + // Parse penalty + if (penalty instanceof String || typeof penalty === "string") { + penalty = parseInt(penalty); + } + + if (penalty >= 0) { + IoTScapeDevices._encryptionStates[service][id].clientPenalty = penalty; + } else { + throw new Error("Invalid penalty"); + } + } + + if (totalRate != null) { + // Parse total rate + if (totalRate instanceof String || typeof totalRate === "string") { + totalRate = parseInt(totalRate); + } + + if (totalRate >= 0) { + IoTScapeDevices._encryptionStates[service][id].totalRate = totalRate; + } else { + throw new Error("Invalid total rate"); + } + } +}; + +IoTScapeDevices._setKey = function (service, id, key, cipher) { + // Set default cipher + if ( + IoTScapeDevices._encryptionStates[service][id].cipher === "plain" && + cipher == null + ) { + cipher = "caesar"; + } + + // Setting linked status does not require key to be parsed + if (cipher != "linked") { + key = key.map((c) => parseInt(c)); + + if (key.includes(NaN)) { + throw new Error("Invalid key"); + } + } + + IoTScapeDevices._encryptionStates[service][id].key = key; +}; + +/** + * Remove a device from a service + * @param {String} service Name of service + * @param {String} id ID of device to remove + */ +IoTScapeDevices.removeDevice = function (service, id) { + if (!IoTScapeDevices.deviceExists(service, id)) { + return; + } + + delete IoTScapeDevices._services[service][id]; + + if (Object.keys(IoTScapeDevices._encryptionStates).includes(service)) { + delete IoTScapeDevices._encryptionStates[service][id]; + } + + if ( + IoTScapeDevices._listeningClients[service] !== undefined && + IoTScapeDevices._listeningClients[service][id] !== undefined + ) { + delete IoTScapeDevices._listeningClients[service][id]; + } + + logger.log(`Removed ${service}:${id}`); +}; + +/** + * List IDs of devices associated for a service + * @param {String} service Name of service to get device IDs for + */ +IoTScapeDevices.getDevices = function (service) { + const serviceDict = IoTScapeDevices._services[service]; + return Object.keys(serviceDict || []); +}; + +/** + * Determine if a device with a given ID exists + * @param {String} service Name of service + * @param {String} id ID of device + * @returns {Boolean} If device exists + */ +IoTScapeDevices.deviceExists = function (service, id) { + return IoTScapeDevices.getDevices(service).includes(id); +}; + +/** + * Clear the encryption settings for a device + * @param {String} service Name of service + * @param {String} id ID of device + */ +IoTScapeDevices.clearEncryption = function (service, id) { + if (Object.keys(IoTScapeDevices._encryptionStates).includes(service)) { + delete IoTScapeDevices._encryptionStates[service][id]; + } +}; + +/** + * Set targetService's device with targetId as its ID to use the encryption settings of a different device + * @param {String} service Name of service + * @param {String} id ID of device + * @param {String} targetService + * @param {String} targetId + */ +IoTScapeDevices.link = function (service, id, targetService, targetId) { + // Validate input + if (service == targetService && id == targetId) { + throw new Error("Device cannot be linked to self"); + } + + // Prevent cycles and long chains by enforcing only one layer of linking + if (IoTScapeDevices.getEncryptionState(service, id).cipher == "linked") { + throw new Error("Cannot link to other linked device"); + } + + IoTScapeDevices.updateEncryptionState(targetService, targetId, { + service, + id, + }, "linked"); +}; + +// Clear rates every second +setInterval(() => { + for (let service in IoTScapeDevices._encryptionStates) { + for (let id in IoTScapeDevices._encryptionStates[service]) { + let state = IoTScapeDevices._encryptionStates[service][id]; + + state.totalCount = 0; + + for (let clientId in state.clientCounts) { + let client = state.clientCounts[clientId]; + client.count = 0; + client.penalty = Math.max(0, client.penalty - 1); + state.clientCounts[clientId] = client; + } + + IoTScapeDevices._encryptionStates[service][id] = state; + } + } +}, 1000); + +module.exports = IoTScapeDevices; diff --git a/src/procedures/iotscape/iotscape-services.js b/src/procedures/iotscape/iotscape-services.js index 8fe12175..fbc9bb62 100644 --- a/src/procedures/iotscape/iotscape-services.js +++ b/src/procedures/iotscape/iotscape-services.js @@ -1,12 +1,12 @@ const logger = require("../utils/logger")("iotscape-services"); const { setTimeout, setInterval } = require("../../timers"); +const IoTScapeDevices = require("./iotscape-devices"); /** * Stores information about registered services, with a list of IDs and their respective hosts */ const IoTScapeServices = {}; -IoTScapeServices._services = {}; IoTScapeServices._serviceDefinitions = {}; /** @@ -21,56 +21,32 @@ IoTScapeServices.updateOrCreateServiceInfo = function ( id, rinfo, ) { - let service = IoTScapeServices._services[name]; + let service = IoTScapeDevices._services[name]; IoTScapeServices._serviceDefinitions[name] = definition; - logger.log( - "Discovering " + name + ":" + id + " at " + rinfo.address + ":" + - rinfo.port, - ); - if (!service) { // Service not added yet - service = IoTScapeServices._services[name] = {}; + service = IoTScapeDevices._services[name] = {}; } - service[id] = rinfo; -}; - -/** - * Remove a device from a service - * @param {String} service Name of service - * @param {String} id ID of device to remove - */ -IoTScapeServices.removeDevice = function (service, id) { - if (!IoTScapeServices.deviceExists(service, id)) { + if (!rinfo) { + logger.log("Service " + name + " created without connection info"); return; } - delete IoTScapeServices._services[service][id]; - - if ( - IoTScapeServices._listeningClients[service] !== undefined && - IoTScapeServices._listeningClients[service][id] !== undefined - ) { - delete IoTScapeServices._listeningClients[service][id]; - } -}; + logger.log( + "Discovering " + name + ":" + id + " at " + rinfo.address + ":" + + rinfo.port, + ); -/** - * List IDs of devices associated for a service - * @param {String} service Name of service to get device IDs for - */ -IoTScapeServices.getDevices = function (service) { - const serviceDict = IoTScapeServices._services[service]; - return Object.keys(serviceDict || []); + service[id] = rinfo; }; /** * List services */ IoTScapeServices.getServices = function () { - return Object.keys(IoTScapeServices._services); + return Object.keys(IoTScapeDevices._services); }; /** @@ -79,27 +55,17 @@ IoTScapeServices.getServices = function () { */ IoTScapeServices.getMessageTypes = function (service) { if (!IoTScapeServices.serviceExists(service)) { - return []; + return {}; } // Parse events into NetsBlox-friendlier format - let eventsInfo = IoTScapeServices._serviceDefinitions[service].events; + let eventsInfo = IoTScapeServices._serviceDefinitions[service].events || {}; eventsInfo = Object.keys(eventsInfo).map( - (event) => [event, eventsInfo[event].params], + (event) => [event, ["id", ...eventsInfo[event].params]], ); return eventsInfo; }; -/** - * Determine if a device with a given ID exists - * @param {String} service Name of service - * @param {String} id ID of device - * @returns {Boolean} If device exists - */ -IoTScapeServices.deviceExists = function (service, id) { - return IoTScapeServices.getDevices(service).includes(id); -}; - /** * Determine if a service exists * @param {String} service Name of service @@ -109,6 +75,91 @@ IoTScapeServices.serviceExists = function (service) { return IoTScapeServices.getServices().includes(service); }; +IoTScapeServices._specialMethods = { + "heartbeat": { + returns: { + type: ["boolean"], + }, + }, + "setKey": { + params: [{ + "name": "key", + "documentation": "Key to set", + "type": "number", + "optional": false, + }], + returns: { + type: ["void"], + }, + }, + "setCipher": { + params: [{ + "name": "cipher", + "documentation": "Cipher to use", + "type": "string", + "optional": false, + }], + returns: { + type: ["void"], + }, + }, + "setClientRate": { + params: [{ + "name": "rate", + "documentation": "Maximum number of messages per second per client", + "type": "number", + "optional": false, + }, { + "name": "penalty", + "documentation": "Penalty for exceeding rate limit in seconds", + "type": "number", + "optional": false, + }], + returns: { + type: ["void"], + }, + }, + "setTotalRate": { + params: [{ + "name": "rate", + "documentation": "Maximum number of messages per second for all clients", + "type": "number", + "optional": false, + }], + returns: { + type: ["void"], + }, + }, + "resetRate": { + params: [], + returns: { + type: ["void"], + }, + }, + "_requestedKey": { + returns: { + type: ["void"], + }, + }, +}; + +/** + * List methods associated with a service + * @param {string} service Name of service + */ +IoTScapeServices.getMethods = function (service) { + if (!IoTScapeServices.serviceExists(service)) { + return {}; + } + + // Parse methods into NetsBlox-friendlier format + let methodsInfo = IoTScapeServices._serviceDefinitions[service].methods; + methodsInfo = Object.keys(methodsInfo).map( + (method) => [method, methodsInfo[method].params.map((param) => param.name)], + ); + return methodsInfo; +}; + /** * Determine if a service has a given function * @param {String} service Name of service @@ -120,30 +171,24 @@ IoTScapeServices.functionExists = function (service, func) { return false; } - return func === "heartbeat" || + return Object.keys(IoTScapeServices._specialMethods).includes(func) || IoTScapeServices.getFunctionInfo(service, func) !== undefined; }; -/** - * Get the remote host of a IoTScape device - * @param {String} service Name of service - * @param {String} id ID of device - */ -IoTScapeServices.getInfo = function (service, id) { - return IoTScapeServices._services[service][id]; -}; - /** * Get definition information for a given function * @param {String} service Name of service * @param {String} func Name of function */ IoTScapeServices.getFunctionInfo = function (service, func) { - if (func === "heartbeat") { - return { returns: { type: ["boolean"] } }; + if (Object.keys(IoTScapeServices._specialMethods).includes(func)) { + return IoTScapeServices._specialMethods[func]; } - return IoTScapeServices._serviceDefinitions[service].methods[func]; + let method = + (IoTScapeServices._serviceDefinitions[service] ?? { methods: [] }).methods + .filter((m) => m.name === func); + return method.length > 0 ? method[0] : undefined; }; IoTScapeServices._lastRequestID = 0; @@ -168,7 +213,7 @@ IoTScapeServices.listen = function (service, client, id) { id = id.toString(); // Validate name and ID - if (!IoTScapeServices.deviceExists(service, id)) { + if (!IoTScapeDevices.deviceExists(service, id)) { return false; } @@ -180,164 +225,357 @@ IoTScapeServices.listen = function (service, client, id) { IoTScapeServices._listeningClients[service][id] = []; } - // Prevent listen if this client is already listening + // Prevent listen if this client/role/project combination is already listening if ( - IoTScapeServices._listeningClients[service][id].some((c) => - c.clientId === client.clientId + !IoTScapeServices._listeningClients[service][id].some((existingClient) => + existingClient.clientId === client.clientId && + existingClient.roleId == client.roleId && + existingClient.projectId == client.projectId ) ) { - return; + IoTScapeServices._listeningClients[service][id].push(client); } - - IoTScapeServices._listeningClients[service][id].push(client); }; /** * Make a call to a IoTScape function * @param {String} service Name of service + * @param {String} func RPC on device to call * @param {String} id ID of device + * @param {Object} clientId Client making the call * @param {...any} args */ -IoTScapeServices.call = async function (service, func, id, ...args) { +IoTScapeServices.call = async function (service, func, id, clientId, ...args) { id = id.toString(); // Validate name, ID, and function if ( - !IoTScapeServices.deviceExists(service, id) || + !IoTScapeDevices.deviceExists(service, id) || !IoTScapeServices.functionExists(service, func) ) { + if (!IoTScapeDevices.deviceExists(service, id)) { + logger.log("Device does not exist"); + } + if (!IoTScapeServices.functionExists(service, func)) { + logger.log("Function does not exist"); + } + return false; } - // Create and send request + logger.log(`Calling ${service}:${id}.${func}(${args.join(", ")})`); + const reqid = IoTScapeServices._generateRequestID(); - let request = { - id: reqid, - service: service, - device: id, - function: func, - params: [...args], - }; - - // Determine response type - const methodInfo = IoTScapeServices.getFunctionInfo(service, func); - const responseType = methodInfo.returns.type; - - // Expects a value response - let attempt = (resolve) => { - const rinfo = IoTScapeServices.getInfo(service, id); - IoTScapeServices.socket.send( - JSON.stringify(request), - rinfo.port, - rinfo.address, - ); - IoTScapeServices._awaitingRequests[reqid] = { + // Don't send out serverside commands + if (func !== "heartbeat") { + // Create and send request + let request = { + id: reqid, service: service, + device: id, function: func, - resolve, + params: [...args], }; - }; - - let timeout = (_, reject) => { - // Time out eventually - setTimeout(() => { - delete IoTScapeServices._awaitingRequests[reqid]; - reject(); - }, 3000); - }; - - let promise = Promise.race([ - new Promise(attempt), - new Promise(timeout), - ]).then((result) => result).catch(() => { - // Make second attempt - logger.log("IoTScape request timed out, trying again"); - return Promise.race([new Promise(attempt), new Promise(timeout)]).then(( - result, - ) => result).catch(() => { - logger.log("IoTScape request timed out again, giving up"); - return "Response timed out."; - }); - }); - // No response required - if (responseType.length < 1 || responseType[0] == "void") { - return; - } + // Handle special functions + if ( + func == "setKey" || func == "setCipher" || func == "setClientRate" || + func == "setTotalRate" || func == "resetRate" + ) { + // Handle setKey/Cipher after relaying message to use old encryption + if (IoTScapeDevices.getEncryptionState(service, id).cipher != "linked") { + if (func === "setKey") { + IoTScapeDevices.updateEncryptionState(service, id, args, null); + } else if (func === "setCipher") { + IoTScapeDevices.updateEncryptionState(service, id, null, args[0]); + } else if (func === "setClientRate") { + IoTScapeDevices.updateEncryptionState( + service, + id, + null, + null, + args[0], + args[1], + ); + } else if (func === "setTotalRate") { + IoTScapeDevices.updateEncryptionState( + service, + id, + null, + null, + null, + null, + args[0], + ); + } else if (func === "resetRate") { + IoTScapeDevices.updateEncryptionState( + service, + id, + null, + null, + 0, + 0, + 0, + ); + } + } else { + // Not supported on linked device + return false; + } - // Event response type - if (responseType[0].startsWith("event")) { - return; - } + return true; + } - return promise; -}; + // Determine response type + const methodInfo = IoTScapeServices.getFunctionInfo(service, func); + const responseType = methodInfo.returns.type; -IoTScapeServices.start = function (socket) { - IoTScapeServices.socket = socket; + // Add caller info to request + if (clientId) { + request.clientId = clientId; + } else { + request.clientId = "server"; + } + // Expects a value response + let attempt = (resolve) => { + const rinfo = IoTScapeDevices.getInfo(service, id); + IoTScapeServices.socket.send( + JSON.stringify(request), + rinfo.port, + rinfo.address, + ); + + IoTScapeServices._awaitingRequests[reqid] = { + service: service, + function: func, + resolve, + }; + }; - // Handle incoming responses - IoTScapeServices.socket.on("message", function (message) { - let parsed; + let timeout = (_, reject) => { + // Time out eventually + setTimeout(() => { + delete IoTScapeServices._awaitingRequests[reqid]; + reject(); + }, 3000); + }; - try { - parsed = JSON.parse(message); - } catch (err) { - logger.log("Error parsing IoTScape message: " + err); + let promise = Promise.race([ + new Promise(attempt), + new Promise(timeout), + ]).then((result) => result).catch(() => { + // Make second attempt + logger.log("IoTScape request timed out, trying again"); + return Promise.race([new Promise(attempt), new Promise(timeout)]).then(( + result, + ) => result).catch(() => { + logger.log("IoTScape request timed out again, giving up"); + return "Response timed out."; + }); + }); + + // No response required + if (responseType.length < 1 || responseType[0] == "void") { return; } - // Ignore other messages - if (!parsed.request) { + // Event response type + if (responseType[0].startsWith("event")) { return; } - const requestID = parsed.request; + return promise; + } +}; +/** + * Map of message types which should be handled on the server to their handlers + */ +IoTScapeServices._specialMessageTypes = { + "_reset": (parsed) => { + // Reset encryption on device + logger.log(`Resetting ${parsed.service}:${parsed.id}`); + IoTScapeDevices.clearEncryption(parsed.service, parsed.id); + }, + "_requestKey": (parsed) => { if ( - Object.keys(IoTScapeServices._awaitingRequests).includes( - requestID.toString(), - ) + IoTScapeDevices.getEncryptionState(parsed.service, parsed.id).cipher == + "linked" ) { - if (parsed.response) { - // Return multiple results as a list, single result as a value - const methodInfo = IoTScapeServices.getFunctionInfo( - IoTScapeServices._awaitingRequests[requestID].service, - IoTScapeServices._awaitingRequests[requestID].function, - ); - const responseType = methodInfo.returns.type; - - try { - if (responseType.length > 1) { - IoTScapeServices._awaitingRequests[requestID].resolve( - parsed.response, - ); - } else { - IoTScapeServices._awaitingRequests[requestID].resolve( - ...parsed.response, - ); - } - } catch (error) { - logger.log("IoTScape response invalid: " + error); - } + logger.log( + `Refused to generate HW key for ${parsed.service}:${parsed.id} due to existing link`, + ); + return; + } + // Generate hardware key + let key = []; + + for (let i = 0; i < 4; i++) { + key.push(Math.floor(Math.random() * 16)); + } + + IoTScapeDevices.updateEncryptionState( + parsed.service, + parsed.id, + key, + "caesar", + ); + + // Tell device what the new key is, so it can display it + IoTScapeServices.call( + parsed.service, + "_requestedKey", + parsed.id, + null, + ...key, + ); + }, + "_link": (parsed) => { + const targetService = parsed.event.args.service; + const targetID = parsed.event.args.id; + + if (!IoTScapeDevices.deviceExists(targetService, targetID)) { + logger.log( + `Requested invalid link of ${parsed.service}:${parsed.id} to ${targetService}:${targetID}`, + ); + return; + } + + logger.log( + `Linking ${parsed.service}:${parsed.id} to ${targetService}:${targetID}`, + ); + + IoTScapeDevices.link(parsed.service, parsed.id, targetService, targetID); + }, +}; + +const _handleMessage = function (message, remote) { + let parsed = null; + + try { + parsed = JSON.parse(message); + } catch (err) { + logger.log("Error parsing IoTScape message: " + err); + return; + } + + if (parsed == null) { + logger.log("Invalid IoTScape message"); + return; + } + + // Ignore other messages + if (parsed.request) { + _handleResponse(parsed); + } + + if (parsed.event && IoTScapeDevices.deviceExists(parsed.service, parsed.id)) { + _handleEvent(parsed, remote); + } +}; + +const _handleResponse = function (parsed) { + const requestID = parsed.request; - delete IoTScapeServices._awaitingRequests[requestID]; + if ( + Object.keys(IoTScapeServices._awaitingRequests).includes( + requestID.toString(), + ) + ) { + if (parsed.response) { + // Return multiple results as a list, single result as a value + const methodInfo = IoTScapeServices.getFunctionInfo( + IoTScapeServices._awaitingRequests[requestID].service, + IoTScapeServices._awaitingRequests[requestID].function, + ); + const responseType = methodInfo.returns.type; + + try { + if (responseType.length > 1) { + IoTScapeServices._awaitingRequests[requestID].resolve( + parsed.response, + ); + } else { + IoTScapeServices._awaitingRequests[requestID].resolve( + ...parsed.response, + ); + } + } catch (error) { + logger.log("IoTScape response invalid: " + error); } + + delete IoTScapeServices._awaitingRequests[requestID]; } + } +}; - if (parsed.event) { - // Find listening clients - const clientsByID = IoTScapeServices._listeningClients[parsed.service] || - {}; - const clients = clientsByID[parsed.id.toString()] || []; +const _handleEvent = function (parsed, remote) { + // Handle special message types, but only if they come from the device + if ( + Object.keys(IoTScapeServices._specialMessageTypes).includes( + parsed.event.type, + ) && + IoTScapeDevices._services[parsed.service][parsed.id].address == + remote.address && + IoTScapeDevices._services[parsed.service][parsed.id].port == remote.port + ) { + IoTScapeServices._specialMessageTypes[parsed.event.type](parsed); + } else { + IoTScapeServices.sendMessageToListeningClients( + parsed.service, + parsed.id.toString(), + parsed.event.type, + { ...parsed.event.args }, + ); + } +}; - // Send responses - clients.forEach((client) => { - client.sendMessage(parsed.event.type, parsed.event.args); +/** + * Send a NetsBlox message to clients listening to a device + * @param {String} service Name of service + * @param {String} id ID of device + * @param {String} type Message type + * @param {Object} content Contents of message + */ +IoTScapeServices.sendMessageToListeningClients = function ( + service, + id, + type, + content, +) { + // Find listening clients + const clientsByID = IoTScapeServices._listeningClients[service] || {}; + const clients = clientsByID[id] || []; + + logger.log("Sending message to clients: " + JSON.stringify(clients)); + + if (type == "device command") { + // Send command directly + clients.forEach((client) => { + client.sendMessage(type, { service, device: id, ...content }); + }); + } else { + // Currently not used, but could be used to return device responses + clients.forEach((client) => { + client.sendMessage("device message", { + service, + device: id, + message: IoTScapeDevices.deviceEncrypt( + service, + id, + [type, ...Object.values(content)].join(" "), + ), }); - } - }); + }); + } +}; + +IoTScapeServices.start = function (socket) { + IoTScapeServices.socket = socket; + + // Handle incoming responses + IoTScapeServices.socket.on("message", _handleMessage); // Request heartbeats on interval async function heartbeat(service, device) { @@ -345,7 +583,7 @@ IoTScapeServices.start = function (socket) { try { // Send heartbeat request, will timeout if device does not respond - await IoTScapeServices.call(service, "heartbeat", device); + await IoTScapeServices.call(service, "heartbeat", device, null); } catch (e) { // Remove device if it didn't respond return false; @@ -356,14 +594,14 @@ IoTScapeServices.start = function (socket) { setInterval(async () => { for (const service of IoTScapeServices.getServices()) { - IoTScapeServices.getDevices(service).forEach(async (device) => { + IoTScapeDevices.getDevices(service).forEach(async (device) => { if (!(await heartbeat(service, device))) { // Send second heartbeat request, will timeout if device does not respond if (!(await heartbeat(service, device))) { logger.log( `${service}:${device} did not respond to heartbeat, removing from active devices`, ); - IoTScapeServices.removeDevice(service, device); + IoTScapeDevices.removeDevice(service, device); } } }); diff --git a/src/procedures/iotscape/iotscape.js b/src/procedures/iotscape/iotscape.js index f5decd98..d1f3a54e 100644 --- a/src/procedures/iotscape/iotscape.js +++ b/src/procedures/iotscape/iotscape.js @@ -17,6 +17,9 @@ const logger = require("../utils/logger")("iotscape"); const Storage = require("../../storage"); const ServiceEvents = require("../utils/service-events"); const IoTScapeServices = require("./iotscape-services"); +const IoTScapeDevices = require("./iotscape-devices"); +const Filter = require("bad-words"), + filter = new Filter(); const normalizeServiceName = (name) => name.toLowerCase().replace(/[^a-z0-9]/g, ""); @@ -54,7 +57,7 @@ IoTScape.getDevices = function (service) { throw new Error("Service not found"); } - return IoTScapeServices.getDevices(service); + return IoTScapeDevices.getDevices(service); }; /** @@ -74,6 +77,18 @@ IoTScape.getMessageTypes = function (service) { return IoTScapeServices.getMessageTypes(service); }; +/** + * List the methods associated with a service + * @param {String} service Name of service to get methods for + */ +IoTScape.getMethods = function (service) { + if (!IoTScapeServices.serviceExists(service)) { + throw new Error("Service not found"); + } + + return IoTScapeServices.getMethods(service); +}; + /** * Make a call to a device as a text command * @param {String} service Name of service to make call to @@ -81,22 +96,83 @@ IoTScape.getMessageTypes = function (service) { * @param {String} command Input to RPC */ IoTScape.send = function (service, id, command) { + return IoTScape._send(service, id, command, this.caller); +}; + +/** + * Internal method for sending a command to a device + * @param {String} service Name of service to make call to + * @param {String} id ID of device to make call to + * @param {String} command Input to RPC + * @param {Object} caller The caller object from the RPC + * @returns + */ +IoTScape._send = function (service, id, command, caller) { + const clientId = caller.clientId; + if (!IoTScapeServices.serviceExists(service)) { throw new Error("Service not found"); } - if (!IoTScapeServices.deviceExists(service, id)) { + if (!IoTScapeDevices.deviceExists(service, id)) { throw new Error("Device not found"); } - let parts = command.split(/\s+/g); + // Relay as-is message to listening clients + IoTScapeServices.sendMessageToListeningClients( + service, + id, + "device command", + { command }, + ); + + let parts = IoTScapeDevices.deviceDecrypt(service, id, command).split(/\s+/g); // Require at least a function name if (parts.length < 1) { throw new Error("Command too short or invalid"); } - return IoTScapeServices.call(service, parts[0], id, ...parts.slice(1)); + // Check for sequence number + let seqNum = -1; + if (parts[0].match(/^[0-9]+$/)) { + seqNum = parseInt(parts[0]); + parts = parts.slice(1); + } + + // Allow for RoboScape-esque "set"/"get" commands to be implemented simpler (e.g. "set speed" becomes "setSpeed" instead of a "set" method) + if (parts.length >= 2) { + // Combine first word "set", "get", and "reset" with the next words if it's a valid method in the service + if (["set", "get", "reset"].includes(parts[0])) { + let methodName = parts[0] + parts[1][0].toUpperCase() + parts[1].slice(1); + if (IoTScapeServices.functionExists(service, methodName)) { + parts = [methodName, ...parts.slice(2)]; + } else { + // Attempt with three words (e.g. "set client rate" becomes "setClientRate" instead of a "set" method) + if (parts.length >= 3) { + methodName = parts[0] + parts[1][0].toUpperCase() + + parts[1].slice(1) + + parts[2][0].toUpperCase() + parts[2].slice(1); + if (IoTScapeServices.functionExists(service, methodName)) { + parts = [methodName, ...parts.slice(3)]; + } + } + } + } + } + + // Check that call will be accepted + if (!IoTScapeDevices.accepts(service, id, clientId, seqNum)) { + return false; + } + + return IoTScapeServices.call( + service, + parts[0], + id, + clientId, + ...parts.slice(1), + ); }; /** @@ -105,26 +181,36 @@ IoTScape.send = function (service, id, command) { * @param {String} definition Service definition * @param {RemoteInfo} remote Remote host information */ -IoTScape._createService = async function (definition, remote) { - let parsed; +IoTScape._createService = async function (definition, remote = null) { + let parsed = null; - try { - parsed = JSON.parse(definition); - } catch (err) { - logger.log("Error parsing IoTScape service: " + err); - return; + // Handle buffer input + if (Buffer.isBuffer(definition)) { + definition = definition.toString(); } - // Ignore request messages sent to this method - if (parsed.request) { + if (typeof definition === "string") { + try { + parsed = JSON.parse(definition); + } catch (err) { + logger.log("Error parsing IoTScape service: " + err); + return; + } + } else { + parsed = definition; + } + + // Ignore empty and request messages sent to this method + if (parsed == null || parsed.request) { return; } const name = Object.keys(parsed)[0]; parsed = parsed[name]; - // Verify service definition in message - if (parsed.service == undefined) { + // Verify service definition is in message + if (typeof (parsed.service) == "undefined") { + logger.log("Service definition not found in message"); return; } @@ -135,7 +221,8 @@ IoTScape._createService = async function (definition, remote) { } const serviceInfo = parsed.service; - const methodsInfo = parsed.methods; + const methodsInfo = parsed.methods || {}; + let methods = _generateMethods(methodsInfo); // Validate method names if ( @@ -149,13 +236,16 @@ IoTScape._createService = async function (definition, remote) { } const version = serviceInfo.version; - const id = parsed.id; + const id = parsed.id.trim(); logger.log( `Received definition for service ${name} v${version} from ID ${id}`, ); - let methods = _generateMethods(methodsInfo); + if (!IoTScape._validateServiceStrings(name, id, serviceInfo, methods)) { + logger.log(`Service ${name} rejected due to invalid string`); + return; + } let service = { name: name, @@ -193,71 +283,154 @@ IoTScape._createService = async function (definition, remote) { } else { logger.log(`Service ${name} already exists and is up to date`); } - IoTScapeServices.updateOrCreateServiceInfo(name, parsed, id, remote); + IoTScapeServices.updateOrCreateServiceInfo(name, service, id, remote); +}; + +/** + * Check that strings provided in a service definition are valid and free of profanity + * @returns {boolean} Were the strings for this service considered valid + */ +IoTScape._validateServiceStrings = function (name, id, serviceInfo, methods) { + // Validate service name + if ( + !isValidServiceName(name) || name.replace(/[^a-zA-Z0-9]/g, "") !== name || + filter.isProfane(name.replace(/[A-Z]/g, " $&")) + ) { + logger.log(`Service name ${name} rejected`); + return false; + } + + if (id == "" || filter.isProfane(id.replace(/[A-Z]/g, " $&"))) { + logger.log("ID invalid"); + return false; + } + + // Additional profanity checks + if ( + filter.isProfane(serviceInfo.description) || + methods.map((method) => method.name).some((name) => + !isValidRPCName(name) || filter.isProfane(name) + ) || + methods.map((method) => method.documentation).some((doc) => + filter.isProfane(doc) + ) + ) { + logger.log(`Definition for service ${name} rejected`); + return false; + } + + return true; }; +// Methods used for all device services but not included in definitions +const _defaultMethods = [{ + name: "getDevices", + documentation: "Get a list of device IDs for this service", + arguments: [], + returns: { + documentation: "", + type: ["void"], + }, +}, { + name: "listen", + documentation: "Register for receiving messages from the given id", + arguments: [{ + name: "id", + optional: false, + documentation: "ID of device to listen to messages from", + }], + returns: { + documentation: "", + type: ["void"], + }, +}, { + name: "send", + documentation: "Send a text-based message to the service", + arguments: [{ + name: "id", + optional: false, + documentation: "ID of device to send request to", + }, { + name: "command", + optional: false, + documentation: "Request to send to device", + }], + returns: { + documentation: "", + type: ["any"], + }, +}, { + name: "getMessageTypes", + documentation: "Register for receiving messages from the given id", + arguments: [], + returns: { + documentation: "", + type: ["array"], + }, +}, { + name: "getMethods", + documentation: "Get methods associated with this service", + arguments: [], + returns: { + documentation: "", + type: ["array"], + }, +}]; + /** * Creates definitions for methods of an incoming service * @param {Object} methodsInfo Methods from parsed JSON data */ function _generateMethods(methodsInfo) { - // Add getDevices and listen methods by default - let methods = [ - { - name: "getDevices", - documentation: "Get a list of device IDs for this service", - arguments: [], - returns: { - documentation: "", - type: ["void"], - }, - }, - { - name: "listen", - documentation: "Register for receiving messages from the given id", - arguments: [{ - name: "id", - optional: false, - documentation: "ID of device to send request to", - }], - returns: { - documentation: "", - type: ["void"], - }, - }, - ...Object.keys(methodsInfo).map((methodName) => { - const methodInfo = methodsInfo[methodName]; - - const method = { - name: methodName, - documentation: methodInfo.documentation, - returns: methodInfo.returns, - }; - - method.arguments = methodInfo.params.map((param) => { - let type = param.type === "number" - ? { name: "Number", params: [] } - : null; - return { - name: param.name, - optional: param.optional, - documentation: param.documentation, - type, + if (!methodsInfo) { + logger.error("No methods definition for service"); + return []; + } + try { + // Add default methods first + let methods = [ + ..._defaultMethods, + ...Object.keys(methodsInfo).map((methodName) => { + const methodInfo = methodsInfo[methodName]; + + if (!methodInfo) { + throw new Error("Undefined method " + methodName); + } + + const method = { + name: methodName, + documentation: methodInfo.documentation, + categories: [[]], + returns: methodInfo.returns, }; - }); - - // Add ID argument to all non-getDevices methods - method.arguments = [{ - name: "id", - optional: false, - documentation: "ID of device to send request to", - }, ...method.arguments]; - return method; - }), - ]; - - return methods; + method.arguments = methodInfo.params.map((param) => { + let type = param.type === "number" + ? { name: "Number", params: [] } + : null; + return { + name: param.name, + optional: param.optional, + documentation: param.documentation, + type, + }; + }); + + // Add ID argument to all non-getDevices methods + method.arguments = [{ + name: "id", + optional: false, + documentation: "ID of device to send request to", + }, ...method.arguments]; + + return method; + }), + ]; + return methods; + } catch (error) { + logger.err(error); + return []; + } } /** diff --git a/src/procedures/iotscape/routes.js b/src/procedures/iotscape/routes.js index d6993dcd..f26f4f76 100644 --- a/src/procedures/iotscape/routes.js +++ b/src/procedures/iotscape/routes.js @@ -1,5 +1,10 @@ const express = require("express"); const router = express(); +const logger = require("../utils/logger")("iotscape-routes"); +const IoTScape = require("./iotscape"); +const bodyParser = require("body-parser"); +const IoTScapeDevices = require("./iotscape-devices"); +const IoTScapeServices = require("./iotscape-services"); router.get( "/port", @@ -10,4 +15,45 @@ router.get( }, ); +router.post( + "/announce", + bodyParser.json({ limit: "1mb" }), + async (req, res) => { + logger.info(`HTTP announcement from ${req.ip}`); + IoTScape._createService(req.body); + res.status(200).send("OK"); + }, +); + +router.post( + "/response", + bodyParser.json({ limit: "5mb" }), + async (req, res) => { + if (req.body) { + // Validate fields + if ( + !req.body.request || !req.body.response || + !Array.isArray(req.body.response) + ) { + return res.status(400).send("Invalid request: missing fields"); + } + + if ( + Object.keys(IoTScapeServices._awaitingRequests).includes( + req.body.request, + ) + ) { + IoTScapeServices._awaitingRequests[req.body.request].resolve( + ...req.body.response, + ); + return res.status(200).send("OK"); + } else { + return res.status(400).send("No request found for this response."); + } + } + + res.status(400).send("Invalid request."); + }, +); + module.exports = router; diff --git a/test/procedures/iotscape.spec.js b/test/procedures/iotscape.spec.js index f606e705..072308da 100644 --- a/test/procedures/iotscape.spec.js +++ b/test/procedures/iotscape.spec.js @@ -4,6 +4,7 @@ describe(utils.suiteName(__filename), function () { utils.verifyRPCInterfaces("IoTScape", [ ["getDevices", ["service"]], ["getMessageTypes", ["service"]], + ["getMethods", ["service"]], ["getServices", []], ["send", ["service", "id", "command"]], ]);