Skip to content

Commit

Permalink
run_plug: Support into: :self
Browse files Browse the repository at this point in the history
Closes #440
  • Loading branch information
wojtekmach committed Dec 3, 2024
1 parent 18c1246 commit 978b637
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 2 deletions.
3 changes: 1 addition & 2 deletions lib/req/finch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,7 @@ defmodule Req.Finch do
cancel_fun: &cancel/1
}

resp = Req.Response.new(status: status, headers: headers)
resp = put_in(resp.body, async)
resp = Req.Response.new(status: status, headers: headers, body: async)
{req, resp}
end

Expand Down
38 changes: 38 additions & 0 deletions lib/req/steps.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,19 @@ defmodule Req.Steps do
raise ArgumentError, "expected {:cont, acc}, got: #{inspect(other)}"
end

:self ->
async = %Req.Response.Async{
pid: self(),
ref: make_ref(),
stream_fun: &plug_parse_message/2,
cancel_fun: &plug_cancel/1
}

resp = Req.Response.new(status: conn.status, headers: conn.resp_headers, body: async)
send(self(), {async.ref, {:data, conn.resp_body}})
send(self(), {async.ref, :done})
{request, resp}

collectable ->
response =
Req.Response.new(
Expand All @@ -1053,6 +1066,31 @@ defmodule Req.Steps do
end
end

defp plug_parse_message(ref, {ref, {:data, data}}) do
{:ok, [data: data]}
end

defp plug_parse_message(ref, {ref, :done}) do
{:ok, [:done]}
end

defp plug_parse_message(_, _) do
:unknown
end

defp plug_cancel(ref) do
plug_clean_responses(ref)
:ok
end

defp plug_clean_responses(ref) do
receive do
{^ref, _} -> plug_clean_responses(ref)
after
0 -> :ok
end
end

defp call_plug(conn, plug) when is_atom(plug) do
plug.call(conn, [])
end
Expand Down
24 changes: 24 additions & 0 deletions test/req/steps_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1947,6 +1947,30 @@ defmodule Req.StepsTest do
refute_receive _
end

test "into: self" do
req =
Req.new(
plug: fn conn ->
conn = Plug.Conn.send_chunked(conn, 200)
{:ok, conn} = Plug.Conn.chunk(conn, "foo")
{:ok, conn} = Plug.Conn.chunk(conn, "bar")
conn
end,
into: :self
)

resp = Req.request!(req)
assert resp.status == 200
assert {:ok, [data: "foobar"]} = Req.parse_message(resp, assert_receive(_))
assert {:ok, [:done]} = Req.parse_message(resp, assert_receive(_))
refute_receive _

resp = Req.request!(req)
assert resp.status == 200
assert Enum.to_list(resp.body) == ["foobar"]
refute_receive _
end

test "errors" do
req =
Req.new(
Expand Down

0 comments on commit 978b637

Please sign in to comment.