Skip to content

Commit

Permalink
Merge pull request #245 from csuermann/mqtt5
Browse files Browse the repository at this point in the history
fix(connection): Fix race condition which could lead to VSH devices appearing as offline to Alexa
  • Loading branch information
csuermann authored Feb 24, 2024
2 parents 3f0bb79 + be43192 commit 3da4b86
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 134 deletions.
19 changes: 9 additions & 10 deletions MqttClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@ class MqttClient extends EventEmitter {

this.options = {
...options,
reconnectPeriod: 30000,
keepalive: 90,
reconnectPeriod: 60_000, //in milliseconds, interval between two reconnections.
connectTimeout: 30_000, //in milliseconds, time to wait before a CONNACK is received
keepalive: 90, //in seconds
rejectUnauthorized: true,
resubscribe: false,
clean: true,
protocolVersion: 5,
}

this.client = null
Expand All @@ -18,9 +22,9 @@ class MqttClient extends EventEmitter {
connect() {
this.client = mqtt.connect(`mqtts://${this.options.host}`, this.options)

this.client.on('connect', (connack) => {
//console.log('EVENT connect', connack)
this.emit('connect', connack)
this.client.on('connect', (conAck) => {
//console.log('EVENT connect', conAck)
this.emit('connect', conAck)
})

// this.client.on('disconnect', (args) => {
Expand All @@ -37,11 +41,6 @@ class MqttClient extends EventEmitter {
this.emit('close')
})

this.client.on('reconnect', () => {
//console.log('EVENT reconnect')
this.emit('reconnect')
})

this.client.on('error', (error) => {
//console.log('EVENT error', error)
this.emit('error', error)
Expand Down
82 changes: 51 additions & 31 deletions connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ module.exports = function (RED) {
RED.auth.needsPermission('vsh-virtual-device.read'),
(req, res) => {
const connectionNode = RED.nodes.getNode(req.params.nodeId)
res.json({ plan: connectionNode.getPlan() })
res.json({ plan: connectionNode?.getPlan() ?? 'unknown' })
}
)

Expand Down Expand Up @@ -81,6 +81,11 @@ module.exports = function (RED) {
await this.rater.destroy()

if (!this.credentials.thingId) {
this.logger(
'no thingId present while closing vsh-connection',
null,
'warn'
)
return done()
}

Expand All @@ -89,7 +94,7 @@ module.exports = function (RED) {
try {
await this.disconnect()
} catch (e) {
console.log('connection.js:this:on:close::', e)
this.logger('disconnect() failed', e, 'error')
}

this.execCallbackForAll('onDisconnect')
Expand Down Expand Up @@ -124,7 +129,9 @@ module.exports = function (RED) {

if (this.isError) {
fill = 'red'
text = this.errorCode
text = `ERROR: ${this.errorCode}. ${
this.isReconnecting() ? 'Periodically retrying...' : ''
}`
} else if (this.isKilled) {
fill = 'red'
text = this.killedStatusText
Expand Down Expand Up @@ -234,6 +241,9 @@ module.exports = function (RED) {

markShadowAsConnected() {
if (!this.isConnected()) {
this.logger(
`skipping markShadowAsConnected() because isConnected() is false`
)
return false
}

Expand All @@ -250,9 +260,9 @@ module.exports = function (RED) {

markShadowAsConnectedDebounced = debounce(
this.markShadowAsConnected,
7000,
5000,
{
immediate: true,
immediate: false, //to mitigate race condition with LWT setting device shadow to disconnected
}
)

Expand Down Expand Up @@ -588,7 +598,12 @@ module.exports = function (RED) {
return
}

console.warn('CONNECTION KILLED! Reason:', reason || 'undefined')
this.logger(
'CONNECTION KILLED! Reason:',
reason || 'undefined',
null,
'warn'
)
this.isKilled = true
this.killedStatusText = reason ? reason : 'KILLED'
this.isInitializing = false
Expand Down Expand Up @@ -625,6 +640,7 @@ module.exports = function (RED) {
default:
this.logger(
`received service request (${message.operation}) that is not supported by this VSH version. Updating to the latest version might fix this!`,
null,
'warn'
)
}
Expand All @@ -651,7 +667,7 @@ module.exports = function (RED) {
// }
return response.data
} catch (error) {
console.log(error)
this.logger('checkVersion() failed', error, 'error')
throw new Error(
`HTTP Error Response: ${response.status || 'n/a'} ${
response.statusText || 'n/a'
Expand Down Expand Up @@ -710,7 +726,8 @@ module.exports = function (RED) {
return
}
} catch (e) {
this.errorCode = 'version check failed'
this.errorCode =
'version check failed. Ensure internet connectivity and restart the flow'
this.isError = true
this.refreshChildrenNodeStatus()
return this.logger(`version check failed! ${e.message}`, null, 'error')
Expand All @@ -725,6 +742,9 @@ module.exports = function (RED) {
cert: decodeBase64(this.credentials.cert),
ca: decodeBase64(this.credentials.caCert),
clientId: this.credentials.thingId,
log: (message) => {
this.logger('mqtt.js: ' + message, null, 'debug')
},
will: {
topic: `vsh/${this.credentials.thingId}/update`,
payload: JSON.stringify({
Expand All @@ -739,27 +759,28 @@ module.exports = function (RED) {
// register event listeners:

this.mqttClient.on('connect', (_conAck) => {
this.logger(`MQTT: connected to ${options.host}:${options.port}`)
this.stats.connectionCount++
this.logger(
`MQTT: connected to ${options.host}:${options.port}, connection #${this.stats.connectionCount}`
)
this.isError = false
this.refreshChildrenNodeStatus()
this.markShadowAsConnectedDebounced()

if (!this.isSubscribed) {
const topicsToSubscribe = [
`$aws/things/${this.credentials.thingId}/shadow/get/accepted`,
`vsh/${this.credentials.thingId}/+/directive`,
`vsh/service`,
`vsh/version/${VSH_VERSION}/+`,
`vsh/${this.credentials.thingId}/service`,
]

this.logger('MQTT: subscribe to topics', topicsToSubscribe)

this.mqttClient.subscribe(topicsToSubscribe).catch((error) => {
console.error('MQTT: subscription failed', error)
})
}
const topicsToSubscribe = [
`$aws/things/${this.credentials.thingId}/shadow/get/accepted`,
`vsh/${this.credentials.thingId}/+/directive`,
`vsh/service`,
`vsh/version/${VSH_VERSION}/+`,
`vsh/${this.credentials.thingId}/service`,
]

this.logger('MQTT: subscribe to topics', topicsToSubscribe)

this.mqttClient.subscribe(topicsToSubscribe).catch((error) => {
this.logger('MQTT: subscription failed', error, 'error')
})

this.markShadowAsConnectedDebounced()
})

this.mqttClient.on('offline', () => {
Expand All @@ -768,15 +789,13 @@ module.exports = function (RED) {
})

this.mqttClient.on('close', () => {
this.logger('MQTT: connection closed')
this.isSubscribed = false
this.refreshChildrenNodeStatus()
})

this.mqttClient.on('reconnect', () => {
this.logger('MQTT: reconnecting...')
this.refreshChildrenNodeStatus('Reconnecting...')
})

this.mqttClient.on('error', (error) => {
this.logger('MQTT: error', error)
this.isError = true
this.errorCode = error.code
this.refreshChildrenNodeStatus()
Expand Down Expand Up @@ -824,7 +843,7 @@ module.exports = function (RED) {
}
})

this.mqttClient.on('subscribed', (topic, messageObj) => {
this.mqttClient.on('subscribed', (_subscriptions) => {
this.isSubscribed = true
})

Expand All @@ -837,6 +856,7 @@ module.exports = function (RED) {

async disconnect() {
if (this.isDisconnecting) {
this.logger('ignoring disconnect() as already disconnecting')
return
}

Expand Down
Loading

0 comments on commit 3da4b86

Please sign in to comment.