diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala index 4b052f35..afe159d0 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala @@ -461,7 +461,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { } } - wrapReadAction(consumerConfig, convAction, blockingScheduler) + wrapReadAction(consumerConfig, convAction, monitor, blockingScheduler) } val consumer = { @@ -477,9 +477,13 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { private def wrapReadAction[F[_]: Effect, A](consumerConfig: ConsumerConfig, userReadAction: DefaultDeliveryReadAction[F], + consumerMonitor: Monitor, blockingScheduler: Scheduler)(implicit sch: Scheduler): DefaultDeliveryReadAction[F] = { import consumerConfig._ + val timeoutsMeter = consumerMonitor.meter("timeouts") + val fatalFailuresMeter = consumerMonitor.meter("fatalFailures") + delivery: Delivery[Bytes] => try { // we try to catch also long-lasting synchronous work on the thread @@ -493,10 +497,12 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { .timeout(ScalaDuration(processTimeout.toMillis, TimeUnit.MILLISECONDS)) .onErrorRecoverWith { case e: TimeoutException => + timeoutsMeter.mark() logger.warn(s"[$name] Task timed-out, applying DeliveryResult.${consumerConfig.timeoutAction}", e) Task.now(consumerConfig.timeoutAction) case NonFatal(e) => + fatalFailuresMeter.mark() logger.warn(s"[$name] Error while executing callback, applying DeliveryResult.${consumerConfig.failureAction}", e) Task.now(consumerConfig.failureAction) }