Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: setting oban attributes #247

Merged
merged 3 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions instrumentation/opentelemetry_oban/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,37 @@
# Changelog

## 1.1.0

### Changed

* Improve `OpentelemetryOban.PluginHandler` Tracer span attributes.
The Plugin's span introduce a set of attributes prefixed with `oban.`.
Previously, no attributes were added to the span. The new attributes are:

* All Plugin:
* `oban.plugin`
* `Oban.Plugins.Cron` Plugin:
* `oban.jobs_count`
* `Oban.Plugins.Gossip` Plugin:
* `oban.gossip_count`
* `Oban.Plugins.Lifeline` Plugin:
* `oban.discarded_count`
* `oban.rescued_count`
* `Oban.Plugins.Pruner` Plugin:
* `oban.pruned_count`
* `Oban.Pro.Plugins.DynamicCron` Plugin:
* `oban.jobs_count`
* `Oban.Pro.Plugins.DynamicLifeline` Plugin:
* `oban.discarded_count`
* `oban.rescued_count`
* `Oban.Pro.Plugins.DynamicPrioritizer` Plugin:
* `oban.reprioritized_count`
* `Oban.Pro.Plugins.DynamicPruner` Plugin:
* `oban.pruned_count`
* `Oban.Pro.Plugins.DynamicScaler` Plugin:
* `oban.scaler.last_scaled_to`
* `oban.scaler.last_scaled_at`

## 1.0.0

### Changed
Expand Down
8 changes: 4 additions & 4 deletions instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,15 @@ defmodule OpentelemetryOban do
Trace.messaging_system() => :oban,
Trace.messaging_destination() => queue,
Trace.messaging_destination_kind() => :queue,
:"messaging.oban.worker" => worker
:"oban.job.worker" => worker
}
end

defp attributes_after_insert(job) do
%{
"messaging.oban.job_id": job.id,
"messaging.oban.priority": job.priority,
"messaging.oban.max_attempts": job.max_attempts
"oban.job.job_id": job.id,
"oban.job.priority": job.priority,
"oban.job.max_attempts": job.max_attempts
}
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,13 @@ defmodule OpentelemetryOban.JobHandler do
Trace.messaging_destination() => queue,
Trace.messaging_destination_kind() => :queue,
Trace.messaging_operation() => :process,
:"messaging.oban.job_id" => id,
:"messaging.oban.worker" => worker,
:"messaging.oban.priority" => priority,
:"messaging.oban.attempt" => attempt,
:"messaging.oban.max_attempts" => max_attempts,
:"messaging.oban.inserted_at" =>
if(inserted_at, do: DateTime.to_iso8601(inserted_at), else: nil),
:"messaging.oban.scheduled_at" => DateTime.to_iso8601(scheduled_at)
:"oban.job.job_id" => id,
:"oban.job.worker" => worker,
:"oban.job.priority" => priority,
:"oban.job.attempt" => attempt,
:"oban.job.max_attempts" => max_attempts,
:"oban.job.inserted_at" => DateTime.to_iso8601(inserted_at),
:"oban.job.scheduled_at" => DateTime.to_iso8601(scheduled_at)
}

span_name = "#{worker} process"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
defmodule OpentelemetryOban.PluginHandler do
alias OpenTelemetry.Tracer
alias OpenTelemetry.Span

@tracer_id __MODULE__
Expand Down Expand Up @@ -41,11 +42,12 @@ defmodule OpentelemetryOban.PluginHandler do
@tracer_id,
"#{plugin} process",
metadata,
%{}
%{attributes: %{"oban.plugin": plugin}}
)
end

def handle_plugin_stop(_event, _measurements, metadata, _config) do
Tracer.set_attributes(end_span_plugin_attrs(metadata))
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
end

Expand All @@ -63,4 +65,54 @@ defmodule OpentelemetryOban.PluginHandler do

OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
end

defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Cron} = metadata) do
%{"oban.plugins.cron.jobs_count": length(metadata[:jobs])}
end

defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Gossip} = metadata) do
%{"oban.plugins.gossip.gossip_count": metadata[:gossip_count]}
end

defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Lifeline} = metadata) do
%{
"oban.plugins.lifeline.discarded_count": metadata[:discarded_count],
"oban.plugins.lifeline.rescued_count": metadata[:rescued_count]
}
end

defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Pruner} = metadata) do
%{"oban.plugins.pruner.pruned_count": metadata[:pruned_count]}
end

defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicCron} = metadata) do
%{"oban.pro.plugins.dynamic_cron.jobs_count": length(metadata[:jobs])}
end

defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicLifeline} = metadata) do
%{
"oban.pro.plugins.dynamic_lifeline.discarded_count": metadata[:discarded_count],
"oban.pro.plugins.dynamic_lifeline.rescued_count": metadata[:rescued_count]
}
end

defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicPrioritizer} = metadata) do
%{"oban.pro.plugins.dynamic_prioritizer.reprioritized_count": metadata[:reprioritized_count]}
end

defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicPruner} = metadata) do
%{"oban.pro.plugins.dynamic_pruner.pruned_count": metadata[:pruned_count]}
end

defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicScaler} = metadata) do
%{
"oban.pro.plugins.dynamic_scaler.scaler.last_scaled_to": metadata[:scaler][:last_scaled_to],
"oban.pro.plugins.dynamic_scaler.scaler.last_scaled_at":
DateTime.to_iso8601(metadata[:scaler][:last_scaled_at])
yordis marked this conversation as resolved.
Show resolved Hide resolved
}
end

defp end_span_plugin_attrs(_) do
%{}
end
end
2 changes: 1 addition & 1 deletion instrumentation/opentelemetry_oban/mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule OpentelemetryOban.MixProject do
use Mix.Project

@version "1.0.0"
@version "1.1.0"

def project do
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,127 @@ defmodule OpentelemetryOban.PluginHandlerTest do
assert [:"exception.message", :"exception.stacktrace", :"exception.type"] ==
Enum.sort(Map.keys(:otel_attributes.map(event_attributes)))
end

describe "[:oban, :plugin, :stop] spans" do
test "Oban.Plugins.Cron plugin" do
execute_plugin(Oban.Plugins.Cron, %{jobs: [1, 3, 4]})

assert %{
"oban.plugin": Elixir.Oban.Plugins.Cron,
"oban.plugins.cron.jobs_count": 3
} ==
receive_span_attrs(Oban.Plugins.Cron)
end

test "Oban.Plugins.Gossip plugin" do
execute_plugin(Oban.Plugins.Gossip, %{gossip_count: 3})

assert %{
"oban.plugin": Elixir.Oban.Plugins.Gossip,
"oban.plugins.gossip.gossip_count": 3
} ==
receive_span_attrs(Oban.Plugins.Gossip)
end

test "Oban.Plugins.Lifeline plugin" do
execute_plugin(Oban.Plugins.Lifeline, %{discarded_count: 3, rescued_count: 2})

assert %{
"oban.plugin": Elixir.Oban.Plugins.Lifeline,
"oban.plugins.lifeline.discarded_count": 3,
"oban.plugins.lifeline.rescued_count": 2
} ==
receive_span_attrs(Oban.Plugins.Lifeline)
end

test "Oban.Plugins.Pruner plugin" do
execute_plugin(Oban.Plugins.Pruner, %{pruned_count: 3})

assert %{
"oban.plugin": Elixir.Oban.Plugins.Pruner,
"oban.plugins.pruner.pruned_count": 3
} ==
receive_span_attrs(Oban.Plugins.Pruner)
end

test "Oban.Pro.Plugins.DynamicCron plugin" do
execute_plugin(Oban.Pro.Plugins.DynamicCron, %{jobs: [1, 3, 4]})

assert %{
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicCron,
"oban.pro.plugins.dynamic_cron.jobs_count": 3
} ==
receive_span_attrs(Oban.Pro.Plugins.DynamicCron)
end

test "Oban.Pro.Plugins.DynamicLifeline plugin" do
execute_plugin(Oban.Pro.Plugins.DynamicLifeline, %{discarded_count: 3, rescued_count: 2})

assert %{
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicLifeline,
"oban.pro.plugins.dynamic_lifeline.discarded_count": 3,
"oban.pro.plugins.dynamic_lifeline.rescued_count": 2
} ==
receive_span_attrs(Oban.Pro.Plugins.DynamicLifeline)
end

test "Oban.Pro.Plugins.DynamicPrioritizer plugin" do
execute_plugin(Oban.Pro.Plugins.DynamicPrioritizer, %{reprioritized_count: 3})

assert %{
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicPrioritizer,
"oban.pro.plugins.dynamic_prioritizer.reprioritized_count": 3
} ==
receive_span_attrs(Oban.Pro.Plugins.DynamicPrioritizer)
end

test "Oban.Pro.Plugins.DynamicPruner plugin" do
execute_plugin(Oban.Pro.Plugins.DynamicPruner, %{pruned_count: 3})

assert %{
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicPruner,
"oban.pro.plugins.dynamic_pruner.pruned_count": 3
} ==
receive_span_attrs(Oban.Pro.Plugins.DynamicPruner)
end

test "Oban.Pro.Plugins.DynamicScaler plugin" do
execute_plugin(Oban.Pro.Plugins.DynamicScaler, %{
scaler: %{last_scaled_to: 3, last_scaled_at: ~U[2021-08-01 12:00:00Z]}
})

assert %{
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicScaler,
"oban.pro.plugins.dynamic_scaler.scaler.last_scaled_to": 3,
"oban.pro.plugins.dynamic_scaler.scaler.last_scaled_at": "2021-08-01T12:00:00Z"
} ==
receive_span_attrs(Oban.Pro.Plugins.DynamicScaler)
end
end

defp receive_span_attrs(name) do
name = "#{name} process"

assert_receive(
{:span, span(name: ^name, attributes: attributes)},
100,
"expected span with name #{name} to be received"
)

elem(attributes, 4)
end

defp execute_plugin(plugin_name, metadata) do
:telemetry.execute(
[:oban, :plugin, :start],
%{system_time: System.system_time()},
%{plugin: plugin_name}
)

:telemetry.execute(
[:oban, :plugin, :stop],
%{duration: 42069},
Map.merge(metadata, %{plugin: plugin_name})
)
end
end
50 changes: 25 additions & 25 deletions instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ defmodule OpentelemetryObanTest do
assert %{
"messaging.destination": "events",
"messaging.destination_kind": :queue,
"messaging.oban.job_id": _job_id,
"messaging.oban.max_attempts": 1,
"messaging.oban.priority": 0,
"messaging.oban.worker": "TestJob",
"oban.job.job_id": _job_id,
"oban.job.max_attempts": 1,
"oban.job.priority": 0,
"oban.job.worker": "TestJob",
"messaging.system": :oban
} = :otel_attributes.map(attributes)
end
Expand Down Expand Up @@ -147,13 +147,13 @@ defmodule OpentelemetryObanTest do
assert %{
"messaging.destination": "events",
"messaging.destination_kind": :queue,
"messaging.oban.attempt": 1,
"messaging.oban.inserted_at": _inserted_at,
"messaging.oban.job_id": _job_id,
"messaging.oban.max_attempts": 1,
"messaging.oban.priority": 0,
"messaging.oban.scheduled_at": _scheduled_at,
"messaging.oban.worker": "TestJob",
"oban.job.attempt": 1,
"oban.job.inserted_at": _inserted_at,
"oban.job.job_id": _job_id,
"oban.job.max_attempts": 1,
"oban.job.priority": 0,
"oban.job.scheduled_at": _scheduled_at,
"oban.job.worker": "TestJob",
"messaging.operation": :process,
"messaging.system": :oban
} = :otel_attributes.map(attributes)
Expand All @@ -177,13 +177,13 @@ defmodule OpentelemetryObanTest do
assert %{
"messaging.destination": "events",
"messaging.destination_kind": :queue,
"messaging.oban.attempt": 1,
"messaging.oban.inserted_at": _inserted_at,
"messaging.oban.job_id": _job_id,
"messaging.oban.max_attempts": 1,
"messaging.oban.priority": 0,
"messaging.oban.scheduled_at": _scheduled_at,
"messaging.oban.worker": "TestJobThatReturnsError",
"oban.job.attempt": 1,
"oban.job.inserted_at": _inserted_at,
"oban.job.job_id": _job_id,
"oban.job.max_attempts": 1,
"oban.job.priority": 0,
"oban.job.scheduled_at": _scheduled_at,
"oban.job.worker": "TestJobThatReturnsError",
"messaging.operation": :process,
"messaging.system": :oban
} = :otel_attributes.map(attributes)
Expand Down Expand Up @@ -255,13 +255,13 @@ defmodule OpentelemetryObanTest do
assert %{
"messaging.destination": "events",
"messaging.destination_kind": :queue,
"messaging.oban.attempt": 1,
"messaging.oban.inserted_at": _inserted_at,
"messaging.oban.job_id": _job_id,
"messaging.oban.max_attempts": 1,
"messaging.oban.priority": 0,
"messaging.oban.scheduled_at": _scheduled_at,
"messaging.oban.worker": "TestJobThatThrowsException",
"oban.job.attempt": 1,
"oban.job.inserted_at": _inserted_at,
"oban.job.job_id": _job_id,
"oban.job.max_attempts": 1,
"oban.job.priority": 0,
"oban.job.scheduled_at": _scheduled_at,
"oban.job.worker": "TestJobThatThrowsException",
"messaging.operation": :process,
"messaging.system": :oban
} = :otel_attributes.map(attributes)
Expand Down
Loading