Skip to content

Commit

Permalink
v6.0.3 (#194)
Browse files Browse the repository at this point in the history
* v6.0.3

* Fix flaky tests

* Rename start_producer fn
  • Loading branch information
cottinisimone authored May 31, 2024
1 parent 719581c commit 027ecd5
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 87 deletions.
14 changes: 13 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

---

## [6.0.3] - 2024-05-28

### Fixed

- ([#190](https://github.com/primait/amqpx/pull/191)) Suppress noisy error logs at GenServer shutdown.
- ([#191](https://github.com/primait/amqpx/pull/190)) GenServer now trap exit and gracefully shutdown instead of force
the process to exit.


---

## [6.0.2] - 2023-03-24
Expand Down Expand Up @@ -37,7 +48,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- ([#129](https://github.com/primait/amqpx/pull/)) Default binding for DLX queues instead of wildcard

[Unreleased]: https://github.com/primait/amqpx/compare/6.0.2...HEAD
[Unreleased]: https://github.com/primait/amqpx/compare/6.0.3...HEAD
[6.0.3]: https://github.com/primait/amqpx/compare/6.0.2...6.0.3
[6.0.2]: https://github.com/primait/amqpx/compare/6.0.1...6.0.2
[6.0.1]: https://github.com/primait/amqpx/compare/6.0.0...6.0.1
[6.0.0]: https://github.com/primait/amqpx/releases/tag/6.0.0
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Amqpx.MixProject do
[
app: :amqpx,
name: "amqpx",
version: "6.0.2-rc.0",
version: "6.0.3",
elixir: "~> 1.7",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :production,
Expand Down
194 changes: 109 additions & 85 deletions test/gen_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,93 +15,13 @@ defmodule Amqpx.Test.AmqpxTest do
import Mock

@moduletag capture_log: true

setup_all do
start_supervised!(%{
id: :amqp_connection,
start:
{Amqpx.Gen.ConnectionManager, :start_link,
[%{connection_params: Application.fetch_env!(:amqpx, :amqp_connection)}]}
})

start_supervised!(%{
id: :amqp_connection_two,
start:
{Amqpx.Gen.ConnectionManager, :start_link,
[%{connection_params: Application.fetch_env!(:amqpx, :amqp_connection_two)}]}
})

start_supervised!(%{
id: :producer,
start: {Amqpx.Gen.Producer, :start_link, [Application.fetch_env!(:amqpx, :producer)]}
})

start_supervised!(%{
id: :producer2,
start: {Amqpx.Gen.Producer, :start_link, [Application.fetch_env!(:amqpx, :producer2)]}
})

start_supervised!(%{
id: :producer_connection_two,
start: {Amqpx.Gen.Producer, :start_link, [Application.fetch_env!(:amqpx, :producer_connection_two)]}
})

start_supervised!(%{
id: :producer_with_retry_on_publish_error,
start: {Amqpx.Gen.Producer, :start_link, [Application.fetch_env!(:amqpx, :producer_with_retry_on_publish_error)]}
})

start_supervised!(%{
id: :producer_with_retry_on_publish_rejected,
start:
{Amqpx.Gen.Producer, :start_link, [Application.fetch_env!(:amqpx, :producer_with_retry_on_publish_rejected)]}
})

start_supervised!(%{
id: :producer_with_retry_on_confirm_delivery_timeout,
start:
{Amqpx.Gen.Producer, :start_link,
[Application.fetch_env!(:amqpx, :producer_with_retry_on_confirm_delivery_timeout)]}
})

start_supervised!(%{
id: :producer_with_retry_on_confirm_delivery_timeout_and_on_publish_error,
start:
{Amqpx.Gen.Producer, :start_link,
[
Application.fetch_env!(
:amqpx,
:producer_with_retry_on_confirm_delivery_timeout_and_on_publish_error
)
]}
})

start_supervised!(%{
id: :producer_with_jittered_backoff,
start:
{Amqpx.Gen.Producer, :start_link,
[
Application.fetch_env!(
:amqpx,
:producer_with_jittered_backoff
)
]}
})

Application.fetch_env!(:amqpx, :consumers)
|> Enum.with_index()
|> Enum.each(fn {opts, id} ->
start_supervised!(%{
id: :"consumer_#{id}",
start: {Amqpx.Gen.Consumer, :start_link, [opts]}
})
end)

:timer.sleep(1_000)
:ok
end
@start_supervised_timeout 20

test "e2e: should publish message and consume it" do
start_connection1!()
start_consumer_by_name!(Consumer1)
start_producer!(:producer)

payload = %{test: 1}

with_mock(Consumer1, handle_message: fn _, _, s -> {:ok, s} end) do
Expand All @@ -112,6 +32,13 @@ defmodule Amqpx.Test.AmqpxTest do
end

test "e2e: should publish message and trigger the right consumer" do
start_connection1!()
start_connection2!()
start_consumer_by_name!(Consumer1)
start_consumer_by_name!(Consumer2)
start_producer!(:producer)
start_producer!(:producer2)

payload = %{test: 1}
payload2 = %{test: 2}

Expand All @@ -131,12 +58,21 @@ defmodule Amqpx.Test.AmqpxTest do
end

test "e2e: try to publish to an exchange defined in producer conf" do
start_connection1!()
start_producer!(:producer)
payload = %{test: 1}

assert Producer3.send_payload(payload) === :ok
end

test "e2e: publish messages using more than one registered producer" do
start_connection1!()
start_connection2!()
start_consumer_by_name!(Consumer1)
start_consumer_by_name!(Consumer2)
start_producer!(:producer)
start_producer!(:producer2)

test_pid = self()

with_mock(Consumer1,
Expand Down Expand Up @@ -166,6 +102,10 @@ defmodule Amqpx.Test.AmqpxTest do
end

test "e2e: should handle message rejected when handle message fails" do
start_connection1!()
start_consumer_by_name!(HandleRejectionConsumer)
start_producer!(:producer)

test_pid = self()

with_mock(HandleRejectionConsumer,
Expand Down Expand Up @@ -195,6 +135,10 @@ defmodule Amqpx.Test.AmqpxTest do
end

test "e2e: messages should not be re-enqueued when re-enqueue option is disabled" do
start_connection1!()
start_consumer_by_name!(NoRequeueConsumer)
start_producer!(:producer)

test_pid = self()
error_message = "test_error"

Expand All @@ -221,6 +165,10 @@ defmodule Amqpx.Test.AmqpxTest do
end

test "e2e: handle_message_reject should be called upon first time when re-enqueue option is disabled" do
start_connection1!()
start_consumer_by_name!(NoRequeueConsumer)
start_producer!(:producer)

test_pid = self()
error_message = "test-error-requeue"

Expand Down Expand Up @@ -259,6 +207,13 @@ defmodule Amqpx.Test.AmqpxTest do
end

test "e2e: should publish and consume from the right connections" do
start_connection1!()
start_connection2!()
start_consumer_by_name!(Consumer1)
start_consumer_by_name!(ConsumerConnectionTwo)
start_producer!(:producer)
start_producer!(:producer_connection_two)

payload = %{test: 1}

with_mocks [
Expand All @@ -276,6 +231,8 @@ defmodule Amqpx.Test.AmqpxTest do

describe "when publish retry configurations are enabled" do
test "should retry publish in case of publish error" do
start_connection1!()
start_producer!(:producer_with_retry_on_publish_error)
payload = %{test: 1}

with_mock(Amqpx.Basic,
Expand All @@ -296,6 +253,8 @@ defmodule Amqpx.Test.AmqpxTest do
end

test "should not retry publish in case of publish error if on_publish_error retry_policy is not set" do
start_connection1!()
start_producer!(:producer_with_retry_on_publish_rejected)
payload = %{test: 1}

with_mock(Amqpx.Basic,
Expand All @@ -309,6 +268,8 @@ defmodule Amqpx.Test.AmqpxTest do
end

test "should retry publish in case of publish rejection" do
start_connection1!()
start_producer!(:producer_with_retry_on_publish_rejected)
payload = %{test: 1}

with_mock(Amqpx.Confirm,
Expand All @@ -329,6 +290,8 @@ defmodule Amqpx.Test.AmqpxTest do
end

test "should not retry publish in case of publish rejection if on_publish_rejected retry_policy is not set" do
start_connection1!()
start_producer!(:producer_with_retry_on_publish_error)
payload = %{test: 1}

with_mock(Amqpx.Confirm,
Expand All @@ -342,6 +305,8 @@ defmodule Amqpx.Test.AmqpxTest do
end

test "should retry publish in case of confirm delivery timeout" do
start_connection1!()
start_producer!(:producer_with_retry_on_confirm_delivery_timeout)
payload = %{test: 1}

with_mocks([
Expand Down Expand Up @@ -378,6 +343,8 @@ defmodule Amqpx.Test.AmqpxTest do
end

test "should retry publish in case of confirm delivery timeout and on publish error" do
start_connection1!()
start_producer!(:producer_with_retry_on_confirm_delivery_timeout_and_on_publish_error)
payload = %{test: 1}

with_mocks([
Expand Down Expand Up @@ -439,6 +406,8 @@ defmodule Amqpx.Test.AmqpxTest do
end

test "test retry configurations" do
start_connection1!()
start_producer!(:producer_with_retry_on_publish_error)
payload = %{test: 1}

with_mock(Amqpx.Basic,
Expand All @@ -464,6 +433,8 @@ defmodule Amqpx.Test.AmqpxTest do

describe "when publish retry configurations are not enabled" do
test "should not retry publish in case of error" do
start_connection1!()
start_producer!(:producer)
payload = %{test: 1}

with_mock(Amqpx.Basic,
Expand Down Expand Up @@ -520,6 +491,8 @@ defmodule Amqpx.Test.AmqpxTest do

describe "jittered backoff validation" do
test "should be invoked the configured number of times" do
start_connection1!()
start_producer!(:producer_with_jittered_backoff)
payload = %{test: 1}

with_mocks([
Expand Down Expand Up @@ -550,6 +523,8 @@ defmodule Amqpx.Test.AmqpxTest do
end

test "the consumer should stop gracefully" do
start_connection1!()

tmp_ex = %{name: "tmp_ex", type: :topic, routing_keys: ["amqpx.tmp_ex"], opts: [durable: true]}

consumer_config = %{
Expand Down Expand Up @@ -592,4 +567,53 @@ defmodule Amqpx.Test.AmqpxTest do
assert pid_2 in [consumer_pid, channel_pid]
end
end

defp start_connection1!() do
start_supervised!(%{
id: :amqp_connection,
start:
{Amqpx.Gen.ConnectionManager, :start_link,
[%{connection_params: Application.fetch_env!(:amqpx, :amqp_connection)}]}
})

:timer.sleep(@start_supervised_timeout)
end

defp start_connection2!() do
start_supervised!(%{
id: :amqp_connection_two,
start:
{Amqpx.Gen.ConnectionManager, :start_link,
[%{connection_params: Application.fetch_env!(:amqpx, :amqp_connection_two)}]}
})

:timer.sleep(@start_supervised_timeout)
end

defp start_producer!(name) when is_atom(name) do
start_supervised!(%{
id: name,
start: {Amqpx.Gen.Producer, :start_link, [Application.fetch_env!(:amqpx, name)]}
})

:timer.sleep(@start_supervised_timeout)
end

defp start_consumer_by_name!(name) when is_atom(name) do
opts =
:amqpx
|> Application.fetch_env!(:consumers)
|> Enum.find(&(&1.handler_module == name))

if is_nil(opts) do
raise "Consumer #{name} not found"
end

start_supervised!(%{
id: name,
start: {Amqpx.Gen.Consumer, :start_link, [opts]}
})

:timer.sleep(@start_supervised_timeout)
end
end

0 comments on commit 027ecd5

Please sign in to comment.