From 3b39b4fd24e8fd3966f59b63ad911c4ccc23ea90 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Mon, 24 Feb 2020 15:08:09 +0100 Subject: [PATCH 1/4] WIP: Shared subscriptions --- persistence.js | 48 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 8 deletions(-) diff --git a/persistence.js b/persistence.js index b239691..820ea09 100644 --- a/persistence.js +++ b/persistence.js @@ -83,8 +83,8 @@ MemoryPersistence.prototype.addSubscriptions = function (client, subs, cb) { for (var i = 0; i < subs.length; i += 1) { var sub = subs[i] - var qos = stored.get(sub.topic) - var hasQoSGreaterThanZero = (qos !== undefined) && (qos > 0) + var _sub = stored.get(sub.topic) + var hasQoSGreaterThanZero = (_sub !== undefined) && (_sub.qos > 0) if (sub.qos > 0) { trie.add(sub.topic, { clientId: client.id, @@ -97,7 +97,7 @@ MemoryPersistence.prototype.addSubscriptions = function (client, subs, cb) { topic: sub.topic }) } - stored.set(sub.topic, sub.qos) + stored.set(sub.topic, { qos: sub.qos }) } cb(null, client) @@ -110,9 +110,9 @@ MemoryPersistence.prototype.removeSubscriptions = function (client, subs, cb) { if (stored) { for (var i = 0; i < subs.length; i += 1) { var topic = subs[i] - var qos = stored.get(topic) - if (qos !== undefined) { - if (qos > 0) { + var _sub = stored.get(topic) + if (_sub !== undefined) { + if (_sub.qos > 0) { trie.remove(topic, { clientId: client.id, topic: topic }) } stored.delete(topic) @@ -134,7 +134,9 @@ MemoryPersistence.prototype.subscriptionsByClient = function (client, cb) { if (stored) { subs = [] for (var topicAndQos of stored) { - subs.push({ topic: topicAndQos[0], qos: topicAndQos[1] }) + var sub = topicAndQos[1] + sub.topic = topicAndQos[0] + subs.push(sub) } } cb(null, subs, client) @@ -148,13 +150,43 @@ MemoryPersistence.prototype.subscriptionsByTopic = function (pattern, cb) { cb(null, this._trie.match(pattern)) } +MemoryPersistence.prototype.nextSharedSubscription = function (topic, group, cb) { + this.subscriptionsByTopic('$share/' + group + '/' + topic, function (subs) { + var sub = null + for (let i = 0, len = subs.length; i < len; i++) { + if (subs[i].lastUpdate === undefined) { + sub = subs[i] + break + } else if (subs[i].lastUpdate < sub.lastUpdate) { + sub = subs[i] + } + } + cb(null, sub) + }) +} + +MemoryPersistence.prototype.updateSharedSubscription = function (client, topic, group, cb) { + var stored = this._subscriptions.get(client.id) + topic = '$share/' + group + '/' + topic + + if (stored) { + var sub = stored.get(topic) + + if (sub) { + stored.set(topic, { qos: stored.qos, lastUpdate: Date.now() }) + } + } + + cb(null) +} + MemoryPersistence.prototype.cleanSubscriptions = function (client, cb) { var trie = this._trie var stored = this._subscriptions.get(client.id) if (stored) { for (var topicAndQos of stored) { - if (topicAndQos[1] > 0) { + if (topicAndQos[1].qos > 0) { var topic = topicAndQos[0] trie.remove(topic, { clientId: client.id, topic: topic }) } From b08f428c0bdf6bb66f0ecc4ba389196aa8b76f5a Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Mon, 24 Feb 2020 16:36:49 +0100 Subject: [PATCH 2/4] fix: Handle errors --- persistence.js | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/persistence.js b/persistence.js index 820ea09..e7f06bd 100644 --- a/persistence.js +++ b/persistence.js @@ -7,7 +7,8 @@ var Packet = require('aedes-packet') var QlobberOpts = { wildcard_one: '+', wildcard_some: '#', - separator: '/' + separator: '/', + match_empty_levels: true } function MemoryPersistence () { @@ -151,17 +152,20 @@ MemoryPersistence.prototype.subscriptionsByTopic = function (pattern, cb) { } MemoryPersistence.prototype.nextSharedSubscription = function (topic, group, cb) { - this.subscriptionsByTopic('$share/' + group + '/' + topic, function (subs) { - var sub = null - for (let i = 0, len = subs.length; i < len; i++) { - if (subs[i].lastUpdate === undefined) { - sub = subs[i] - break - } else if (subs[i].lastUpdate < sub.lastUpdate) { - sub = subs[i] + this.subscriptionsByTopic('$share/' + group + '/' + topic, function (err, subs) { + if (err) cb(err, null) + else { + var sub = null + for (let i = 0, len = subs.length; i < len; i++) { + if (subs[i].lastUpdate === undefined) { + sub = subs[i] + break + } else if (subs[i].lastUpdate < sub.lastUpdate) { + sub = subs[i] + } } + cb(null, sub) } - cb(null, sub) }) } From b0c33e43cbdb236722e0ca457dd6a37ad0330cf7 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Mon, 24 Feb 2020 16:50:43 +0100 Subject: [PATCH 3/4] AddShared and RemoveSHared subscriptions methods --- persistence.js | 82 +++++++++++++++++++++++++++++++++++--------------- 1 file changed, 57 insertions(+), 25 deletions(-) diff --git a/persistence.js b/persistence.js index e7f06bd..ee9106a 100644 --- a/persistence.js +++ b/persistence.js @@ -19,8 +19,10 @@ function MemoryPersistence () { this._retained = [] // clientId -> topic -> qos this._subscriptions = new Map() + this._sharedSubscriptions = new Map() this._clientsCount = 0 this._trie = new QlobberSub(QlobberOpts) + this._trieShared = new QlobberSub(QlobberOpts) this._outgoing = {} this._incoming = {} this._wills = {} @@ -84,8 +86,8 @@ MemoryPersistence.prototype.addSubscriptions = function (client, subs, cb) { for (var i = 0; i < subs.length; i += 1) { var sub = subs[i] - var _sub = stored.get(sub.topic) - var hasQoSGreaterThanZero = (_sub !== undefined) && (_sub.qos > 0) + var qos = stored.get(sub.topic) + var hasQoSGreaterThanZero = (qos !== undefined) && (qos > 0) if (sub.qos > 0) { trie.add(sub.topic, { clientId: client.id, @@ -98,7 +100,43 @@ MemoryPersistence.prototype.addSubscriptions = function (client, subs, cb) { topic: sub.topic }) } - stored.set(sub.topic, { qos: sub.qos }) + stored.set(sub.topic, sub.qos) + } + + cb(null, client) +} + +MemoryPersistence.prototype.addSharedSubscription = function (client, sub, cb) { + var stored = this._sharedSubscriptions.get(client.id) + var trie = this._trieShared + + if (!stored) { + stored = new Map() + this._sharedSubscriptions.set(client.id, stored) + } + + trie.add(sub.topic, { + clientId: client.id, + topic: sub.topic, + qos: sub.qos + }) + + stored.set(sub.topic, { qos: sub.qos, lastUpdate: 0 }) + + cb(null, client) +} + +MemoryPersistence.prototype.removeSharedSubscriptions = function (client, topic, cb) { + var stored = this._sharedSubscriptions.get(client.id) + var trie = this._trieShared + + if (stored) { + trie.remove(topic, { clientId: client.id, topic: topic }) + stored.delete(topic) + } + + if (stored.size === 0) { + this._sharedSubscriptions.delete(client.id) } cb(null, client) @@ -111,9 +149,9 @@ MemoryPersistence.prototype.removeSubscriptions = function (client, subs, cb) { if (stored) { for (var i = 0; i < subs.length; i += 1) { var topic = subs[i] - var _sub = stored.get(topic) - if (_sub !== undefined) { - if (_sub.qos > 0) { + var qos = stored.get(topic) + if (qos !== undefined) { + if (qos > 0) { trie.remove(topic, { clientId: client.id, topic: topic }) } stored.delete(topic) @@ -135,9 +173,7 @@ MemoryPersistence.prototype.subscriptionsByClient = function (client, cb) { if (stored) { subs = [] for (var topicAndQos of stored) { - var sub = topicAndQos[1] - sub.topic = topicAndQos[0] - subs.push(sub) + subs.push({ topic: topicAndQos[0], qos: topicAndQos[1] }) } } cb(null, subs, client) @@ -152,25 +188,21 @@ MemoryPersistence.prototype.subscriptionsByTopic = function (pattern, cb) { } MemoryPersistence.prototype.nextSharedSubscription = function (topic, group, cb) { - this.subscriptionsByTopic('$share/' + group + '/' + topic, function (err, subs) { - if (err) cb(err, null) - else { - var sub = null - for (let i = 0, len = subs.length; i < len; i++) { - if (subs[i].lastUpdate === undefined) { - sub = subs[i] - break - } else if (subs[i].lastUpdate < sub.lastUpdate) { - sub = subs[i] - } - } - cb(null, sub) + var subs = this._trieShared.match('$share/' + group + '/' + topic) + var sub = null + for (let i = 0, len = subs.length; i < len; i++) { + if (subs[i].lastUpdate === 0) { + sub = subs[i] + break + } else if (subs[i].lastUpdate < sub.lastUpdate) { + sub = subs[i] } - }) + } + cb(null, sub) } MemoryPersistence.prototype.updateSharedSubscription = function (client, topic, group, cb) { - var stored = this._subscriptions.get(client.id) + var stored = this._sharedSubscriptions.get(client.id) topic = '$share/' + group + '/' + topic if (stored) { @@ -190,7 +222,7 @@ MemoryPersistence.prototype.cleanSubscriptions = function (client, cb) { if (stored) { for (var topicAndQos of stored) { - if (topicAndQos[1].qos > 0) { + if (topicAndQos[1] > 0) { var topic = topicAndQos[0] trie.remove(topic, { clientId: client.id, topic: topic }) } From 305788bd569bd8630f7bd3bfc5f73b9a5e598eff Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Mon, 24 Feb 2020 18:12:24 +0100 Subject: [PATCH 4/4] fix next shared --- persistence.js | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/persistence.js b/persistence.js index ee9106a..79da086 100644 --- a/persistence.js +++ b/persistence.js @@ -126,7 +126,7 @@ MemoryPersistence.prototype.addSharedSubscription = function (client, sub, cb) { cb(null, client) } -MemoryPersistence.prototype.removeSharedSubscriptions = function (client, topic, cb) { +MemoryPersistence.prototype.removeSharedSubscription = function (client, topic, cb) { var stored = this._sharedSubscriptions.get(client.id) var trie = this._trieShared @@ -189,12 +189,18 @@ MemoryPersistence.prototype.subscriptionsByTopic = function (pattern, cb) { MemoryPersistence.prototype.nextSharedSubscription = function (topic, group, cb) { var subs = this._trieShared.match('$share/' + group + '/' + topic) + var stored = this._sharedSubscriptions + var sub = null for (let i = 0, len = subs.length; i < len; i++) { - if (subs[i].lastUpdate === 0) { + var s = stored.get(subs[i].clientId) + if (!s || !s.get(subs[i].topic)) continue + s = s.get(subs[i].topic) + + if (s.lastUpdate === 0) { sub = subs[i] break - } else if (subs[i].lastUpdate < sub.lastUpdate) { + } else if (s.lastUpdate < sub.lastUpdate) { sub = subs[i] } } @@ -209,7 +215,7 @@ MemoryPersistence.prototype.updateSharedSubscription = function (client, topic, var sub = stored.get(topic) if (sub) { - stored.set(topic, { qos: stored.qos, lastUpdate: Date.now() }) + sub.lastUpdate = Date.now() } }