Skip to content

Commit

Permalink
Defer call to prime function until continuation
Browse files Browse the repository at this point in the history
This allows the caller to complete their pubsub call before the prime
function, avoiding data races.
  • Loading branch information
bernardd committed Jun 22, 2022
1 parent 96e803c commit 27441d0
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 28 deletions.
6 changes: 5 additions & 1 deletion lib/absinthe.ex
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ defmodule Absinthe do
]

@type run_result :: {:ok, result_t} | {:more, result_t} | {:error, String.t()}
@type continue_result :: run_result | :no_more_results

@spec run(
binary | Absinthe.Language.Source.t() | Absinthe.Language.Document.t(),
Expand All @@ -118,7 +119,7 @@ defmodule Absinthe do
|> build_result()
end

@spec continue([Absinthe.Blueprint.Continuation.t()]) :: run_result()
@spec continue([Absinthe.Blueprint.Continuation.t()]) :: continue_result
def continue(continuation) do
continuation
|> Absinthe.Pipeline.continue()
Expand All @@ -127,6 +128,9 @@ defmodule Absinthe do

defp build_result(output) do
case output do
{:ok, %{result: :no_more_results}, _phases} ->
:no_more_results

{:ok, %{result: %{continuation: c} = result}, _phases} when c != [] ->
{:more, result}

Expand Down
42 changes: 40 additions & 2 deletions lib/absinthe/phase/subscription/prime.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,46 @@
defmodule Absinthe.Phase.Subscription.Prime do
@moduledoc false

alias Absinthe.Blueprint.Continuation
alias Absinthe.Phase

@spec run(any(), Keyword.t()) :: Absinthe.Phase.result_t()
def run(blueprint, prime_result: cr) do
{:ok, put_in(blueprint.execution.root_value, cr)}
def run(blueprint, prime_result: prime_result) do
{:ok, put_in(blueprint.execution.root_value, prime_result)}
end

def run(blueprint, prime_fun: prime_fun, resolution_options: options) do
{:ok, prime_results} = prime_fun.(blueprint.execution)

case prime_results do
[first | rest] ->
blueprint = put_in(blueprint.execution.root_value, first)
blueprint = maybe_add_continuations(blueprint, rest, options)
{:ok, blueprint}

[] ->
blueprint = put_in(blueprint.result, :no_more_results)
{:replace, blueprint, []}
end
end

defp maybe_add_continuations(blueprint, [], _options), do: blueprint

defp maybe_add_continuations(blueprint, remaining_results, options) do
continuations =
Enum.map(
remaining_results,
&%Continuation{
phase_input: blueprint,
pipeline: [
{__MODULE__, [prime_result: &1]},
{Phase.Document.Execution.Resolution, options},
Phase.Subscription.GetOrdinal,
Phase.Document.Result
]
}
)

put_in(blueprint.result, %{continuation: continuations})
end
end
36 changes: 13 additions & 23 deletions lib/absinthe/phase/subscription/result.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Absinthe.Phase.Subscription.Result do
{:ok, put_in(blueprint.result, result)}

prime_fun when is_function(prime_fun, 1) ->
do_prime(prime_fun, result, blueprint, options)
stash_prime(prime_fun, result, blueprint, options)

val ->
raise """
Expand All @@ -30,28 +30,18 @@ defmodule Absinthe.Phase.Subscription.Result do
end
end

def do_prime(prime_fun, base_result, blueprint, options) do
{:ok, prime_results} = prime_fun.(blueprint.execution)

result =
if prime_results != [] do
continuations =
Enum.map(prime_results, fn cr ->
%Continuation{
phase_input: blueprint,
pipeline: [
{Phase.Subscription.Prime, [prime_result: cr]},
{Phase.Document.Execution.Resolution, options},
Phase.Subscription.GetOrdinal,
Phase.Document.Result
]
}
end)

Map.put(base_result, :continuation, continuations)
else
base_result
end
def stash_prime(prime_fun, base_result, blueprint, options) do
continuation = %Continuation{
phase_input: blueprint,
pipeline: [
{Phase.Subscription.Prime, [prime_fun: prime_fun, resolution_options: options]},
{Phase.Document.Execution.Resolution, options},
Phase.Subscription.GetOrdinal,
Phase.Document.Result
]
}

result = Map.put(base_result, :continuation, [continuation])

{:ok, put_in(blueprint.result, result)}
end
Expand Down
6 changes: 4 additions & 2 deletions lib/absinthe/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ defmodule Absinthe.Pipeline do

@type run_result_t :: {:ok, data_t, [Phase.t()]} | {:error, String.t(), [Phase.t()]}

@type continue_result_t :: run_result_t | :no_more_results

@type phase_config_t :: Phase.t() | {Phase.t(), Keyword.t()}

@type t :: [phase_config_t | [phase_config_t]]
Expand All @@ -30,7 +32,7 @@ defmodule Absinthe.Pipeline do
|> run_phase(input)
end

@spec continue([Continuation.t()]) :: run_result_t
@spec continue([Continuation.t()]) :: continue_result_t
def continue([continuation | rest]) do
result = run_phase(continuation.pipeline, continuation.phase_input)

Expand All @@ -41,7 +43,7 @@ defmodule Absinthe.Pipeline do
{:ok, blueprint, phases} ->
bp_result = Map.put(blueprint.result, :continuation, rest)
blueprint = Map.put(blueprint, :result, bp_result)
{:ok, blueprint, phases}
{:more, blueprint, phases}

error ->
error
Expand Down
17 changes: 17 additions & 0 deletions test/absinthe/execution/subscription_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,23 @@ defmodule Absinthe.Execution.SubscriptionTest do
Absinthe.continue(continuation)
end

test "continuation with no extra data" do
client_id = "abc"

assert {:more, %{"subscribed" => _topic, continuation: continuation}} =
run_subscription(
@query,
Schema,
variables: %{
"primeData" => [],
"clientId" => client_id
},
context: %{prime_id: "test_prime_id"}
)

assert :no_more_results == Absinthe.continue(continuation)
end

@query """
subscription ($clientId: ID!) {
ordinal(clientId: $clientId) {
Expand Down

0 comments on commit 27441d0

Please sign in to comment.