-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SIANXSVC-826: Added direct-reply result backend #5
base: main
Are you sure you want to change the base?
Conversation
cb210b8
to
75088c5
Compare
Fixed a bug where messages were received in the wrong order and added a unit-test for it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've absolutely no clue if this is a good idea as I don't completely understand how celery backends work. It has potential as at least during my short test I can see ~50% performance improvements with 1 worker process.
What needs to be fixed is that, right now, this only works when no concurrent executions happen. I marked the lines were I got the greenlet
exceptions but there might be more. It seems we need to lock this somehow but I'm not sure how and especially not how to achieve this within a Celery result backend.
celery_amqp_backend/backend.py
Outdated
consumer.consume() | ||
|
||
try: | ||
consumer.connection.drain_events(timeout=0.5) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This call fails when multiple concurrent executions happen.
Exception: ConcurrentObjectUseError
Message: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent._gevent_c_waiter.Waiter object at 0x7fb6c0c1ebf0>>
Traceback:
File \"/usr/local/lib/python3.7/site-packages/celery/result.py\", line 231, in get
on_message=on_message,
File \"/usr/local/lib/python3.7/site-packages/celery/backends/base.py\", line 760, in wait_for_pending
no_ack=no_ack,
File \"/app/app/celery_amqp_backend/backend.py\", line 471, in wait_for
on_interval=on_interval
File \"/usr/local/lib/python3.7/site-packages/celery/backends/base.py\", line 783, in wait_for
meta = self.get_task_meta(task_id)
File \"/app/app/celery_amqp_backend/backend.py\", line 510, in get_task_meta
consumer.connection.drain_events(timeout=0.5)
File \"/usr/local/lib/python3.7/site-packages/kombu/connection.py\", line 316, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
File \"/usr/local/lib/python3.7/site-packages/kombu/transport/pyamqp.py\", line 169, in drain_events
return connection.drain_events(**kwargs)
File \"/usr/local/lib/python3.7/site-packages/amqp/connection.py\", line 525, in drain_events
while not self.blocking_read(timeout):
File \"/usr/local/lib/python3.7/site-packages/amqp/connection.py\", line 530, in blocking_read
frame = self.transport.read_frame()
File \"/usr/local/lib/python3.7/site-packages/amqp/transport.py\", line 294, in read_frame
frame_header = read(7, True)
File \"/usr/local/lib/python3.7/site-packages/amqp/transport.py\", line 627, in _read
s = recv(n - len(rbuf))
File \"/usr/local/lib/python3.7/site-packages/gevent/_socketcommon.py\", line 663, in recv
self._wait(self._read_event)
File \"src/gevent/_hub_primitives.py\", line 317, in gevent._gevent_c_hub_primitives.wait_on_socket
File \"src/gevent/_hub_primitives.py\", line 322, in gevent._gevent_c_hub_primitives.wait_on_socket
File \"src/gevent/_hub_primitives.py\", line 297, in gevent._gevent_c_hub_primitives._primitive_wait
celery_amqp_backend/backend.py
Outdated
consumer = kombu.Consumer( | ||
channel, | ||
queues=[consumer_queue], | ||
auto_declare=True, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This call fails when multiple concurrent executions happen.
Exception: ConcurrentObjectUseError
Message: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent._gevent_c_waiter.Waiter object at 0x7fb6c0c1ebf0>>
Traceback:
File \"/usr/local/lib/python3.7/site-packages/celery/app/base.py\", line 787, in send_task
self.backend.on_task_call(P, task_id)
File \"/app/app/celery_amqp_backend/backend.py\", line 576, in on_task_call
producer.channel,
File \"/app/app/celery_amqp_backend/backend.py\", line 590, in _create_consumer
auto_declare=True,
File \"/usr/local/lib/python3.7/site-packages/kombu/messaging.py\", line 387, in __init__
self.revive(self.channel)
File \"/usr/local/lib/python3.7/site-packages/kombu/messaging.py\", line 409, in revive
self.declare()
File \"/usr/local/lib/python3.7/site-packages/kombu/messaging.py\", line 422, in declare
queue.declare()
File \"/usr/local/lib/python3.7/site-packages/kombu/entity.py\", line 606, in declare
self._create_queue(nowait=nowait, channel=channel)
File \"/usr/local/lib/python3.7/site-packages/kombu/entity.py\", line 615, in _create_queue
self.queue_declare(nowait=nowait, passive=False, channel=channel)
File \"/usr/local/lib/python3.7/site-packages/kombu/entity.py\", line 650, in queue_declare
nowait=nowait,
File \"/usr/local/lib/python3.7/site-packages/amqp/channel.py\", line 1163, in queue_declare
spec.Queue.DeclareOk, returns_tuple=True,
File \"/usr/local/lib/python3.7/site-packages/amqp/abstract_channel.py\", line 99, in wait
self.connection.drain_events(timeout=timeout)
File \"/usr/local/lib/python3.7/site-packages/amqp/connection.py\", line 525, in drain_events
while not self.blocking_read(timeout):
File \"/usr/local/lib/python3.7/site-packages/amqp/connection.py\", line 530, in blocking_read
frame = self.transport.read_frame()
File \"/usr/local/lib/python3.7/site-packages/amqp/transport.py\", line 294, in read_frame
frame_header = read(7, True)
File \"/usr/local/lib/python3.7/site-packages/amqp/transport.py\", line 627, in _read
s = recv(n - len(rbuf))
File \"/usr/local/lib/python3.7/site-packages/gevent/_socketcommon.py\", line 663, in recv
self._wait(self._read_event)
File \"src/gevent/_hub_primitives.py\", line 317, in gevent._gevent_c_hub_primitives.wait_on_socket
File \"src/gevent/_hub_primitives.py\", line 322, in gevent._gevent_c_hub_primitives.wait_on_socket
File \"src/gevent/_hub_primitives.py\", line 297, in gevent._gevent_c_hub_primitives._primitive_wait
1f7ff5e
to
3eb7708
Compare
3eb7708
to
51a2803
Compare
Codecov ReportBase: 86.66% // Head: 88.60% // Increases project coverage by
Additional details and impacted files@@ Coverage Diff @@
## main #5 +/- ##
==========================================
+ Coverage 86.66% 88.60% +1.93%
==========================================
Files 3 3
Lines 150 272 +122
==========================================
+ Hits 130 241 +111
- Misses 20 31 +11
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. ☔ View full report at Codecov. |
Implemented changes:
DirectReplyAMQPBackend
that uses RabbitMQ direct-reply for resultsor groups)