Skip to content

Commit

Permalink
Configurable timeout log level (#21)
Browse files Browse the repository at this point in the history
Configurable timeout log level
  • Loading branch information
jendakol authored Jan 18, 2019
1 parent 3d57856 commit 23090c5
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 5 deletions.
2 changes: 2 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ avastRabbitMQConsumerDefaults {
failureAction = Republish // Ack, Reject, Retry, Republish
timeoutAction = Republish // Ack, Reject, Retry, Republish

timeoutLogLevel = WARN // TRACE, DEBUG, INFO, WARN, ERROR

declare {
enabled = false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import cats.effect.Effect
import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.api.DeliveryResult.{Ack, Reject, Republish, Retry}
import com.avast.clients.rabbitmq.api._
import com.avast.metrics.scalaapi.Monitor
import com.avast.metrics.scalaapi.{Meter, Monitor}
import com.rabbitmq.client.AMQP
import com.rabbitmq.client.AMQP.Queue
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
Expand All @@ -17,6 +17,7 @@ import monix.execution.Scheduler
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
import net.ceedubs.ficus.readers.ValueReader
import org.slf4j.event.Level

import scala.collection.JavaConverters._
import scala.collection.generic.CanBuildFrom
Expand Down Expand Up @@ -97,6 +98,8 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
BindArguments(argumentsMap)
}

private implicit final val logLevelReader: ValueReader[Level] = ValueReader[String].map(Level.valueOf)

// this overrides Ficus's default reader and pass better path into possible exception
private implicit def traversableReader[C[_], A](implicit entryReader: ValueReader[A],
cbf: CanBuildFrom[Nothing, A, C[A]]): ValueReader[C[A]] =
Expand Down Expand Up @@ -527,10 +530,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
action
.timeout(ScalaDuration(processTimeout.toMillis, TimeUnit.MILLISECONDS))
.onErrorRecoverWith {
case e: TimeoutException =>
timeoutsMeter.mark()
logger.warn(s"[$name] Task timed-out, applying DeliveryResult.${consumerConfig.timeoutAction}", e)
Task.now(consumerConfig.timeoutAction)
case e: TimeoutException => doTimeoutAction(consumerConfig, timeoutsMeter, e)
}
}

Expand All @@ -551,6 +551,26 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging {
}
}

private def doTimeoutAction[A, F[_]: Effect](consumerConfig: ConsumerConfig,
timeoutsMeter: Meter,
e: TimeoutException): Task[DeliveryResult] = Task {
import consumerConfig._

timeoutsMeter.mark()

lazy val msg = s"[$name] Task timed-out, applying DeliveryResult.${consumerConfig.timeoutAction}"

timeoutLogLevel match {
case Level.ERROR => logger.error(msg, e)
case Level.WARN => logger.warn(msg, e)
case Level.INFO => logger.info(msg, e)
case Level.DEBUG => logger.debug(msg, e)
case Level.TRACE => logger.trace(msg, e)
}

consumerConfig.timeoutAction
}

implicit class WrapConfig(val c: Config) extends AnyVal {
def wrapped(prefix: String = "root"): Config = {
// we need to wrap it with one level, to be able to parse it with Ficus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import java.time.Duration

import com.avast.clients.rabbitmq.api.DeliveryResult
import com.typesafe.config.Config
import org.slf4j.event.Level

import scala.collection.immutable

Expand Down Expand Up @@ -32,6 +33,7 @@ case class ConsumerConfig(queueName: String,
processTimeout: Duration,
failureAction: DeliveryResult,
timeoutAction: DeliveryResult,
timeoutLogLevel: Level,
prefetchCount: Int,
declare: AutoDeclareQueue,
bindings: immutable.Seq[AutoBindQueue],
Expand Down

0 comments on commit 23090c5

Please sign in to comment.