diff --git a/app/aws/xray_celery_handlers.py b/app/aws/xray_celery_handlers.py
new file mode 100644
index 0000000000..a239a365b2
--- /dev/null
+++ b/app/aws/xray_celery_handlers.py
@@ -0,0 +1,86 @@
+"""Celery signal handlers that record AWS X-Ray trace data for task publishing and execution."""
+
+import logging
+
+from aws_xray_sdk.core import xray_recorder
+from aws_xray_sdk.core.utils import stacktrace
+from aws_xray_sdk.ext.util import construct_xray_header, inject_trace_header
+
+__all__ = (
+    "xray_after_task_publish",
+    "xray_before_task_publish",
+    "xray_task_failure",
+    "xray_task_postrun",
+    "xray_task_prerun",
+)
+
+logger = logging.getLogger("celery_aws_xray_sdk_extension")
+
+CELERY_NAMESPACE = "celery"
+
+
+def xray_before_task_publish(sender=None, headers=None, **kwargs):
+    """Open a "remote" X-Ray subsegment for a task about to be published.
+
+    Handler for Celery's ``before_task_publish`` signal. ``sender`` names the
+    subsegment and the trace header is injected into ``headers``. Logs an
+    error and returns early when no subsegment could be created.
+    """
+    logger.info("xray-celery: before publish: sender: %s headers: %s", sender, headers)
+    # Only substitute a dict when none was supplied. Replacing an empty (falsy)
+    # dict would inject the trace header into a throwaway copy.
+    if headers is None:
+        headers = {}
+    task_id = headers.get("id")
+
+    subsegment = xray_recorder.begin_subsegment(name=sender, namespace="remote")
+    if not subsegment:
+        logger.error("Failed to create a X-Ray subsegment on task publish", extra={"celery": {"task_id": task_id}})
+        return
+
+    subsegment.put_metadata("task_id", task_id, namespace=CELERY_NAMESPACE)
+    inject_trace_header(headers, subsegment)
+
+
+def xray_after_task_publish(**kwargs):
+    """End the current X-Ray subsegment once the task has been published."""
+    logger.info("xray-celery: after publish: %s", kwargs)
+    xray_recorder.end_subsegment()
+
+
+def xray_task_prerun(task_id=None, task=None, **kwargs):
+    """Begin an X-Ray segment for the task, using the trace header built from its request."""
+    logger.info("xray-celery: prerun: %s %s", task_id, task)
+    xray_header = construct_xray_header(task.request)
+    segment = xray_recorder.begin_segment(name=task.name, traceid=xray_header.root, parent_id=xray_header.parent)
+    segment.save_origin_trace_header(xray_header)
+
+    # The message properties may be absent or lack delivery info; skip the
+    # annotation in that case instead of raising from the signal handler.
+    properties = getattr(task.request, "properties", None) or {}
+    routing_key = (properties.get("delivery_info") or {}).get("routing_key")
+    if routing_key is not None:
+        segment.put_annotation("routing_key", routing_key)
+    segment.put_annotation("task_name", task.name)
+    segment.put_metadata("task_id", task_id, namespace=CELERY_NAMESPACE)
+
+
+def xray_task_postrun(**kwargs):
+    """End the current X-Ray segment after the task has run."""
+    logger.info("xray-celery: postrun: %s", kwargs)
+    xray_recorder.end_segment()
+
+
+def xray_task_failure(exception=None, **kwargs):
+    """Attach the task's exception and a stack trace to the current X-Ray segment."""
+    segment = xray_recorder.current_segment()
+    if not segment:
+        logger.error(
+            "Failed to get the current X-Ray segment on task failure", extra={"celery": {"task_id": kwargs.get("task_id")}}
+        )
+        return
+
+    if exception:
+        # Use the recorder's public property rather than its private attribute.
+        stack = stacktrace.get_stacktrace(limit=xray_recorder.max_trace_back)
+        segment.add_exception(exception, stack)
diff --git a/app/celery/celery.py b/app/celery/celery.py
index be5a5593aa..17f39476d0 100644
--- a/app/celery/celery.py
+++ b/app/celery/celery.py
@@ -2,7 +2,14 @@
 
 from flask import current_app
 
-from celery import Celery, Task
+from app.aws.xray_celery_handlers import (
+    xray_after_task_publish,
+    xray_before_task_publish,
+    xray_task_failure,
+    xray_task_postrun,
+    xray_task_prerun,
+)
+from celery import Celery, Task, signals
 from celery.signals import worker_process_shutdown
 
 
@@ -42,6 +49,13 @@ def init_app(self, app):
             task_cls=make_task(app),
         )
 
+        # Register the xray handlers
+        signals.after_task_publish.connect(xray_after_task_publish)
+        signals.before_task_publish.connect(xray_before_task_publish)
+        signals.task_failure.connect(xray_task_failure)
+        signals.task_postrun.connect(xray_task_postrun)
+        signals.task_prerun.connect(xray_task_prerun)
+
         # See https://docs.celeryproject.org/en/stable/userguide/configuration.html
         self.conf.update(
             {
diff --git a/poetry.lock b/poetry.lock
index 3f44be0bab..804f05bd27 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -598,20 +598,6 @@ yaml = ["PyYAML (>=3.10)"]
 zookeeper = ["kazoo (>=1.3.1)"]
 zstd = ["zstandard (==0.22.0)"]
 
-[[package]]
-name = "celery-aws-xray-sdk-extension"
-version = "0.1.2"
-description = "Extension for AWS X-Ray SDK which enables tracing of Celery tasks"
-optional = false
-python-versions = ">=3.6"
-files = [
-    {file = "celery-aws-xray-sdk-extension-0.1.2.tar.gz", hash = "sha256:81e8a21259560074cb9dbfd54bf5d47b2d1ba0c64a54d959dd1d960e731186a5"},
-]
-
-[package.dependencies]
-aws-xray-sdk = ">=2.9,<3"
-celery = ">=5,<6"
-
 [[package]]
 name = "certifi"
 version = "2023.11.17"
@@ -4392,4 +4378,4 @@ testing = ["coverage (>=5.0.3)", "zope.event", "zope.testing"]
 [metadata]
 lock-version = "2.0"
 python-versions = "~3.10.9"
-content-hash = "5b6f7a813ca4930df646a92420fe1c43eba25511c968c4ae646e5c27cfc6722a"
+content-hash = "2ee1dec4988da93430da6559224721e2a55512e6613ba877af6d91eea9e23849"
diff --git a/pyproject.toml b/pyproject.toml
index 4bd96613bb..bd28e57131 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -76,8 +76,7 @@ simple-salesforce = "^1.12.3"
 certifi = "^2023.7.22" # pinned for security reasons: https://github.com/cds-snc/notification-api/security/dependabot/119
 idna = "2.10" # pinned to align with test moto dependency requirements (for <=2.9)
 flask-marshmallow = "0.14.0"
-aws-xray-sdk = "^2.14.0"
-celery-aws-xray-sdk-extension = "^0.1.2"
+aws-xray-sdk = "2.14.0"
 
 [tool.poetry.group.test.dependencies]
 flake8 = "6.1.0"