Skip to content

Commit

Permalink
Add ZMQ queue class and task for asynchronous reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
marctc committed May 25, 2016
1 parent 3f727b1 commit bbfe3a3
Show file tree
Hide file tree
Showing 15 changed files with 169 additions and 4 deletions.
4 changes: 4 additions & 0 deletions django_kaneda/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@
# RQ queue settings
RQ_REDIS_URL = getattr(settings, 'KANEDA_RQ_REDIS_URL', 'kaneda')
RQ_QUEUE_NAME = getattr(settings, 'KANEDA_RQ_QUEUE_NAME', None)

# ZMQ queue settings
ZMQ_CONNECTION_URL = getattr(settings, 'KANEDA_ZMQ_CONNECTION_URL', '')
ZMQ_TIMEOUT = getattr(settings, 'KANEDA_ZMQ_TIMEOUT', 300)
8 changes: 8 additions & 0 deletions docs/django.rst
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,14 @@ KANEDA_RQ_REDIS_URL (='')
KANEDA_RQ_QUEUE_NAME (='kaneda')
Name of the RQ queue.

ZMQ
---
KANEDA_ZMQ_CONNECTION_URL (='')
ZMQ connection url.

KANEDA_ZMQ_TIMEOUT (=300)
ZMQ socket timeout (milliseconds).

Debug
-----
KANEDA_DEBUG (=True)
Expand Down
24 changes: 24 additions & 0 deletions docs/queues.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,28 @@ To run the worker execute this command::
The default queue is "kaneda".

.. autoclass:: kaneda.queues.RQQueue
:members:

ZMQ
~~~

ZMQ (or ZeroMQ) is a library which extends the standard socket interfaces with features traditionally provided by
specialised messaging middleware products. ZeroMQ sockets provide an abstraction of asynchronous message queues and much
more.

.. note::

Before using ZMQ as async queue you need to install ZMQ library::

pip install pyzmq

To run the worker execute this command::

zmqworker --connection_url=<zmq_connection_url>

or define :ref:`zmq_settings` settings in :file:`kanedasettings.py` and simply execute the worker command with::

zmqworker

.. autoclass:: kaneda.queues.ZMQQueue
:members:
13 changes: 12 additions & 1 deletion docs/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ QUEUE

* :code:`kaneda.backends.CeleryQueue`
* :code:`kaneda.backends.RQQueue`
* :code:`kaneda.backends.ZMQQueue`

Celery
------
Expand All @@ -148,4 +149,14 @@ RQ_REDIS_URL
Redis connection url.

RQ_QUEUE_NAME
Name of the RQ queue.
Name of the RQ queue.

.. _zmq_settings:

ZMQ
---
ZMQ_CONNECTION_URL
ZMQ connection url.

ZMQ_TIMEOUT
ZMQ socket timeout (milliseconds).
1 change: 1 addition & 0 deletions kaneda/queues/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .base import BaseQueue # NOQA
from .celery import CeleryQueue # NOQA
from .rq import RQQueue # NOQA
from .zmq import ZMQQueue # NOQA
39 changes: 39 additions & 0 deletions kaneda/queues/zmq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from __future__ import absolute_import

import logging

try:
import zmq
except ImportError:
zmq = None

from kaneda.exceptions import ImproperlyConfigured

from .base import BaseQueue


class ZMQQueue(BaseQueue):
"""
ZeroMQ queue
:param connection_url: ZMQ connection url (tcp://127.0.0.1:5555).
:param timeout: ZMQ socket timeout (milliseconds).
"""
settings_namespace = 'ZMQ'

def __init__(self, connection_url, timeout=300):
if not zmq:
raise ImproperlyConfigured('You need to install pyzmq to use the ZMQ queue.')
context = zmq.Context()
self.socket = context.socket(zmq.PUSH)
self.socket.SNDTIMEO = timeout
self.socket.bind(connection_url)

def report(self, name, metric, value, tags, id_):
payload = locals()
del payload['self']
try:
return self.socket.send_json(payload)
except Exception as e:
logger = logging.getLogger(__name__)
logger.exception(e)
1 change: 1 addition & 0 deletions kaneda/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .zmq import zmq_task # NOQA
42 changes: 42 additions & 0 deletions kaneda/tasks/zmq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from __future__ import absolute_import

from datetime import datetime

import click
import zmq

from kaneda.exceptions import SettingsError
from kaneda.utils import get_backend, get_settings


