Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/instrument offline messages in redis #69

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions apps/vmq_server/priv/lua_scripts/delete_subs_offline_messages.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!lua name=delete_subs_offline_messages

--[[
Input:
KEYS[1] = subscriberId

Output: count(Number) | Error
]]

-- Removes every offline message stored for one subscriber and subtracts the
-- number of removed messages from the global offline-message counter.
--
-- KEYS[1]: key of the Redis list holding the subscriber's offline messages.
-- Returns: the value of 'offline_messages_count' after the subtraction.
local function delete_subs_offline_messages(KEYS, _ARGV)
    local counterKey = 'offline_messages_count'
    local queueKey = KEYS[1]

    -- The length has to be captured before the list is dropped.
    local removed = redis.call('LLEN', queueKey)
    redis.call('DEL', queueKey)

    return redis.call('DECRBY', counterKey, removed)
end

redis.register_function('delete_subs_offline_messages', delete_subs_offline_messages)
19 changes: 19 additions & 0 deletions apps/vmq_server/priv/lua_scripts/pop_offline_message.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!lua name=pop_offline_message

--[[
Input:
KEYS[1] = subscriberId

Output: count(Number) | Error
]]

-- Pops the oldest offline message of one subscriber and keeps the global
-- offline-message counter in sync.
--
-- KEYS[1]: key of the Redis list holding the subscriber's offline messages.
-- Returns: the value of 'offline_messages_count' after the call (Number).
--
-- The counter is decremented only when LPOP actually removed an element.
-- When the list is missing or empty, LPOP with a count yields a nil reply
-- (false in Lua); decrementing in that case would make the counter drift
-- below the real number of stored messages and eventually go negative.
local function pop_offline_message(KEYS, _ARGV)
    local OFFLINE_MESSAGE_COUNT_METRIC = 'offline_messages_count'

    local subscriberId = KEYS[1]

    local popped = redis.call('LPOP', subscriberId, 1)
    if popped and #popped > 0 then
        return redis.call('DECR', OFFLINE_MESSAGE_COUNT_METRIC)
    end

    -- Nothing was removed: report the counter unchanged. GET returns a string
    -- (or false when the counter was never written), so normalise to a number
    -- to keep the reply type identical to the DECR path.
    local current = redis.call('GET', OFFLINE_MESSAGE_COUNT_METRIC)
    if current then
        return tonumber(current)
    end
    return 0
end

redis.register_function('pop_offline_message', pop_offline_message)
21 changes: 21 additions & 0 deletions apps/vmq_server/priv/lua_scripts/write_offline_message.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!lua name=write_offline_message

--[[
Input:
KEYS[1] = subscriberId
ARGV[1] = message

Output: count(Number) | Error
]]

-- Appends one offline message to a subscriber's list and bumps the global
-- offline-message counter by one.
--
-- KEYS[1]: key of the Redis list holding the subscriber's offline messages.
-- ARGV[1]: the serialized message to append.
-- Returns: the value of 'offline_messages_count' after the increment.
local function write_offline_message(KEYS, ARGV)
    local counterKey = 'offline_messages_count'

    redis.call('RPUSH', KEYS[1], ARGV[1])

    return redis.call('INCR', counterKey)
end

redis.register_function('write_offline_message', write_offline_message)
116 changes: 97 additions & 19 deletions apps/vmq_server/src/vmq_message_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,108 @@
read/2,
delete/1,
delete/2,
find/1
find/1,
nr_of_offline_messages/0
]).

-define(OFFLINE_MESSAGES, offline_messages).

start() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
Ret = supervisor:start_link({local, ?MODULE}, ?MODULE, []),
load_redis_functions(),
Ret.

