Skip to content

Commit

Permalink
Consumer timeouts and fatalFailures meters
Browse files Browse the repository at this point in the history
  • Loading branch information
jendakol committed Nov 27, 2018
1 parent b703325 commit 5f6c86d
Showing 1 changed file with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
}
}

wrapReadAction(consumerConfig, convAction, blockingScheduler)
wrapReadAction(consumerConfig, convAction, monitor, blockingScheduler)
}

val consumer = {
Expand All @@ -477,9 +477,13 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {

private def wrapReadAction[F[_]: Effect, A](consumerConfig: ConsumerConfig,
userReadAction: DefaultDeliveryReadAction[F],
consumerMonitor: Monitor,
blockingScheduler: Scheduler)(implicit sch: Scheduler): DefaultDeliveryReadAction[F] = {
import consumerConfig._

val timeoutsMeter = consumerMonitor.meter("timeouts")
val fatalFailuresMeter = consumerMonitor.meter("fatalFailures")

delivery: Delivery[Bytes] =>
try {
// we try to catch also long-lasting synchronous work on the thread
Expand All @@ -493,10 +497,12 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
.timeout(ScalaDuration(processTimeout.toMillis, TimeUnit.MILLISECONDS))
.onErrorRecoverWith {
case e: TimeoutException =>
timeoutsMeter.mark()
logger.warn(s"[$name] Task timed-out, applying DeliveryResult.${consumerConfig.timeoutAction}", e)
Task.now(consumerConfig.timeoutAction)

case NonFatal(e) =>
fatalFailuresMeter.mark()
logger.warn(s"[$name] Error while executing callback, applying DeliveryResult.${consumerConfig.failureAction}", e)
Task.now(consumerConfig.failureAction)
}
Expand Down

0 comments on commit 5f6c86d

Please sign in to comment.