From 978b6372468fa6e66bf587daa1f1e1a7a7785128 Mon Sep 17 00:00:00 2001 From: Wojtek Mach Date: Tue, 3 Dec 2024 14:18:22 +0100 Subject: [PATCH] `run_plug`: Support `into: :self` Closes #440 --- lib/req/finch.ex | 3 +-- lib/req/steps.ex | 38 ++++++++++++++++++++++++++++++++++++++ test/req/steps_test.exs | 24 ++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/lib/req/finch.ex b/lib/req/finch.ex index 07846a7..0e0f208 100644 --- a/lib/req/finch.ex +++ b/lib/req/finch.ex @@ -244,8 +244,7 @@ defmodule Req.Finch do cancel_fun: &cancel/1 } - resp = Req.Response.new(status: status, headers: headers) - resp = put_in(resp.body, async) + resp = Req.Response.new(status: status, headers: headers, body: async) {req, resp} end diff --git a/lib/req/steps.ex b/lib/req/steps.ex index 5822cbd..61604ee 100644 --- a/lib/req/steps.ex +++ b/lib/req/steps.ex @@ -1035,6 +1035,19 @@ defmodule Req.Steps do raise ArgumentError, "expected {:cont, acc}, got: #{inspect(other)}" end + :self -> + async = %Req.Response.Async{ + pid: self(), + ref: make_ref(), + stream_fun: &plug_parse_message/2, + cancel_fun: &plug_cancel/1 + } + + resp = Req.Response.new(status: conn.status, headers: conn.resp_headers, body: async) + send(self(), {async.ref, {:data, conn.resp_body}}) + send(self(), {async.ref, :done}) + {request, resp} + collectable -> response = Req.Response.new( @@ -1053,6 +1066,31 @@ defmodule Req.Steps do end end + defp plug_parse_message(ref, {ref, {:data, data}}) do + {:ok, [data: data]} + end + + defp plug_parse_message(ref, {ref, :done}) do + {:ok, [:done]} + end + + defp plug_parse_message(_, _) do + :unknown + end + + defp plug_cancel(ref) do + plug_clean_responses(ref) + :ok + end + + defp plug_clean_responses(ref) do + receive do + {^ref, _} -> plug_clean_responses(ref) + after + 0 -> :ok + end + end + defp call_plug(conn, plug) when is_atom(plug) do plug.call(conn, []) end diff --git a/test/req/steps_test.exs b/test/req/steps_test.exs index e9565f6..679a1a7 100644 --- a/test/req/steps_test.exs +++ b/test/req/steps_test.exs @@ -1947,6 +1947,30 @@ defmodule Req.StepsTest do refute_receive _ end + test "into: self" do + req = + Req.new( + plug: fn conn -> + conn = Plug.Conn.send_chunked(conn, 200) + {:ok, conn} = Plug.Conn.chunk(conn, "foo") + {:ok, conn} = Plug.Conn.chunk(conn, "bar") + conn + end, + into: :self + ) + + resp = Req.request!(req) + assert resp.status == 200 + assert {:ok, [data: "foobar"]} = Req.parse_message(resp, assert_receive(_)) + assert {:ok, [:done]} = Req.parse_message(resp, assert_receive(_)) + refute_receive _ + + resp = Req.request!(req) + assert resp.status == 200 + assert Enum.to_list(resp.body) == ["foobar"] + refute_receive _ + end + test "errors" do req = Req.new(