Skip to content

Commit

Permalink
Merge pull request #43 from avast/RepublishReworked
Browse files Browse the repository at this point in the history
Republish reworked
  • Loading branch information
jendakol authored Feb 12, 2020
2 parents 54efe29 + d8446b0 commit 01d407e
Show file tree
Hide file tree
Showing 18 changed files with 721 additions and 272 deletions.
1 change: 1 addition & 0 deletions Migration-6_1-8.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ SSL without custom (key|trust)store; it's now disabled by default (`None`)!
1. You are now able to specify network recovery strategy.
1. You are now able to specify timeout log level.
1. Additional declarations: `bindQueue` now has more consistent API. The only change is `bindArguments` were renamed to `arguments`.
1. You are now able to configure custom exchange for republishing.

---

Expand Down
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,38 @@ When using _Retry_ the message can effectively cause starvation of other message
until the message itself can be processed; on the other hand _Republish_ inserts the message to the original queue as a new message and it
lets the consumer handle other messages (if they can be processed).

#### Republishing
Republishing is solved at application level with publishing a new message (with original content, headers, messageId, etc.) to the original queue and
acknowledging the old one. This can be done via:
1. Default exchange
Every virtual host in RabbitMQ has default exchange which has implicit bindings to all queues and can be easily used for publishing to
basically any queue. This is very handy for functionality such as the republishing however it's also very dangerous and you don't have
permissions to use it. In case you do have them, use this option instead of the custom exchange.
This the default option (in other words, the client will use the default exchange in case you don't tell it not to do so).
1. Custom exchange
In case you're unable to use the default exchange, you have to create your own exchange to replace the functionality. The RabbitMQ client
will create it for you together with all necessary bindings and all you have to do is to just configure a name of the exchange, e.g.
```hocon
rabbitConnection {
hosts = ["localhost:5672"]
virtualHost = "/"

...

republishStrategy {
type = CustomExchange

exchangeName = "ExchangeForRepublishing"

exchangeDeclare = true // default
exchangeAutoBind = true // default
}

...
}
```
The exchange is created as _direct_, _durable_ and without _auto-delete_ flag.

### Bind/declare arguments
There is an option to specify bind/declare arguments for queues/exchanges as you may read about at [RabbitMQ docs](https://www.rabbitmq.com/queues.html).
Example of configuration with HOCON:
Expand Down
20 changes: 7 additions & 13 deletions core/src/main/scala/com/avast/clients/rabbitmq/ConsumerBase.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.avast.clients.rabbitmq

import cats.effect.{Blocker, ContextShift, Sync}
import cats.syntax.flatMap._
import com.avast.clients.rabbitmq.DefaultRabbitMQConsumer._
import com.avast.clients.rabbitmq.api.DeliveryResult
import com.avast.metrics.scalaapi.Monitor
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client.ShutdownSignalException
import com.typesafe.scalalogging.StrictLogging

import scala.collection.JavaConverters._
Expand All @@ -17,6 +17,7 @@ private[rabbitmq] trait ConsumerBase[F[_]] extends StrictLogging {
protected def queueName: String
protected def channel: ServerChannel
protected def blocker: Blocker
protected def republishStrategy: RepublishStrategy
protected implicit def F: Sync[F] // scalastyle:ignore
protected implicit def cs: ContextShift[F]
protected def connectionInfo: RabbitMQConnectionInfo
Expand Down Expand Up @@ -78,18 +79,11 @@ private[rabbitmq] trait ConsumerBase[F[_]] extends StrictLogging {
}
}

protected def republish(messageId: String, deliveryTag: Long, properties: BasicProperties, body: Array[Byte]): F[Unit] =
blocker.delay {
try {
logger.debug(s"[$name] Republishing delivery (ID $messageId, deliveryTag $deliveryTag) to end of queue '$queueName'")
if (!channel.isOpen) throw new IllegalStateException("Cannot republish delivery on closed channel")
channel.basicPublish("", queueName, properties, body)
channel.basicAck(deliveryTag, false)
resultRepublishMeter.mark()
} catch {
case NonFatal(e) => logger.warn(s"[$name] Error while republishing the delivery", e)
}
}
protected def republish(messageId: String, deliveryTag: Long, properties: BasicProperties, body: Array[Byte]): F[Unit] = {
republishStrategy
.republish(blocker, channel, name)(queueName, messageId, deliveryTag, properties, body)
.flatTap(_ => F.delay(resultRepublishMeter.mark()))
}

protected def createPropertiesForRepublish(newHeaders: Map[String, AnyRef],
properties: BasicProperties,
Expand Down
Loading

0 comments on commit 01d407e

Please sign in to comment.