Skip to content

Commit

Permalink
Default binding for DLX queues instead of wildcard (#129)
Browse files Browse the repository at this point in the history
* Default binding for DLX queues instead of wildcard

Since rabbitMQ by default sends messages in DLX with the original routing key, the default binding for the `*_errored` queue should be the same as the original queue.

```elixir
%{
  exchanges: [
    %{
      name: "exchange_name",
      routing_keys: ["example_key"],
      type: :topic
    }
  ],
  opts: [
    durable: true,
    arguments: [
      {"x-dead-letter-exchange", :longstr, "errors_exchange"}
    ]
  ],
  queue: "queue_name"
}
```
**before:**
```
TO queue_name_errored
ROUTING KEY #
```
**after:**
```
TO queue_name_errored
ROUTING KEY example_key
```

Of course things could be very complex if the original queue has more than one exchange etc, but then a DLX may not be feasible at all 🤷🏻

* restrict empty dead letter exchange configuration

* scarsezza

* Fix entrypoint

* Fix format

* Bump major since this is a change in the behaviour of the APIs

* Add some tests

* Add missing test

* Fix match

* Fix test

* Update test/helper_test.exs

Co-authored-by: Daniele Bartocci <[email protected]>

* Fix behaviour

* Fix behaviour

* My bad

* It's the final fix

* io non credo

* Collapse into one error

* Update test/helper_test.exs

Co-authored-by: Cristiano Piemontese <[email protected]>

* Update test/helper_test.exs

Co-authored-by: Cristiano Piemontese <[email protected]>

* Fix test description

* Better error message

* Update lib/amqp/helper.ex

Co-authored-by: Cristiano Piemontese <[email protected]>

* Fix test description

Co-authored-by: Enrico Galassi <[email protected]>
Co-authored-by: Cristiano Piemontese <[email protected]>
  • Loading branch information
3 people authored Dec 21, 2022
1 parent b286282 commit 4ca8613
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 9 deletions.
8 changes: 7 additions & 1 deletion entrypoint
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

if [ "$1" == "mix" ]; then
exec "$@"
else [ -n "$1" ];
elif [ -n "$1" ]; then
sh -c "$@"
else
mix deps.get
mix ecto.setup
mix phx.server

trap : TERM INT; sleep infinity & wait
fi
31 changes: 24 additions & 7 deletions lib/amqp/helper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,13 @@ defmodule Amqpx.Helper do
channel,
%{
queue: qname,
opts: opts
opts: opts,
exchanges: exchanges
} = queue
) do
case Enum.find(opts[:arguments], &match?({"x-dead-letter-exchange", _, _}, &1)) do
case Enum.find(opts[:arguments], &match?({"x-dead-letter-exchange", :longstr, _}, &1)) do
{_, _, dle} ->
case Enum.find(opts[:arguments], &match?({"x-dead-letter-routing-key", _, _}, &1)) do
case Enum.find(opts[:arguments], &match?({"x-dead-letter-routing-key", :longstr, _}, &1)) do
{_, _, dlrk} ->
setup_dead_lettering(channel, %{
queue: "#{qname}_errored",
Expand All @@ -65,7 +66,11 @@ defmodule Amqpx.Helper do
})

nil ->
setup_dead_lettering(channel, %{queue: "#{qname}_errored", exchange: dle})
setup_dead_lettering(channel, %{
queue: "#{qname}_errored",
exchange: dle,
original_routing_keys: Enum.map(exchanges, & &1.routing_keys)
})
end

nil ->
Expand All @@ -79,20 +84,32 @@ defmodule Amqpx.Helper do
setup_queue(channel, queue)
end

def setup_dead_lettering(channel, %{queue: dlq, exchange: ""}) do
def setup_dead_lettering(channel, %{queue: dlq, exchange: "", routing_key: dlq}) do
# DLX will work through [default exchange](https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-default)
# since `x-dead-letter-routing-key` matches the queue name
Queue.declare(channel, dlq, durable: true)
end

def setup_dead_lettering(_channel, %{queue: dlq, exchange: "", routing_key: bad_dlq}) do
raise "If x-dead-letter-exchange is an empty string, x-dead-letter-routing-key should be '#{dlq}' instead of '#{bad_dlq}'"
end

def setup_dead_lettering(channel, %{queue: dlq, exchange: exchange, routing_key: routing_key}) do
Exchange.declare(channel, exchange, :topic, durable: true)
Queue.declare(channel, dlq, durable: true)
Queue.bind(channel, dlq, exchange, routing_key: routing_key)
end

def setup_dead_lettering(channel, %{queue: dlq, exchange: exchange}) do
def setup_dead_lettering(channel, %{queue: dlq, exchange: exchange, original_routing_keys: original_routing_keys}) do
Exchange.declare(channel, exchange, :topic, durable: true)
Queue.declare(channel, dlq, durable: true)
Queue.bind(channel, dlq, exchange, routing_key: "#")

