Skip to content

Commit

Permalink
Better config errors - contain understandable config path (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
jendakol authored Dec 6, 2018
1 parent 13f033c commit 89d7796
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import net.ceedubs.ficus.readers.ArbitraryTypeReader._
import net.ceedubs.ficus.readers.ValueReader

import scala.collection.JavaConverters._
import scala.collection.generic.CanBuildFrom
import scala.collection.immutable
import scala.concurrent.duration.{Duration => ScalaDuration}
import scala.language.{higherKinds, implicitConversions}
Expand All @@ -30,6 +31,8 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {

private type DefaultDeliveryReadAction[F[_]] = DeliveryReadAction[F, Bytes]

private[rabbitmq] val FakeConfigRootName = "_rabbitmq_config_root_"

private[rabbitmq] final val DeclareQueueRootConfigKey = "avastRabbitMQDeclareQueueDefaults"
private[rabbitmq] final val DeclareQueueDefaultConfig = ConfigFactory.defaultReference().getConfig(DeclareQueueRootConfigKey)

Expand Down Expand Up @@ -94,23 +97,41 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
BindArguments(argumentsMap)
}

// this overrides Ficus's default reader and pass better path into possible exception
private implicit def traversableReader[C[_], A](implicit entryReader: ValueReader[A],
cbf: CanBuildFrom[Nothing, A, C[A]]): ValueReader[C[A]] =
(config: Config, path: String) => {
val list = config.getList(path).asScala
val builder = cbf()
builder.sizeHint(list.size)
list.zipWithIndex.foreach {
case (entry, index) =>
val entryConfig = entry.atPath(s"$path.$index")
builder += entryReader.read(entryConfig, s"$path.$index")
}

builder.result
}

object Producer {

def fromConfig[F[_]: Effect, A: ProductConverter](providedConfig: Config,
configName: String,
channel: ServerChannel,
factoryInfo: RabbitMQConnectionInfo,
blockingScheduler: Scheduler,
monitor: Monitor)(implicit scheduler: Scheduler): DefaultRabbitMQProducer[F, A] = {
val producerConfig = providedConfig.wrapped.as[ProducerConfig]("root")
create[F, A](producerConfig, channel, factoryInfo, blockingScheduler, monitor)
val producerConfig = providedConfig.wrapped(configName).as[ProducerConfig](configName)
create[F, A](producerConfig, configName, channel, factoryInfo, blockingScheduler, monitor)
}

def create[F[_]: Effect, A: ProductConverter](producerConfig: ProducerConfig,
configName: String,
channel: ServerChannel,
factoryInfo: RabbitMQConnectionInfo,
blockingScheduler: Scheduler,
monitor: Monitor)(implicit scheduler: Scheduler): DefaultRabbitMQProducer[F, A] = {
prepareProducer[F, A](producerConfig, channel, factoryInfo, blockingScheduler, monitor)
prepareProducer[F, A](producerConfig, configName, channel, factoryInfo, blockingScheduler, monitor)
}

}
Expand All @@ -119,6 +140,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {

def fromConfig[F[_]: Effect, A: DeliveryConverter](
providedConfig: Config,
configName: String,
channel: ServerChannel,
channelFactoryInfo: RabbitMQConnectionInfo,
blockingScheduler: Scheduler,
Expand All @@ -140,28 +162,30 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
mergedConfig.withValue("bindings", ConfigValueFactory.fromIterable(updated.asJava))
}

val consumerConfig = updatedConfig.wrapped.as[ConsumerConfig]("root")
val consumerConfig = updatedConfig.wrapped(configName).as[ConsumerConfig](configName)

create[F, A](consumerConfig, channel, channelFactoryInfo, blockingScheduler, monitor, consumerListener, readAction)
create[F, A](consumerConfig, configName, channel, channelFactoryInfo, blockingScheduler, monitor, consumerListener, readAction)
}

def create[F[_]: Effect, A: DeliveryConverter](
consumerConfig: ConsumerConfig,
configName: String,
channel: ServerChannel,
channelFactoryInfo: RabbitMQConnectionInfo,
blockingScheduler: Scheduler,
monitor: Monitor,
consumerListener: ConsumerListener,
readAction: DeliveryReadAction[F, A])(implicit scheduler: Scheduler): DefaultRabbitMQConsumer[F] = {

prepareConsumer(consumerConfig, readAction, channelFactoryInfo, channel, consumerListener, blockingScheduler, monitor)
prepareConsumer(consumerConfig, configName, readAction, channelFactoryInfo, channel, consumerListener, blockingScheduler, monitor)
}
}