write(SubscriberId, Msg) ->
vmq_redis:query(
load_redis_functions() ->
LuaDir = application:get_env(vmq_server, redis_lua_dir, "./etc/lua"),

{ok, PopOfflineMessageScript} = file:read_file(LuaDir ++ "/pop_offline_message.lua"),
{ok, WriteOfflineMessageScript} = file:read_file(LuaDir ++ "/write_offline_message.lua"),
{ok, DeleteSubsOfflineMessagesScript} = file:read_file(
LuaDir ++ "/delete_subs_offline_messages.lua"
),

{ok, <<"pop_offline_message">>} = vmq_redis:query(
vmq_message_store_redis_client,
[?FUNCTION, "LOAD", "REPLACE", PopOfflineMessageScript],
?FUNCTION_LOAD,
?POP_OFFLINE_MESSAGE
),
{ok, <<"write_offline_message">>} = vmq_redis:query(
vmq_message_store_redis_client,
["RPUSH", term_to_binary(SubscriberId), term_to_binary(Msg)],
?RPUSH,
?MSG_STORE_WRITE
[?FUNCTION, "LOAD", "REPLACE", WriteOfflineMessageScript],
?FUNCTION_LOAD,
?WRITE_OFFLINE_MESSAGE
),
{ok, <<"delete_subs_offline_messages">>} = vmq_redis:query(
vmq_message_store_redis_client,
[?FUNCTION, "LOAD", "REPLACE", DeleteSubsOfflineMessagesScript],
?FUNCTION_LOAD,
?DELETE_SUBS_OFFLINE_MESSAGES
).

%% @doc Stores `Msg' as an offline message for `SubscriberId' by invoking the
%% `?WRITE_OFFLINE_MESSAGE' Redis function with one key (the serialized
%% subscriber id) and one argument (the serialized message).
%%
%% On `{ok, Count}' the reply is converted with `binary_to_integer/1' and
%% cached under the `count' key of the `?OFFLINE_MESSAGES' ETS table; the
%% result of `ets:insert/2' is returned. Any `{error, _}' reply is reported
%% as `{error, not_supported}'.
write(SubscriberId, Msg) ->
    Key = term_to_binary(SubscriberId),
    Payload = term_to_binary(Msg),
    Command = [?FCALL, ?WRITE_OFFLINE_MESSAGE, 1, Key, Payload],
    Reply = vmq_redis:query(
        vmq_message_store_redis_client,
        Command,
        ?FCALL,
        ?WRITE_OFFLINE_MESSAGE
    ),
    case Reply of
        {error, _} ->
            {error, not_supported};
        {ok, OfflineMsgCount} ->
            Count = binary_to_integer(OfflineMsgCount),
            ets:insert(?OFFLINE_MESSAGES, {count, Count})
    end.

%% @doc Reading a single stored message is not offered by this store; both
%% arguments are ignored and `{error, not_supported}' is always returned.
-spec read(term(), term()) -> {error, not_supported}.
read(_SubscriberId, _MsgRef) -> {error, not_supported}.

delete(SubscriberId) ->
vmq_redis:query(
vmq_message_store_redis_client,
["DEL", term_to_binary(SubscriberId)],
?DEL,
?MSG_STORE_DELETE
).
case
vmq_redis:query(
vmq_message_store_redis_client,
[
?FCALL,
?DELETE_SUBS_OFFLINE_MESSAGES,
1,
term_to_binary(SubscriberId)
],
?FCALL,
?DELETE_SUBS_OFFLINE_MESSAGES
)
of
{ok, OfflineMsgCount} ->
ets:insert(?OFFLINE_MESSAGES, {count, binary_to_integer(OfflineMsgCount)});
{error, _} ->
{error, not_supported}
end.

delete(SubscriberId, _MsgRef) ->
vmq_redis:query(
vmq_message_store_redis_client,
["LPOP", term_to_binary(SubscriberId), 1],
?LPOP,
?MSG_STORE_DELETE
).
case
vmq_redis:query(
vmq_message_store_redis_client,
[
?FCALL,
?POP_OFFLINE_MESSAGE,
1,
term_to_binary(SubscriberId)
],
?FCALL,
?POP_OFFLINE_MESSAGE
)
of
{ok, OfflineMsgCount} ->
ets:insert(?OFFLINE_MESSAGES, {count, binary_to_integer(OfflineMsgCount)});
{error, _} ->
{error, not_supported}
end.

find(SubscriberId) ->
case
Expand All @@ -71,6 +141,12 @@ find(SubscriberId) ->
Res
end.

%% @doc Returns the offline-message count cached under the `count' key of the
%% `?OFFLINE_MESSAGES' ETS table, or 0 when no count has been stored yet.
nr_of_offline_messages() ->
    Cached = ets:lookup(?OFFLINE_MESSAGES, count),
    case Cached of
        [{count, Count}] ->
            Count;
        [] ->
            0
    end.

%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
Expand All @@ -81,6 +157,8 @@ find(SubscriberId) ->
{atom(), {atom(), atom(), list()}, permanent, pos_integer(), worker, [atom()]}
]}}.
init([]) ->
ets:new(?OFFLINE_MESSAGES, [named_table, public, {write_concurrency, true}]),

