Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Celery tests in POTel #3772

Merged
merged 33 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
4b15a0e
Fix transaction name setting
antonpirker Nov 11, 2024
e396730
Some cleanup
antonpirker Nov 11, 2024
b8dcbef
Merge branch 'potel-base' into antonpirker/potel/fix-celery
antonpirker Nov 11, 2024
892cdde
Disable scope clearing between tests
antonpirker Nov 12, 2024
ed55be8
Do not clear scopes between test because it breaks potel tests
antonpirker Nov 12, 2024
459cb0d
Set the correct status on spans
antonpirker Nov 12, 2024
4f3b627
Better test output
antonpirker Nov 12, 2024
a6d59c2
naming
antonpirker Nov 13, 2024
386e9ac
Removed because this messes up already set span ops
antonpirker Nov 13, 2024
1ed7175
Set span status to OK when span is finished without an status set
antonpirker Nov 13, 2024
1610301
Cleanup
antonpirker Nov 13, 2024
98b8d24
More cleanup
antonpirker Nov 13, 2024
eebbade
We now have always a status in a span.
antonpirker Nov 13, 2024
48e5adb
Merge branch 'potel-base' into antonpirker/potel/fix-celery
antonpirker Nov 16, 2024
e7952b2
Merge branch 'potel-base' into antonpirker/potel/fix-celery
antonpirker Nov 27, 2024
6373551
Merge branch 'potel-base' into antonpirker/potel/fix-celery
antonpirker Dec 5, 2024
5c56b83
Merge branch 'potel-base' into antonpirker/potel/fix-celery
antonpirker Dec 20, 2024
73e333b
Merge branch 'potel-base' into antonpirker/potel/fix-celery
antonpirker Dec 23, 2024
2120e18
Use new status property
antonpirker Dec 23, 2024
ae5f130
Some cleanup
antonpirker Dec 23, 2024
ae9a14d
better naming
antonpirker Dec 23, 2024
bcccadf
more naming
antonpirker Dec 23, 2024
176aaa2
There is now only one call, dont know why
antonpirker Dec 23, 2024
0f84b11
fixed one test
antonpirker Dec 23, 2024
781e630
Merge branch 'potel-base' into antonpirker/potel/fix-celery
antonpirker Jan 7, 2025
04cc677
Merge branch 'potel-base' into antonpirker/potel/fix-celery
antonpirker Jan 10, 2025
b20962b
Fixed test
antonpirker Jan 13, 2025
31f48e7
Merge branch 'potel-base' into antonpirker/potel/fix-celery
antonpirker Jan 13, 2025
23e99c7
Nicer test output in case it is failing
antonpirker Jan 13, 2025
801be65
cleanup
antonpirker Jan 13, 2025
2196b5c
Merge branch 'potel-base' into antonpirker/potel/fix-celery
sl0thentr0py Jan 14, 2025
ed39550
Revert some stuff
sl0thentr0py Jan 15, 2025
7448b68
Fix the propagation test
sl0thentr0py Jan 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions sentry_sdk/integrations/celery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ def _capture_exception(task, exc_info):
return

if isinstance(exc_info[1], CELERY_CONTROL_FLOW_EXCEPTIONS):
# ??? Doesn't map to anything
_set_status("aborted")
return

Expand Down Expand Up @@ -276,6 +275,7 @@ def apply_async(*args, **kwargs):
op=OP.QUEUE_SUBMIT_CELERY,
name=task_name,
origin=CeleryIntegration.origin,
only_if_parent=True,
)
if not task_started_from_beat
else NoOpMgr()
Expand Down Expand Up @@ -306,11 +306,13 @@ def _inner(*args, **kwargs):
with isolation_scope() as scope:
scope._name = "celery"
scope.clear_breadcrumbs()
scope.set_transaction_name(task.name, source=TRANSACTION_SOURCE_TASK)
scope.add_event_processor(_make_event_processor(task, *args, **kwargs))

# Celery task objects are not a thing to be trusted. Even
# something such as attribute access can fail.
headers = args[3].get("headers") or {}

with sentry_sdk.continue_trace(headers):
with sentry_sdk.start_span(
op=OP.QUEUE_TASK_CELERY,
Expand All @@ -320,9 +322,13 @@ def _inner(*args, **kwargs):
# for some reason, args[1] is a list if non-empty but a
# tuple if empty
attributes=_prepopulate_attributes(task, list(args[1]), args[2]),
) as transaction:
transaction.set_status(SPANSTATUS.OK)
return f(*args, **kwargs)
) as root_span:
return_value = f(*args, **kwargs)