object PullConsumer {

def fromConfig[F[_]: Effect, A: DeliveryConverter](
providedConfig: Config,
configName: String,
channel: ServerChannel,
channelFactoryInfo: RabbitMQConnectionInfo,
blockingScheduler: Scheduler,
Expand All @@ -181,18 +205,19 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
mergedConfig.withValue("bindings", ConfigValueFactory.fromIterable(updated.asJava))
}

val consumerConfig = updatedConfig.wrapped.as[PullConsumerConfig]("root")
val consumerConfig = updatedConfig.wrapped(configName).as[PullConsumerConfig](configName)

create[F, A](consumerConfig, channel, channelFactoryInfo, blockingScheduler, monitor)
create[F, A](consumerConfig, configName, channel, channelFactoryInfo, blockingScheduler, monitor)
}

def create[F[_]: Effect, A: DeliveryConverter](consumerConfig: PullConsumerConfig,
configName: String,
channel: ServerChannel,
channelFactoryInfo: RabbitMQConnectionInfo,
blockingScheduler: Scheduler,
monitor: Monitor)(implicit scheduler: Scheduler): DefaultRabbitMQPullConsumer[F, A] = {

preparePullConsumer(consumerConfig, channelFactoryInfo, channel, blockingScheduler, monitor)
preparePullConsumer(consumerConfig, configName, channelFactoryInfo, channel, blockingScheduler, monitor)
}
}

