diff --git a/README.md b/README.md index 8081bb5..78720f4 100644 --- a/README.md +++ b/README.md @@ -26,3 +26,4 @@ end - [ ] Handle connection errors - [ ] Handle session errors - [ ] Backoff strategy to re-connect in case of an error +- [ ] Handle body type: `[#'v1_0.amqp_sequence'{}]` specified in this [doc](https://hexdocs.pm/amqp10_client/amqp10_msg.html#body-1) and this [PR](https://github.com/highmobility/off_broadway_amqp10/pull/114). diff --git a/lib/off_broadway_amqp10/amqp_10_client/client_impl.ex b/lib/off_broadway_amqp10/amqp_10_client/client_impl.ex index d9d9ae3..7427d52 100644 --- a/lib/off_broadway_amqp10/amqp_10_client/client_impl.ex +++ b/lib/off_broadway_amqp10/amqp_10_client/client_impl.ex @@ -3,8 +3,27 @@ defmodule OffBroadwayAmqp10.Amqp10.Client.Impl do AMQP Client Wrapper """ - alias OffBroadwayAmqp10.Amqp10.State - alias OffBroadwayAmqp10.Amqp10.Client + alias OffBroadwayAmqp10.Amqp10.{Client, State} + + require Record + + @hrl_path Path.join(:code.lib_dir(:amqp10_common), "include/amqp10_framing.hrl") + + @amqp_value_record_tag :"v1_0.amqp_value" + + Record.defrecord( + :amqp_value, + @amqp_value_record_tag, + Record.extract(@amqp_value_record_tag, from: @hrl_path) + ) + + @amqp_sequence_record_tag :"v1_0.amqp_sequence" + + Record.defrecord( + :amqp_sequence, + @amqp_sequence_record_tag, + Record.extract(@amqp_sequence_record_tag, from: @hrl_path) + ) @behaviour Client @@ -35,8 +54,19 @@ defmodule OffBroadwayAmqp10.Amqp10.Client.Impl do @impl Client def body(raw_msg) do - [payload] = :amqp10_msg.body(raw_msg) - payload + body = :amqp10_msg.body(raw_msg) + + cond do + Record.is_record(body, @amqp_value_record_tag) -> + packed_content = amqp_value(body, :content) + :amqp10_client_types.unpack(packed_content) + + match?([value] when is_binary(value), body) -> + List.first(body) + + is_list(body) && Record.is_record(List.first(body), @amqp_sequence_record_tag) -> + raise "Not implemented - AMQP 1.0 Sequence is not implemented - Please create an issue with a sample message if you need it" + end end @impl Client diff --git a/test/off_broadway_amqp10/amqp10_client/client_impl_test.exs b/test/off_broadway_amqp10/amqp10_client/client_impl_test.exs index 596070e..567bc67 100644 --- a/test/off_broadway_amqp10/amqp10_client/client_impl_test.exs +++ b/test/off_broadway_amqp10/amqp10_client/client_impl_test.exs @@ -4,9 +4,19 @@ defmodule OffBroadwayAmqp10.Amqp10.Client.ImplTest do alias OffBroadwayAmqp10.Amqp10.Client.Impl, as: SUT describe "body/1" do - test "extracts body" do + test "binary list: extracts body" do assert SUT.body(Fixture.raw_msg()) == "Itachi" end + + test "v1_0.amqp_value: extracts body" do + assert SUT.body(Fixture.raw_msg_amqp10_value()) == "Itachi" + end + + test "v1_0.amqp_sequence list: raises" do + assert_raise RuntimeError, ~r/Not implemented - AMQP 1.0 Sequence is not implemented/, fn -> + SUT.body(Fixture.raw_msg_amqp10_sequence()) + end + end end describe "headers/1" do diff --git a/test/test_helper.exs b/test/test_helper.exs index ca6be10..98773e8 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,6 +1,8 @@ ExUnit.start() defmodule Fixture do + require OffBroadwayAmqp10.Amqp10.Client.Impl + def configuration(opts \\ []) do connection_opts = [ hostname: "localhost", @@ -37,8 +39,8 @@ defmodule Fixture do OffBroadwayAmqp10.Producer.State.new(configuration(opts)) end - def raw_msg do - raw_msg = :amqp10_msg.new("delivery_tag", "Itachi") + def raw_msg(value \\ "Itachi") do + raw_msg = :amqp10_msg.new("delivery_tag", value) raw_msg = :amqp10_msg.set_headers(%{:durable => false, priority: 4}, raw_msg) raw_msg = @@ -65,6 +67,22 @@ defmodule Fixture do raw_msg ) end + + def raw_msg_amqp10_value do + value = :amqp10_client_types.utf8("Itachi") + record = OffBroadwayAmqp10.Amqp10.Client.Impl.amqp_value(content: value) + raw_msg(record) + end + + def raw_msg_amqp10_sequence do + values = [ + :amqp10_client_types.utf8("Itachi"), + :amqp10_client_types.utf8("Shisui") + ] + + records = [OffBroadwayAmqp10.Amqp10.Client.Impl.amqp_sequence(content: values)] + raw_msg(records) + end end defmodule FakeAmqpClient do