diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c19bdc..a0f25ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +--- + +## [6.0.3] - 2024-05-28 + +### Fixed + +- ([#190](https://github.com/primait/amqpx/pull/191)) Suppress noisy error logs at GenServer shutdown. +- ([#191](https://github.com/primait/amqpx/pull/190)) GenServer now trap exit and gracefully shutdown instead of force + the process to exit. + + --- ## [6.0.2] - 2023-03-24 @@ -37,7 +48,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - ([#129](https://github.com/primait/amqpx/pull/)) Default binding for DLX queues instead of wildcard -[Unreleased]: https://github.com/primait/amqpx/compare/6.0.2...HEAD +[Unreleased]: https://github.com/primait/amqpx/compare/6.0.3...HEAD +[6.0.3]: https://github.com/primait/amqpx/compare/6.0.2...6.0.3 [6.0.2]: https://github.com/primait/amqpx/compare/6.0.1...6.0.2 [6.0.1]: https://github.com/primait/amqpx/compare/6.0.0...6.0.1 [6.0.0]: https://github.com/primait/amqpx/releases/tag/6.0.0 diff --git a/mix.exs b/mix.exs index 7f964e3..12f4739 100644 --- a/mix.exs +++ b/mix.exs @@ -5,7 +5,7 @@ defmodule Amqpx.MixProject do [ app: :amqpx, name: "amqpx", - version: "6.0.2-rc.0", + version: "6.0.3", elixir: "~> 1.7", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :production, diff --git a/test/gen_test.exs b/test/gen_test.exs index 85c0872..fe81ee7 100644 --- a/test/gen_test.exs +++ b/test/gen_test.exs @@ -15,93 +15,13 @@ defmodule Amqpx.Test.AmqpxTest do import Mock @moduletag capture_log: true - - setup_all do - start_supervised!(%{ - id: :amqp_connection, - start: - {Amqpx.Gen.ConnectionManager, :start_link, - [%{connection_params: Application.fetch_env!(:amqpx, :amqp_connection)}]} - }) - - start_supervised!(%{ - id: :amqp_connection_two, - start: - {Amqpx.Gen.ConnectionManager, :start_link, - [%{connection_params: Application.fetch_env!(:amqpx, :amqp_connection_two)}]} - }) - - start_supervised!(%{ - id: :producer, - start: {Amqpx.Gen.Producer, :start_link, [Application.fetch_env!(:amqpx, :producer)]} - }) - - start_supervised!(%{ - id: :producer2, - start: {Amqpx.Gen.Producer, :start_link, [Application.fetch_env!(:amqpx, :producer2)]} - }) - - start_supervised!(%{ - id: :producer_connection_two, - start: {Amqpx.Gen.Producer, :start_link, [Application.fetch_env!(:amqpx, :producer_connection_two)]} - }) - - start_supervised!(%{ - id: :producer_with_retry_on_publish_error, - start: {Amqpx.Gen.Producer, :start_link, [Application.fetch_env!(:amqpx, :producer_with_retry_on_publish_error)]} - }) - - start_supervised!(%{ - id: :producer_with_retry_on_publish_rejected, - start: - {Amqpx.Gen.Producer, :start_link, [Application.fetch_env!(:amqpx, :producer_with_retry_on_publish_rejected)]} - }) - - start_supervised!(%{ - id: :producer_with_retry_on_confirm_delivery_timeout, - start: - {Amqpx.Gen.Producer, :start_link, - [Application.fetch_env!(:amqpx, :producer_with_retry_on_confirm_delivery_timeout)]} - }) - - start_supervised!(%{ - id: :producer_with_retry_on_confirm_delivery_timeout_and_on_publish_error, - start: - {Amqpx.Gen.Producer, :start_link, - [ - Application.fetch_env!( - :amqpx, - :producer_with_retry_on_confirm_delivery_timeout_and_on_publish_error - ) - ]} - }) - - start_supervised!(%{ - id: :producer_with_jittered_backoff, - start: - {Amqpx.Gen.Producer, :start_link, - [ - Application.fetch_env!( - :amqpx, - :producer_with_jittered_backoff - ) - ]} - }) - - Application.fetch_env!(:amqpx, :consumers) - |> Enum.with_index() - |> Enum.each(fn {opts, id} -> - start_supervised!(%{ - id: :"consumer_#{id}", - start: {Amqpx.Gen.Consumer, :start_link, [opts]} - }) - end) - - :timer.sleep(1_000) - :ok - end + @start_supervised_timeout 20 test "e2e: should publish message and consume it" do + start_connection1!() + start_consumer_by_name!(Consumer1) + start_producer!(:producer) + payload = %{test: 1} with_mock(Consumer1, handle_message: fn _, _, s -> {:ok, s} end) do @@ -112,6 +32,13 @@ defmodule Amqpx.Test.AmqpxTest do end test "e2e: should publish message and trigger the right consumer" do + start_connection1!() + start_connection2!() + start_consumer_by_name!(Consumer1) + start_consumer_by_name!(Consumer2) + start_producer!(:producer) + start_producer!(:producer2) + payload = %{test: 1} payload2 = %{test: 2} @@ -131,12 +58,21 @@ defmodule Amqpx.Test.AmqpxTest do end test "e2e: try to publish to an exchange defined in producer conf" do + start_connection1!() + start_producer!(:producer) payload = %{test: 1} assert Producer3.send_payload(payload) === :ok end test "e2e: publish messages using more than one registered producer" do + start_connection1!() + start_connection2!() + start_consumer_by_name!(Consumer1) + start_consumer_by_name!(Consumer2) + start_producer!(:producer) + start_producer!(:producer2) + test_pid = self() with_mock(Consumer1, @@ -166,6 +102,10 @@ defmodule Amqpx.Test.AmqpxTest do end test "e2e: should handle message rejected when handle message fails" do + start_connection1!() + start_consumer_by_name!(HandleRejectionConsumer) + start_producer!(:producer) + test_pid = self() with_mock(HandleRejectionConsumer, @@ -195,6 +135,10 @@ defmodule Amqpx.Test.AmqpxTest do end test "e2e: messages should not be re-enqueued when re-enqueue option is disabled" do + start_connection1!() + start_consumer_by_name!(NoRequeueConsumer) + start_producer!(:producer) + test_pid = self() error_message = "test_error" @@ -221,6 +165,10 @@ defmodule Amqpx.Test.AmqpxTest do end test "e2e: handle_message_reject should be called upon first time when re-enqueue option is disabled" do + start_connection1!() + start_consumer_by_name!(NoRequeueConsumer) + start_producer!(:producer) + test_pid = self() error_message = "test-error-requeue" @@ -259,6 +207,13 @@ defmodule Amqpx.Test.AmqpxTest do end test "e2e: should publish and consume from the right connections" do + start_connection1!() + start_connection2!() + start_consumer_by_name!(Consumer1) + start_consumer_by_name!(ConsumerConnectionTwo) + start_producer!(:producer) + start_producer!(:producer_connection_two) + payload = %{test: 1} with_mocks [ @@ -276,6 +231,8 @@ defmodule Amqpx.Test.AmqpxTest do describe "when publish retry configurations are enabled" do test "should retry publish in case of publish error" do + start_connection1!() + start_producer!(:producer_with_retry_on_publish_error) payload = %{test: 1} with_mock(Amqpx.Basic, @@ -296,6 +253,8 @@ defmodule Amqpx.Test.AmqpxTest do end test "should not retry publish in case of publish error if on_publish_error retry_policy is not set" do + start_connection1!() + start_producer!(:producer_with_retry_on_publish_rejected) payload = %{test: 1} with_mock(Amqpx.Basic, @@ -309,6 +268,8 @@ defmodule Amqpx.Test.AmqpxTest do end test "should retry publish in case of publish rejection" do + start_connection1!() + start_producer!(:producer_with_retry_on_publish_rejected) payload = %{test: 1} with_mock(Amqpx.Confirm, @@ -329,6 +290,8 @@ defmodule Amqpx.Test.AmqpxTest do end test "should not retry publish in case of publish rejection if on_publish_rejected retry_policy is not set" do + start_connection1!() + start_producer!(:producer_with_retry_on_publish_error) payload = %{test: 1} with_mock(Amqpx.Confirm, @@ -342,6 +305,8 @@ defmodule Amqpx.Test.AmqpxTest do end test "should retry publish in case of confirm delivery timeout" do + start_connection1!() + start_producer!(:producer_with_retry_on_confirm_delivery_timeout) payload = %{test: 1} with_mocks([ @@ -378,6 +343,8 @@ defmodule Amqpx.Test.AmqpxTest do end test "should retry publish in case of confirm delivery timeout and on publish error" do + start_connection1!() + start_producer!(:producer_with_retry_on_confirm_delivery_timeout_and_on_publish_error) payload = %{test: 1} with_mocks([ @@ -439,6 +406,8 @@ defmodule Amqpx.Test.AmqpxTest do end test "test retry configurations" do + start_connection1!() + start_producer!(:producer_with_retry_on_publish_error) payload = %{test: 1} with_mock(Amqpx.Basic, @@ -464,6 +433,8 @@ defmodule Amqpx.Test.AmqpxTest do describe "when publish retry configurations are not enabled" do test "should not retry publish in case of error" do + start_connection1!() + start_producer!(:producer) payload = %{test: 1} with_mock(Amqpx.Basic, @@ -520,6 +491,8 @@ defmodule Amqpx.Test.AmqpxTest do describe "jittered backoff validation" do test "should be invoked the configured number of times" do + start_connection1!() + start_producer!(:producer_with_jittered_backoff) payload = %{test: 1} with_mocks([ @@ -550,6 +523,8 @@ defmodule Amqpx.Test.AmqpxTest do end test "the consumer should stop gracefully" do + start_connection1!() + tmp_ex = %{name: "tmp_ex", type: :topic, routing_keys: ["amqpx.tmp_ex"], opts: [durable: true]} consumer_config = %{ @@ -592,4 +567,53 @@ defmodule Amqpx.Test.AmqpxTest do assert pid_2 in [consumer_pid, channel_pid] end end + + defp start_connection1!() do + start_supervised!(%{ + id: :amqp_connection, + start: + {Amqpx.Gen.ConnectionManager, :start_link, + [%{connection_params: Application.fetch_env!(:amqpx, :amqp_connection)}]} + }) + + :timer.sleep(@start_supervised_timeout) + end + + defp start_connection2!() do + start_supervised!(%{ + id: :amqp_connection_two, + start: + {Amqpx.Gen.ConnectionManager, :start_link, + [%{connection_params: Application.fetch_env!(:amqpx, :amqp_connection_two)}]} + }) + + :timer.sleep(@start_supervised_timeout) + end + + defp start_producer!(name) when is_atom(name) do + start_supervised!(%{ + id: name, + start: {Amqpx.Gen.Producer, :start_link, [Application.fetch_env!(:amqpx, name)]} + }) + + :timer.sleep(@start_supervised_timeout) + end + + defp start_consumer_by_name!(name) when is_atom(name) do + opts = + :amqpx + |> Application.fetch_env!(:consumers) + |> Enum.find(&(&1.handler_module == name)) + + if is_nil(opts) do + raise "Consumer #{name} not found" + end + + start_supervised!(%{ + id: name, + start: {Amqpx.Gen.Consumer, :start_link, [opts]} + }) + + :timer.sleep(@start_supervised_timeout) + end end