Skip to content

Commit

Permalink
Use gevent and eventlet wait() functions to remove busy-wait (celery#…
Browse files Browse the repository at this point in the history
…5974)

* Use gevent and eventlet wait() functions to remove busy-wait

Fixes issue celery#4999.

Calling AsyncResult.get() in a gevent context would cause the async Drainer to repeatedly call wait_for until the result was completed.  I've updated the code to have a specific implementation for gevent and eventlet that will cause wait_for to only return every "timeout" # of seconds, rather than repeatedly returning.

Some things I'd like some feedback on:
* Where's the best place to add test coverage for this?  It doesn't look like there are any tests that directly exercised the Drainer yet so I would probably look to add some of these to the backends/ unit tests.
* The way I did this for the Eventlet interface was to rely on the private _exit_event member of the GreenThread instance; to do this without relying on a private member would require some additional changes to the backend Drainer interface so that we could wait for an eventlet-specific event in wait_for().  I can do this, just wanted to get some feedback before.

* Add unit tests for Drainer classes

In order for this to work without monkeypatching in the tests, I needed to call sleep(0) to let the gevent/eventlet greenlets to yield control back to the calling thread.  I also made the check interval configurable in the drainer so that we didn't need to sleep multiples of 1 second in the tests.

* Weaken asserts since they don't pass on CI

* Fix eventlet auto-patching DNS resolver module on import

By default it looks like "import eventlet" imports the greendns module unless the environment EVENTLET_NO_GREENDNS is set to true.  This broke a pymongo test.

* Add tests ensuring that the greenlet loop isn't blocked

These tests make sure that while drain_events_until is running that other gevent/eventlet concurrency can run.

* Clean up tests and make sure they wait for all the threads to stop
  • Loading branch information
tildedave authored Feb 29, 2020
1 parent 21a906b commit 6892beb
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 16 deletions.
39 changes: 23 additions & 16 deletions celery/backends/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from weakref import WeakKeyDictionary

from kombu.utils.compat import detect_environment
from kombu.utils.objects import cached_property

from celery import states
from celery.exceptions import TimeoutError
Expand Down Expand Up @@ -44,7 +43,7 @@ def start(self):
def stop(self):
pass

def drain_events_until(self, p, timeout=None, on_interval=None, wait=None):
def drain_events_until(self, p, timeout=None, interval=1, on_interval=None, wait=None):
wait = wait or self.result_consumer.drain_events
time_start = monotonic()

Expand All @@ -53,7 +52,7 @@ def drain_events_until(self, p, timeout=None, on_interval=None, wait=None):
if timeout and monotonic() - time_start >= timeout:
raise socket.timeout()
try:
yield self.wait_for(p, wait, timeout=1)
yield self.wait_for(p, wait, timeout=interval)
except socket.timeout:
pass
if on_interval:
Expand Down Expand Up @@ -93,28 +92,36 @@ def stop(self):
self._stopped.set()
self._shutdown.wait(THREAD_TIMEOUT_MAX)

def wait_for(self, p, wait, timeout=None):
self.start()
if not p.ready:
sleep(0)


@register_drainer('eventlet')
class eventletDrainer(greenletDrainer):

@cached_property
def spawn(self):
from eventlet import spawn
return spawn
def spawn(self, func):
from eventlet import spawn, sleep
g = spawn(func)
sleep(0)
return g

def wait_for(self, p, wait, timeout=None):
self.start()
if not p.ready:
self._g._exit_event.wait(timeout=timeout)


@register_drainer('gevent')
class geventDrainer(greenletDrainer):

@cached_property
def spawn(self):
from gevent import spawn
return spawn
def spawn(self, func):
from gevent import spawn, sleep
g = spawn(func)
sleep(0)
return g

def wait_for(self, p, wait, timeout=None):
import gevent
self.start()
if not p.ready:
gevent.wait([self._g], timeout=timeout)


class AsyncBackendMixin(object):
Expand Down
211 changes: 211 additions & 0 deletions t/unit/backends/test_asynchronous.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
import os
import socket
import time
import threading

import pytest
from case import patch, skip, Mock
from vine import promise

from celery.backends.asynchronous import BaseResultConsumer
from celery.backends.base import Backend
from celery.utils import cached_property


@pytest.fixture(autouse=True)
def setup_eventlet():
# By default eventlet will patch the DNS resolver when imported.
os.environ.update(EVENTLET_NO_GREENDNS='yes')


class DrainerTests(object):
"""
Base test class for the Default / Gevent / Eventlet drainers.
"""

