Skip to content

Commit

Permalink
Monitor processes, unsubscribe them when they go down
Browse files Browse the repository at this point in the history
* Use Process.monitor to monitor recipients
* Handle DOWN messages from recipients. Unsub them from NATS, remove from state.
* Process.demonitor whenever a receipient usubs
  • Loading branch information
acco authored and mmmries committed Feb 22, 2024
1 parent 52f4a12 commit 497eca7
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 13 deletions.
40 changes: 27 additions & 13 deletions lib/gnat.ex
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,12 @@ defmodule Gnat do
def handle_info({:tcp_error, _, reason}, state) do
{:stop, "tcp transport error #{inspect(reason)}", state}
end
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
{sid, _receiver} = Enum.find(state.receivers, fn {_sid, receiver} -> receiver.recipient == pid end)
state = unsub_sid(sid, [], state)

{:noreply, state}
end
def handle_info(other, state) do
Logger.error "#{__MODULE__} received unexpected message: #{inspect other}"
{:noreply, state}
Expand All @@ -437,7 +443,8 @@ defmodule Gnat do
def handle_call({:sub, receiver, topic, opts}, _from, %{next_sid: sid}=state) do
sub = Command.build(:sub, topic, sid, opts)
:ok = socket_write(state, sub)
next_state = add_subscription_to_state(state, sid, receiver) |> Map.put(:next_sid, sid + 1)
ref = Process.monitor(receiver)
next_state = add_subscription_to_state(state, sid, receiver, ref) |> Map.put(:next_sid, sid + 1)
{:reply, {:ok, sid}, next_state}
end
def handle_call({:pub, topic, message, opts}, from, state) do
Expand Down Expand Up @@ -471,15 +478,9 @@ defmodule Gnat do
{:reply, :ok, state}
end
end
def handle_call({:unsub, sid, opts}, _from, %{receivers: receivers}=state) do
case Map.has_key?(receivers, sid) do
false -> {:reply, :ok, state}
true ->
command = Command.build(:unsub, sid, opts)
:ok = socket_write(state, command)
state = cleanup_subscription_from_state(state, sid, opts)
{:reply, :ok, state}
end
def handle_call({:unsub, sid, opts}, _from, state) do
state = unsub_sid(sid, opts, state)
{:reply, :ok, state}
end
def handle_call({:ping, pinger}, _from, state) do
:ok = socket_write(state, "PING\r\n")
Expand All @@ -493,12 +494,25 @@ defmodule Gnat do
{:reply, state.server_info, state}
end

defp unsub_sid(sid, opts, state) do
case Map.get(state.receivers, sid) do
nil ->
state
%{monitor_ref: ref} ->
command = Command.build(:unsub, sid, opts)
:ok = socket_write(state, command)
Process.demonitor(ref)
state = cleanup_subscription_from_state(state, sid, opts)
state
end
end

defp create_request_subscription(%{request_inbox_prefix: request_inbox_prefix}=state) do
# Example: "_INBOX.Jhf7AcTGP3x4dAV9.*"
wildcard_inbox_topic = request_inbox_prefix <> "*"
sub = Command.build(:sub, wildcard_inbox_topic, @request_sid, [])
:ok = socket_write(state, [sub])
add_subscription_to_state(state, @request_sid, self())
add_subscription_to_state(state, @request_sid, self(), nil)
end

defp make_new_inbox(%{request_inbox_prefix: prefix}), do: prefix <> nuid()
Expand All @@ -522,8 +536,8 @@ defmodule Gnat do
end
defp socket_write(%{socket: socket}, iodata), do: :gen_tcp.send(socket, iodata)

defp add_subscription_to_state(%{receivers: receivers}=state, sid, pid) do
receivers = Map.put(receivers, sid, %{recipient: pid, unsub_after: :infinity})
defp add_subscription_to_state(%{receivers: receivers}=state, sid, pid, ref) do
receivers = Map.put(receivers, sid, %{recipient: pid, monitor_ref: ref, unsub_after: :infinity})
%{state | receivers: receivers}
end

Expand Down
27 changes: 27 additions & 0 deletions test/gnat_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,33 @@ defmodule GnatTest do
end
end

test "subscription is cleaned up when the subscribing process dies" do
topic = "testcleanup"
test_pid = self()
{:ok, pid} = Gnat.start_link()

# one subscription created at boot
assert {:ok, 1} = Gnat.active_subscriptions(pid)

%Task{pid: task_pid}= Task.async(fn ->
Gnat.sub(pid, self(), topic)
assert {:ok, 2} = Gnat.active_subscriptions(pid)
send(test_pid, "subscribed")

receive do
:done -> :ok
end
end)

assert_receive "subscribed", 1_000
Gnat.server_info(pid)
Process.monitor(task_pid)
send(task_pid, :done)
assert_receive {:DOWN, _ref, :process, ^task_pid, _reason}, 1_000

assert {:ok, 1} = Gnat.active_subscriptions(pid)
end

test "request-reply convenience function" do
topic = "req-resp"
{:ok, pid} = Gnat.start_link()
Expand Down

0 comments on commit 497eca7

Please sign in to comment.