From a105691c22634defd80c0d98939587de6a4296ef Mon Sep 17 00:00:00 2001 From: timbu Date: Fri, 22 Mar 2019 17:27:53 +0000 Subject: [PATCH 1/4] kombu 4.3 doesn't need maybe_declare --- nameko_amqp_retry/backoff.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/nameko_amqp_retry/backoff.py b/nameko_amqp_retry/backoff.py index 87f2bea..be8f43b 100644 --- a/nameko_amqp_retry/backoff.py +++ b/nameko_amqp_retry/backoff.py @@ -3,6 +3,7 @@ import six from kombu import Connection from kombu.common import maybe_declare +from kombu.exceptions import NotBoundError from kombu.messaging import Exchange, Queue from nameko.amqp.publish import Publisher from nameko.constants import AMQP_URI_CONFIG_KEY, DEFAULT_RETRY_POLICY @@ -133,10 +134,14 @@ def republish(self, backoff_exc, message, target_queue): # force redeclaration; the publisher will skip declaration if # the entity has previously been declared by the same connection # (see https://github.com/celery/kombu/pull/884) + # kombu >= 4.3.0 will raise NotBoundError here conn = Connection(amqp_uri) - maybe_declare( - queue, conn.channel(), retry=True, **DEFAULT_RETRY_POLICY - ) + try: + maybe_declare( + queue, conn.channel(), retry=True, **DEFAULT_RETRY_POLICY + ) + except NotBoundError: + pass # republish to appropriate backoff queue publisher = Publisher(amqp_uri) From e99b3960b822056889d9b07971048d4ac424eae1 Mon Sep 17 00:00:00 2001 From: timbu Date: Sat, 23 Mar 2019 20:40:13 +0000 Subject: [PATCH 2/4] check for kombu versions --- nameko_amqp_retry/backoff.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/nameko_amqp_retry/backoff.py b/nameko_amqp_retry/backoff.py index be8f43b..c73caf7 100644 --- a/nameko_amqp_retry/backoff.py +++ b/nameko_amqp_retry/backoff.py @@ -1,9 +1,9 @@ import random +from pkg_resources import parse_version import six -from kombu import Connection +from kombu import Connection, __version__ as kombu_version from kombu.common import maybe_declare -from kombu.exceptions import NotBoundError from kombu.messaging import Exchange, Queue from nameko.amqp.publish import Publisher from nameko.constants import AMQP_URI_CONFIG_KEY, DEFAULT_RETRY_POLICY @@ -131,17 +131,15 @@ def republish(self, backoff_exc, message, target_queue): amqp_uri = self.container.config[AMQP_URI_CONFIG_KEY] - # force redeclaration; the publisher will skip declaration if + # force redeclaration; + # In kombu versions prior to 4.3.0 the publisher will skip declaration if # the entity has previously been declared by the same connection # (see https://github.com/celery/kombu/pull/884) - # kombu >= 4.3.0 will raise NotBoundError here conn = Connection(amqp_uri) - try: + if parse_version(kombu_version) < parse_version('4.3.0'): maybe_declare( queue, conn.channel(), retry=True, **DEFAULT_RETRY_POLICY ) - except NotBoundError: - pass # republish to appropriate backoff queue publisher = Publisher(amqp_uri) From 528e62e9da4ec4d3f3ad948afc949bf3088622b5 Mon Sep 17 00:00:00 2001 From: timbu Date: Sat, 23 Mar 2019 21:08:32 +0000 Subject: [PATCH 3/4] bump version and CHANGES --- CHANGES | 7 +++++++ nameko_amqp_retry/backoff.py | 2 +- setup.py | 2 +- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/CHANGES b/CHANGES index 418bf5b..caf1119 100644 --- a/CHANGES +++ b/CHANGES @@ -4,6 +4,13 @@ Release Notes Semantic versioning is followed. +Version 0.7.1 +------------- +Released 2019-03-25 + +* Compatibility fix for kombu > 4.3.0 + + Version 0.7.0 ------------- diff --git a/nameko_amqp_retry/backoff.py b/nameko_amqp_retry/backoff.py index c73caf7..3edac2e 100644 --- a/nameko_amqp_retry/backoff.py +++ b/nameko_amqp_retry/backoff.py @@ -132,7 +132,7 @@ def republish(self, backoff_exc, message, target_queue): amqp_uri = self.container.config[AMQP_URI_CONFIG_KEY] # force redeclaration; - # In kombu versions prior to 4.3.0 the publisher will skip declaration if + # In kombu versions prior to 4.3.0, the publisher will skip declaration if # the entity has previously been declared by the same connection # (see https://github.com/celery/kombu/pull/884) conn = Connection(amqp_uri) diff --git a/setup.py b/setup.py index f2283c6..3fa3a35 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ setup( name='nameko-amqp-retry', - version='0.7.0', + version='0.7.1', description='Nameko extension allowing AMQP entrypoints to retry later', author='Student.com', url='http://github.com/nameko/nameko-amqp-retry', From 5aaf16b89ce776d069bfa83545927c022dd97fd6 Mon Sep 17 00:00:00 2001 From: timbu Date: Sun, 24 Mar 2019 00:10:40 +0000 Subject: [PATCH 4/4] pre-parse kombu version --- nameko_amqp_retry/backoff.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/nameko_amqp_retry/backoff.py b/nameko_amqp_retry/backoff.py index 3edac2e..8a22958 100644 --- a/nameko_amqp_retry/backoff.py +++ b/nameko_amqp_retry/backoff.py @@ -12,6 +12,9 @@ EXPIRY_GRACE_PERIOD = 5000 # ms +KOMBU_PRE_4_3 = parse_version(kombu_version) < parse_version('4.3.0') + + def get_backoff_queue_name(expiration): return "backoff--{}ms".format(expiration) @@ -133,10 +136,10 @@ def republish(self, backoff_exc, message, target_queue): # force redeclaration; # In kombu versions prior to 4.3.0, the publisher will skip declaration if - # the entity has previously been declared by the same connection + # the entity has previously been declared by the same connection. # (see https://github.com/celery/kombu/pull/884) conn = Connection(amqp_uri) - if parse_version(kombu_version) < parse_version('4.3.0'): + if KOMBU_PRE_4_3: # pragma: no cover maybe_declare( queue, conn.channel(), retry=True, **DEFAULT_RETRY_POLICY )