interval = 0.1 # Check every tenth of a second
MAX_TIMEOUT = 10 # Specify a max timeout so it doesn't run forever

def get_drainer(self, environment):
with patch('celery.backends.asynchronous.detect_environment') as d:
d.return_value = environment
backend = Backend(self.app)
consumer = BaseResultConsumer(backend, self.app, backend.accept,
pending_results={},
pending_messages={})
consumer.drain_events = Mock(side_effect=self.result_consumer_drain_events)
return consumer.drainer

@pytest.fixture(autouse=True)
def setup_drainer(self):
raise NotImplementedError

@cached_property
def sleep(self):
"""
Sleep on the event loop.
"""
raise NotImplementedError

def schedule_thread(self, thread):
"""
Set up a thread that runs on the event loop.
"""
raise NotImplementedError

def teardown_thread(self, thread):
"""
Wait for a thread to stop.
"""
raise NotImplementedError

def result_consumer_drain_events(self, timeout=None):
"""
Subclasses should override this method to define the behavior of
drainer.result_consumer.drain_events.
"""
raise NotImplementedError

def test_drain_checks_on_interval(self):
p = promise()

def fulfill_promise_thread():
self.sleep(self.interval * 2)
p('done')

fulfill_thread = self.schedule_thread(fulfill_promise_thread)

on_interval = Mock()
for _ in self.drainer.drain_events_until(p,
on_interval=on_interval,
interval=self.interval,
timeout=self.MAX_TIMEOUT):
pass

self.teardown_thread(fulfill_thread)

assert p.ready, 'Should have terminated with promise being ready'
assert on_interval.call_count < 20, 'Should have limited number of calls to on_interval'

def test_drain_does_not_block_event_loop(self):
"""
This test makes sure that other greenlets can still operate while drain_events_until is
running.
"""
p = promise()
liveness_mock = Mock()

def fulfill_promise_thread():
self.sleep(self.interval * 2)
p('done')

def liveness_thread():
while 1:
if p.ready:
return
self.sleep(self.interval / 10)
liveness_mock()

fulfill_thread = self.schedule_thread(fulfill_promise_thread)
liveness_thread = self.schedule_thread(liveness_thread)

on_interval = Mock()
for _ in self.drainer.drain_events_until(p,
on_interval=on_interval,
interval=self.interval,
timeout=self.MAX_TIMEOUT):
pass

self.teardown_thread(fulfill_thread)
self.teardown_thread(liveness_thread)

assert p.ready, 'Should have terminated with promise being ready'
assert on_interval.call_count < liveness_mock.call_count, \
'Should have served liveness_mock while waiting for event'

def test_drain_timeout(self):
p = promise()
on_interval = Mock()

with pytest.raises(socket.timeout):
for _ in self.drainer.drain_events_until(p,
on_interval=on_interval,
interval=self.interval,
timeout=self.interval * 5):
pass

assert not p.ready, 'Promise should remain un-fulfilled'
assert on_interval.call_count < 20, 'Should have limited number of calls to on_interval'


@skip.unless_module('eventlet')
class test_EventletDrainer(DrainerTests):
@pytest.fixture(autouse=True)
def setup_drainer(self):
self.drainer = self.get_drainer('eventlet')

@cached_property
def sleep(self):
from eventlet import sleep
return sleep

def result_consumer_drain_events(self, timeout=None):
import eventlet
eventlet.sleep(0)

def schedule_thread(self, thread):
import eventlet
g = eventlet.spawn(thread)
eventlet.sleep(0)
return g

def teardown_thread(self, thread):
thread.wait()


class test_Drainer(DrainerTests):
@pytest.fixture(autouse=True)
def setup_drainer(self):
self.drainer = self.get_drainer('default')

@cached_property
def sleep(self):
from time import sleep
return sleep

def result_consumer_drain_events(self, timeout=None):
time.sleep(timeout)

def schedule_thread(self, thread):
t = threading.Thread(target=thread)
t.start()
return t

def teardown_thread(self, thread):
thread.join()


@skip.unless_module('gevent')
class test_GeventDrainer(DrainerTests):
@pytest.fixture(autouse=True)
def setup_drainer(self):
self.drainer = self.get_drainer('gevent')

@cached_property
def sleep(self):
from gevent import sleep
return sleep

def result_consumer_drain_events(self, timeout=None):
import gevent
gevent.sleep(0)

def schedule_thread(self, thread):
import gevent
g = gevent.spawn(thread)
gevent.sleep(0)
return g

def teardown_thread(self, thread):
import gevent
gevent.wait([thread])

0 comments on commit 6892beb

Please sign in to comment.