Skip to content

Commit

Permalink
Celery adjustment (#2250)
Browse files Browse the repository at this point in the history
* Moving xray code to a more centralized location

* moving the signal code to the initialization section

* Added missing imports and applied sort import

* Added custom aws sdk module of ours, forking from sdk celery xray;

* Wire the imports to use our custom sdk aws xray instead of the lib version

* fixing formatting

* fixing poetry references

* lock file

---------

Co-authored-by: Jimmy Royer <[email protected]>
  • Loading branch information
P0NDER0SA and jimleroyer authored Aug 20, 2024
1 parent 3477e85 commit 4ca5ede
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 18 deletions.
64 changes: 64 additions & 0 deletions app/aws/xray_celery_handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import logging

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.utils import stacktrace
from aws_xray_sdk.ext.util import construct_xray_header, inject_trace_header

__all__ = (
"xray_after_task_publish",
"xray_before_task_publish",
"xray_task_failure",
"xray_task_postrun",
"xray_task_prerun",
)

logger = logging.getLogger("celery_aws_xray_sdk_extension")

CELERY_NAMESPACE = "celery"


def xray_before_task_publish(sender=None, headers=None, **kwargs):
logger.info(f"xray-celery: before publish: sender: {sender} headers: {headers}")
headers = headers if headers else {}
task_id = headers.get("id")

subsegment = xray_recorder.begin_subsegment(name=sender, namespace="remote")
if not subsegment:
logger.error("Failed to create a X-Ray subsegment on task publish", extra={"celery": {"task_id": task_id}})
return

subsegment.put_metadata("task_id", task_id, namespace=CELERY_NAMESPACE)
inject_trace_header(headers, subsegment)


def xray_after_task_publish(**kwargs):
logger.info(f"xray-celery: after publish: {kwargs}")
xray_recorder.end_subsegment()


def xray_task_prerun(task_id=None, task=None, **kwargs):
logger.info(f"xray-celery: prerun: {task_id} {task}")
xray_header = construct_xray_header(task.request)
segment = xray_recorder.begin_segment(name=task.name, traceid=xray_header.root, parent_id=xray_header.parent)
segment.save_origin_trace_header(xray_header)
segment.put_annotation("routing_key", task.request.properties["delivery_info"]["routing_key"])
segment.put_annotation("task_name", task.name)
segment.put_metadata("task_id", task_id, namespace=CELERY_NAMESPACE)


def xray_task_postrun(**kwargs):
logger.info(f"xray-celery: postrun: {kwargs}")
xray_recorder.end_segment()


def xray_task_failure(exception=None, **kwargs):
segment = xray_recorder.current_segment()
if not segment:
logger.error(
"Failed to get the current X-Ray segment on task failure", extra={"celery": {"task_id": kwargs.get("task_id")}}
)
return

if exception:
stack = stacktrace.get_stacktrace(limit=xray_recorder._max_trace_back)
segment.add_exception(exception, stack)
16 changes: 15 additions & 1 deletion app/celery/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@

from flask import current_app

from celery import Celery, Task
from app.aws.xray_celery_handlers import (
xray_after_task_publish,
xray_before_task_publish,
xray_task_failure,
xray_task_postrun,
xray_task_prerun,
)
from celery import Celery, Task, signals
from celery.signals import worker_process_shutdown


Expand Down Expand Up @@ -42,6 +49,13 @@ def init_app(self, app):
task_cls=make_task(app),
)

# Register the xray handlers
signals.after_task_publish.connect(xray_after_task_publish)
signals.before_task_publish.connect(xray_before_task_publish)
signals.task_failure.connect(xray_task_failure)
signals.task_postrun.connect(xray_task_postrun)
signals.task_prerun.connect(xray_task_prerun)

# See https://docs.celeryproject.org/en/stable/userguide/configuration.html
self.conf.update(
{
Expand Down
16 changes: 1 addition & 15 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ simple-salesforce = "^1.12.3"
certifi = "^2023.7.22" # pinned for security reasons: https://github.com/cds-snc/notification-api/security/dependabot/119
idna = "2.10" # pinned to align with test moto dependency requirements (for <=2.9)
flask-marshmallow = "0.14.0"
aws-xray-sdk = "^2.14.0"
celery-aws-xray-sdk-extension = "^0.1.2"
aws-xray-sdk = "2.14.0"

[tool.poetry.group.test.dependencies]
flake8 = "6.1.0"
Expand Down

0 comments on commit 4ca5ede

Please sign in to comment.