Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SIANXSVC-826: Added direct-reply result backend #5

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

beachmachine
Copy link
Collaborator

@beachmachine beachmachine commented Oct 21, 2022

Implemented changes:

  • Added DirectReplyAMQPBackend that uses RabbitMQ direct-reply for results
  • Adjusted unit-tests for new backend (it does not support chords or groups)

@beachmachine beachmachine force-pushed the astocker/direct-reply-backend branch 3 times, most recently from cb210b8 to 75088c5 Compare October 25, 2022 11:21
@beachmachine
Copy link
Collaborator Author

Fixed a bug where messages were received in the wrong order and added a unit-test for it.

Copy link
Member

@PatrickTaibel PatrickTaibel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've absolutely no clue if this is a good idea as I don't completely understand how celery backends work. It has potential as at least during my short test I can see ~50% performance improvements with 1 worker process.

What needs to be fixed is that, right now, this only works when no concurrent executions happen. I marked the lines were I got the greenlet exceptions but there might be more. It seems we need to lock this somehow but I'm not sure how and especially not how to achieve this within a Celery result backend.

celery_amqp_backend/backend.py Outdated Show resolved Hide resolved
consumer.consume()

try:
consumer.connection.drain_events(timeout=0.5)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call fails when multiple concurrent executions happen.
Exception: ConcurrentObjectUseError
Message: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent._gevent_c_waiter.Waiter object at 0x7fb6c0c1ebf0>>
Traceback:

     File \"/usr/local/lib/python3.7/site-packages/celery/result.py\", line 231, in get
    on_message=on_message,
  File \"/usr/local/lib/python3.7/site-packages/celery/backends/base.py\", line 760, in wait_for_pending
    no_ack=no_ack,
  File \"/app/app/celery_amqp_backend/backend.py\", line 471, in wait_for
    on_interval=on_interval
  File \"/usr/local/lib/python3.7/site-packages/celery/backends/base.py\", line 783, in wait_for
    meta = self.get_task_meta(task_id)
  File \"/app/app/celery_amqp_backend/backend.py\", line 510, in get_task_meta
    consumer.connection.drain_events(timeout=0.5)
  File \"/usr/local/lib/python3.7/site-packages/kombu/connection.py\", line 316, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File \"/usr/local/lib/python3.7/site-packages/kombu/transport/pyamqp.py\", line 169, in drain_events
    return connection.drain_events(**kwargs)
  File \"/usr/local/lib/python3.7/site-packages/amqp/connection.py\", line 525, in drain_events
    while not self.blocking_read(timeout):
  File \"/usr/local/lib/python3.7/site-packages/amqp/connection.py\", line 530, in blocking_read
    frame = self.transport.read_frame()
  File \"/usr/local/lib/python3.7/site-packages/amqp/transport.py\", line 294, in read_frame
    frame_header = read(7, True)
  File \"/usr/local/lib/python3.7/site-packages/amqp/transport.py\", line 627, in _read
    s = recv(n - len(rbuf))
  File \"/usr/local/lib/python3.7/site-packages/gevent/_socketcommon.py\", line 663, in recv
    self._wait(self._read_event)
  File \"src/gevent/_hub_primitives.py\", line 317, in gevent._gevent_c_hub_primitives.wait_on_socket
  File \"src/gevent/_hub_primitives.py\", line 322, in gevent._gevent_c_hub_primitives.wait_on_socket
  File \"src/gevent/_hub_primitives.py\", line 297, in gevent._gevent_c_hub_primitives._primitive_wait

Comment on lines 587 to 591
consumer = kombu.Consumer(
channel,
queues=[consumer_queue],
auto_declare=True,
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call fails when multiple concurrent executions happen.
Exception: ConcurrentObjectUseError
Message: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent._gevent_c_waiter.Waiter object at 0x7fb6c0c1ebf0>>
Traceback:

  File \"/usr/local/lib/python3.7/site-packages/celery/app/base.py\", line 787, in send_task
    self.backend.on_task_call(P, task_id)
  File \"/app/app/celery_amqp_backend/backend.py\", line 576, in on_task_call
    producer.channel,
  File \"/app/app/celery_amqp_backend/backend.py\", line 590, in _create_consumer
    auto_declare=True,
  File \"/usr/local/lib/python3.7/site-packages/kombu/messaging.py\", line 387, in __init__
    self.revive(self.channel)
  File \"/usr/local/lib/python3.7/site-packages/kombu/messaging.py\", line 409, in revive
    self.declare()
  File \"/usr/local/lib/python3.7/site-packages/kombu/messaging.py\", line 422, in declare
    queue.declare()
  File \"/usr/local/lib/python3.7/site-packages/kombu/entity.py\", line 606, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File \"/usr/local/lib/python3.7/site-packages/kombu/entity.py\", line 615, in _create_queue
    self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File \"/usr/local/lib/python3.7/site-packages/kombu/entity.py\", line 650, in queue_declare
    nowait=nowait,
  File \"/usr/local/lib/python3.7/site-packages/amqp/channel.py\", line 1163, in queue_declare
    spec.Queue.DeclareOk, returns_tuple=True,
  File \"/usr/local/lib/python3.7/site-packages/amqp/abstract_channel.py\", line 99, in wait
    self.connection.drain_events(timeout=timeout)
  File \"/usr/local/lib/python3.7/site-packages/amqp/connection.py\", line 525, in drain_events
    while not self.blocking_read(timeout):
  File \"/usr/local/lib/python3.7/site-packages/amqp/connection.py\", line 530, in blocking_read
    frame = self.transport.read_frame()
  File \"/usr/local/lib/python3.7/site-packages/amqp/transport.py\", line 294, in read_frame
    frame_header = read(7, True)
  File \"/usr/local/lib/python3.7/site-packages/amqp/transport.py\", line 627, in _read
    s = recv(n - len(rbuf))
  File \"/usr/local/lib/python3.7/site-packages/gevent/_socketcommon.py\", line 663, in recv
    self._wait(self._read_event)
  File \"src/gevent/_hub_primitives.py\", line 317, in gevent._gevent_c_hub_primitives.wait_on_socket
  File \"src/gevent/_hub_primitives.py\", line 322, in gevent._gevent_c_hub_primitives.wait_on_socket
  File \"src/gevent/_hub_primitives.py\", line 297, in gevent._gevent_c_hub_primitives._primitive_wait

@beachmachine beachmachine force-pushed the astocker/direct-reply-backend branch 3 times, most recently from 1f7ff5e to 3eb7708 Compare November 30, 2022 17:34
@beachmachine beachmachine force-pushed the astocker/direct-reply-backend branch from 3eb7708 to 51a2803 Compare November 30, 2022 18:06
@codecov-commenter
Copy link

Codecov Report

Base: 86.66% // Head: 88.60% // Increases project coverage by +1.93% 🎉

Coverage data is based on head (51a2803) compared to base (c617b06).
Patch coverage: 90.32% of modified lines in pull request are covered.

Additional details and impacted files
@@            Coverage Diff             @@
##             main       #5      +/-   ##
==========================================
+ Coverage   86.66%   88.60%   +1.93%     
==========================================
  Files           3        3              
  Lines         150      272     +122     
==========================================
+ Hits          130      241     +111     
- Misses         20       31      +11     
Impacted Files Coverage Δ
celery_amqp_backend/backend.py 87.98% <90.24%> (+2.69%) ⬆️
celery_amqp_backend/__init__.py 100.00% <100.00%> (ø)

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

☔ View full report at Codecov.
📢 Do you have feedback about the report comment? Let us know in this issue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants