From 3e7908a02a16911ca45c5cf00469d54f9e13e072 Mon Sep 17 00:00:00 2001 From: jendakol Date: Tue, 11 Dec 2018 12:32:45 +0100 Subject: [PATCH] Option to disable process timeout for consumer (#19) --- core/src/main/resources/reference.conf | 2 +- .../clients/rabbitmq/DefaultRabbitMQClientFactory.scala | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 2e49eac8..3607aa41 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -38,7 +38,7 @@ avastRabbitMQConsumerDefaults { // queueName = "test" - processTimeout = 10s + processTimeout = 10s // 0 for disable the timeout prefetchCount = 100 diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala index 6d44c926..b304e29f 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala @@ -520,8 +520,13 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { .map(Task.fromEffect[F, DeliveryResult]) .flatten - action - .timeout(ScalaDuration(processTimeout.toMillis, TimeUnit.MILLISECONDS)) + val timedOutAction = if (processTimeout == Duration.ZERO) { + action + } else { + action.timeout(ScalaDuration(processTimeout.toMillis, TimeUnit.MILLISECONDS)) + } + + timedOutAction .onErrorRecoverWith { case e: TimeoutException => timeoutsMeter.mark()