Skip to content

Commit

Permalink
Tweak telemetry events on batch middleware
Browse files Browse the repository at this point in the history
With a similar rationale from [this PR][1], tweak the telemetry events
from batch middleware.

The existing event contract stays the same, but move to run from inside
the Task used for batch execution. There is a small semantic difference
of when the :stop event fires - this patch sends the event at the end of
the task, while the current form may have a delay/sync caused by Task
await.

A complementary event is added as well to announce when the Resolver
continuation - the post batch - happens. Given that piece is part of the
resolver, the resolution struct is included as part of the metadata.

[1]: #1169
  • Loading branch information
andrewhr committed Jun 15, 2022
1 parent 9539806 commit 1c19410
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 49 deletions.
2 changes: 2 additions & 0 deletions guides/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ handler function to any of the following event names:
- `[:absinthe, :resolve, :field, :stop]` when field resolution finishes
- `[:absinthe, :middleware, :batch, :start]` when the batch processing starts
- `[:absinthe, :middleware, :batch, :stop]` when the batch processing finishes
- `[:absinthe, :middleware, :batch, :post, :start]` when the post batch resolution starts
- `[:absinthe, :middleware, :batch, :post, :stop]` when the post batch resolution finishes

Telemetry handlers are called with `measurements` and `metadata`. For details on
what is passed, checkout `Absinthe.Phase.Telemetry`, `Absinthe.Middleware.Telemetry`,
Expand Down
75 changes: 30 additions & 45 deletions lib/absinthe/middleware/batch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,23 @@ defmodule Absinthe.Middleware.Batch do
|> Map.fetch!(:output)
|> Map.fetch!(batch_key)

result =
:telemetry.span(
[:absinthe, :middleware, :batch, :post],
%{
resolution: res,
post_batch_fun: post_batch_fun,
batch_key: batch_key,
batch_results: batch_data_for_fun
},
fn ->
result = post_batch_fun.(batch_data_for_fun)
{result, %{result: result}}
end
)

res
|> Absinthe.Resolution.put_result(post_batch_fun.(batch_data_for_fun))
|> Absinthe.Resolution.put_result(result)
end

def after_resolution(exec) do
Expand All @@ -133,60 +148,30 @@ defmodule Absinthe.Middleware.Batch do
input
|> Enum.group_by(&elem(&1, 0), &elem(&1, 1))
|> Enum.map(fn {{batch_fun, batch_opts}, batch_data} ->
system_time = System.system_time()
start_time_mono = System.monotonic_time()

task =
Task.async(fn ->
{batch_fun, call_batch_fun(batch_fun, batch_data)}
start_metadata = %{
id: :erlang.unique_integer(),
batch_fun: batch_fun,
batch_opts: batch_opts,
batch_data: batch_data
}

:telemetry.span([:absinthe, :middleware, :batch], start_metadata, fn ->
result = call_batch_fun(batch_fun, batch_data)
{{batch_fun, result}, Map.merge(start_metadata, %{result: result})}
end)
end)

metadata = emit_start_event(system_time, batch_fun, batch_opts, batch_data)

{batch_opts, task, start_time_mono, metadata}
{batch_opts, task}
end)
|> Map.new(fn {batch_opts, task, start_time_mono, metadata} ->
|> Map.new(fn {batch_opts, task} ->
timeout = Keyword.get(batch_opts, :timeout, 5_000)
result = Task.await(task, timeout)

end_time_mono = System.monotonic_time()
duration = end_time_mono - start_time_mono
emit_stop_event(duration, end_time_mono, metadata, result)

result
Task.await(task, timeout)
end)
end

@batch_start [:absinthe, :middleware, :batch, :start]
@batch_stop [:absinthe, :middleware, :batch, :stop]
defp emit_start_event(system_time, batch_fun, batch_opts, batch_data) do
id = :erlang.unique_integer()

metadata = %{
id: id,
telemetry_span_context: id,
batch_fun: batch_fun,
batch_opts: batch_opts,
batch_data: batch_data
}

:telemetry.execute(
@batch_start,
%{system_time: system_time},
metadata
)

metadata
end

defp emit_stop_event(duration, end_time_mono, metadata, result) do
:telemetry.execute(
@batch_stop,
%{duration: duration, end_time_mono: end_time_mono},
Map.put(metadata, :result, result)
)
end

defp call_batch_fun({module, fun}, batch_data) do
call_batch_fun({module, fun, []}, batch_data)
end
Expand Down
42 changes: 38 additions & 4 deletions test/absinthe/middleware/batch_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,20 @@ defmodule Absinthe.Middleware.BatchTest do
"organization" => %{"id" => 1}
}

# events may run on separate processes
pid = self()

:ok =
:telemetry.attach_many(
"#{test}",
[
[:absinthe, :middleware, :batch, :start],
[:absinthe, :middleware, :batch, :stop]
[:absinthe, :middleware, :batch, :stop],
[:absinthe, :middleware, :batch, :post, :start],
[:absinthe, :middleware, :batch, :post, :stop]
],
fn name, measurements, metadata, _ ->
send(self(), {:telemetry_event, name, measurements, metadata})
send(pid, {:telemetry_event, name, measurements, metadata})
end,
nil
)
Expand All @@ -123,9 +128,38 @@ defmodule Absinthe.Middleware.BatchTest do
assert expected_data == data

assert_receive {:telemetry_event, [:absinthe, :middleware, :batch, :start], %{system_time: _},
%{id: _, batch_fun: _, batch_opts: _, batch_data: _}}
%{
id: _,
telemetry_span_context: _,
batch_fun: _,
batch_opts: _,
batch_data: _
}}

assert_receive {:telemetry_event, [:absinthe, :middleware, :batch, :stop], %{duration: _},
%{id: _, batch_fun: _, batch_opts: _, batch_data: _, result: _}}
%{
id: _,
telemetry_span_context: _,
batch_fun: _,
batch_opts: _,
batch_data: _,
result: _
}}

assert_receive {:telemetry_event, [:absinthe, :middleware, :batch, :post, :start],
%{system_time: _},
%{
telemetry_span_context: _,
post_batch_fun: _,
batch_key: _,
batch_results: _
}}

assert_receive {:telemetry_event, [:absinthe, :middleware, :batch, :post, :stop],
%{duration: _},
%{
telemetry_span_context: _,
result: _
}}
end
end

0 comments on commit 1c19410

Please sign in to comment.