Skip to content

Commit

Permalink
Configurable AddressResolver (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
augi authored Feb 6, 2020
1 parent 0120fb6 commit 6a80cd1
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.avast.clients.rabbitmq

import java.io.IOException
import java.util
import java.util.concurrent.ExecutorService

import cats.effect._
Expand Down Expand Up @@ -105,8 +106,17 @@ object RabbitMQConnection extends StrictLogging {
Resource.make {
Sync[F].delay {
import connectionConfig._

val factory = new ConnectionFactory
import com.avast.clients.rabbitmq.AddressResolverType._
val factory = new ConnectionFactory {
override def createAddressResolver(addresses: util.List[Address]): AddressResolver = addressResolverType match {
case Default => super.createAddressResolver(addresses)
case List => new ListAddressResolver(addresses)
case DnsRecord if addresses.size() == 1 => new DnsRecordIpAddressResolver(addresses.get(0))
case DnsRecord => throw new IllegalArgumentException(s"DnsRecord configured but more hosts specified")
case DnsSrvRecord if addresses.size() == 1 => new DnsSrvRecordAddressResolver(addresses.get(0).getHost)
case DnsSrvRecord => throw new IllegalArgumentException(s"DnsSrvRecord configured but more hosts specified")
}
}
val exceptionHandler = createExceptionHandler(connectionListener, channelListener, consumerListener)
setUpConnection(connectionConfig, factory, exceptionHandler, sslContext, executor)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import scala.concurrent.duration._
final case class RabbitMQConnectionConfig(name: String,
hosts: immutable.Seq[String],
virtualHost: String,
addressResolverType: AddressResolverType = AddressResolverType.Default,
connectionTimeout: FiniteDuration = 5.seconds,
heartBeatInterval: FiniteDuration = 30.seconds,
topologyRecovery: Boolean = true,
Expand Down Expand Up @@ -92,6 +93,14 @@ final case class BindExchangeConfig(sourceExchangeName: String,
routingKeys: immutable.Seq[String],
arguments: BindArgumentsConfig = BindArgumentsConfig())

sealed trait AddressResolverType
object AddressResolverType {
case object Default extends AddressResolverType
case object List extends AddressResolverType
case object DnsRecord extends AddressResolverType
case object DnsSrvRecord extends AddressResolverType
}

sealed trait ExchangeType {
val value: String
}
Expand Down
1 change: 1 addition & 0 deletions pureconfig/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ rabbitConfig {
myConfig {
hosts = ["localhost:5672"]
virtualHost = "/"
addressResolverType = "Default" // other possible values: List, DnsRecord, DnsSrvRecord
name = "Cluster01Connection" // used for logging AND is also visible in client properties in RabbitMQ management console
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,13 @@ import _root_.pureconfig.generic.semiauto._
import cats.data.NonEmptyList
import com.avast.clients.rabbitmq.api.DeliveryResult
import com.avast.clients.rabbitmq.api.DeliveryResult._
import com.avast.clients.rabbitmq.{
AutoBindExchangeConfig,
AutoBindQueueConfig,
AutoDeclareExchangeConfig,
AutoDeclareQueueConfig,
BindArgumentsConfig,
BindExchangeConfig,
BindQueueConfig,
ConsumerConfig,
CredentialsConfig,
DeclareArgumentsConfig,
DeclareExchangeConfig,
DeclareQueueConfig,
ExchangeType,
NetworkRecoveryConfig,
ProducerConfig,
ProducerPropertiesConfig,
PullConsumerConfig,
RabbitMQConnectionConfig,
RecoveryDelayHandlers
}
import com.avast.clients.rabbitmq.{pureconfig => _, _}
import com.rabbitmq.client.RecoveryDelayHandler
import org.slf4j.event.Level
import pureconfig.error._

import scala.collection.JavaConverters._
import scala.util.{Failure, Success}

// scalastyle:off
object implicits extends PureconfigImplicits( /* use defaults */ ) {
Expand Down Expand Up @@ -106,6 +87,13 @@ class PureconfigImplicits(implicit namingConvention: NamingConvention = CamelCas
implicit val logLevelReader: ConfigReader[Level] = ConfigReader.stringConfigReader.map(Level.valueOf)
implicit val recoveryDelayHandlerReader: ConfigReader[RecoveryDelayHandler] = RecoveryDelayHandlerReader
implicit val exchangeTypeReader: ConfigReader[ExchangeType] = ConfigReader.fromNonEmptyStringOpt(ExchangeType.apply)
implicit val addressResolverTypeReader: ConfigReader[AddressResolverType] = ConfigReader.fromNonEmptyStringTry {
case "Default" => Success(AddressResolverType.Default)
case "ListAddress" => Success(AddressResolverType.List)
case "DnsRecordIpAddress" => Success(AddressResolverType.DnsRecord)
case "DnsSrvRecordAddress" => Success(AddressResolverType.DnsSrvRecord)
case unknownName => Failure(new IllegalArgumentException(s"Unknown addressResolverType: $unknownName"))
}

implicit val deliveryResultReader: ConfigReader[DeliveryResult] = ConfigReader.stringConfigReader.map {
_.toLowerCase match {
Expand Down

0 comments on commit 6a80cd1

Please sign in to comment.