diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 3607aa41..2973c360 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -45,6 +45,8 @@ avastRabbitMQConsumerDefaults { failureAction = Republish // Ack, Reject, Retry, Republish timeoutAction = Republish // Ack, Reject, Retry, Republish + timeoutLogLevel = WARN // TRACE, DEBUG, INFO, WARN, ERROR + declare { enabled = false diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala index d2719214..daa25207 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala @@ -7,7 +7,7 @@ import cats.effect.Effect import com.avast.bytes.Bytes import com.avast.clients.rabbitmq.api.DeliveryResult.{Ack, Reject, Republish, Retry} import com.avast.clients.rabbitmq.api._ -import com.avast.metrics.scalaapi.Monitor +import com.avast.metrics.scalaapi.{Meter, Monitor} import com.rabbitmq.client.AMQP import com.rabbitmq.client.AMQP.Queue import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} @@ -17,6 +17,7 @@ import monix.execution.Scheduler import net.ceedubs.ficus.Ficus._ import net.ceedubs.ficus.readers.ArbitraryTypeReader._ import net.ceedubs.ficus.readers.ValueReader +import org.slf4j.event.Level import scala.collection.JavaConverters._ import scala.collection.generic.CanBuildFrom @@ -97,6 +98,8 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { BindArguments(argumentsMap) } + private implicit final val logLevelReader: ValueReader[Level] = ValueReader[String].map(Level.valueOf) + // this overrides Ficus's default reader and pass better path into possible exception private implicit def traversableReader[C[_], A](implicit entryReader: ValueReader[A], cbf: CanBuildFrom[Nothing, A, C[A]]): ValueReader[C[A]] = @@ -527,10 +530,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { action .timeout(ScalaDuration(processTimeout.toMillis, TimeUnit.MILLISECONDS)) .onErrorRecoverWith { - case e: TimeoutException => - timeoutsMeter.mark() - logger.warn(s"[$name] Task timed-out, applying DeliveryResult.${consumerConfig.timeoutAction}", e) - Task.now(consumerConfig.timeoutAction) + case e: TimeoutException => doTimeoutAction(consumerConfig, timeoutsMeter, e) } } @@ -551,6 +551,26 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { } } + private def doTimeoutAction[A, F[_]: Effect](consumerConfig: ConsumerConfig, + timeoutsMeter: Meter, + e: TimeoutException): Task[DeliveryResult] = Task { + import consumerConfig._ + + timeoutsMeter.mark() + + lazy val msg = s"[$name] Task timed-out, applying DeliveryResult.${consumerConfig.timeoutAction}" + + timeoutLogLevel match { + case Level.ERROR => logger.error(msg, e) + case Level.WARN => logger.warn(msg, e) + case Level.INFO => logger.info(msg, e) + case Level.DEBUG => logger.debug(msg, e) + case Level.TRACE => logger.trace(msg, e) + } + + consumerConfig.timeoutAction + } + implicit class WrapConfig(val c: Config) extends AnyVal { def wrapped(prefix: String = "root"): Config = { // we need to wrap it with one level, to be able to parse it with Ficus diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala b/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala index 1eeb53fa..3a3e8353 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala @@ -5,6 +5,7 @@ import java.time.Duration import com.avast.clients.rabbitmq.api.DeliveryResult import com.typesafe.config.Config +import org.slf4j.event.Level import scala.collection.immutable @@ -32,6 +33,7 @@ case class ConsumerConfig(queueName: String, processTimeout: Duration, failureAction: DeliveryResult, timeoutAction: DeliveryResult, + timeoutLogLevel: Level, prefetchCount: Int, declare: AutoDeclareQueue, bindings: immutable.Seq[AutoBindQueue],