Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tweak telemetry events on batch middleware #1170

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 144 additions & 2 deletions guides/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ handler function to any of the following event names:
- `[:absinthe, :subscription, :publish, :stop]` when a subscription finishes
- `[:absinthe, :resolve, :field, :start]` when field resolution starts
- `[:absinthe, :resolve, :field, :stop]` when field resolution finishes
- `[:absinthe, :middleware, :batch, :start]` when the batch processing starts
- `[:absinthe, :middleware, :batch, :stop]` when the batch processing finishes

Telemetry handlers are called with `measurements` and `metadata`. For details on
what is passed, checkout `Absinthe.Phase.Telemetry`, `Absinthe.Middleware.Telemetry`,
Expand All @@ -24,6 +22,150 @@ you need to know how long the underlying operation took, you'll need to hook
telemetry up to that underlying operation. See, for example, the recommended
telemetry events in the documentation for `Ecto.Repo`.

## Async Resolvers

`Absinthe.Middleware.Async` exposes the following events:

* `[:absinthe, :middleware, :async, :start]` - Dispatched before the
async function is invoked. Does not run when `Task` is provided
instead.

* Measurement: `%{system_time: integer(), monotonic_time: integer()}`
* Metadata:

```
%{
telemetry_span_context: term()
}
```

* `[:absinthe, :middleware, :async, :stop]` - Dispatched after the
async function is invoked. Does not run when `Task` is provided
instead.

* Measurement: `%{duration: integer(), monotonic_time: integer()}`
* Metadata:

```
%{
result: any(),
telemetry_span_context: term()
}
```

* `[:absinthe, :middleware, :async, :exception]` - Dispatched when
the async function encounters an exception. Does not run when `Task`
is provided instead.

* Measurement: `%{duration: integer(), monotonic_time: integer()}`
* Metadata:

```
%{
kind: :throw | :error | :exit,
reason: term(),
stacktrace: list(),
telemetry_span_context: term()
}
```

## Batch Resolvers

`Absinthe.Middleware.Batch` exposes the following events:

* `[:absinthe, :middleware, :batch, :start]` - Dispatched before the
provided batch function is invoked.

* Measurement: `%{system_time: integer(), monotonic_time: integer()}`
* Metadata:

```
%{
batch_fun: Absinthe.Middleware.Batch.batch_fun(),
batch_opts: term(),
batch_data: any(),
telemetry_span_context: term()
}
```

* `[:absinthe, :middleware, :batch, :stop]` - Dispatched after the
provided batch function is invoked.

* Measurement: `%{duration: integer(), monotonic_time: integer()}`
* Metadata:

```
%{
batch_fun: Absinthe.Middleware.Batch.batch_fun(),
batch_opts: term(),
batch_data: any(),
result: any(),
telemetry_span_context: term()
}
```

* `[:absinthe, :middleware, :batch, :exception]` - Dispatched when
the provided batch function encounters an exception.

* Measurement: `%{duration: integer(), monotonic_time: integer()}`
* Metadata:

```
%{
batch_fun: Absinthe.Middleware.Batch.batch_fun(),
batch_opts: term(),
batch_data: any(),
kind: :throw | :error | :exit,
reason: term(),
stacktrace: list(),
telemetry_span_context: term()
}
```

* `[:absinthe, :middleware, :batch, :post, :start]` - Dispatched before
the provided post batch function is invoked, used for field resolution.

* Measurement: `%{system_time: integer(), monotonic_time: integer()}`
* Metadata:

```
%{
resolution: Absinthe.Resolution.t(),
post_batch_fun: Absinthe.Middleware.Batch.post_batch_fun(),
batch_key: term(),
batch_results: any(),
telemetry_span_context: term()
}
```

* `[:absinthe, :middleware, :batch, :post, :stop]` - Dispatched after
the provided post batch function is invoked, used for field resolution.

* Measurement: `%{duration: integer(), monotonic_time: integer()}`
* Metadata:

```
%{
result: any(),
telemetry_span_context: term()
}
```

* `[:absinthe, :middleware, :batch, :post, :exception]` - Dispatched
when the provided post batch function encounters an exception.

* Measurement: `%{duration: integer(), monotonic_time: integer()}`
* Metadata:

```
%{
kind: :throw | :error | :exit,
reason: term(),
stacktrace: list(),
telemetry_span_context: term()
}
```

## Interactive Telemetry

As an example, you could attach a handler in an `iex -S mix` shell. Paste in:
Expand Down
5 changes: 4 additions & 1 deletion lib/absinthe/middleware/async.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ defmodule Absinthe.Middleware.Async do
def call(%{state: :unresolved} = res, {fun, opts}) when is_function(fun) do
task =
Task.async(fn ->
:telemetry.span([:absinthe, :middleware, :async, :task], %{}, fn -> {fun.(), %{}} end)
:telemetry.span([:absinthe, :middleware, :async, :task], %{}, fn ->
result = fun.()
{result, %{result: result}}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for this non-related change, but as I've wrote the telemetry docs I felt like could be a good consistency change as batch telemetry events returns results as well.

end)
end)

