Skip to content

Commit

Permalink
Merge pull request #17 from avast/MetricsBugFix
Browse files Browse the repository at this point in the history
Fixed bug: wrong metrics reporting into "processed"
  • Loading branch information
jendakol authored Nov 2, 2018
2 parents 943eab6 + 2cc8cda commit 009d1a4
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 47 deletions.
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ subprojects {
cactusVersion = "0.12.2"
monixVersion = "3.0.0-RC1"
catsVersion = "1.2.0"
catsEffectVersion = "1.0.0-RC2-78a795d"
}

dependencies {
Expand Down
1 change: 0 additions & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ dependencies {

compile "io.monix:monix_$scalaVersion:$monixVersion"
compile "org.typelevel:cats-core_$scalaVersion:$catsVersion"
compile "org.typelevel:cats-effect_$scalaVersion:$catsEffectVersion"

compile 'org.xbib:jsr-305:1.0.0'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import com.typesafe.scalalogging.StrictLogging
import monix.execution.Scheduler

import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.language.higherKinds
import scala.util.Failure
import scala.util.control.NonFatal
import scala.util.{Failure, Success}

class DefaultRabbitMQConsumer[F[_]: Effect](
override val name: String,
Expand Down Expand Up @@ -60,21 +61,17 @@ class DefaultRabbitMQConsumer[F[_]: Effect](
case None => envelope.getRoutingKey
}

val handleAction = handleDelivery(messageId, deliveryTag, properties, routingKey, body)
handleDelivery(messageId, deliveryTag, properties, routingKey, body)
.andThen {
case Success(_) =>
processingCount.decrementAndGet()
logger.debug(s"Delivery processed successfully (tag $deliveryTag)")

processedTimer.time {
(for {
_ <- IO.shift(blockingScheduler)
_ <- Effect[F].runAsync(handleAction) { _ =>
case Failure(NonFatal(e)) =>
processingCount.decrementAndGet()
IO.unit
}
} yield ())
.unsafeToFuture()
.andThen {
case Failure(NonFatal(e)) => logger.debug("Could not process delivery", e)
}(scheduler)
}(scheduler)
processingFailedMeter.mark()
logger.debug("Could not process delivery", e)
}

()
}
Expand All @@ -83,40 +80,43 @@ class DefaultRabbitMQConsumer[F[_]: Effect](
deliveryTag: Long,
properties: BasicProperties,
routingKey: String,
body: Array[Byte]): F[Unit] = {
{
try {
readMeter.mark()

logger.debug(s"[$name] Read delivery with ID $messageId, deliveryTag $deliveryTag")

val delivery = Delivery(Bytes.copyFrom(body), properties.asScala, Option(routingKey).getOrElse(""))

logger.trace(s"[$name] Received delivery: $delivery")

readAction(delivery)
.flatMap {
handleResult(messageId, deliveryTag, properties, routingKey, body)
}
.recoverWith {
case NonFatal(t) => handleCallbackFailure(messageId, deliveryTag, properties, routingKey, body)(t)
}
} catch {
// we catch this specific exception, handling of others is up to Lyra
case e: RejectedExecutionException =>
logger.debug(s"[$name] Executor was unable to plan the handling task", e)
handleFailure(messageId, deliveryTag, properties, routingKey, body, e)

case NonFatal(e) => handleCallbackFailure(messageId, deliveryTag, properties, routingKey, body)(e)
}
body: Array[Byte]): Future[Unit] = {
try {
readMeter.mark()

logger.debug(s"[$name] Read delivery with ID $messageId, deliveryTag $deliveryTag")

val delivery = Delivery(Bytes.copyFrom(body), properties.asScala, Option(routingKey).getOrElse(""))

logger.trace(s"[$name] Received delivery: $delivery")

processedTimer
.time {
toIO {
readAction(delivery)
.flatMap {
handleResult(messageId, deliveryTag, properties, routingKey, body)
}
}.unsafeToFuture()
}
.recoverWith {
case NonFatal(t) => handleCallbackFailure(messageId, deliveryTag, properties, routingKey, body)(t)
}
} catch {
// we catch this specific exception, handling of others is up to Lyra
case e: RejectedExecutionException =>
logger.debug(s"[$name] Executor was unable to plan the handling task", e)
handleFailure(messageId, deliveryTag, properties, routingKey, body, e)

case NonFatal(e) => handleCallbackFailure(messageId, deliveryTag, properties, routingKey, body)(e)
}
}

private def handleCallbackFailure(messageId: String,
deliveryTag: Long,
properties: BasicProperties,
routingKey: String,
body: Array[Byte])(t: Throwable): F[Unit] = {
body: Array[Byte])(t: Throwable): Future[Unit] = {

logger.error(s"[$name] Error while executing callback, it's probably a BUG", t)

Expand All @@ -128,11 +128,14 @@ class DefaultRabbitMQConsumer[F[_]: Effect](
properties: BasicProperties,
routingKey: String,
body: Array[Byte],
t: Throwable): F[Unit] = {
t: Throwable): Future[Unit] = {
processingCount.decrementAndGet()
processingFailedMeter.mark()
consumerListener.onError(this, name, channel, t)
executeFailureAction(messageId, deliveryTag, properties, routingKey, body)

toIO {
executeFailureAction(messageId, deliveryTag, properties, routingKey, body)
}.unsafeToFuture()
}

private def executeFailureAction(messageId: String,
Expand All @@ -147,6 +150,10 @@ class DefaultRabbitMQConsumer[F[_]: Effect](
channel.close()
}

private def toIO[A](f: F[A]): IO[A] =
IO.async { cb =>
Effect[F].runAsync(f)(r => IO(cb(r))).unsafeRunSync()
}
}

object DefaultRabbitMQConsumer {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package com.avast.clients.rabbitmq

import java.time.Duration
import java.util.UUID

import com.avast.clients.rabbitmq.RabbitMQConnection.DefaultListeners
import com.avast.clients.rabbitmq.api.DeliveryResult
import com.avast.metrics.scalaapi.Monitor
import com.avast.metrics.scalaapi.{Gauge, Monitor, TimerPair}
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client.Envelope
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel
Expand All @@ -16,7 +17,9 @@ import org.mockito.Matchers.any
import org.mockito.Mockito._
import org.scalatest.time.{Seconds, Span}

import scala.util.Random
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Random, Success}

class DefaultRabbitMQConsumerTest extends TestBase {
test("should ACK") {
Expand Down Expand Up @@ -240,4 +243,190 @@ class DefaultRabbitMQConsumerTest extends TestBase {
}
}

test("measures processed time correctly - success") {
val messageId = UUID.randomUUID().toString

val deliveryTag = Random.nextInt(1000)

val envelope = mock[Envelope]
when(envelope.getDeliveryTag).thenReturn(deliveryTag)

val properties = mock[BasicProperties]
when(properties.getMessageId).thenReturn(messageId)

val channel = mock[AutorecoveringChannel]

val monitor = mock[Monitor]
when(monitor.meter(Matchers.anyString())).thenReturn(Monitor.noOp.meter(""))
when(monitor.named(Matchers.eq("results"))).thenReturn(Monitor.noOp())
val tasksMonitor = mock[Monitor]
when(monitor.named(Matchers.eq("tasks"))).thenReturn(tasksMonitor)
when(tasksMonitor.gauge(Matchers.anyString())(Matchers.any()))
.thenReturn(Monitor.noOp().gauge("")(() => 0).asInstanceOf[Gauge[Nothing]])

var successLengths = mutable.Seq.empty[Long] // scalastyle:ignore
var failuresLengths = mutable.Seq.empty[Long] // scalastyle:ignore

when(tasksMonitor.timerPair(Matchers.eq("processed"))).thenReturn(new TimerPair {
override def start(): TimeContext = ???
override def update(duration: Duration): Unit = ???

override def updateFailure(duration: Duration): Unit = ???
override def time[A](block: => A): A = ???
override def time[A](future: => Future[A])(implicit ec: ExecutionContext): Future[A] = {
val start = System.currentTimeMillis()

future.andThen {
case Success(_) => successLengths = successLengths :+ System.currentTimeMillis() - start
case Failure(_) => failuresLengths = failuresLengths :+ System.currentTimeMillis() - start
}
}
})

{
val consumer = new DefaultRabbitMQConsumer[Task](
"test",
channel,
"queueName",
monitor,
DeliveryResult.Retry,
DefaultListeners.DefaultConsumerListener,
Scheduler.global
)({ delivery =>
assertResult(Some(messageId))(delivery.properties.messageId)
Task.now(DeliveryResult.Ack) // immediate
})

consumer.handleDelivery("abcd", envelope, properties, Random.nextString(5).getBytes)

eventually(timeout(Span(3, Seconds)), interval(Span(0.1, Seconds))) {
assertResult(Seq.empty)(failuresLengths)
val Seq(taskLength) = successLengths

assert(taskLength < 200)
}
}

successLengths = mutable.Seq.empty
failuresLengths = mutable.Seq.empty

{
val consumer = new DefaultRabbitMQConsumer[Task](
"test",
channel,
"queueName",
monitor,
DeliveryResult.Retry,
DefaultListeners.DefaultConsumerListener,
Scheduler.global
)({ delivery =>
assertResult(Some(messageId))(delivery.properties.messageId)
import scala.concurrent.duration._
Task.now(DeliveryResult.Ack).delayResult(2.second)
})

consumer.handleDelivery("abcd", envelope, properties, Random.nextString(5).getBytes)

eventually(timeout(Span(3, Seconds)), interval(Span(0.1, Seconds))) {
assertResult(Seq.empty)(failuresLengths)
val Seq(taskLength) = successLengths

assert(taskLength > 2000)
}
}
}

test("measures processed time correctly - failure") {
val messageId = UUID.randomUUID().toString

val deliveryTag = Random.nextInt(1000)

val envelope = mock[Envelope]
when(envelope.getDeliveryTag).thenReturn(deliveryTag)

val properties = mock[BasicProperties]
when(properties.getMessageId).thenReturn(messageId)

val channel = mock[AutorecoveringChannel]

val monitor = mock[Monitor]
when(monitor.meter(Matchers.anyString())).thenReturn(Monitor.noOp.meter(""))
when(monitor.named(Matchers.eq("results"))).thenReturn(Monitor.noOp())
val tasksMonitor = mock[Monitor]
when(monitor.named(Matchers.eq("tasks"))).thenReturn(tasksMonitor)
when(tasksMonitor.gauge(Matchers.anyString())(Matchers.any()))
.thenReturn(Monitor.noOp().gauge("")(() => 0).asInstanceOf[Gauge[Nothing]])

var successLengths = mutable.Seq.empty[Long] // scalastyle:ignore
var failuresLengths = mutable.Seq.empty[Long] // scalastyle:ignore

when(tasksMonitor.timerPair(Matchers.eq("processed"))).thenReturn(new TimerPair {
override def start(): TimeContext = ???
override def update(duration: Duration): Unit = ???

override def updateFailure(duration: Duration): Unit = ???
override def time[A](block: => A): A = ???
override def time[A](future: => Future[A])(implicit ec: ExecutionContext): Future[A] = {
val start = System.currentTimeMillis()

future.andThen {
case Success(_) => successLengths = successLengths :+ System.currentTimeMillis() - start
case Failure(_) => failuresLengths = failuresLengths :+ System.currentTimeMillis() - start
}
}
})

{
val consumer = new DefaultRabbitMQConsumer[Task](
"test",
channel,
"queueName",
monitor,
DeliveryResult.Retry,
DefaultListeners.DefaultConsumerListener,
Scheduler.global
)({ delivery =>
assertResult(Some(messageId))(delivery.properties.messageId)
Task.raiseError(new RuntimeException) // immediate
})

consumer.handleDelivery("abcd", envelope, properties, Random.nextString(5).getBytes)

eventually(timeout(Span(3, Seconds)), interval(Span(0.1, Seconds))) {
assertResult(Seq.empty)(successLengths)
val Seq(taskLength) = failuresLengths

assert(taskLength < 200)
}
}

successLengths = mutable.Seq.empty
failuresLengths = mutable.Seq.empty

{
val consumer = new DefaultRabbitMQConsumer[Task](
"test",
channel,
"queueName",
monitor,
DeliveryResult.Retry,
DefaultListeners.DefaultConsumerListener,
Scheduler.global
)({ delivery =>
assertResult(Some(messageId))(delivery.properties.messageId)
import scala.concurrent.duration._
Task.raiseError(new RuntimeException).delayExecution(2.second)
})

consumer.handleDelivery("abcd", envelope, properties, Random.nextString(5).getBytes)

eventually(timeout(Span(3, Seconds)), interval(Span(0.1, Seconds))) {
assertResult(Seq.empty)(successLengths)
val Seq(taskLength) = failuresLengths

assert(taskLength > 2000)
}
}
}

}

0 comments on commit 009d1a4

Please sign in to comment.