Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to reconnect if connection is lost (replace pika with kombu) #55

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ For example::
When using ``extra`` field make sure you don't use reserved names. From `Python documentation <https://docs.python.org/2/library/logging.html>`_.
| "The keys in the dictionary passed in extra should not clash with the keys used by the logging system. (See the `Formatter <https://docs.python.org/2/library/logging.html#logging.Formatter>`_ documentation for more information on which keys are used by the logging system.)"

To use the AMQPLogstashHandler you will need to install pika first.
To use the AMQPLogstashHandler you will need to install kombu first.

pip install pika
pip install kombu

For example::

Expand Down
97 changes: 54 additions & 43 deletions logstash/handler_amqp.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import json
import socket
from logging.handlers import SocketHandler

try:
from urllib import urlencode
except ImportError:
from urllib.parse import urlencode

from logging import Filter
from logging.handlers import SocketHandler
from kombu import Connection, Exchange, producers

import pika
from logstash import formatter


Expand Down Expand Up @@ -48,8 +48,6 @@ def __init__(self, host='localhost', port=5672, username='guest',
virtual_host='/', message_type='logstash', tags=None,
durable=False, passive=False, version=0, extra_fields=True,
fqdn=False, facility=None, exchange_routing_key=''):


# AMQP parameters
self.host = host
self.port = port
Expand All @@ -76,54 +74,67 @@ def __init__(self, host='localhost', port=5672, username='guest',
self.facility = facility

def makeSocket(self, **kwargs):

return PikaSocket(self.host,
self.port,
self.username,
self.password,
self.virtual_host,
self.exchange,
self.routing_key,
self.exchange_is_durable,
self.declare_exchange_passively,
self.exchange_type)
socket = KombuSocket(self.host,
self.port,
self.username,
self.password,
self.virtual_host,
self.exchange,
self.routing_key,
self.exchange_is_durable,
self.declare_exchange_passively,
self.exchange_type)
socket.connect()
return socket

def makePickle(self, record):
return self.formatter.format(record)

def send(self, s):
"""
Behaves exactly like SocketHandler.send() except that it allows
exceptions to bubble up to emit() so we can atleast be aware of
logging failures.
"""
if self.sock is None:
self.createSocket()

class PikaSocket(object):
if self.sock:
try:
self.sock.sendall(s)
except (OSError, socket.error): #pragma: no cover
self.sock.close()
self.sock = None # so we can call createSocket next time
raise

def __init__(self, host, port, username, password, virtual_host, exchange,
routing_key, durable, passive, exchange_type):

# create connection parameters
credentials = pika.PlainCredentials(username, password)
parameters = pika.ConnectionParameters(host, port, virtual_host,
credentials)
class KombuSocket(object):

# create connection & channel
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()

# create an exchange, if needed
self.channel.exchange_declare(exchange=exchange,
exchange_type=exchange_type,
passive=passive,
durable=durable)

# needed when publishing
self.spec = pika.spec.BasicProperties(delivery_mode=2)
def __init__(self, host, port, username, password, virtual_host, exchange,
routing_key, durable, passive, exchange_type):
# create connection
self.connection = Connection(hostname=host,
port=port,
userid=username,
password=password,
virtual_host=virtual_host)

# create exchange
self.exchange = Exchange(exchange, type=exchange_type, durable=durable)
self.exchange.passive = passive

# other publishing params
self.routing_key = routing_key
self.exchange = exchange


def sendall(self, data):

self.channel.basic_publish(self.exchange,
self.routing_key,
data,
properties=self.spec)
with producers[self.connection].acquire(block=True) as producer:
producer.publish(data,
routing_key=self.routing_key,
exchange=self.exchange,
declare=[self.exchange])

def connect(self):
self.connection.connect()

def close(self):
try:
Expand Down