Skip to content

Commit

Permalink
fix: passing custom oban attributes to span
Browse files Browse the repository at this point in the history
  • Loading branch information
yordis committed Feb 5, 2024
1 parent 086ffc0 commit 1fb1976
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 39 deletions.
32 changes: 32 additions & 0 deletions instrumentation/opentelemetry_oban/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,37 @@
# Changelog

## 1.1.0

### Changed

* Improve `OpentelemetryOban.PluginHandler` Tracer span attributes.
The Plugin's span introduce a set of attributes prefixed with `oban.`.
Previously, no attributes were added to the span. The new attributes are:

* All Plugin:
* `oban.plugin`
* `Oban.Plugins.Cron` Plugin:
* `oban.jobs_count`
* `Oban.Plugins.Gossip` Plugin:
* `oban.gossip_count`
* `Oban.Plugins.Lifeline` Plugin:
* `oban.discarded_count`
* `oban.rescued_count`
* `Oban.Plugins.Pruner` Plugin:
* `oban.pruned_count`
* `Oban.Pro.Plugins.DynamicCron` Plugin:
* `oban.jobs_count`
* `Oban.Pro.Plugins.DynamicLifeline` Plugin:
* `oban.discarded_count`
* `oban.rescued_count`
* `Oban.Pro.Plugins.DynamicPrioritizer` Plugin:
* `oban.reprioritized_count`
* `Oban.Pro.Plugins.DynamicPruner` Plugin:
* `oban.pruned_count`
* `Oban.Pro.Plugins.DynamicScaler` Plugin:
* `oban.scaler.last_scaled_to`
* `oban.scaler.last_scaled_at`

## 1.0.0

### Changed
Expand Down
8 changes: 4 additions & 4 deletions instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,15 @@ defmodule OpentelemetryOban do
Trace.messaging_system() => :oban,
Trace.messaging_destination() => queue,
Trace.messaging_destination_kind() => :queue,
:"messaging.oban.worker" => worker
:"oban.job.worker" => worker
}
end

defp attributes_after_insert(job) do
%{
"messaging.oban.job_id": job.id,
"messaging.oban.priority": job.priority,
"messaging.oban.max_attempts": job.max_attempts
"oban.job.job_id": job.id,
"oban.job.priority": job.priority,
"oban.job.max_attempts": job.max_attempts
}
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,13 @@ defmodule OpentelemetryOban.JobHandler do
Trace.messaging_destination() => queue,
Trace.messaging_destination_kind() => :queue,
Trace.messaging_operation() => :process,
:"messaging.oban.job_id" => id,
:"messaging.oban.worker" => worker,
:"messaging.oban.priority" => priority,
:"messaging.oban.attempt" => attempt,
:"messaging.oban.max_attempts" => max_attempts,
:"messaging.oban.inserted_at" =>
if(inserted_at, do: DateTime.to_iso8601(inserted_at), else: nil),
:"messaging.oban.scheduled_at" => DateTime.to_iso8601(scheduled_at)
:"oban.job.job_id" => id,
:"oban.job.worker" => worker,
:"oban.job.priority" => priority,
:"oban.job.attempt" => attempt,
:"oban.job.max_attempts" => max_attempts,
:"oban.job.inserted_at" => DateTime.to_iso8601(inserted_at),
:"oban.job.scheduled_at" => DateTime.to_iso8601(scheduled_at)
}

span_name = "#{worker} process"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
defmodule OpentelemetryOban.PluginHandler do
alias OpenTelemetry.Tracer
alias OpenTelemetry.Span

@tracer_id __MODULE__
Expand Down Expand Up @@ -41,11 +42,12 @@ defmodule OpentelemetryOban.PluginHandler do
@tracer_id,
"#{plugin} process",
metadata,
%{}
%{attributes: %{"oban.plugin": plugin}}
)
end

def handle_plugin_stop(_event, _measurements, metadata, _config) do
Tracer.set_attributes(end_span_plugin_attrs(metadata))
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
end

Expand All @@ -63,4 +65,54 @@ defmodule OpentelemetryOban.PluginHandler do

OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
end

defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Cron} = metadata) do
%{"oban.plugins.cron.jobs_count": length(metadata[:jobs])}
end

defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Gossip} = metadata) do
%{"oban.plugins.gossip.gossip_count": metadata[:gossip_count]}
end

defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Lifeline} = metadata) do
%{
"oban.plugins.lifeline.discarded_count": metadata[:discarded_count],
"oban.plugins.lifeline.rescued_count": metadata[:rescued_count]
}
end

defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Pruner} = metadata) do
%{"oban.plugins.pruner.pruned_count": metadata[:pruned_count]}
end

defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicCron} = metadata) do
%{"oban.pro.plugins.dynamic_cron.jobs_count": length(metadata[:jobs])}
end

defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicLifeline} = metadata) do
%{
"oban.pro.plugins.dynamic_lifeline.discarded_count": metadata[:discarded_count],
"oban.pro.plugins.dynamic_lifeline.rescued_count": metadata[:rescued_count]
}
end

defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicPrioritizer} = metadata) do
%{"oban.pro.plugins.dynamic_prioritizer.reprioritized_count": metadata[:reprioritized_count]}
end

defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicPruner} = metadata) do
%{"oban.pro.plugins.dynamic_pruner.pruned_count": metadata[:pruned_count]}
end

defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicScaler} = metadata) do
%{
"oban.pro.plugins.dynamic_scaler.scaler.last_scaled_to": metadata[:scaler][:last_scaled_to],
"oban.pro.plugins.dynamic_scaler.scaler.last_scaled_at":
DateTime.to_iso8601(metadata[:scaler][:last_scaled_at])
}
end

defp end_span_plugin_attrs(_) do
%{}
end
end
2 changes: 1 addition & 1 deletion instrumentation/opentelemetry_oban/mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule OpentelemetryOban.MixProject do
use Mix.Project

@version "1.0.0"
@version "1.1.0"

def project do
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,127 @@ defmodule OpentelemetryOban.PluginHandlerTest do
assert [:"exception.message", :"exception.stacktrace", :"exception.type"] ==
Enum.sort(Map.keys(:otel_attributes.map(event_attributes)))
end

describe "[:oban, :plugin, :stop] spans" do
test "Oban.Plugins.Cron plugin" do
execute_plugin(Oban.Plugins.Cron, %{jobs: [1, 3, 4]})

assert %{
"oban.plugin": Elixir.Oban.Plugins.Cron,
"oban.plugins.cron.jobs_count": 3
} ==
receive_span_attrs(Oban.Plugins.Cron)
end

test "Oban.Plugins.Gossip plugin" do
execute_plugin(Oban.Plugins.Gossip, %{gossip_count: 3})

assert %{
"oban.plugin": Elixir.Oban.Plugins.Gossip,
"oban.plugins.gossip.gossip_count": 3
} ==
receive_span_attrs(Oban.Plugins.Gossip)
end

test "Oban.Plugins.Lifeline plugin" do
execute_plugin(Oban.Plugins.Lifeline, %{discarded_count: 3, rescued_count: 2})

assert %{
"oban.plugin": Elixir.Oban.Plugins.Lifeline,
"oban.plugins.lifeline.discarded_count": 3,
"oban.plugins.lifeline.rescued_count": 2
} ==
receive_span_attrs(Oban.Plugins.Lifeline)
end

test "Oban.Plugins.Pruner plugin" do
execute_plugin(Oban.Plugins.Pruner, %{pruned_count: 3})

assert %{
"oban.plugin": Elixir.Oban.Plugins.Pruner,
"oban.plugins.pruner.pruned_count": 3
} ==
receive_span_attrs(Oban.Plugins.Pruner)
end

test "Oban.Pro.Plugins.DynamicCron plugin" do
execute_plugin(Oban.Pro.Plugins.DynamicCron, %{jobs: [1, 3, 4]})

assert %{
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicCron,
"oban.pro.plugins.dynamic_cron.jobs_count": 3
} ==
receive_span_attrs(Oban.Pro.Plugins.DynamicCron)
end

test "Oban.Pro.Plugins.DynamicLifeline plugin" do
execute_plugin(Oban.Pro.Plugins.DynamicLifeline, %{discarded_count: 3, rescued_count: 2})

assert %{
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicLifeline,
"oban.pro.plugins.dynamic_lifeline.discarded_count": 3,
"oban.pro.plugins.dynamic_lifeline.rescued_count": 2
} ==
receive_span_attrs(Oban.Pro.Plugins.DynamicLifeline)
end

test "Oban.Pro.Plugins.DynamicPrioritizer plugin" do
execute_plugin(Oban.Pro.Plugins.DynamicPrioritizer, %{reprioritized_count: 3})

assert %{
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicPrioritizer,
"oban.pro.plugins.dynamic_prioritizer.reprioritized_count": 3
} ==
receive_span_attrs(Oban.Pro.Plugins.DynamicPrioritizer)
end

test "Oban.Pro.Plugins.DynamicPruner plugin" do
execute_plugin(Oban.Pro.Plugins.DynamicPruner, %{pruned_count: 3})

assert %{
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicPruner,
"oban.pro.plugins.dynamic_pruner.pruned_count": 3
} ==
receive_span_attrs(Oban.Pro.Plugins.DynamicPruner)
end