call(res, {task, opts})
Expand Down
75 changes: 30 additions & 45 deletions lib/absinthe/middleware/batch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,23 @@ defmodule Absinthe.Middleware.Batch do
|> Map.fetch!(:output)
|> Map.fetch!(batch_key)

result =
:telemetry.span(
[:absinthe, :middleware, :batch, :post],
%{
resolution: res,
post_batch_fun: post_batch_fun,
batch_key: batch_key,
batch_results: batch_data_for_fun
},
fn ->
result = post_batch_fun.(batch_data_for_fun)
{result, %{result: result}}
end
)

res
|> Absinthe.Resolution.put_result(post_batch_fun.(batch_data_for_fun))
|> Absinthe.Resolution.put_result(result)
end

def after_resolution(exec) do
Expand All @@ -133,60 +148,30 @@ defmodule Absinthe.Middleware.Batch do
input
|> Enum.group_by(&elem(&1, 0), &elem(&1, 1))
|> Enum.map(fn {{batch_fun, batch_opts}, batch_data} ->
system_time = System.system_time()
start_time_mono = System.monotonic_time()

task =
Task.async(fn ->
{batch_fun, call_batch_fun(batch_fun, batch_data)}
start_metadata = %{
id: :erlang.unique_integer(),
batch_fun: batch_fun,
batch_opts: batch_opts,
batch_data: batch_data
}

:telemetry.span([:absinthe, :middleware, :batch], start_metadata, fn ->
result = call_batch_fun(batch_fun, batch_data)
{{batch_fun, result}, Map.merge(start_metadata, %{result: result})}
end)
end)

metadata = emit_start_event(system_time, batch_fun, batch_opts, batch_data)

{batch_opts, task, start_time_mono, metadata}
{batch_opts, task}
end)
|> Map.new(fn {batch_opts, task, start_time_mono, metadata} ->
|> Map.new(fn {batch_opts, task} ->
timeout = Keyword.get(batch_opts, :timeout, 5_000)
result = Task.await(task, timeout)

end_time_mono = System.monotonic_time()
duration = end_time_mono - start_time_mono
emit_stop_event(duration, end_time_mono, metadata, result)

result
Task.await(task, timeout)
end)
end

@batch_start [:absinthe, :middleware, :batch, :start]
@batch_stop [:absinthe, :middleware, :batch, :stop]
defp emit_start_event(system_time, batch_fun, batch_opts, batch_data) do
id = :erlang.unique_integer()

metadata = %{
id: id,
telemetry_span_context: id,
batch_fun: batch_fun,
batch_opts: batch_opts,
batch_data: batch_data
}

:telemetry.execute(
@batch_start,
%{system_time: system_time},
metadata
)

metadata
end

defp emit_stop_event(duration, end_time_mono, metadata, result) do
:telemetry.execute(
@batch_stop,
%{duration: duration, end_time_mono: end_time_mono},
Map.put(metadata, :result, result)
)
end

defp call_batch_fun({module, fun}, batch_data) do
call_batch_fun({module, fun, []}, batch_data)
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ defmodule Absinthe.Mixfile do
defp deps do
[
{:nimble_parsec, "~> 1.2.2 or ~> 1.3.0"},
{:telemetry, "~> 1.0 or ~> 0.4"},
{:telemetry, "~> 1.1"},
{:dataloader, "~> 1.0.0", optional: true},
{:decimal, "~> 1.0 or ~> 2.0", optional: true},
{:ex_doc, "~> 0.22", only: :dev},
Expand Down
14 changes: 7 additions & 7 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
%{
"benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm", "3ad58ae787e9c7c94dd7ceda3b587ec2c64604563e049b2a0e8baafae832addb"},
"dataloader": {:hex, :dataloader, "1.0.8", "114294362db98a613f231589246aa5b0ce847412e8e75c4c94f31f204d272cbf", [:mix], [{:ecto, ">= 3.4.3 and < 4.0.0", [hex: :ecto, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "eaf3c2aa2bc9dbd2f1e960561d616b7f593396c4754185b75904f6d66c82a667"},
"dataloader": {:hex, :dataloader, "1.0.10", "a42f07641b1a0572e0b21a2a5ae1be11da486a6790f3d0d14512d96ff3e3bbe9", [:mix], [{:ecto, ">= 3.4.3 and < 4.0.0", [hex: :ecto, repo: "hexpm", optional: true]}, {:telemetry, "~> 1.0 or ~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "54cd70cec09addf4b2ace14cc186a283a149fd4d3ec5475b155951bf33cd963f"},
"decimal": {:hex, :decimal, "2.0.0", "a78296e617b0f5dd4c6caf57c714431347912ffb1d0842e998e9792b5642d697", [:mix], [], "hexpm", "34666e9c55dea81013e77d9d87370fe6cb6291d1ef32f46a1600230b1d44f577"},
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"},
"dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"},
"earmark_parser": {:hex, :earmark_parser, "1.4.19", "de0d033d5ff9fc396a24eadc2fcf2afa3d120841eb3f1004d138cbf9273210e8", [:mix], [], "hexpm", "527ab6630b5c75c3a3960b75844c314ec305c76d9899bb30f71cb85952a9dc45"},
"earmark_parser": {:hex, :earmark_parser, "1.4.25", "2024618731c55ebfcc5439d756852ec4e85978a39d0d58593763924d9a15916f", [:mix], [], "hexpm", "56749c5e1c59447f7b7a23ddb235e4b3defe276afc220a6227237f3efe83f51e"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.27.3", "d09ed7ab590b71123959d9017f6715b54a448d76b43cf909eb0b2e5a78a977b2", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "ee60b329d08195039bfeb25231a208749be4f2274eae42ce38f9be0538a2f2e6"},
"ex_doc": {:hex, :ex_doc, "0.28.4", "001a0ea6beac2f810f1abc3dbf4b123e9593eaa5f00dd13ded024eae7c523298", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bf85d003dd34911d89c8ddb8bda1a958af3471a274a4c2150a9c01c78ac3f8ed"},
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took the opportunity to update the library. If there are other preferred means to do so, I'm glad to revert.

"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
"makeup_elixir": {:hex, :makeup_elixir, "0.15.2", "dc72dfe17eb240552857465cc00cce390960d9a0c055c4ccd38b70629227e97c", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "fd23ae48d09b32eff49d4ced2b43c9f086d402ee4fd4fcb2d7fad97fa8823e75"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"makeup_graphql": {:hex, :makeup_graphql, "0.1.2", "81e2939aab6d2b81d39ee5d9e13fae02599e9ca6e1152e0eeed737a98a5f96aa", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "3390ab04ba388d52a94bbe64ef62aa4d7923ceaffac43ec948f58f631440e8fb"},
"mix_test_watch": {:hex, :mix_test_watch, "1.0.2", "34900184cbbbc6b6ed616ed3a8ea9b791f9fd2088419352a6d3200525637f785", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "47ac558d8b06f684773972c6d04fcc15590abdb97aeb7666da19fcbfdc441a07"},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.2", "b99ca56bbce410e9d5ee4f9155a212e942e224e259c7ebbf8f2c86ac21d4fa3c", [:mix], [], "hexpm", "98d51bd64d5f6a2a9c6bb7586ee8129e27dfaab1140b5a4753f24dac0ba27d2f"},
"telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"},
"telemetry": {:hex, :telemetry, "1.1.0", "a589817034a27eab11144ad24d5c0f9fab1f58173274b1e9bae7074af9cbee51", [:rebar3], [], "hexpm", "b727b2a1f75614774cff2d7565b64d0dfa5bd52ba517f16543e6fc7efcc0df48"},
}
20 changes: 10 additions & 10 deletions test/absinthe/middleware/async_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -80,34 +80,30 @@ defmodule Absinthe.Middleware.AsyncTest do
{asyncThing}
"""

pid = self()

:ok =
:telemetry.attach_many(
"#{test}",
[
[:absinthe, :middleware, :async, :task, :start],
[:absinthe, :middleware, :async, :task, :stop]
],
fn name, measurements, metadata, _config ->
send(pid, {:telemetry_event, name, measurements, metadata})
end,
_config = %{}
&__MODULE__.capture_telemetry_event/4,
_config = %{pid: self()}
)

assert {:ok, %{data: %{"asyncThing" => "we async now"}}} == Absinthe.run(doc, Schema)

assert_receive {:telemetry_event, [:absinthe, :middleware, :async, :task, :start],
%{system_time: _}, %{}}
%{system_time: _, monotonic_time: _}, %{}}

assert_receive {:telemetry_event, [:absinthe, :middleware, :async, :task, :stop],
%{duration: _}, %{}}
%{duration: _, monotonic_time: _}, %{result: _}}

assert_receive {:telemetry_event, [:absinthe, :middleware, :async, :task, :start],
%{system_time: _}, %{}}
%{system_time: _, monotonic_time: _}, %{}}

assert_receive {:telemetry_event, [:absinthe, :middleware, :async, :task, :stop],
%{duration: _}, %{}}
%{duration: _, monotonic_time: _}, %{result: _}}
end

test "can resolve a field using a cooler but probably confusing to some people helper" do
Expand All @@ -125,4 +121,8 @@ defmodule Absinthe.Middleware.AsyncTest do

assert {:ok, %{data: %{"returnsNil" => nil}}} == Absinthe.run(doc, Schema)
end

def capture_telemetry_event(name, measurements, metadata, %{pid: pid} = _config) do
send(pid, {:telemetry_event, name, measurements, metadata})
end
end
Loading