diff --git a/persistence.js b/persistence.js index b239691..79da086 100644 --- a/persistence.js +++ b/persistence.js @@ -7,7 +7,8 @@ var Packet = require('aedes-packet') var QlobberOpts = { wildcard_one: '+', wildcard_some: '#', - separator: '/' + separator: '/', + match_empty_levels: true } function MemoryPersistence () { @@ -18,8 +19,10 @@ function MemoryPersistence () { this._retained = [] // clientId -> topic -> qos this._subscriptions = new Map() + this._sharedSubscriptions = new Map() this._clientsCount = 0 this._trie = new QlobberSub(QlobberOpts) + this._trieShared = new QlobberSub(QlobberOpts) this._outgoing = {} this._incoming = {} this._wills = {} @@ -103,6 +106,42 @@ MemoryPersistence.prototype.addSubscriptions = function (client, subs, cb) { cb(null, client) } +MemoryPersistence.prototype.addSharedSubscription = function (client, sub, cb) { + var stored = this._sharedSubscriptions.get(client.id) + var trie = this._trieShared + + if (!stored) { + stored = new Map() + this._sharedSubscriptions.set(client.id, stored) + } + + trie.add(sub.topic, { + clientId: client.id, + topic: sub.topic, + qos: sub.qos + }) + + stored.set(sub.topic, { qos: sub.qos, lastUpdate: 0 }) + + cb(null, client) +} + +MemoryPersistence.prototype.removeSharedSubscription = function (client, topic, cb) { + var stored = this._sharedSubscriptions.get(client.id) + var trie = this._trieShared + + if (stored) { + trie.remove(topic, { clientId: client.id, topic: topic }) + stored.delete(topic) + } + + if (stored.size === 0) { + this._sharedSubscriptions.delete(client.id) + } + + cb(null, client) +} + MemoryPersistence.prototype.removeSubscriptions = function (client, subs, cb) { var stored = this._subscriptions.get(client.id) var trie = this._trie @@ -148,6 +187,41 @@ MemoryPersistence.prototype.subscriptionsByTopic = function (pattern, cb) { cb(null, this._trie.match(pattern)) } +MemoryPersistence.prototype.nextSharedSubscription = function (topic, group, cb) { + var subs = this._trieShared.match('$share/' + group + '/' + topic) + var stored = this._sharedSubscriptions + + var sub = null + for (let i = 0, len = subs.length; i < len; i++) { + var s = stored.get(subs[i].clientId) + if (!s || !s.get(subs[i].topic)) continue + s = s.get(subs[i].topic) + + if (s.lastUpdate === 0) { + sub = subs[i] + break + } else if (s.lastUpdate < sub.lastUpdate) { + sub = subs[i] + } + } + cb(null, sub) +} + +MemoryPersistence.prototype.updateSharedSubscription = function (client, topic, group, cb) { + var stored = this._sharedSubscriptions.get(client.id) + topic = '$share/' + group + '/' + topic + + if (stored) { + var sub = stored.get(topic) + + if (sub) { + sub.lastUpdate = Date.now() + } + } + + cb(null) +} + MemoryPersistence.prototype.cleanSubscriptions = function (client, cb) { var trie = this._trie var stored = this._subscriptions.get(client.id)