Skip to content

Commit

Permalink
Configurable message properties (#10)
Browse files Browse the repository at this point in the history
* configurable default MessageProperties for publisher Fixes #9
* sealed trait for DeliveryMode
  • Loading branch information
augi authored and jendakol committed Jul 17, 2018
1 parent 35a9fab commit d7442b1
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 36 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ myConfig {
durable = true // default value
autoDelete = false // default value
}
// These properties are used when none properties are passed to the send method.
properties {
deliveryMode = 2 // this is default value
contentType = "text" // default is not set
contentEncoding = "UTF8" // default is not set
priority = 1 // default is not set
}
}
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.time.Instant
case class MessageProperties(contentType: Option[String] = None,
contentEncoding: Option[String] = None,
headers: Map[String, AnyRef] = Map.empty,
deliveryMode: Option[Integer] = None,
deliveryMode: Option[DeliveryMode] = None,
priority: Option[Integer] = None,
correlationId: Option[String] = None,
replyTo: Option[String] = None,
Expand All @@ -20,3 +20,21 @@ case class MessageProperties(contentType: Option[String] = None,
object MessageProperties {
val empty: MessageProperties = MessageProperties()
}

sealed trait DeliveryMode {
def code: Integer
}
object DeliveryMode {
case object NonPersistent extends DeliveryMode {
override def code: Integer = 1
}
case object Persistent extends DeliveryMode {
override def code: Integer = 2
}
def fromCode(code: Integer): Option[DeliveryMode] = code.toInt match {
case 0 => None
case 1 => Some(NonPersistent)
case 2 => Some(Persistent)
case _ => throw new IllegalArgumentException(s"Unknown delivery mode: $code")
}
}
8 changes: 8 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ avastRabbitMQProducerDefaults {

reportUnroutable = true

// These properties are used when none properties are passed to the send method.
properties {
deliveryMode = 2 // 2 is persistent, 1 non-persistent
// contentType = ""
// contentEncoding = ""
// priority = 1
}

declare {
enabled = false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,9 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
blockingScheduler: Scheduler,
monitor: Monitor): DefaultRabbitMQProducer[F, A] = {
val producerConfig = providedConfig.wrapped.as[ProducerConfig]("root")

create[F, A](producerConfig, channel, factoryInfo, blockingScheduler, monitor)
}

def create[F[_]: FromTask, A: ProductConverter](producerConfig: ProducerConfig,
channel: ServerChannel,
factoryInfo: RabbitMQConnectionInfo,
blockingScheduler: Scheduler,
monitor: Monitor): DefaultRabbitMQProducer[F, A] = {

prepareProducer[F, A](producerConfig, channel, factoryInfo, blockingScheduler, monitor)
}

}

object Consumer {
Expand Down Expand Up @@ -252,6 +243,11 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
import producerConfig._

val finalBlockingScheduler = if (useKluzo) Monix.wrapScheduler(blockingScheduler) else blockingScheduler
val defaultProperties = MessageProperties(
deliveryMode = DeliveryMode.fromCode(producerConfig.properties.deliveryMode),
contentType = producerConfig.properties.contentType,
contentEncoding = producerConfig.properties.contentEncoding,
priority = producerConfig.properties.priority.map(Integer.valueOf))

// auto declare of exchange
// parse it only if it's needed
Expand All @@ -260,7 +256,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
val d = declare.wrapped.as[AutoDeclareExchange]("root")
declareExchange(exchange, channelFactoryInfo, channel, d)
}
new DefaultRabbitMQProducer[F, A](producerConfig.name, exchange, channel, useKluzo, reportUnroutable, finalBlockingScheduler, monitor)
new DefaultRabbitMQProducer[F, A](producerConfig.name, exchange, channel, defaultProperties, useKluzo, reportUnroutable, finalBlockingScheduler, monitor)
}

private[rabbitmq] def declareExchange(name: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import scala.util.control.NonFatal
class DefaultRabbitMQProducer[F[_]: FromTask, A: ProductConverter](name: String,
exchangeName: String,
channel: ServerChannel,
defaultProperties: MessageProperties,
useKluzo: Boolean,
reportUnroutable: Boolean,
scheduler: Scheduler,
Expand All @@ -39,9 +40,8 @@ class DefaultRabbitMQProducer[F[_]: FromTask, A: ProductConverter](name: String,

override def send(routingKey: String, body: A, properties: Option[MessageProperties] = None): F[Unit] = {
val finalProperties = {
val messageProperties = converter.fillProperties {
properties.getOrElse(MessageProperties(messageId = Some(UUID.randomUUID().toString)))
}
val initialProperties = properties.getOrElse(defaultProperties.copy(messageId = Some(UUID.randomUUID().toString)))
val messageProperties = converter.fillProperties(initialProperties)

if (useKluzo && Kluzo.getTraceId.nonEmpty) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.avast.clients.rabbitmq
import java.nio.file.Path
import java.time.Duration

import com.avast.clients.rabbitmq.api.DeliveryResult
import com.avast.clients.rabbitmq.api.{DeliveryMode, DeliveryResult}
import com.typesafe.config.Config

import scala.collection.immutable
Expand Down Expand Up @@ -56,7 +56,8 @@ case class AutoBindQueue(exchange: AutoBindExchange, routingKeys: immutable.Seq[

case class AutoBindExchange(name: String, declare: Config)

case class ProducerConfig(exchange: String, declare: Config, useKluzo: Boolean, reportUnroutable: Boolean, name: String)
case class ProducerConfig(exchange: String, declare: Config, useKluzo: Boolean, reportUnroutable: Boolean, name: String, properties: ProducerProperties)
case class ProducerProperties(deliveryMode: Int, contentType: Option[String], contentEncoding: Option[String], priority: Option[Int])

case class AutoDeclareExchange(enabled: Boolean, `type`: String, durable: Boolean, autoDelete: Boolean, arguments: DeclareArguments)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.avast.clients.rabbitmq.javaapi
import java.util.concurrent.CompletableFuture

import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.api.{MessageProperties => ScalaProperties, RabbitMQProducer => ScalaProducer}
import com.avast.clients.rabbitmq.api.{DeliveryMode, MessageProperties => ScalaProperties, RabbitMQProducer => ScalaProducer}
import com.avast.clients.rabbitmq.javaapi.JavaConverters._

import scala.collection.JavaConverters._
Expand All @@ -27,7 +27,7 @@ class DefaultRabbitMQProducer(scalaProducer: ScalaProducer[Future, Bytes] with A
Option(properties.getContentType),
Option(properties.getContentEncoding),
Option(properties.getHeaders).map(_.asScala.toMap).getOrElse(Map.empty),
Option(properties.getDeliveryMode),
DeliveryMode.fromCode(properties.getDeliveryMode),
Option(properties.getPriority),
Option(properties.getCorrelationId),
Option(properties.getReplyTo),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,12 @@
package com.avast.clients.rabbitmq.javaapi

import java.util.concurrent.{CompletableFuture, Executor}
import java.util.{function, Date, Optional}
import java.util.{Date, Optional, function}

import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.DeliveryReadAction
import com.avast.clients.rabbitmq.api.{
Delivery => ScalaDelivery,
DeliveryResult => ScalaResult,
DeliveryWithHandle => ScalaDeliveryWithHandle,
MessageProperties => ScalaProperties
}
import com.avast.clients.rabbitmq.javaapi.{
Delivery => JavaDelivery,
DeliveryResult => JavaResult,
DeliveryWithHandle => JavaDeliveryWithHandle,
MessageProperties => JavaProperties
}
import com.avast.clients.rabbitmq.api.{DeliveryMode, Delivery => ScalaDelivery, DeliveryResult => ScalaResult, DeliveryWithHandle => ScalaDeliveryWithHandle, MessageProperties => ScalaProperties}
import com.avast.clients.rabbitmq.javaapi.{Delivery => JavaDelivery, DeliveryResult => JavaResult, DeliveryWithHandle => JavaDeliveryWithHandle, MessageProperties => JavaProperties}
import com.rabbitmq.client.AMQP
import com.rabbitmq.client.AMQP.BasicProperties

Expand Down Expand Up @@ -70,7 +60,7 @@ private[rabbitmq] object JavaConverters {
builder.headers(messageProperties.headers.asJava)
messageProperties.contentType.foreach(builder.contentType)
messageProperties.contentEncoding.foreach(builder.contentEncoding)
messageProperties.deliveryMode.foreach(builder.deliveryMode)
messageProperties.deliveryMode.foreach(dm => builder.deliveryMode(dm.code))
messageProperties.priority.foreach(builder.priority)
messageProperties.correlationId.foreach(builder.correlationId)
messageProperties.replyTo.foreach(builder.replyTo)
Expand All @@ -93,7 +83,7 @@ private[rabbitmq] object JavaConverters {
}
messageProperties.contentType.foreach(builder.contentType)
messageProperties.contentEncoding.foreach(builder.contentEncoding)
messageProperties.deliveryMode.foreach(builder.deliveryMode)
messageProperties.deliveryMode.foreach(dm => builder.deliveryMode(dm.code))
messageProperties.priority.foreach(builder.priority)
messageProperties.correlationId.foreach(builder.correlationId)
messageProperties.replyTo.foreach(builder.replyTo)
Expand All @@ -115,7 +105,7 @@ private[rabbitmq] object JavaConverters {
Option(properties.getContentType),
Option(properties.getContentEncoding),
Option(properties.getHeaders).map(_.asScala.toMap).getOrElse(Map.empty),
Option(properties.getDeliveryMode),
DeliveryMode.fromCode(properties.getDeliveryMode),
Option(properties.getPriority),
Option(properties.getCorrelationId),
Option(properties.getReplyTo),
Expand All @@ -136,7 +126,7 @@ private[rabbitmq] object JavaConverters {
Option(properties.getContentType),
Option(properties.getContentEncoding),
Option(properties.getHeaders).map(_.asScala.toMap).getOrElse(Map.empty),
Option(properties.getDeliveryMode),
DeliveryMode.fromCode(properties.getDeliveryMode),
Option(properties.getPriority),
Option(properties.getCorrelationId),
Option(properties.getReplyTo),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class DefaultRabbitMQProducerTest extends FunSuite with MockitoSugar with Eventu
exchangeName = exchangeName,
channel = channel,
monitor = Monitor.noOp,
defaultProperties = MessageProperties.empty,
useKluzo = true,
reportUnroutable = false,
scheduler = Scheduler.Implicits.global
Expand Down

0 comments on commit d7442b1

Please sign in to comment.