From 607d7a4cade6041eb02c24b2fab67c5261d63020 Mon Sep 17 00:00:00 2001 From: Matt Snider Date: Wed, 3 Aug 2016 17:25:49 +0200 Subject: [PATCH 1/5] Replace pika with kombu to solve reconnection issues Using kombu avoids having to implement redirect logic. --- logstash/handler_amqp.py | 69 ++++++++++++++++++---------------------- 1 file changed, 31 insertions(+), 38 deletions(-) diff --git a/logstash/handler_amqp.py b/logstash/handler_amqp.py index 4209964..011b8cf 100644 --- a/logstash/handler_amqp.py +++ b/logstash/handler_amqp.py @@ -1,4 +1,5 @@ import json + try: from urllib import urlencode except ImportError: @@ -7,7 +8,8 @@ from logging import Filter from logging.handlers import SocketHandler -import pika +from kombu import Connection, Exchange, producers + from logstash import formatter @@ -76,54 +78,45 @@ def __init__(self, host='localhost', port=5672, username='guest', self.facility = facility def makeSocket(self, **kwargs): - - return PikaSocket(self.host, - self.port, - self.username, - self.password, - self.virtual_host, - self.exchange, - self.routing_key, - self.exchange_is_durable, - self.declare_exchange_passively, - self.exchange_type) + return KombuSocket(self.host, + self.port, + self.username, + self.password, + self.virtual_host, + self.exchange, + self.routing_key, + self.exchange_is_durable, + self.declare_exchange_passively, + self.exchange_type) def makePickle(self, record): return self.formatter.format(record) -class PikaSocket(object): +class KombuSocket(object): def __init__(self, host, port, username, password, virtual_host, exchange, routing_key, durable, passive, exchange_type): - - # create connection parameters - credentials = pika.PlainCredentials(username, password) - parameters = pika.ConnectionParameters(host, port, virtual_host, - credentials) - - # create connection & channel - self.connection = pika.BlockingConnection(parameters) - self.channel = self.connection.channel() - - # create an exchange, if needed - self.channel.exchange_declare(exchange=exchange, - exchange_type=exchange_type, - passive=passive, - durable=durable) - - # needed when publishing - self.spec = pika.spec.BasicProperties(delivery_mode=2) + # create connection + self.connection = Connection(hostname=host, + port=port, + userid=username, + password=password, + virtual_host=virtual_host) + + # create exchange + self.exchange = Exchange(exchange, type=exchange_type, durable=durable) + self.exchange.passive = passive + + # other publishing params self.routing_key = routing_key - self.exchange = exchange - def sendall(self, data): - - self.channel.basic_publish(self.exchange, - self.routing_key, - data, - properties=self.spec) + with producers[self.connection].acquire(block=True) as producer: + producer.publish(data, + routing_key=self.routing_key, + exchange=self.exchange, + declare=[self.exchange]) def close(self): try: From 4b182c48fce3fa71d16a24bd12d0610e1a750b45 Mon Sep 17 00:00:00 2001 From: Matt Snider Date: Mon, 8 Aug 2016 00:25:44 +0200 Subject: [PATCH 2/5] Connect KombuSocket on creation and let emit() handle exceptions Allowing connection errors to occur makeSocket() causes the failure logic (e.g. exponential backoff, reconnect, etc) from SocketHandler to be triggered. Reraising exceptions in send() allows them to bubble up to emit() and be reported so that they don't just get silently swallowed. --- logstash/handler_amqp.py | 44 +++++++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/logstash/handler_amqp.py b/logstash/handler_amqp.py index 011b8cf..10b234a 100644 --- a/logstash/handler_amqp.py +++ b/logstash/handler_amqp.py @@ -50,8 +50,6 @@ def __init__(self, host='localhost', port=5672, username='guest', virtual_host='/', message_type='logstash', tags=None, durable=False, passive=False, version=0, extra_fields=True, fqdn=False, facility=None, exchange_routing_key=''): - - # AMQP parameters self.host = host self.port = port @@ -78,20 +76,39 @@ def __init__(self, host='localhost', port=5672, username='guest', self.facility = facility def makeSocket(self, **kwargs): - return KombuSocket(self.host, - self.port, - self.username, - self.password, - self.virtual_host, - self.exchange, - self.routing_key, - self.exchange_is_durable, - self.declare_exchange_passively, - self.exchange_type) + socket = KombuSocket(self.host, + self.port, + self.username, + self.password, + self.virtual_host, + self.exchange, + self.routing_key, + self.exchange_is_durable, + self.declare_exchange_passively, + self.exchange_type) + socket.connect() + return socket def makePickle(self, record): return self.formatter.format(record) + def send(self, s): + """ + Behaves exactly like SocketHandler.send() except that it allows + exceptions to bubble up to emit() so we can atleast be aware of + logging failures. + """ + if self.sock is None: + self.createSocket() + + if self.sock: + try: + self.sock.sendall(s) + except OSError: #pragma: no cover + self.sock.close() + self.sock = None # so we can call createSocket next time + raise + class KombuSocket(object): @@ -118,6 +135,9 @@ def sendall(self, data): exchange=self.exchange, declare=[self.exchange]) + def connect(self): + self.connection.connect() + def close(self): try: self.connection.close() From e30251559acc5cbc97e333298509926306eab9d2 Mon Sep 17 00:00:00 2001 From: Matt Snider Date: Mon, 8 Aug 2016 00:19:26 +0200 Subject: [PATCH 3/5] Clean up unused imports --- logstash/handler_amqp.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/logstash/handler_amqp.py b/logstash/handler_amqp.py index 10b234a..6c37c89 100644 --- a/logstash/handler_amqp.py +++ b/logstash/handler_amqp.py @@ -1,13 +1,10 @@ -import json +from logging.handlers import SocketHandler try: from urllib import urlencode except ImportError: from urllib.parse import urlencode -from logging import Filter -from logging.handlers import SocketHandler - from kombu import Connection, Exchange, producers from logstash import formatter From 7de24284acbedb089c34aae6723d6c39842c9778 Mon Sep 17 00:00:00 2001 From: Matt Snider Date: Sat, 6 Aug 2016 13:40:01 +0200 Subject: [PATCH 4/5] Update README.md to refer to kombu instead of pika --- README.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index e637785..71b2d01 100644 --- a/README.rst +++ b/README.rst @@ -75,9 +75,9 @@ For example:: When using ``extra`` field make sure you don't use reserved names. From `Python documentation `_. | "The keys in the dictionary passed in extra should not clash with the keys used by the logging system. (See the `Formatter `_ documentation for more information on which keys are used by the logging system.)" -To use the AMQPLogstashHandler you will need to install pika first. +To use the AMQPLogstashHandler you will need to install kombu first. - pip install pika + pip install kombu For example:: From ccb2a5a6a0a238482b8d92f641c637dc89f418ce Mon Sep 17 00:00:00 2001 From: Matt Snider Date: Thu, 11 Aug 2016 23:20:59 +0200 Subject: [PATCH 5/5] Add socket.error for python 2 support --- logstash/handler_amqp.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/logstash/handler_amqp.py b/logstash/handler_amqp.py index 6c37c89..819ca2b 100644 --- a/logstash/handler_amqp.py +++ b/logstash/handler_amqp.py @@ -1,3 +1,4 @@ +import socket from logging.handlers import SocketHandler try: @@ -101,7 +102,7 @@ def send(self, s): if self.sock: try: self.sock.sendall(s) - except OSError: #pragma: no cover + except (OSError, socket.error): #pragma: no cover self.sock.close() self.sock = None # so we can call createSocket next time raise