original_routing_keys
|> List.flatten()
|> Enum.uniq()
|> Enum.each(fn rk ->
:ok = Queue.bind(channel, dlq, exchange, routing_key: rk)
end)
end

def setup_queue(channel, %{
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Amqpx.MixProject do
[
app: :amqpx,
name: "amqpx",
version: "5.9.0",
version: "6.0.0",
elixir: "~> 1.7",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :production,
Expand Down
126 changes: 126 additions & 0 deletions test/helper_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
defmodule HelperTest do
use ExUnit.Case

alias Amqpx.{Channel, Connection, Queue, Exchange, Helper}

setup do
{:ok, conn} = Connection.open(Application.fetch_env!(:amqpx, :amqp_connection))
{:ok, chan} = Channel.open(conn)
on_exit(fn -> :ok = Connection.close(conn) end)
{:ok, conn: conn, chan: chan}
end

test "declare a queue with a bind to an exchange and a dead letter queue with an errored exchange", meta do
queue_name = rand_name()
routing_key_name = rand_name()
exchange_name = rand_name()

queue_name_errored = "#{queue_name}_errored"
exchange_name_errored = "#{exchange_name}_errored"

assert :ok =
Helper.declare(meta[:chan], %{
exchanges: [
%{name: exchange_name, opts: [durable: true], routing_keys: [routing_key_name], type: :topic}
],
opts: [
durable: true,
arguments: [
{"x-dead-letter-exchange", :longstr, exchange_name_errored},
{"x-dead-letter-routing-key", :longstr, routing_key_name}
]
],
queue: queue_name
})

assert :ok = Queue.unbind(meta[:chan], queue_name, exchange_name)
assert :ok = Queue.unbind(meta[:chan], queue_name_errored, exchange_name_errored)
assert :ok = Exchange.delete(meta[:chan], exchange_name)
assert :ok = Exchange.delete(meta[:chan], exchange_name_errored)
assert {:ok, %{message_count: 0}} = Queue.delete(meta[:chan], queue_name)
assert {:ok, %{message_count: 0}} = Queue.delete(meta[:chan], queue_name_errored)
end

test "configuration without an exchange and with routing key set with correct dead letter queue should not raise an error",
meta do
queue_name = rand_name()
routing_key_name = rand_name()
exchange_name = rand_name()

queue_name_errored = "#{queue_name}_errored"

assert :ok =
Helper.declare(meta[:chan], %{
exchanges: [
%{name: exchange_name, opts: [durable: true], routing_keys: [routing_key_name], type: :topic}
],
opts: [
durable: true,
arguments: [
{"x-dead-letter-exchange", :longstr, ""},
{"x-dead-letter-routing-key", :longstr, queue_name_errored}
]
],
queue: queue_name
})

assert :ok = Queue.unbind(meta[:chan], queue_name, exchange_name)
assert :ok = Exchange.delete(meta[:chan], exchange_name)
assert {:ok, %{message_count: 0}} = Queue.delete(meta[:chan], queue_name)
assert {:ok, %{message_count: 0}} = Queue.delete(meta[:chan], queue_name_errored)
end

test "bad configuration with dead letter exchange empty and routing key set should raise an error", meta do
queue_name = rand_name()
routing_key_name = rand_name()
exchange_name = rand_name()

queue_name_errored = "BadDeadLetterQueue"

assert_raise RuntimeError,
"If x-dead-letter-exchange is an empty string, x-dead-letter-routing-key should be '#{queue_name}_errored' instead of '#{queue_name_errored}'",
fn ->
Helper.declare(meta[:chan], %{
exchanges: [
%{name: exchange_name, opts: [durable: true], routing_keys: [routing_key_name], type: :topic}
],
opts: [
durable: true,
arguments: [
{"x-dead-letter-exchange", :longstr, ""},
{"x-dead-letter-routing-key", :longstr, queue_name_errored}
]
],
queue: queue_name
})
end
end

test "bad configuration with empty dead letter exchange and routing key should raise an error", meta do
queue_name = rand_name()
routing_key_name = rand_name()
exchange_name = rand_name()

assert_raise RuntimeError,
"If x-dead-letter-exchange is an empty string, x-dead-letter-routing-key should be '#{queue_name}_errored' instead of ''",
fn ->
Helper.declare(meta[:chan], %{
exchanges: [
%{name: exchange_name, opts: [durable: true], routing_keys: [routing_key_name], type: :topic}
],
opts: [
durable: true,
arguments: [
{"x-dead-letter-exchange", :longstr, ""},
{"x-dead-letter-routing-key", :longstr, ""}
]
],
queue: queue_name
})
end
end

defp rand_name do
:crypto.strong_rand_bytes(8) |> Base.encode64()
end
end

0 comments on commit 4ca8613

Please sign in to comment.