diff --git a/app/aws/xray_celery_handlers.py b/app/aws/xray_celery_handlers.py index 4dcb301b0b..a239a365b2 100644 --- a/app/aws/xray_celery_handlers.py +++ b/app/aws/xray_celery_handlers.py @@ -17,10 +17,8 @@ CELERY_NAMESPACE = "celery" -def xray_before_task_publish( - sender=None, headers=None, exchange=None, routing_key=None, properties=None, declare=None, retry_policy=None, **kwargs -): - logger.info(f"xray-celery: before publish: sender={sender}, headers={headers}, kwargs={kwargs}") +def xray_before_task_publish(sender=None, headers=None, **kwargs): + logger.info(f"xray-celery: before publish: sender: {sender} headers: {headers}") headers = headers if headers else {} task_id = headers.get("id") @@ -33,15 +31,13 @@ def xray_before_task_publish( inject_trace_header(headers, subsegment) -def xray_after_task_publish(headers=None, body=None, exchange=None, routing_key=None, **kwargs): - logger.info( - f"xray-celery: after publish: headers={headers}, body={body}, exchange={exchange}, routing_key={routing_key}, kwargs={kwargs}" - ) +def xray_after_task_publish(**kwargs): + logger.info(f"xray-celery: after publish: {kwargs}") xray_recorder.end_subsegment() -def xray_task_prerun(task_id=None, task=None, args=None, **kwargs): - logger.info(f"xray-celery: prerun: task_id={task_id}, task={task}, kwargs={kwargs}") +def xray_task_prerun(task_id=None, task=None, **kwargs): + logger.info(f"xray-celery: prerun: {task_id} {task}") xray_header = construct_xray_header(task.request) segment = xray_recorder.begin_segment(name=task.name, traceid=xray_header.root, parent_id=xray_header.parent) segment.save_origin_trace_header(xray_header) @@ -50,13 +46,12 @@ def xray_task_prerun(task_id=None, task=None, args=None, **kwargs): segment.put_metadata("task_id", task_id, namespace=CELERY_NAMESPACE) -def xray_task_postrun(task_id=None, task=None, args=None, **kwargs): - logger.info(f"xray-celery: postrun: kwargs={kwargs}") +def xray_task_postrun(**kwargs): + logger.info(f"xray-celery: postrun: {kwargs}") xray_recorder.end_segment() -def xray_task_failure(task_id=None, exception=None, **kwargs): - logger.info(f"xray-celery: failure: task_id={task_id}, e={exception}, kwargs={kwargs}") +def xray_task_failure(exception=None, **kwargs): segment = xray_recorder.current_segment() if not segment: logger.error( diff --git a/app/celery/celery.py b/app/celery/celery.py index 94e9cee064..2a47530ed4 100644 --- a/app/celery/celery.py +++ b/app/celery/celery.py @@ -51,16 +51,12 @@ def init_app(self, app): if app.config["AWS_XRAY_ENABLED"]: # Register the xray handlers - def handle_beat_signal(sender=None, **kwargs): - app.logger.info("Beat signal received: sender={}, kwargs={}".format(sender, kwargs)) - signals.after_task_publish.connect(xray_after_task_publish) signals.before_task_publish.connect(xray_before_task_publish) signals.task_failure.connect(xray_task_failure) signals.task_postrun.connect(xray_task_postrun) signals.task_prerun.connect(xray_task_prerun) - signals.beat_init.connect(handle_beat_signal) - signals.beat_embedded_init.connect(handle_beat_signal) + signals.beat_init.connect(xray_task_prerun) # See https://docs.celeryproject.org/en/stable/userguide/configuration.html self.conf.update(