StoreCfgs = application:get_env(vmq_server, message_store, [
{redis, [
{connect_options, "[{sentinel, [{endpoints, [{\"localhost\", 26379}]}]},{database,2}]"}
Expand Down
106 changes: 30 additions & 76 deletions apps/vmq_server/src/vmq_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,10 @@ redis_def() ->
?POLL_MAIN_QUEUE,
?GET_LIVE_NODES,
?MIGRATE_OFFLINE_QUEUE,
?REAP_SUBSCRIBERS
?REAP_SUBSCRIBERS,
?WRITE_OFFLINE_MESSAGE,
?POP_OFFLINE_MESSAGE,
?DELETE_SUBS_OFFLINE_MESSAGES
],
REDIS_DEF_1 =
[
Expand Down Expand Up @@ -1077,69 +1080,6 @@ redis_def() ->
],
REDIS_DEF_3 =
[
m(
counter,
[{cmd, rcn_to_str(?RPUSH)}, {operation, rcn_to_str(?MSG_STORE_WRITE)}],
{?REDIS_CMD, ?RPUSH, ?MSG_STORE_WRITE},
redis_cmd_total,
<<"The number of redis cmd calls.">>
),
m(
counter,
[{cmd, rcn_to_str(?RPUSH)}, {operation, rcn_to_str(?MSG_STORE_WRITE)}],
{?REDIS_CMD_ERROR, ?RPUSH, ?MSG_STORE_WRITE},
redis_cmd_errors_total,
<<"The number of times redis cmd call failed.">>
),
m(
counter,
[{cmd, rcn_to_str(?RPUSH)}, {operation, rcn_to_str(?MSG_STORE_WRITE)}],
{?REDIS_CMD_MISS, ?RPUSH, ?MSG_STORE_WRITE},
redis_cmd_miss_total,
<<"The number of times redis cmd returned empty/undefined due to entry not exists.">>
),
m(
counter,
[{cmd, rcn_to_str(?DEL)}, {operation, rcn_to_str(?MSG_STORE_DELETE)}],
{?REDIS_CMD, ?DEL, ?MSG_STORE_DELETE},
redis_cmd_total,
<<"The number of redis cmd calls.">>
),
m(
counter,
[{cmd, rcn_to_str(?DEL)}, {operation, rcn_to_str(?MSG_STORE_DELETE)}],
{?REDIS_CMD_ERROR, ?DEL, ?MSG_STORE_DELETE},
redis_cmd_errors_total,
<<"The number of times redis cmd call failed.">>
),
m(
counter,
[{cmd, rcn_to_str(?DEL)}, {operation, rcn_to_str(?MSG_STORE_DELETE)}],
{?REDIS_CMD_MISS, ?DEL, ?MSG_STORE_DELETE},
redis_cmd_miss_total,
<<"The number of times redis cmd returned empty/undefined due to entry not exists.">>
),
m(
counter,
[{cmd, rcn_to_str(?LPOP)}, {operation, rcn_to_str(?MSG_STORE_DELETE)}],
{?REDIS_CMD, ?LPOP, ?MSG_STORE_DELETE},
redis_cmd_total,
<<"The number of redis cmd calls.">>
),
m(
counter,
[{cmd, rcn_to_str(?LPOP)}, {operation, rcn_to_str(?MSG_STORE_DELETE)}],
{?REDIS_CMD_ERROR, ?LPOP, ?MSG_STORE_DELETE},
redis_cmd_errors_total,
<<"The number of times redis cmd call failed.">>
),
m(
counter,
[{cmd, rcn_to_str(?LPOP)}, {operation, rcn_to_str(?MSG_STORE_DELETE)}],
{?REDIS_CMD_MISS, ?LPOP, ?MSG_STORE_DELETE},
redis_cmd_miss_total,
<<"The number of times redis cmd returned empty/undefined due to entry not exists.">>
),
m(
counter,
[{cmd, rcn_to_str(?FIND)}, {operation, rcn_to_str(?MSG_STORE_FIND)}],
Expand Down Expand Up @@ -2218,7 +2158,8 @@ misc_statistics() ->
{router_memory, SMemory},
{retain_messages, NrOfRetain},
{retain_memory, RMemory},
{queue_processes, fetch_external_metric(vmq_queue_sup_sup, nr_of_queues, 0)}
{queue_processes, fetch_external_metric(vmq_queue_sup_sup, nr_of_queues, 0)},
{offline_messages, fetch_external_metric(vmq_message_store, nr_of_offline_messages, 0)}
].

-spec misc_stats_def() -> [metric_def()].
Expand Down Expand Up @@ -2252,7 +2193,8 @@ misc_stats_def() ->
retain_memory,
<<"The number of bytes used for storing retained messages.">>
),
m(gauge, [], queue_processes, queue_processes, <<"The number of MQTT queue processes.">>)
m(gauge, [], queue_processes, queue_processes, <<"The number of MQTT queue processes.">>),
m(gauge, [], offline_messages, offline_messages, <<"The number of offline messages">>)
].

-spec system_statistics() -> [{metric_id(), any()}].
Expand Down Expand Up @@ -2780,18 +2722,9 @@ met2idx({?REDIS_STALE_CMD, ?FCALL, ?ENQUEUE_MSG}) -> 271;
met2idx({?REDIS_STALE_CMD, ?FCALL, ?POLL_MAIN_QUEUE}) -> 272;
met2idx({?UNAUTH_REDIS_CMD, ?FCALL, ?ENQUEUE_MSG}) -> 273;
met2idx({?UNAUTH_REDIS_CMD, ?FCALL, ?POLL_MAIN_QUEUE}) -> 274;
met2idx({?REDIS_CMD, ?RPUSH, ?MSG_STORE_WRITE}) -> 285;
met2idx({?REDIS_CMD, ?DEL, ?MSG_STORE_DELETE}) -> 286;
met2idx({?REDIS_CMD, ?FIND, ?MSG_STORE_FIND}) -> 287;
met2idx({?REDIS_CMD_ERROR, ?RPUSH, ?MSG_STORE_WRITE}) -> 288;
met2idx({?REDIS_CMD_ERROR, ?DEL, ?MSG_STORE_DELETE}) -> 289;
met2idx({?REDIS_CMD_ERROR, ?FIND, ?MSG_STORE_FIND}) -> 290;
met2idx({?REDIS_CMD_MISS, ?RPUSH, ?MSG_STORE_WRITE}) -> 291;
met2idx({?REDIS_CMD_MISS, ?DEL, ?MSG_STORE_DELETE}) -> 292;
met2idx({?REDIS_CMD_MISS, ?FIND, ?MSG_STORE_FIND}) -> 293;
met2idx({?REDIS_CMD, ?LPOP, ?MSG_STORE_DELETE}) -> 294;
met2idx({?REDIS_CMD_ERROR, ?LPOP, ?MSG_STORE_DELETE}) -> 295;
met2idx({?REDIS_CMD_MISS, ?LPOP, ?MSG_STORE_DELETE}) -> 296;
met2idx({?QOS1_SUBSCRIPTION_OPTS, ?NON_RETRY, ?NON_PERSISTENCE}) -> 297;
met2idx({?QOS1_SUBSCRIPTION_OPTS, ?RETRY, ?NON_PERSISTENCE}) -> 298;
met2idx({?QOS1_SUBSCRIPTION_OPTS, ?NON_RETRY, ?PERSISTENCE}) -> 299;
Expand Down Expand Up @@ -2849,7 +2782,28 @@ met2idx(shared_subscription_group_publish_attempt_failed) -> 350;
met2idx(?METRIC_WEB_SOCKET_OPEN) -> 351;
met2idx(?METRIC_WEB_SOCKET_CLOSE) -> 352;
met2idx({?SIDECAR_EVENTS, ?ON_MESSAGE_DROP}) -> 353;
met2idx({?SIDECAR_EVENTS_ERROR, ?ON_MESSAGE_DROP}) -> 354.
met2idx({?SIDECAR_EVENTS_ERROR, ?ON_MESSAGE_DROP}) -> 354;
met2idx({?REDIS_CMD, ?FCALL, ?WRITE_OFFLINE_MESSAGE}) -> 355;
met2idx({?REDIS_CMD_ERROR, ?FCALL, ?WRITE_OFFLINE_MESSAGE}) -> 356;
met2idx({?REDIS_CMD_MISS, ?FCALL, ?WRITE_OFFLINE_MESSAGE}) -> 357;
met2idx({?REDIS_STALE_CMD, ?FCALL, ?WRITE_OFFLINE_MESSAGE}) -> 358;
met2idx({?UNAUTH_REDIS_CMD, ?FCALL, ?WRITE_OFFLINE_MESSAGE}) -> 359;
met2idx({?REDIS_CMD, ?FUNCTION_LOAD, ?WRITE_OFFLINE_MESSAGE}) -> 360;
met2idx({?REDIS_CMD_ERROR, ?FUNCTION_LOAD, ?WRITE_OFFLINE_MESSAGE}) -> 361;
met2idx({?REDIS_CMD, ?FCALL, ?POP_OFFLINE_MESSAGE}) -> 362;
met2idx({?REDIS_CMD_ERROR, ?FCALL, ?POP_OFFLINE_MESSAGE}) -> 363;
met2idx({?REDIS_CMD_MISS, ?FCALL, ?POP_OFFLINE_MESSAGE}) -> 364;
met2idx({?REDIS_STALE_CMD, ?FCALL, ?POP_OFFLINE_MESSAGE}) -> 365;
met2idx({?UNAUTH_REDIS_CMD, ?FCALL, ?POP_OFFLINE_MESSAGE}) -> 366;
met2idx({?REDIS_CMD, ?FUNCTION_LOAD, ?POP_OFFLINE_MESSAGE}) -> 367;
met2idx({?REDIS_CMD_ERROR, ?FUNCTION_LOAD, ?POP_OFFLINE_MESSAGE}) -> 368;
met2idx({?REDIS_CMD, ?FCALL, ?DELETE_SUBS_OFFLINE_MESSAGES}) -> 369;
met2idx({?REDIS_CMD_ERROR, ?FCALL, ?DELETE_SUBS_OFFLINE_MESSAGES}) -> 370;
met2idx({?REDIS_CMD_MISS, ?FCALL, ?DELETE_SUBS_OFFLINE_MESSAGES}) -> 371;
met2idx({?REDIS_STALE_CMD, ?FCALL, ?DELETE_SUBS_OFFLINE_MESSAGES}) -> 372;
met2idx({?UNAUTH_REDIS_CMD, ?FCALL, ?DELETE_SUBS_OFFLINE_MESSAGES}) -> 373;
met2idx({?REDIS_CMD, ?FUNCTION_LOAD, ?DELETE_SUBS_OFFLINE_MESSAGES}) -> 374;
met2idx({?REDIS_CMD_ERROR, ?FUNCTION_LOAD, ?DELETE_SUBS_OFFLINE_MESSAGES}) -> 375.

-ifdef(TEST).
clear_stored_rates() ->
Expand Down
3 changes: 3 additions & 0 deletions apps/vmq_server/src/vmq_server.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@
-define(REAP_SUBSCRIBERS, reap_subscribers).
-define(SCARD, scard).
-define(ENSURE_NO_LOCAL_CLIENT, ensure_no_local_client).
-define(WRITE_OFFLINE_MESSAGE, write_offline_message).
-define(POP_OFFLINE_MESSAGE, pop_offline_message).
-define(DELETE_SUBS_OFFLINE_MESSAGES, delete_subs_offline_messages).

-define(PRODUCER, "producer").
-define(CONSUMER, "consumer").
Expand Down
3 changes: 1 addition & 2 deletions apps/vmq_server/src/vmq_server_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@

-spec start(_, _) -> {'error', _} | {'ok', pid()} | {'ok', pid(), _}.
start(_StartType, _StartArgs) ->
{ok, _pid} = vmq_message_store:start(),

case vmq_server_sup:start_link() of
{error, _} = E ->
E;
R ->
{ok, _pid} = vmq_message_store:start(),
%% we'll wait for some millis, this
%% enables the vmq_plugin mechanism to be prepared...
%% vmq_plugin_mgr waits for the 'vmq_server_sup' process
Expand Down
Loading
Loading