Skip to content

Commit

Permalink
feat(nrplus): add gateway stream parser
Browse files Browse the repository at this point in the history
  • Loading branch information
coderbyheart committed Oct 20, 2023
1 parent 8733267 commit 7f4bbc9
Show file tree
Hide file tree
Showing 5 changed files with 272 additions and 67 deletions.
93 changes: 42 additions & 51 deletions cdk/resources/NRPlusGateway.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Construct } from 'constructs'
import Iot from 'aws-cdk-lib/aws-iot'
import { Duration, Stack } from 'aws-cdk-lib'
import IAM from 'aws-cdk-lib/aws-iam'
import Iot from 'aws-cdk-lib/aws-iot'
import Kinesis, { StreamMode } from 'aws-cdk-lib/aws-kinesis'
import Lambda, { StartingPosition } from 'aws-cdk-lib/aws-lambda'
import { KinesisEventSource } from 'aws-cdk-lib/aws-lambda-event-sources'
import Logs from 'aws-cdk-lib/aws-logs'
import Lambda from 'aws-cdk-lib/aws-lambda'
import { Duration, RemovalPolicy, Stack } from 'aws-cdk-lib'
import { Construct } from 'constructs'
import type { PackedLambda } from '../backend'
import DynamoDB from 'aws-cdk-lib/aws-dynamodb'

