From 935fdeead7dbd65ef8815552658bfeb719c306c1 Mon Sep 17 00:00:00 2001 From: Wojtek Mach Date: Mon, 4 Mar 2024 16:05:07 +0100 Subject: [PATCH] Add response body streaming to process via `into: self()` Also added: * `Req.parse_message(resp, message)` * `Req.cancel_async_response(resp)` This is a 2nd interation on response body streaming that was previously experimental and undocumented. The following functions: * `Req.async_request/2` * `Req.async_request!/2` * `Req.parse_message(req, message)` * `Req.cancel_async_request(req)` were never part of the public API and they are now deprecated to ease transition for early adopters (thank you!) and will be removed in v1.0. Closes #304 --- README.md | 6 ++-- lib/req.ex | 68 ++++++++++++++++++++++++++++++----------- lib/req/response.ex | 3 +- lib/req/steps.ex | 44 ++++++++++++++++++++++++-- test/req/httpc_test.exs | 47 ++++++++++++++++------------ test/req/steps_test.exs | 16 +++++----- 6 files changed, 134 insertions(+), 50 deletions(-) diff --git a/README.md b/README.md index 791d61cb..7d77314f 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ write new ones. * Request body streaming (by setting `body: enumerable`.) - * Response body streaming (by setting `into: fun | collectable`.) + * Response body streaming (by setting `into: fun | collectable | pid`.) * Follows redirects (via [`redirect`] step.) @@ -108,6 +108,8 @@ iex> resp.body %IO.Stream{} ``` +(See [`Req`] module documentation for more examples of response body streaming.) + If you are planning to make several similar requests, you can build up a request struct with desired common options and re-use it: @@ -237,7 +239,7 @@ limitations under the License. [`Req.new/1`]: https://hexdocs.pm/req/Req.html#new/1 [`Req.get!/2`]: https://hexdocs.pm/req/Req.html#get!/2 [`Req.post!/2`]: https://hexdocs.pm/req/Req.html#post!/2 -[`Req.async_request/2`]: https://hexdocs.pm/req/Req.html#async_request/2 +[`Req`]: https://hexdocs.pm/req [`Req.Request`]: https://hexdocs.pm/req/Req.Request.html [`Req.Steps`]: https://hexdocs.pm/req/Req.Steps.html [`Req.Test`]: https://hexdocs.pm/req/Req.Test.html diff --git a/lib/req.ex b/lib/req.ex index cdd2f491..04e7e968 100644 --- a/lib/req.ex +++ b/lib/req.ex @@ -60,6 +60,17 @@ defmodule Req do iex> resp.body %IO.Stream{} + Stream response body to the current process: + + iex> resp = Req.get!("http://httpbin.org/stream/2", into: self()) + iex> Req.parse_message(resp, receive do message -> message end) + {:ok, [data: "{\"url\": \"http://httpbin.org/stream/2\", ..., \"id\": 0}\n"]} + iex> Req.parse_message(resp, receive do message -> message end) + {:ok, [data: "{\"url\": \"http://httpbin.org/stream/2\", ..., \"id\": 1}\n"]} + iex> Req.parse_message(resp, receive do message -> message end) + {:ok, [:done]} + "" + ## Header Names The HTTP specification requires that header names should be case-insensitive. @@ -228,6 +239,9 @@ defmodule Req do * `collectable` - stream response body into a `t:Collectable.t/0`. + * `pid` - stream response body into a process mailbox. The messages should be parsed using + `Req.parse_message/2`. + Response redirect options ([`redirect`](`Req.Steps.redirect/1`) step): * `:redirect` - if set to `false`, disables automatic response redirects. Defaults to `true`. @@ -985,10 +999,12 @@ defmodule Req do end @doc false + @deprecated "use Req.request(into: self()) instead" def async_request(request, options \\ []) do Req.Request.run_request(%{new(request, options) | into: :self}) end + @deprecated "use Req.request!(into: self()) instead" @doc false def async_request!(request, options \\ []) do case async_request(request, options) do @@ -1000,33 +1016,51 @@ defmodule Req do end end + @doc """ + Parses asynchronous response body message. + + ## Examples + + iex> resp = Req.get!("http://httpbin.org/stream/2", into: self()) + iex> Req.parse_message(resp, receive do message -> message end) + {:ok, [data: "{\"url\": \"http://httpbin.org/stream/2\", ..., \"id\": 0}\n"]} + iex> Req.parse_message(resp, receive do message -> message end) + {:ok, [data: "{\"url\": \"http://httpbin.org/stream/2\", ..., \"id\": 1}\n"]} + iex> Req.parse_message(resp, receive do message -> message end) + {:ok, [:done]} + """ + def parse_message(%Req.Response{} = resp, message) do + resp.async.stream_fun.(resp.async.ref, message) + end + @doc false def parse_message(%Req.Request{} = request, message) do + IO.warn( + "passing %Req.Request{} to parse_message/2 is deprecated. Pass %Req.Response{} instead" + ) + request.async.stream_fun.(request.async.ref, message) end + @doc """ + Cancels an asynchronous response. + + ## Examples + + iex> resp = Req.get!("http://httpbin.org/stream/2", into: self()) + iex> Req.cancel_async_response(resp) + :ok + """ + def cancel_async_response(%Req.Response{} = response) do + response.async.cancel_fun.(response.async.ref) + end + + @deprecated "use Req.cancel_async_response(resp)) instead" @doc false def cancel_async_request(%Req.Request{} = request) do request.async.cancel_fun.(request.async.ref) end - # TODO: Req.run/2? - # defp run_request(request, options \\ []) do - # request - # |> Req.merge(options) - # |> Req.Request.run_request() - # end - - # defp run_request!(request, options \\ []) do - # case run_request(request, options) do - # {request, %Req.Response{} = response} -> - # {request, response} - - # {_request, exception} -> - # raise exception - # end - # end - @doc """ Returns default options. diff --git a/lib/req/response.ex b/lib/req/response.ex index 88cf8d55..d9f66cf2 100644 --- a/lib/req/response.ex +++ b/lib/req/response.ex @@ -30,7 +30,8 @@ defmodule Req.Response do headers: if(Req.MixProject.legacy_headers_as_lists?(), do: [], else: %{}), body: "", trailers: %{}, - private: %{} + private: %{}, + async: nil @doc """ Returns a new response. diff --git a/lib/req/steps.ex b/lib/req/steps.ex index 63c130dd..daa47d7b 100644 --- a/lib/req/steps.ex +++ b/lib/req/steps.ex @@ -842,9 +842,13 @@ defmodule Req.Steps do finch_stream_into_fun(req, finch_req, finch_name, finch_options, fun) :self -> + IO.warn("setting into: :self is deprecated, set into: self() instead") finch_stream_into_self(req, finch_req, finch_name, finch_options) - collectable when collectable != :self -> + pid when is_pid(pid) -> + finch_stream_into_pid(req, finch_req, finch_name, finch_options, pid) + + collectable -> finch_stream_into_collectable(req, finch_req, finch_name, finch_options, collectable) end end @@ -913,7 +917,6 @@ defmodule Req.Steps do end end - # TODO: WIP defp finch_stream_into_self(req, finch_req, finch_name, finch_options) do ref = Finch.async_request(finch_req, finch_name, finch_options) @@ -926,7 +929,6 @@ defmodule Req.Steps do headers = receive do {^ref, message} -> - # TODO: handle trailers {:headers, headers} = message Enum.reduce(headers, %{}, fn {name, value}, acc -> @@ -945,6 +947,42 @@ defmodule Req.Steps do {req, resp} end + defp finch_stream_into_pid(req, finch_req, finch_name, finch_options, pid) do + if pid != self() do + raise ArgumentError, + "`into: pid` only supports the calling process at the moment, i.e. `self()`" + end + + ref = Finch.async_request(finch_req, finch_name, finch_options) + + {:status, status} = + receive do + {^ref, message} -> + message + end + + headers = + receive do + {^ref, message} -> + # TODO: handle trailers + {:headers, headers} = message + + Enum.reduce(headers, %{}, fn {name, value}, acc -> + Map.update(acc, name, [value], &(&1 ++ [value])) + end) + end + + async = %Req.Async{ + ref: ref, + stream_fun: &finch_parse_message/2, + cancel_fun: &finch_cancel/1 + } + + resp = Req.Response.new(status: status, headers: headers) + resp = put_in(resp.async, async) + {req, resp} + end + defp run_finch_request(finch_request, finch_name, finch_options) do case Finch.request(finch_request, finch_name, finch_options) do {:ok, response} -> Req.Response.new(response) diff --git a/test/req/httpc_test.exs b/test/req/httpc_test.exs index 245ed46c..9cb1245a 100644 --- a/test/req/httpc_test.exs +++ b/test/req/httpc_test.exs @@ -55,7 +55,7 @@ defmodule Req.HttpcTest do assert resp.body == "foofoo" end - test "stream callback", %{req: req, bypass: bypass} do + test "into: fun", %{req: req, bypass: bypass} do Bypass.expect(bypass, "GET", "/", fn conn -> conn = Plug.Conn.send_chunked(conn, 200) {:ok, conn} = Plug.Conn.chunk(conn, "foo") @@ -88,7 +88,7 @@ defmodule Req.HttpcTest do refute_receive _ end - test "async request", %{req: req, bypass: bypass} do + test "into: pid", %{req: req, bypass: bypass} do Bypass.expect(bypass, "GET", "/", fn conn -> conn = Plug.Conn.send_chunked(conn, 200) {:ok, conn} = Plug.Conn.chunk(conn, "foo") @@ -96,23 +96,23 @@ defmodule Req.HttpcTest do conn end) - {req, resp} = Req.async_request!(req) + resp = Req.get!(req, into: self()) assert resp.status == 200 # httpc seems to randomly chunk things - assert Req.parse_message(req, assert_receive(_)) in [ + assert Req.parse_message(resp, assert_receive(_)) in [ {:ok, [data: "foo"]}, {:ok, [data: "foobar"]} ] - assert Req.parse_message(req, assert_receive(_)) in [ + assert Req.parse_message(resp, assert_receive(_)) in [ {:ok, [data: "bar"]}, {:ok, [data: ""]}, {:ok, [:done]} ] end - test "async request cancellation", %{req: req, bypass: bypass} do + test "into: pid cancel", %{req: req, bypass: bypass} do Bypass.expect(bypass, "GET", "/", fn conn -> conn = Plug.Conn.send_chunked(conn, 200) {:ok, conn} = Plug.Conn.chunk(conn, "foo") @@ -120,9 +120,9 @@ defmodule Req.HttpcTest do conn end) - {req, resp} = Req.async_request!(req) + resp = Req.get!(req, into: self()) assert resp.status == 200 - assert :ok = Req.cancel_async_request(req) + assert :ok = Req.cancel_async_response(resp) end end @@ -180,8 +180,8 @@ defmodule Req.HttpcTest do nil -> httpc_request(request, httpc_req, httpc_http_options, httpc_options) - :self -> - httpc_async(request, httpc_req, httpc_http_options, httpc_options, nil) + pid when is_pid(pid) -> + httpc_async(request, httpc_req, httpc_http_options, httpc_options, pid) fun -> httpc_async(request, httpc_req, httpc_http_options, httpc_options, fun) @@ -223,15 +223,17 @@ defmodule Req.HttpcTest do end end - defp httpc_async(request, httpc_req, httpc_http_options, httpc_options, fun) do - httpc_stream = - if fun do - {:self, :once} - else - :self + defp httpc_async(request, httpc_req, httpc_http_options, httpc_options, pid_or_fun) do + stream = + case pid_or_fun do + pid when is_pid(pid) -> + :self + + fun when is_function(fun) -> + {:self, :once} end - httpc_options = [sync: false, stream: httpc_stream] ++ httpc_options + httpc_options = [sync: false, stream: stream] ++ httpc_options {:ok, ref} = :httpc.request(request.method, httpc_req, httpc_http_options, httpc_options) receive do @@ -253,8 +255,8 @@ defmodule Req.HttpcTest do cancel_fun: &httpc_cancel/1 } - request = put_in(request.async, async) response = Req.Response.new(status: status, headers: headers) + response = put_in(response.async, async) {request, response} {:http, {ref, :stream_start, headers, pid}} -> @@ -270,7 +272,14 @@ defmodule Req.HttpcTest do end response = Req.Response.new(status: status, headers: headers) - httpc_loop(request, response, ref, pid, fun) + + case pid_or_fun do + fun when is_function(fun) -> + httpc_loop(request, response, ref, pid, fun) + + pid when is_pid(pid) -> + {request, response} + end {:http, {^ref, {{_, status, _}, headers, body}}} -> headers = diff --git a/test/req/steps_test.exs b/test/req/steps_test.exs index b197ce60..4f08e7a1 100644 --- a/test/req/steps_test.exs +++ b/test/req/steps_test.exs @@ -2069,7 +2069,7 @@ defmodule Req.StepsTest do refute_receive _ end - test "async request", c do + test "into: pid", c do Bypass.expect(c.bypass, "GET", "/", fn conn -> conn = Plug.Conn.send_chunked(conn, 200) {:ok, conn} = Plug.Conn.chunk(conn, "foo") @@ -2077,15 +2077,15 @@ defmodule Req.StepsTest do conn end) - {req, resp} = Req.async_request!(url: "http://localhost:#{c.bypass.port}") + resp = Req.get!(url: "http://localhost:#{c.bypass.port}", into: self()) assert resp.status == 200 - assert {:ok, [data: "foo"]} = Req.parse_message(req, assert_receive(_)) - assert {:ok, [data: "bar"]} = Req.parse_message(req, assert_receive(_)) - assert {:ok, [:done]} = Req.parse_message(req, assert_receive(_)) + assert {:ok, [data: "foo"]} = Req.parse_message(resp, assert_receive(_)) + assert {:ok, [data: "bar"]} = Req.parse_message(resp, assert_receive(_)) + assert {:ok, [:done]} = Req.parse_message(resp, assert_receive(_)) refute_receive _ end - test "async request cancellation", c do + test "into: pid cancel", c do Bypass.expect(c.bypass, "GET", "/", fn conn -> conn = Plug.Conn.send_chunked(conn, 200) {:ok, conn} = Plug.Conn.chunk(conn, "foo") @@ -2093,9 +2093,9 @@ defmodule Req.StepsTest do conn end) - {req, resp} = Req.async_request!(url: "http://localhost:#{c.bypass.port}") + resp = Req.get!(url: "http://localhost:#{c.bypass.port}", into: self()) assert resp.status == 200 - assert :ok = Req.cancel_async_request(req) + assert :ok = Req.cancel_async_response(resp) end end