From 361bc1c18aa313c377b40e51171b8ceda4990acb Mon Sep 17 00:00:00 2001 From: jendakol Date: Fri, 25 May 2018 14:39:53 +0200 Subject: [PATCH] ParsingFailureAction and related API changes (#7) * sealed trait Delivery - Ok and MalformedContent * PullResult instead of Option for RabbitMQPullConsumer * ConversionException to api module --- Migration-5-6.md | 5 +- README.md | 31 +++-- .../avast/clients/rabbitmq/api/Delivery.scala | 20 ++- .../rabbitmq/api/RabbitMQPullConsumer.scala | 22 +++- .../clients/rabbitmq/api/exceptions.scala | 3 + .../javaapi/RabbitMQPullConsumer.scala | 32 ++++- .../DefaultRabbitMQClientFactory.scala | 87 ++++++++----- .../rabbitmq/DefaultRabbitMQConnection.scala | 4 +- .../DefaultRabbitMQPullConsumer.scala | 121 ++++++++++-------- .../rabbitmq/MultiFormatConsumer.scala | 35 +++-- .../avast/clients/rabbitmq/converters.scala | 4 +- .../javaapi/DefaultRabbitMQPullConsumer.scala | 14 +- .../rabbitmq/javaapi/JavaConverters.scala | 10 +- .../com/avast/clients/rabbitmq/rabbitmq.scala | 26 +++- core/src/test/java/ExampleJava.java | 25 +++- .../DefaultRabbitMQPullConsumerTest.scala | 25 ++-- .../com/avast/clients/rabbitmq/LiveTest.scala | 73 +++++++++-- .../rabbitmq/MultiFormatConsumerTest.scala | 46 +++---- .../extras/format/GpbDeliveryConverter.scala | 4 +- .../extras/format/GpbProductConverter.scala | 4 +- .../extras/format/JsonDeliveryConverter.scala | 4 +- .../extras/format/JsonProductConverter.scala | 4 +- .../extras/PoisonedMessageHandler.scala | 48 +++---- scalastyle_config.xml | 2 +- 24 files changed, 421 insertions(+), 228 deletions(-) create mode 100644 api/src/main/scala/com/avast/clients/rabbitmq/api/exceptions.scala diff --git a/Migration-5-6.md b/Migration-5-6.md index 5a21f4fe..5f746302 100644 --- a/Migration-5-6.md +++ b/Migration-5-6.md @@ -11,12 +11,14 @@ Changes in Scala API: 1. The whole API is _finally tagless_ - all methods return `F[_]`. See [related section](README.md#scala-usage) in docs. 1. The API now uses type-conversions - provide type and related converter when creating producer/consumer. See [related section](README.md#providing-converters-for-producer/consumer) in docs. -1. The `Delivery` is now generic - e.g. `Delivery[Bytes]` (depends on type-conversion). +1. The `Delivery` is now sealed trait - there are `Delivery.Ok[A]` (e.g. `Delivery[Bytes]`, depends on type-conversion) and `Delivery.MalformedContent`. +After getting the `Delivery[A]` you should pattern-match it. 1. The API now requires an implicit `monix.execution.Scheduler` instead of `ExecutionContext`. 1. Methods like `RabbitMQConnection.declareQueue` now return `F[Unit]` (was `Try[Done]` before). 1. Possibility to pass manually created configurations (`ProducerConfig` etc.) is now gone. The only option is to use TypeSafe config. 1. There is no `RabbitMQConsumer.bindTo` method anymore. Use [additional declarations](README.md#additional-declarations-and-bindings) for such thing. 1. There are new methods in [`RabbitMQConnection`](core/src/main/scala/com/avast/clients/rabbitmq/RabbitMQConnection.scala): `newChannel` and `withChannel`. +1. [`RabbitMQPullConsumer`](README.md#pull-consumer) was added Changes in Java API: @@ -27,3 +29,4 @@ Changes in Java API: 1. Method `RabbitMQProducer.send` now returns `CompletableFuture[Void]` (was `void` before) - ***it's not blocking anymore!*** 1. `RabbitMQConsumer` and `RabbitMQProducer` (`api` module) are now traits and have their `Default*` counterparts in `core` module 1. There is no `RabbitMQConsumer.bindTo` method anymore. Use [additional declarations](README.md#additional-declarations-and-bindings) for such thing. +1. `RabbitMQPullConsumer` was added diff --git a/README.md b/README.md index e99d433a..e1c34888 100644 --- a/README.md +++ b/README.md @@ -182,9 +182,13 @@ val monitor: Monitor = ??? // if you expect very high load, you can use separate connections for each producer/consumer, but it's usually not needed val rabbitConnection = RabbitMQConnection.fromConfig[Task](config, blockingExecutor) // DefaultRabbitMQConnection[Task] -val consumer = rabbitConnection.newConsumer[Bytes]("consumer", monitor) { (delivery: Delivery[Bytes]) => - println(delivery) - Task.now(DeliveryResult.Ack) +val consumer = rabbitConnection.newConsumer[Bytes]("consumer", monitor) { + case delivery: Delivery.Ok[Bytes] => + println(delivery) + Task.now(DeliveryResult.Ack) + + case _: Delivery.MalformedContent => + Task.now(DeliveryResult.Reject) } // DefaultRabbitMQConsumer val sender = rabbitConnection.newProducer("producer", monitor) // DefaultRabbitMQProducer[Task] @@ -295,6 +299,9 @@ public class ExampleJava { } ``` +The Java API has some limitations compared to the Scala one - mainly it does not support [types conversions](#providing-converters-for-producer/consumer) +and it offers only asynchronous version with `CompletableFuture` as result of all operations. + See [full example](core/src/test/java/ExampleJava.java) ## Notes @@ -360,7 +367,12 @@ Check [reference.conf](core/src/main/resources/reference.conf) for all options o Sometimes your use-case just doesn't fit the _normal_ consumer scenario. Here you can use the _pull consumer_ which gives you much more control over the received messages. You _pull_ new message from the queue and acknowledge (reject, ...) it somewhere in the future. -The pull consumer operates with `Option` which is used for expressing either getting the delivery _or_ detecting an empty queue. +The pull consumer uses `PullResult` as return type: +* Ok - contains `DeliveryWithHandle` instance +* EmptyQueue - there was no message in the queue available + +Additionally you can call `.toOption` method on the `PullResult`. + A simplified example: ```scala @@ -377,14 +389,14 @@ val consumer = connection.newPullConsumer[Bytes](???, ???) // receive "up to" 100 deliveries -val deliveries: Future[Seq[Option[DeliveryWithHandle[Future, Bytes]]]] = Future.sequence { (1 to 100).map(_ => consumer.pull()) } +val deliveries: Future[Seq[PullResult[Future, Bytes]]] = Future.sequence { (1 to 100).map(_ => consumer.pull()) } // do your stuff! ??? -// "handle" all deliveries -val handleResult: Future[Unit] = deliveries.flatMap(s => Future.sequence(s.flatten.map(_.handle(DeliveryResult.Ack))).map(_ => Unit)) +// "handle" all deliveries, ignore failures and "empty queue" results +val handleResult: Future[Unit] = deliveries.flatMap(s => Future.sequence(s.flatMap(_.toOption).map(_.handle(DeliveryResult.Ack))).map(_ => Unit)) consumer.close() connection.close() @@ -425,10 +437,7 @@ case class NewFileSourceAdded(fileSources: Seq[FileSource]) val consumer = MultiFormatConsumer.forType[Future, NewFileSourceAdded]( JsonDeliveryConverter.derive(), // requires implicit `io.circe.Decoder[NewFileSourceAdded]` GpbDeliveryConverter[NewFileSourceAddedGpb].derive() // requires implicit `com.avast.cactus.Converter[NewFileSourceAddedGpb, NewFileSourceAdded]` -)( - businessLogic.processMessage, - failureHandler -) +)(businessLogic.processMessage) ``` (see [unit test](core/src/test/scala/com/avast/clients/rabbitmq/MultiFormatConsumerTest.scala) for full example) diff --git a/api/src/main/scala/com/avast/clients/rabbitmq/api/Delivery.scala b/api/src/main/scala/com/avast/clients/rabbitmq/api/Delivery.scala index 8721e389..fe18941b 100644 --- a/api/src/main/scala/com/avast/clients/rabbitmq/api/Delivery.scala +++ b/api/src/main/scala/com/avast/clients/rabbitmq/api/Delivery.scala @@ -1,3 +1,21 @@ package com.avast.clients.rabbitmq.api -case class Delivery[A](body: A, properties: MessageProperties, routingKey: String) +import com.avast.bytes.Bytes + +sealed trait Delivery[+A] { + def properties: MessageProperties + + def routingKey: String +} + +object Delivery { + + case class Ok[+A](body: A, properties: MessageProperties, routingKey: String) extends Delivery[A] + + case class MalformedContent(body: Bytes, properties: MessageProperties, routingKey: String, ce: ConversionException) + extends Delivery[Nothing] + + def apply[A](body: A, properties: MessageProperties, routingKey: String): Delivery.Ok[A] = { + Delivery.Ok(body, properties, routingKey) + } +} diff --git a/api/src/main/scala/com/avast/clients/rabbitmq/api/RabbitMQPullConsumer.scala b/api/src/main/scala/com/avast/clients/rabbitmq/api/RabbitMQPullConsumer.scala index 5711e4ef..7416a069 100644 --- a/api/src/main/scala/com/avast/clients/rabbitmq/api/RabbitMQPullConsumer.scala +++ b/api/src/main/scala/com/avast/clients/rabbitmq/api/RabbitMQPullConsumer.scala @@ -5,16 +5,30 @@ import scala.language.higherKinds trait RabbitMQPullConsumer[F[_], A] { /** Retrieves one message from the queue, if there is any. - * - * @return Some(DeliveryHandle[A]) when there was a message available; None otherwise. */ - def pull(): F[Option[DeliveryWithHandle[F, A]]] + def pull(): F[PullResult[F, A]] } /** Trait which contains `Delivery` and it's _handle through which it can be *acked*, *rejected* etc. */ -trait DeliveryWithHandle[F[_], A] { +trait DeliveryWithHandle[+F[_], +A] { def delivery: Delivery[A] def handle(result: DeliveryResult): F[Unit] } + +sealed trait PullResult[+F[_], +A] { + def toOption: Option[DeliveryWithHandle[F, A]] +} + +object PullResult { + + case class Ok[F[_], A](deliveryWithHandle: DeliveryWithHandle[F, A]) extends PullResult[F, A] { + override def toOption: Option[DeliveryWithHandle[F, A]] = Some(deliveryWithHandle) + } + + case object EmptyQueue extends PullResult[Nothing, Nothing] { + override def toOption: Option[DeliveryWithHandle[Nothing, Nothing]] = None + } + +} diff --git a/api/src/main/scala/com/avast/clients/rabbitmq/api/exceptions.scala b/api/src/main/scala/com/avast/clients/rabbitmq/api/exceptions.scala new file mode 100644 index 00000000..e9ca5f01 --- /dev/null +++ b/api/src/main/scala/com/avast/clients/rabbitmq/api/exceptions.scala @@ -0,0 +1,3 @@ +package com.avast.clients.rabbitmq.api + +case class ConversionException(desc: String, cause: Throwable = null) extends RuntimeException(desc, cause) diff --git a/api/src/main/scala/com/avast/clients/rabbitmq/javaapi/RabbitMQPullConsumer.scala b/api/src/main/scala/com/avast/clients/rabbitmq/javaapi/RabbitMQPullConsumer.scala index f8133d45..66b13560 100644 --- a/api/src/main/scala/com/avast/clients/rabbitmq/javaapi/RabbitMQPullConsumer.scala +++ b/api/src/main/scala/com/avast/clients/rabbitmq/javaapi/RabbitMQPullConsumer.scala @@ -7,10 +7,8 @@ trait RabbitMQPullConsumer extends AutoCloseable { /** * Retrieves one message from the queue, if there is any. - * - * @return Some(DeliveryHandle[A]) when there was a message available; None otherwise. */ - def pull(): CompletableFuture[Optional[DeliveryWithHandle]] + def pull(): CompletableFuture[PullResult] } trait DeliveryWithHandle { @@ -18,3 +16,31 @@ trait DeliveryWithHandle { def handle(result: DeliveryResult): CompletableFuture[Void] } + +sealed trait PullResult { + def toOptional: Optional[DeliveryWithHandle] + + def isOk: Boolean + + def isEmptyQueue: Boolean +} + +object PullResult { + + /* These two are not _case_ intentionally - it pollutes the API for Java users */ + + class Ok(deliveryWithHandle: DeliveryWithHandle) extends PullResult { + def getDeliveryWithHandle: DeliveryWithHandle = deliveryWithHandle + + override val toOptional: Optional[DeliveryWithHandle] = Optional.of(deliveryWithHandle) + override val isOk: Boolean = true + override val isEmptyQueue: Boolean = false + } + + object EmptyQueue extends PullResult { + override val toOptional: Optional[DeliveryWithHandle] = Optional.empty() + override val isOk: Boolean = false + override val isEmptyQueue: Boolean = true + } + +} diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala index 0a032a87..7d45d0a4 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala @@ -117,13 +117,14 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { object Consumer { - def fromConfig[F[_]: ToTask, A: DeliveryConverter](providedConfig: Config, - channel: ServerChannel, - channelFactoryInfo: RabbitMQConnectionInfo, - blockingScheduler: Scheduler, - monitor: Monitor, - consumerListener: ConsumerListener)(readAction: DeliveryReadAction[F, A])( - implicit scheduler: Scheduler): DefaultRabbitMQConsumer = { + def fromConfig[F[_]: ToTask, A: DeliveryConverter]( + providedConfig: Config, + channel: ServerChannel, + channelFactoryInfo: RabbitMQConnectionInfo, + blockingScheduler: Scheduler, + monitor: Monitor, + consumerListener: ConsumerListener, + readAction: DeliveryReadAction[F, A])(implicit scheduler: Scheduler): DefaultRabbitMQConsumer = { val mergedConfig = providedConfig.withFallback(ConsumerDefaultConfig) @@ -140,16 +141,17 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { val consumerConfig = updatedConfig.wrapped.as[ConsumerConfig]("root") - create[F, A](consumerConfig, channel, channelFactoryInfo, blockingScheduler, monitor, consumerListener)(readAction) + create[F, A](consumerConfig, channel, channelFactoryInfo, blockingScheduler, monitor, consumerListener, readAction) } - def create[F[_]: ToTask, A: DeliveryConverter](consumerConfig: ConsumerConfig, - channel: ServerChannel, - channelFactoryInfo: RabbitMQConnectionInfo, - blockingScheduler: Scheduler, - monitor: Monitor, - consumerListener: ConsumerListener)(readAction: DeliveryReadAction[F, A])( - implicit scheduler: Scheduler): DefaultRabbitMQConsumer = { + def create[F[_]: ToTask, A: DeliveryConverter]( + consumerConfig: ConsumerConfig, + channel: ServerChannel, + channelFactoryInfo: RabbitMQConnectionInfo, + blockingScheduler: Scheduler, + monitor: Monitor, + consumerListener: ConsumerListener, + readAction: DeliveryReadAction[F, A])(implicit scheduler: Scheduler): DefaultRabbitMQConsumer = { prepareConsumer(consumerConfig, readAction, channelFactoryInfo, channel, consumerListener, blockingScheduler, monitor) } @@ -157,7 +159,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { object PullConsumer { - def fromConfig[F[_]: FromTask, A: DeliveryConverter]( + def fromConfig[F[_]: FromTask: ToTask, A: DeliveryConverter]( providedConfig: Config, channel: ServerChannel, channelFactoryInfo: RabbitMQConnectionInfo, @@ -182,11 +184,12 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { create[F, A](consumerConfig, channel, channelFactoryInfo, blockingScheduler, monitor) } - def create[F[_]: FromTask, A: DeliveryConverter](consumerConfig: PullConsumerConfig, - channel: ServerChannel, - channelFactoryInfo: RabbitMQConnectionInfo, - blockingScheduler: Scheduler, - monitor: Monitor)(implicit scheduler: Scheduler): DefaultRabbitMQPullConsumer[F, A] = { + def create[F[_]: FromTask: ToTask, A: DeliveryConverter]( + consumerConfig: PullConsumerConfig, + channel: ServerChannel, + channelFactoryInfo: RabbitMQConnectionInfo, + blockingScheduler: Scheduler, + monitor: Monitor)(implicit scheduler: Scheduler): DefaultRabbitMQPullConsumer[F, A] = { preparePullConsumer(consumerConfig, channelFactoryInfo, channel, blockingScheduler, monitor) } @@ -326,7 +329,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { prepareConsumer(consumerConfig, channelFactoryInfo, channel, readAction, consumerListener, blockingScheduler, monitor) } - private def preparePullConsumer[F[_]: FromTask, A: DeliveryConverter]( + private def preparePullConsumer[F[_]: FromTask: ToTask, A: DeliveryConverter]( consumerConfig: PullConsumerConfig, channelFactoryInfo: RabbitMQConnectionInfo, channel: ServerChannel, @@ -435,15 +438,16 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { } val readAction: DefaultDeliveryReadAction = { - val convAction: DefaultDeliveryReadAction = { (d: Delivery[Bytes]) => + val convAction: DefaultDeliveryReadAction = { d: Delivery[Bytes] => try { - implicitly[DeliveryConverter[A]].convert(d.body) match { - case Right(a) => - val devA = d.copy(body = a) - implicitly[ToTask[F]].apply(userReadAction(devA)) - - case Left(ce) => Task.raiseError(ce) + val devA = d.flatMap { d => + implicitly[DeliveryConverter[A]].convert(d.body) match { + case Right(a) => d.mapBody(_ => a) + case Left(ce) => Delivery.MalformedContent(d.body, d.properties, d.routingKey, ce) + } } + + implicitly[ToTask[F]].apply(userReadAction(devA)) } catch { case NonFatal(e) => Task.raiseError(e) @@ -468,7 +472,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { implicit callbackScheduler: Scheduler): DefaultDeliveryReadAction = { import consumerConfig._ - (delivery: Delivery[Bytes]) => + delivery: Delivery[Bytes] => try { // we try to catch also long-lasting synchronous work on the thread val action = Task.deferFuture { @@ -481,18 +485,18 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { action .timeout(ScalaDuration(processTimeout.toMillis, TimeUnit.MILLISECONDS)) - .onErrorRecover { + .onErrorRecoverWith { case e: TimeoutException => traceId.foreach(Kluzo.setTraceId) logger.warn(s"[$name] Task timed-out, applying DeliveryResult.${consumerConfig.timeoutAction}", e) - consumerConfig.timeoutAction + Task.now(consumerConfig.timeoutAction) case NonFatal(e) => traceId.foreach(Kluzo.setTraceId) logger.warn(s"[$name] Error while executing callback, applying DeliveryResult.${consumerConfig.failureAction}", e) - consumerConfig.failureAction + Task.now(consumerConfig.failureAction) } .executeOn(callbackScheduler) } catch { @@ -503,6 +507,23 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { } + private def executeParsingFailureAction[A](consumerConfig: ConsumerConfig, + ce: ConversionException, + delivery: Delivery[Bytes], + parsingFailureAction: ParsingFailureAction[Task]): Task[DeliveryResult] = { + import consumerConfig._ + + // logging on DEBUG, user should provide own higher-level logging in the callback + logger.debug(s"[$name] ${ce.getMessage}, executing parsingFailureAction", ce) + + parsingFailureAction(name, delivery, ce) + .onErrorRecover { + case NonFatal(ex) => + logger.warn(s"[$name] Error while executing parsingFailureAction, applying DeliveryResult.${consumerConfig.failureAction}", ex) + consumerConfig.failureAction + } + } + implicit class WrapConfig(val c: Config) extends AnyVal { def wrapped: Config = { // we need to wrap it with one level, to be able to parse it with Ficus diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConnection.scala b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConnection.scala index c65ed804..59349ed6 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConnection.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConnection.scala @@ -50,7 +50,7 @@ class DefaultRabbitMQConnection[F[_]: FromTask: ToTask](connection: ServerConnec implicit scheduler: Scheduler): DefaultRabbitMQConsumer = { addAutoCloseable { DefaultRabbitMQClientFactory.Consumer - .fromConfig[F, A](config.getConfig(configName), createChannel(), info, blockingScheduler, monitor, consumerListener)(readAction) + .fromConfig[F, A](config.getConfig(configName), createChannel(), info, blockingScheduler, monitor, consumerListener, readAction) } } @@ -141,3 +141,5 @@ class DefaultRabbitMQConnection[F[_]: FromTask: ToTask](connection: ServerConnec } } + +object DefaultRabbitMQConnection extends StrictLogging {} diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumer.scala b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumer.scala index b6aed939..4f3e6f9d 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumer.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumer.scala @@ -2,12 +2,11 @@ package com.avast.clients.rabbitmq import java.util.concurrent.atomic.AtomicInteger -import cats.data.OptionT import com.avast.bytes.Bytes -import com.avast.clients.rabbitmq.api.{Delivery, DeliveryResult, DeliveryWithHandle, RabbitMQPullConsumer} +import com.avast.clients.rabbitmq.api._ import com.avast.clients.rabbitmq.javaapi.JavaConverters._ import com.avast.metrics.scalaapi.Monitor -import com.rabbitmq.client.GetResponse +import com.rabbitmq.client.{AMQP, GetResponse} import com.typesafe.scalalogging.StrictLogging import monix.eval.Task import monix.execution.Scheduler @@ -35,64 +34,78 @@ class DefaultRabbitMQPullConsumer[F[_]: FromTask, A: DeliveryConverter]( private def convertMessage(b: Bytes): Either[ConversionException, A] = implicitly[DeliveryConverter[A]].convert(b) - override def pull(): F[Option[DeliveryWithHandle[F, A]]] = implicitly[FromTask[F]].apply { - OptionT[Task, GetResponse] { - Task { - Option(channel.basicGet(queueName, false)) - }.executeOn(blockingScheduler) // blocking operation! - }.semiflatMap { response => - processingCount.incrementAndGet() - - val envelope = response.getEnvelope - val properties = response.getProps - - val deliveryTag = envelope.getDeliveryTag - val messageId = properties.getMessageId - val routingKey = envelope.getRoutingKey - - logger.debug(s"[$name] Read delivery with ID $messageId, deliveryTag $deliveryTag") - - def handleResult(result: DeliveryResult): Task[Unit] = { - super - .handleResult(messageId, deliveryTag, properties, routingKey, response.getBody)(result) - .foreachL { _ => - processingCount.decrementAndGet() - () + override def pull(): F[PullResult[F, A]] = implicitly[FromTask[F]].apply { + Task { + Option(channel.basicGet(queueName, false)) + }.executeOn(blockingScheduler) // blocking operation! + .flatMap { + case Some(response) => + processingCount.incrementAndGet() + + val envelope = response.getEnvelope + val properties = response.getProps + + val deliveryTag = envelope.getDeliveryTag + val messageId = properties.getMessageId + val routingKey = envelope.getRoutingKey + + logger.debug(s"[$name] Read delivery with ID $messageId, deliveryTag $deliveryTag") + + handleMessage(response, properties, routingKey) { result => + super + .handleResult(messageId, deliveryTag, properties, routingKey, response.getBody)(result) + .foreachL { _ => + processingCount.decrementAndGet() + () + } + } + + case None => + Task.now { + PullResult.EmptyQueue.asInstanceOf[PullResult[F, A]] } } + } - try { - val bytes = Bytes.copyFrom(response.getBody) - - convertMessage(bytes) match { - case Right(a) => - val d = Delivery(a, properties.asScala, routingKey) - logger.trace(s"[$name] Received delivery: ${d.copy(body = bytes)}") - - createDeliveryWithHandle(d, handleResult) - - case Left(ce) => - handleResult(failureAction).flatMap(_ => Task.raiseError(ce)) - } - } catch { - case NonFatal(e) => - logger.error( - s"[$name] Error while converting the message, it's probably a BUG; the converter should return Left(ConversionException)", - e - ) - handleResult(failureAction).flatMap(_ => Task.raiseError(e)) + private def handleMessage(response: GetResponse, properties: AMQP.BasicProperties, routingKey: String)( + handleResult: DeliveryResult => Task[Unit]): Task[PullResult[F, A]] = { + try { + val bytes = Bytes.copyFrom(response.getBody) + + val delivery = convertMessage(bytes) match { + case Right(a) => + val delivery = Delivery(a, properties.asScala, routingKey) + logger.trace(s"[$name] Received delivery: ${delivery.copy(body = bytes)}") + delivery + + case Left(ce) => + val delivery = Delivery.MalformedContent(bytes, properties.asScala, routingKey, ce) + logger.trace(s"[$name] Received delivery but could not convert it: $delivery") + delivery } - }.value + + val dwh = createDeliveryWithHandle(delivery, handleResult) + + Task.now { + PullResult.Ok(dwh) + } + } catch { + case NonFatal(e) => + logger.error( + s"[$name] Error while converting the message, it's probably a BUG; the converter should return Left(ConversionException)", + e + ) + + handleResult(failureAction).flatMap(_ => Task.raiseError(e)) + } } - private def createDeliveryWithHandle(d: Delivery[A], handleResult: DeliveryResult => Task[Unit]): Task[DeliveryWithHandle[F, A]] = { - Task.now { - new DeliveryWithHandle[F, A] { - override val delivery: Delivery[A] = d + private def createDeliveryWithHandle[B](d: Delivery[B], handleResult: DeliveryResult => Task[Unit]): DeliveryWithHandle[F, B] = { + new DeliveryWithHandle[F, B] { + override val delivery: Delivery[B] = d - override def handle(result: DeliveryResult): F[Unit] = implicitly[FromTask[F]].apply { - handleResult(result) - } + override def handle(result: DeliveryResult): F[Unit] = implicitly[FromTask[F]].apply { + handleResult(result) } } } diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/MultiFormatConsumer.scala b/core/src/main/scala/com/avast/clients/rabbitmq/MultiFormatConsumer.scala index fafd07dd..a543d7ab 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/MultiFormatConsumer.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/MultiFormatConsumer.scala @@ -1,7 +1,7 @@ package com.avast.clients.rabbitmq import com.avast.bytes.Bytes -import com.avast.clients.rabbitmq.api.{Delivery, DeliveryResult} +import com.avast.clients.rabbitmq.api.{ConversionException, Delivery, DeliveryResult} import com.typesafe.scalalogging.StrictLogging import scala.collection.immutable @@ -9,44 +9,39 @@ import scala.language.higherKinds import scala.util.control.NonFatal class MultiFormatConsumer[F[_], A] private (supportedConverters: immutable.Seq[CheckedDeliveryConverter[A]], - action: Delivery[A] => F[DeliveryResult], - failureAction: (Delivery[Bytes], ConversionException) => F[DeliveryResult]) + action: Delivery[A] => F[DeliveryResult]) extends (Delivery[Bytes] => F[DeliveryResult]) with StrictLogging { override def apply(delivery: Delivery[Bytes]): F[DeliveryResult] = { - val converted: Either[ConversionException, Delivery[A]] = try { + val converted: Delivery[A] = try { supportedConverters .collectFirst { case c if c.canConvert(delivery) => - c.convert(delivery.body).map { newBody => - delivery.copy(body = newBody) + delivery.flatMap[A] { d => + c.convert(d.body) match { + case Right(a) => d.copy(body = a) + case Left(ce) => + logger.debug("Error while converting", ce) + d.toMalformed(ce) + } } } .getOrElse { - Left(ConversionException(s"Could not find suitable converter for $delivery")) + delivery.toMalformed(ConversionException(s"Could not find suitable converter for $delivery")) } } catch { case NonFatal(e) => logger.debug("Error while converting", e) - Left(ConversionException("Error while converting", e)) + delivery.toMalformed(ConversionException("Error while converting", e)) } - converted match { - case Right(convertedDelivery) => action(convertedDelivery) - case Left(ex: ConversionException) => - logger.debug("Could not find suitable converter", ex) - failureAction(delivery, ex) - case Left(ex) => - logger.debug(s"Error while converting of $delivery", ex) - failureAction(delivery, ex) - } + action(converted) } } object MultiFormatConsumer { def forType[F[_], A](supportedConverters: CheckedDeliveryConverter[A]*)( - action: Delivery[A] => F[DeliveryResult], - failureAction: (Delivery[Bytes], ConversionException) => F[DeliveryResult]): MultiFormatConsumer[F, A] = { - new MultiFormatConsumer[F, A](supportedConverters.toList, action, failureAction) + action: Delivery[A] => F[DeliveryResult]): MultiFormatConsumer[F, A] = { + new MultiFormatConsumer[F, A](supportedConverters.toList, action) } } diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/converters.scala b/core/src/main/scala/com/avast/clients/rabbitmq/converters.scala index acebf4d5..f5949a7c 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/converters.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/converters.scala @@ -1,7 +1,7 @@ package com.avast.clients.rabbitmq import com.avast.bytes.Bytes -import com.avast.clients.rabbitmq.api.{Delivery, MessageProperties} +import com.avast.clients.rabbitmq.api.{ConversionException, Delivery, MessageProperties} import scala.annotation.implicitNotFound @@ -39,5 +39,3 @@ object ProductConverter { override def fillProperties(properties: MessageProperties): MessageProperties = properties } } - -case class ConversionException(desc: String, cause: Throwable = null) extends RuntimeException(desc, cause) diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/DefaultRabbitMQPullConsumer.scala b/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/DefaultRabbitMQPullConsumer.scala index fd6c37fe..56caff93 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/DefaultRabbitMQPullConsumer.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/DefaultRabbitMQPullConsumer.scala @@ -1,20 +1,24 @@ package com.avast.clients.rabbitmq.javaapi -import java.util.Optional import java.util.concurrent.CompletableFuture import com.avast.bytes.Bytes -import com.avast.clients.rabbitmq.api.{RabbitMQPullConsumer => ScalaConsumer} +import com.avast.clients.rabbitmq.api.{PullResult => ScalaResult, RabbitMQPullConsumer => ScalaConsumer} import com.avast.clients.rabbitmq.javaapi.JavaConverters._ +import com.typesafe.scalalogging.StrictLogging import scala.concurrent.{ExecutionContext, Future} class DefaultRabbitMQPullConsumer(scalaConsumer: ScalaConsumer[Future, Bytes] with AutoCloseable)(implicit ec: ExecutionContext) - extends RabbitMQPullConsumer { - override def pull(): CompletableFuture[Optional[DeliveryWithHandle]] = { + extends RabbitMQPullConsumer + with StrictLogging { + override def pull(): CompletableFuture[PullResult] = { scalaConsumer .pull() - .map(_.map(_.asJava).asJava) + .map[PullResult] { + case ScalaResult.Ok(dwh) => new PullResult.Ok(dwh.asJava) + case ScalaResult.EmptyQueue => PullResult.EmptyQueue + } .asJava } diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/JavaConverters.scala b/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/JavaConverters.scala index c0db010a..756b2a5a 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/JavaConverters.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/javaapi/JavaConverters.scala @@ -4,6 +4,7 @@ import java.util.concurrent.{CompletableFuture, Executor} import java.util.{function, Date, Optional} import com.avast.bytes.Bytes +import com.avast.clients.rabbitmq.DeliveryReadAction import com.avast.clients.rabbitmq.api.{ Delivery => ScalaDelivery, DeliveryResult => ScalaResult, @@ -151,14 +152,15 @@ private[rabbitmq] object JavaConverters { } implicit class ScalaDeliveryConversion(val d: ScalaDelivery[Bytes]) extends AnyVal { - def asJava: JavaDelivery = { - new JavaDelivery(d.routingKey, d.body, d.properties.asJava) + def asJava: JavaDelivery = d match { + case ScalaDelivery.Ok(body, properties, routingKey) => new JavaDelivery(routingKey, body, properties.asJava) + case ScalaDelivery.MalformedContent(_, _, _, ce) => throw ce } } implicit class JavaDeliveryConversion(val d: JavaDelivery) extends AnyVal { def asScala: ScalaDelivery[Bytes] = { - ScalaDelivery(d.getBody, d.getProperties.asScala, d.getRoutingKey) + ScalaDelivery.Ok(d.getBody, d.getProperties.asScala, d.getRoutingKey) } } @@ -191,7 +193,7 @@ private[rabbitmq] object JavaConverters { } implicit class JavaActionConversion(val readAction: function.Function[JavaDelivery, CompletableFuture[DeliveryResult]]) extends AnyVal { - def asScala(implicit ex: Executor, ec: ExecutionContext): ScalaDelivery[Bytes] => Future[ScalaResult] = + def asScala(implicit ex: Executor, ec: ExecutionContext): DeliveryReadAction[Future, Bytes] = d => readAction(d.asJava).asScala.map(_.asScala) } diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/rabbitmq.scala b/core/src/main/scala/com/avast/clients/rabbitmq/rabbitmq.scala index 71aa0015..6ea22c6f 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/rabbitmq.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/rabbitmq.scala @@ -2,7 +2,8 @@ package com.avast.clients import cats.arrow.FunctionK import cats.{~>, Monad} -import com.avast.clients.rabbitmq.api.{Delivery, DeliveryResult, MessageProperties, RabbitMQProducer} +import com.avast.bytes.Bytes +import com.avast.clients.rabbitmq.api._ import com.rabbitmq.client.{RecoverableChannel, RecoverableConnection} import mainecoon.FunctorK import monix.eval.Task @@ -17,7 +18,8 @@ package object rabbitmq { private[rabbitmq] type ServerConnection = RecoverableConnection private[rabbitmq] type ServerChannel = RecoverableChannel - type DeliveryReadAction[F[_], A] = Delivery[A] => F[DeliveryResult] + type DeliveryReadAction[F[_], -A] = Delivery[A] => F[DeliveryResult] + type ParsingFailureAction[F[_]] = (String, Delivery[Bytes], ConversionException) => F[DeliveryResult] type FromTask[A[_]] = FunctionK[Task, A] type ToTask[A[_]] = FunctionK[A, Task] @@ -66,4 +68,24 @@ package object rabbitmq { override def tailRecM[A, B](a: A)(f: A => Task[Either[A, B]]): Task[B] = Task.tailRecM(a)(f) } + private[rabbitmq] implicit class DeliveryOps[A](val d: Delivery[A]) extends AnyVal { + def mapBody[B](f: A => B): Delivery[B] = d match { + case ok: Delivery.Ok[A] => ok.copy(body = f(ok.body)) + case m: Delivery.MalformedContent => m + } + + def flatMap[B](f: Delivery.Ok[A] => Delivery[B]): Delivery[B] = d match { + case ok: Delivery.Ok[A] => f(ok) + case m: Delivery.MalformedContent => m + } + } + + private[rabbitmq] implicit class DeliveryBytesOps(val d: Delivery[Bytes]) extends AnyVal { + + def toMalformed(ce: ConversionException): Delivery.MalformedContent = d match { + case ok: Delivery.Ok[Bytes] => Delivery.MalformedContent(ok.body, ok.properties, ok.routingKey, ce) + case m: Delivery.MalformedContent => m.copy(ce = ce) + } + } + } diff --git a/core/src/test/java/ExampleJava.java b/core/src/test/java/ExampleJava.java index b3638029..99755a12 100644 --- a/core/src/test/java/ExampleJava.java +++ b/core/src/test/java/ExampleJava.java @@ -4,13 +4,10 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.*; public class ExampleJava { - public static void main(String[] args) { + public static void main(String[] args) throws ExecutionException, InterruptedException { final Config config = ConfigFactory.load().getConfig("myConfig"); final String routingKey = config.getString("consumer.queueName"); @@ -26,6 +23,12 @@ public static void main(String[] args) { ExampleJava::handleDelivery ); + final RabbitMQPullConsumer rabbitMQPullConsumer = connection.newPullConsumer( + "consumer", + NoOpMonitor.INSTANCE, + callbackExecutor + ); + final RabbitMQProducer rabbitMQProducer = connection.newProducer( "producer", NoOpMonitor.INSTANCE, @@ -39,6 +42,18 @@ public static void main(String[] args) { System.err.println("Message could not be sent: " + e.getClass().getName() + ": " + e.getMessage()); } } + + /* Pull consumer: */ + + final PullResult pullResult = rabbitMQPullConsumer.pull().get(); + + if (pullResult.isOk()) { + PullResult.Ok result = (PullResult.Ok) pullResult; + + final DeliveryWithHandle deliveryWithHandle = result.getDeliveryWithHandle(); + + deliveryWithHandle.handle(DeliveryResult.Ack()).get(); + } } private static CompletableFuture handleDelivery(Delivery delivery) { diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumerTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumerTest.scala index 20ed9d64..f284828e 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumerTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumerTest.scala @@ -3,7 +3,7 @@ package com.avast.clients.rabbitmq import java.util.UUID import com.avast.bytes.Bytes -import com.avast.clients.rabbitmq.api.DeliveryResult +import com.avast.clients.rabbitmq.api.{ConversionException, Delivery, DeliveryResult, PullResult} import com.avast.metrics.scalaapi.Monitor import com.rabbitmq.client.AMQP.BasicProperties import com.rabbitmq.client.impl.recovery.AutorecoveringChannel @@ -51,7 +51,7 @@ class DefaultRabbitMQPullConsumerTest extends FunSuite with MockitoSugar with Sc Scheduler.global ) - val Some(dwh) = consumer.pull().futureValue + val PullResult.Ok(dwh) = consumer.pull().futureValue assertResult(Some(messageId))(dwh.delivery.properties.messageId) @@ -93,7 +93,7 @@ class DefaultRabbitMQPullConsumerTest extends FunSuite with MockitoSugar with Sc Scheduler.global ) - val Some(dwh) = consumer.pull().futureValue + val PullResult.Ok(dwh) = consumer.pull().futureValue assertResult(Some(messageId))(dwh.delivery.properties.messageId) @@ -135,7 +135,7 @@ class DefaultRabbitMQPullConsumerTest extends FunSuite with MockitoSugar with Sc Scheduler.global ) - val Some(dwh) = consumer.pull().futureValue + val PullResult.Ok(dwh) = consumer.pull().futureValue assertResult(Some(messageId))(dwh.delivery.properties.messageId) @@ -176,7 +176,7 @@ class DefaultRabbitMQPullConsumerTest extends FunSuite with MockitoSugar with Sc Scheduler.global ) - val Some(dwh) = consumer.pull().futureValue + val PullResult.Ok(dwh) = consumer.pull().futureValue assertResult(Some(messageId))(dwh.delivery.properties.messageId) @@ -258,23 +258,24 @@ class DefaultRabbitMQPullConsumerTest extends FunSuite with MockitoSugar with Sc case class Abc(i: Int) implicit val c: DeliveryConverter[Abc] = (_: Bytes) => { - Left(ConversionException("")) + Left(ConversionException(messageId)) } val consumer = new DefaultRabbitMQPullConsumer[Future, Abc]( "test", channel, "queueName", - DeliveryResult.Retry, + DeliveryResult.Ack, Monitor.noOp, Scheduler.global ) - try { - consumer.pull().futureValue - } catch { - case e: TestFailedException if e.getCause.isInstanceOf[ConversionException] => // ok - } + val PullResult.Ok(dwh) = consumer.pull().futureValue + val Delivery.MalformedContent(_, _, _, ce) = dwh.delivery + + assertResult(messageId)(ce.getMessage) + + dwh.handle(DeliveryResult.Retry) eventually(timeout(Span(1, Seconds)), interval(Span(0.1, Seconds))) { verify(channel, times(0)).basicAck(deliveryTag, false) diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/LiveTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/LiveTest.scala index c4bc1d0c..2161a262 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/LiveTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/LiveTest.scala @@ -4,8 +4,9 @@ import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{CountDownLatch, Executors, Semaphore, TimeUnit} import com.avast.bytes.Bytes -import com.avast.clients.rabbitmq.api.{Delivery, DeliveryResult, MessageProperties} +import com.avast.clients.rabbitmq.api._ import com.avast.clients.rabbitmq.extras.PoisonedMessageHandler +import com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter import com.avast.continuity.Continuity import com.avast.continuity.monix.Monix import com.avast.kluzo.{Kluzo, TraceId} @@ -78,7 +79,7 @@ class LiveTest extends FunSuite with Eventually with ScalaFutures with StrictLog val rabbitConnection = RabbitMQConnection.fromConfig[Task](config, ex) - rabbitConnection.newConsumer("consumer", Monitor.noOp()) { (_: Delivery[Bytes]) => + rabbitConnection.newConsumer("consumer", Monitor.noOp()) { _: Delivery[Bytes] => counter.incrementAndGet() logger.debug(s"Kluzo: ${Kluzo.getTraceId}") assertResult(true)(Kluzo.getTraceId.nonEmpty) @@ -114,7 +115,7 @@ class LiveTest extends FunSuite with Eventually with ScalaFutures with StrictLog val d = new AtomicInteger(0) - rabbitConnection.newConsumer("consumer", Monitor.noOp()) { (_: Delivery[Bytes]) => + rabbitConnection.newConsumer("consumer", Monitor.noOp()) { _: Delivery[Bytes] => Task { Thread.sleep(if (d.get() % 2 == 0) 300 else 0) latch.countDown() @@ -146,7 +147,7 @@ class LiveTest extends FunSuite with Eventually with ScalaFutures with StrictLog val rabbitConnection = RabbitMQConnection.fromConfig[Task](config, ex) - rabbitConnection.newConsumer("consumer", Monitor.noOp()) { (_: Delivery[Bytes]) => + rabbitConnection.newConsumer("consumer", Monitor.noOp()) { _: Delivery[Bytes] => latch.countDown() Task.now(Ack) } @@ -172,7 +173,7 @@ class LiveTest extends FunSuite with Eventually with ScalaFutures with StrictLog val cnt = new AtomicInteger(0) - rabbitConnection.newConsumer("consumer", Monitor.noOp()) { (_: Delivery[Bytes]) => + rabbitConnection.newConsumer("consumer", Monitor.noOp()) { _: Delivery[Bytes] => cnt.incrementAndGet() assertResult(true)(Kluzo.getTraceId.nonEmpty) @@ -203,7 +204,7 @@ class LiveTest extends FunSuite with Eventually with ScalaFutures with StrictLog val cnt = new AtomicInteger(0) - rabbitConnection.newConsumer("consumer", Monitor.noOp()) { (_: Delivery[Bytes]) => + rabbitConnection.newConsumer("consumer", Monitor.noOp()) { _: Delivery[Bytes] => cnt.incrementAndGet() Thread.sleep(800) // timeout is set to 500 ms @@ -232,7 +233,7 @@ class LiveTest extends FunSuite with Eventually with ScalaFutures with StrictLog val traceId = TraceId.generate - rabbitConnection.newConsumer("consumer", Monitor.noOp()) { (_: Delivery[Bytes]) => + rabbitConnection.newConsumer("consumer", Monitor.noOp()) { _: Delivery[Bytes] => cnt.incrementAndGet() assertResult(Some(traceId))(Kluzo.getTraceId) Success(Ack) @@ -262,7 +263,7 @@ class LiveTest extends FunSuite with Eventually with ScalaFutures with StrictLog val traceId = "someTraceId" - rabbitConnection.newConsumer("consumer", Monitor.noOp()) { (_: Delivery[Bytes]) => + rabbitConnection.newConsumer("consumer", Monitor.noOp()) { _: Delivery[Bytes] => cnt.incrementAndGet() assertResult(Some(TraceId(traceId)))(Kluzo.getTraceId) Success(Ack) @@ -301,7 +302,7 @@ class LiveTest extends FunSuite with Eventually with ScalaFutures with StrictLog _ <- rabbitConnection.bindQueue("additionalDeclarations.bindQueue") } yield Done - rabbitConnection.newConsumer("consumer", Monitor.noOp()) { (_: Delivery[Bytes]) => + rabbitConnection.newConsumer("consumer", Monitor.noOp()) { _: Delivery[Bytes] => latch.countDown() Success(DeliveryResult.Ack) } @@ -327,12 +328,12 @@ class LiveTest extends FunSuite with Eventually with ScalaFutures with StrictLog val rabbitConnection = RabbitMQConnection.fromConfig[Task](config, ex) - val h = PoisonedMessageHandler.withCustomPoisonedAction[Task, Bytes](2) { (_: Delivery[Bytes]) => + val h = PoisonedMessageHandler.withCustomPoisonedAction[Task, Bytes](2) { _: Delivery[Bytes] => Task { processed.incrementAndGet() DeliveryResult.Republish() } - } { (_: Delivery[Bytes]) => + } { _: Delivery[Bytes] => Task { poisoned.incrementAndGet() () @@ -373,7 +374,7 @@ class LiveTest extends FunSuite with Eventually with ScalaFutures with StrictLog } for (_ <- 1 to 3) { - val Some(dwh) = consumer.pull().futureValue + val PullResult.Ok(dwh) = consumer.pull().futureValue dwh.handle(DeliveryResult.Ack) } @@ -382,7 +383,7 @@ class LiveTest extends FunSuite with Eventually with ScalaFutures with StrictLog } for (_ <- 1 to 7) { - val Some(dwh) = consumer.pull().futureValue + val PullResult.Ok(dwh) = consumer.pull().futureValue dwh.handle(DeliveryResult.Ack) } @@ -391,7 +392,51 @@ class LiveTest extends FunSuite with Eventually with ScalaFutures with StrictLog } for (_ <- 1 to 10) { - assertResult(None)(consumer.pull().futureValue) + assertResult(PullResult.EmptyQueue)(consumer.pull().futureValue) } } + + test("consumer parsing failure") { + val c = createConfig() + import c._ + import io.circe.generic.auto._ + case class Abc(str: String) + + implicit val conv = JsonDeliveryConverter.derive[Abc]() + + val rabbitConnection = RabbitMQConnection.fromConfig[Future](config, ex) + + val parsingFailures = new AtomicInteger(0) + val processing = new AtomicInteger(0) + + rabbitConnection.newConsumer[Abc]("consumer", Monitor.noOp()) { + case _: Delivery.Ok[Abc] => + processing.incrementAndGet() + Future.successful(DeliveryResult.Ack) + + case d: Delivery.MalformedContent => + assertResult(10)(d.body.size()) + + val i = parsingFailures.incrementAndGet() + Future.successful( + if (i > 3) DeliveryResult.Ack + else { + logger.info(s"Retrying $i", d.ce) + DeliveryResult.Retry + }) + } + + val sender = rabbitConnection.newProducer("producer", Monitor.noOp()) + + sender.send("test", Bytes.copyFromUtf8(randomString(10))).futureValue + + eventually(timeout = timeout(Span(5, Seconds))) { + assertResult(0)(testHelper.getMessagesCount(queueName)) + + assertResult(0)(processing.get()) + assertResult(4)(parsingFailures.get()) + + } + + } } diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/MultiFormatConsumerTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/MultiFormatConsumerTest.scala index 63e2a369..d810f1e9 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/MultiFormatConsumerTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/MultiFormatConsumerTest.scala @@ -3,7 +3,7 @@ package com.avast.clients.rabbitmq import com.avast.bytes.Bytes import com.avast.bytes.gpb.ByteStringBytes import com.avast.cactus.bytes._ -import com.avast.clients.rabbitmq.api.{Delivery, DeliveryResult, MessageProperties} +import com.avast.clients.rabbitmq.api.{ConversionException, Delivery, DeliveryResult, MessageProperties} import com.avast.clients.rabbitmq.extras.format._ import com.avast.clients.rabbitmq.test.ExampleEvents.{FileSource => FileSourceGpb, NewFileSourceAdded => NewFileSourceAddedGpb} import com.google.protobuf.ByteString @@ -32,13 +32,13 @@ class MultiFormatConsumerTest extends FunSuite with ScalaFutures { case class NewFileSourceAdded(fileSources: Seq[FileSource]) test("basic") { - val consumer = MultiFormatConsumer.forType[Future, String](StringDeliveryConverter)( - d => { + val consumer = MultiFormatConsumer.forType[Future, String](StringDeliveryConverter) { + case d: Delivery.Ok[String] => assertResult("abc321")(d.body) Future.successful(DeliveryResult.Ack) - }, - (_, _) => Future.successful(DeliveryResult.Reject) - ) + + case _ => fail() + } val delivery = Delivery( body = Bytes.copyFromUtf8("abc321"), @@ -52,10 +52,12 @@ class MultiFormatConsumerTest extends FunSuite with ScalaFutures { } test("non-supported content-type") { - val consumer = MultiFormatConsumer.forType[Future, String](StringDeliveryConverter)( - _ => Future.successful(DeliveryResult.Ack), - (_, _) => Future.successful(DeliveryResult.Reject) - ) + val consumer = MultiFormatConsumer.forType[Future, String](StringDeliveryConverter) { + case _: Delivery.Ok[NewFileSourceAdded] => + Future.successful(DeliveryResult.Ack) + case _ => + Future.successful(DeliveryResult.Reject) + } val delivery = Delivery( body = Bytes.copyFromUtf8("abc321"), @@ -69,8 +71,8 @@ class MultiFormatConsumerTest extends FunSuite with ScalaFutures { } test("json") { - val consumer = MultiFormatConsumer.forType[Future, NewFileSourceAdded](JsonDeliveryConverter.derive())( - d => { + val consumer = MultiFormatConsumer.forType[Future, NewFileSourceAdded](JsonDeliveryConverter.derive()) { + case d: Delivery.Ok[NewFileSourceAdded] => assertResult( NewFileSourceAdded( Seq( @@ -79,9 +81,9 @@ class MultiFormatConsumerTest extends FunSuite with ScalaFutures { )))(d.body) Future.successful(DeliveryResult.Ack) - }, - (_, _) => Future.successful(DeliveryResult.Reject) - ) + + case _ => Future.successful(DeliveryResult.Reject) + } val delivery = Delivery( body = Bytes.copyFromUtf8(s""" @@ -100,11 +102,9 @@ class MultiFormatConsumerTest extends FunSuite with ScalaFutures { } test("gpb") { - val consumer = MultiFormatConsumer.forType[Future, NewFileSourceAdded]( - JsonDeliveryConverter.derive(), - GpbDeliveryConverter[NewFileSourceAddedGpb].derive() - )( - d => { + val consumer = MultiFormatConsumer.forType[Future, NewFileSourceAdded](JsonDeliveryConverter.derive(), + GpbDeliveryConverter[NewFileSourceAddedGpb].derive()) { + case d: Delivery.Ok[NewFileSourceAdded] => assertResult( NewFileSourceAdded( Seq( @@ -113,9 +113,9 @@ class MultiFormatConsumerTest extends FunSuite with ScalaFutures { )))(d.body) Future.successful(DeliveryResult.Ack) - }, - (_, _) => Future.successful(DeliveryResult.Reject) - ) + + case _ => fail() + } val delivery = Delivery( body = ByteStringBytes.wrap { diff --git a/extras-cactus/src/main/scala/com/avast/clients/rabbitmq/extras/format/GpbDeliveryConverter.scala b/extras-cactus/src/main/scala/com/avast/clients/rabbitmq/extras/format/GpbDeliveryConverter.scala index 2f05efbf..58fb2c99 100644 --- a/extras-cactus/src/main/scala/com/avast/clients/rabbitmq/extras/format/GpbDeliveryConverter.scala +++ b/extras-cactus/src/main/scala/com/avast/clients/rabbitmq/extras/format/GpbDeliveryConverter.scala @@ -4,8 +4,8 @@ import cats.syntax.either._ import com.avast.bytes.Bytes import com.avast.cactus.CactusParser._ import com.avast.cactus.Converter -import com.avast.clients.rabbitmq.api.Delivery -import com.avast.clients.rabbitmq.{CheckedDeliveryConverter, ConversionException} +import com.avast.clients.rabbitmq.CheckedDeliveryConverter +import com.avast.clients.rabbitmq.api.{ConversionException, Delivery} import com.google.protobuf.MessageLite import scala.annotation.implicitNotFound diff --git a/extras-cactus/src/main/scala/com/avast/clients/rabbitmq/extras/format/GpbProductConverter.scala b/extras-cactus/src/main/scala/com/avast/clients/rabbitmq/extras/format/GpbProductConverter.scala index 0cb38b7b..7ea51c78 100644 --- a/extras-cactus/src/main/scala/com/avast/clients/rabbitmq/extras/format/GpbProductConverter.scala +++ b/extras-cactus/src/main/scala/com/avast/clients/rabbitmq/extras/format/GpbProductConverter.scala @@ -5,8 +5,8 @@ import com.avast.bytes.Bytes import com.avast.bytes.gpb.ByteStringBytes import com.avast.cactus.CactusParser._ import com.avast.cactus.Converter -import com.avast.clients.rabbitmq.api.MessageProperties -import com.avast.clients.rabbitmq.{ConversionException, ProductConverter} +import com.avast.clients.rabbitmq.ProductConverter +import com.avast.clients.rabbitmq.api.{ConversionException, MessageProperties} import com.google.protobuf.MessageLite import scala.annotation.implicitNotFound diff --git a/extras-circe/src/main/scala/com/avast/clients/rabbitmq/extras/format/JsonDeliveryConverter.scala b/extras-circe/src/main/scala/com/avast/clients/rabbitmq/extras/format/JsonDeliveryConverter.scala index a66c7983..107feabf 100644 --- a/extras-circe/src/main/scala/com/avast/clients/rabbitmq/extras/format/JsonDeliveryConverter.scala +++ b/extras-circe/src/main/scala/com/avast/clients/rabbitmq/extras/format/JsonDeliveryConverter.scala @@ -2,8 +2,8 @@ package com.avast.clients.rabbitmq.extras.format import cats.syntax.either._ import com.avast.bytes.Bytes -import com.avast.clients.rabbitmq.api.Delivery -import com.avast.clients.rabbitmq.{CheckedDeliveryConverter, ConversionException} +import com.avast.clients.rabbitmq.CheckedDeliveryConverter +import com.avast.clients.rabbitmq.api.{ConversionException, Delivery} import io.circe.Decoder import io.circe.parser.decode diff --git a/extras-circe/src/main/scala/com/avast/clients/rabbitmq/extras/format/JsonProductConverter.scala b/extras-circe/src/main/scala/com/avast/clients/rabbitmq/extras/format/JsonProductConverter.scala index 97a1edec..f815e22d 100644 --- a/extras-circe/src/main/scala/com/avast/clients/rabbitmq/extras/format/JsonProductConverter.scala +++ b/extras-circe/src/main/scala/com/avast/clients/rabbitmq/extras/format/JsonProductConverter.scala @@ -1,8 +1,8 @@ package com.avast.clients.rabbitmq.extras.format import com.avast.bytes.Bytes -import com.avast.clients.rabbitmq.api.MessageProperties -import com.avast.clients.rabbitmq.{ConversionException, ProductConverter} +import com.avast.clients.rabbitmq.ProductConverter +import com.avast.clients.rabbitmq.api.{ConversionException, MessageProperties} import io.circe.Encoder import io.circe.syntax._ diff --git a/extras/src/main/scala/com/avast/clients/rabbitmq/extras/PoisonedMessageHandler.scala b/extras/src/main/scala/com/avast/clients/rabbitmq/extras/PoisonedMessageHandler.scala index e44626c0..abb3955c 100644 --- a/extras/src/main/scala/com/avast/clients/rabbitmq/extras/PoisonedMessageHandler.scala +++ b/extras/src/main/scala/com/avast/clients/rabbitmq/extras/PoisonedMessageHandler.scala @@ -26,33 +26,35 @@ class PoisonedMessageHandler[F[_]: FromTask: ToTask, A](maxAttempts: Int)(wrappe convertFromF { wrappedAction(delivery) }.flatMap { - case Republish(newHeaders) => - // get current attempt no. from passed headers with fallback to original (incoming) headers - the fallback will most likely happen - // but we're giving the programmer chance to programatically _pretend_ lower attempt number - val attempt = (newHeaders ++ delivery.properties.headers) - .get(RepublishCountHeaderName) - .flatMap(v => Try(v.toString.toInt).toOption) - .getOrElse(0) + 1 - - logger.debug(s"Attempt $attempt/$maxAttempts") - - if (attempt < maxAttempts) { - Task.now(Republish(newHeaders + (RepublishCountHeaderName -> attempt.asInstanceOf[AnyRef]))) - } else { - convertFromF { - handlePoisonedMessage(delivery) - }.onErrorRecover { - case NonFatal(e) => - logger.warn("Custom poisoned message handler failed", e) - Done - } - .map(_ => Reject) // always REJECT the message - } - + case Republish(newHeaders) => republishDelivery(delivery, newHeaders) case r => Task.now(r) // keep other results as they are } } + private def republishDelivery(delivery: Delivery[A], newHeaders: Map[String, AnyRef]): Task[DeliveryResult] = { + // get current attempt no. from passed headers with fallback to original (incoming) headers - the fallback will most likely happen + // but we're giving the programmer chance to programatically _pretend_ lower attempt number + val attempt = (newHeaders ++ delivery.properties.headers) + .get(RepublishCountHeaderName) + .flatMap(v => Try(v.toString.toInt).toOption) + .getOrElse(0) + 1 + + logger.debug(s"Attempt $attempt/$maxAttempts") + + if (attempt < maxAttempts) { + Task.now(Republish(newHeaders + (RepublishCountHeaderName -> attempt.asInstanceOf[AnyRef]))) + } else { + convertFromF { + handlePoisonedMessage(delivery) + }.onErrorRecover { + case NonFatal(e) => + logger.warn("Custom poisoned message handler failed", e) + Done + } + .map(_ => Reject) // always REJECT the message + } + } + /** This method logs the delivery by default but can be overridden. The delivery is always REJECTed after this method execution. */ protected def handlePoisonedMessage(delivery: Delivery[A]): F[Unit] = convertToF { diff --git a/scalastyle_config.xml b/scalastyle_config.xml index 4d6ec734..f2bd6ee5 100644 --- a/scalastyle_config.xml +++ b/scalastyle_config.xml @@ -128,7 +128,7 @@ it tries to lay a common ground using the official style guide: http://docs.scal - 8 + 10