Skip to content

Commit

Permalink
Merge pull request #28 from nameko/fix-for-kombu-4.3
Browse files Browse the repository at this point in the history
kombu 4.3 doesn't need maybe_declare
  • Loading branch information
timbu authored Mar 25, 2019
2 parents 032faaa + 5aaf16b commit 0d36cbd
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 7 deletions.
7 changes: 7 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ Release Notes
Semantic versioning is followed.


Version 0.7.1
-------------
Released 2019-03-25

* Compatibility fix for kombu > 4.3.0


Version 0.7.0
-------------

Expand Down
18 changes: 12 additions & 6 deletions nameko_amqp_retry/backoff.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import random
from pkg_resources import parse_version

import six
from kombu import Connection
from kombu import Connection, __version__ as kombu_version
from kombu.common import maybe_declare
from kombu.messaging import Exchange, Queue
from nameko.amqp.publish import Publisher
Expand All @@ -11,6 +12,9 @@
EXPIRY_GRACE_PERIOD = 5000 # ms


KOMBU_PRE_4_3 = parse_version(kombu_version) < parse_version('4.3.0')


def get_backoff_queue_name(expiration):
return "backoff--{}ms".format(expiration)

Expand Down Expand Up @@ -130,13 +134,15 @@ def republish(self, backoff_exc, message, target_queue):

amqp_uri = self.container.config[AMQP_URI_CONFIG_KEY]

# force redeclaration; the publisher will skip declaration if
# the entity has previously been declared by the same connection
# force redeclaration;
# In kombu versions prior to 4.3.0, the publisher will skip declaration if
# the entity has previously been declared by the same connection.
# (see https://github.com/celery/kombu/pull/884)
conn = Connection(amqp_uri)
maybe_declare(
queue, conn.channel(), retry=True, **DEFAULT_RETRY_POLICY
)
if KOMBU_PRE_4_3: # pragma: no cover
maybe_declare(
queue, conn.channel(), retry=True, **DEFAULT_RETRY_POLICY
)

# republish to appropriate backoff queue
publisher = Publisher(amqp_uri)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

setup(
name='nameko-amqp-retry',
version='0.7.0',
version='0.7.1',
description='Nameko extension allowing AMQP entrypoints to retry later',
author='Student.com',
url='http://github.com/nameko/nameko-amqp-retry',
Expand Down

0 comments on commit 0d36cbd

Please sign in to comment.