Expand Down Expand Up @@ -245,6 +270,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {

private def prepareProducer[F[_]: Effect, A: ProductConverter](
producerConfig: ProducerConfig,
configName: String,
channel: ServerChannel,
channelFactoryInfo: RabbitMQConnectionInfo,
blockingScheduler: Scheduler,
Expand All @@ -262,7 +288,9 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
// parse it only if it's needed
// "Lazy" parsing, because exchange type is not part of reference.conf and we don't want to make it fail on missing type when enabled=false
if (declare.getBoolean("enabled")) {
val d = declare.wrapped.as[AutoDeclareExchange]("root")
val path = s"$configName.declare"
val d = declare.wrapped(path).as[AutoDeclareExchange](path)

declareExchange(exchange, channelFactoryInfo, channel, d)
}
new DefaultRabbitMQProducer[F, A](producerConfig.name,
Expand Down Expand Up @@ -301,6 +329,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {

private def prepareConsumer[F[_]: Effect, A: DeliveryConverter](
consumerConfig: ConsumerConfig,
configName: String,
readAction: DeliveryReadAction[F, A],
channelFactoryInfo: RabbitMQConnectionInfo,
channel: ServerChannel,
Expand All @@ -309,27 +338,10 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
monitor: Monitor)(implicit scheduler: Scheduler): DefaultRabbitMQConsumer[F] = {

// auto declare exchanges
consumerConfig.bindings.foreach { bind =>
import bind.exchange._

// parse it only if it's needed
if (declare.getBoolean("enabled")) {
val d = declare.wrapped.as[AutoDeclareExchange]("root")

declareExchange(name, channelFactoryInfo, channel, d)
}
}
declareExchangesFromBindings(configName, channelFactoryInfo, channel, consumerConfig.bindings)

// auto declare queue
{
import consumerConfig.declare._
import consumerConfig.queueName

if (enabled) {
logger.info(s"Declaring queue '$queueName' in virtual host '${channelFactoryInfo.virtualHost}'")
declareQueue(channel, queueName, durable, exclusive, autoDelete, arguments)
}
}
declareQueue(consumerConfig.queueName, channelFactoryInfo, channel, consumerConfig.declare)

// set prefetch size (per consumer)
channel.basicQos(consumerConfig.prefetchCount)
Expand All @@ -342,6 +354,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {

private def preparePullConsumer[F[_]: Effect, A: DeliveryConverter](
consumerConfig: PullConsumerConfig,
configName: String,
channelFactoryInfo: RabbitMQConnectionInfo,
channel: ServerChannel,
blockingScheduler: Scheduler,
Expand All @@ -350,33 +363,29 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
import consumerConfig._

// auto declare exchanges
consumerConfig.bindings.foreach { bind =>
import bind.exchange._

// parse it only if it's needed
if (declare.getBoolean("enabled")) {
val d = declare.wrapped.as[AutoDeclareExchange]("root")

declareExchange(name, channelFactoryInfo, channel, d)
}
}
declareExchangesFromBindings(configName, channelFactoryInfo, channel, consumerConfig.bindings)

// auto declare queue
{
import consumerConfig.declare._

if (enabled) {
logger.info(s"Declaring queue '$queueName' in virtual host '${channelFactoryInfo.virtualHost}'")
declareQueue(channel, queueName, durable, exclusive, autoDelete, arguments)
}
}
declareQueue(queueName, channelFactoryInfo, channel, declare)

// auto bind
bindQueues(channelFactoryInfo, channel, consumerConfig.queueName, consumerConfig.bindings)

new DefaultRabbitMQPullConsumer[F, A](name, channel, queueName, failureAction, monitor, blockingScheduler)
}

private def declareQueue(queueName: String,
channelFactoryInfo: RabbitMQConnectionInfo,
channel: ServerChannel,
declare: AutoDeclareQueue): Unit = {
import declare._

if (enabled) {
logger.info(s"Declaring queue '$queueName' in virtual host '${channelFactoryInfo.virtualHost}'")
declareQueue(channel, queueName, durable, exclusive, autoDelete, arguments)
}
}

private[rabbitmq] def declareQueue(channel: ServerChannel,
queueName: String,
durable: Boolean,
Expand Down Expand Up @@ -435,6 +444,24 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
channel.exchangeBind(destExchangeName, sourceExchangeName, routingKey, arguments)
}

private def declareExchangesFromBindings(configName: String,
channelFactoryInfo: RabbitMQConnectionInfo,
channel: ServerChannel,
bindings: Seq[AutoBindQueue]): Unit = {
bindings.zipWithIndex.foreach {
case (bind, i) =>
import bind.exchange._

// parse it only if it's needed
if (declare.getBoolean("enabled")) {
val path = s"$configName.bindings.$i.exchange.declare"
val d = declare.wrapped(path).as[AutoDeclareExchange](path)

declareExchange(name, channelFactoryInfo, channel, d)
}
}
}

private def prepareConsumer[F[_]: Effect, A: DeliveryConverter](consumerConfig: ConsumerConfig,
channelFactoryInfo: RabbitMQConnectionInfo,
channel: ServerChannel,
Expand Down Expand Up @@ -517,11 +544,11 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
}

implicit class WrapConfig(val c: Config) extends AnyVal {
def wrapped: Config = {
def wrapped(prefix: String = "root"): Config = {
// we need to wrap it with one level, to be able to parse it with Ficus
ConfigFactory
.empty()
.withValue("root", c.withFallback(ProducerDefaultConfig).root())
.withValue(prefix, c.withFallback(ProducerDefaultConfig).root())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.avast.clients.rabbitmq

import cats.effect.Effect
import cats.syntax.all._
import com.avast.clients.rabbitmq.DefaultRabbitMQClientFactory.FakeConfigRootName
import com.avast.clients.rabbitmq.api.{FAutoCloseable, RabbitMQConsumer, RabbitMQProducer, RabbitMQPullConsumer}
import com.avast.metrics.scalaapi.Monitor
import com.rabbitmq.client.ShutdownSignalException
Expand Down Expand Up @@ -61,7 +62,14 @@ class DefaultRabbitMQConnection[F[_]](connection: ServerConnection,
implicit val scheduler = Scheduler(ses, ec)

DefaultRabbitMQClientFactory.Consumer
.fromConfig[F, A](config.getConfig(configName), channel, info, blockingScheduler, monitor, consumerListener, readAction)
.fromConfig[F, A](config.getConfig(configName),
s"$FakeConfigRootName.$configName",
channel,
info,
blockingScheduler,
monitor,
consumerListener,
readAction)
}
}
}
Expand All @@ -73,7 +81,7 @@ class DefaultRabbitMQConnection[F[_]](connection: ServerConnection,
implicit val scheduler = Scheduler(ses, ec)

DefaultRabbitMQClientFactory.Consumer
.create[F, A](consumerConfig, channel, info, blockingScheduler, monitor, consumerListener, readAction)
.create[F, A](consumerConfig, "_manually_provided_", channel, info, blockingScheduler, monitor, consumerListener, readAction)
}
}
}
Expand All @@ -85,7 +93,7 @@ class DefaultRabbitMQConnection[F[_]](connection: ServerConnection,
implicit val scheduler = Scheduler(ses, ec)

DefaultRabbitMQClientFactory.PullConsumer
.fromConfig[F, A](config.getConfig(configName), channel, info, blockingScheduler, monitor)
.fromConfig[F, A](config.getConfig(configName), s"$FakeConfigRootName.$configName", channel, info, blockingScheduler, monitor)
}
}
}
Expand All @@ -97,7 +105,7 @@ class DefaultRabbitMQConnection[F[_]](connection: ServerConnection,
implicit val scheduler = Scheduler(ses, ec)

DefaultRabbitMQClientFactory.PullConsumer
.create[F, A](pullConsumerConfig, channel, info, blockingScheduler, monitor)
.create[F, A](pullConsumerConfig, "_manually_provided_", channel, info, blockingScheduler, monitor)
}
}
}
Expand All @@ -108,7 +116,7 @@ class DefaultRabbitMQConnection[F[_]](connection: ServerConnection,
implicit val scheduler = Scheduler(ses, ec)

DefaultRabbitMQClientFactory.Producer
.fromConfig[F, A](config.getConfig(configName), channel, info, blockingScheduler, monitor)
.fromConfig[F, A](config.getConfig(configName), s"$FakeConfigRootName.$configName", channel, info, blockingScheduler, monitor)
}
}
}
Expand All @@ -120,7 +128,7 @@ class DefaultRabbitMQConnection[F[_]](connection: ServerConnection,
implicit val scheduler = Scheduler(ses, ec)

DefaultRabbitMQClientFactory.Producer
.create[F, A](producerConfig, channel, info, blockingScheduler, monitor)
.create[F, A](producerConfig, "_manually_provided_", channel, info, blockingScheduler, monitor)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import java.time.Duration
import java.util.concurrent.ExecutorService

import cats.effect.{Effect, Sync}
import com.avast.clients.rabbitmq.DefaultRabbitMQClientFactory.FakeConfigRootName
import com.avast.clients.rabbitmq.api._
import com.avast.clients.rabbitmq.ssl.{KeyStoreTypes, SSLBuilder}
import com.avast.metrics.scalaapi.Monitor
Expand Down Expand Up @@ -141,9 +142,9 @@ object RabbitMQConnection extends StrictLogging {
// we need to wrap it with one level, to be able to parse it with Ficus
val config = ConfigFactory
.empty()
.withValue("root", providedConfig.withFallback(DefaultConfig).root())
.withValue(FakeConfigRootName, providedConfig.withFallback(DefaultConfig).root())

val connectionConfig = config.as[RabbitMQConnectionConfig]("root")
val connectionConfig = config.as[RabbitMQConnectionConfig](FakeConfigRootName)

val connection = createConnection(connectionConfig, blockingExecutor, connectionListener, channelListener, consumerListener)

Expand Down

0 comments on commit 89d7796

Please sign in to comment.