From 2cc8cda998f7b2cf8b691f94a7bba493239a9fbc Mon Sep 17 00:00:00 2001 From: kolena Date: Fri, 2 Nov 2018 11:59:58 +0100 Subject: [PATCH] Fixed bug: wrong metrics reporting into "processed" TimerPair in consumer; reworked DefaultRabbitMQConsumer internals Removed cats-effect explicit dependency --- build.gradle | 1 - core/build.gradle | 1 - .../rabbitmq/DefaultRabbitMQConsumer.scala | 93 +++++---- .../DefaultRabbitMQConsumerTest.scala | 193 +++++++++++++++++- 4 files changed, 241 insertions(+), 47 deletions(-) diff --git a/build.gradle b/build.gradle index 09bd81f4..25ed29fa 100644 --- a/build.gradle +++ b/build.gradle @@ -100,7 +100,6 @@ subprojects { cactusVersion = "0.12.2" monixVersion = "3.0.0-RC1" catsVersion = "1.2.0" - catsEffectVersion = "1.0.0-RC2-78a795d" } dependencies { diff --git a/core/build.gradle b/core/build.gradle index fd300bbc..b2a3413f 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -33,7 +33,6 @@ dependencies { compile "io.monix:monix_$scalaVersion:$monixVersion" compile "org.typelevel:cats-core_$scalaVersion:$catsVersion" - compile "org.typelevel:cats-effect_$scalaVersion:$catsEffectVersion" compile 'org.xbib:jsr-305:1.0.0' diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumer.scala b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumer.scala index 963006cf..163ef4be 100755 --- a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumer.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumer.scala @@ -15,9 +15,10 @@ import com.typesafe.scalalogging.StrictLogging import monix.execution.Scheduler import scala.collection.JavaConverters._ +import scala.concurrent.Future import scala.language.higherKinds -import scala.util.Failure import scala.util.control.NonFatal +import scala.util.{Failure, Success} class DefaultRabbitMQConsumer[F[_]: Effect]( override val name: String, @@ -60,21 +61,17 @@ class DefaultRabbitMQConsumer[F[_]: Effect]( case None => envelope.getRoutingKey } - val handleAction = handleDelivery(messageId, deliveryTag, properties, routingKey, body) + handleDelivery(messageId, deliveryTag, properties, routingKey, body) + .andThen { + case Success(_) => + processingCount.decrementAndGet() + logger.debug(s"Delivery processed successfully (tag $deliveryTag)") - processedTimer.time { - (for { - _ <- IO.shift(blockingScheduler) - _ <- Effect[F].runAsync(handleAction) { _ => + case Failure(NonFatal(e)) => processingCount.decrementAndGet() - IO.unit - } - } yield ()) - .unsafeToFuture() - .andThen { - case Failure(NonFatal(e)) => logger.debug("Could not process delivery", e) - }(scheduler) - }(scheduler) + processingFailedMeter.mark() + logger.debug("Could not process delivery", e) + } () } @@ -83,32 +80,35 @@ class DefaultRabbitMQConsumer[F[_]: Effect]( deliveryTag: Long, properties: BasicProperties, routingKey: String, - body: Array[Byte]): F[Unit] = { - { - try { - readMeter.mark() - - logger.debug(s"[$name] Read delivery with ID $messageId, deliveryTag $deliveryTag") - - val delivery = Delivery(Bytes.copyFrom(body), properties.asScala, Option(routingKey).getOrElse("")) - - logger.trace(s"[$name] Received delivery: $delivery") - - readAction(delivery) - .flatMap { - handleResult(messageId, deliveryTag, properties, routingKey, body) - } - .recoverWith { - case NonFatal(t) => handleCallbackFailure(messageId, deliveryTag, properties, routingKey, body)(t) - } - } catch { - // we catch this specific exception, handling of others is up to Lyra - case e: RejectedExecutionException => - logger.debug(s"[$name] Executor was unable to plan the handling task", e) - handleFailure(messageId, deliveryTag, properties, routingKey, body, e) - - case NonFatal(e) => handleCallbackFailure(messageId, deliveryTag, properties, routingKey, body)(e) - } + body: Array[Byte]): Future[Unit] = { + try { + readMeter.mark() + + logger.debug(s"[$name] Read delivery with ID $messageId, deliveryTag $deliveryTag") + + val delivery = Delivery(Bytes.copyFrom(body), properties.asScala, Option(routingKey).getOrElse("")) + + logger.trace(s"[$name] Received delivery: $delivery") + + processedTimer + .time { + toIO { + readAction(delivery) + .flatMap { + handleResult(messageId, deliveryTag, properties, routingKey, body) + } + }.unsafeToFuture() + } + .recoverWith { + case NonFatal(t) => handleCallbackFailure(messageId, deliveryTag, properties, routingKey, body)(t) + } + } catch { + // we catch this specific exception, handling of others is up to Lyra + case e: RejectedExecutionException => + logger.debug(s"[$name] Executor was unable to plan the handling task", e) + handleFailure(messageId, deliveryTag, properties, routingKey, body, e) + + case NonFatal(e) => handleCallbackFailure(messageId, deliveryTag, properties, routingKey, body)(e) } } @@ -116,7 +116,7 @@ class DefaultRabbitMQConsumer[F[_]: Effect]( deliveryTag: Long, properties: BasicProperties, routingKey: String, - body: Array[Byte])(t: Throwable): F[Unit] = { + body: Array[Byte])(t: Throwable): Future[Unit] = { logger.error(s"[$name] Error while executing callback, it's probably a BUG", t) @@ -128,11 +128,14 @@ class DefaultRabbitMQConsumer[F[_]: Effect]( properties: BasicProperties, routingKey: String, body: Array[Byte], - t: Throwable): F[Unit] = { + t: Throwable): Future[Unit] = { processingCount.decrementAndGet() processingFailedMeter.mark() consumerListener.onError(this, name, channel, t) - executeFailureAction(messageId, deliveryTag, properties, routingKey, body) + + toIO { + executeFailureAction(messageId, deliveryTag, properties, routingKey, body) + }.unsafeToFuture() } private def executeFailureAction(messageId: String, @@ -147,6 +150,10 @@ class DefaultRabbitMQConsumer[F[_]: Effect]( channel.close() } + private def toIO[A](f: F[A]): IO[A] = + IO.async { cb => + Effect[F].runAsync(f)(r => IO(cb(r))).unsafeRunSync() + } } object DefaultRabbitMQConsumer { diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumerTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumerTest.scala index f6ad8050..c84b837d 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumerTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumerTest.scala @@ -1,10 +1,11 @@ package com.avast.clients.rabbitmq +import java.time.Duration import java.util.UUID import com.avast.clients.rabbitmq.RabbitMQConnection.DefaultListeners import com.avast.clients.rabbitmq.api.DeliveryResult -import com.avast.metrics.scalaapi.Monitor +import com.avast.metrics.scalaapi.{Gauge, Monitor, TimerPair} import com.rabbitmq.client.AMQP.BasicProperties import com.rabbitmq.client.Envelope import com.rabbitmq.client.impl.recovery.AutorecoveringChannel @@ -16,7 +17,9 @@ import org.mockito.Matchers.any import org.mockito.Mockito._ import org.scalatest.time.{Seconds, Span} -import scala.util.Random +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Random, Success} class DefaultRabbitMQConsumerTest extends TestBase { test("should ACK") { @@ -240,4 +243,190 @@ class DefaultRabbitMQConsumerTest extends TestBase { } } + test("measures processed time correctly - success") { + val messageId = UUID.randomUUID().toString + + val deliveryTag = Random.nextInt(1000) + + val envelope = mock[Envelope] + when(envelope.getDeliveryTag).thenReturn(deliveryTag) + + val properties = mock[BasicProperties] + when(properties.getMessageId).thenReturn(messageId) + + val channel = mock[AutorecoveringChannel] + + val monitor = mock[Monitor] + when(monitor.meter(Matchers.anyString())).thenReturn(Monitor.noOp.meter("")) + when(monitor.named(Matchers.eq("results"))).thenReturn(Monitor.noOp()) + val tasksMonitor = mock[Monitor] + when(monitor.named(Matchers.eq("tasks"))).thenReturn(tasksMonitor) + when(tasksMonitor.gauge(Matchers.anyString())(Matchers.any())) + .thenReturn(Monitor.noOp().gauge("")(() => 0).asInstanceOf[Gauge[Nothing]]) + + var successLengths = mutable.Seq.empty[Long] // scalastyle:ignore + var failuresLengths = mutable.Seq.empty[Long] // scalastyle:ignore + + when(tasksMonitor.timerPair(Matchers.eq("processed"))).thenReturn(new TimerPair { + override def start(): TimeContext = ??? + override def update(duration: Duration): Unit = ??? + + override def updateFailure(duration: Duration): Unit = ??? + override def time[A](block: => A): A = ??? + override def time[A](future: => Future[A])(implicit ec: ExecutionContext): Future[A] = { + val start = System.currentTimeMillis() + + future.andThen { + case Success(_) => successLengths = successLengths :+ System.currentTimeMillis() - start + case Failure(_) => failuresLengths = failuresLengths :+ System.currentTimeMillis() - start + } + } + }) + + { + val consumer = new DefaultRabbitMQConsumer[Task]( + "test", + channel, + "queueName", + monitor, + DeliveryResult.Retry, + DefaultListeners.DefaultConsumerListener, + Scheduler.global + )({ delivery => + assertResult(Some(messageId))(delivery.properties.messageId) + Task.now(DeliveryResult.Ack) // immediate + }) + + consumer.handleDelivery("abcd", envelope, properties, Random.nextString(5).getBytes) + + eventually(timeout(Span(3, Seconds)), interval(Span(0.1, Seconds))) { + assertResult(Seq.empty)(failuresLengths) + val Seq(taskLength) = successLengths + + assert(taskLength < 200) + } + } + + successLengths = mutable.Seq.empty + failuresLengths = mutable.Seq.empty + + { + val consumer = new DefaultRabbitMQConsumer[Task]( + "test", + channel, + "queueName", + monitor, + DeliveryResult.Retry, + DefaultListeners.DefaultConsumerListener, + Scheduler.global + )({ delivery => + assertResult(Some(messageId))(delivery.properties.messageId) + import scala.concurrent.duration._ + Task.now(DeliveryResult.Ack).delayResult(2.second) + }) + + consumer.handleDelivery("abcd", envelope, properties, Random.nextString(5).getBytes) + + eventually(timeout(Span(3, Seconds)), interval(Span(0.1, Seconds))) { + assertResult(Seq.empty)(failuresLengths) + val Seq(taskLength) = successLengths + + assert(taskLength > 2000) + } + } + } + + test("measures processed time correctly - failure") { + val messageId = UUID.randomUUID().toString + + val deliveryTag = Random.nextInt(1000) + + val envelope = mock[Envelope] + when(envelope.getDeliveryTag).thenReturn(deliveryTag) + + val properties = mock[BasicProperties] + when(properties.getMessageId).thenReturn(messageId) + + val channel = mock[AutorecoveringChannel] + + val monitor = mock[Monitor] + when(monitor.meter(Matchers.anyString())).thenReturn(Monitor.noOp.meter("")) + when(monitor.named(Matchers.eq("results"))).thenReturn(Monitor.noOp()) + val tasksMonitor = mock[Monitor] + when(monitor.named(Matchers.eq("tasks"))).thenReturn(tasksMonitor) + when(tasksMonitor.gauge(Matchers.anyString())(Matchers.any())) + .thenReturn(Monitor.noOp().gauge("")(() => 0).asInstanceOf[Gauge[Nothing]]) + + var successLengths = mutable.Seq.empty[Long] // scalastyle:ignore + var failuresLengths = mutable.Seq.empty[Long] // scalastyle:ignore + + when(tasksMonitor.timerPair(Matchers.eq("processed"))).thenReturn(new TimerPair { + override def start(): TimeContext = ??? + override def update(duration: Duration): Unit = ??? + + override def updateFailure(duration: Duration): Unit = ??? + override def time[A](block: => A): A = ??? + override def time[A](future: => Future[A])(implicit ec: ExecutionContext): Future[A] = { + val start = System.currentTimeMillis() + + future.andThen { + case Success(_) => successLengths = successLengths :+ System.currentTimeMillis() - start + case Failure(_) => failuresLengths = failuresLengths :+ System.currentTimeMillis() - start + } + } + }) + + { + val consumer = new DefaultRabbitMQConsumer[Task]( + "test", + channel, + "queueName", + monitor, + DeliveryResult.Retry, + DefaultListeners.DefaultConsumerListener, + Scheduler.global + )({ delivery => + assertResult(Some(messageId))(delivery.properties.messageId) + Task.raiseError(new RuntimeException) // immediate + }) + + consumer.handleDelivery("abcd", envelope, properties, Random.nextString(5).getBytes) + + eventually(timeout(Span(3, Seconds)), interval(Span(0.1, Seconds))) { + assertResult(Seq.empty)(successLengths) + val Seq(taskLength) = failuresLengths + + assert(taskLength < 200) + } + } + + successLengths = mutable.Seq.empty + failuresLengths = mutable.Seq.empty + + { + val consumer = new DefaultRabbitMQConsumer[Task]( + "test", + channel, + "queueName", + monitor, + DeliveryResult.Retry, + DefaultListeners.DefaultConsumerListener, + Scheduler.global + )({ delivery => + assertResult(Some(messageId))(delivery.properties.messageId) + import scala.concurrent.duration._ + Task.raiseError(new RuntimeException).delayExecution(2.second) + }) + + consumer.handleDelivery("abcd", envelope, properties, Random.nextString(5).getBytes) + + eventually(timeout(Span(3, Seconds)), interval(Span(0.1, Seconds))) { + assertResult(Seq.empty)(successLengths) + val Seq(taskLength) = failuresLengths + + assert(taskLength > 2000) + } + } + } + }