-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RPC retry entrypoints can't retry other RPC retry entrypoints #29
Comments
Is this issue because the dead letter exchange is the same for all services?
|
Never mind, that does not seam to be the problem here. Just tested the following code and still see the same error. Code:import logging
from nameko.rpc import RpcProxy
from nameko_amqp_retry import Backoff, BackoffPublisher
from nameko_amqp_retry.rpc import Rpc
from kombu.messaging import Exchange
logger = logging.getLogger(__name__)
class CustomBackoffPublisherService1(BackoffPublisher):
@property
def exchange(self):
backoff_exchange = Exchange(
type="headers",
name="service1-backoff"
)
return backoff_exchange
class CustomRpcService1(Rpc):
backoff_publisher = CustomBackoffPublisherService1()
rpc1 = CustomRpcService1.decorator
class Service1:
name = "service1"
service2_rpc = RpcProxy("service2")
@rpc1
def call_service2(self):
try:
logger.info('service2: {}'.format(self.service2_rpc.do_bad_thing()))
a = {"bad": "thing"}
return a["good"]
except KeyError:
raise Backoff()
class CustomBackoffPublisherService2(BackoffPublisher):
@property
def exchange(self):
backoff_exchange = Exchange(
type="headers",
name="service2-backoff"
)
return backoff_exchange
class CustomRpcService2(Rpc):
backoff_publisher = CustomBackoffPublisherService2()
rpc2 = CustomRpcService2.decorator
class Service2:
name = "service2"
@rpc2
def do_bad_thing(self):
return "Works the first time only." Logs:(venv) ketan@Other:~/Projects/pyMSC/scheduler/profile/service$ nameko run --config config.yaml tests.integration.test_retry:Service1
starting <QueueConsumer at 0x7f45755585c0>
waiting for consumer ready <QueueConsumer at 0x7f45755585c0>
Connected to amqp://guest:**@127.0.0.1:5672//
setting up consumers <QueueConsumer at 0x7f45755585c0>
consumer started <QueueConsumer at 0x7f45755585c0>
started <QueueConsumer at 0x7f45755585c0>
invoking <proxy method: service2.do_bad_thing>
Waiting for RPC reply event <nameko.rpc.RpcReply object at 0x7f4575bcdc50>
RPC reply event complete <nameko.rpc.RpcReply object at 0x7f4575bcdc50> {'result': 'Works the first time only.', 'error': None}
service2: Works the first time only.
invoking <proxy method: service2.do_bad_thing>
Waiting for RPC reply event <nameko.rpc.RpcReply object at 0x7f4575c84dd8>
RPC reply event complete <nameko.rpc.RpcReply object at 0x7f4575c84dd8> {'result': None, 'error': {'exc_type': 'MethodNotFound', 'exc_path': 'nameko.exceptions.MethodNotFound', 'exc_args': ['call_service2'], 'value': 'call_service2'}} |
Using the ProblemThe headers for dead letter queue of service 1 are being sent to service 2 which results in the observed exception. See starting <QueueConsumer at 0x7f3466c713c8>
waiting for consumer ready <QueueConsumer at 0x7f3466c713c8>
Connected to amqp://guest:**@127.0.0.1:5672//
setting up consumers <QueueConsumer at 0x7f3466c713c8>
consumer started <QueueConsumer at 0x7f3466c713c8>
started <QueueConsumer at 0x7f3466c713c8>
[nameko_tracer] {'timestamp': datetime.datetime(2019, 9, 9, 6, 28, 12, 994320), 'hostname': 'Other', 'service': 'service1', 'entrypoint_type': 'Rpc', 'entrypoint_name': 'call_service2', 'context_data': {}, 'call_id': 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792', 'call_id_stack': ['standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792'], 'origin_call_id': 'standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'stage': 'request', 'call_args': {}, 'call_args_redacted': False}
[service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792] entrypoint call trace
invoking <proxy method: service2.do_bad_thing>
<class 'nameko.rpc.MethodProxy'>:
exchange: Exchange nameko-rpc(topic)
routing_key: service2.do_bad_thing
reply_to: c006b4e3-5125-46bb-a84a-0fa5c97f3c38
correlation_id: 9451ad22-1956-42a6-8963-0cbc91ca4e35
extra_headers: {'nameko.call_id_stack': ['standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792']}
Waiting for RPC reply event <nameko.rpc.RpcReply object at 0x7f3466df4eb8>
RPC reply event complete <nameko.rpc.RpcReply object at 0x7f3466df4eb8> {'result': 'Works the first time only.', 'error': None}
service2: Works the first time only.
[nameko_tracer] {'timestamp': datetime.datetime(2019, 9, 9, 6, 28, 13, 72546), 'hostname': 'Other', 'service': 'service1', 'entrypoint_type': 'Rpc', 'entrypoint_name': 'call_service2', 'context_data': {}, 'call_id': 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792', 'call_id_stack': ['standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792'], 'origin_call_id': 'standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'stage': 'response', 'call_args': {}, 'call_args_redacted': False, 'response_status': 'error', 'exception_type': 'CustomBackOff', 'exception_path': 'tests.integration.test_retry.CustomBackOff', 'exception_args': [], 'exception_value': 'Backoff(retry #1 in 1000ms)', 'exception_traceback': 'Traceback (most recent call last):\n File "./tests/integration/test_retry.py", line 45, in call_service2\n return a["good"]\nKeyError: \'good\'\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n File "/home/ketan/Projects/pyMSC/scheduler/profile/venv/lib/python3.6/site-packages/nameko/containers.py", line 392, in _run_worker\n result = method(*worker_ctx.args, **worker_ctx.kwargs)\n File "./tests/integration/test_retry.py", line 47, in call_service2\n raise CustomBackOff()\ntests.integration.test_retry.CustomBackOff: Backoff(retry #1 in 1000ms)\n', 'exception_expected': True, 'response_time': 0.078226}
[service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792] entrypoint result trace
[nameko_tracer] {'timestamp': datetime.datetime(2019, 9, 9, 6, 28, 14, 68293), 'hostname': 'Other', 'service': 'service1', 'entrypoint_type': 'Rpc', 'entrypoint_name': 'call_service2', 'context_data': {'backoff': 1000, 'rpc_method_id': 'service1.call_service2', 'x-death': [{'count': 1, 'reason': 'expired', 'queue': 'backoff--1000ms', 'time': '2019-09-09 06:28:14', 'exchange': 'backoff', 'routing-keys': ['rpc-service1'], 'original-expiration': '1000'}], 'x-first-death-exchange': 'backoff', 'x-first-death-queue': 'backoff--1000ms', 'x-first-death-reason': 'expired'}, 'call_id': 'service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945', 'call_id_stack': ['standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792.backoff', 'service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945'], 'origin_call_id': 'standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'stage': 'request', 'call_args': {}, 'call_args_redacted': False}
[service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945] entrypoint call trace
invoking <proxy method: service2.do_bad_thing>
<class 'nameko.rpc.MethodProxy'>:
exchange: Exchange nameko-rpc(topic)
routing_key: service2.do_bad_thing
reply_to: c006b4e3-5125-46bb-a84a-0fa5c97f3c38
correlation_id: 09d6f40e-bb68-4f8e-a98b-87d99373c56d
extra_headers: {'nameko.backoff': 1000, 'nameko.rpc_method_id': 'service1.call_service2', 'nameko.x-death': [{'count': 1, 'reason': 'expired', 'queue': 'backoff--1000ms', 'time': datetime.datetime(2019, 9, 9, 6, 28, 14), 'exchange': 'backoff', 'routing-keys': ['rpc-service1'], 'original-expiration': '1000'}], 'nameko.x-first-death-exchange': 'backoff', 'nameko.x-first-death-queue': 'backoff--1000ms', 'nameko.x-first-death-reason': 'expired', 'nameko.call_id_stack': ['standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792.backoff', 'service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945']}
Waiting for RPC reply event <nameko.rpc.RpcReply object at 0x7f3466d65160>
RPC reply event complete <nameko.rpc.RpcReply object at 0x7f3466d65160> {'result': None, 'error': {'exc_type': 'MethodNotFound', 'exc_path': 'nameko.exceptions.MethodNotFound', 'exc_args': ['call_service2'], 'value': 'call_service2'}}
[nameko_tracer] {'timestamp': datetime.datetime(2019, 9, 9, 6, 28, 14, 96395), 'hostname': 'Other', 'service': 'service1', 'entrypoint_type': 'Rpc', 'entrypoint_name': 'call_service2', 'context_data': {'backoff': 1000, 'rpc_method_id': 'service1.call_service2', 'x-death': [{'count': 1, 'reason': 'expired', 'queue': 'backoff--1000ms', 'time': '2019-09-09 06:28:14', 'exchange': 'backoff', 'routing-keys': ['rpc-service1'], 'original-expiration': '1000'}], 'x-first-death-exchange': 'backoff', 'x-first-death-queue': 'backoff--1000ms', 'x-first-death-reason': 'expired'}, 'call_id': 'service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945', 'call_id_stack': ['standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'service1.call_service2.4e61ac89-c169-4318-8e20-85efba4de792.backoff', 'service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945'], 'origin_call_id': 'standalone_rpc_proxy.call.811db76d-a9c4-417b-94a0-99894f6985de', 'stage': 'response', 'call_args': {}, 'call_args_redacted': False, 'response_status': 'error', 'exception_type': 'MethodNotFound', 'exception_path': 'nameko.exceptions.MethodNotFound', 'exception_args': ['call_service2'], 'exception_value': 'call_service2', 'exception_traceback': 'Traceback (most recent call last):\n File "/home/ketan/Projects/pyMSC/scheduler/profile/venv/lib/python3.6/site-packages/nameko/containers.py", line 392, in _run_worker\n result = method(*worker_ctx.args, **worker_ctx.kwargs)\n File "./tests/integration/test_retry.py", line 43, in call_service2\n logger.info(\'service2: {}\'.format(self.service2_rpc.do_bad_thing()))\n File "/home/ketan/Projects/pyMSC/scheduler/profile/venv/lib/python3.6/site-packages/nameko/rpc.py", line 373, in __call__\n return reply.result()\n File "/home/ketan/Projects/pyMSC/scheduler/profile/venv/lib/python3.6/site-packages/nameko/rpc.py", line 331, in result\n raise deserialize(error)\nnameko.exceptions.MethodNotFound: call_service2\n', 'exception_expected': False, 'response_time': 0.028102}
[service1.call_service2.2564aef6-89c0-45d9-bcef-abf2bfbe2945] entrypoint result trace SolutionRemove the extra headers added for dead letter queue and backoff. This can be achieved by using the following custom rpc proxy: # utils.py module
from nameko.rpc import RpcProxy as NamekoRpcProxy, ServiceProxy
class CustomRpcProxy(NamekoRpcProxy):
dead_letter_properties = ['x-death', 'x-first-death-exchange', 'x-first-death-queue', 'x-first-death-reason']
backoff_properties = ['backoff', 'rpc_method_id']
def get_dependency(self, worker_ctx):
# Removing dead letter queue and backoff headers
for key in self.dead_letter_properties + self.backoff_properties:
if key in worker_ctx.data:
worker_ctx.data.pop(key)
return ServiceProxy(
worker_ctx,
self.target_service,
self.rpc_reply_listener,
**self.options
) The following code worked for me as desired: # sevice.py module
import logging
from .utils import CustomRpcProxy
from nameko_amqp_retry import Backoff
from nameko_amqp_retry.rpc import rpc
from nameko_tracer import Tracer
logger = logging.getLogger(__name__)
class CustomBackOff(Backoff):
limit = 2
schedule = [1000]
class Service1:
name = "service1"
service2_rpc = CustomRpcProxy("service2")
tracer = Tracer()
@rpc
def call_service2(self):
try:
logger.info('service2: {}'.format(self.service2_rpc.do_bad_thing()))
a = {"bad": "thing"}
return a["good"]
except KeyError:
raise CustomBackOff()
class Service2:
name = "service2"
tracer = Tracer()
@rpc
def do_bad_thing(self):
return "Works the first time only." Hope that helps! |
Thanks for doing this investigation @ketgo. I think the only header that's causing a problem is nameko-amqp-retry/nameko_amqp_retry/rpc.py Lines 15 to 18 in 0d36cbd
The conditional is required because the message republisher has to use the If you unset just that header in a custom RpcProxy I think it'll work. I'd welcome a pull request with a fix for this if you're able to contribute one. |
@mattbennett Sure, will send one. |
Describe the bug
Retrying rpc endpoint with decorator imported from nameko-amqp-retry will not work if it then makes a call to another rpc endpoint with decorator imported from nameko-amqp-retry.
To Reproduce
setting up 2 services both with nameko_amqp_retry rpc entrypoints
service1
service2
Call service 2 with service1
n.rpc.service1.call_service2()
results in KeyError as expected first but on retry it hits a MethodNotFound error instead of second KeyError:
Expected behavior
Not hitting MethodNotFound exceptions. Example above should retry KeyError twice and give up.
Environment (please complete the following information):
The text was updated successfully, but these errors were encountered: