Skip to content

Commit

Permalink
Req.parse_message/2, run_finch: Handle unrelated messages
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtekmach committed Jul 18, 2024
1 parent 4b43e6d commit 9928489
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 3 deletions.
17 changes: 15 additions & 2 deletions lib/req.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1217,9 +1217,20 @@ defmodule Req do
end

@doc """
Parses asynchronous response message.
Parses asynchronous response body message.
A request with option `:into` set to `:self` returns response with asynchronous body.
In that case, Req sends chunks to the calling process as messages. You'd typically
get them using `receive/1` or [`handle_info/2`](`c:GenServer.handle_info/2`) in a GenServer.
These messages should be parsed using this function. The possible return values are:
* `{:ok, chunks}` - where a chunk can be `{:data, binary}`, `{:trailers, trailers}`, or
`:done`.
* `{:error, reason}` - an error occured
* `:unknown` - the message was not meant for this response.
An asynchronous response is a result of request with `into: :self`.
See also `Req.Response.Async`.
## Examples
Expand All @@ -1231,6 +1242,8 @@ defmodule Req do
{:ok, [data: "{\"url\": \"http://httpbin.org/stream/2\", ..., \"id\": 1}\\n"]}
iex> Req.parse_message(resp, receive do message -> message end)
{:ok, [:done]}
iex> Req.parse_message(resp, :other)
:unknown
"""
@doc type: :async
def parse_message(response, message)
Expand Down
4 changes: 4 additions & 0 deletions lib/req/finch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ defmodule Req.Finch do
{:error, reason}
end

defp parse_message(_, _) do
:unknown
end

defp cancel(ref) do
Finch.cancel_async_request(ref)
clean_responses(ref)
Expand Down
4 changes: 3 additions & 1 deletion lib/req/response_async.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ defmodule Req.Response.Async do
raise "expected to read body chunk in the process #{inspect(async.pid)} which made the request, got: #{inspect(self())}"
end

ref = async.ref

receive do
message ->
{^ref, _} = message ->
case async.stream_fun.(async.ref, message) do
{:ok, [data: data]} ->
result =
Expand Down
13 changes: 13 additions & 0 deletions test/req/finch_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ defmodule Req.FinchTest do
assert {:ok, [data: "foo"]} = Req.parse_message(resp, assert_receive(_))
assert {:ok, [data: "bar"]} = Req.parse_message(resp, assert_receive(_))
assert {:ok, [:done]} = Req.parse_message(resp, assert_receive(_))
assert :unknown = Req.parse_message(resp, :other)
refute_receive _
end

Expand Down Expand Up @@ -430,6 +431,18 @@ defmodule Req.FinchTest do

assert Req.get!(req).body |> Enum.to_list() == ["ok"]
end

test "into: :self enumerable with unrelated message" do
%{url: url} =
start_http_server(fn conn ->
Plug.Conn.send_resp(conn, 200, "ok")
end)

send(self(), :other)
resp = Req.get!(url: url, into: :self)
assert Enum.to_list(resp.body) == ["ok"]
assert_received :other
end
end

describe "pool_options" do
Expand Down

0 comments on commit 9928489

Please sign in to comment.