From 6b3c1556fe8deefbbe21707e29d56fbd490fb0bf Mon Sep 17 00:00:00 2001 From: Jenda Kolena Date: Sun, 25 Sep 2022 14:19:27 +0200 Subject: [PATCH] *DirectlyPoisoned* DeliveryResult - Add `DeliveryResult.DirectlyPoisoned` - Internal redesign of `PoisonedMessageHandler` --- .../clients/rabbitmq/api/DeliveryResult.scala | 3 + .../clients/rabbitmq/ConsumerChannelOps.scala | 1 + .../rabbitmq/PoisonedMessageHandler.scala | 155 +++++++++++------- .../DefaultRabbitMQConsumerTest.scala | 3 +- .../DefaultRabbitMQPullConsumerTest.scala | 3 +- .../rabbitmq/PoisonedMessageHandlerTest.scala | 107 +++++++++--- .../rabbitmq/RepublishStrategyTest.scala | 3 +- 7 files changed, 196 insertions(+), 79 deletions(-) diff --git a/api/src/main/scala/com/avast/clients/rabbitmq/api/DeliveryResult.scala b/api/src/main/scala/com/avast/clients/rabbitmq/api/DeliveryResult.scala index 8abcd3e2..daa2e7ff 100644 --- a/api/src/main/scala/com/avast/clients/rabbitmq/api/DeliveryResult.scala +++ b/api/src/main/scala/com/avast/clients/rabbitmq/api/DeliveryResult.scala @@ -21,4 +21,7 @@ object DeliveryResult { * */ case class Republish(countAsPoisoned: Boolean = true, newHeaders: Map[String, AnyRef] = Map.empty) extends DeliveryResult + /** The message cannot be processed and should be considered as _poisoned_ even without further retrying. */ + case object DirectlyPoison extends DeliveryResult + } diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/ConsumerChannelOps.scala b/core/src/main/scala/com/avast/clients/rabbitmq/ConsumerChannelOps.scala index 321493eb..e619d607 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/ConsumerChannelOps.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/ConsumerChannelOps.scala @@ -40,6 +40,7 @@ final private[rabbitmq] case class ConsumerChannelOps[F[_]: ConcurrentEffect: Ti case Reject => reject() case Retry => retry() case Republish(_, newHeaders) => republish(createPropertiesForRepublish(newHeaders, fixedProperties, routingKey), rawBody) + case DirectlyPoison => throw new IllegalStateException("Poison state should be handled by PMH, this is most probably a BUG") } } diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/PoisonedMessageHandler.scala b/core/src/main/scala/com/avast/clients/rabbitmq/PoisonedMessageHandler.scala index d11cadcf..d3633b1c 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/PoisonedMessageHandler.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/PoisonedMessageHandler.scala @@ -5,12 +5,13 @@ import cats.effect.{Resource, Sync, Timer} import cats.implicits.{catsSyntaxApplicativeError, catsSyntaxFlatMapOps, toFunctorOps} import com.avast.bytes.Bytes import com.avast.clients.rabbitmq.PoisonedMessageHandler.DiscardedTimeHeaderName -import com.avast.clients.rabbitmq.api.DeliveryResult.{Reject, Republish} +import com.avast.clients.rabbitmq.api.DeliveryResult._ import com.avast.clients.rabbitmq.api._ import com.avast.clients.rabbitmq.logging.ImplicitContextLogger -import com.avast.metrics.scalaeffectapi.Monitor +import com.avast.metrics.scalaeffectapi.{Meter, Monitor} import java.time.Instant +import scala.reflect.ClassTag import scala.util.Try import scala.util.control.NonFatal @@ -19,69 +20,70 @@ sealed trait PoisonedMessageHandler[F[_], A] { implicit dctx: DeliveryContext): F[DeliveryResult] } -class LoggingPoisonedMessageHandler[F[_]: Sync: Timer, A](maxAttempts: Int, - redactPayload: Boolean, - republishDelay: Option[ExponentialDelay]) - extends PoisonedMessageHandler[F, A] { - private val logger = ImplicitContextLogger.createLogger[F, LoggingPoisonedMessageHandler[F, A]] +private trait PoisonedMessageHandlerAction[F[_], A] { + def handlePoisonedMessage(rawBody: Bytes)(delivery: Delivery[A], maxAttempts: Int)(implicit dctx: DeliveryContext): F[Unit] +} - private def defaultHandlePoisonedMessage[F[_]: Sync](maxAttempts: Int, logger: ImplicitContextLogger[F])(delivery: Delivery[A])( - implicit dctx: DeliveryContext): F[Unit] = { - val deliveryStr = (if (!redactPayload) delivery else delivery.withRedactedBody).toString - - logger.warn(s"Message failures reached the limit $maxAttempts attempts, throwing away: $deliveryStr") - } +private sealed abstract class PoisonedMessageHandlerBase[F[_]: Sync: Timer, A](maxAttempts: Int, + republishDelay: Option[ExponentialDelay], + helper: PoisonedMessageHandlerHelper[F]) + extends PoisonedMessageHandler[F, A] + with PoisonedMessageHandlerAction[F, A] { override def interceptResult(delivery: Delivery[A], messageId: MessageId, rawBody: Bytes)(result: DeliveryResult)( implicit dctx: DeliveryContext): F[DeliveryResult] = { - PoisonedMessageHandler.handleResult(delivery, - messageId, - maxAttempts, - logger, - republishDelay, - (d: Delivery[A], _) => defaultHandlePoisonedMessage[F](maxAttempts, logger)(d))(result) + PoisonedMessageHandler.handleResult(delivery, rawBody, messageId, maxAttempts, helper, republishDelay, this)(result) } } -class NoOpPoisonedMessageHandler[F[_]: Sync, A] extends PoisonedMessageHandler[F, A] { - override def interceptResult(delivery: Delivery[A], messageId: MessageId, rawBody: Bytes)(result: DeliveryResult)( - implicit dctx: DeliveryContext): F[DeliveryResult] = Sync[F].pure(result) -} - -class DeadQueuePoisonedMessageHandler[F[_]: Sync: Timer, A]( - maxAttempts: Int, - redactPayload: Boolean, - republishDelay: Option[ExponentialDelay])(moveToDeadQueue: (Delivery[A], Bytes, DeliveryContext) => F[Unit]) - extends PoisonedMessageHandler[F, A] { - private val logger = ImplicitContextLogger.createLogger[F, DeadQueuePoisonedMessageHandler[F, A]] +private[rabbitmq] class LoggingPoisonedMessageHandler[F[_]: Sync: Timer, A](maxAttempts: Int, + republishDelay: Option[ExponentialDelay], + helper: PoisonedMessageHandlerHelper[F]) + extends PoisonedMessageHandlerBase[F, A](maxAttempts, republishDelay, helper) { + import helper._ - override def interceptResult(delivery: Delivery[A], messageId: MessageId, rawBody: Bytes)(result: DeliveryResult)( - implicit dctx: DeliveryContext): F[DeliveryResult] = { - PoisonedMessageHandler.handleResult(delivery, - messageId, - maxAttempts, - logger, - republishDelay, - (d, _) => handlePoisonedMessage(d, messageId, rawBody))(result) + override def handlePoisonedMessage(rawBody: Bytes)(delivery: Delivery[A], ma: Int)(implicit dctx: DeliveryContext): F[Unit] = { + logger.warn(s"Message failures reached the limit $ma attempts, throwing away: ${redactIfConfigured(delivery)}") } +} - private def handlePoisonedMessage(delivery: Delivery[A], messageId: MessageId, rawBody: Bytes)( - implicit dctx: DeliveryContext): F[Unit] = { +private[rabbitmq] class DeadQueuePoisonedMessageHandler[F[_]: Sync: Timer, A]( + maxAttempts: Int, + republishDelay: Option[ExponentialDelay], + helper: PoisonedMessageHandlerHelper[F])(moveToDeadQueue: (Delivery[A], Bytes, DeliveryContext) => F[Unit]) + extends PoisonedMessageHandlerBase[F, A](maxAttempts, republishDelay, helper) { + import helper._ - val deliveryStr = (if (!redactPayload) delivery else delivery.withRedactedBody).toString + override def handlePoisonedMessage(rawBody: Bytes)(delivery: Delivery[A], ma: Int)(implicit dctx: DeliveryContext): F[Unit] = { + import dctx._ - logger.warn { s"Message $messageId failures reached the limit $maxAttempts attempts, moving it to the dead queue: $deliveryStr" } >> + logger.warn { + s"Message $messageId failures reached the limit $ma attempts, moving it to the dead queue: ${redactIfConfigured(delivery)}" + } >> moveToDeadQueue(delivery, rawBody, dctx) >> logger.debug(s"Message $messageId moved to the dead queue") } } -object DeadQueuePoisonedMessageHandler { +private[rabbitmq] class NoOpPoisonedMessageHandler[F[_]: Sync, A](helper: PoisonedMessageHandlerHelper[F]) + extends PoisonedMessageHandler[F, A] { + override def interceptResult(delivery: Delivery[A], messageId: MessageId, rawBody: Bytes)(result: DeliveryResult)( + implicit dctx: DeliveryContext): F[DeliveryResult] = { + result match { + case DeliveryResult.DirectlyPoison => + helper.logger.warn("Delivery can't be poisoned, because NoOpPoisonedMessageHandler is installed! Rejecting instead...").as(Reject) + + case _ => Sync[F].pure(result) + } + } +} + +private[rabbitmq] object DeadQueuePoisonedMessageHandler { def make[F[_]: Sync: Timer, A](conf: DeadQueuePoisonedMessageHandling, connection: RabbitMQConnection[F], - redactPayload: Boolean, - monitor: Monitor[F]): Resource[F, DeadQueuePoisonedMessageHandler[F, A]] = { + helper: PoisonedMessageHandlerHelper[F]): Resource[F, DeadQueuePoisonedMessageHandler[F, A]] = { import conf._ + import helper._ val pc = ProducerConfig( name = deadQueueProducer.name, @@ -93,7 +95,7 @@ object DeadQueuePoisonedMessageHandler { ) connection.newProducer[Bytes](pc, monitor.named("deadQueueProducer")).map { producer => - new DeadQueuePoisonedMessageHandler[F, A](maxAttempts, redactPayload, republishDelay)( + new DeadQueuePoisonedMessageHandler[F, A](maxAttempts, republishDelay, helper)( (d: Delivery[A], rawBody: Bytes, dctx: DeliveryContext) => { val cidStrategy = dctx.correlationId match { case Some(value) => CorrelationIdStrategy.Fixed(value.value) @@ -101,7 +103,6 @@ object DeadQueuePoisonedMessageHandler { } val now = Instant.now() - val finalProperties = d.properties.copy(headers = d.properties.headers.updated(DiscardedTimeHeaderName, now.toString)) producer.send(deadQueueProducer.routingKey, rawBody, Some(finalProperties))(cidStrategy) @@ -110,7 +111,7 @@ object DeadQueuePoisonedMessageHandler { } } -object PoisonedMessageHandler { +private[rabbitmq] object PoisonedMessageHandler { final val RepublishCountHeaderName: String = "X-Republish-Count" final val DiscardedTimeHeaderName: String = "X-Discarded-Time" @@ -120,13 +121,19 @@ object PoisonedMessageHandler { monitor: Monitor[F]): Resource[F, PoisonedMessageHandler[F, A]] = { config match { case Some(LoggingPoisonedMessageHandling(maxAttempts, republishDelay)) => - Resource.pure(new LoggingPoisonedMessageHandler[F, A](maxAttempts, redactPayload, republishDelay)) - case Some(c: DeadQueuePoisonedMessageHandling) => DeadQueuePoisonedMessageHandler.make(c, connection, redactPayload, monitor) + val helper = PoisonedMessageHandlerHelper[F, LoggingPoisonedMessageHandler[F, A]](monitor, redactPayload) + Resource.pure(new LoggingPoisonedMessageHandler[F, A](maxAttempts, republishDelay, helper)) + + case Some(c: DeadQueuePoisonedMessageHandling) => + val helper = PoisonedMessageHandlerHelper[F, DeadQueuePoisonedMessageHandler[F, A]](monitor, redactPayload) + DeadQueuePoisonedMessageHandler.make(c, connection, helper) + case Some(NoOpPoisonedMessageHandling) | None => Resource.eval { - val logger = ImplicitContextLogger.createLogger[F, NoOpPoisonedMessageHandler[F, A]] - logger.plainWarn("Using NO-OP poisoned message handler. Potential poisoned messages will cycle forever.").as { - new NoOpPoisonedMessageHandler[F, A] + val helper = PoisonedMessageHandlerHelper[F, NoOpPoisonedMessageHandler[F, A]](monitor, redactPayload) + + helper.logger.plainWarn("Using NO-OP poisoned message handler. Potential poisoned messages will cycle forever.").as { + new NoOpPoisonedMessageHandler[F, A](helper) } } } @@ -134,14 +141,18 @@ object PoisonedMessageHandler { private[rabbitmq] def handleResult[F[_]: Sync: Timer, A]( delivery: Delivery[A], + rawBody: Bytes, messageId: MessageId, maxAttempts: Int, - logger: ImplicitContextLogger[F], + helper: PoisonedMessageHandlerHelper[F], republishDelay: Option[ExponentialDelay], - handlePoisonedMessage: (Delivery[A], Int) => F[Unit])(r: DeliveryResult)(implicit dctx: DeliveryContext): F[DeliveryResult] = { + handler: PoisonedMessageHandlerAction[F, A])(r: DeliveryResult)(implicit dctx: DeliveryContext): F[DeliveryResult] = { r match { case Republish(isPoisoned, newHeaders) if isPoisoned => - adjustDeliveryResult(delivery, messageId, maxAttempts, newHeaders, logger, republishDelay, handlePoisonedMessage) + adjustDeliveryResult(delivery, messageId, maxAttempts, newHeaders, helper, republishDelay, handler.handlePoisonedMessage(rawBody)) + + case DirectlyPoison => poisonRightAway(delivery, messageId, helper, handler.handlePoisonedMessage(rawBody)) + case r => Applicative[F].pure(r) // keep other results as they are } } @@ -151,10 +162,11 @@ object PoisonedMessageHandler { messageId: MessageId, maxAttempts: Int, newHeaders: Map[String, AnyRef], - logger: ImplicitContextLogger[F], + helper: PoisonedMessageHandlerHelper[F], republishDelay: Option[ExponentialDelay], handlePoisonedMessage: (Delivery[A], Int) => F[Unit])(implicit dctx: DeliveryContext): F[DeliveryResult] = { import cats.syntax.traverse._ + import helper._ // get current attempt no. from passed headers with fallback to original (incoming) headers - the fallback will most likely happen // but we're giving the programmer chance to programmatically _pretend_ lower attempt number @@ -197,4 +209,33 @@ object PoisonedMessageHandler { } } + private def poisonRightAway[F[_]: Sync, A]( + delivery: Delivery[A], + messageId: MessageId, + helper: PoisonedMessageHandlerHelper[F], + handlePoisonedMessage: (Delivery[A], Int) => F[Unit])(implicit dctx: DeliveryContext): F[DeliveryResult] = { + helper.logger.info(s"Directly poisoning delivery $messageId") >> + handlePoisonedMessage(delivery, 0) >> + helper.directlyPoisonedMeter.mark >> + Sync[F].pure(Reject: DeliveryResult) + } + +} + +private[rabbitmq] class PoisonedMessageHandlerHelper[F[_]: Sync](val logger: ImplicitContextLogger[F], + val monitor: Monitor[F], + redactPayload: Boolean) { + + val directlyPoisonedMeter: Meter[F] = monitor.meter("directlyPoisoned") + + def redactIfConfigured(delivery: Delivery[_]): Delivery[Any] = { + if (!redactPayload) delivery else delivery.withRedactedBody + } +} + +private[rabbitmq] object PoisonedMessageHandlerHelper { + def apply[F[_]: Sync, PMH: ClassTag](monitor: Monitor[F], redactPayload: Boolean): PoisonedMessageHandlerHelper[F] = { + val logger: ImplicitContextLogger[F] = ImplicitContextLogger.createLogger[F, PMH] + new PoisonedMessageHandlerHelper[F](logger, monitor, redactPayload) + } } diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumerTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumerTest.scala index 151b33b7..8a4ab849 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumerTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumerTest.scala @@ -27,6 +27,7 @@ import scala.util._ class DefaultRabbitMQConsumerTest extends TestBase { private val connectionInfo = RabbitMQConnectionInfo(immutable.Seq("localhost"), "/", None) + private val pmhHelper = PoisonedMessageHandlerHelper[Task, DefaultRabbitMQConsumerTest](Monitor.noOp(), redactPayload = false) test("should ACK") { val messageId = UUID.randomUUID().toString @@ -466,5 +467,5 @@ class DefaultRabbitMQConsumerTest extends TestBase { )(userAction) } - object PMH extends LoggingPoisonedMessageHandler[Task, Bytes](3, false, None) + object PMH extends LoggingPoisonedMessageHandler[Task, Bytes](3, None, pmhHelper) } diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumerTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumerTest.scala index 2f43a29a..047632c1 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumerTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumerTest.scala @@ -22,6 +22,7 @@ import scala.util.Random class DefaultRabbitMQPullConsumerTest extends TestBase { private val connectionInfo = RabbitMQConnectionInfo(immutable.Seq("localhost"), "/", None) + private val pmhHelper = PoisonedMessageHandlerHelper[Task, DefaultRabbitMQPullConsumerTest](Monitor.noOp(), redactPayload = false) test("should ACK") { val messageId = UUID.randomUUID().toString @@ -287,5 +288,5 @@ class DefaultRabbitMQPullConsumerTest extends TestBase { new DefaultRabbitMQPullConsumer[Task, A](base, channelOps) } - class PMH[A] extends LoggingPoisonedMessageHandler[Task, A](3, false, None) + class PMH[A] extends LoggingPoisonedMessageHandler[Task, A](3, None, pmhHelper) } diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/PoisonedMessageHandlerTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/PoisonedMessageHandlerTest.scala index 4b0281de..c40317b2 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/PoisonedMessageHandlerTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/PoisonedMessageHandlerTest.scala @@ -2,32 +2,40 @@ package com.avast.clients.rabbitmq import com.avast.bytes.Bytes import com.avast.clients.rabbitmq.PoisonedMessageHandler._ -import com.avast.clients.rabbitmq.api.DeliveryResult.Republish +import com.avast.clients.rabbitmq.api.DeliveryResult.{DirectlyPoison, Republish} import com.avast.clients.rabbitmq.api._ -import com.avast.clients.rabbitmq.logging.ImplicitContextLogger +import com.avast.metrics.scalaeffectapi.Monitor import monix.eval.Task import monix.execution.Scheduler.Implicits.global import java.time.Instant import java.util.concurrent.atomic.AtomicInteger +import scala.util.Random class PoisonedMessageHandlerTest extends TestBase { implicit val dctx: DeliveryContext = TestDeliveryContext.create() - private val ilogger = ImplicitContextLogger.createLogger[Task, PoisonedMessageHandlerTest] + private val pmhHelper = PoisonedMessageHandlerHelper[Task, PoisonedMessageHandlerTest](Monitor.noOp(), redactPayload = false) test("PoisonedMessageHandler.handleResult ignores non-poisoned") { - def readAction(d: Delivery[Bytes]): Task[DeliveryResult] = { - Task.now(Republish(countAsPoisoned = false)) - } - val movedCount = new AtomicInteger(0) + val action = new PoisonedMessageHandlerAction[Task, Bytes] { + override def handlePoisonedMessage(rawBody: Bytes)(delivery: Delivery[Bytes], + maxAttempts: Int)(implicit dctx: DeliveryContext): Task[Unit] = Task.delay { + movedCount.incrementAndGet() + } + } + PoisonedMessageHandler - .handleResult[Task, Bytes](Delivery.Ok(Bytes.empty(), MessageProperties(), ""), MessageId("msg-id"), 1, ilogger, None, (_, _) => { - Task.delay { movedCount.incrementAndGet() } - })(Republish(countAsPoisoned = false)) + .handleResult[Task, Bytes](Delivery.Ok(Bytes.empty(), MessageProperties(), ""), + Bytes.empty(), + MessageId("msg-id"), + 1, + pmhHelper, + None, + action)(Republish(countAsPoisoned = false)) .await assertResult(0)(movedCount.get()) @@ -35,9 +43,13 @@ class PoisonedMessageHandlerTest extends TestBase { movedCount.set(0) PoisonedMessageHandler - .handleResult[Task, Bytes](Delivery.Ok(Bytes.empty(), MessageProperties(), ""), MessageId("msg-id"), 1, ilogger, None, (_, _) => { - Task.delay { movedCount.incrementAndGet() } - })(Republish()) + .handleResult[Task, Bytes](Delivery.Ok(Bytes.empty(), MessageProperties(), ""), + Bytes.empty(), + MessageId("msg-id"), + 1, + pmhHelper, + None, + action)(Republish()) .await assertResult(1)(movedCount.get()) @@ -48,7 +60,7 @@ class PoisonedMessageHandlerTest extends TestBase { Task.now(Republish()) } - val handler = new LoggingPoisonedMessageHandler[Task, Bytes](5, false, None) + val handler = new LoggingPoisonedMessageHandler[Task, Bytes](5, None, pmhHelper) val properties = (1 to 4).foldLeft(MessageProperties.empty) { case (p, _) => @@ -72,7 +84,8 @@ class PoisonedMessageHandlerTest extends TestBase { Task.now(Republish()) } - val handler = new LoggingPoisonedMessageHandler[Task, Bytes](5, false, Some(new ExponentialDelay(1.seconds, 1.seconds, 2, 2.seconds))) + val delay = Some(new ExponentialDelay(1.seconds, 1.seconds, 2, 2.seconds)) + val handler = new LoggingPoisonedMessageHandler[Task, Bytes](5, delay, pmhHelper) val timeBeforeExecution = Instant.now() val properties = (1 to 4).foldLeft(MessageProperties.empty) { case (p, _) => @@ -91,12 +104,28 @@ class PoisonedMessageHandlerTest extends TestBase { assertResult(DeliveryResult.Reject)(run(handler, readAction, properties)) } + test("LoggingPoisonedMessageHandler direct poisoning") { + import scala.concurrent.duration._ + + def readAction(d: Delivery[Bytes]): Task[DeliveryResult] = { + Task.now(DirectlyPoison) + } + + val delay = Some(new ExponentialDelay(1.seconds, 1.seconds, 2, 2.seconds)) + val handler = new LoggingPoisonedMessageHandler[Task, Bytes](5, delay, pmhHelper) + + // check it will Reject the message on every attempt + for (_ <- 1 to Random.nextInt(10) + 10) { + assertResult(DeliveryResult.Reject)(run(handler, readAction, MessageProperties.empty)) + } + } + test("NoOpPoisonedMessageHandler basic") { def readAction(d: Delivery[Bytes]): Task[DeliveryResult] = { Task.now(Republish()) } - val handler = new NoOpPoisonedMessageHandler[Task, Bytes] + val handler = new NoOpPoisonedMessageHandler[Task, Bytes](pmhHelper) val properties = (1 to 4).foldLeft(MessageProperties.empty) { case (p, _) => @@ -110,6 +139,22 @@ class PoisonedMessageHandlerTest extends TestBase { assertResult(MessageProperties(headers = Map.empty))(properties) } + test("NoOpPoisonedMessageHandler does nothing") { + def readAction(d: Delivery[Bytes]): Task[DeliveryResult] = { + Task.now(Republish()) + } + + val handler = new NoOpPoisonedMessageHandler[Task, Bytes](pmhHelper) + + assertResult(DeliveryResult.Ack)(run(handler, _ => Task.now(DeliveryResult.Ack), MessageProperties.empty)) + assertResult(DeliveryResult.Retry)(run(handler, _ => Task.now(DeliveryResult.Retry), MessageProperties.empty)) + assertResult(DeliveryResult.Reject)(run(handler, _ => Task.now(DeliveryResult.Reject), MessageProperties.empty)) + assertResult(DeliveryResult.Republish())(run(handler, _ => Task.now(DeliveryResult.Republish()), MessageProperties.empty)) + + // this is the only exception... turn DirectlyPoison to Reject + assertResult(DeliveryResult.Reject)(run(handler, _ => Task.now(DeliveryResult.DirectlyPoison), MessageProperties.empty)) + } + test("DeadQueuePoisonedMessageHandler basic") { def readAction(d: Delivery[Bytes]): Task[DeliveryResult] = { Task.now(Republish()) @@ -117,7 +162,7 @@ class PoisonedMessageHandlerTest extends TestBase { val movedCount = new AtomicInteger(0) - val handler = new DeadQueuePoisonedMessageHandler[Task, Bytes](5, false, None)({ (_, _, _) => + val handler = new DeadQueuePoisonedMessageHandler[Task, Bytes](5, None, pmhHelper)({ (_, _, _) => Task.delay(movedCount.incrementAndGet()) }) @@ -138,6 +183,30 @@ class PoisonedMessageHandlerTest extends TestBase { assertResult(1)(movedCount.get()) } + test("DeadQueuePoisonedMessageHandler direct poisoning") { + import scala.concurrent.duration._ + + def readAction(d: Delivery[Bytes]): Task[DeliveryResult] = { + Task.now(DirectlyPoison) + } + + val movedCount = new AtomicInteger(0) + + val delay = Some(new ExponentialDelay(1.seconds, 1.seconds, 2, 2.seconds)) + val handler = new DeadQueuePoisonedMessageHandler[Task, Bytes](5, delay, pmhHelper)({ (_, _, _) => + Task.delay(movedCount.incrementAndGet()) + }) + + // check it will Reject the message on every attempt + val cnt = Random.nextInt(10) + 10 + + for (_ <- 1 to cnt) { + assertResult(DeliveryResult.Reject)(run(handler, readAction, MessageProperties.empty)) + } + + assertResult(cnt)(movedCount.get()) + } + test("DeadQueuePoisonedMessageHandler adds discarded time") { def readAction(d: Delivery[Bytes]): Task[DeliveryResult] = { Task.now(Republish()) @@ -145,7 +214,7 @@ class PoisonedMessageHandlerTest extends TestBase { val movedCount = new AtomicInteger(0) - val handler = new DeadQueuePoisonedMessageHandler[Task, Bytes](3, false, None)({ (d, _, _) => + val handler = new DeadQueuePoisonedMessageHandler[Task, Bytes](3, None, pmhHelper)({ (d, _, _) => // test it's there and it can be parsed assert(Instant.parse(d.properties.headers(DiscardedTimeHeaderName).asInstanceOf[String]).toEpochMilli > 0) @@ -173,7 +242,7 @@ class PoisonedMessageHandlerTest extends TestBase { val movedCount = new AtomicInteger(0) - val handler = new DeadQueuePoisonedMessageHandler[Task, Bytes](5, false, None)({ (_, _, _) => + val handler = new DeadQueuePoisonedMessageHandler[Task, Bytes](5, None, pmhHelper)({ (_, _, _) => Task.delay(movedCount.incrementAndGet()) }) diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/RepublishStrategyTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/RepublishStrategyTest.scala index 4a7b2fbd..1b53d18a 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/RepublishStrategyTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/RepublishStrategyTest.scala @@ -25,6 +25,7 @@ import scala.util.Random class RepublishStrategyTest extends TestBase { private val connectionInfo = RabbitMQConnectionInfo(immutable.Seq("localhost"), "/", None) + private val pmhHelper = PoisonedMessageHandlerHelper[Task, RepublishStrategyTest](Monitor.noOp(), redactPayload = false) test("default exchange") { val messageId = UUID.randomUUID().toString @@ -125,5 +126,5 @@ class RepublishStrategyTest extends TestBase { )(userAction) } - object PMH extends LoggingPoisonedMessageHandler[Task, Bytes](3, false, None) + object PMH extends LoggingPoisonedMessageHandler[Task, Bytes](3, None, pmhHelper) }