Skip to content

Commit

Permalink
Merge pull request #198 from avast/publisher-confirms
Browse files Browse the repository at this point in the history
introduced publisher confirms
  • Loading branch information
jendakol authored Dec 16, 2022
2 parents 5a739b9 + bd1a667 commit 0ef2a8a
Show file tree
Hide file tree
Showing 12 changed files with 400 additions and 51 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,25 @@ If you don't specify the strategy by yourself, `CorrelationIdStrategy.FromProper
properties (or headers) and generate a new one if it doesn't succeed. In any way, the CID will be part of both logs and resulting (outgoing)
RabbitMQ message.

#### Publisher confirms

By using following configuration
```hocon
producer {
properties {
confirms {
enabled = true
sendAttempts = 2
}
}
}
```
clients can enable [publisher confirms](https://www.rabbitmq.com/confirms.html#publisher-confirms). Each `send` call will wait for ack/nack from broker.
This wait is of course non-blocking. `sendAttempts` is number of all attempts including initial one. If number of `sendAttempts` is greater than 1 it will try to resend messages again
right after it obtains nack from broker.

From implementation point of view, it uses asynchronous acks/nacks combined with [Deferred](https://typelevel.org/cats-effect/docs/std/deferred) from cats library.

#### Consumers

You can also get the CorrelationId from the message properties on the consumer side. The CID is taken from both AMQP properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@ case class ConversionException(desc: String, cause: Throwable = null) extends Ru
case class ChannelNotRecoveredException(desc: String, cause: Throwable = null) extends IOException(desc, cause)

case class TooBigMessage(desc: String, cause: Throwable = null) extends IllegalArgumentException(desc, cause)

case class MaxAttemptsReached(desc: String, cause: Throwable = null) extends RuntimeException(desc, cause)

case class NotAcknowledgedPublish(desc: String, cause: Throwable = null, messageId: Long) extends RuntimeException(desc, cause)
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package com.avast.clients.rabbitmq

import cats.effect._
import cats.effect.concurrent.{Deferred, Ref}
import cats.implicits.{catsSyntaxFlatMapOps, toFunctorOps, toTraverseOps}
import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.DefaultRabbitMQClientFactory.startConsumingQueue
import com.avast.clients.rabbitmq.api._
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
import com.avast.clients.rabbitmq.publisher.{BaseRabbitMQProducer, DefaultRabbitMQProducer, PublishConfirmsRabbitMQProducer}
import com.avast.metrics.scalaeffectapi.Monitor
import com.rabbitmq.client.Consumer

import scala.collection.compat._
import scala.collection.immutable
import scala.jdk.CollectionConverters._
import scala.language.implicitConversions
import scala.reflect.ClassTag

private[rabbitmq] class DefaultRabbitMQClientFactory[F[_]: ConcurrentEffect: Timer: ContextShift](
connection: RabbitMQConnection[F],
Expand All @@ -26,7 +29,7 @@ private[rabbitmq] class DefaultRabbitMQClientFactory[F[_]: ConcurrentEffect: Tim

object Producer {

def create[A: ProductConverter](producerConfig: ProducerConfig, monitor: Monitor[F]): Resource[F, DefaultRabbitMQProducer[F, A]] = {
def create[A: ProductConverter](producerConfig: ProducerConfig, monitor: Monitor[F]): Resource[F, BaseRabbitMQProducer[F, A]] = {
prepareProducer[A](producerConfig, connection, monitor)
}
}
Expand Down Expand Up @@ -263,34 +266,61 @@ private[rabbitmq] class DefaultRabbitMQClientFactory[F[_]: ConcurrentEffect: Tim

private def prepareProducer[A: ProductConverter](producerConfig: ProducerConfig,
connection: RabbitMQConnection[F],
monitor: Monitor[F]): Resource[F, DefaultRabbitMQProducer[F, A]] = {
val logger = ImplicitContextLogger.createLogger[F, DefaultRabbitMQProducer[F, A]]
monitor: Monitor[F]): Resource[F, BaseRabbitMQProducer[F, A]] = {
producerConfig.properties.confirms match {
case Some(PublisherConfirmsConfig(true, sendAttempts)) =>
prepareProducer(producerConfig, connection) { (defaultProperties, channel, logger) =>
Ref.of(Map.empty[Long, Deferred[F, Either[NotAcknowledgedPublish, Unit]]])
.map {
new PublishConfirmsRabbitMQProducer[F, A](
producerConfig.name,
producerConfig.exchange,
channel,
defaultProperties,
_,
sendAttempts,
producerConfig.reportUnroutable,
producerConfig.sizeLimitBytes,
blocker,
logger,
monitor)
}
}
case _ =>
prepareProducer(producerConfig, connection) { (defaultProperties, channel, logger) =>
F.pure {
new DefaultRabbitMQProducer[F, A](producerConfig.name,
producerConfig.exchange,
channel,
defaultProperties,
producerConfig.reportUnroutable,
producerConfig.sizeLimitBytes,
blocker,
logger,
monitor)
}
}
}
}

private def prepareProducer[T: ClassTag, A: ProductConverter](producerConfig: ProducerConfig, connection: RabbitMQConnection[F])(
createProducer: (MessageProperties, ServerChannel, ImplicitContextLogger[F]) => F[T]) = {
val logger: ImplicitContextLogger[F] = ImplicitContextLogger.createLogger[F, T]

connection
.newChannel()
.evalTap { channel =>
// auto declare exchange; if configured
producerConfig.declare.map { declareExchange(producerConfig.exchange, channel, _)(logger) }.getOrElse(F.unit)
}
.map { channel =>
.evalMap[F, T] { channel =>
val defaultProperties = MessageProperties(
deliveryMode = DeliveryMode.fromCode(producerConfig.properties.deliveryMode),
contentType = producerConfig.properties.contentType,
contentEncoding = producerConfig.properties.contentEncoding,
priority = producerConfig.properties.priority.map(Integer.valueOf)
)

new DefaultRabbitMQProducer[F, A](
producerConfig.name,
producerConfig.exchange,
channel,
defaultProperties,
producerConfig.reportUnroutable,
producerConfig.sizeLimitBytes,
blocker,
logger,
monitor
)
)
createProducer(defaultProperties, channel, logger)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ final case class ProducerConfig(name: String,
final case class ProducerPropertiesConfig(deliveryMode: Int = 2,
contentType: Option[String] = None,
contentEncoding: Option[String] = None,
priority: Option[Int] = None)
priority: Option[Int] = None,
confirms: Option[PublisherConfirmsConfig] = None)

final case class PublisherConfirmsConfig(enabled: Boolean = false, sendAttempts: Int = 1)

final case class AutoDeclareExchangeConfig(enabled: Boolean,
`type`: ExchangeType,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,31 @@
package com.avast.clients.rabbitmq
package com.avast.clients.rabbitmq.publisher

import cats.effect.{Blocker, ContextShift, Effect, Sync}
import cats.implicits.{catsSyntaxApplicativeError, catsSyntaxFlatMapOps, toFlatMapOps}
import cats.syntax.applicativeError._
import cats.syntax.flatMap._
import cats.syntax.functor._
import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.JavaConverters._
import com.avast.clients.rabbitmq.api.CorrelationIdStrategy.FromPropertiesOrRandomNew
import com.avast.clients.rabbitmq.api._
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
import com.avast.clients.rabbitmq.{CorrelationId, ProductConverter, ServerChannel, startAndForget}
import com.avast.metrics.scalaeffectapi.Monitor
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client.{AlreadyClosedException, ReturnListener}

import java.util.UUID
import scala.util.control.NonFatal

class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,
exchangeName: String,
channel: ServerChannel,
defaultProperties: MessageProperties,
reportUnroutable: Boolean,
sizeLimitBytes: Option[Int],
blocker: Blocker,
logger: ImplicitContextLogger[F],
monitor: Monitor[F])(implicit F: Effect[F], cs: ContextShift[F])
abstract class BaseRabbitMQProducer[F[_], A: ProductConverter](name: String,
exchangeName: String,
channel: ServerChannel,
defaultProperties: MessageProperties,
reportUnroutable: Boolean,
sizeLimitBytes: Option[Int],
blocker: Blocker,
logger: ImplicitContextLogger[F],
monitor: Monitor[F])(implicit F: Effect[F], cs: ContextShift[F])
extends RabbitMQProducer[F, A] {

private val sentMeter = monitor.meter("sent")
Expand All @@ -35,6 +38,8 @@ class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,

channel.addReturnListener(if (reportUnroutable) LoggingReturnListener else NoOpReturnListener)

def sendMessage(routingKey: String, body: Bytes, properties: MessageProperties)(implicit correlationId: CorrelationId): F[Unit]

override def send(routingKey: String, body: A, properties: Option[MessageProperties] = None)(
implicit cidStrategy: CorrelationIdStrategy = FromPropertiesOrRandomNew(properties)): F[Unit] = {
implicit val correlationId: CorrelationId = CorrelationId(cidStrategy.toCIDValue)
Expand All @@ -49,33 +54,40 @@ class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,
}

converter.convert(body) match {
case Right(convertedBody) => send(routingKey, convertedBody, finalProperties)
case Right(convertedBody) =>
for {
_ <- checkSize(convertedBody, routingKey)
_ <- processErrors(sendMessage(routingKey, convertedBody, finalProperties), routingKey)
} yield ()
case Left(ce) => Sync[F].raiseError(ce)
}
}

private def send(routingKey: String, body: Bytes, properties: MessageProperties)(implicit correlationId: CorrelationId): F[Unit] = {
checkSize(body, routingKey) >>
logger.debug(s"Sending message with ${body.size()} B to exchange $exchangeName with routing key '$routingKey' and $properties") >>
blocker
.delay {
sendLock.synchronized {
// see https://www.rabbitmq.com/api-guide.html#channel-threads
channel.basicPublish(exchangeName, routingKey, properties.asAMQP, body.toByteArray)
}
}
.flatTap(_ => sentMeter.mark)
.recoverWith {
case ce: AlreadyClosedException =>
logger.debug(ce)(s"[$name] Failed to send message with routing key '$routingKey' to exchange '$exchangeName'") >>
sentFailedMeter.mark >>
F.raiseError[Unit](ChannelNotRecoveredException("Channel closed, wait for recovery", ce))

case NonFatal(e) =>
logger.debug(e)(s"[$name] Failed to send message with routing key '$routingKey' to exchange '$exchangeName'") >>
sentFailedMeter.mark >>
F.raiseError[Unit](e)
protected def basicSend(routingKey: String, body: Bytes, properties: MessageProperties)(implicit correlationId: CorrelationId): F[Unit] = {
for {
_ <- logger.debug(s"Sending message with ${body.size()} B to exchange $exchangeName with routing key '$routingKey' and $properties")
_ <- blocker.delay {
sendLock.synchronized {
// see https://www.rabbitmq.com/api-guide.html#channel-threads
channel.basicPublish(exchangeName, routingKey, properties.asAMQP, body.toByteArray)
}
}
_ <- sentMeter.mark
} yield ()
}

private def processErrors(from: F[Unit], routingKey: String)(implicit correlationId: CorrelationId): F[Unit] = {
from.recoverWith {
case ce: AlreadyClosedException =>
logger.debug(ce)(s"[$name] Failed to send message with routing key '$routingKey' to exchange '$exchangeName'") >>
sentFailedMeter.mark >>
F.raiseError[Unit](ChannelNotRecoveredException("Channel closed, wait for recovery", ce))

case NonFatal(e) =>
logger.debug(e)(s"[$name] Failed to send message with routing key '$routingKey' to exchange '$exchangeName'") >>
sentFailedMeter.mark >>
F.raiseError[Unit](e)
}
}

private def checkSize(bytes: Bytes, routingKey: String)(implicit correlationId: CorrelationId): F[Unit] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.avast.clients.rabbitmq.publisher

import cats.effect.{Blocker, ConcurrentEffect, ContextShift}
import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.api.MessageProperties
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
import com.avast.clients.rabbitmq.{CorrelationId, ProductConverter, ServerChannel}
import com.avast.metrics.scalaeffectapi.Monitor

class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,
exchangeName: String,
channel: ServerChannel,
defaultProperties: MessageProperties,
reportUnroutable: Boolean,
sizeLimitBytes: Option[Int],
blocker: Blocker,
logger: ImplicitContextLogger[F],
monitor: Monitor[F])(implicit F: ConcurrentEffect[F], cs: ContextShift[F])
extends BaseRabbitMQProducer[F, A](name,
exchangeName,
channel,
defaultProperties,
reportUnroutable,
sizeLimitBytes,
blocker,
logger,
monitor) {
override def sendMessage(routingKey: String, body: Bytes, properties: MessageProperties)(implicit correlationId: CorrelationId): F[Unit] =
basicSend(routingKey, body, properties)
}
Loading

0 comments on commit 0ef2a8a

Please sign in to comment.