Skip to content

Commit

Permalink
[COART-181]: Check Gracefully termination in all our applications (#208)
Browse files Browse the repository at this point in the history
  • Loading branch information
frnmjn authored Nov 29, 2024
1 parent 32ee228 commit eb9dda4
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 8 deletions.
11 changes: 10 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

---

## [6.1.0] - 2024-11-29

### Added

- ([#208](https://github.com/primait/amqpx/pull/208)) Introduces the possibility of configuring a signal handler which can be used for graceful termination. When the SIGTERM arrive, we cancel all the consumer to stop taking new messages.

---

## [6.0.4] - 2024-09-02

### Added
Expand Down Expand Up @@ -57,7 +65,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- ([#129](https://github.com/primait/amqpx/pull/)) Default binding for DLX queues instead of wildcard

[Unreleased]: https://github.com/primait/amqpx/compare/6.0.4...HEAD
[Unreleased]: https://github.com/primait/amqpx/compare/6.1.0...HEAD
[6.1.0]: https://github.com/primait/amqpx/compare/6.0.4...6.1.0
[6.0.4]: https://github.com/primait/amqpx/compare/6.0.3...6.0.4
[6.0.3]: https://github.com/primait/amqpx/compare/6.0.2...6.0.3
[6.0.2]: https://github.com/primait/amqpx/compare/6.0.1...6.0.2
Expand Down
57 changes: 51 additions & 6 deletions lib/amqp/gen/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ defmodule Amqpx.Gen.Consumer do
prefetch_count: 50,
backoff: 5_000,
connection_name: Amqpx.Gen.ConnectionManager,
requeue_on_reject: true
requeue_on_reject: true,
cancelled?: false
]

@type state() :: %__MODULE__{}
Expand Down Expand Up @@ -86,7 +87,7 @@ defmodule Amqpx.Gen.Consumer do

# Confirmation sent by the broker to the consumer process after a Basic.cancel
def handle_info({:"basic.cancel_ok", _consumer_tag}, state) do
{:stop, :basic_cancel_ok, state}
{:stop, {:shutdown, :basic_cancel_ok}, state}
end

def handle_info(
Expand Down Expand Up @@ -188,6 +189,8 @@ defmodule Amqpx.Gen.Consumer do
def terminate(_, %__MODULE__{channel: nil}), do: nil

def terminate(reason, %__MODULE__{channel: channel}) do
Logger.info("Terminating consumer with reason #{inspect(reason)}")

case reason do
:normal -> close_channel(channel)
:shutdown -> close_channel(channel)
Expand All @@ -211,17 +214,23 @@ defmodule Amqpx.Gen.Consumer do

defp handle_message(
message,
%{delivery_tag: tag, redelivered: redelivered} = meta,
%{delivery_tag: tag, redelivered: redelivered, consumer_tag: consumer_tag} = meta,
%__MODULE__{
handler_module: handler_module,
handler_state: handler_state,
backoff: backoff,
requeue_on_reject: requeue_on_reject
} = state
) do
{:ok, handler_state} = handler_module.handle_message(message, meta, handler_state)
Basic.ack(state.channel, tag)
%{state | handler_state: handler_state}
case handle_signals(state, consumer_tag) do
{:ok, state} ->
{:ok, handler_state} = handler_module.handle_message(message, meta, handler_state)
Basic.ack(state.channel, tag)
%{state | handler_state: handler_state}

{:stop, state} ->
state
end
rescue
e in _ ->
Logger.error(Exception.format(:error, e, __STACKTRACE__))
Expand All @@ -242,4 +251,40 @@ defmodule Amqpx.Gen.Consumer do

state
end

@type signal_status :: :stopping | :draining | :running

@spec get_signal_status :: signal_status()
defp get_signal_status do
cond do
signal_handler().stopping?() -> :stopping
signal_handler().draining?() -> :draining
true -> :running
end
end

@spec handle_signals(signal_status(), state(), String.t()) :: {:ok | :stop, state()}
defp handle_signals(signal_status \\ get_signal_status(), state, consumer_tag)

# Close channel when we we need to stop.
defp handle_signals(:stopping, state, _) do
close_channel(state.channel)
{:stop, state}
end

# Continue processing prefetched messages while draining
defp handle_signals(:draining, %{cancelled?: true} = state, _), do: {:ok, state}

# Stop consuming new messages and move to cancelled state
# to continue processing prefetched messages
defp handle_signals(:draining, state, consumer_tag) do
Logger.info("Cancelling consumer #{consumer_tag}")
Basic.cancel(state.channel, consumer_tag)
{:ok, %{state | cancelled?: true}}
end

# No signals received run as normal
defp handle_signals(:running, state, _), do: {:ok, state}

defp signal_handler, do: Application.get_env(:amqpx, :signal_handler, Amqpx.NoSignalHandler)
end
16 changes: 16 additions & 0 deletions lib/amqp/no_signal_handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule Amqpx.NoSignalHandler do
@moduledoc """
Dummy signal handler module that does not handle the graceful termination.
It always returns `false` for `draining?/0` and `stopping?/0`.
I.e. the consumer will continue without handling signals.
"""

@behaviour Amqpx.SignalHandler

@impl true
def draining?, do: false

@impl true
def stopping?, do: false
end
20 changes: 20 additions & 0 deletions lib/amqp/signal_handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule Amqpx.SignalHandler do
@moduledoc """
Signal handler behaviour is used to catch the SIGTERM signal and gracefully stop the application.
In the context of Rabbitmq, it will:
cancel the channel when we are in draining mode to stop prefetch new messages.
close the channel when we are in stopping mode to reject all the unacked messages that we did't start to consume.
Check in Peano how to use it.
"""
@doc """
Check if the application is in draining mode.
"""
@callback draining? :: boolean

@doc """
Check if the application is in stopping mode.
"""
@callback stopping? :: boolean
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Amqpx.MixProject do
[
app: :amqpx,
name: "amqpx",
version: "6.0.4",
version: "6.1.0",
elixir: "~> 1.7",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :production,
Expand Down
35 changes: 35 additions & 0 deletions test/gen_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,41 @@ defmodule Amqpx.Test.AmqpxTest do
end
end

test "should handle stop signal by not consuming any more messages" do
start_connection1!()
start_consumer_by_name!(Consumer1)
start_producer!(:producer)

payload = %{test: 1}

with_mocks [
{Amqpx.NoSignalHandler, [], stopping?: fn -> true end},
{Consumer1, [], []}
] do
Producer1.send_payload(payload)
:timer.sleep(50)
refute called(Consumer1.handle_message(Jason.encode!(payload), :_, :_))
end
end

test "should continue to handle messages while draining but not while stopping" do
start_connection1!()
start_consumer_by_name!(Consumer1)
start_producer!(:producer)

payload = %{test: 1}

with_mocks [
{Amqpx.NoSignalHandler, [], stopping?: [in_series([], [false, true])], draining?: fn -> true end},
{Consumer1, [], [handle_message: fn _, _, state -> {:ok, state} end]}
] do
Producer1.send_payload(payload)
Producer1.send_payload(payload)
:timer.sleep(50)
assert_called_exactly(Consumer1.handle_message(Jason.encode!(payload), :_, :_), 1)
end
end

test "the consumer should stop gracefully" do
start_connection1!()

Expand Down

0 comments on commit eb9dda4

Please sign in to comment.