if root_span.status is None:
root_span.set_status(SPANSTATUS.OK)

return return_value

return _inner # type: ignore

Expand Down Expand Up @@ -359,6 +365,7 @@ def _inner(*args, **kwargs):
op=OP.QUEUE_PROCESS,
name=task.name,
origin=CeleryIntegration.origin,
only_if_parent=True,
) as span:
_set_messaging_destination_name(task, span)

Expand Down Expand Up @@ -390,6 +397,7 @@ def _inner(*args, **kwargs):
)

return f(*args, **kwargs)

except Exception:
exc_info = sys.exc_info()
with capture_internal_exceptions():
Expand Down
82 changes: 49 additions & 33 deletions tests/integrations/celery/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import pytest
from celery import Celery, VERSION
from celery.bin import worker
from celery.app.task import Task
from opentelemetry import trace as otel_trace, context

import sentry_sdk
from sentry_sdk import start_span, get_current_span
from sentry_sdk import get_current_span
from sentry_sdk.integrations.celery import (
CeleryIntegration,
_wrap_task_run,
Expand Down Expand Up @@ -126,14 +128,14 @@ def dummy_task(x, y):
foo = 42 # noqa
return x / y

with start_span(op="unit test transaction") as transaction:
with sentry_sdk.start_span(op="unit test transaction") as root_span:
celery_invocation(dummy_task, 1, 2)
_, expected_context = celery_invocation(dummy_task, 1, 0)

(_, error_event, _, _) = events

assert error_event["contexts"]["trace"]["trace_id"] == transaction.trace_id
assert error_event["contexts"]["trace"]["span_id"] != transaction.span_id
assert error_event["contexts"]["trace"]["trace_id"] == root_span.trace_id
assert error_event["contexts"]["trace"]["span_id"] != root_span.span_id
assert error_event["transaction"] == "dummy_task"
assert "celery_task_id" in error_event["tags"]
assert error_event["extra"]["celery-job"] == dict(
Expand Down Expand Up @@ -190,17 +192,14 @@ def test_transaction_events(capture_events, init_celery, celery_invocation, task
def dummy_task(x, y):
return x / y

# XXX: For some reason the first call does not get instrumented properly.
celery_invocation(dummy_task, 1, 1)

events = capture_events()

with start_span(name="submission") as transaction:
with sentry_sdk.start_span(name="submission") as root_span:
celery_invocation(dummy_task, 1, 0 if task_fails else 1)

if task_fails:
error_event = events.pop(0)
assert error_event["contexts"]["trace"]["trace_id"] == transaction.trace_id
assert error_event["contexts"]["trace"]["trace_id"] == root_span.trace_id
assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError"

execution_event, submission_event = events
Expand All @@ -211,24 +210,21 @@ def dummy_task(x, y):
assert submission_event["transaction_info"] == {"source": "custom"}

assert execution_event["type"] == submission_event["type"] == "transaction"
assert execution_event["contexts"]["trace"]["trace_id"] == transaction.trace_id
assert submission_event["contexts"]["trace"]["trace_id"] == transaction.trace_id
assert execution_event["contexts"]["trace"]["trace_id"] == root_span.trace_id
assert submission_event["contexts"]["trace"]["trace_id"] == root_span.trace_id

if task_fails:
assert execution_event["contexts"]["trace"]["status"] == "internal_error"
else:
assert execution_event["contexts"]["trace"]["status"] == "ok"

assert len(execution_event["spans"]) == 1
assert (
execution_event["spans"][0].items()
>= {
"trace_id": str(transaction.trace_id),
"same_process_as_parent": True,
assert execution_event["spans"][0] == ApproxDict(
{
"trace_id": str(root_span.trace_id),
"op": "queue.process",
"description": "dummy_task",
"data": ApproxDict(),
}.items()
}
)
assert submission_event["spans"] == [
{
Expand All @@ -237,11 +233,14 @@ def dummy_task(x, y):
"op": "queue.submit.celery",
"origin": "auto.queue.celery",
"parent_span_id": submission_event["contexts"]["trace"]["span_id"],
"same_process_as_parent": True,
"span_id": submission_event["spans"][0]["span_id"],
"start_timestamp": submission_event["spans"][0]["start_timestamp"],
"timestamp": submission_event["spans"][0]["timestamp"],
"trace_id": str(transaction.trace_id),
"trace_id": str(root_span.trace_id),
"status": "ok",
"tags": {
"status": "ok",
},
}
]

Expand Down Expand Up @@ -275,7 +274,7 @@ def test_simple_no_propagation(capture_events, init_celery):
def dummy_task():
1 / 0

with start_span(name="task") as root_span:
with sentry_sdk.start_span(name="task") as root_span:
dummy_task.delay()

(event,) = events
Expand Down Expand Up @@ -350,7 +349,7 @@ def dummy_task(self):
runs.append(1)
1 / 0

with start_span(name="submit_celery"):
with sentry_sdk.start_span(name="submit_celery"):
# Curious: Cannot use delay() here or py2.7-celery-4.2 crashes
res = dummy_task.apply_async()

Expand Down Expand Up @@ -445,7 +444,7 @@ def walk_dogs(x, y):
walk_dogs, [["Maisey", "Charlie", "Bodhi", "Cory"], "Dog park round trip"], 1
)

sampling_context = traces_sampler.call_args_list[1][0][0]
sampling_context = traces_sampler.call_args_list[0][0][0]
assert sampling_context["celery.job.task"] == "dog_walk"
for i, arg in enumerate(args_kwargs["args"]):
assert sampling_context[f"celery.job.args.{i}"] == str(arg)
Expand All @@ -469,7 +468,7 @@ def __call__(self, *args, **kwargs):
def dummy_task(x, y):
return x / y

with start_span(name="celery"):
with sentry_sdk.start_span(name="celery"):
celery_invocation(dummy_task, 1, 0)

assert not events
Expand Down Expand Up @@ -510,7 +509,7 @@ def test_baggage_propagation(init_celery):
def dummy_task(self, x, y):
return _get_headers(self)

with start_span(name="task") as root_span:
with sentry_sdk.start_span(name="task") as root_span:
result = dummy_task.apply_async(
args=(1, 0),
headers={"baggage": "custom=value"},
Expand All @@ -520,6 +519,7 @@ def dummy_task(self, x, y):
[
"sentry-release=abcdef",
"sentry-trace_id={}".format(root_span.trace_id),
"sentry-transaction=task",
"sentry-environment=production",
"sentry-sample_rate=1.0",
"sentry-sampled=true",
Expand All @@ -537,26 +537,42 @@ def test_sentry_propagate_traces_override(init_celery):
propagate_traces=True, traces_sample_rate=1.0, release="abcdef"
)

# Since we're applying the task inline eagerly,
# we need to cleanup the otel context for this test.
# and since we patch build_tracer, we need to do this before that runs...
# TODO: the right way is to not test this inline
original_apply = Task.apply

def cleaned_apply(*args, **kwargs):
token = context.attach(otel_trace.set_span_in_context(otel_trace.INVALID_SPAN))
rv = original_apply(*args, **kwargs)
context.detach(token)
return rv

Task.apply = cleaned_apply

@celery.task(name="dummy_task", bind=True)
def dummy_task(self, message):
trace_id = get_current_span().trace_id
return trace_id

with start_span(name="task") as root_span:
transaction_trace_id = root_span.trace_id
with sentry_sdk.start_span(name="task") as root_span:
root_span_trace_id = root_span.trace_id

# should propagate trace
task_transaction_id = dummy_task.apply_async(
task_trace_id = dummy_task.apply_async(
args=("some message",),
).get()
assert transaction_trace_id == task_transaction_id
assert root_span_trace_id == task_trace_id, "Trace should be propagated"

# should NOT propagate trace (overrides `propagate_traces` parameter in integration constructor)
task_transaction_id = dummy_task.apply_async(
task_trace_id = dummy_task.apply_async(
args=("another message",),
headers={"sentry-propagate-traces": False},
).get()
assert transaction_trace_id != task_transaction_id
assert root_span_trace_id != task_trace_id, "Trace should NOT be propagated"

Task.apply = original_apply


def test_apply_async_manually_span(sentry_init):
Expand Down Expand Up @@ -710,7 +726,7 @@ def publish(*args, **kwargs):
@celery.task()
def task(): ...

with start_span(name="task"):
with sentry_sdk.start_span(name="task"):
task.apply_async()

(event,) = events
Expand Down Expand Up @@ -773,7 +789,7 @@ def publish(*args, **kwargs):
@celery.task()
def task(): ...

with start_span(name="custom_transaction"):
with sentry_sdk.start_span(name="custom_transaction"):
task.apply_async()

(event,) = events
Expand Down
Loading