export class NRPlusGateway extends Construct {
constructor(
Expand All @@ -20,6 +21,14 @@ export class NRPlusGateway extends Construct {
) {
super(parent, 'nrplus-gateway')

const stream = new Kinesis.Stream(this, 'kinesis-stream', {
shardCount: 1,
// streamMode must be set to PROVISIONED when specifying shardCount
streamMode: StreamMode.PROVISIONED,
// Minimum 1 day
retentionPeriod: Duration.days(1),
})

const topicRuleRole = new IAM.Role(this, 'topicRule', {
assumedBy: new IAM.ServicePrincipal('iot.amazonaws.com'),
inlinePolicies: {
Expand All @@ -37,19 +46,28 @@ export class NRPlusGateway extends Construct {
}),
},
})
stream.grantWrite(topicRuleRole)

const table = new DynamoDB.Table(this, 'table', {
billingMode: DynamoDB.BillingMode.PAY_PER_REQUEST,
partitionKey: {
name: 'id',
type: DynamoDB.AttributeType.STRING,
new Iot.CfnTopicRule(this, 'sinkRule', {
topicRulePayload: {
sql: `SELECT * FROM '+/nrplus-sink'`,
awsIotSqlVersion: '2016-03-23',
actions: [
{
kinesis: {
streamName: stream.streamName,
partitionKey: '${topic()}',
roleArn: topicRuleRole.roleArn,
},
},
],
errorAction: {
republish: {
roleArn: topicRuleRole.roleArn,
topic: 'errors',
},
},
},
pointInTimeRecovery: true,
removalPolicy:
this.node.tryGetContext('isTest') === true
? RemovalPolicy.DESTROY
: RemovalPolicy.RETAIN,
timeToLiveAttribute: 'ttl',
})

const parseSinkMessages = new Lambda.Function(this, 'lambda', {
Expand All @@ -64,46 +82,19 @@ export class NRPlusGateway extends Construct {
description: 'Parse sink messages',
environment: {
VERSION: this.node.tryGetContext('version'),
TABLE_NAME: table.tableName,
},
initialPolicy: [],
logRetention: Logs.RetentionDays.ONE_WEEK,
// Only have one lambda at any given time processing messages
reservedConcurrentExecutions: 1,
})

table.grantFullAccess(parseSinkMessages)

const rule = new Iot.CfnTopicRule(this, 'sinkRule', {
topicRulePayload: {
sql: [
`SELECT`,
// Lambda rule actions don't support binary payload input
`encode(*, 'base64') AS message,`,
`parse_time("yyyy-MM-dd'T'HH:mm:ss.S'Z'",`,
`timestamp()) as timestamp`,
`FROM '+/nrplus-sink'`,
].join(' '),
awsIotSqlVersion: '2016-03-23',
actions: [
{
lambda: {
functionArn: parseSinkMessages.functionArn,
},
},
],
errorAction: {
republish: {
roleArn: topicRuleRole.roleArn,
topic: 'errors',
},
},
},
})

parseSinkMessages.addPermission('storeUpdatesRule', {
principal: new IAM.ServicePrincipal('iot.amazonaws.com'),
sourceArn: rule.attrArn,
})
parseSinkMessages.addEventSource(
new KinesisEventSource(stream, {
startingPosition: StartingPosition.TRIM_HORIZON,
batchSize: 100,
maxBatchingWindow: Duration.seconds(1),
parallelizationFactor: 1,
}),
)
}
}
43 changes: 27 additions & 16 deletions lambda/parseSinkMessages.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
const buffer: string[] = []
import type { KinesisStreamEvent } from 'aws-lambda'
import { parser } from '../nrplus/messageStreamParser.js'

export const handler = async ({
timestamp,
message,
}: {
message: string // base64 encoded payload "VEVTVDQ=",
timestamp: string // e.g. "2023-10-19T15:19:35.5Z"
}): Promise<void> => {
const decoded = Buffer.from(message, 'base64').toString('utf-8')
console.log(
JSON.stringify({
timestamp,
message: decoded,
}),
)
buffer.push(decoded)
const parserInstance = parser()
parserInstance.onMessage((deviceId, message) => {
console.log(JSON.stringify({ deviceId, message }))
})

export const handler = async (event: KinesisStreamEvent): Promise<void> => {
const buffer: Record<string, string[]> = {}
for (const {
kinesis: { data, partitionKey },
} of event.Records) {
const message = Buffer.from(data, 'base64').toString('utf-8').trim()
const clientId = partitionKey.split('/')[0] as string // <client id>/nrplus-sink
if (buffer[clientId] === undefined) {
buffer[clientId] = [message]
} else {
buffer[clientId]?.push(message)
}
}

for (const [clientId, lines] of Object.entries(buffer)) {
for (const line of lines.sort(ascending))
parserInstance.addLine(clientId, line)
}
}

const ascending = (a: string, b: string) => a.localeCompare(b)
59 changes: 59 additions & 0 deletions nrplus/messageStreamParser.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { describe, it, mock } from 'node:test'
import assert from 'node:assert/strict'
import { readFile } from 'node:fs/promises'
import { parser } from './messageStreamParser.js'

void describe('messageStreamParser', () => {
void it('should parse sink messages', async () => {
const lines = (await readFile('./nrplus/nrplus-logs.txt', 'utf-8')).split(
'\n',
)

const onMessageCallback = mock.fn(() => undefined)
const p = parser()
p.onMessage(onMessageCallback)
for (const line of lines) {
p.addLine('nrplus-gw-jagu', line)
}

assert.deepEqual(onMessageCallback.mock.calls[0]?.arguments, [
'nrplus-gw-jagu',
{
time: '358881336898',
snr: '88',
RSSI: '-60',
len: '83',
type: 'Data SDU',
expectedRXRSSI: '-60',
seqNbr: '366',
networkId: '22',
transmitterId: '40',
receiverId: '38',
sduLastSeenSeqNbr: '1',
sduDataLength: '40',
sduData: '{\\"data\\":\\"Yes, hello\\",\\"modem_temp\\":\\"33\\"}',
ieType: 'none',
},
])

assert.deepEqual(onMessageCallback.mock.calls[1]?.arguments, [
'nrplus-gw-jagu',
{
time: '364412319378',
snr: '96',
RSSI: '-60',
len: '83',
type: 'Data SDU',
expectedRXRSSI: '-60',
seqNbr: '382',
networkId: '22',
transmitterId: '40',
receiverId: '38',
sduLastSeenSeqNbr: '1',
sduDataLength: '40',
sduData: '{\\"data\\":\\"Yes, hello\\",\\"modem_temp\\":\\"33\\"}',
ieType: 'none',
},
])
})
})
108 changes: 108 additions & 0 deletions nrplus/messageStreamParser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
PCC: Physical Control Channel
PDC: Physical Data Channel
MCS: Modulation and Coding Scheme
SDU: Service Data Unit
PCC comes before each PDC.
PDC contains the actual MAC PDU/SDU.
The content of the PDUs in here are just fMAC specific.
*/

type PDCInfo = {
time: string //e.g. '358881336898'
snr: string //e.g. '88'
RSSI: string //e.g. '-60'
len: string //e.g. '83'
type: string //e.g. 'Data SDU'
expectedRXRSSI: string //e.g. '-60'
seqNbr: string //e.g. '366'
networkId: string //e.g. '22'
transmitterId: string //e.g. '40'
receiverId: string //e.g. '38'
sduLastSeenSeqNbr: string //e.g. '1'
sduDataLength: string //e.g. '40'
sduData: string //e.g. '{\\"data\\":\\"Yes, hello\\",\\"modem_temp\\":\\"33\\"}'
ieType: string //e.g. 'none'
}

type MessageListener = (deviceId: string, message: PDCInfo) => void

const StreamParser = () => {
let index = 0
let lastResult = 0
const lines: string[] = []

const readLines = (regExps: RegExp[]): Record<string, unknown> | null => {
const result: RegExpExecArray['groups'][] = []
for (const regExp of regExps) {
const match = regExp.exec(lines[index] ?? '')
if (match === null) {
//console.warn(`Not matched`, lines[index], index, regExp)
return null
}
result.push(match.groups)
index++
}
return result.reduce(
(obj, groups) => ({ ...obj, ...groups }),
{} as Record<string, unknown>,
)
}

const parseLines = () => {
let result: unknown | null = null
while (index++ < lines.length - 1) {
const pdcData = readLines([
/^PDC received \(time (?<time>[0-9]+)\): snr (?<snr>[0-9]+), RSSI (?<RSSI>[-0-9]+), len (?<len>[0-9]+)/,
/^Received data:/,
/^ +Type: +(?<type>.+)/,
/^ +Power control:/,
/^ +Expected RX RSSI level \(dBm\): +(?<expectedRXRSSI>[-0-9]+)/,
/^ +Seq nbr: +(?<seqNbr>[0-9]+)/,
/^ +Network ID: +(?<networkId>[0-9]+)/,
/^ +Transmitter long ID: +(?<transmitterId>[0-9]+)/,
/^ +Receiver long ID: +(?<receiverId>[0-9]+)/,
/^ +SDU last seen seq nbr: +(?<sduLastSeenSeqNbr>[0-9]+)/,
/^ +SDU data length: +(?<sduDataLength>[0-9]+)/,
/^ +SDU data: +(?<sduData>.+)/,
/^ +IE type: +(?<ieType>.+)/,
])
if (pdcData !== null) {
lastResult = index
result = pdcData
}
}
index = lastResult
return result
}

return {
add: (data: string) => {
lines.push(data)
return parseLines()
},
}
}

export const parser = (): {
addLine: (device: string, line: string) => void
onMessage: (fn: MessageListener) => void
} => {
const parser: Record<string, ReturnType<typeof StreamParser>> = {}
const listeners: MessageListener[] = []
return {
addLine: (device, line) => {
if (parser[device] === undefined) {
parser[device] = StreamParser()
}
const maybeResult = parser[device]?.add(line)
if (maybeResult !== null) {
listeners.map((fn) => fn(device, maybeResult as PDCInfo))
}
},
onMessage: (fn) => {
listeners.push(fn)
},
}
}
36 changes: 36 additions & 0 deletions nrplus/nrplus-logs.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
receiver id: 38
MCS 0, TX pwr: -12 dBm
PDC received (time 358881336898): snr 88, RSSI -60, len 83
Received data:
Type: Data SDU
Power control:
Expected RX RSSI level (dBm): -60
Seq nbr: 366
Network ID: 22
Transmitter long ID: 40
Receiver long ID: 38
SDU last seen seq nbr: 1
SDU data length: 40
SDU data: {\"data\":\"Yes, hello\",\"modem_temp\":\"33\"}
IE type: none
PCC received (time 359572555405): status: \"valid - PDC can be received\", snr 83, stf_start_time 359572537298
phy header: short nw id 22, transmitter id 39
receiver id: 38
MCS 0, TX pwr: -12 dBm
PDC received (time 364412319378): snr 96, RSSI -60, len 83
Received data:
Type: Data SDU
Power control:
Expected RX RSSI level (dBm): -60
Seq nbr: 382
Network ID: 22
Transmitter long ID: 40
Receiver long ID: 38
SDU last seen seq nbr: 1
SDU data length: 40
SDU data: {\"data\":\"Yes, hello\",\"modem_temp\":\"33\"}
IE type: none
PCC received (time 365111832209): status: \"valid - PDC can be received\", snr 61, stf_start_time 365111814178
phy header: short nw id 22, transmitter id 39
receiver id: 38
MCS 0, TX pwr: -12 dBm

0 comments on commit 7f4bbc9

Please sign in to comment.