From dcf521f4ffe2de1b53c7770d06b60b562c2bcdbb Mon Sep 17 00:00:00 2001 From: Emilio Junior Francischetti Date: Mon, 2 Dec 2024 15:02:06 +0100 Subject: [PATCH] Reverse the logic from pull to push. (#210) --- CHANGELOG.md | 12 ++++++++- lib/amqp/gen/consumer.ex | 17 +++---------- lib/amqp/helper.ex | 11 ++++++++- lib/amqp/no_signal_handler.ex | 16 ------------ lib/amqp/signal_handler.ex | 46 ++++++++++++++++++++++++----------- mix.exs | 2 +- test/gen_test.exs | 7 ++++-- 7 files changed, 62 insertions(+), 49 deletions(-) delete mode 100644 lib/amqp/no_signal_handler.ex diff --git a/CHANGELOG.md b/CHANGELOG.md index aa74036..3103907 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,15 @@ and this project adheres to --- +## [6.1.1] - 2024-12-02 + +### Added + +- ([#208](https://github.com/primait/amqpx/pull/208)) Reverse the logic for draining. + Now the application signal handler call the Amqpx.SignalHandler to trigger the drain. + +--- + ## [6.1.0] - 2024-11-29 ### Added @@ -79,7 +88,8 @@ and this project adheres to - ([#129](https://github.com/primait/amqpx/pull/)) Default binding for DLX queues instead of wildcard -[Unreleased]: https://github.com/primait/amqpx/compare/6.1.0...HEAD +[Unreleased]: https://github.com/primait/amqpx/compare/6.1.1...HEAD +[6.1.1]: https://github.com/primait/amqpx/compare/6.1.0...6.1.1 [6.1.0]: https://github.com/primait/amqpx/compare/6.0.4...6.1.0 [6.0.4]: https://github.com/primait/amqpx/compare/6.0.3...6.0.4 [6.0.3]: https://github.com/primait/amqpx/compare/6.0.2...6.0.3 diff --git a/lib/amqp/gen/consumer.ex b/lib/amqp/gen/consumer.ex index 62e4a7c..4b9a234 100644 --- a/lib/amqp/gen/consumer.ex +++ b/lib/amqp/gen/consumer.ex @@ -5,7 +5,7 @@ defmodule Amqpx.Gen.Consumer do require Logger use GenServer import Amqpx.Core - alias Amqpx.{Basic, Channel} + alias Amqpx.{Basic, Channel, SignalHandler} defstruct [ :channel, @@ -252,19 +252,10 @@ defmodule Amqpx.Gen.Consumer do state end - @type signal_status :: :stopping | :draining | :running - - @spec get_signal_status :: signal_status() - defp get_signal_status do - cond do - signal_handler().stopping?() -> :stopping - signal_handler().draining?() -> :draining - true -> :running - end - end + @type signal_status :: :running | :draining | :stopping @spec handle_signals(signal_status(), state(), String.t()) :: {:ok | :stop, state()} - defp handle_signals(signal_status \\ get_signal_status(), state, consumer_tag) + defp handle_signals(signal_status \\ SignalHandler.get_signal_status(), state, consumer_tag) # Close channel when we we need to stop. defp handle_signals(:stopping, state, _) do @@ -285,6 +276,4 @@ defmodule Amqpx.Gen.Consumer do # No signals received run as normal defp handle_signals(:running, state, _), do: {:ok, state} - - defp signal_handler, do: Application.get_env(:amqpx, :signal_handler, Amqpx.NoSignalHandler) end diff --git a/lib/amqp/helper.ex b/lib/amqp/helper.ex index de34164..adf8592 100644 --- a/lib/amqp/helper.ex +++ b/lib/amqp/helper.ex @@ -10,7 +10,8 @@ defmodule Amqpx.Helper do end def consumers_supervisor_configuration(handlers_conf) do - Enum.map(handlers_conf, &Supervisor.child_spec({Amqpx.Gen.Consumer, &1}, id: UUID.uuid1())) + amqp_signal_handler() ++ + Enum.map(handlers_conf, &Supervisor.child_spec({Amqpx.Gen.Consumer, &1}, id: UUID.uuid1())) end def producer_supervisor_configuration(producer_conf) do @@ -179,4 +180,12 @@ defmodule Amqpx.Helper do def setup_exchange(channel, %{name: name, type: type}) do Exchange.declare(channel, name, type) end + + defp amqp_signal_handler, + do: [ + %{ + id: Amqpx.SignalHandler, + start: {Amqpx.SignalHandler, :start_link, []} + } + ] end diff --git a/lib/amqp/no_signal_handler.ex b/lib/amqp/no_signal_handler.ex deleted file mode 100644 index eaad508..0000000 --- a/lib/amqp/no_signal_handler.ex +++ /dev/null @@ -1,16 +0,0 @@ -defmodule Amqpx.NoSignalHandler do - @moduledoc """ - Dummy signal handler module that does not handle the graceful termination. - - It always returns `false` for `draining?/0` and `stopping?/0`. - I.e. the consumer will continue without handling signals. - """ - - @behaviour Amqpx.SignalHandler - - @impl true - def draining?, do: false - - @impl true - def stopping?, do: false -end diff --git a/lib/amqp/signal_handler.ex b/lib/amqp/signal_handler.ex index 2eb4ff2..2cc7110 100644 --- a/lib/amqp/signal_handler.ex +++ b/lib/amqp/signal_handler.ex @@ -1,20 +1,38 @@ defmodule Amqpx.SignalHandler do @moduledoc """ - Signal handler behaviour is used to catch the SIGTERM signal and gracefully stop the application. - In the context of Rabbitmq, it will: - cancel the channel when we are in draining mode to stop prefetch new messages. - close the channel when we are in stopping mode to reject all the unacked messages that we did't start to consume. + This module is responsible for handling signals sent to the application. + """ + use GenServer - Check in Peano how to use it. + def start_link do + GenServer.start_link(__MODULE__, [], name: __MODULE__) + end - """ - @doc """ - Check if the application is in draining mode. - """ - @callback draining? :: boolean + def init(_) do + {:ok, :running} + end - @doc """ - Check if the application is in stopping mode. - """ - @callback stopping? :: boolean + def draining do + GenServer.call(__MODULE__, :draining) + end + + def stopping do + GenServer.call(__MODULE__, :stopping) + end + + def get_signal_status do + GenServer.call(__MODULE__, :get_signal_status) + end + + def handle_call(:draining, _from, _state) do + {:reply, :ok, :draining} + end + + def handle_call(:stopping, _from, _state) do + {:reply, :ok, :stopping} + end + + def handle_call(:get_signal_status, _from, state) do + {:reply, state, state} + end end diff --git a/mix.exs b/mix.exs index 8a2e4a6..0ad897e 100644 --- a/mix.exs +++ b/mix.exs @@ -5,7 +5,7 @@ defmodule Amqpx.MixProject do [ app: :amqpx, name: "amqpx", - version: "6.1.0", + version: "6.1.1", elixir: "~> 1.14", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :production, diff --git a/test/gen_test.exs b/test/gen_test.exs index 113cbef..8043e4b 100644 --- a/test/gen_test.exs +++ b/test/gen_test.exs @@ -11,6 +11,7 @@ defmodule Amqpx.Test.AmqpxTest do alias Amqpx.Test.Support.Producer3 alias Amqpx.Test.Support.ProducerWithRetry alias Amqpx.Test.Support.ProducerConnectionTwo + alias Amqpx.SignalHandler import Mock @@ -530,7 +531,7 @@ defmodule Amqpx.Test.AmqpxTest do payload = %{test: 1} with_mocks [ - {Amqpx.NoSignalHandler, [], stopping?: fn -> true end}, + {SignalHandler, [], get_signal_status: fn -> :stopping end}, {Consumer1, [], []} ] do Producer1.send_payload(payload) @@ -547,7 +548,7 @@ defmodule Amqpx.Test.AmqpxTest do payload = %{test: 1} with_mocks [ - {Amqpx.NoSignalHandler, [], stopping?: [in_series([], [false, true])], draining?: fn -> true end}, + {SignalHandler, [], get_signal_status: [in_series([], [:draining, :stopping])]}, {Consumer1, [], [handle_message: fn _, _, state -> {:ok, state} end]} ] do Producer1.send_payload(payload) @@ -640,6 +641,8 @@ defmodule Amqpx.Test.AmqpxTest do |> Application.fetch_env!(:consumers) |> Enum.find(&(&1.handler_module == name)) + SignalHandler.start_link() + if is_nil(opts) do raise "Consumer #{name} not found" end