From 7f4bbc9d703abc96bb94ca5d2dd3fd445633338d Mon Sep 17 00:00:00 2001 From: Markus Tacker Date: Fri, 20 Oct 2023 17:04:25 +0200 Subject: [PATCH] feat(nrplus): add gateway stream parser --- cdk/resources/NRPlusGateway.ts | 93 +++++++++++-------------- lambda/parseSinkMessages.ts | 43 +++++++----- nrplus/messageStreamParser.spec.ts | 59 ++++++++++++++++ nrplus/messageStreamParser.ts | 108 +++++++++++++++++++++++++++++ nrplus/nrplus-logs.txt | 36 ++++++++++ 5 files changed, 272 insertions(+), 67 deletions(-) create mode 100644 nrplus/messageStreamParser.spec.ts create mode 100644 nrplus/messageStreamParser.ts create mode 100644 nrplus/nrplus-logs.txt diff --git a/cdk/resources/NRPlusGateway.ts b/cdk/resources/NRPlusGateway.ts index 211fe71..a80fe41 100644 --- a/cdk/resources/NRPlusGateway.ts +++ b/cdk/resources/NRPlusGateway.ts @@ -1,11 +1,12 @@ -import { Construct } from 'constructs' -import Iot from 'aws-cdk-lib/aws-iot' +import { Duration, Stack } from 'aws-cdk-lib' import IAM from 'aws-cdk-lib/aws-iam' +import Iot from 'aws-cdk-lib/aws-iot' +import Kinesis, { StreamMode } from 'aws-cdk-lib/aws-kinesis' +import Lambda, { StartingPosition } from 'aws-cdk-lib/aws-lambda' +import { KinesisEventSource } from 'aws-cdk-lib/aws-lambda-event-sources' import Logs from 'aws-cdk-lib/aws-logs' -import Lambda from 'aws-cdk-lib/aws-lambda' -import { Duration, RemovalPolicy, Stack } from 'aws-cdk-lib' +import { Construct } from 'constructs' import type { PackedLambda } from '../backend' -import DynamoDB from 'aws-cdk-lib/aws-dynamodb' export class NRPlusGateway extends Construct { constructor( @@ -20,6 +21,14 @@ export class NRPlusGateway extends Construct { ) { super(parent, 'nrplus-gateway') + const stream = new Kinesis.Stream(this, 'kinesis-stream', { + shardCount: 1, + // streamMode must be set to PROVISIONED when specifying shardCount + streamMode: StreamMode.PROVISIONED, + // Minimum 1 day + retentionPeriod: Duration.days(1), + }) + const topicRuleRole = new IAM.Role(this, 'topicRule', { assumedBy: new IAM.ServicePrincipal('iot.amazonaws.com'), inlinePolicies: { @@ -37,19 +46,28 @@ export class NRPlusGateway extends Construct { }), }, }) + stream.grantWrite(topicRuleRole) - const table = new DynamoDB.Table(this, 'table', { - billingMode: DynamoDB.BillingMode.PAY_PER_REQUEST, - partitionKey: { - name: 'id', - type: DynamoDB.AttributeType.STRING, + new Iot.CfnTopicRule(this, 'sinkRule', { + topicRulePayload: { + sql: `SELECT * FROM '+/nrplus-sink'`, + awsIotSqlVersion: '2016-03-23', + actions: [ + { + kinesis: { + streamName: stream.streamName, + partitionKey: '${topic()}', + roleArn: topicRuleRole.roleArn, + }, + }, + ], + errorAction: { + republish: { + roleArn: topicRuleRole.roleArn, + topic: 'errors', + }, + }, }, - pointInTimeRecovery: true, - removalPolicy: - this.node.tryGetContext('isTest') === true - ? RemovalPolicy.DESTROY - : RemovalPolicy.RETAIN, - timeToLiveAttribute: 'ttl', }) const parseSinkMessages = new Lambda.Function(this, 'lambda', { @@ -64,46 +82,19 @@ export class NRPlusGateway extends Construct { description: 'Parse sink messages', environment: { VERSION: this.node.tryGetContext('version'), - TABLE_NAME: table.tableName, }, initialPolicy: [], logRetention: Logs.RetentionDays.ONE_WEEK, - // Only have one lambda at any given time processing messages reservedConcurrentExecutions: 1, }) - table.grantFullAccess(parseSinkMessages) - - const rule = new Iot.CfnTopicRule(this, 'sinkRule', { - topicRulePayload: { - sql: [ - `SELECT`, - // Lambda rule actions don't support binary payload input - `encode(*, 'base64') AS message,`, - `parse_time("yyyy-MM-dd'T'HH:mm:ss.S'Z'",`, - `timestamp()) as timestamp`, - `FROM '+/nrplus-sink'`, - ].join(' '), - awsIotSqlVersion: '2016-03-23', - actions: [ - { - lambda: { - functionArn: parseSinkMessages.functionArn, - }, - }, - ], - errorAction: { - republish: { - roleArn: topicRuleRole.roleArn, - topic: 'errors', - }, - }, - }, - }) - - parseSinkMessages.addPermission('storeUpdatesRule', { - principal: new IAM.ServicePrincipal('iot.amazonaws.com'), - sourceArn: rule.attrArn, - }) + parseSinkMessages.addEventSource( + new KinesisEventSource(stream, { + startingPosition: StartingPosition.TRIM_HORIZON, + batchSize: 100, + maxBatchingWindow: Duration.seconds(1), + parallelizationFactor: 1, + }), + ) } } diff --git a/lambda/parseSinkMessages.ts b/lambda/parseSinkMessages.ts index 16ca60d..ecd984d 100644 --- a/lambda/parseSinkMessages.ts +++ b/lambda/parseSinkMessages.ts @@ -1,18 +1,29 @@ -const buffer: string[] = [] +import type { KinesisStreamEvent } from 'aws-lambda' +import { parser } from '../nrplus/messageStreamParser.js' -export const handler = async ({ - timestamp, - message, -}: { - message: string // base64 encoded payload "VEVTVDQ=", - timestamp: string // e.g. "2023-10-19T15:19:35.5Z" -}): Promise => { - const decoded = Buffer.from(message, 'base64').toString('utf-8') - console.log( - JSON.stringify({ - timestamp, - message: decoded, - }), - ) - buffer.push(decoded) +const parserInstance = parser() +parserInstance.onMessage((deviceId, message) => { + console.log(JSON.stringify({ deviceId, message })) +}) + +export const handler = async (event: KinesisStreamEvent): Promise => { + const buffer: Record = {} + for (const { + kinesis: { data, partitionKey }, + } of event.Records) { + const message = Buffer.from(data, 'base64').toString('utf-8').trim() + const clientId = partitionKey.split('/')[0] as string // /nrplus-sink + if (buffer[clientId] === undefined) { + buffer[clientId] = [message] + } else { + buffer[clientId]?.push(message) + } + } + + for (const [clientId, lines] of Object.entries(buffer)) { + for (const line of lines.sort(ascending)) + parserInstance.addLine(clientId, line) + } } + +const ascending = (a: string, b: string) => a.localeCompare(b) diff --git a/nrplus/messageStreamParser.spec.ts b/nrplus/messageStreamParser.spec.ts new file mode 100644 index 0000000..56bc949 --- /dev/null +++ b/nrplus/messageStreamParser.spec.ts @@ -0,0 +1,59 @@ +import { describe, it, mock } from 'node:test' +import assert from 'node:assert/strict' +import { readFile } from 'node:fs/promises' +import { parser } from './messageStreamParser.js' + +void describe('messageStreamParser', () => { + void it('should parse sink messages', async () => { + const lines = (await readFile('./nrplus/nrplus-logs.txt', 'utf-8')).split( + '\n', + ) + + const onMessageCallback = mock.fn(() => undefined) + const p = parser() + p.onMessage(onMessageCallback) + for (const line of lines) { + p.addLine('nrplus-gw-jagu', line) + } + + assert.deepEqual(onMessageCallback.mock.calls[0]?.arguments, [ + 'nrplus-gw-jagu', + { + time: '358881336898', + snr: '88', + RSSI: '-60', + len: '83', + type: 'Data SDU', + expectedRXRSSI: '-60', + seqNbr: '366', + networkId: '22', + transmitterId: '40', + receiverId: '38', + sduLastSeenSeqNbr: '1', + sduDataLength: '40', + sduData: '{\\"data\\":\\"Yes, hello\\",\\"modem_temp\\":\\"33\\"}', + ieType: 'none', + }, + ]) + + assert.deepEqual(onMessageCallback.mock.calls[1]?.arguments, [ + 'nrplus-gw-jagu', + { + time: '364412319378', + snr: '96', + RSSI: '-60', + len: '83', + type: 'Data SDU', + expectedRXRSSI: '-60', + seqNbr: '382', + networkId: '22', + transmitterId: '40', + receiverId: '38', + sduLastSeenSeqNbr: '1', + sduDataLength: '40', + sduData: '{\\"data\\":\\"Yes, hello\\",\\"modem_temp\\":\\"33\\"}', + ieType: 'none', + }, + ]) + }) +}) diff --git a/nrplus/messageStreamParser.ts b/nrplus/messageStreamParser.ts new file mode 100644 index 0000000..6336387 --- /dev/null +++ b/nrplus/messageStreamParser.ts @@ -0,0 +1,108 @@ +/* +PCC: Physical Control Channel +PDC: Physical Data Channel +MCS: Modulation and Coding Scheme +SDU: Service Data Unit + +PCC comes before each PDC. +PDC contains the actual MAC PDU/SDU. +The content of the PDUs in here are just fMAC specific. +*/ + +type PDCInfo = { + time: string //e.g. '358881336898' + snr: string //e.g. '88' + RSSI: string //e.g. '-60' + len: string //e.g. '83' + type: string //e.g. 'Data SDU' + expectedRXRSSI: string //e.g. '-60' + seqNbr: string //e.g. '366' + networkId: string //e.g. '22' + transmitterId: string //e.g. '40' + receiverId: string //e.g. '38' + sduLastSeenSeqNbr: string //e.g. '1' + sduDataLength: string //e.g. '40' + sduData: string //e.g. '{\\"data\\":\\"Yes, hello\\",\\"modem_temp\\":\\"33\\"}' + ieType: string //e.g. 'none' +} + +type MessageListener = (deviceId: string, message: PDCInfo) => void + +const StreamParser = () => { + let index = 0 + let lastResult = 0 + const lines: string[] = [] + + const readLines = (regExps: RegExp[]): Record | null => { + const result: RegExpExecArray['groups'][] = [] + for (const regExp of regExps) { + const match = regExp.exec(lines[index] ?? '') + if (match === null) { + //console.warn(`Not matched`, lines[index], index, regExp) + return null + } + result.push(match.groups) + index++ + } + return result.reduce( + (obj, groups) => ({ ...obj, ...groups }), + {} as Record, + ) + } + + const parseLines = () => { + let result: unknown | null = null + while (index++ < lines.length - 1) { + const pdcData = readLines([ + /^PDC received \(time (?