@click.command()
@click.option('--connection_url', '-u', help='ZMQ connection url, e.g: tcp://127.0.0.1:5555')
def zmq_task(connection_url):
"""
ZMQ job to report metrics to the configured backend in kanedasettings.py
To run the worker execute this command:
zmqworker --connection_url=<zmq_connection_url>
"""
if not connection_url:
try:
settings = get_settings()
connection_url = settings.ZMQ_CONNECTION_URL
except ImportError:
raise SettingsError("Pass --connection_url option or define ZMQ_CONNECTION_URL on Kaneda settings file "
"before use ZMQ task processor.")
backend = get_backend()
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect(connection_url)
poller = zmq.Poller()
poller.register(socket)
click.secho('Running ZMQ worker - listening at {}.'.format(connection_url), fg='blue')
click.secho('Using {}.'.format(backend.__class__.__name__), fg='blue')
click.echo('\n')
while True:
events = dict(poller.poll(0))
if socket in events:
payload = socket.recv_json()
click.secho('[{}: Received data] {}'.format(datetime.utcnow(), payload), fg='green')
backend.report(**payload)
8 changes: 8 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ def get_email(package):
long_description=codecs.open(os.path.join(os.path.dirname(__file__), 'README.rst'), encoding='utf-8').read(),
author=get_author('kaneda'),
author_email=get_email('kaneda'),
entry_points={
'console_scripts': [
'zmqworker = kaneda.tasks:zmq_task',
],
},
install_requires=[
'click==6.6',
],
classifiers=[
'Intended Audience :: Developers',
'Programming Language :: Python',
Expand Down
10 changes: 10 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ class celery:
CELERY_BROKER = 'redis://localhost:6379/1'
CELERY_QUEUE_NAME = ''

class zmq:
QUEUE = 'kaneda.queues.ZMQQueue'
ZMQ_CONNECTION_URL = 'tcp://127.0.0.1:5555'
ZMQ_TIMEOUT = 300


@pytest.fixture
def kaneda_settings():
Expand Down Expand Up @@ -88,5 +93,10 @@ def rq_settings():
return KanedaSettings.rq


@pytest.fixture
def zmq_settings():
return KanedaSettings.zmq


def pytest_addoption(parser):
parser.addoption("--run-benchmark", action="store_true", help="run benchmark tests")
4 changes: 4 additions & 0 deletions tests/integration/benchmarks/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ def test_benchmark_celery(self, celery_queue, benchmark):
def test_benchmark_rq(self, rq_queue, benchmark):
metrics = Metrics(queue=rq_queue)
benchmark(metrics.gauge, 'benchmark_rq', 1)

def test_benchmark_zmq(self, zmq_queue, benchmark):
metrics = Metrics(queue=zmq_queue)
benchmark(metrics.gauge, 'benchmark_zmq', 1)
7 changes: 6 additions & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from influxdb import InfluxDBClient

from kaneda.backends import ElasticsearchBackend, LoggerBackend, MongoBackend, RethinkBackend, InfluxBackend
from kaneda.queues import CeleryQueue, RQQueue
from kaneda.queues import CeleryQueue, RQQueue, ZMQQueue


@pytest.fixture
Expand Down Expand Up @@ -128,3 +128,8 @@ def celery_queue(celery_settings):
@pytest.fixture
def rq_queue(rq_settings):
return RQQueue(redis_url=rq_settings.RQ_REDIS_URL)


@pytest.fixture
def zmq_queue(zmq_settings):
return ZMQQueue(connection_url=zmq_settings.ZMQ_CONNECTION_URL)
5 changes: 5 additions & 0 deletions tests/integration/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,8 @@ def test_rq(self, rq_queue):
metrics = Metrics(queue=rq_queue)
result = metrics.gauge('test_gauge_rq', 1)
assert result

def test_zmq(self, zmq_queue):
metrics = Metrics(queue=zmq_queue)
metrics.gauge('test_gauge_rq', 1)
zmq_queue.socket.close()
6 changes: 4 additions & 2 deletions tests/unit/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pytest

from kaneda.exceptions import SettingsError, UnexistingKanedaClass
from kaneda.queues import RQQueue, CeleryQueue
from kaneda.queues import RQQueue, CeleryQueue, ZMQQueue
from kaneda.utils import import_class, get_object_from_settings, get_kaneda_objects, get_backend
from kaneda.backends import ElasticsearchBackend, MongoBackend, LoggerBackend, RethinkBackend, InfluxBackend

Expand All @@ -22,7 +22,8 @@ def test_import_backend_class(self, backend_path_module, backend_class):

@pytest.mark.parametrize('queue_path_module, queue_class', [
('kaneda.queues.RQQueue', RQQueue),
('kaneda.queues.CeleryQueue', CeleryQueue)
('kaneda.queues.CeleryQueue', CeleryQueue),
('kaneda.queues.ZMQQueue', ZMQQueue),
])
def test_import_queue_class(self, queue_path_module, queue_class):
assert import_class(queue_path_module) is queue_class
Expand All @@ -40,6 +41,7 @@ def test_get_backend_from_settings(self, kaneda_settings, backend_name, backend_
@pytest.mark.parametrize('queue_name, queue_class', [
('rq', RQQueue),
('celery', CeleryQueue),
('zmq', ZMQQueue),
])
def test_get_queue_from_settings(self, kaneda_settings, queue_name, queue_class):
queue_settings = getattr(kaneda_settings, queue_name)
Expand Down
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ deps =
influxdb
celery
rq
pyzmq
redis
six==1.10.0

Expand Down

0 comments on commit bbfe3a3

Please sign in to comment.