Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for body of type #'v1_0.amqp_value'{} #114

Merged
merged 6 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ end
- [ ] Handle connection errors
- [ ] Handle session errors
- [ ] Backoff strategy to re-connect in case of an error
- [ ] Handle body type: `[#'v1_0.amqp_sequence'{}]` specified in this [doc](https://hexdocs.pm/amqp10_client/amqp10_msg.html#body-1) and this [PR](https://github.com/highmobility/off_broadway_amqp10/pull/114).
27 changes: 23 additions & 4 deletions lib/off_broadway_amqp10/amqp_10_client/client_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,19 @@ defmodule OffBroadwayAmqp10.Amqp10.Client.Impl do
AMQP Client Wrapper
"""

alias OffBroadwayAmqp10.Amqp10.State
alias OffBroadwayAmqp10.Amqp10.Client
alias OffBroadwayAmqp10.Amqp10.{Client, State}

require Record

@amqp_value_record_tag :"v1_0.amqp_value"

Record.defrecord(
:amqp_value,
@amqp_value_record_tag,
Record.extract(@amqp_value_record_tag,
from: Path.join(:code.lib_dir(:amqp10_common), "include/amqp10_framing.hrl")
)
)

@behaviour Client

Expand Down Expand Up @@ -35,8 +46,16 @@ defmodule OffBroadwayAmqp10.Amqp10.Client.Impl do

@impl Client
def body(raw_msg) do
[payload] = :amqp10_msg.body(raw_msg)
payload
body = :amqp10_msg.body(raw_msg)

cond do
match?([_], body) ->
List.first(body)

Record.is_record(body, @amqp_value_record_tag) ->
packed_content = amqp_value(body, :content)
:amqp10_client_types.unpack(packed_content)
scudelletti marked this conversation as resolved.
Show resolved Hide resolved
end
end

@impl Client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ defmodule OffBroadwayAmqp10.Amqp10.Client.ImplTest do
alias OffBroadwayAmqp10.Amqp10.Client.Impl, as: SUT

describe "body/1" do
test "extracts body" do
test "binary list: extracts body" do
assert SUT.body(Fixture.raw_msg()) == "Itachi"
end

test "v1_0.amqp_value: extracts body" do
assert SUT.body(Fixture.raw_msg_amqp10_value()) == "Itachi"
end
end

describe "headers/1" do
Expand Down
12 changes: 10 additions & 2 deletions test/test_helper.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
ExUnit.start()

defmodule Fixture do
require OffBroadwayAmqp10.Amqp10.Client.Impl

def configuration(opts \\ []) do
connection_opts = [
hostname: "localhost",
Expand Down Expand Up @@ -37,8 +39,8 @@ defmodule Fixture do
OffBroadwayAmqp10.Producer.State.new(configuration(opts))
end

def raw_msg do
raw_msg = :amqp10_msg.new("delivery_tag", "Itachi")
def raw_msg(value \\ "Itachi") do
raw_msg = :amqp10_msg.new("delivery_tag", value)
raw_msg = :amqp10_msg.set_headers(%{:durable => false, priority: 4}, raw_msg)

raw_msg =
Expand All @@ -65,6 +67,12 @@ defmodule Fixture do
raw_msg
)
end

def raw_msg_amqp10_value do
value = :amqp10_client_types.utf8("Itachi")
record = OffBroadwayAmqp10.Amqp10.Client.Impl.amqp_value(content: value)
raw_msg(record)
end
end

defmodule FakeAmqpClient do
Expand Down