From d7442b14ceae7c294ce59eff36e98775f316816f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20August=C3=BDn?= Date: Tue, 17 Jul 2018 13:23:35 +0200 Subject: [PATCH] Configurable message properties (#10) * configurable default MessageProperties for publisher Fixes #9 * sealed trait for DeliveryMode --- README.md | 8 +++++++ .../rabbitmq/api/MessageProperties.scala | 20 +++++++++++++++- core/src/main/resources/reference.conf | 8 +++++++ .../DefaultRabbitMQClientFactory.scala | 18 ++++++-------- .../rabbitmq/DefaultRabbitMQProducer.scala | 6 ++--- .../clients/rabbitmq/configuration.scala | 5 ++-- .../javaapi/DefaultRabbitMQProducer.scala | 4 ++-- .../rabbitmq/javaapi/JavaConverters.scala | 24 ++++++------------- .../DefaultRabbitMQProducerTest.scala | 1 + 9 files changed, 58 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index e1c34888..ba77578b 100644 --- a/README.md +++ b/README.md @@ -143,6 +143,14 @@ myConfig { durable = true // default value autoDelete = false // default value } + + // These properties are used when none properties are passed to the send method. + properties { + deliveryMode = 2 // this is default value + contentType = "text" // default is not set + contentEncoding = "UTF8" // default is not set + priority = 1 // default is not set + } } } ``` diff --git a/api/src/main/scala/com/avast/clients/rabbitmq/api/MessageProperties.scala b/api/src/main/scala/com/avast/clients/rabbitmq/api/MessageProperties.scala index d4054ded..8f499c67 100644 --- a/api/src/main/scala/com/avast/clients/rabbitmq/api/MessageProperties.scala +++ b/api/src/main/scala/com/avast/clients/rabbitmq/api/MessageProperties.scala @@ -5,7 +5,7 @@ import java.time.Instant case class MessageProperties(contentType: Option[String] = None, contentEncoding: Option[String] = None, headers: Map[String, AnyRef] = Map.empty, - deliveryMode: Option[Integer] = None, + deliveryMode: Option[DeliveryMode] = None, priority: Option[Integer] = None, correlationId: Option[String] = None, replyTo: Option[String] = None, @@ -20,3 +20,21 @@ case class MessageProperties(contentType: Option[String] = None, object MessageProperties { val empty: MessageProperties = MessageProperties() } + +sealed trait DeliveryMode { + def code: Integer +} +object DeliveryMode { + case object NonPersistent extends DeliveryMode { + override def code: Integer = 1 + } + case object Persistent extends DeliveryMode { + override def code: Integer = 2 + } + def fromCode(code: Integer): Option[DeliveryMode] = code.toInt match { + case 0 => None + case 1 => Some(NonPersistent) + case 2 => Some(Persistent) + case _ => throw new IllegalArgumentException(s"Unknown delivery mode: $code") + } +} diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 1106c1e7..4a39ba75 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -109,6 +109,14 @@ avastRabbitMQProducerDefaults { reportUnroutable = true + // These properties are used when none properties are passed to the send method. + properties { + deliveryMode = 2 // 2 is persistent, 1 non-persistent + // contentType = "" + // contentEncoding = "" + // priority = 1 + } + declare { enabled = false diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala index 7d45d0a4..7a0cc18a 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala @@ -101,18 +101,9 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { blockingScheduler: Scheduler, monitor: Monitor): DefaultRabbitMQProducer[F, A] = { val producerConfig = providedConfig.wrapped.as[ProducerConfig]("root") - - create[F, A](producerConfig, channel, factoryInfo, blockingScheduler, monitor) - } - - def create[F[_]: FromTask, A: ProductConverter](producerConfig: ProducerConfig, - channel: ServerChannel, - factoryInfo: RabbitMQConnectionInfo, - blockingScheduler: Scheduler, - monitor: Monitor): DefaultRabbitMQProducer[F, A] = { - prepareProducer[F, A](producerConfig, channel, factoryInfo, blockingScheduler, monitor) } + } object Consumer { @@ -252,6 +243,11 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { import producerConfig._ val finalBlockingScheduler = if (useKluzo) Monix.wrapScheduler(blockingScheduler) else blockingScheduler + val defaultProperties = MessageProperties( + deliveryMode = DeliveryMode.fromCode(producerConfig.properties.deliveryMode), + contentType = producerConfig.properties.contentType, + contentEncoding = producerConfig.properties.contentEncoding, + priority = producerConfig.properties.priority.map(Integer.valueOf)) // auto declare of exchange // parse it only if it's needed @@ -260,7 +256,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { val d = declare.wrapped.as[AutoDeclareExchange]("root") declareExchange(exchange, channelFactoryInfo, channel, d) } - new DefaultRabbitMQProducer[F, A](producerConfig.name, exchange, channel, useKluzo, reportUnroutable, finalBlockingScheduler, monitor) + new DefaultRabbitMQProducer[F, A](producerConfig.name, exchange, channel, defaultProperties, useKluzo, reportUnroutable, finalBlockingScheduler, monitor) } private[rabbitmq] def declareExchange(name: String, diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducer.scala b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducer.scala index 8875e349..301b3a7e 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducer.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducer.scala @@ -19,6 +19,7 @@ import scala.util.control.NonFatal class DefaultRabbitMQProducer[F[_]: FromTask, A: ProductConverter](name: String, exchangeName: String, channel: ServerChannel, + defaultProperties: MessageProperties, useKluzo: Boolean, reportUnroutable: Boolean, scheduler: Scheduler, @@ -39,9 +40,8 @@ class DefaultRabbitMQProducer[F[_]: FromTask, A: ProductConverter](name: String, override def send(routingKey: String, body: A, properties: Option[MessageProperties] = None): F[Unit] = { val finalProperties = { - val messageProperties = converter.fillProperties { - properties.getOrElse(MessageProperties(messageId = Some(UUID.randomUUID().toString))) - } + val initialProperties = properties.getOrElse(defaultProperties.copy(messageId = Some(UUID.randomUUID().toString))) + val messageProperties = converter.fillProperties(initialProperties) if (useKluzo && Kluzo.getTraceId.nonEmpty) { diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala b/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala index e7408864..d9662679 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala @@ -3,7 +3,7 @@ package com.avast.clients.rabbitmq import java.nio.file.Path import java.time.Duration -import com.avast.clients.rabbitmq.api.DeliveryResult +import com.avast.clients.rabbitmq.api.{DeliveryMode, DeliveryResult} import com.typesafe.config.Config import scala.collection.immutable @@ -56,7 +56,8 @@ case class AutoBindQueue(exchange: AutoBindExchange, routingKeys: immutable.Seq[ case class AutoBindExchange(name: String, declare: Config) -case class ProducerConfig(exchange: String, declare: Config, useKluzo: Boolean, reportUnroutable: Boolean, name: String) +case class ProducerConfig(exchange: String, declare: Config, useKluzo: Boolean, reportUnroutable: Boolean, name: String, properties: ProducerProperties) +case class ProducerProperties(deliveryMode: Int, contentType: Option[String], contentEncoding: Option[String], priority: Option[Int]) case class AutoDeclareExchange(enabled: Boolean, `type`: String, durable: Boolean, autoDelete: Boolean, arguments: DeclareArguments) diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/DefaultRabbitMQProducer.scala b/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/DefaultRabbitMQProducer.scala index 7698828b..a63dd4b7 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/DefaultRabbitMQProducer.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/DefaultRabbitMQProducer.scala @@ -3,7 +3,7 @@ package com.avast.clients.rabbitmq.javaapi import java.util.concurrent.CompletableFuture import com.avast.bytes.Bytes -import com.avast.clients.rabbitmq.api.{MessageProperties => ScalaProperties, RabbitMQProducer => ScalaProducer} +import com.avast.clients.rabbitmq.api.{DeliveryMode, MessageProperties => ScalaProperties, RabbitMQProducer => ScalaProducer} import com.avast.clients.rabbitmq.javaapi.JavaConverters._ import scala.collection.JavaConverters._ @@ -27,7 +27,7 @@ class DefaultRabbitMQProducer(scalaProducer: ScalaProducer[Future, Bytes] with A Option(properties.getContentType), Option(properties.getContentEncoding), Option(properties.getHeaders).map(_.asScala.toMap).getOrElse(Map.empty), - Option(properties.getDeliveryMode), + DeliveryMode.fromCode(properties.getDeliveryMode), Option(properties.getPriority), Option(properties.getCorrelationId), Option(properties.getReplyTo), diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/JavaConverters.scala b/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/JavaConverters.scala index 756b2a5a..35624f72 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/JavaConverters.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/JavaConverters.scala @@ -1,22 +1,12 @@ package com.avast.clients.rabbitmq.javaapi import java.util.concurrent.{CompletableFuture, Executor} -import java.util.{function, Date, Optional} +import java.util.{Date, Optional, function} import com.avast.bytes.Bytes import com.avast.clients.rabbitmq.DeliveryReadAction -import com.avast.clients.rabbitmq.api.{ - Delivery => ScalaDelivery, - DeliveryResult => ScalaResult, - DeliveryWithHandle => ScalaDeliveryWithHandle, - MessageProperties => ScalaProperties -} -import com.avast.clients.rabbitmq.javaapi.{ - Delivery => JavaDelivery, - DeliveryResult => JavaResult, - DeliveryWithHandle => JavaDeliveryWithHandle, - MessageProperties => JavaProperties -} +import com.avast.clients.rabbitmq.api.{DeliveryMode, Delivery => ScalaDelivery, DeliveryResult => ScalaResult, DeliveryWithHandle => ScalaDeliveryWithHandle, MessageProperties => ScalaProperties} +import com.avast.clients.rabbitmq.javaapi.{Delivery => JavaDelivery, DeliveryResult => JavaResult, DeliveryWithHandle => JavaDeliveryWithHandle, MessageProperties => JavaProperties} import com.rabbitmq.client.AMQP import com.rabbitmq.client.AMQP.BasicProperties @@ -70,7 +60,7 @@ private[rabbitmq] object JavaConverters { builder.headers(messageProperties.headers.asJava) messageProperties.contentType.foreach(builder.contentType) messageProperties.contentEncoding.foreach(builder.contentEncoding) - messageProperties.deliveryMode.foreach(builder.deliveryMode) + messageProperties.deliveryMode.foreach(dm => builder.deliveryMode(dm.code)) messageProperties.priority.foreach(builder.priority) messageProperties.correlationId.foreach(builder.correlationId) messageProperties.replyTo.foreach(builder.replyTo) @@ -93,7 +83,7 @@ private[rabbitmq] object JavaConverters { } messageProperties.contentType.foreach(builder.contentType) messageProperties.contentEncoding.foreach(builder.contentEncoding) - messageProperties.deliveryMode.foreach(builder.deliveryMode) + messageProperties.deliveryMode.foreach(dm => builder.deliveryMode(dm.code)) messageProperties.priority.foreach(builder.priority) messageProperties.correlationId.foreach(builder.correlationId) messageProperties.replyTo.foreach(builder.replyTo) @@ -115,7 +105,7 @@ private[rabbitmq] object JavaConverters { Option(properties.getContentType), Option(properties.getContentEncoding), Option(properties.getHeaders).map(_.asScala.toMap).getOrElse(Map.empty), - Option(properties.getDeliveryMode), + DeliveryMode.fromCode(properties.getDeliveryMode), Option(properties.getPriority), Option(properties.getCorrelationId), Option(properties.getReplyTo), @@ -136,7 +126,7 @@ private[rabbitmq] object JavaConverters { Option(properties.getContentType), Option(properties.getContentEncoding), Option(properties.getHeaders).map(_.asScala.toMap).getOrElse(Map.empty), - Option(properties.getDeliveryMode), + DeliveryMode.fromCode(properties.getDeliveryMode), Option(properties.getPriority), Option(properties.getCorrelationId), Option(properties.getReplyTo), diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducerTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducerTest.scala index 27abe06b..cec631d6 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducerTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducerTest.scala @@ -28,6 +28,7 @@ class DefaultRabbitMQProducerTest extends FunSuite with MockitoSugar with Eventu exchangeName = exchangeName, channel = channel, monitor = Monitor.noOp, + defaultProperties = MessageProperties.empty, useKluzo = true, reportUnroutable = false, scheduler = Scheduler.Implicits.global