Skip to content

Commit

Permalink
Add response body streaming to process via into: self()
Browse files Browse the repository at this point in the history
Also added:

  * `Req.parse_message(resp, message)`
  * `Req.cancel_async_response(resp)`

This is a 2nd interation on response body streaming that was previously
experimental and undocumented.

The following functions:

  * `Req.async_request/2`
  * `Req.async_request!/2`
  * `Req.parse_message(req, message)`
  * `Req.cancel_async_request(req)`

were never part of the public API and they are now deprecated to ease
transition for early adopters (thank you!) and will be removed in v1.0.

Closes #304
  • Loading branch information
wojtekmach committed Mar 4, 2024
1 parent 7871683 commit 935fdee
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 50 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ write new ones.

* Request body streaming (by setting `body: enumerable`.)

* Response body streaming (by setting `into: fun | collectable`.)
* Response body streaming (by setting `into: fun | collectable | pid`.)

* Follows redirects (via [`redirect`] step.)

Expand Down Expand Up @@ -108,6 +108,8 @@ iex> resp.body
%IO.Stream{}
```

(See [`Req`] module documentation for more examples of response body streaming.)

If you are planning to make several similar requests, you can build up a request struct with
desired common options and re-use it:

Expand Down Expand Up @@ -237,7 +239,7 @@ limitations under the License.
[`Req.new/1`]: https://hexdocs.pm/req/Req.html#new/1
[`Req.get!/2`]: https://hexdocs.pm/req/Req.html#get!/2
[`Req.post!/2`]: https://hexdocs.pm/req/Req.html#post!/2
[`Req.async_request/2`]: https://hexdocs.pm/req/Req.html#async_request/2
[`Req`]: https://hexdocs.pm/req
[`Req.Request`]: https://hexdocs.pm/req/Req.Request.html
[`Req.Steps`]: https://hexdocs.pm/req/Req.Steps.html
[`Req.Test`]: https://hexdocs.pm/req/Req.Test.html
Expand Down
68 changes: 51 additions & 17 deletions lib/req.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ defmodule Req do
iex> resp.body
%IO.Stream{}
Stream response body to the current process:
iex> resp = Req.get!("http://httpbin.org/stream/2", into: self())
iex> Req.parse_message(resp, receive do message -> message end)
{:ok, [data: "{\"url\": \"http://httpbin.org/stream/2\", ..., \"id\": 0}\n"]}
iex> Req.parse_message(resp, receive do message -> message end)
{:ok, [data: "{\"url\": \"http://httpbin.org/stream/2\", ..., \"id\": 1}\n"]}
iex> Req.parse_message(resp, receive do message -> message end)
{:ok, [:done]}
""
## Header Names
The HTTP specification requires that header names should be case-insensitive.
Expand Down Expand Up @@ -228,6 +239,9 @@ defmodule Req do
* `collectable` - stream response body into a `t:Collectable.t/0`.
* `pid` - stream response body into a process mailbox. The messages should be parsed using
`Req.parse_message/2`.
Response redirect options ([`redirect`](`Req.Steps.redirect/1`) step):
* `:redirect` - if set to `false`, disables automatic response redirects. Defaults to `true`.
Expand Down Expand Up @@ -985,10 +999,12 @@ defmodule Req do
end

@doc false
@deprecated "use Req.request(into: self()) instead"
def async_request(request, options \\ []) do
Req.Request.run_request(%{new(request, options) | into: :self})
end

@deprecated "use Req.request!(into: self()) instead"
@doc false
def async_request!(request, options \\ []) do
case async_request(request, options) do
Expand All @@ -1000,33 +1016,51 @@ defmodule Req do
end
end

@doc """
Parses asynchronous response body message.
## Examples
iex> resp = Req.get!("http://httpbin.org/stream/2", into: self())
iex> Req.parse_message(resp, receive do message -> message end)
{:ok, [data: "{\"url\": \"http://httpbin.org/stream/2\", ..., \"id\": 0}\n"]}
iex> Req.parse_message(resp, receive do message -> message end)
{:ok, [data: "{\"url\": \"http://httpbin.org/stream/2\", ..., \"id\": 1}\n"]}
iex> Req.parse_message(resp, receive do message -> message end)
{:ok, [:done]}
"""
def parse_message(%Req.Response{} = resp, message) do
resp.async.stream_fun.(resp.async.ref, message)
end

@doc false
def parse_message(%Req.Request{} = request, message) do
IO.warn(
"passing %Req.Request{} to parse_message/2 is deprecated. Pass %Req.Response{} instead"
)

request.async.stream_fun.(request.async.ref, message)
end

@doc """
Cancels an asynchronous response.
## Examples
iex> resp = Req.get!("http://httpbin.org/stream/2", into: self())
iex> Req.cancel_async_response(resp)
:ok
"""
def cancel_async_response(%Req.Response{} = response) do
response.async.cancel_fun.(response.async.ref)
end

@deprecated "use Req.cancel_async_response(resp)) instead"
@doc false
def cancel_async_request(%Req.Request{} = request) do
request.async.cancel_fun.(request.async.ref)
end

# TODO: Req.run/2?
# defp run_request(request, options \\ []) do
# request
# |> Req.merge(options)
# |> Req.Request.run_request()
# end

# defp run_request!(request, options \\ []) do
# case run_request(request, options) do
# {request, %Req.Response{} = response} ->
# {request, response}

# {_request, exception} ->
# raise exception
# end
# end

@doc """
Returns default options.
Expand Down
3 changes: 2 additions & 1 deletion lib/req/response.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ defmodule Req.Response do
headers: if(Req.MixProject.legacy_headers_as_lists?(), do: [], else: %{}),
body: "",
trailers: %{},
private: %{}
private: %{},
async: nil

@doc """
Returns a new response.
Expand Down
44 changes: 41 additions & 3 deletions lib/req/steps.ex
Original file line number Diff line number Diff line change
Expand Up @@ -842,9 +842,13 @@ defmodule Req.Steps do
finch_stream_into_fun(req, finch_req, finch_name, finch_options, fun)

:self ->
IO.warn("setting into: :self is deprecated, set into: self() instead")
finch_stream_into_self(req, finch_req, finch_name, finch_options)

collectable when collectable != :self ->
pid when is_pid(pid) ->
finch_stream_into_pid(req, finch_req, finch_name, finch_options, pid)

collectable ->
finch_stream_into_collectable(req, finch_req, finch_name, finch_options, collectable)
end
end
Expand Down Expand Up @@ -913,7 +917,6 @@ defmodule Req.Steps do
end
end

# TODO: WIP
defp finch_stream_into_self(req, finch_req, finch_name, finch_options) do
ref = Finch.async_request(finch_req, finch_name, finch_options)

Expand All @@ -926,7 +929,6 @@ defmodule Req.Steps do
headers =
receive do
{^ref, message} ->
# TODO: handle trailers
{:headers, headers} = message

Enum.reduce(headers, %{}, fn {name, value}, acc ->
Expand All @@ -945,6 +947,42 @@ defmodule Req.Steps do
{req, resp}
end

defp finch_stream_into_pid(req, finch_req, finch_name, finch_options, pid) do
if pid != self() do
raise ArgumentError,
"`into: pid` only supports the calling process at the moment, i.e. `self()`"
end

ref = Finch.async_request(finch_req, finch_name, finch_options)

{:status, status} =
receive do
{^ref, message} ->
message
end

headers =
receive do
{^ref, message} ->
# TODO: handle trailers
{:headers, headers} = message

Enum.reduce(headers, %{}, fn {name, value}, acc ->
Map.update(acc, name, [value], &(&1 ++ [value]))
end)
end

async = %Req.Async{
ref: ref,
stream_fun: &finch_parse_message/2,
cancel_fun: &finch_cancel/1
}

resp = Req.Response.new(status: status, headers: headers)
resp = put_in(resp.async, async)
{req, resp}
end

defp run_finch_request(finch_request, finch_name, finch_options) do
case Finch.request(finch_request, finch_name, finch_options) do
{:ok, response} -> Req.Response.new(response)
Expand Down
47 changes: 28 additions & 19 deletions test/req/httpc_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ defmodule Req.HttpcTest do
assert resp.body == "foofoo"
end

test "stream callback", %{req: req, bypass: bypass} do
test "into: fun", %{req: req, bypass: bypass} do
Bypass.expect(bypass, "GET", "/", fn conn ->
conn = Plug.Conn.send_chunked(conn, 200)
{:ok, conn} = Plug.Conn.chunk(conn, "foo")
Expand Down Expand Up @@ -88,41 +88,41 @@ defmodule Req.HttpcTest do
refute_receive _
end

test "async request", %{req: req, bypass: bypass} do
test "into: pid", %{req: req, bypass: bypass} do
Bypass.expect(bypass, "GET", "/", fn conn ->
conn = Plug.Conn.send_chunked(conn, 200)
{:ok, conn} = Plug.Conn.chunk(conn, "foo")
{:ok, conn} = Plug.Conn.chunk(conn, "bar")
conn
end)

{req, resp} = Req.async_request!(req)
resp = Req.get!(req, into: self())
assert resp.status == 200

# httpc seems to randomly chunk things
assert Req.parse_message(req, assert_receive(_)) in [
assert Req.parse_message(resp, assert_receive(_)) in [
{:ok, [data: "foo"]},
{:ok, [data: "foobar"]}
]

assert Req.parse_message(req, assert_receive(_)) in [
assert Req.parse_message(resp, assert_receive(_)) in [
{:ok, [data: "bar"]},
{:ok, [data: ""]},
{:ok, [:done]}
]
end

test "async request cancellation", %{req: req, bypass: bypass} do
test "into: pid cancel", %{req: req, bypass: bypass} do
Bypass.expect(bypass, "GET", "/", fn conn ->
conn = Plug.Conn.send_chunked(conn, 200)
{:ok, conn} = Plug.Conn.chunk(conn, "foo")
{:ok, conn} = Plug.Conn.chunk(conn, "bar")
conn
end)

{req, resp} = Req.async_request!(req)
resp = Req.get!(req, into: self())
assert resp.status == 200
assert :ok = Req.cancel_async_request(req)
assert :ok = Req.cancel_async_response(resp)
end
end

Expand Down Expand Up @@ -180,8 +180,8 @@ defmodule Req.HttpcTest do
nil ->
httpc_request(request, httpc_req, httpc_http_options, httpc_options)

:self ->
httpc_async(request, httpc_req, httpc_http_options, httpc_options, nil)
pid when is_pid(pid) ->
httpc_async(request, httpc_req, httpc_http_options, httpc_options, pid)

fun ->
httpc_async(request, httpc_req, httpc_http_options, httpc_options, fun)
Expand Down Expand Up @@ -223,15 +223,17 @@ defmodule Req.HttpcTest do
end
end

defp httpc_async(request, httpc_req, httpc_http_options, httpc_options, fun) do
httpc_stream =
if fun do
{:self, :once}
else
:self
defp httpc_async(request, httpc_req, httpc_http_options, httpc_options, pid_or_fun) do
stream =
case pid_or_fun do
pid when is_pid(pid) ->
:self

fun when is_function(fun) ->
{:self, :once}
end

httpc_options = [sync: false, stream: httpc_stream] ++ httpc_options
httpc_options = [sync: false, stream: stream] ++ httpc_options
{:ok, ref} = :httpc.request(request.method, httpc_req, httpc_http_options, httpc_options)

receive do
Expand All @@ -253,8 +255,8 @@ defmodule Req.HttpcTest do
cancel_fun: &httpc_cancel/1
}

request = put_in(request.async, async)
response = Req.Response.new(status: status, headers: headers)
response = put_in(response.async, async)
{request, response}

{:http, {ref, :stream_start, headers, pid}} ->
Expand All @@ -270,7 +272,14 @@ defmodule Req.HttpcTest do
end

response = Req.Response.new(status: status, headers: headers)
httpc_loop(request, response, ref, pid, fun)

case pid_or_fun do
fun when is_function(fun) ->
httpc_loop(request, response, ref, pid, fun)

pid when is_pid(pid) ->
{request, response}
end

{:http, {^ref, {{_, status, _}, headers, body}}} ->
headers =
Expand Down
16 changes: 8 additions & 8 deletions test/req/steps_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2069,33 +2069,33 @@ defmodule Req.StepsTest do
refute_receive _
end

test "async request", c do
test "into: pid", c do
Bypass.expect(c.bypass, "GET", "/", fn conn ->
conn = Plug.Conn.send_chunked(conn, 200)
{:ok, conn} = Plug.Conn.chunk(conn, "foo")
{:ok, conn} = Plug.Conn.chunk(conn, "bar")
conn
end)

{req, resp} = Req.async_request!(url: "http://localhost:#{c.bypass.port}")
resp = Req.get!(url: "http://localhost:#{c.bypass.port}", into: self())
assert resp.status == 200
assert {:ok, [data: "foo"]} = Req.parse_message(req, assert_receive(_))
assert {:ok, [data: "bar"]} = Req.parse_message(req, assert_receive(_))
assert {:ok, [:done]} = Req.parse_message(req, assert_receive(_))
assert {:ok, [data: "foo"]} = Req.parse_message(resp, assert_receive(_))
assert {:ok, [data: "bar"]} = Req.parse_message(resp, assert_receive(_))
assert {:ok, [:done]} = Req.parse_message(resp, assert_receive(_))
refute_receive _
end

test "async request cancellation", c do
test "into: pid cancel", c do
Bypass.expect(c.bypass, "GET", "/", fn conn ->
conn = Plug.Conn.send_chunked(conn, 200)
{:ok, conn} = Plug.Conn.chunk(conn, "foo")
{:ok, conn} = Plug.Conn.chunk(conn, "bar")
conn
end)

{req, resp} = Req.async_request!(url: "http://localhost:#{c.bypass.port}")
resp = Req.get!(url: "http://localhost:#{c.bypass.port}", into: self())
assert resp.status == 200
assert :ok = Req.cancel_async_request(req)
assert :ok = Req.cancel_async_response(resp)
end
end

Expand Down

0 comments on commit 935fdee

Please sign in to comment.