diff --git a/lib/gnat.ex b/lib/gnat.ex index 3b296a7..0a5147d 100644 --- a/lib/gnat.ex +++ b/lib/gnat.ex @@ -424,6 +424,12 @@ defmodule Gnat do def handle_info({:tcp_error, _, reason}, state) do {:stop, "tcp transport error #{inspect(reason)}", state} end + def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do + {sid, _receiver} = Enum.find(state.receivers, fn {_sid, receiver} -> receiver.recipient == pid end) + state = unsub_sid(sid, [], state) + + {:noreply, state} + end def handle_info(other, state) do Logger.error "#{__MODULE__} received unexpected message: #{inspect other}" {:noreply, state} @@ -437,7 +443,8 @@ defmodule Gnat do def handle_call({:sub, receiver, topic, opts}, _from, %{next_sid: sid}=state) do sub = Command.build(:sub, topic, sid, opts) :ok = socket_write(state, sub) - next_state = add_subscription_to_state(state, sid, receiver) |> Map.put(:next_sid, sid + 1) + ref = Process.monitor(receiver) + next_state = add_subscription_to_state(state, sid, receiver, ref) |> Map.put(:next_sid, sid + 1) {:reply, {:ok, sid}, next_state} end def handle_call({:pub, topic, message, opts}, from, state) do @@ -471,15 +478,9 @@ defmodule Gnat do {:reply, :ok, state} end end - def handle_call({:unsub, sid, opts}, _from, %{receivers: receivers}=state) do - case Map.has_key?(receivers, sid) do - false -> {:reply, :ok, state} - true -> - command = Command.build(:unsub, sid, opts) - :ok = socket_write(state, command) - state = cleanup_subscription_from_state(state, sid, opts) - {:reply, :ok, state} - end + def handle_call({:unsub, sid, opts}, _from, state) do + state = unsub_sid(sid, opts, state) + {:reply, :ok, state} end def handle_call({:ping, pinger}, _from, state) do :ok = socket_write(state, "PING\r\n") @@ -493,12 +494,25 @@ defmodule Gnat do {:reply, state.server_info, state} end + defp unsub_sid(sid, opts, state) do + case Map.get(state.receivers, sid) do + nil -> + state + %{monitor_ref: ref} -> + command = Command.build(:unsub, sid, opts) + :ok = socket_write(state, command) + Process.demonitor(ref) + state = cleanup_subscription_from_state(state, sid, opts) + state + end + end + defp create_request_subscription(%{request_inbox_prefix: request_inbox_prefix}=state) do # Example: "_INBOX.Jhf7AcTGP3x4dAV9.*" wildcard_inbox_topic = request_inbox_prefix <> "*" sub = Command.build(:sub, wildcard_inbox_topic, @request_sid, []) :ok = socket_write(state, [sub]) - add_subscription_to_state(state, @request_sid, self()) + add_subscription_to_state(state, @request_sid, self(), nil) end defp make_new_inbox(%{request_inbox_prefix: prefix}), do: prefix <> nuid() @@ -522,8 +536,8 @@ defmodule Gnat do end defp socket_write(%{socket: socket}, iodata), do: :gen_tcp.send(socket, iodata) - defp add_subscription_to_state(%{receivers: receivers}=state, sid, pid) do - receivers = Map.put(receivers, sid, %{recipient: pid, unsub_after: :infinity}) + defp add_subscription_to_state(%{receivers: receivers}=state, sid, pid, ref) do + receivers = Map.put(receivers, sid, %{recipient: pid, monitor_ref: ref, unsub_after: :infinity}) %{state | receivers: receivers} end diff --git a/test/gnat_test.exs b/test/gnat_test.exs index 00242d2..e287d18 100644 --- a/test/gnat_test.exs +++ b/test/gnat_test.exs @@ -188,6 +188,33 @@ defmodule GnatTest do end end + test "subscription is cleaned up when the subscribing process dies" do + topic = "testcleanup" + test_pid = self() + {:ok, pid} = Gnat.start_link() + + # one subscription created at boot + assert {:ok, 1} = Gnat.active_subscriptions(pid) + + %Task{pid: task_pid}= Task.async(fn -> + Gnat.sub(pid, self(), topic) + assert {:ok, 2} = Gnat.active_subscriptions(pid) + send(test_pid, "subscribed") + + receive do + :done -> :ok + end + end) + + assert_receive "subscribed", 1_000 + Gnat.server_info(pid) + Process.monitor(task_pid) + send(task_pid, :done) + assert_receive {:DOWN, _ref, :process, ^task_pid, _reason}, 1_000 + + assert {:ok, 1} = Gnat.active_subscriptions(pid) + end + test "request-reply convenience function" do topic = "req-resp" {:ok, pid} = Gnat.start_link()