test "Oban.Pro.Plugins.DynamicScaler plugin" do
execute_plugin(Oban.Pro.Plugins.DynamicScaler, %{
scaler: %{last_scaled_to: 3, last_scaled_at: ~U[2021-08-01 12:00:00Z]}
})

assert %{
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicScaler,
"oban.pro.plugins.dynamic_scaler.scaler.last_scaled_to": 3,
"oban.pro.plugins.dynamic_scaler.scaler.last_scaled_at": "2021-08-01T12:00:00Z"
} ==
receive_span_attrs(Oban.Pro.Plugins.DynamicScaler)
end
end

defp receive_span_attrs(name) do
name = "#{name} process"

assert_receive(
{:span, span(name: ^name, attributes: attributes)},
100,
"expected span with name #{name} to be received"
)

elem(attributes, 4)
end

defp execute_plugin(plugin_name, metadata) do
:telemetry.execute(
[:oban, :plugin, :start],
%{system_time: System.system_time()},
%{plugin: plugin_name}
)

:telemetry.execute(
[:oban, :plugin, :stop],
%{duration: 42069},
Map.merge(metadata, %{plugin: plugin_name})
)
end
end
50 changes: 25 additions & 25 deletions instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ defmodule OpentelemetryObanTest do
assert %{
"messaging.destination": "events",
"messaging.destination_kind": :queue,
"messaging.oban.job_id": _job_id,
"messaging.oban.max_attempts": 1,
"messaging.oban.priority": 0,
"messaging.oban.worker": "TestJob",
"oban.job.job_id": _job_id,
"oban.job.max_attempts": 1,
"oban.job.priority": 0,
"oban.job.worker": "TestJob",
"messaging.system": :oban
} = :otel_attributes.map(attributes)
end
Expand Down Expand Up @@ -147,13 +147,13 @@ defmodule OpentelemetryObanTest do
assert %{
"messaging.destination": "events",
"messaging.destination_kind": :queue,
"messaging.oban.attempt": 1,
"messaging.oban.inserted_at": _inserted_at,
"messaging.oban.job_id": _job_id,
"messaging.oban.max_attempts": 1,
"messaging.oban.priority": 0,
"messaging.oban.scheduled_at": _scheduled_at,
"messaging.oban.worker": "TestJob",
"oban.job.attempt": 1,
"oban.job.inserted_at": _inserted_at,
"oban.job.job_id": _job_id,
"oban.job.max_attempts": 1,
"oban.job.priority": 0,
"oban.job.scheduled_at": _scheduled_at,
"oban.job.worker": "TestJob",
"messaging.operation": :process,
"messaging.system": :oban
} = :otel_attributes.map(attributes)
Expand All @@ -177,13 +177,13 @@ defmodule OpentelemetryObanTest do
assert %{
"messaging.destination": "events",
"messaging.destination_kind": :queue,
"messaging.oban.attempt": 1,
"messaging.oban.inserted_at": _inserted_at,
"messaging.oban.job_id": _job_id,
"messaging.oban.max_attempts": 1,
"messaging.oban.priority": 0,
"messaging.oban.scheduled_at": _scheduled_at,
"messaging.oban.worker": "TestJobThatReturnsError",
"oban.job.attempt": 1,
"oban.job.inserted_at": _inserted_at,
"oban.job.job_id": _job_id,
"oban.job.max_attempts": 1,
"oban.job.priority": 0,
"oban.job.scheduled_at": _scheduled_at,
"oban.job.worker": "TestJobThatReturnsError",
"messaging.operation": :process,
"messaging.system": :oban
} = :otel_attributes.map(attributes)
Expand Down Expand Up @@ -255,13 +255,13 @@ defmodule OpentelemetryObanTest do
assert %{
"messaging.destination": "events",
"messaging.destination_kind": :queue,
"messaging.oban.attempt": 1,
"messaging.oban.inserted_at": _inserted_at,
"messaging.oban.job_id": _job_id,
"messaging.oban.max_attempts": 1,
"messaging.oban.priority": 0,
"messaging.oban.scheduled_at": _scheduled_at,
"messaging.oban.worker": "TestJobThatThrowsException",
"oban.job.attempt": 1,
"oban.job.inserted_at": _inserted_at,
"oban.job.job_id": _job_id,
"oban.job.max_attempts": 1,
"oban.job.priority": 0,
"oban.job.scheduled_at": _scheduled_at,
"oban.job.worker": "TestJobThatThrowsException",
"messaging.operation": :process,
"messaging.system": :oban
} = :otel_attributes.map(attributes)
Expand Down

0 comments on commit 1fb1976

Please sign in to comment.