From 5d4158be52b78f5918dfd067935a98c3fcb3d12f Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 27 Apr 2018 14:48:22 +0300 Subject: [PATCH] Created build version --- lib/config.lua | 20 + lib/flushdb.lua | 18 +- lib/handler.lua | 50 ++- lib/headers.lua | 4 +- lib/post-handler.lua | 5 - lib/redis.lua | 819 +++++++++++++++++++++++++++++------------ lib/wiola.lua | 852 +++++++++++++++++++++++++++++++++++-------- 7 files changed, 1349 insertions(+), 419 deletions(-) diff --git a/lib/config.lua b/lib/config.lua index 31f9c20..371a833 100644 --- a/lib/config.lua +++ b/lib/config.lua @@ -51,6 +51,11 @@ local wiolaConf = { --}, challengeCallback = nil, authCallback = nil + }, + metaAPI = { + session = false, + subscription = false, + registration = false } } @@ -117,6 +122,21 @@ function _M.config(config) wiolaConf.wampCRA.authCallback = config.wampCRA.authCallback end end + + if config.metaAPI then + + if config.metaAPI.session ~= nil then + wiolaConf.metaAPI.session = config.metaAPI.session + end + + if config.metaAPI.subscription ~= nil then + wiolaConf.metaAPI.subscription = config.metaAPI.subscription + end + + if config.metaAPI.registration ~= nil then + wiolaConf.metaAPI.registration = config.metaAPI.registration + end + end end return _M diff --git a/lib/flushdb.lua b/lib/flushdb.lua index ae6a0c3..324fc9d 100644 --- a/lib/flushdb.lua +++ b/lib/flushdb.lua @@ -1,16 +1,14 @@ --- --- Project: wiola --- User: Konstantin Burkalev --- Date: 06.04.17 --- +--- +--- Project: wiola +--- User: Konstantin Burkalev +--- Date: 06.04.17 +--- local _M = {} --- --- Cleans up all wiola sessions data in redis store --- --- redis - Redis instance on which to operate --- +--- +--- Cleans up all wiola sessions data in redis store +--- function _M.flushAll() local conf = require("wiola.config").config() diff --git a/lib/handler.lua b/lib/handler.lua index b6b6798..b36e96d 100644 --- a/lib/handler.lua +++ b/lib/handler.lua @@ -5,8 +5,9 @@ -- local wsServer = require "resty.websocket.server" local wiola = require "wiola" +local webSocket, wampServer, ok, err, bytes -local webSocket, err = wsServer:new({ +webSocket, err = wsServer:new({ timeout = tonumber(ngx.var.wiola_socket_timeout, 10) or 100, max_payload_len = tonumber(ngx.var.wiola_max_payload_len, 10) or 65535 }) @@ -15,22 +16,22 @@ if not webSocket then return ngx.exit(444) end -local wampServer, err = wiola:new() +wampServer, err = wiola:new() if not wampServer then return ngx.exit(444) end local sessionId, dataType = wampServer:addConnection(ngx.var.connection, ngx.header["Sec-WebSocket-Protocol"]) -local function removeConnection(premature, sessionId) +local function removeConnection(_, sessId) local config = require("wiola.config").config() local store = require('wiola.stores.' .. config.store) - local ok, err = store:init(config.storeConfig) + ok, err = store:init(config) if not ok then else - store:removeSession(sessionId) + store:removeSession(sessId) end end @@ -38,16 +39,37 @@ local function removeConnectionWrapper() removeConnection(true, sessionId) end -local ok, err = ngx.on_abort(removeConnectionWrapper) +ok, err = ngx.on_abort(removeConnectionWrapper) if not ok then ngx.exit(444) end while true do - local cliData, cliErr = wampServer:getPendingData(sessionId) + local cliData, data, typ, hflags + + hflags = wampServer:getHandlerFlags(sessionId) + if hflags ~= nil then + if hflags.sendLast == true then + cliData = wampServer:getPendingData(sessionId, true) + + if dataType == 'binary' then + bytes, err = webSocket:send_binary(cliData) + else + bytes, err = webSocket:send_text(cliData) + end + + if not bytes then + end + end + + if hflags.close == true then + ngx.timer.at(0, removeConnection, sessionId) + return ngx.exit(444) + end + end + cliData = wampServer:getPendingData(sessionId) while cliData ~= ngx.null do - local bytes, err if dataType == 'binary' then bytes, err = webSocket:send_binary(cliData) else @@ -57,7 +79,7 @@ while true do if not bytes then end - cliData, cliErr = wampServer:getPendingData(sessionId) + cliData = wampServer:getPendingData(sessionId) end if webSocket.fatal then @@ -65,18 +87,18 @@ while true do return ngx.exit(444) end - local data, typ, err = webSocket:recv_frame() + data, typ = webSocket:recv_frame() if not data then - local bytes, err = webSocket:send_ping() + bytes, err = webSocket:send_ping() if not bytes then ngx.timer.at(0, removeConnection, sessionId) return ngx.exit(444) end elseif typ == "close" then - local bytes, err = webSocket:send_close(1000, "Closing connection") + bytes, err = webSocket:send_close(1000, "Closing connection") if not bytes then return end @@ -86,13 +108,13 @@ while true do elseif typ == "ping" then - local bytes, err = webSocket:send_pong() + bytes, err = webSocket:send_pong() if not bytes then ngx.timer.at(0, removeConnection, sessionId) return ngx.exit(444) end - elseif typ == "pong" then +-- elseif typ == "pong" then elseif typ == "text" then -- Received something texty wampServer:receiveData(sessionId, data) diff --git a/lib/headers.lua b/lib/headers.lua index 98dbe25..6c60e2c 100644 --- a/lib/headers.lua +++ b/lib/headers.lua @@ -6,8 +6,8 @@ ngx.header["Server"] = "wiola/Lua v0.6.0" -function has(tab, val) - for index, value in ipairs (tab) do +local has = function(tab, val) + for _, value in ipairs (tab) do if value == val then return true end diff --git a/lib/post-handler.lua b/lib/post-handler.lua index ef7034f..3089f5c 100644 --- a/lib/post-handler.lua +++ b/lib/post-handler.lua @@ -8,11 +8,6 @@ local wiola = require "wiola" local wampServer = wiola:new() local realm = "testRealm" -local redisOk, redisErr = wampServer:setupRedis("unix:/tmp/redis.sock") -if not redisOk then - return ngx.exit(444) -end - ngx.req.read_body() local req = ngx.req.get_body_data() diff --git a/lib/redis.lua b/lib/redis.lua index 8088f1c..bcb5e28 100644 --- a/lib/redis.lua +++ b/lib/redis.lua @@ -9,17 +9,127 @@ local _M = {} local redis local config --- Format NUMBER for using in strings +--- Format NUMBER for using in strings local formatNumber = function(n) return string.format("%.0f", n) end +--- +--- Return index of obj in array t +--- +--- @param t table array table +--- @param obj any object to search +--- @return index of obj or -1 if not found +--------------------------------------------------- +local arrayIndexOf = function(t, obj) + if type(t) == 'table' then + for i = 1, #t do + if t[i] == obj then + return i + end + end --- --- Initialize store connection --- --- config - store configuration --- + return -1 + else + error("table.indexOf expects table for first argument, " .. type(t) .. " given") + end +end + +--- +--- Find URI in pattern based (prefix and wildcard) uri list +--- +--- +--- @param uriList table URI list (RPCs or Topics) +--- @param uri string Uri to find +--- @param all boolean Return all matches or just first +--- @return table array of matched URIs +--- +local findPatternedUri = function(uriList, uri, all) + local matchedUris = {} + + local comp = function(p1,p2) + local _, p1c, p2c + + _, p1c = string.gsub(p1, "%.", "") + _, p2c = string.gsub(p2, "%.", "") + + if p1c > p2c then -- reverse sort + return true + else + return false + end + end + table.sort(uriList, comp) + + -- trying to find prefix matched uri + for _, value in ipairs(uriList) do + if string.match(uri, "^" .. string.gsub(value, "%.", "%%.") .. "%.") then + if all then + table.insert(matchedUris, value) + else + return { value } + end + end + end + + local compWldCrd + compWldCrd = function(p1,p2) + local _, p1c, p2c, p1v, p2v, p1dots, p2dots, p1l, p2l + + p1l = string.len(p1) + p2l = string.len(p2) + + if p1l == 0 and p2l > 0 then + return false + elseif p2l == 0 and p1l > 0 then + return true + elseif p1l == 0 and p2l == 0 then + return true + end + + p1dots = string.find(p1, "..", 1, true) or p1l + p2dots = string.find(p2, "..", 1, true) or p2l + p1v = string.sub(p1, 1, p1dots) + p2v = string.sub(p2, 1, p2dots) + _, p1c = string.gsub(p1v, "%.", "") + _, p2c = string.gsub(p2v, "%.", "") + + if p1c > p2c then -- reverse sort + return true + elseif p1c < p2c then + return false + else + return compWldCrd(string.sub(p1, p1dots+2), string.sub(p2, p2dots+2)) + end + end + table.sort(uriList, compWldCrd) + + -- trying to find wildcard matched uri + for _, value in ipairs(uriList) do + local replUri, c = string.gsub(value, "%.%.", ".[0-9a-zA-Z_]+.") + + if c ~= nil then -- it's wildcard uri + local re = "^" .. string.gsub(replUri, "%.", "%%.") .. "$" + + if string.match(uri, re) then + if all then + table.insert(matchedUris, value) + else + return { value } + end + end + end + end + + return matchedUris +end + +--- +--- Initialize store connection +--- +--- @param cfg table store configuration +--- @return boolean, string is Ok flag, error description +--- function _M:init(cfg) local redisOk, redisErr @@ -27,22 +137,24 @@ function _M:init(cfg) redis = redisLib:new() config = cfg - if config.port == nil then - redisOk, redisErr = redis:connect(config.host) + if config.storeConfig.port == nil then + redisOk, redisErr = redis:connect(config.storeConfig.host) else - redisOk, redisErr = redis:connect(config.host, config.port) + redisOk, redisErr = redis:connect(config.storeConfig.host, config.storeConfig.port) end - if redisOk and config.db ~= nil then - redis:select(config.db) + if redisOk and config.storeConfig.db ~= nil then + redis:select(config.storeConfig.db) end return redisOk, redisErr end --- --- Generate unique Id --- +--- +--- Generate unique Id +--- +--- @return number unique Id +--- function _M:getRegId() local regId local max = 2 ^ 53 @@ -54,55 +166,58 @@ function _M:getRegId() repeat regId = math.random(max) -- regId = math.random(100000000000000) - until redis:sismember("wiolaIds", regId) + until redis:sismember("wiolaIds", formatNumber(regId)) return regId end --- --- Add new session Id to active list --- --- regId - session registration Id --- session - Session information --- +--- +--- Add new session Id to active list +--- +--- @param regId number session registration Id +--- @param session table Session information +--- function _M:addSession(regId, session) - session.sessId = formatNumber(session.sessId) - redis:sadd("wiolaIds", regId) + redis:sadd("wiolaIds", formatNumber(regId)) redis:hmset("wiSes" .. formatNumber(regId), session) - end --- --- Get session info --- --- regId - session registration Id --- +--- +--- Get session info +--- +--- @param regId number session registration Id +--- @return table session object or nil +--- function _M:getSession(regId) - local session = redis:array_to_hash(redis:hgetall("wiSes" .. formatNumber(regId))) - session.isWampEstablished = tonumber(session.isWampEstablished) - session.sessId = tonumber(session.sessId) - return session + local sessArr = redis:hgetall("wiSes" .. formatNumber(regId)) + if #sessArr > 0 then + local session = redis:array_to_hash(sessArr) + session.isWampEstablished = tonumber(session.isWampEstablished) + session.sessId = tonumber(session.sessId) + return session + else + return nil + end end --- --- Change session info --- --- regId - session registration Id --- session - Session information --- +--- +--- Change session info +--- +--- @param regId number session registration Id +--- @param session table Session information +--- function _M:changeSession(regId, session) session.isWampEstablished = formatNumber(session.isWampEstablished) session.sessId = formatNumber(session.sessId) redis:hmset("wiSes" .. formatNumber(regId), session) - end --- --- Remove session data from runtime store --- --- regId - session registration Id --- +--- +--- Remove session data from runtime store +--- +--- @param regId number session registration Id +--- function _M:removeSession(regId) local regIdStr = formatNumber(regId) @@ -113,6 +228,7 @@ function _M:removeSession(regId) for k, v in pairs(subscriptions) do redis:srem("wiRealm" .. session.realm .. "Sub" .. k .. "Sessions", regIdStr) + redis:del("wiRealm" .. session.realm .. "Sub" .. k .. "Session" .. regIdStr) if redis:scard("wiRealm" .. session.realm .. "Sub" .. k .. "Sessions") == 0 then redis:del("wiRealm" .. session.realm .. "Sub" .. k .. "Sessions") redis:hdel("wiRealm" .. session.realm .. "Subs",k) @@ -122,7 +238,7 @@ function _M:removeSession(regId) local rpcs = redis:array_to_hash(redis:hgetall("wiSes" .. regIdStr .. "RPCs")) - for k, v in pairs(rpcs) do + for k, _ in pairs(rpcs) do redis:srem("wiRealm" .. session.realm .. "RPCs",k) redis:del("wiRealm" .. session.realm .. "RPC" .. k) end @@ -141,41 +257,106 @@ function _M:removeSession(regId) redis:srem("wiolaIds",regIdStr) end --- Prepare data for sending to client --- --- session - Session information --- data - data for client --- +--- +--- Get session count in realm +--- +--- @param realm string realm to count sessions +--- @param authroles table optional authroles list +--- @return number, table session count, session Ids array +--- +function _M:getSessionCount(realm, authroles) + local count = 0 + local sessionsIdList = {} + local allSessions = redis:smembers("wiRealm" .. realm .. "Sessions") + + if type(authroles) == 'table' and #authroles > 0 then + + for _, sessId in ipairs(allSessions) do + local sessionInfo = self:getSession(sessId) + + if sessionInfo.authInfo and arrayIndexOf(authroles, sessionInfo.authInfo.authrole) > 0 then + count = count + 1 + table.insert(sessionsIdList, sessId) + end + end + else + count = redis:scard("wiRealm" .. realm .. "Sessions") + sessionsIdList = allSessions + end + + return count, sessionsIdList +end + +--- +--- Prepare data for sending to client +--- +--- @param session table Session information +--- @param data table data for client +--- function _M:putData(session, data) redis:rpush("wiSes" .. formatNumber(session.sessId) .. "Data", data) end --- --- Retrieve data, available for session --- --- regId - session registration Id --- -function _M:getPendingData(regId) - return redis:lpop("wiSes" .. formatNumber(regId) .. "Data") +--- +--- Retrieve data, available for session +--- +--- @param regId number session registration Id +--- @param last boolean return from the end of a queue +--- @return any client data +--- +function _M:getPendingData(regId, last) + if last == true then + return redis:rpop("wiSes" .. formatNumber(regId) .. "Data") + else + return redis:lpop("wiSes" .. formatNumber(regId) .. "Data") + end end --- --- Get Challenge info --- --- regId - session registration Id --- +--- +--- Set connection handler flags for session +--- +--- @param regId number session registration Id +--- @param flags table flags data +--- +function _M:setHandlerFlags(regId, flags) + return redis:hmset("wiSes" .. formatNumber(regId) .. "HandlerFlags", flags) +end + +--- +--- Retrieve connection handler flags, set up for session +--- +--- @param regId number session registration Id +--- @return table flags data +--- +function _M:getHandlerFlags(regId) + local flarr = redis:hgetall("wiSes" .. formatNumber(regId) .. "HandlerFlags") + if #flarr > 0 then + local fl = redis:array_to_hash(flarr) + + return fl + else + return nil + end +end + +--- +--- Get Challenge info +--- +--- @param regId number session registration Id +--- @return table challenge info object +--- function _M:getChallenge(regId) local challenge = redis:array_to_hash(redis:hgetall("wiSes" .. formatNumber(regId) .. "Challenge")) challenge.session = tonumber(challenge.session) return challenge end --- --- Change Challenge info --- --- regId - session registration Id --- challenge - Challenge information --- +--- +--- Change Challenge info +--- +--- @param regId number session registration Id +--- @param challenge table Challenge information +--- function _M:changeChallenge(regId, challenge) if challenge.session then challenge.session = formatNumber(challenge.session) @@ -183,106 +364,242 @@ function _M:changeChallenge(regId, challenge) redis:hmset("wiSes" .. formatNumber(regId) .. "Challenge", challenge) end --- --- Remove Challenge data from runtime store --- --- regId - session registration Id --- +--- +--- Remove Challenge data from runtime store +--- +--- @param regId number session registration Id +--- function _M:removeChallenge(regId) redis:del("wiSes" .. formatNumber(regId) .. "Challenge") end --- --- Add session to realm (creating one if needed) --- --- regId - session registration Id --- realm - session realm --- +--- +--- Add session to realm (creating one if needed) +--- +--- @param regId number session registration Id +--- @param realm string session realm +--- function _M:addSessionToRealm(regId, realm) if redis:sismember("wiolaRealms", realm) == 0 then redis:sadd("wiolaRealms", realm) + self:registerMetaRpc(realm) end redis:sadd("wiRealm" .. realm .. "Sessions", formatNumber(regId)) - end --- --- Get subscription id --- --- realm - realm --- uri - subscription uri --- +--- +--- Get subscription id +--- +--- @param realm string session realm +--- @param uri string subscription uri +--- @return number subscription Id +--- function _M:getSubscriptionId(realm, uri) return tonumber(redis:hget("wiRealm" .. realm .. "Subs", uri)) end --- --- Subscribe session to topic (also create topic if it doesn't exist) --- --- realm - realm --- uri - subscription uri --- regId - session registration Id --- -function _M:subscribeSession(realm, uri, regId) - local subscriptionId = tonumber(redis:hget("wiRealm" .. realm .. "Subs", uri)) +--- +--- Subscribe session to topic (also create topic if it doesn't exist) +--- +--- @param realm string session realm +--- @param uri string subscription uri +--- @param options table subscription options +--- @param regId number session registration Id +--- +function _M:subscribeSession(realm, uri, options, regId) + local subscriptionIdStr = redis:hget("wiRealm" .. realm .. "Subs", uri) + local subscriptionId = tonumber(subscriptionIdStr) + local isNewSubscription = false + local regIdStr = formatNumber(regId) if not subscriptionId then subscriptionId = self:getRegId() - local subscriptionIdStr = formatNumber(subscriptionId) + isNewSubscription = true + subscriptionIdStr = formatNumber(subscriptionId) redis:hset("wiRealm" .. realm .. "Subs", uri, subscriptionIdStr) redis:hset("wiRealm" .. realm .. "RevSubs", subscriptionIdStr, uri) + redis:hset("wiRealm" .. realm .. "Sub" .. uri, "exact", 0, "prefix", 0, "wildcard", 0) end - redis:sadd("wiRealm" .. realm .. "Sub" .. uri .. "Sessions", formatNumber(regId)) + redis:hmset("wiRealm" .. realm .. "Sub" .. uri .. "Session" .. regIdStr, + "subscriptionId", subscriptionIdStr, + "matchPolicy", options.match or "exact") + redis:sadd("wiRealm" .. realm .. "Sub" .. uri .. "Sessions", regIdStr) - return subscriptionId + return subscriptionId, isNewSubscription end --- --- Unsubscribe session from topic (also remove topic if there is no more subscribers) --- --- realm - realm --- subscId - subscription Id --- regId - session registration Id --- --- Returns flag was session subscribed to requested topic --- +--- +--- Unsubscribe session from topic (also remove topic if there is no more subscribers) +--- +--- @param realm string session realm +--- @param subscId number subscription Id +--- @param regId number session registration Id +--- +--- @return boolean, boolean was session unsubscribed from topic, was topic removed +--- function _M:unsubscribeSession(realm, subscId, regId) local subscIdStr = formatNumber(subscId) local regIdStr = formatNumber(regId) local subscr = redis:hget("wiRealm" .. realm .. "RevSubs", subscIdStr) local isSesSubscrbd = redis:sismember("wiRealm" .. realm .. "Sub" .. subscr .. "Sessions", regIdStr) + local wasTopicRemoved = false redis:srem("wiRealm" .. realm .. "Sub" .. subscr .. "Sessions", regIdStr) + redis:del("wiRealm" .. realm .. "Sub" .. subscr .. "Session" .. regIdStr) if redis:scard("wiRealm" .. realm .. "Sub" .. subscr .. "Sessions") == 0 then redis:del("wiRealm" .. realm .. "Sub" .. subscr .. "Sessions") redis:hdel("wiRealm" .. realm .. "Subs", subscr) redis:hdel("wiRealm" .. realm .. "RevSubs", subscIdStr) + redis:del("wiRealm" .. realm .. "Sub" .. subscr) + wasTopicRemoved = true end - return isSesSubscrbd + return isSesSubscrbd, wasTopicRemoved end --- --- Get sessions to deliver event --- --- realm - realm --- uri - subscription uri --- regId - session registration Id --- options - advanced profile options --- +--- +--- Get sessions subscribed to topic +--- +--- @param realm string realm +--- @param subscId number subscription Id +--- @return table array of session Ids subscribed to subscription +--- +function _M:getTopicSessionsBySubId(realm, subscId) + local uri = redis:hget("wiRealm" .. realm .. "RevSubs", formatNumber(subscId)) + if uri ~= ngx.null then + return redis:smembers("wiRealm" .. realm .. "Sub" .. uri .. "Sessions") + else + return nil + end +end + +--- +--- Get count of sessions subscribed to topic +--- +--- @param realm string realm +--- @param subscId number subscription Id +--- @return table array of session Ids subscribed to subscription +--- +function _M:getTopicSessionsCountBySubId(realm, subscId) + local uri = redis:hget("wiRealm" .. realm .. "RevSubs", formatNumber(subscId)) + if uri ~= ngx.null then + return redis:scard("wiRealm" .. realm .. "Sub" .. uri .. "Sessions") + else + return nil + end +end + +--- +--- Get sessions subscribed to topic +--- +--- @param realm string realm +--- @param uri string subscription uri +--- @return table array of session Ids subscribed to topic +--- +function _M:getTopicSessions(realm, uri) + return redis:smembers("wiRealm" .. realm .. "Sub" .. uri .. "Sessions") +end + +--- +--- Get sessions to deliver event +--- +--- @param realm string realm +--- @param uri string subscription uri +--- @param regId number session registration Id +--- @param options table advanced profile options +--- @return table array of session Ids to deliver event +--- function _M:getEventRecipients(realm, uri, regId, options) local regIdStr = formatNumber(regId) local recipients = {} + local details = {} + + local exactSubsIdStr = redis:hget("wiRealm" .. realm .. "Subs", uri) + local exactSubsId = tonumber(exactSubsIdStr) + + if options.disclose_me ~= nil and options.disclose_me == true then + details.publisher = regId + end + + if type(exactSubsId) == "number" and exactSubsId > 0 then + + -- we need to find sessions with exact subscription + local ss = redis:smembers("wiRealm" .. realm .. "Sub" .. uri .. "Sessions") + local exactSessions = {} + + for _, sesValue in ipairs(ss) do + local matchPolicy = redis:hget("wiRealm" .. realm .. "Sub" .. uri .. "Session" .. sesValue, + "matchPolicy") + if matchPolicy == "exact" then + table.insert(exactSessions, sesValue) + end + end + + if #exactSessions > 0 then + + table.insert(recipients, { + subId = exactSubsId, + sessions = self:filterEventRecipients(regIdStr, options, exactSessions), + details = details + }) + end + end + + -- Now lets find all patternBased subscriptions and their sessions + local allSubs = redis:hkeys("wiRealm" .. realm .. "Subs") + local matchedUris = findPatternedUri(allSubs, uri) + + details.topic = uri + + -- now we need to find sessions within matched Subs with pattern based subscription + for _, uriValue in ipairs(matchedUris) do + local ss = redis:smembers("wiRealm" .. realm .. "Sub" .. uriValue .. "Sessions") + local patternSessions = {} + + for _, sesValue in ipairs(ss) do + local matchPolicy = redis:hget("wiRealm" .. realm .. "Sub" .. uriValue .. "Session" .. sesValue, + "matchPolicy") + if matchPolicy ~= "exact" then + table.insert(patternSessions, sesValue) + end + end + + if #patternSessions > 0 then + + table.insert(recipients, { + subId = tonumber(redis:hget("wiRealm" .. realm .. "Subs", uriValue)), + sessions = self:filterEventRecipients(regIdStr, options, patternSessions), + details = details + }) + end + end + + return recipients +end + +--- +--- Filter subscribers in subscription for event +--- +--- @param regIdStr string session registration Id (as string) +--- @param options table advanced profile options +--- @param sessionsIdList table subscribers sessions Id list +--- @return table array of session Ids to deliver event +--- +function _M:filterEventRecipients(regIdStr, options, sessionsIdList) + local recipients + local tmpK = "wiSes" .. regIdStr .. "TmpSetK" local tmpL = "wiSes" .. regIdStr .. "TmpSetL" - redis:sdiffstore(tmpK, "wiRealm" .. realm .. "Sub" .. uri .. "Sessions") + for _, v in ipairs(sessionsIdList) do + redis:sadd(tmpK, formatNumber(v)) + end if options.eligible then -- There is eligible list - for k, v in ipairs(options.eligible) do + for _, v in ipairs(options.eligible) do redis:sadd(tmpL, formatNumber(v)) end @@ -292,7 +609,7 @@ function _M:getEventRecipients(realm, uri, regId, options) if options.eligible_authid then -- There is eligible authid list - for k, v in ipairs(redis:smembers(tmpK)) do + for _, v in ipairs(redis:smembers(tmpK)) do local s = redis:array_to_hash(redis:hgetall("wiSes" .. formatNumber(v))) for i = 1, #options.eligible_authid do @@ -308,7 +625,7 @@ function _M:getEventRecipients(realm, uri, regId, options) if options.eligible_authrole then -- There is eligible authrole list - for k, v in ipairs(redis:smembers(tmpK)) do + for _, v in ipairs(redis:smembers(tmpK)) do local s = redis:array_to_hash(redis:hgetall("wiSes" .. formatNumber(v))) for i = 1, #options.eligible_authrole do @@ -323,7 +640,7 @@ function _M:getEventRecipients(realm, uri, regId, options) end if options.exclude then -- There is exclude list - for k, v in ipairs(options.exclude) do + for _, v in ipairs(options.exclude) do redis:sadd(tmpL, formatNumber(v)) end @@ -333,7 +650,7 @@ function _M:getEventRecipients(realm, uri, regId, options) if options.exclude_authid then -- There is exclude authid list - for k, v in ipairs(redis:smembers(tmpK)) do + for _, v in ipairs(redis:smembers(tmpK)) do local s = redis:array_to_hash(redis:hgetall("wiSes" .. formatNumber(v))) for i = 1, #options.exclude_authid do @@ -349,7 +666,7 @@ function _M:getEventRecipients(realm, uri, regId, options) if options.exclude_authrole then -- There is exclude authrole list - for k, v in ipairs(redis:smembers(tmpK)) do + for _, v in ipairs(redis:smembers(tmpK)) do local s = redis:array_to_hash(redis:hgetall("wiSes" .. formatNumber(v))) for i = 1, #options.exclude_authrole do @@ -364,7 +681,7 @@ function _M:getEventRecipients(realm, uri, regId, options) end if options.exclude_me == nil or options.exclude_me == true then - redis:sadd(tmpL, regId) + redis:sadd(tmpL, regIdStr) redis:sdiffstore(tmpK, tmpK, tmpL) redis:del(tmpL) end @@ -375,81 +692,69 @@ function _M:getEventRecipients(realm, uri, regId, options) return recipients end --- --- Get subscription info --- --- regId - subscription registration Id --- -function _M:getSubscription(regId) - local subscription = redis:array_to_hash(redis:hgetall("wiSes" .. formatNumber(regId))) - subscription.isWampEstablished = tonumber(subscription.isWampEstablished) - return subscription +--- +--- Get subscriptions ids list +--- +--- @param realm string realm +--- @return table array of subscriptions Ids +--- +function _M:getSubscriptions(realm) + local subsIds = { exact = {}, prefix = {}, wildcard = {} } + -- TODO Make count of prefix/wildcard subscriptions + local allSubs = redis:array_to_hash(redis:hgetall("wiRealm" .. realm .. "Subs")) + + --for k, v in pairs(allSubs) do + -- + --end + + return subsIds end --- --- Remove subscription data from runtime store --- --- regId - subscription registration Id --- -function _M:removeSubscription(regId) - local regIdStr = formatNumber(regId) - - local subscription = redis:array_to_hash(redis:hgetall("wiSes" .. regIdStr)) - subscription.realm = subscription.realm or "" +--- +--- Get registered RPC info (if exists) +--- +--- @param realm string realm +--- @param uri string RPC registration uri +--- @return table RPC object +--- +function _M:getRPC(realm, uri) + local rpc = redis:hgetall("wiRealm" .. realm .. "RPC" .. uri) - local subscriptions = redis:array_to_hash(redis:hgetall("wiRealm" .. subscription.realm .. "Subs")) + if #rpc < 2 then -- no exactly matched rpc uri found + local allRPCs = redis:smembers("wiRealm" .. realm .. "RPCs") + local patternRPCs = {} - for k, v in pairs(subscriptions) do - redis:srem("wiRealm" .. subscription.realm .. "Sub" .. k .. "Subscriptions", regIdStr) - if redis:scard("wiRealm" .. subscription.realm .. "Sub" .. k .. "Subscriptions") == 0 then - redis:del("wiRealm" .. subscription.realm .. "Sub" .. k .. "Subscriptions") - redis:hdel("wiRealm" .. subscription.realm .. "Subs",k) - redis:hdel("wiRealm" .. subscription.realm .. "RevSubs",v) + for _, value in ipairs(allRPCs) do + local rp = redis:hget("wiRealm" .. realm .. "RPC" .. value, "matchPolicy") + if rp ~= ngx.null and rp ~= "exact" then + table.insert(patternRPCs, value) + end end + local matchedUri = findPatternedUri(patternRPCs, uri, false)[1] + if matchedUri then + rpc = redis:array_to_hash(redis:hgetall("wiRealm" .. realm .. "RPC" .. matchedUri)) + rpc.options = { procedure = uri } + else + return nil + end + else + rpc = redis:array_to_hash(rpc) end - local rpcs = redis:array_to_hash(redis:hgetall("wiSes" .. regIdStr .. "RPCs")) - - for k, v in pairs(rpcs) do - redis:srem("wiRealm" .. subscription.realm .. "RPCs",k) - redis:del("wiRPC" .. k) - end - - redis:del("wiSes" .. regIdStr .. "RPCs") - redis:del("wiSes" .. regIdStr .. "RevRPCs") - redis:del("wiSes" .. regIdStr .. "Challenge") - - redis:srem("wiRealm" .. subscription.realm .. "Subscriptions", regIdStr) - if redis:scard("wiRealm" .. subscription.realm .. "Subscriptions") == 0 then - redis:srem("wiolaRealms",subscription.realm) - end - - redis:del("wiSes" .. regIdStr .. "Data") - redis:del("wiSes" .. regIdStr) - redis:srem("wiolaIds",regIdStr) -end - --- --- Get registered RPC info (if exists) --- --- realm - realm --- uri - RPC registration uri --- -function _M:getRPC(realm, uri) - local rpc = redis:array_to_hash(redis:hgetall("wiRealm" .. realm .. "RPC" .. uri)) rpc.calleeSesId = tonumber(rpc.calleeSesId) rpc.registrationId = tonumber(rpc.registrationId) return rpc end --- --- Register session RPC --- --- realm - realm --- uri - RPC registration uri --- options - registration options --- regId - session registration Id --- +--- +--- Register session RPC +--- +--- @param realm string realm +--- @param uri string RPC registration uri +--- @param options table registration options +--- @param regId number session registration Id +--- @return number RPC registration Id +--- function _M:registerSessionRPC(realm, uri, options, regId) local registrationId, registrationIdStr local regIdStr = formatNumber(regId) @@ -459,7 +764,10 @@ function _M:registerSessionRPC(realm, uri, options, regId) registrationIdStr = formatNumber(registrationId) redis:sadd("wiRealm" .. realm .. "RPCs", uri) - redis:hmset("wiRealm" .. realm .. "RPC" .. uri, "calleeSesId", regIdStr, "registrationId", registrationIdStr) + redis:hmset("wiRealm" .. realm .. "RPC" .. uri, + "calleeSesId", regIdStr, + "registrationId", registrationIdStr, + "matchPolicy", options.match or "exact") if options.disclose_caller ~= nil and options.disclose_caller == true then redis:hmset("wiRPC" .. uri, "disclose_caller", true) @@ -472,15 +780,63 @@ function _M:registerSessionRPC(realm, uri, options, regId) return registrationId end --- --- Unregister session RPC --- --- realm - realm --- registrationId - RPC registration Id --- regId - session registration Id --- --- Returns flag was session registerd to requested topic --- +--- +--- Register Meta API RPCs, which are defined in config +--- +--- @param realm string realm +--- +function _M:registerMetaRpc(realm) + + local uris = {} + + if config.metaAPI.session == true then + table.insert(uris, 'wamp.session.count') + table.insert(uris, 'wamp.session.list') + table.insert(uris, 'wamp.session.get') + end + + if config.metaAPI.subscription == true then + table.insert(uris, 'wamp.subscription.list') + table.insert(uris, 'wamp.subscription.lookup') + table.insert(uris, 'wamp.subscription.match') + table.insert(uris, 'wamp.subscription.get') + table.insert(uris, 'wamp.subscription.list_subscribers') + table.insert(uris, 'wamp.subscription.count_subscribers') + end + + if config.metaAPI.registration == true then + table.insert(uris, 'wamp.registration.list') + table.insert(uris, 'wamp.registration.lookup') + table.insert(uris, 'wamp.registration.match') + table.insert(uris, 'wamp.registration.get') + table.insert(uris, 'wamp.registration.list_callees') + table.insert(uris, 'wamp.registration.count_callees') + end + + local registrationId, registrationIdStr + for _, uri in ipairs(uris) do + + if redis:sismember("wiRealm" .. realm .. "RPCs", uri) ~= 1 then + registrationId = self:getRegId() + registrationIdStr = formatNumber(registrationId) + + redis:sadd("wiRealm" .. realm .. "RPCs", uri) + redis:hmset("wiRealm" .. realm .. "RPC" .. uri, + "calleeSesId", "0", + "registrationId", registrationIdStr) + end + + end +end + +--- +--- Unregister session RPC +--- +--- @param realm string realm +--- @param registrationId number RPC registration Id +--- @param regId number session registration Id +--- @return table RPC object +--- function _M:unregisterSessionRPC(realm, registrationId, regId) local regIdStr = formatNumber(regId) local registrationIdStr = formatNumber(registrationId) @@ -496,11 +852,12 @@ function _M:unregisterSessionRPC(realm, registrationId, regId) return rpc end --- --- Get invocation info --- --- invocReqId - invocation request Id --- +--- +--- Get invocation info +--- +--- @param invocReqId number invocation request Id +--- @return table Invocation object +--- function _M:getInvocation(invocReqId) local invoc = redis:array_to_hash(redis:hgetall("wiInvoc" .. formatNumber(invocReqId))) invoc.CallReqId = tonumber(invoc.CallReqId) @@ -508,20 +865,21 @@ function _M:getInvocation(invocReqId) return invoc end --- --- Remove invocation --- --- invocReqId - invocation request Id --- +--- +--- Remove invocation +--- +--- @param invocReqId number invocation request Id +--- function _M:removeInvocation(invocReqId) redis:del("wiInvoc" .. formatNumber(invocReqId)) end --- --- Get call info --- --- callReqId - call request Id --- +--- +--- Get call info +--- +--- @param callReqId number call request Id +--- @return table Call object +--- function _M:getCall(callReqId) local call = redis:array_to_hash(redis:hgetall("wiCall" .. formatNumber(callReqId))) call.calleeSesId = tonumber(call.calleeSesId) @@ -529,31 +887,36 @@ function _M:getCall(callReqId) return call end --- --- Remove call --- --- callReqId - call request Id --- +--- +--- Remove call +--- +--- @param callReqId number call request Id +--- function _M:removeCall(callReqId) redis:del("wiCall" .. formatNumber(callReqId)) end --- --- Add RPC Call & invocation --- --- callReqId - call request Id --- callerSessId - caller session registration Id --- invocReqId - invocation request Id --- calleeSessId - callee session registration Id --- +--- +--- Add RPC Call & invocation +--- +--- @param callReqId number call request Id +--- @param callerSessId number caller session registration Id +--- @param invocReqId number invocation request Id +--- @param calleeSessId number callee session registration Id +--- function _M:addCallInvocation(callReqId, callerSessId, invocReqId, calleeSessId) local callReqIdStr = formatNumber(callReqId) local callerSessIdStr = formatNumber(callerSessId) local invocReqIdStr = formatNumber(invocReqId) local calleeSessIdStr = formatNumber(calleeSessId) - redis:hmset("wiCall" .. callReqIdStr, "callerSesId", callerSessIdStr, "calleeSesId", calleeSessIdStr, "wiInvocId", invocReqIdStr) - redis:hmset("wiInvoc" .. invocReqIdStr, "CallReqId", callReqIdStr, "callerSesId", callerSessIdStr) + redis:hmset("wiCall" .. callReqIdStr, + "callerSesId", callerSessIdStr, + "calleeSesId", calleeSessIdStr, + "wiInvocId", invocReqIdStr) + redis:hmset("wiInvoc" .. invocReqIdStr, + "CallReqId", callReqIdStr, + "callerSesId", callerSessIdStr) end return _M diff --git a/lib/wiola.lua b/lib/wiola.lua index e52fa29..e096095 100644 --- a/lib/wiola.lua +++ b/lib/wiola.lua @@ -23,17 +23,25 @@ local wamp_features = { roles = { broker = { features = { - subscriber_blackwhite_listing = true, + pattern_based_subscription = true, publisher_exclusion = true, - publisher_identification = true + publisher_identification = true, + subscriber_blackwhite_listing = true + -- meta api are exposing if they are configured (see below) + --session_meta_api = true, + --subscription_meta_api = true } }, dealer = { features = { - caller_identification = true, - progressive_call_results = true, call_canceling = true, - call_timeout = true + call_timeout = true, + caller_identification = true, + pattern_based_registration = true, + progressive_call_results = true + -- meta api are exposing if they are configured (see below) + --session_meta_api = true, + --registration_meta_api = true } } } @@ -46,6 +54,18 @@ local serializers = { } local store = require('wiola.stores.' .. config.store) +-- Add Meta API features announcements if they are configured +if config.metaAPI.session == true then + wamp_features.roles.broker.features.session_meta_api = true + wamp_features.roles.dealer.features.session_meta_api = true +end +if config.metaAPI.subscription == true then + wamp_features.roles.broker.features.subscription_meta_api = true +end +if config.metaAPI.registration == true then + wamp_features.roles.dealer.features.registration_meta_api = true +end + local WAMP_MSG_SPEC = { HELLO = 1, WELCOME = 2, @@ -73,9 +93,14 @@ local WAMP_MSG_SPEC = { YIELD = 70 } --- Check for a value in table +--- +--- Check for a value in table +--- +--- @param tab table Source table +--- @param val any Value to search +--- local has = function(tab, val) - for index, value in ipairs(tab) do + for _, value in ipairs(tab) do if value == val then return true end @@ -84,14 +109,14 @@ local has = function(tab, val) return false end --- --- Create a new instance --- --- returns wiola instance --- +--- +--- Create a new instance +--- +--- @return wiola instance +--- function _M.new() local self = setmetatable({}, _M) - local ok, err = store:init(config.storeConfig) + local ok, err = store:init(config) if not ok then return ok, err end @@ -100,41 +125,64 @@ function _M.new() end --- Generate a random string +--- +--- Generate a random string +--- +--- @param length number String length +--- @return string random string +--- function _M:_randomString(length) - local str = ""; - local time = self.redis:time() - - -- math.randomseed( os.time() ) -- Precision - only seconds, which is not acceptable - math.randomseed(time[1] * 1000000 + time[2]) + local str = {}; + math.randomseed(math.floor(ngx.now()*1000)) - for i = 1, length do - str = str .. string.char(math.random(32, 126)); + for _ = 1, length do + table.insert(str, string.char(math.random(32, 126))) end - return str; + return table.concat(str) end --- Validate uri for WAMP requirements -function _M:_validateURI(uri) - local m, err = ngx.re.match(uri, "^([0-9a-zA-Z_]{2,}\\.)*([0-9a-zA-Z_]{2,})$") - if not m or string.find(uri, 'wamp') == 1 then +--- +--- Validate uri for WAMP requirements +--- +--- @param uri string uri to validate +--- @param patternBased boolean allow wamp pattern based syntax or no +--- @param allowWAMP boolean allow wamp special prefixed uris or no +--- @return boolean is uri valid? +--- +function _M:_validateURI(uri, patternBased, allowWAMP) + local re = "^([0-9a-zA-Z_]+\\.)*([0-9a-zA-Z_]+)$" + local rePattern = "^([0-9a-zA-Z_]+\\.{1,2})*([0-9a-zA-Z_]+)$" + + if patternBased == true then + re = rePattern + end + + local m, err = ngx.re.match(uri, re) + + if not m then return false + elseif string.find(uri, 'wamp%.') == 1 then + if allowWAMP ~= true then + return false + else + return true, true + end else return true end end --- --- Add connection to wiola --- --- sid - nginx session connection ID --- wampProto - chosen WAMP protocol --- --- returns WAMP session registration ID, connection data type --- +--- +--- Add connection to wiola +--- +--- @param sid number nginx session connection ID +--- @param wampProto string chosen WAMP protocol +--- +--- @return number, string WAMP session registration ID, connection data type +--- function _M:addConnection(sid, wampProto) local regId = store:getRegId() - local wProto, dataType, session + local dataType if wampProto == nil or wampProto == "" then wampProto = 'wamp.2.json' -- Setting default protocol for encoding/decodig use @@ -160,17 +208,33 @@ function _M:addConnection(sid, wampProto) return regId, dataType end --- Prepare data for sending to client +--- +--- Prepare data for sending to client +--- +--- @param session table Session object +--- @param data any Client data +--- function _M:_putData(session, data) local dataObj = serializers[session.encoding].encode(data) store:putData(session, dataObj) end --- Publish event to sessions +--- +--- Publish event to sessions +--- +--- @param sessRegIds table Array of session Ids +--- @param subId number Subscription Id +--- @param pubId number Publication Id +--- @param details table Details hash-table +--- @param args table Array-like payload +--- @param argsKW table Object-like payload +--- function _M:_publishEvent(sessRegIds, subId, pubId, details, args, argsKW) -- WAMP SPEC: [EVENT, SUBSCRIBED.Subscription|id, PUBLISHED.Publication|id, Details|dict] - -- WAMP SPEC: [EVENT, SUBSCRIBED.Subscription|id, PUBLISHED.Publication|id, Details|dict, PUBLISH.Arguments|list] - -- WAMP SPEC: [EVENT, SUBSCRIBED.Subscription|id, PUBLISHED.Publication|id, Details|dict, PUBLISH.Arguments|list, PUBLISH.ArgumentKw|dict] + -- WAMP SPEC: [EVENT, SUBSCRIBED.Subscription|id, PUBLISHED.Publication|id, Details|dict, + -- PUBLISH.Arguments|list] + -- WAMP SPEC: [EVENT, SUBSCRIBED.Subscription|id, PUBLISHED.Publication|id, Details|dict, + -- PUBLISH.Arguments|list, PUBLISH.ArgumentKw|dict] local data if not args and not argsKW then @@ -181,18 +245,224 @@ function _M:_publishEvent(sessRegIds, subId, pubId, details, args, argsKW) data = { WAMP_MSG_SPEC.EVENT, subId, pubId, details, args, argsKW } end - for k, v in ipairs(sessRegIds) do + for _, v in ipairs(sessRegIds) do local session = store:getSession(v) self:_putData(session, data) end end --- --- Receive data from client --- --- regId - WAMP session registration ID --- data - data, received through websocket --- +--- +--- Publish META event to sessions +--- +--- @param part string META API section name +--- @param eventUri string event uri +--- @param session table session object +--- +function _M:_publishMetaEvent(part, eventUri, session, ...) + if not config.metaAPI[part] then + return + end + + local subId = store:getSubscriptionId(session.realm, eventUri) + if not subId then + return + end + + local pubId = store:getRegId() + local recipients = store:getTopicSessions(session.realm, eventUri) + local parameters = {n = select('#', ...), ...} + local argsL, argsKW = { session.sessId }, nil + + if eventUri == 'wamp.session.on_join' then + argsL = {{ session = session.sessId }} + if parameters[1] then + argsL[1].authid = parameters[1].authid + argsL[1].authrole = parameters[1].authrole + argsL[1].authmethod = parameters[1].authmethod + argsL[1].authprovider = parameters[1].authprovider + end + -- TODO Add information about transport + elseif eventUri == 'wamp.session.on_leave' then + -- nothing to add :) + elseif eventUri == 'wamp.subscription.on_create' then + local details = { + id = parameters[1], + created = parameters[2], + uri = parameters[3], + match = parameters[4] + } + table.insert(argsL, details) + elseif eventUri == 'wamp.subscription.on_subscribe' then + table.insert(argsL, parameters[1]) + elseif eventUri == 'wamp.subscription.on_unsubscribe' then + table.insert(argsL, parameters[1]) + elseif eventUri == 'wamp.subscription.on_delete' then + table.insert(argsL, parameters[1]) + elseif eventUri == 'wamp.registration.on_create' then + local details = { + id = parameters[1], + created = parameters[2], + uri = parameters[3], + match = parameters[4], + invoke = parameters[5] + } + table.insert(argsL, details) + elseif eventUri == 'wamp.registration.on_register' then + table.insert(argsL, parameters[1]) + elseif eventUri == 'wamp.registration.on_unregister' then + table.insert(argsL, parameters[1]) + elseif eventUri == 'wamp.registration.on_delete' then + table.insert(argsL, parameters[1]) + end + + self:_publishEvent(recipients, subId, pubId, {}, argsL, argsKW) +end + +--- +--- Process Call META RPC +--- +--- @param part string META API section name +--- @param rpcUri string rpc uri +--- @param session table session object +--- @param requestId number request Id +--- @param rpcArgsL table Array-like payload +--- @param rpcArgsKw table Object-like payload +--- +function _M:_callMetaRPC(part, rpcUri, session, requestId, rpcArgsL, rpcArgsKw) + local data + local details = setmetatable({}, { __jsontype = 'object' }) + + if config.metaAPI[part] == true then + + if rpcUri == 'wamp.session.count' then + + local count = store:getSessionCount(session.realm, rpcArgsL) + data = { WAMP_MSG_SPEC.RESULT, requestId, details, { count } } + + elseif rpcUri == 'wamp.session.list' then + + local count, sessList = store:getSessionCount(session.realm, rpcArgsL) + data = { WAMP_MSG_SPEC.RESULT, requestId, details, sessList } + + elseif rpcUri == 'wamp.session.get' then + + local sessionInfo = store:getSession(rpcArgsL[1]) + if sessionInfo ~= nil then + + local res = {} + if sessionInfo.authInfo then + res = sessionInfo.authInfo + end + res.session = sessionInfo.sessId + -- TODO Add transport info + + data = { WAMP_MSG_SPEC.RESULT, requestId, details, { res } } + else + data = { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, requestId, details, "wamp.error.no_such_session" } + end + + elseif rpcUri == 'wamp.subscription.list' then + + -- TODO Implement rpcUri == 'wamp.subscription.list' + -- need to count subs due to matching policy + --local subsIds = store:getSubscriptions(session.realm) + --data = { WAMP_MSG_SPEC.RESULT, requestId, details, subsIds } + + elseif rpcUri == 'wamp.subscription.lookup' then + + -- TODO Implement rpcUri == 'wamp.subscription.lookup' + + elseif rpcUri == 'wamp.subscription.match' then + + local subId = store:getSubscriptionId(session.realm, rpcArgsL[1]) + if subId then + data = { WAMP_MSG_SPEC.RESULT, requestId, details, { subId } } + else + data = { WAMP_MSG_SPEC.RESULT, requestId, details} + end + + elseif rpcUri == 'wamp.subscription.get' then + + -- TODO Implement rpcUri == 'wamp.subscription.get' + + elseif rpcUri == 'wamp.subscription.list_subscribers' then + + local sessList = store:getTopicSessionsBySubId(session.realm, rpcArgsL[1]) + if sessList ~= nil then + data = { WAMP_MSG_SPEC.RESULT, requestId, details, sessList } + else + data = { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, requestId, details, "wamp.error.no_such_subscription" } + end + + elseif rpcUri == 'wamp.subscription.count_subscribers' then + + local sessCount = store:getTopicSessionsCountBySubId(session.realm, rpcArgsL[1]) + if sessCount ~= nil then + data = { WAMP_MSG_SPEC.RESULT, requestId, details, { sessCount } } + else + data = { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, requestId, details, "wamp.error.no_such_subscription" } + end + + elseif rpcUri == 'wamp.registration.list' then + + -- TODO Implement rpcUri == 'wamp.registration.list' + + elseif rpcUri == 'wamp.registration.lookup' then + + -- TODO Implement rpcUri == 'wamp.registration.lookup' + + elseif rpcUri == 'wamp.registration.match' then + + local rpcInfo = store:getRPC(session.realm, rpcArgsL[1]) + + if rpcInfo then + data = { WAMP_MSG_SPEC.RESULT, requestId, details, { rpcInfo.registrationId } } + else + data = { WAMP_MSG_SPEC.RESULT, requestId, details} + end + + elseif rpcUri == 'wamp.registration.get' then + + -- TODO Implement rpcUri == 'wamp.registration.get' + + elseif rpcUri == 'wamp.registration.list_callees' then + + -- TODO Do not forget to update 'wamp.registration.list_callees' while implementing SHARED/SHARDED RPCs + local rpcInfo = store:getRPC(session.realm, rpcArgsL[1]) + + if rpcInfo then + data = { WAMP_MSG_SPEC.RESULT, requestId, details, { rpcInfo.calleeSesId } } + else + data = { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, requestId, details, "wamp.error.no_such_registration" } + end + + elseif rpcUri == 'wamp.registration.count_callees' then + + -- TODO Do not forget to update 'wamp.registration.count_callees' while implementing SHARED/SHARDED RPCs + local rpcInfo = store:getRPC(session.realm, rpcArgsL[1]) + + if rpcInfo then + data = { WAMP_MSG_SPEC.RESULT, requestId, details, { 1 } } + else + data = { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, requestId, details, "wamp.error.no_such_registration" } + end + + else + data = { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, requestId, details, "wamp.error.invalid_uri" } + end + else + data = { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, requestId, details, "wamp.error.no_suitable_callee" } + end + + self:_putData(session, data) +end + +--- +--- Receive data from client +--- +--- @param regId number WAMP session registration ID +--- @param data any data, received through websocket +--- function _M:receiveData(regId, data) local session = store:getSession(regId) @@ -203,10 +473,16 @@ function _M:receiveData(regId, data) if session.isWampEstablished == 1 then -- Protocol error: received second hello message - aborting -- WAMP SPEC: [GOODBYE, Details|dict, Reason|uri] - self:_putData(session, { WAMP_MSG_SPEC.GOODBYE, setmetatable({}, { __jsontype = 'object' }), "wamp.error.system_shutdown" }) + self:_putData(session, { + WAMP_MSG_SPEC.ABORT, + { message = 'Received WELCOME message after session was established' }, + "wamp.error.protocol_violation" + }) + store:setHandlerFlags(regId, { close = true, sendLast = true }) + self:_publishMetaEvent('session', 'wamp.session.on_leave', session) else local realm = dataObj[2] - if self:_validateURI(realm) then + if self:_validateURI(realm, false, false) then if config.wampCRA.authType ~= "none" then @@ -214,7 +490,10 @@ function _M:receiveData(regId, data) local challenge, challengeString, signature - store:changeChallenge(regId, { realm = realm, wampFeatures = serializers.json.encode(dataObj[3]) }) + store:changeChallenge(regId, { + realm = realm, + wampFeatures = serializers.json.encode(dataObj[3]) + }) if config.wampCRA.authType == "static" then @@ -233,9 +512,9 @@ function _M:receiveData(regId, data) challengeString = serializers.json.encode(challenge) local hmac = require "resty.hmac" - local hm, err = hmac:new(config.wampCRA.staticCredentials[dataObj[3].authid].secret) + local hm = hmac:new(config.wampCRA.staticCredentials[dataObj[3].authid].secret) - signature, err = hm:generate_signature("sha256", challengeString) + signature = hm:generate_signature("sha256", challengeString) if signature then @@ -243,15 +522,27 @@ function _M:receiveData(regId, data) store:changeChallenge(regId, challenge) -- WAMP SPEC: [CHALLENGE, AuthMethod|string, Extra|dict] - self:_putData(session, { WAMP_MSG_SPEC.CHALLENGE, "wampcra", { challenge = challengeString } }) + self:_putData(session, { + WAMP_MSG_SPEC.CHALLENGE, + "wampcra", + { challenge = challengeString } + }) else -- WAMP SPEC: [ABORT, Details|dict, Reason|uri] - self:_putData(session, { WAMP_MSG_SPEC.ABORT, setmetatable({}, { __jsontype = 'object' }), "wamp.error.authorization_failed" }) + self:_putData(session, { + WAMP_MSG_SPEC.ABORT, + setmetatable({}, { __jsontype = 'object' }), + "wamp.error.authorization_failed" + }) end else -- WAMP SPEC: [ABORT, Details|dict, Reason|uri] - self:_putData(session, { WAMP_MSG_SPEC.ABORT, setmetatable({}, { __jsontype = 'object' }), "wamp.error.authorization_failed" }) + self:_putData(session, { + WAMP_MSG_SPEC.ABORT, + setmetatable({}, { __jsontype = 'object' }), + "wamp.error.authorization_failed" + }) end elseif config.wampCRA.authType == "dynamic" then @@ -263,7 +554,11 @@ function _M:receiveData(regId, data) end else -- WAMP SPEC: [ABORT, Details|dict, Reason|uri] - self:_putData(session, { WAMP_MSG_SPEC.ABORT, setmetatable({}, { __jsontype = 'object' }), "wamp.error.authorization_failed" }) + self:_putData(session, { + WAMP_MSG_SPEC.ABORT, + setmetatable({}, { __jsontype = 'object' }), + "wamp.error.authorization_failed" + }) end else @@ -275,10 +570,15 @@ function _M:receiveData(regId, data) -- WAMP SPEC: [WELCOME, Session|id, Details|dict] self:_putData(session, { WAMP_MSG_SPEC.WELCOME, regId, wamp_features }) + self:_publishMetaEvent('session', 'wamp.session.on_join', session) end else -- WAMP SPEC: [ABORT, Details|dict, Reason|uri] - self:_putData(session, { WAMP_MSG_SPEC.ABORT, setmetatable({}, { __jsontype = 'object' }), "wamp.error.invalid_uri" }) + self:_putData(session, { + WAMP_MSG_SPEC.ABORT, + setmetatable({}, { __jsontype = 'object' }), + "wamp.error.invalid_uri" + }) end end elseif dataObj[1] == WAMP_MSG_SPEC.AUTHENTICATE then -- WAMP SPEC: [AUTHENTICATE, Signature|string, Extra|dict] @@ -286,7 +586,13 @@ function _M:receiveData(regId, data) if session.isWampEstablished == 1 then -- Protocol error: received second message - aborting -- WAMP SPEC: [GOODBYE, Details|dict, Reason|uri] - self:_putData(session, { WAMP_MSG_SPEC.GOODBYE, setmetatable({}, { __jsontype = 'object' }), "wamp.error.system_shutdown" }) + self:_putData(session, { + WAMP_MSG_SPEC.ABORT, + { message = 'Received AUTHENTICATE message after session was established' }, + "wamp.error.protocol_violation" + }) + store:setHandlerFlags(regId, { close = true, sendLast = true }) + self:_publishMetaEvent('session', 'wamp.session.on_leave', session) else local challenge = store:getChallenge(regId) @@ -312,6 +618,7 @@ function _M:receiveData(regId, data) session.isWampEstablished = 1 session.realm = challenge.realm session.wampFeatures = challenge.wampFeatures + session.authInfo = authInfo store:changeSession(regId, session) store:addSessionToRealm(regId, challenge.realm) @@ -323,68 +630,110 @@ function _M:receiveData(regId, data) -- WAMP SPEC: [WELCOME, Session|id, Details|dict] self:_putData(session, { WAMP_MSG_SPEC.WELCOME, regId, details }) + self:_publishMetaEvent('session', 'wamp.session.on_join', session, authInfo) else -- WAMP SPEC: [ABORT, Details|dict, Reason|uri] - self:_putData(session, { WAMP_MSG_SPEC.ABORT, setmetatable({}, { __jsontype = 'object' }), "wamp.error.authorization_failed" }) + self:_putData(session, { + WAMP_MSG_SPEC.ABORT, + setmetatable({}, { __jsontype = 'object' }), + "wamp.error.authorization_failed" + }) end end -- Clean up Challenge data in any case store:removeChallenge(regId) - elseif dataObj[1] == WAMP_MSG_SPEC.ABORT then -- WAMP SPEC: [ABORT, Details|dict, Reason|uri] + -- elseif dataObj[1] == WAMP_MSG_SPEC.ABORT then -- WAMP SPEC: [ABORT, Details|dict, Reason|uri] -- No response is expected elseif dataObj[1] == WAMP_MSG_SPEC.GOODBYE then -- WAMP SPEC: [GOODBYE, Details|dict, Reason|uri] if session.isWampEstablished == 1 then - self:_putData(session, { WAMP_MSG_SPEC.GOODBYE, setmetatable({}, { __jsontype = 'object' }), "wamp.error.goodbye_and_out" }) + self:_putData(session, { + WAMP_MSG_SPEC.GOODBYE, + setmetatable({}, { __jsontype = 'object' }), + "wamp.close.goodbye_and_out" + }) else - self:_putData(session, { WAMP_MSG_SPEC.GOODBYE, setmetatable({}, { __jsontype = 'object' }), "wamp.error.system_shutdown" }) + self:_putData(session, { + WAMP_MSG_SPEC.ABORT, + { message = 'Received GOODBYE message before session was established' }, + "wamp.error.protocol_violation" + }) + store:setHandlerFlags(regId, { close = true, sendLast = true }) end + self:_publishMetaEvent('session', 'wamp.session.on_leave', session) elseif dataObj[1] == WAMP_MSG_SPEC.ERROR then -- WAMP SPEC: [ERROR, REQUEST.Type|int, REQUEST.Request|id, Details|dict, Error|uri] - -- WAMP SPEC: [ERROR, REQUEST.Type|int, REQUEST.Request|id, Details|dict, Error|uri, Arguments|list] - -- WAMP SPEC: [ERROR, REQUEST.Type|int, REQUEST.Request|id, Details|dict, Error|uri, Arguments|list, ArgumentsKw|dict] + -- WAMP SPEC: [ERROR, REQUEST.Type|int, REQUEST.Request|id, Details|dict, Error|uri, + -- Arguments|list] + -- WAMP SPEC: [ERROR, REQUEST.Type|int, REQUEST.Request|id, Details|dict, Error|uri, + -- Arguments|list, ArgumentsKw|dict] if session.isWampEstablished == 1 then if dataObj[2] == WAMP_MSG_SPEC.INVOCATION then -- WAMP SPEC: [ERROR, INVOCATION, INVOCATION.Request|id, Details|dict, Error|uri] - -- WAMP SPEC: [ERROR, INVOCATION, INVOCATION.Request|id, Details|dict, Error|uri, Arguments|list] - -- WAMP SPEC: [ERROR, INVOCATION, INVOCATION.Request|id, Details|dict, Error|uri, Arguments|list, ArgumentsKw|dict] + -- WAMP SPEC: [ERROR, INVOCATION, INVOCATION.Request|id, Details|dict, Error|uri, + -- Arguments|list] + -- WAMP SPEC: [ERROR, INVOCATION, INVOCATION.Request|id, Details|dict, Error|uri, + -- Arguments|list, ArgumentsKw|dict] local invoc = store:getInvocation(dataObj[3]) local callerSess = store:getSession(invoc.callerSesId) if #dataObj == 6 then -- WAMP SPEC: [ERROR, CALL, CALL.Request|id, Details|dict, Error|uri, Arguments|list] - self:_putData(callerSess, { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, invoc.CallReqId, setmetatable({}, { __jsontype = 'object' }), dataObj[5], dataObj[6] }) + self:_putData(callerSess, { + WAMP_MSG_SPEC.ERROR, + WAMP_MSG_SPEC.CALL, + invoc.CallReqId, + setmetatable({}, { __jsontype = 'object' }), + dataObj[5], + dataObj[6] + }) elseif #dataObj == 7 then - -- WAMP SPEC: [ERROR, CALL, CALL.Request|id, Details|dict, Error|uri, Arguments|list, ArgumentsKw|dict] - self:_putData(callerSess, { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, invoc.CallReqId, setmetatable({}, { __jsontype = 'object' }), dataObj[5], dataObj[6], dataObj[7] }) + -- WAMP SPEC: [ERROR, CALL, CALL.Request|id, Details|dict, Error|uri, + -- Arguments|list, ArgumentsKw|dict] + self:_putData(callerSess, { + WAMP_MSG_SPEC.ERROR, + WAMP_MSG_SPEC.CALL, + invoc.CallReqId, + setmetatable({}, { __jsontype = 'object' }), + dataObj[5], + dataObj[6], + dataObj[7] + }) else -- WAMP SPEC: [ERROR, CALL, CALL.Request|id, Details|dict, Error|uri] - self:_putData(callerSess, { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, invoc.CallReqId, setmetatable({}, { __jsontype = 'object' }), dataObj[5] }) + self:_putData(callerSess, { + WAMP_MSG_SPEC.ERROR, + WAMP_MSG_SPEC.CALL, + invoc.CallReqId, + setmetatable({}, { __jsontype = 'object' }), + dataObj[5] + }) end store:removeInvocation(dataObj[3]) end + else + self:_putData(session, { + WAMP_MSG_SPEC.ABORT, + { message = 'Received ERROR message before session was established' }, + "wamp.error.protocol_violation" + }) + store:setHandlerFlags(regId, { close = true, sendLast = true }) end elseif dataObj[1] == WAMP_MSG_SPEC.PUBLISH then -- WAMP SPEC: [PUBLISH, Request|id, Options|dict, Topic|uri] -- WAMP SPEC: [PUBLISH, Request|id, Options|dict, Topic|uri, Arguments|list] -- WAMP SPEC: [PUBLISH, Request|id, Options|dict, Topic|uri, Arguments|list, ArgumentsKw|dict] if session.isWampEstablished == 1 then - if self:_validateURI(dataObj[4]) then + if self:_validateURI(dataObj[4], false, false) then local pubId = store:getRegId() local recipients = store:getEventRecipients(session.realm, dataObj[4], regId, dataObj[3]) - local details = {} - - if dataObj[3].disclose_me ~= nil and dataObj[3].disclose_me == true then - details.publisher = regId - end - local subId = store:getSubscriptionId(session.realm, dataObj[4]) - if subId then - self:_publishEvent(recipients, subId, pubId, details, dataObj[5], dataObj[6]) + for _, v in ipairs(recipients) do + self:_publishEvent(v.sessions, v.subId, pubId, v.details, dataObj[5], dataObj[6]) if dataObj[3].acknowledge and dataObj[3].acknowledge == true then -- WAMP SPEC: [PUBLISHED, PUBLISH.Request|id, Publication|id] @@ -392,123 +741,258 @@ function _M:receiveData(regId, data) end end else - self:_putData(session, { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.PUBLISH, dataObj[2], setmetatable({}, { __jsontype = 'object' }), "wamp.error.invalid_uri" }) + self:_putData(session, { + WAMP_MSG_SPEC.ERROR, + WAMP_MSG_SPEC.PUBLISH, + dataObj[2], + setmetatable({}, { __jsontype = 'object' }), + "wamp.error.invalid_uri" + }) end else - self:_putData(session, { WAMP_MSG_SPEC.GOODBYE, setmetatable({}, { __jsontype = 'object' }), "wamp.error.system_shutdown" }) + self:_putData(session, { + WAMP_MSG_SPEC.ABORT, + { message = 'Received PUBLISH message before session was established' }, + "wamp.error.protocol_violation" + }) + store:setHandlerFlags(regId, { close = true, sendLast = true }) + self:_publishMetaEvent('session', 'wamp.session.on_leave', session) end elseif dataObj[1] == WAMP_MSG_SPEC.SUBSCRIBE then -- WAMP SPEC: [SUBSCRIBE, Request|id, Options|dict, Topic|uri] if session.isWampEstablished == 1 then - if self:_validateURI(dataObj[4]) then - local subscriptionId = store:subscribeSession(session.realm, dataObj[4], regId) + local patternBased = false + if dataObj[3].match then + patternBased = true + end + + if self:_validateURI(dataObj[4], patternBased, true) then + local subscriptionId, isNewSubscription = store:subscribeSession( + session.realm, dataObj[4], dataObj[3], regId) -- WAMP SPEC: [SUBSCRIBED, SUBSCRIBE.Request|id, Subscription|id] self:_putData(session, { WAMP_MSG_SPEC.SUBSCRIBED, dataObj[2], subscriptionId }) + if isNewSubscription then + self:_publishMetaEvent('subscription', 'wamp.subscription.on_create', session, + subscriptionId, os.date("!%Y-%m-%dT%TZ"), dataObj[4], "exact") + end + self:_publishMetaEvent('subscription', 'wamp.subscription.on_subscribe', session, + subscriptionId) else - self:_putData(session, { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.SUBSCRIBE, dataObj[2], setmetatable({}, { __jsontype = 'object' }), "wamp.error.invalid_uri" }) + self:_putData(session, { + WAMP_MSG_SPEC.ERROR, + WAMP_MSG_SPEC.SUBSCRIBE, + dataObj[2], + setmetatable({}, { __jsontype = 'object' }), + "wamp.error.invalid_uri" + }) end else - self:_putData(session, { WAMP_MSG_SPEC.GOODBYE, setmetatable({}, { __jsontype = 'object' }), "wamp.error.system_shutdown" }) + self:_putData(session, { + WAMP_MSG_SPEC.ABORT, + { message = 'Received SUBSCRIBE message before session was established' }, + "wamp.error.protocol_violation" + }) + store:setHandlerFlags(regId, { close = true, sendLast = true }) + self:_publishMetaEvent('session', 'wamp.session.on_leave', session) end - elseif dataObj[1] == WAMP_MSG_SPEC.UNSUBSCRIBE then -- WAMP SPEC: [UNSUBSCRIBE, Request|id, SUBSCRIBED.Subscription|id] + elseif dataObj[1] == WAMP_MSG_SPEC.UNSUBSCRIBE then + -- WAMP SPEC: [UNSUBSCRIBE, Request|id, SUBSCRIBED.Subscription|id] if session.isWampEstablished == 1 then - local isSesSubscrbd = store:unsubscribeSession(session.realm, dataObj[3], regId) + local isSesSubscrbd, wasTopicRemoved = store:unsubscribeSession(session.realm, dataObj[3], regId) if isSesSubscrbd ~= ngx.null then -- WAMP SPEC: [UNSUBSCRIBED, UNSUBSCRIBE.Request|id] self:_putData(session, { WAMP_MSG_SPEC.UNSUBSCRIBED, dataObj[2] }) + self:_publishMetaEvent('subscription', 'wamp.subscription.on_unsubscribe', session, dataObj[3]) + if wasTopicRemoved then + self:_publishMetaEvent('subscription', 'wamp.subscription.on_delete', session, dataObj[3]) + end else - self:_putData(session, { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.UNSUBSCRIBE, dataObj[2], setmetatable({}, { __jsontype = 'object' }), "wamp.error.no_such_subscription" }) + self:_putData(session, { + WAMP_MSG_SPEC.ERROR, + WAMP_MSG_SPEC.UNSUBSCRIBE, + dataObj[2], + setmetatable({}, { __jsontype = 'object' }), + "wamp.error.no_such_subscription" + }) end else - self:_putData(session, { WAMP_MSG_SPEC.GOODBYE, setmetatable({}, { __jsontype = 'object' }), "wamp.error.system_shutdown" }) + self:_putData(session, { + WAMP_MSG_SPEC.ABORT, + { message = 'Received UNSUBSCRIBE message before session was established' }, + "wamp.error.protocol_violation" + }) + store:setHandlerFlags(regId, { close = true, sendLast = true }) + self:_publishMetaEvent('session', 'wamp.session.on_leave', session) end elseif dataObj[1] == WAMP_MSG_SPEC.CALL then -- WAMP SPEC: [CALL, Request|id, Options|dict, Procedure|uri] -- WAMP SPEC: [CALL, Request|id, Options|dict, Procedure|uri, Arguments|list] -- WAMP SPEC: [CALL, Request|id, Options|dict, Procedure|uri, Arguments|list, ArgumentsKw|dict] if session.isWampEstablished == 1 then - if self:_validateURI(dataObj[4]) then + local isUriValid, isWampSpecial = self:_validateURI(dataObj[4], false, true) + if isUriValid then - local rpcInfo = store:getRPC(session.realm, dataObj[4]) - - if not rpcInfo then - -- WAMP SPEC: [ERROR, CALL, CALL.Request|id, Details|dict, Error|uri] - self:_putData(session, { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, dataObj[2], setmetatable({}, { __jsontype = 'object' }), "wamp.error.no_suitable_callee" }) + if isWampSpecial then + -- Received a call for WAMP meta RPCs + local metapart = string.match(dataObj[4], "wamp.(%a+)") + self:_callMetaRPC(metapart, dataObj[4], session, dataObj[2], dataObj[5], dataObj[6]) else - local details = setmetatable({}, { __jsontype = 'object' }) - if config.callerIdentification == "always" or - (config.callerIdentification == "auto" and - ((dataObj[3].disclose_me ~= nil and dataObj[3].disclose_me == true) or - (rpcInfo.disclose_caller == true))) then - details.caller = regId - end + local rpcInfo = store:getRPC(session.realm, dataObj[4]) + + if not rpcInfo then + -- WAMP SPEC: [ERROR, CALL, CALL.Request|id, Details|dict, Error|uri] + self:_putData(session, { + WAMP_MSG_SPEC.ERROR, + WAMP_MSG_SPEC.CALL, + dataObj[2], + setmetatable({}, { __jsontype = 'object' }), + "wamp.error.no_suitable_callee" + }) + else + local details = setmetatable({}, { __jsontype = 'object' }) - if dataObj[3].receive_progress ~= nil and dataObj[3].receive_progress == true then - details.receive_progress = true - end + if config.callerIdentification == "always" or + (config.callerIdentification == "auto" and + ((dataObj[3].disclose_me ~= nil and dataObj[3].disclose_me == true) or + (rpcInfo.disclose_caller == true))) then + details.caller = regId + end - local calleeSess = store:getSession(rpcInfo.calleeSesId) - local invReqId = store:getRegId() + if dataObj[3].receive_progress ~= nil and dataObj[3].receive_progress == true then + details.receive_progress = true + end - if dataObj[3].timeout ~= nil and - dataObj[3].timeout > 0 and - calleeSess.wampFeatures.callee.features.call_timeout == true and - calleeSess.wampFeatures.callee.features.call_canceling == true then + local calleeSess = store:getSession(rpcInfo.calleeSesId) + local invReqId = store:getRegId() - -- Caller specified Timeout for CALL processing and callee support this feature - local function callCancel(premature, calleeSess, invReqId) + if rpcInfo.options and rpcInfo.options.procedure then + details.procedure = rpcInfo.options.procedure + end - local details = setmetatable({}, { __jsontype = 'object' }) + if dataObj[3].timeout ~= nil and + dataObj[3].timeout > 0 and + calleeSess.wampFeatures.callee.features.call_timeout == true and + calleeSess.wampFeatures.callee.features.call_canceling == true then - -- WAMP SPEC: [INTERRUPT, INVOCATION.Request|id, Options|dict] - self:_putData(calleeSess, { WAMP_MSG_SPEC.INTERRUPT, invReqId, details }) - end + -- Caller specified Timeout for CALL processing and callee support this feature + local function callCancel(_, calleeSession, invocReqId) - local ok, err = ngx.timer.at(dataObj[3].timeout, callCancel, calleeSess, invReqId) + -- WAMP SPEC: [INTERRUPT, INVOCATION.Request|id, Options|dict] + self:_putData(calleeSession, { + WAMP_MSG_SPEC.INTERRUPT, + invocReqId, + setmetatable({}, { __jsontype = 'object' }) + }) + end - if not ok then - end - end + local ok, err = ngx.timer.at(dataObj[3].timeout, callCancel, calleeSess, invReqId) - store:addCallInvocation(dataObj[2], session.sessId, invReqId, calleeSess.sessId) + if not ok then + end + end - if #dataObj == 5 then - -- WAMP SPEC: [INVOCATION, Request|id, REGISTERED.Registration|id, Details|dict, CALL.Arguments|list] - self:_putData(calleeSess, { WAMP_MSG_SPEC.INVOCATION, invReqId, rpcInfo.registrationId, details, dataObj[5] }) - elseif #dataObj == 6 then - -- WAMP SPEC: [INVOCATION, Request|id, REGISTERED.Registration|id, Details|dict, CALL.Arguments|list, CALL.ArgumentsKw|dict] - self:_putData(calleeSess, { WAMP_MSG_SPEC.INVOCATION, invReqId, rpcInfo.registrationId, details, dataObj[5], dataObj[6] }) - else - -- WAMP SPEC: [INVOCATION, Request|id, REGISTERED.Registration|id, Details|dict] - self:_putData(calleeSess, { WAMP_MSG_SPEC.INVOCATION, invReqId, rpcInfo.registrationId, details }) + store:addCallInvocation(dataObj[2], session.sessId, invReqId, calleeSess.sessId) + + if #dataObj == 5 then + -- WAMP SPEC: [INVOCATION, Request|id, REGISTERED.Registration|id, Details|dict, + -- CALL.Arguments|list] + self:_putData(calleeSess, { + WAMP_MSG_SPEC.INVOCATION, + invReqId, + rpcInfo.registrationId, + details, + dataObj[5] + }) + elseif #dataObj == 6 then + -- WAMP SPEC: [INVOCATION, Request|id, REGISTERED.Registration|id, Details|dict, + -- CALL.Arguments|list, CALL.ArgumentsKw|dict] + self:_putData(calleeSess, { + WAMP_MSG_SPEC.INVOCATION, + invReqId, + rpcInfo.registrationId, + details, + dataObj[5], + dataObj[6] + }) + else + -- WAMP SPEC: [INVOCATION, Request|id, REGISTERED.Registration|id, Details|dict] + self:_putData(calleeSess, { + WAMP_MSG_SPEC.INVOCATION, + invReqId, + rpcInfo.registrationId, + details + }) + end end end else - self:_putData(session, { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.CALL, dataObj[2], setmetatable({}, { __jsontype = 'object' }), "wamp.error.invalid_uri" }) + self:_putData(session, { + WAMP_MSG_SPEC.ERROR, + WAMP_MSG_SPEC.CALL, + dataObj[2], + setmetatable({}, { __jsontype = 'object' }), + "wamp.error.invalid_uri" + }) end else - self:_putData(session, { WAMP_MSG_SPEC.GOODBYE, setmetatable({}, { __jsontype = 'object' }), "wamp.error.system_shutdown" }) + self:_putData(session, { + WAMP_MSG_SPEC.ABORT, + { message = 'Received CALL message before session was established' }, + "wamp.error.protocol_violation" + }) + store:setHandlerFlags(regId, { close = true, sendLast = true }) + self:_publishMetaEvent('session', 'wamp.session.on_leave', session) end - elseif dataObj[1] == WAMP_MSG_SPEC.REGISTER then -- WAMP SPEC: [REGISTER, Request|id, Options|dict, Procedure|uri] + elseif dataObj[1] == WAMP_MSG_SPEC.REGISTER then + -- WAMP SPEC: [REGISTER, Request|id, Options|dict, Procedure|uri] if session.isWampEstablished == 1 then - if self:_validateURI(dataObj[4]) then + local patternBased = false + if dataObj[3].match then + patternBased = true + end + + if self:_validateURI(dataObj[4], patternBased, false) then local registrationId = store:registerSessionRPC(session.realm, dataObj[4], dataObj[3], regId) if not registrationId then - self:_putData(session, { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.REGISTER, dataObj[2], setmetatable({}, { __jsontype = 'object' }), "wamp.error.procedure_already_exists" }) + self:_putData(session, { + WAMP_MSG_SPEC.ERROR, + WAMP_MSG_SPEC.REGISTER, + dataObj[2], + setmetatable({}, { __jsontype = 'object' }), + "wamp.error.procedure_already_exists" + }) else -- WAMP SPEC: [REGISTERED, REGISTER.Request|id, Registration|id] self:_putData(session, { WAMP_MSG_SPEC.REGISTERED, dataObj[2], registrationId }) + -- TODO Refactor this in case of implementing shared registrations + self:_publishMetaEvent('registration', 'wamp.registration.on_create', session, + registrationId, os.date("!%Y-%m-%dT%TZ"), dataObj[4], "exact", "single") + self:_publishMetaEvent('registration', 'wamp.registration.on_register', session, + registrationId) end else - self:_putData(session, { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.REGISTER, dataObj[2], setmetatable({}, { __jsontype = 'object' }), "wamp.error.invalid_uri" }) + self:_putData(session, { + WAMP_MSG_SPEC.ERROR, + WAMP_MSG_SPEC.REGISTER, + dataObj[2], + setmetatable({}, { __jsontype = 'object' }), + "wamp.error.invalid_uri" + }) end else - self:_putData(session, { WAMP_MSG_SPEC.GOODBYE, setmetatable({}, { __jsontype = 'object' }), "wamp.error.system_shutdown" }) + self:_putData(session, { + WAMP_MSG_SPEC.ABORT, + { message = 'Received REGISTER message before session was established' }, + "wamp.error.protocol_violation" + }) + store:setHandlerFlags(regId, { close = true, sendLast = true }) + self:_publishMetaEvent('session', 'wamp.session.on_leave', session) end - elseif dataObj[1] == WAMP_MSG_SPEC.UNREGISTER then -- WAMP SPEC: [UNREGISTER, Request|id, REGISTERED.Registration|id] + elseif dataObj[1] == WAMP_MSG_SPEC.UNREGISTER then + -- WAMP SPEC: [UNREGISTER, Request|id, REGISTERED.Registration|id] if session.isWampEstablished == 1 then local rpc = store:unregisterSessionRPC(session.realm, dataObj[3], regId) @@ -516,11 +1000,26 @@ function _M:receiveData(regId, data) if rpc ~= ngx.null then -- WAMP SPEC: [UNREGISTERED, UNREGISTER.Request|id] self:_putData(session, { WAMP_MSG_SPEC.UNREGISTERED, dataObj[2] }) + self:_publishMetaEvent('registration', 'wamp.registration.on_unregister', session, dataObj[3]) + -- TODO Refactor this in case of implementing shared registrations + self:_publishMetaEvent('registration', 'wamp.registration.on_delete', session, dataObj[3]) else - self:_putData(session, { WAMP_MSG_SPEC.ERROR, WAMP_MSG_SPEC.UNREGISTER, dataObj[2], setmetatable({}, { __jsontype = 'object' }), "wamp.error.no_such_registration" }) + self:_putData(session, { + WAMP_MSG_SPEC.ERROR, + WAMP_MSG_SPEC.UNREGISTER, + dataObj[2], + setmetatable({}, { __jsontype = 'object' }), + "wamp.error.no_such_registration" + }) end else - self:_putData(session, { WAMP_MSG_SPEC.GOODBYE, setmetatable({}, { __jsontype = 'object' }), "wamp.error.system_shutdown" }) + self:_putData(session, { + WAMP_MSG_SPEC.ABORT, + { message = 'Received UNREGISTER message before session was established' }, + "wamp.error.protocol_violation" + }) + store:setHandlerFlags(regId, { close = true, sendLast = true }) + self:_publishMetaEvent('session', 'wamp.session.on_leave', session) end elseif dataObj[1] == WAMP_MSG_SPEC.YIELD then -- WAMP SPEC: [YIELD, INVOCATION.Request|id, Options|dict] @@ -550,7 +1049,13 @@ function _M:receiveData(regId, data) self:_putData(callerSess, { WAMP_MSG_SPEC.RESULT, invoc.CallReqId, details }) end else - self:_putData(session, { WAMP_MSG_SPEC.GOODBYE, setmetatable({}, { __jsontype = 'object' }), "wamp.error.system_shutdown" }) + self:_putData(session, { + WAMP_MSG_SPEC.ABORT, + { message = 'Received YIELD message before session was established' }, + "wamp.error.protocol_violation" + }) + store:setHandlerFlags(regId, { close = true, sendLast = true }) + self:_publishMetaEvent('session', 'wamp.session.on_leave', session) end elseif dataObj[1] == WAMP_MSG_SPEC.CANCEL then -- WAMP SPEC: [CANCEL, CALL.Request|id, Options|dict] @@ -570,30 +1075,57 @@ function _M:receiveData(regId, data) self:_putData(calleeSess, { WAMP_MSG_SPEC.INTERRUPT, wiCall.wiInvocId, details }) end else - self:_putData(session, { WAMP_MSG_SPEC.GOODBYE, setmetatable({}, { __jsontype = 'object' }), "wamp.error.system_shutdown" }) + self:_putData(session, { + WAMP_MSG_SPEC.ABORT, + { message = 'Received CANCEL message before session was established' }, + "wamp.error.protocol_violation" + }) + store:setHandlerFlags(regId, { close = true, sendLast = true }) + self:_publishMetaEvent('session', 'wamp.session.on_leave', session) end else + -- Received non-compliant WAMP message + -- WAMP SPEC: [ABORT, Details|dict, Reason|uri] + self:_putData(session, { + WAMP_MSG_SPEC.ABORT, + { message = 'Received non-compliant WAMP message' }, + "wamp.error.protocol_violation" + }) + store:setHandlerFlags(regId, { close = true, sendLast = true }) end end --- --- Retrieve data, available for session --- --- regId - WAMP session registration ID --- --- returns first WAMP message from the session data queue --- -function _M:getPendingData(regId) - return store:getPendingData(regId) +--- +--- Retrieve data, available for session +--- +--- @param regId number WAMP session registration ID +--- @param last boolean return from the end of a queue +--- +--- @return any first WAMP message from the session data queue +--- +function _M:getPendingData(regId, last) + return store:getPendingData(regId, last) end --- --- Process lightweight publish POST data from client --- --- sid - nginx session connection ID --- realm - WAMP Realm to operate in --- data - data, received through POST --- +--- +--- Retrieve connection handler flags, set up for session +--- +--- @param regId number WAMP session registration ID +--- +--- @return table flags data table +--- +function _M:getHandlerFlags(regId) + return store:getHandlerFlags(regId) +end + + +--- +--- Process lightweight publish POST data from client +--- +--- @param sid number nginx session connection ID +--- @param realm string WAMP Realm to operate in +--- @param data any data, received through POST +--- function _M:processPostData(sid, realm, data) local dataObj = serializers.json.decode(data) @@ -601,7 +1133,7 @@ function _M:processPostData(sid, realm, data) local httpCode if dataObj[1] == WAMP_MSG_SPEC.PUBLISH then - local regId, dataType = self.addConnection(sid, nil) + local regId = self.addConnection(sid, nil) -- Make a session legal :) local session = store:getSession(regId) @@ -611,7 +1143,7 @@ function _M:processPostData(sid, realm, data) self.receiveData(regId, data) - local cliData, cliErr = self.getPendingData(regId) + local cliData = self.getPendingData(regId) if cliData ~= ngx.null then res = cliData httpCode = ngx.HTTP_FORBIDDEN