Skip to content

Commit

Permalink
Merge pull request #192 from avast/DirectPoisoning
Browse files Browse the repository at this point in the history
Direct poisoning
  • Loading branch information
jendakol authored Sep 26, 2022
2 parents 608e693 + 6b3c155 commit 2d94bfd
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,7 @@ object DeliveryResult {
* */
case class Republish(countAsPoisoned: Boolean = true, newHeaders: Map[String, AnyRef] = Map.empty) extends DeliveryResult

/** The message cannot be processed and should be considered as _poisoned_ even without further retrying. */
case object DirectlyPoison extends DeliveryResult

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ final private[rabbitmq] case class ConsumerChannelOps[F[_]: ConcurrentEffect: Ti
case Reject => reject()
case Retry => retry()
case Republish(_, newHeaders) => republish(createPropertiesForRepublish(newHeaders, fixedProperties, routingKey), rawBody)
case DirectlyPoison => throw new IllegalStateException("Poison state should be handled by PMH, this is most probably a BUG")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import cats.effect.{Resource, Sync, Timer}
import cats.implicits.{catsSyntaxApplicativeError, catsSyntaxFlatMapOps, toFunctorOps}
import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.PoisonedMessageHandler.DiscardedTimeHeaderName
import com.avast.clients.rabbitmq.api.DeliveryResult.{Reject, Republish}
import com.avast.clients.rabbitmq.api.DeliveryResult._
import com.avast.clients.rabbitmq.api._
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
import com.avast.metrics.scalaeffectapi.Monitor
import com.avast.metrics.scalaeffectapi.{Meter, Monitor}

import java.time.Instant
import scala.reflect.ClassTag
import scala.util.Try
import scala.util.control.NonFatal

Expand All @@ -19,69 +20,70 @@ sealed trait PoisonedMessageHandler[F[_], A] {
implicit dctx: DeliveryContext): F[DeliveryResult]
}

class LoggingPoisonedMessageHandler[F[_]: Sync: Timer, A](maxAttempts: Int,
redactPayload: Boolean,
republishDelay: Option[ExponentialDelay])
extends PoisonedMessageHandler[F, A] {
private val logger = ImplicitContextLogger.createLogger[F, LoggingPoisonedMessageHandler[F, A]]
private trait PoisonedMessageHandlerAction[F[_], A] {
def handlePoisonedMessage(rawBody: Bytes)(delivery: Delivery[A], maxAttempts: Int)(implicit dctx: DeliveryContext): F[Unit]
}

private def defaultHandlePoisonedMessage[F[_]: Sync](maxAttempts: Int, logger: ImplicitContextLogger[F])(delivery: Delivery[A])(
implicit dctx: DeliveryContext): F[Unit] = {
val deliveryStr = (if (!redactPayload) delivery else delivery.withRedactedBody).toString

logger.warn(s"Message failures reached the limit $maxAttempts attempts, throwing away: $deliveryStr")
}
private sealed abstract class PoisonedMessageHandlerBase[F[_]: Sync: Timer, A](maxAttempts: Int,
republishDelay: Option[ExponentialDelay],
helper: PoisonedMessageHandlerHelper[F])
extends PoisonedMessageHandler[F, A]
with PoisonedMessageHandlerAction[F, A] {

override def interceptResult(delivery: Delivery[A], messageId: MessageId, rawBody: Bytes)(result: DeliveryResult)(
implicit dctx: DeliveryContext): F[DeliveryResult] = {
PoisonedMessageHandler.handleResult(delivery,
messageId,
maxAttempts,
logger,
republishDelay,
(d: Delivery[A], _) => defaultHandlePoisonedMessage[F](maxAttempts, logger)(d))(result)
PoisonedMessageHandler.handleResult(delivery, rawBody, messageId, maxAttempts, helper, republishDelay, this)(result)
}
}

class NoOpPoisonedMessageHandler[F[_]: Sync, A] extends PoisonedMessageHandler[F, A] {
override def interceptResult(delivery: Delivery[A], messageId: MessageId, rawBody: Bytes)(result: DeliveryResult)(
implicit dctx: DeliveryContext): F[DeliveryResult] = Sync[F].pure(result)
}

class DeadQueuePoisonedMessageHandler[F[_]: Sync: Timer, A](
maxAttempts: Int,
redactPayload: Boolean,
republishDelay: Option[ExponentialDelay])(moveToDeadQueue: (Delivery[A], Bytes, DeliveryContext) => F[Unit])
extends PoisonedMessageHandler[F, A] {
private val logger = ImplicitContextLogger.createLogger[F, DeadQueuePoisonedMessageHandler[F, A]]
private[rabbitmq] class LoggingPoisonedMessageHandler[F[_]: Sync: Timer, A](maxAttempts: Int,
republishDelay: Option[ExponentialDelay],
helper: PoisonedMessageHandlerHelper[F])
extends PoisonedMessageHandlerBase[F, A](maxAttempts, republishDelay, helper) {
import helper._

override def interceptResult(delivery: Delivery[A], messageId: MessageId, rawBody: Bytes)(result: DeliveryResult)(
implicit dctx: DeliveryContext): F[DeliveryResult] = {
PoisonedMessageHandler.handleResult(delivery,
messageId,
maxAttempts,
logger,
republishDelay,
(d, _) => handlePoisonedMessage(d, messageId, rawBody))(result)
override def handlePoisonedMessage(rawBody: Bytes)(delivery: Delivery[A], ma: Int)(implicit dctx: DeliveryContext): F[Unit] = {
logger.warn(s"Message failures reached the limit $ma attempts, throwing away: ${redactIfConfigured(delivery)}")
}
}

private def handlePoisonedMessage(delivery: Delivery[A], messageId: MessageId, rawBody: Bytes)(
implicit dctx: DeliveryContext): F[Unit] = {
private[rabbitmq] class DeadQueuePoisonedMessageHandler[F[_]: Sync: Timer, A](
maxAttempts: Int,
republishDelay: Option[ExponentialDelay],
helper: PoisonedMessageHandlerHelper[F])(moveToDeadQueue: (Delivery[A], Bytes, DeliveryContext) => F[Unit])
extends PoisonedMessageHandlerBase[F, A](maxAttempts, republishDelay, helper) {
import helper._

val deliveryStr = (if (!redactPayload) delivery else delivery.withRedactedBody).toString
override def handlePoisonedMessage(rawBody: Bytes)(delivery: Delivery[A], ma: Int)(implicit dctx: DeliveryContext): F[Unit] = {
import dctx._

logger.warn { s"Message $messageId failures reached the limit $maxAttempts attempts, moving it to the dead queue: $deliveryStr" } >>
logger.warn {
s"Message $messageId failures reached the limit $ma attempts, moving it to the dead queue: ${redactIfConfigured(delivery)}"
} >>
moveToDeadQueue(delivery, rawBody, dctx) >>
logger.debug(s"Message $messageId moved to the dead queue")
}
}

object DeadQueuePoisonedMessageHandler {
private[rabbitmq] class NoOpPoisonedMessageHandler[F[_]: Sync, A](helper: PoisonedMessageHandlerHelper[F])
extends PoisonedMessageHandler[F, A] {
override def interceptResult(delivery: Delivery[A], messageId: MessageId, rawBody: Bytes)(result: DeliveryResult)(
implicit dctx: DeliveryContext): F[DeliveryResult] = {
result match {
case DeliveryResult.DirectlyPoison =>
helper.logger.warn("Delivery can't be poisoned, because NoOpPoisonedMessageHandler is installed! Rejecting instead...").as(Reject)

case _ => Sync[F].pure(result)
}
}
}

private[rabbitmq] object DeadQueuePoisonedMessageHandler {
def make[F[_]: Sync: Timer, A](conf: DeadQueuePoisonedMessageHandling,
connection: RabbitMQConnection[F],
redactPayload: Boolean,
monitor: Monitor[F]): Resource[F, DeadQueuePoisonedMessageHandler[F, A]] = {
helper: PoisonedMessageHandlerHelper[F]): Resource[F, DeadQueuePoisonedMessageHandler[F, A]] = {
import conf._
import helper._

val pc = ProducerConfig(
name = deadQueueProducer.name,
Expand All @@ -93,15 +95,14 @@ object DeadQueuePoisonedMessageHandler {
)

connection.newProducer[Bytes](pc, monitor.named("deadQueueProducer")).map { producer =>
new DeadQueuePoisonedMessageHandler[F, A](maxAttempts, redactPayload, republishDelay)(
new DeadQueuePoisonedMessageHandler[F, A](maxAttempts, republishDelay, helper)(
(d: Delivery[A], rawBody: Bytes, dctx: DeliveryContext) => {
val cidStrategy = dctx.correlationId match {
case Some(value) => CorrelationIdStrategy.Fixed(value.value)
case None => CorrelationIdStrategy.RandomNew
}

val now = Instant.now()

val finalProperties = d.properties.copy(headers = d.properties.headers.updated(DiscardedTimeHeaderName, now.toString))

producer.send(deadQueueProducer.routingKey, rawBody, Some(finalProperties))(cidStrategy)
Expand All @@ -110,7 +111,7 @@ object DeadQueuePoisonedMessageHandler {
}
}

object PoisonedMessageHandler {
private[rabbitmq] object PoisonedMessageHandler {
final val RepublishCountHeaderName: String = "X-Republish-Count"
final val DiscardedTimeHeaderName: String = "X-Discarded-Time"

Expand All @@ -120,28 +121,38 @@ object PoisonedMessageHandler {
monitor: Monitor[F]): Resource[F, PoisonedMessageHandler[F, A]] = {
config match {
case Some(LoggingPoisonedMessageHandling(maxAttempts, republishDelay)) =>
Resource.pure(new LoggingPoisonedMessageHandler[F, A](maxAttempts, redactPayload, republishDelay))
case Some(c: DeadQueuePoisonedMessageHandling) => DeadQueuePoisonedMessageHandler.make(c, connection, redactPayload, monitor)
val helper = PoisonedMessageHandlerHelper[F, LoggingPoisonedMessageHandler[F, A]](monitor, redactPayload)
Resource.pure(new LoggingPoisonedMessageHandler[F, A](maxAttempts, republishDelay, helper))

case Some(c: DeadQueuePoisonedMessageHandling) =>
val helper = PoisonedMessageHandlerHelper[F, DeadQueuePoisonedMessageHandler[F, A]](monitor, redactPayload)
DeadQueuePoisonedMessageHandler.make(c, connection, helper)

case Some(NoOpPoisonedMessageHandling) | None =>
Resource.eval {
val logger = ImplicitContextLogger.createLogger[F, NoOpPoisonedMessageHandler[F, A]]
logger.plainWarn("Using NO-OP poisoned message handler. Potential poisoned messages will cycle forever.").as {
new NoOpPoisonedMessageHandler[F, A]
val helper = PoisonedMessageHandlerHelper[F, NoOpPoisonedMessageHandler[F, A]](monitor, redactPayload)

helper.logger.plainWarn("Using NO-OP poisoned message handler. Potential poisoned messages will cycle forever.").as {
new NoOpPoisonedMessageHandler[F, A](helper)
}
}
}
}

private[rabbitmq] def handleResult[F[_]: Sync: Timer, A](
delivery: Delivery[A],
rawBody: Bytes,
messageId: MessageId,
maxAttempts: Int,
logger: ImplicitContextLogger[F],
helper: PoisonedMessageHandlerHelper[F],
republishDelay: Option[ExponentialDelay],
handlePoisonedMessage: (Delivery[A], Int) => F[Unit])(r: DeliveryResult)(implicit dctx: DeliveryContext): F[DeliveryResult] = {
handler: PoisonedMessageHandlerAction[F, A])(r: DeliveryResult)(implicit dctx: DeliveryContext): F[DeliveryResult] = {
r match {
case Republish(isPoisoned, newHeaders) if isPoisoned =>
adjustDeliveryResult(delivery, messageId, maxAttempts, newHeaders, logger, republishDelay, handlePoisonedMessage)
adjustDeliveryResult(delivery, messageId, maxAttempts, newHeaders, helper, republishDelay, handler.handlePoisonedMessage(rawBody))

case DirectlyPoison => poisonRightAway(delivery, messageId, helper, handler.handlePoisonedMessage(rawBody))

case r => Applicative[F].pure(r) // keep other results as they are
}
}
Expand All @@ -151,10 +162,11 @@ object PoisonedMessageHandler {
messageId: MessageId,
maxAttempts: Int,
newHeaders: Map[String, AnyRef],
logger: ImplicitContextLogger[F],
helper: PoisonedMessageHandlerHelper[F],
republishDelay: Option[ExponentialDelay],
handlePoisonedMessage: (Delivery[A], Int) => F[Unit])(implicit dctx: DeliveryContext): F[DeliveryResult] = {
import cats.syntax.traverse._
import helper._

// get current attempt no. from passed headers with fallback to original (incoming) headers - the fallback will most likely happen
// but we're giving the programmer chance to programmatically _pretend_ lower attempt number
Expand Down Expand Up @@ -197,4 +209,33 @@ object PoisonedMessageHandler {
}
}

private def poisonRightAway[F[_]: Sync, A](
delivery: Delivery[A],
messageId: MessageId,
helper: PoisonedMessageHandlerHelper[F],
handlePoisonedMessage: (Delivery[A], Int) => F[Unit])(implicit dctx: DeliveryContext): F[DeliveryResult] = {
helper.logger.info(s"Directly poisoning delivery $messageId") >>
handlePoisonedMessage(delivery, 0) >>
helper.directlyPoisonedMeter.mark >>
Sync[F].pure(Reject: DeliveryResult)
}

}

private[rabbitmq] class PoisonedMessageHandlerHelper[F[_]: Sync](val logger: ImplicitContextLogger[F],
val monitor: Monitor[F],
redactPayload: Boolean) {

val directlyPoisonedMeter: Meter[F] = monitor.meter("directlyPoisoned")

def redactIfConfigured(delivery: Delivery[_]): Delivery[Any] = {
if (!redactPayload) delivery else delivery.withRedactedBody
}
}

private[rabbitmq] object PoisonedMessageHandlerHelper {
def apply[F[_]: Sync, PMH: ClassTag](monitor: Monitor[F], redactPayload: Boolean): PoisonedMessageHandlerHelper[F] = {
val logger: ImplicitContextLogger[F] = ImplicitContextLogger.createLogger[F, PMH]
new PoisonedMessageHandlerHelper[F](logger, monitor, redactPayload)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import scala.util._
class DefaultRabbitMQConsumerTest extends TestBase {

private val connectionInfo = RabbitMQConnectionInfo(immutable.Seq("localhost"), "/", None)
private val pmhHelper = PoisonedMessageHandlerHelper[Task, DefaultRabbitMQConsumerTest](Monitor.noOp(), redactPayload = false)

test("should ACK") {
val messageId = UUID.randomUUID().toString
Expand Down Expand Up @@ -466,5 +467,5 @@ class DefaultRabbitMQConsumerTest extends TestBase {
)(userAction)
}

object PMH extends LoggingPoisonedMessageHandler[Task, Bytes](3, false, None)
object PMH extends LoggingPoisonedMessageHandler[Task, Bytes](3, None, pmhHelper)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.util.Random
class DefaultRabbitMQPullConsumerTest extends TestBase {

private val connectionInfo = RabbitMQConnectionInfo(immutable.Seq("localhost"), "/", None)
private val pmhHelper = PoisonedMessageHandlerHelper[Task, DefaultRabbitMQPullConsumerTest](Monitor.noOp(), redactPayload = false)

test("should ACK") {
val messageId = UUID.randomUUID().toString
Expand Down Expand Up @@ -287,5 +288,5 @@ class DefaultRabbitMQPullConsumerTest extends TestBase {
new DefaultRabbitMQPullConsumer[Task, A](base, channelOps)
}

class PMH[A] extends LoggingPoisonedMessageHandler[Task, A](3, false, None)
class PMH[A] extends LoggingPoisonedMessageHandler[Task, A](3, None, pmhHelper)
}
Loading

0 comments on commit 2d94bfd

Please sign in to comment.