From 3bfd6b0bd73526797158c85aefd03364b6dbbbc6 Mon Sep 17 00:00:00 2001 From: Jimmy Royer Date: Fri, 23 Aug 2024 12:18:52 -0400 Subject: [PATCH] Skip subsegment creation if segment is missing in current thread context (#2269) * Reapply "Added back logging signals for beat workers (#2266)" (#2267) This reverts commit 27b522e019667f3ac7814e51a3b8491566f53cfb. * Skip xray subsegment creation if no segment lives in current context * More comments * Better logging * Only close subsegment when one is found in current thread context * Better logging --- app/aws/xray_celery_handlers.py | 59 +++++++++++++++++++++------------ app/celery/celery.py | 1 - 2 files changed, 38 insertions(+), 22 deletions(-) diff --git a/app/aws/xray_celery_handlers.py b/app/aws/xray_celery_handlers.py index a239a365b2..e0c5fb439b 100644 --- a/app/aws/xray_celery_handlers.py +++ b/app/aws/xray_celery_handlers.py @@ -17,27 +17,43 @@ CELERY_NAMESPACE = "celery" -def xray_before_task_publish(sender=None, headers=None, **kwargs): - logger.info(f"xray-celery: before publish: sender: {sender} headers: {headers}") +def xray_before_task_publish( + sender=None, headers=None, exchange=None, routing_key=None, properties=None, declare=None, retry_policy=None, **kwargs +): + logger.info(f"xray-celery: before publish: sender={sender}, headers={headers}, kwargs={kwargs}") headers = headers if headers else {} task_id = headers.get("id") - - subsegment = xray_recorder.begin_subsegment(name=sender, namespace="remote") - if not subsegment: - logger.error("Failed to create a X-Ray subsegment on task publish", extra={"celery": {"task_id": task_id}}) - return - - subsegment.put_metadata("task_id", task_id, namespace=CELERY_NAMESPACE) - inject_trace_header(headers, subsegment) - - -def xray_after_task_publish(**kwargs): - logger.info(f"xray-celery: after publish: {kwargs}") - xray_recorder.end_subsegment() + current_segment = xray_recorder.current_segment() + # Checks if there is a current segment to create a subsegment, + # otherwise we might be in a starter task. The prerun handler will + # create the segment for us down the road as it will be called after. + if current_segment: + subsegment = xray_recorder.begin_subsegment(name=sender, namespace="remote") + if subsegment: + subsegment.put_metadata("task_id", task_id, namespace=CELERY_NAMESPACE) + inject_trace_header(headers, subsegment) + else: + logger.error( + "xray-celery: Failed to create a X-Ray subsegment on task publish", extra={"celery": {"task_id": task_id}} + ) + else: + logger.warn("xray-celery: No parent segment found for task {task_id} when trying to create subsegment", task_id) + + +def xray_after_task_publish(headers=None, body=None, exchange=None, routing_key=None, **kwargs): + logger.info( + f"xray-celery: after publish: headers={headers}, body={body}, exchange={exchange}, routing_key={routing_key}, kwargs={kwargs}" + ) + if xray_recorder.current_subsegment(): + xray_recorder.end_subsegment() + else: + logger.warn( + "xray-celery: Skipping subsegment closing after publish as no subsegment was found: {headers}", headers=headers + ) -def xray_task_prerun(task_id=None, task=None, **kwargs): - logger.info(f"xray-celery: prerun: {task_id} {task}") +def xray_task_prerun(task_id=None, task=None, args=None, **kwargs): + logger.info(f"xray-celery: prerun: task_id={task_id}, task={task}, kwargs={kwargs}") xray_header = construct_xray_header(task.request) segment = xray_recorder.begin_segment(name=task.name, traceid=xray_header.root, parent_id=xray_header.parent) segment.save_origin_trace_header(xray_header) @@ -46,16 +62,17 @@ def xray_task_prerun(task_id=None, task=None, **kwargs): segment.put_metadata("task_id", task_id, namespace=CELERY_NAMESPACE) -def xray_task_postrun(**kwargs): - logger.info(f"xray-celery: postrun: {kwargs}") +def xray_task_postrun(task_id=None, task=None, args=None, **kwargs): + logger.info(f"xray-celery: postrun: kwargs={kwargs}") xray_recorder.end_segment() -def xray_task_failure(exception=None, **kwargs): +def xray_task_failure(task_id=None, exception=None, **kwargs): + logger.info(f"xray-celery: failure: task_id={task_id}, e={exception}, kwargs={kwargs}") segment = xray_recorder.current_segment() if not segment: logger.error( - "Failed to get the current X-Ray segment on task failure", extra={"celery": {"task_id": kwargs.get("task_id")}} + "xray-celery: Failed to get the current segment on task failure", extra={"celery": {"task_id": kwargs.get("task_id")}} ) return diff --git a/app/celery/celery.py b/app/celery/celery.py index 28cd2d3153..17f39476d0 100644 --- a/app/celery/celery.py +++ b/app/celery/celery.py @@ -55,7 +55,6 @@ def init_app(self, app): signals.task_failure.connect(xray_task_failure) signals.task_postrun.connect(xray_task_postrun) signals.task_prerun.connect(xray_task_prerun) - signals.beat_init.connect(xray_task_prerun) # See https://docs.celeryproject.org/en/stable/userguide/configuration.html self.conf.update(