From 6a80cd145c3fee3ea3d9d370db1b07dc68343d0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20August=C3=BDn?= Date: Thu, 6 Feb 2020 14:00:51 +0100 Subject: [PATCH] Configurable AddressResolver (#42) --- .../clients/rabbitmq/RabbitMQConnection.scala | 14 +++++++-- .../clients/rabbitmq/configuration.scala | 9 ++++++ pureconfig/README.md | 1 + .../pureconfig/PureconfigImplicits.scala | 30 ++++++------------- 4 files changed, 31 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/RabbitMQConnection.scala b/core/src/main/scala/com/avast/clients/rabbitmq/RabbitMQConnection.scala index 044b18f1..51a89631 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/RabbitMQConnection.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/RabbitMQConnection.scala @@ -1,6 +1,7 @@ package com.avast.clients.rabbitmq import java.io.IOException +import java.util import java.util.concurrent.ExecutorService import cats.effect._ @@ -105,8 +106,17 @@ object RabbitMQConnection extends StrictLogging { Resource.make { Sync[F].delay { import connectionConfig._ - - val factory = new ConnectionFactory + import com.avast.clients.rabbitmq.AddressResolverType._ + val factory = new ConnectionFactory { + override def createAddressResolver(addresses: util.List[Address]): AddressResolver = addressResolverType match { + case Default => super.createAddressResolver(addresses) + case List => new ListAddressResolver(addresses) + case DnsRecord if addresses.size() == 1 => new DnsRecordIpAddressResolver(addresses.get(0)) + case DnsRecord => throw new IllegalArgumentException(s"DnsRecord configured but more hosts specified") + case DnsSrvRecord if addresses.size() == 1 => new DnsSrvRecordAddressResolver(addresses.get(0).getHost) + case DnsSrvRecord => throw new IllegalArgumentException(s"DnsSrvRecord configured but more hosts specified") + } + } val exceptionHandler = createExceptionHandler(connectionListener, channelListener, consumerListener) setUpConnection(connectionConfig, factory, exceptionHandler, sslContext, executor) diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala b/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala index ad74078a..2d39d4f1 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala @@ -10,6 +10,7 @@ import scala.concurrent.duration._ final case class RabbitMQConnectionConfig(name: String, hosts: immutable.Seq[String], virtualHost: String, + addressResolverType: AddressResolverType = AddressResolverType.Default, connectionTimeout: FiniteDuration = 5.seconds, heartBeatInterval: FiniteDuration = 30.seconds, topologyRecovery: Boolean = true, @@ -92,6 +93,14 @@ final case class BindExchangeConfig(sourceExchangeName: String, routingKeys: immutable.Seq[String], arguments: BindArgumentsConfig = BindArgumentsConfig()) +sealed trait AddressResolverType +object AddressResolverType { + case object Default extends AddressResolverType + case object List extends AddressResolverType + case object DnsRecord extends AddressResolverType + case object DnsSrvRecord extends AddressResolverType +} + sealed trait ExchangeType { val value: String } diff --git a/pureconfig/README.md b/pureconfig/README.md index 2033c85c..d5fc78fb 100644 --- a/pureconfig/README.md +++ b/pureconfig/README.md @@ -70,6 +70,7 @@ rabbitConfig { myConfig { hosts = ["localhost:5672"] virtualHost = "/" + addressResolverType = "Default" // other possible values: List, DnsRecord, DnsSrvRecord name = "Cluster01Connection" // used for logging AND is also visible in client properties in RabbitMQ management console diff --git a/pureconfig/src/main/scala/com/avast/clients/rabbitmq/pureconfig/PureconfigImplicits.scala b/pureconfig/src/main/scala/com/avast/clients/rabbitmq/pureconfig/PureconfigImplicits.scala index 72c645f0..ec93bb30 100644 --- a/pureconfig/src/main/scala/com/avast/clients/rabbitmq/pureconfig/PureconfigImplicits.scala +++ b/pureconfig/src/main/scala/com/avast/clients/rabbitmq/pureconfig/PureconfigImplicits.scala @@ -7,32 +7,13 @@ import _root_.pureconfig.generic.semiauto._ import cats.data.NonEmptyList import com.avast.clients.rabbitmq.api.DeliveryResult import com.avast.clients.rabbitmq.api.DeliveryResult._ -import com.avast.clients.rabbitmq.{ - AutoBindExchangeConfig, - AutoBindQueueConfig, - AutoDeclareExchangeConfig, - AutoDeclareQueueConfig, - BindArgumentsConfig, - BindExchangeConfig, - BindQueueConfig, - ConsumerConfig, - CredentialsConfig, - DeclareArgumentsConfig, - DeclareExchangeConfig, - DeclareQueueConfig, - ExchangeType, - NetworkRecoveryConfig, - ProducerConfig, - ProducerPropertiesConfig, - PullConsumerConfig, - RabbitMQConnectionConfig, - RecoveryDelayHandlers -} +import com.avast.clients.rabbitmq.{pureconfig => _, _} import com.rabbitmq.client.RecoveryDelayHandler import org.slf4j.event.Level import pureconfig.error._ import scala.collection.JavaConverters._ +import scala.util.{Failure, Success} // scalastyle:off object implicits extends PureconfigImplicits( /* use defaults */ ) { @@ -106,6 +87,13 @@ class PureconfigImplicits(implicit namingConvention: NamingConvention = CamelCas implicit val logLevelReader: ConfigReader[Level] = ConfigReader.stringConfigReader.map(Level.valueOf) implicit val recoveryDelayHandlerReader: ConfigReader[RecoveryDelayHandler] = RecoveryDelayHandlerReader implicit val exchangeTypeReader: ConfigReader[ExchangeType] = ConfigReader.fromNonEmptyStringOpt(ExchangeType.apply) + implicit val addressResolverTypeReader: ConfigReader[AddressResolverType] = ConfigReader.fromNonEmptyStringTry { + case "Default" => Success(AddressResolverType.Default) + case "ListAddress" => Success(AddressResolverType.List) + case "DnsRecordIpAddress" => Success(AddressResolverType.DnsRecord) + case "DnsSrvRecordAddress" => Success(AddressResolverType.DnsSrvRecord) + case unknownName => Failure(new IllegalArgumentException(s"Unknown addressResolverType: $unknownName")) + } implicit val deliveryResultReader: ConfigReader[DeliveryResult] = ConfigReader.stringConfigReader.map { _.toLowerCase match {