diff --git a/conftest.py b/conftest.py deleted file mode 100644 index d3c6165..0000000 --- a/conftest.py +++ /dev/null @@ -1,83 +0,0 @@ -from __future__ import absolute_import - -# all imports are inline to make sure they happen after eventlet.monkey_patch -# which is called in pytest_load_initial_conftests (calling monkey_patch at -# import time breaks the pytest capturemanager) - -import pytest - - -def pytest_addoption(parser): - parser.addoption( - '--blocking-detection', - action='store_true', - dest='blocking_detection', - default=False, - help='turn on eventlet hub blocking detection') - - parser.addoption( - "--log-level", action="store", - default='DEBUG', - help=("The logging-level for the test run.")) - - parser.addoption( - "--amqp-uri", action="store", dest='AMQP_URI', - default='amqp://guest:guest@localhost:5672/nameko_test', - help=("The AMQP-URI to connect to rabbit with.")) - - parser.addoption( - "--rabbit-ctl-uri", action="store", dest='RABBIT_CTL_URI', - default='http://guest:guest@localhost:15672', - help=("The URI for rabbit's management API.")) - - -def pytest_load_initial_conftests(): - # make sure we monkey_patch before local conftests - import eventlet - eventlet.monkey_patch() - - -def pytest_configure(config): - import logging - import sys - - if config.option.blocking_detection: # pragma: no cover - from eventlet import debug - debug.hub_blocking_detection(True) - - log_level = config.getoption('log_level') - if log_level is not None: - log_level = getattr(logging, log_level) - logging.basicConfig(level=log_level, stream=sys.stderr) - - -@pytest.fixture -def ensure_cleanup_order(request): - """ Ensure ``rabbit_config`` is invoked early if it's used by any fixture - in ``request``. - """ - if "rabbit_config" in request.funcargnames: - request.getfuncargvalue("rabbit_config") - - -@pytest.yield_fixture -def container_factory(ensure_cleanup_order): - from nameko.containers import ServiceContainer - - all_containers = [] - - def make_container(service_cls, config, worker_ctx_cls=None): - container = ServiceContainer(service_cls, config, worker_ctx_cls) - all_containers.append(container) - return container - - yield make_container - - for c in all_containers: - try: - c.stop() - except: # pragma: no cover - pass - - - diff --git a/nameko_sentry.py b/nameko_sentry.py index 06b6b87..f3abc0d 100644 --- a/nameko_sentry.py +++ b/nameko_sentry.py @@ -1,20 +1,50 @@ import logging +from eventlet.queue import Queue from nameko.extensions import DependencyProvider from raven import Client -from raven.transport.eventlet import EventletHTTPTransport class SentryReporter(DependencyProvider): """ Send exceptions generated by entrypoints to a sentry server. """ + _gt = None + queue = None + client = None + + def _run(self): + + while True: + item = self.queue.get() + if item is None: + break + + exc_info, message, extra, data = item + self.client.captureException( + exc_info, message=message, extra=extra, data=data) + + # these will remain in scope until the next iteration and + # can potentially be large, so delete to reclaim the memory now + del exc_info, message, extra, data, item + + def start(self): + self._gt = self.container.spawn_managed_thread( + self._run, protected=True) + + def stop(self): + self.queue.put(None) + + if self._gt is not None: + self._gt.wait() + def setup(self): sentry_config = self.container.config.get('SENTRY') dsn = sentry_config['DSN'] kwargs = sentry_config.get('CLIENT_CONFIG', {}) - self.client = Client(dsn, transport=EventletHTTPTransport, **kwargs) + self.queue = Queue() + self.client = Client(dsn, **kwargs) def worker_result(self, worker_ctx, result, exc_info): if exc_info is None: @@ -51,5 +81,4 @@ def worker_result(self, worker_ctx, result, exc_info): extra = {'exc': exc} - self.client.captureException( - exc_info, message=message, extra=extra, data=data) + self.queue.put((exc_info, message, extra, data)) diff --git a/setup.py b/setup.py index 4581bd6..f4caed5 100644 --- a/setup.py +++ b/setup.py @@ -3,25 +3,22 @@ setup( name='nameko-sentry', - version='0.0.1', + version='0.0.2', description='Nameko extension sends entrypoint exceptions to sentry', author='Matt Bennett', author_email='matt@bennett.name', url='http://github.com/mattbennett/nameko-sentry', py_modules=['nameko_sentry'], install_requires=[ - "nameko>=2.0.0", + "nameko>=2.2.0", "raven>=3.0.0" ], extras_require={ 'dev': [ - "coverage==4.0a1", - "flake8==2.1.0", - "mccabe==0.3", - "pep8==1.6.1", - "pyflakes==0.8.1", - "pylint==1.0.0", - "pytest==2.4.2", + "coverage==4.0.3", + "flake8==2.5.0", + "pylint==1.5.1", + "pytest==2.8.3", ] }, dependency_links=[], diff --git a/test_nameko_sentry.py b/test_nameko_sentry.py index 334fadb..618043b 100644 --- a/test_nameko_sentry.py +++ b/test_nameko_sentry.py @@ -1,15 +1,15 @@ import logging -from mock import Mock, patch, call import pytest - +from eventlet.event import Event +from eventlet.queue import Queue +from mock import Mock, call, patch from nameko.containers import WorkerContext from nameko.extensions import Entrypoint from nameko.testing.services import dummy, entrypoint_hook, entrypoint_waiter from nameko.testing.utils import get_extension -from raven.transport.eventlet import EventletHTTPTransport - from nameko_sentry import SentryReporter +from raven.transport.threaded import ThreadedHTTPTransport class CustomException(Exception): @@ -28,6 +28,21 @@ def config(): } +@pytest.fixture +def service_cls(): + + class Service(object): + name = "service" + + sentry = SentryReporter() + + @dummy + def broken(self): + raise CustomException("Error!") + + return Service + + @pytest.fixture def container(config): return Mock(config=config) @@ -63,20 +78,55 @@ def test_setup(reporter): # client config and DSN applied correctly assert reporter.client.site == "site name" assert reporter.client.get_public_dsn() == "//user@localhost:9000/1" + assert reporter.client.is_enabled() # transport set correctly transport = reporter.client.remote.get_transport() - assert isinstance(transport, EventletHTTPTransport) + assert isinstance(transport, ThreadedHTTPTransport) + + # queue created + assert isinstance(reporter.queue, Queue) + + +def test_setup_without_optional_config(config): + + del config['SENTRY']['CLIENT_CONFIG'] + container = Mock(config=config) + + reporter = SentryReporter().bind(container, "sentry") + reporter.setup() + + # DSN applied correctly + assert reporter.client.get_public_dsn() == "//user@localhost:9000/1" + assert reporter.client.is_enabled() + + # transport set correctly + transport = reporter.client.remote.get_transport() + assert isinstance(transport, ThreadedHTTPTransport) + + # queue created + assert isinstance(reporter.queue, Queue) + + +def test_disabled(config): + config['SENTRY']['DSN'] = None + container = Mock(config=config) + + reporter = SentryReporter().bind(container, "sentry") + reporter.setup() + + # DSN applied correctly + assert reporter.client.get_public_dsn() is None + assert not reporter.client.is_enabled() def test_worker_result(reporter, worker_ctx): result = "OK!" reporter.setup() - with patch.object(reporter, 'client') as client: - reporter.worker_result(worker_ctx, result, None) + reporter.worker_result(worker_ctx, result, None) - assert not client.captureException.called + assert reporter.queue.qsize() == 0 def test_worker_exception(reporter, worker_ctx): @@ -85,50 +135,109 @@ def test_worker_exception(reporter, worker_ctx): exc_info = (CustomException, exc, None) reporter.setup() - with patch.object(reporter, 'client') as client: - reporter.worker_result(worker_ctx, None, exc_info) + reporter.worker_result(worker_ctx, None, exc_info) # generate expected call args logger = "{}.{}".format( worker_ctx.service_name, worker_ctx.entrypoint.method_name) - message = "Unhandled exception in call {}: {} {!r}".format( + expected_message = "Unhandled exception in call {}: {} {!r}".format( worker_ctx.call_id, CustomException.__name__, str(exc) ) - extra = {'exc': exc} + expected_extra = {'exc': exc} if isinstance(exc, worker_ctx.entrypoint.expected_exceptions): loglevel = logging.WARNING else: loglevel = logging.ERROR - data = { + expected_data = { 'logger': logger, 'level': loglevel, - 'message': message, + 'message': expected_message, 'tags': { 'call_id': worker_ctx.call_id, 'parent_call_id': worker_ctx.immediate_parent_call_id } } - # verify call + assert reporter.queue.qsize() == 1 + + _, message, extra, data = reporter.queue.get() + assert message == expected_message + assert extra == expected_extra + assert data == expected_data + + +def test_run(reporter): + + exc = CustomException("Error!") + exc_info = (CustomException, exc, None) + + message = "message" + extra = "extra" + data = "data" + + reporter.setup() + + reporter.queue.put((exc_info, message, extra, data)) + reporter.queue.put(None) + + with patch.object(reporter, 'client') as client: + reporter._run() + assert client.captureException.call_args_list == [ call(exc_info, message=message, extra=extra, data=data) ] -def test_end_to_end(container_factory, config): +def test_start(container_factory, service_cls, config): - class Service(object): - name = "service" + container = container_factory(service_cls, config) + reporter = get_extension(container, SentryReporter) - sentry = SentryReporter() + reporter.setup() - @dummy - def broken(self): - raise CustomException("Error!") + running = Event() + + def run(): + running.send(True) + + with patch.object(reporter, '_run', wraps=run) as patched_run: + reporter.start() + running.wait() + assert patched_run.call_count == 1 + + assert reporter._gt is not None + + +def test_stop(container_factory, service_cls, config): + + container = container_factory(service_cls, config) + reporter = get_extension(container, SentryReporter) + + reporter.setup() + reporter.start() + assert not reporter._gt.dead + reporter.stop() + assert reporter._gt.dead + + # subsequent stop has no adverse effect + reporter.stop() + + +def test_stop_not_started(container_factory, service_cls, config): + + container = container_factory(service_cls, config) + reporter = get_extension(container, SentryReporter) + + reporter.setup() + assert reporter._gt is None + reporter.stop() + + +def test_end_to_end(container_factory, service_cls, config): - container = container_factory(Service, config) + container = container_factory(service_cls, config) container.start() reporter = get_extension(container, SentryReporter)