Skip to content

Commit

Permalink
Skip subsegment creation if segment is missing in current thread cont…
Browse files Browse the repository at this point in the history
…ext (#2269)

* Reapply "Added back logging signals for beat workers (#2266)" (#2267)

This reverts commit 27b522e.

* Skip xray subsegment creation if no segment lives in current context

* More comments

* Better logging

* Only close subsegment when one is found in current thread context

* Better logging
  • Loading branch information
jimleroyer authored Aug 23, 2024
1 parent 5fac480 commit 3bfd6b0
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 22 deletions.
59 changes: 38 additions & 21 deletions app/aws/xray_celery_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,43 @@
CELERY_NAMESPACE = "celery"


def xray_before_task_publish(sender=None, headers=None, **kwargs):
logger.info(f"xray-celery: before publish: sender: {sender} headers: {headers}")
def xray_before_task_publish(
sender=None, headers=None, exchange=None, routing_key=None, properties=None, declare=None, retry_policy=None, **kwargs
):
logger.info(f"xray-celery: before publish: sender={sender}, headers={headers}, kwargs={kwargs}")
headers = headers if headers else {}
task_id = headers.get("id")

subsegment = xray_recorder.begin_subsegment(name=sender, namespace="remote")
if not subsegment:
logger.error("Failed to create a X-Ray subsegment on task publish", extra={"celery": {"task_id": task_id}})
return

subsegment.put_metadata("task_id", task_id, namespace=CELERY_NAMESPACE)
inject_trace_header(headers, subsegment)


def xray_after_task_publish(**kwargs):
logger.info(f"xray-celery: after publish: {kwargs}")
xray_recorder.end_subsegment()
current_segment = xray_recorder.current_segment()
# Checks if there is a current segment to create a subsegment,
# otherwise we might be in a starter task. The prerun handler will
# create the segment for us down the road as it will be called after.
if current_segment:
subsegment = xray_recorder.begin_subsegment(name=sender, namespace="remote")
if subsegment:
subsegment.put_metadata("task_id", task_id, namespace=CELERY_NAMESPACE)
inject_trace_header(headers, subsegment)
else:
logger.error(
"xray-celery: Failed to create a X-Ray subsegment on task publish", extra={"celery": {"task_id": task_id}}
)
else:
logger.warn("xray-celery: No parent segment found for task {task_id} when trying to create subsegment", task_id)


def xray_after_task_publish(headers=None, body=None, exchange=None, routing_key=None, **kwargs):
logger.info(
f"xray-celery: after publish: headers={headers}, body={body}, exchange={exchange}, routing_key={routing_key}, kwargs={kwargs}"
)
if xray_recorder.current_subsegment():
xray_recorder.end_subsegment()
else:
logger.warn(
"xray-celery: Skipping subsegment closing after publish as no subsegment was found: {headers}", headers=headers
)


def xray_task_prerun(task_id=None, task=None, **kwargs):
logger.info(f"xray-celery: prerun: {task_id} {task}")
def xray_task_prerun(task_id=None, task=None, args=None, **kwargs):
logger.info(f"xray-celery: prerun: task_id={task_id}, task={task}, kwargs={kwargs}")
xray_header = construct_xray_header(task.request)
segment = xray_recorder.begin_segment(name=task.name, traceid=xray_header.root, parent_id=xray_header.parent)
segment.save_origin_trace_header(xray_header)
Expand All @@ -46,16 +62,17 @@ def xray_task_prerun(task_id=None, task=None, **kwargs):
segment.put_metadata("task_id", task_id, namespace=CELERY_NAMESPACE)


def xray_task_postrun(**kwargs):
logger.info(f"xray-celery: postrun: {kwargs}")
def xray_task_postrun(task_id=None, task=None, args=None, **kwargs):
logger.info(f"xray-celery: postrun: kwargs={kwargs}")
xray_recorder.end_segment()


def xray_task_failure(exception=None, **kwargs):
def xray_task_failure(task_id=None, exception=None, **kwargs):
logger.info(f"xray-celery: failure: task_id={task_id}, e={exception}, kwargs={kwargs}")
segment = xray_recorder.current_segment()
if not segment:
logger.error(
"Failed to get the current X-Ray segment on task failure", extra={"celery": {"task_id": kwargs.get("task_id")}}
"xray-celery: Failed to get the current segment on task failure", extra={"celery": {"task_id": kwargs.get("task_id")}}
)
return

Expand Down
1 change: 0 additions & 1 deletion app/celery/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def init_app(self, app):
signals.task_failure.connect(xray_task_failure)
signals.task_postrun.connect(xray_task_postrun)
signals.task_prerun.connect(xray_task_prerun)
signals.beat_init.connect(xray_task_prerun)

# See https://docs.celeryproject.org/en/stable/userguide/configuration.html
self.conf.update(
Expand Down

0 comments on commit 3bfd6b0

Please sign in to comment.