From db9d2027dac0b1453a75cbd2066af8d0d6cfffcc Mon Sep 17 00:00:00 2001 From: Kolena Jan Date: Wed, 29 Jan 2020 20:36:39 +0100 Subject: [PATCH 1/2] Streaming support StreamingRabbitMQConsumer - providing fs2.Stream of deliveries --- api/build.gradle | 1 + .../api/RabbitMQStreamingConsumer.scala | 16 + build.gradle | 9 +- .../avast/clients/rabbitmq/ConsumerBase.scala | 7 +- .../rabbitmq/ConsumerWithCallbackBase.scala | 119 ++++++++ .../DefaultRabbitMQClientFactory.scala | 93 ++++-- .../rabbitmq/DefaultRabbitMQConnection.scala | 27 +- .../rabbitmq/DefaultRabbitMQConsumer.scala | 115 +------ .../DefaultRabbitMQStreamingConsumer.scala | 285 ++++++++++++++++++ .../clients/rabbitmq/RabbitMQConnection.scala | 10 +- .../clients/rabbitmq/configuration.scala | 8 + core/src/test/resources/application.conf | 40 +++ core/src/test/resources/logback.xml | 7 +- .../DefaultRabbitMQConsumerTest.scala | 10 +- .../DefaultRabbitMQPullConsumerTest.scala | 6 + .../com/avast/clients/rabbitmq/LiveTest.scala | 206 ++++++++++++- .../avast/clients/rabbitmq/TestHelper.scala | 41 ++- .../pureconfig/ConfigRabbitMQConnection.scala | 20 +- .../pureconfig/PureconfigImplicits.scala | 1 + .../rabbitmq/pureconfig/pureconfig.scala | 1 + 20 files changed, 863 insertions(+), 159 deletions(-) create mode 100644 api/src/main/scala/com/avast/clients/rabbitmq/api/RabbitMQStreamingConsumer.scala create mode 100644 core/src/main/scala/com/avast/clients/rabbitmq/ConsumerWithCallbackBase.scala create mode 100644 core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQStreamingConsumer.scala diff --git a/api/build.gradle b/api/build.gradle index 24d22229..407acbd4 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -2,6 +2,7 @@ archivesBaseName = "rabbitmq-client-api_$scalaVersion" dependencies { compile "com.avast.bytes:bytes-core:${bytesVersion}" + compile "co.fs2:fs2-core_$scalaVersion:$fs2Version" compile "com.kailuowang:mainecoon-core_$scalaVersion:0.6.2" } diff --git a/api/src/main/scala/com/avast/clients/rabbitmq/api/RabbitMQStreamingConsumer.scala b/api/src/main/scala/com/avast/clients/rabbitmq/api/RabbitMQStreamingConsumer.scala new file mode 100644 index 00000000..2fa53c0a --- /dev/null +++ b/api/src/main/scala/com/avast/clients/rabbitmq/api/RabbitMQStreamingConsumer.scala @@ -0,0 +1,16 @@ +package com.avast.clients.rabbitmq.api + +import scala.language.higherKinds + +trait RabbitMQStreamingConsumer[F[_], A] { + def deliveryStream: fs2.Stream[F, StreamedDelivery[F, A]] +} + +trait StreamedDelivery[+F[_], +A] { + def delivery: Delivery[A] + + def handle(result: DeliveryResult): F[StreamedResult] +} + +sealed trait StreamedResult +object StreamedResult extends StreamedResult diff --git a/build.gradle b/build.gradle index f62aa903..f53e47cd 100644 --- a/build.gradle +++ b/build.gradle @@ -83,6 +83,12 @@ subprojects { scalaCompileOptions.additionalParameters = ['-target:jvm-1.8'] } + test { + testLogging { + showStandardStreams = true + } + } + sourceCompatibility = '1.8' targetCompatibility = '1.8' @@ -100,6 +106,7 @@ subprojects { cactusVersion = "0.16.12" catsVersion = "2.0.0" catsEffectVersion = "2.0.0" + fs2Version = "2.2.1" monixVersion = "3.0.0" // just for tests! } @@ -124,7 +131,7 @@ subprojects { } tasks.withType(ScalaCompile) { - List plugins = configurations.scalaCompilerPlugin.files.collect{ "-Xplugin:${it.getAbsolutePath()}".toString() } + List plugins = configurations.scalaCompilerPlugin.files.collect { "-Xplugin:${it.getAbsolutePath()}".toString() } plugins.add("-Ypartial-unification") scalaCompileOptions.additionalParameters = plugins } diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/ConsumerBase.scala b/core/src/main/scala/com/avast/clients/rabbitmq/ConsumerBase.scala index 2144f8da..8371831f 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/ConsumerBase.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/ConsumerBase.scala @@ -1,10 +1,11 @@ package com.avast.clients.rabbitmq -import cats.effect.{Blocker, ContextShift, Effect, Sync} +import cats.effect.{Blocker, ContextShift, Sync} import com.avast.clients.rabbitmq.DefaultRabbitMQConsumer._ import com.avast.clients.rabbitmq.api.DeliveryResult import com.avast.metrics.scalaapi.Monitor import com.rabbitmq.client.AMQP.BasicProperties +import com.rabbitmq.client.ShutdownSignalException import com.typesafe.scalalogging.StrictLogging import scala.collection.JavaConverters._ @@ -45,6 +46,7 @@ private[rabbitmq] trait ConsumerBase[F[_]] extends StrictLogging { blocker.delay { try { logger.debug(s"[$name] ACK delivery ID $messageId, deliveryTag $deliveryTag") + if (!channel.isOpen) throw new IllegalStateException("Cannot ack delivery on closed channel") channel.basicAck(deliveryTag, false) resultAckMeter.mark() } catch { @@ -56,6 +58,7 @@ private[rabbitmq] trait ConsumerBase[F[_]] extends StrictLogging { blocker.delay { try { logger.debug(s"[$name] REJECT delivery ID $messageId, deliveryTag $deliveryTag") + if (!channel.isOpen) throw new IllegalStateException("Cannot reject delivery on closed channel") channel.basicReject(deliveryTag, false) resultRejectMeter.mark() } catch { @@ -67,6 +70,7 @@ private[rabbitmq] trait ConsumerBase[F[_]] extends StrictLogging { blocker.delay { try { logger.debug(s"[$name] REJECT (with requeue) delivery ID $messageId, deliveryTag $deliveryTag") + if (!channel.isOpen) throw new IllegalStateException("Cannot retry delivery on closed channel") channel.basicReject(deliveryTag, true) resultRetryMeter.mark() } catch { @@ -78,6 +82,7 @@ private[rabbitmq] trait ConsumerBase[F[_]] extends StrictLogging { blocker.delay { try { logger.debug(s"[$name] Republishing delivery (ID $messageId, deliveryTag $deliveryTag) to end of queue '$queueName'") + if (!channel.isOpen) throw new IllegalStateException("Cannot republish delivery on closed channel") channel.basicPublish("", queueName, properties, body) channel.basicAck(deliveryTag, false) resultRepublishMeter.mark() diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/ConsumerWithCallbackBase.scala b/core/src/main/scala/com/avast/clients/rabbitmq/ConsumerWithCallbackBase.scala new file mode 100644 index 00000000..e71f9a33 --- /dev/null +++ b/core/src/main/scala/com/avast/clients/rabbitmq/ConsumerWithCallbackBase.scala @@ -0,0 +1,119 @@ +package com.avast.clients.rabbitmq + +import java.time.{Duration, Instant} +import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.atomic.AtomicInteger + +import cats.effect.{Effect, Sync} +import cats.syntax.all._ +import com.avast.bytes.Bytes +import com.avast.clients.rabbitmq.JavaConverters._ +import com.avast.clients.rabbitmq.api.{Delivery, DeliveryResult} +import com.avast.metrics.scalaapi._ +import com.rabbitmq.client.AMQP.BasicProperties +import com.rabbitmq.client.{DefaultConsumer, ShutdownSignalException} + +import scala.collection.JavaConverters._ +import scala.language.higherKinds +import scala.util.control.NonFatal + +abstract class ConsumerWithCallbackBase[F[_]: Effect](channel: ServerChannel, + failureAction: DeliveryResult, + consumerListener: ConsumerListener) + extends DefaultConsumer(channel) + with ConsumerBase[F] { + + override protected implicit val F: Sync[F] = Effect[F] + + protected val readMeter: Meter = monitor.meter("read") + + protected val processingFailedMeter: Meter = resultsMonitor.meter("processingFailed") + + protected val tasksMonitor: Monitor = monitor.named("tasks") + + protected val processingCount: AtomicInteger = new AtomicInteger(0) + + tasksMonitor.gauge("processing")(() => processingCount.get()) + + protected val processedTimer: TimerPair = tasksMonitor.timerPair("processed") + + override def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException): Unit = + consumerListener.onShutdown(this, channel, name, consumerTag, sig) + + protected def handleDelivery(messageId: String, deliveryTag: Long, properties: BasicProperties, routingKey: String, body: Array[Byte])( + readAction: DeliveryReadAction[F, Bytes]): F[Unit] = + F.delay { + try { + readMeter.mark() + + logger.debug(s"[$name] Read delivery with ID $messageId, deliveryTag $deliveryTag") + + val delivery = Delivery(Bytes.copyFrom(body), properties.asScala, Option(routingKey).getOrElse("")) + + logger.trace(s"[$name] Received delivery: $delivery") + + val st = Instant.now() + + @inline + def taskDuration: Duration = Duration.between(st, Instant.now()) + + readAction(delivery) + .flatMap { handleResult(messageId, deliveryTag, properties, routingKey, body) } + .flatTap(_ => + F.delay { + val duration = taskDuration + logger.debug(s"[$name] Delivery ID $messageId handling succeeded in $duration") + processedTimer.update(duration) + }) + .recoverWith { + case NonFatal(t) => + F.delay { + val duration = taskDuration + logger.debug(s"[$name] Delivery ID $messageId handling failed in $duration", t) + processedTimer.updateFailure(duration) + } >> + handleCallbackFailure(messageId, deliveryTag, properties, routingKey, body)(t) + } + } catch { + // we catch this specific exception, handling of others is up to Lyra + case e: RejectedExecutionException => + logger.debug(s"[$name] Executor was unable to plan the handling task", e) + handleFailure(messageId, deliveryTag, properties, routingKey, body, e) + + case NonFatal(e) => handleCallbackFailure(messageId, deliveryTag, properties, routingKey, body)(e) + } + }.flatten + + private def handleCallbackFailure(messageId: String, + deliveryTag: Long, + properties: BasicProperties, + routingKey: String, + body: Array[Byte])(t: Throwable): F[Unit] = { + F.delay { + logger.error(s"[$name] Error while executing callback, it's probably a BUG", t) + } >> + handleFailure(messageId, deliveryTag, properties, routingKey, body, t) + } + + private def handleFailure(messageId: String, + deliveryTag: Long, + properties: BasicProperties, + routingKey: String, + body: Array[Byte], + t: Throwable): F[Unit] = { + F.delay { + processingCount.decrementAndGet() + processingFailedMeter.mark() + consumerListener.onError(this, name, channel, t) + } >> + executeFailureAction(messageId, deliveryTag, properties, routingKey, body) + } + + private def executeFailureAction(messageId: String, + deliveryTag: Long, + properties: BasicProperties, + routingKey: String, + body: Array[Byte]): F[Unit] = { + handleResult(messageId, deliveryTag, properties, routingKey, body)(failureAction) + } +} diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala index 2fd6c2b5..f2c7ce36 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala @@ -7,13 +7,14 @@ import cats.syntax.all._ import com.avast.bytes.Bytes import com.avast.clients.rabbitmq.api._ import com.avast.metrics.scalaapi.{Meter, Monitor} -import com.rabbitmq.client.AMQP import com.rabbitmq.client.AMQP.Queue +import com.rabbitmq.client.{AMQP, Consumer} import com.typesafe.scalalogging.LazyLogging import org.slf4j.event.Level import scala.collection.JavaConverters._ import scala.collection.immutable +import scala.concurrent.duration.Duration import scala.language.{higherKinds, implicitConversions} import scala.util.control.NonFatal @@ -64,6 +65,40 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { } } + private def prepareStreamingConsumer[F[_]: ConcurrentEffect, A: DeliveryConverter]( + consumerConfig: StreamingConsumerConfig, + connectionInfo: RabbitMQConnectionInfo, + channel: ServerChannel, + newChannel: F[ServerChannel], + consumerListener: ConsumerListener, + blocker: Blocker, + monitor: Monitor)(implicit timer: Timer[F], cs: ContextShift[F]): Resource[F, DefaultRabbitMQStreamingConsumer[F, A]] = { + import consumerConfig._ + + // auto declare exchanges + declareExchangesFromBindings(connectionInfo, channel, consumerConfig.bindings) + + // auto declare queue; if configured + consumerConfig.declare.foreach { + declareQueue(consumerConfig.queueName, connectionInfo, channel, _) + } + + // auto bind + bindQueues(connectionInfo, channel, consumerConfig.queueName, consumerConfig.bindings) + + DefaultRabbitMQStreamingConsumer( + name, + newChannel.flatTap(ch => Sync[F].delay(ch.basicQos(consumerConfig.prefetchCount))), + consumerTag, + queueName, + connectionInfo, + consumerListener, + queueBufferSize, + monitor, + blocker + ) + } + object Declarations { def declareExchange[F[_]: Sync](config: DeclareExchangeConfig, channel: ServerChannel, @@ -181,6 +216,27 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { prepareConsumer(consumerConfig, connectionInfo, channel, readAction, consumerListener, blocker, monitor) } + object StreamingConsumer { + + def create[F[_]: ConcurrentEffect, A: DeliveryConverter](consumerConfig: StreamingConsumerConfig, + channel: ServerChannel, + newChannel: F[ServerChannel], + connectionInfo: RabbitMQConnectionInfo, + blocker: Blocker, + monitor: Monitor, + consumerListener: ConsumerListener)( + implicit timer: Timer[F], + cs: ContextShift[F]): Resource[F, DefaultRabbitMQStreamingConsumer[F, A]] = { + + prepareStreamingConsumer(consumerConfig, connectionInfo, channel, newChannel, consumerListener, blocker, monitor) + } + } + + private[rabbitmq] def startConsumingQueue(channel: ServerChannel, queueName: String, consumerTag: String, consumer: Consumer): String = { + channel.setDefaultConsumer(consumer) // see `setDefaultConsumer` javadoc; this is possible because the channel is here exclusively for this consumer + val finalConsumerTag = channel.basicConsume(queueName, false, if (consumerTag == "Default") "" else consumerTag, consumer) + finalConsumerTag + } private def preparePullConsumer[F[_]: ConcurrentEffect, A: DeliveryConverter]( consumerConfig: PullConsumerConfig, connectionInfo: RabbitMQConnectionInfo, @@ -300,14 +356,7 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { val readAction: DefaultDeliveryReadAction[F] = { val convAction: DefaultDeliveryReadAction[F] = { d: Delivery[Bytes] => try { - val devA = d.flatMap { d => - implicitly[DeliveryConverter[A]].convert(d.body) match { - case Right(a) => d.mapBody(_ => a) - case Left(ce) => Delivery.MalformedContent(d.body, d.properties, d.routingKey, ce) - } - } - - userReadAction(devA) + userReadAction(convertDelivery(d)) } catch { case NonFatal(e) => ConcurrentEffect[F].raiseError(e) @@ -321,14 +370,20 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { new DefaultRabbitMQConsumer(name, channel, queueName, connectionInfo, monitor, failureAction, consumerListener, blocker)(readAction) } - val finalConsumerTag = if (consumerTag == "Default") "" else consumerTag - - channel.basicConsume(queueName, false, finalConsumerTag, consumer) - channel.setDefaultConsumer(consumer) // see `setDefaultConsumer` javadoc; this is possible because the channel is here exclusively for this consumer + startConsumingQueue(channel, queueName, consumerTag, consumer) consumer } + private[rabbitmq] def convertDelivery[F[_]: ConcurrentEffect, A: DeliveryConverter](d: Delivery[Bytes]): Delivery[A] = { + d.flatMap { d => + implicitly[DeliveryConverter[A]].convert(d.body) match { + case Right(a) => d.mapBody(_ => a) + case Left(ce) => Delivery.MalformedContent(d.body, d.properties, d.routingKey, ce) + } + } + } + private def wrapReadAction[F[_]: ConcurrentEffect, A]( consumerConfig: ConsumerConfig, userReadAction: DefaultDeliveryReadAction[F], @@ -345,11 +400,13 @@ private[rabbitmq] object DefaultRabbitMQClientFactory extends LazyLogging { val action: F[DeliveryResult] = blocker.delay { userReadAction(delivery) }.flatten val timedOutAction: F[DeliveryResult] = { - Concurrent - .timeout(action, processTimeout) - .recoverWith { - case e: TimeoutException => doTimeoutAction(consumerConfig, timeoutsMeter, e) - } + if (processTimeout != Duration.Zero) { + Concurrent + .timeout(action, processTimeout) + .recoverWith { + case e: TimeoutException => doTimeoutAction(consumerConfig, timeoutsMeter, e) + } + } else action } timedOutAction diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConnection.scala b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConnection.scala index 315a9557..ffbc0a9d 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConnection.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConnection.scala @@ -22,8 +22,8 @@ class DefaultRabbitMQConnection[F[_]](connection: ServerConnection, createChannel() } - private def createChannel(): Resource[F, ServerChannel] = - Resource.make(F.delay { + private val createChannelF: F[ServerChannel] = { + F.delay { try { connection.createChannel() match { case channel: ServerChannel => @@ -40,11 +40,17 @@ class DefaultRabbitMQConnection[F[_]](connection: ServerConnection, channelListener.onCreateFailure(e) throw e } - })(channel => - F.delay { - logger.debug(s"Closing channel: $channel ${channel.hashCode()}") - channel.close() - }) + } + } + + override def newStreamingConsumer[A: DeliveryConverter](consumerConfig: StreamingConsumerConfig, + monitor: Monitor): Resource[F, RabbitMQStreamingConsumer[F, A]] = { + createChannel().flatMap { channel => + DefaultRabbitMQClientFactory.StreamingConsumer + .create[F, A](consumerConfig, channel, createChannelF, info, blocker, monitor, consumerListener) + .map(identity[RabbitMQStreamingConsumer[F, A]]) // type inference... :-( + } + } def newConsumer[A: DeliveryConverter](consumerConfig: ConsumerConfig, monitor: Monitor)( readAction: DeliveryReadAction[F, A]): Resource[F, RabbitMQConsumer[F]] = { @@ -62,6 +68,13 @@ class DefaultRabbitMQConnection[F[_]](connection: ServerConnection, } } + private def createChannel(): Resource[F, ServerChannel] = + Resource.make(createChannelF)(channel => + F.delay { + logger.debug(s"Closing channel: $channel ${channel.hashCode()}") + channel.close() + }) + override def newProducer[A: ProductConverter](producerConfig: ProducerConfig, monitor: Monitor): Resource[F, RabbitMQProducer[F, A]] = { createChannel().map { channel => DefaultRabbitMQClientFactory.Producer diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumer.scala b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumer.scala index da8f2430..fd34ddca 100755 --- a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumer.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumer.scala @@ -1,14 +1,9 @@ package com.avast.clients.rabbitmq -import java.time.{Duration, Instant} -import java.util.concurrent.RejectedExecutionException -import java.util.concurrent.atomic.AtomicInteger - import cats.effect._ import cats.implicits._ import com.avast.bytes.Bytes import com.avast.clients.rabbitmq.api._ -import JavaConverters._ import com.avast.metrics.scalaapi.Monitor import com.rabbitmq.client.AMQP.BasicProperties import com.rabbitmq.client.{Delivery => _, _} @@ -16,7 +11,6 @@ import com.typesafe.scalalogging.StrictLogging import scala.collection.JavaConverters._ import scala.language.higherKinds -import scala.util.control.NonFatal class DefaultRabbitMQConsumer[F[_]: Effect]( override val name: String, @@ -27,30 +21,13 @@ class DefaultRabbitMQConsumer[F[_]: Effect]( failureAction: DeliveryResult, consumerListener: ConsumerListener, override protected val blocker: Blocker)(readAction: DeliveryReadAction[F, Bytes])(implicit override protected val cs: ContextShift[F]) - extends DefaultConsumer(channel) + extends ConsumerWithCallbackBase(channel, failureAction, consumerListener) with RabbitMQConsumer[F] with ConsumerBase[F] with StrictLogging { import DefaultRabbitMQConsumer._ - override protected implicit val F: Sync[F] = Effect[F] - - private val readMeter = monitor.meter("read") - - private val processingFailedMeter = resultsMonitor.meter("processingFailed") - - private val tasksMonitor = monitor.named("tasks") - - private val processingCount = new AtomicInteger(0) - - tasksMonitor.gauge("processing")(() => processingCount.get()) - - private val processedTimer = tasksMonitor.timerPair("processed") - - override def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException): Unit = - consumerListener.onShutdown(this, channel, name, consumerTag, sig) - override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]): Unit = { processingCount.incrementAndGet() @@ -61,7 +38,7 @@ class DefaultRabbitMQConsumer[F[_]: Effect]( case None => envelope.getRoutingKey } - val action = handleDelivery(messageId, deliveryTag, properties, routingKey, body) + val action = handleDelivery(messageId, deliveryTag, properties, routingKey, body)(readAction) .flatTap(_ => F.delay { processingCount.decrementAndGet() @@ -77,96 +54,10 @@ class DefaultRabbitMQConsumer[F[_]: Effect]( F.raiseError(e) } - toIO(action).unsafeToFuture() // actually start the processing + Effect[F].toIO(action).unsafeToFuture() // actually start the processing () } - - private def handleDelivery(messageId: String, - deliveryTag: Long, - properties: BasicProperties, - routingKey: String, - body: Array[Byte]): F[Unit] = - F.delay { - try { - readMeter.mark() - - logger.debug(s"[$name] Read delivery with ID $messageId, deliveryTag $deliveryTag") - - val delivery = Delivery(Bytes.copyFrom(body), properties.asScala, Option(routingKey).getOrElse("")) - - logger.trace(s"[$name] Received delivery: $delivery") - - val st = Instant.now() - - @inline - def taskDuration: Duration = Duration.between(st, Instant.now()) - - readAction(delivery) - .flatMap { - handleResult(messageId, deliveryTag, properties, routingKey, body) - } - .flatTap(_ => - F.delay { - val duration = taskDuration - logger.debug(s"[$name] Delivery ID $messageId handling succeeded in $duration") - processedTimer.update(duration) - }) - .recoverWith { - case NonFatal(t) => - F.delay { - val duration = taskDuration - logger.debug(s"[$name] Delivery ID $messageId handling failed in $duration", t) - processedTimer.updateFailure(duration) - } >> - handleCallbackFailure(messageId, deliveryTag, properties, routingKey, body)(t) - } - } catch { - // we catch this specific exception, handling of others is up to Lyra - case e: RejectedExecutionException => - logger.debug(s"[$name] Executor was unable to plan the handling task", e) - handleFailure(messageId, deliveryTag, properties, routingKey, body, e) - - case NonFatal(e) => handleCallbackFailure(messageId, deliveryTag, properties, routingKey, body)(e) - } - }.flatten - - private def handleCallbackFailure(messageId: String, - deliveryTag: Long, - properties: BasicProperties, - routingKey: String, - body: Array[Byte])(t: Throwable): F[Unit] = { - F.delay { - logger.error(s"[$name] Error while executing callback, it's probably a BUG", t) - } >> - handleFailure(messageId, deliveryTag, properties, routingKey, body, t) - } - - private def handleFailure(messageId: String, - deliveryTag: Long, - properties: BasicProperties, - routingKey: String, - body: Array[Byte], - t: Throwable): F[Unit] = { - F.delay { - processingCount.decrementAndGet() - processingFailedMeter.mark() - consumerListener.onError(this, name, channel, t) - } >> - executeFailureAction(messageId, deliveryTag, properties, routingKey, body) - } - private def executeFailureAction(messageId: String, - deliveryTag: Long, - properties: BasicProperties, - routingKey: String, - body: Array[Byte]): F[Unit] = { - handleResult(messageId, deliveryTag, properties, routingKey, body)(failureAction) - } - - private def toIO[A](f: F[A]): IO[A] = - IO.async { cb => - Effect[F].runAsync(f)(r => IO(cb(r))).unsafeRunSync() - } } object DefaultRabbitMQConsumer { diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQStreamingConsumer.scala b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQStreamingConsumer.scala new file mode 100644 index 00000000..01c298f6 --- /dev/null +++ b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQStreamingConsumer.scala @@ -0,0 +1,285 @@ +package com.avast.clients.rabbitmq + +import cats.effect.concurrent._ +import cats.effect.{Blocker, CancelToken, ConcurrentEffect, ContextShift, Effect, ExitCase, IO, Resource, Sync} +import cats.syntax.all._ +import com.avast.bytes.Bytes +import com.avast.clients.rabbitmq.DefaultRabbitMQConsumer.RepublishOriginalRoutingKeyHeaderName +import com.avast.clients.rabbitmq.DefaultRabbitMQStreamingConsumer.DeliveryQueue +import com.avast.clients.rabbitmq.api._ +import com.avast.metrics.scalaapi.Monitor +import com.rabbitmq.client.{Delivery => _, _} +import com.typesafe.scalalogging.StrictLogging +import fs2.Stream +import fs2.concurrent.Queue + +import scala.language.higherKinds +import scala.util.control.NonFatal + +class DefaultRabbitMQStreamingConsumer[F[_]: ConcurrentEffect, A: DeliveryConverter] private ( + name: String, + queueName: String, + initialConsumerTag: String, + connectionInfo: RabbitMQConnectionInfo, + consumerListener: ConsumerListener, + monitor: Monitor, + blocker: Blocker)(createQueue: F[DeliveryQueue[F, Bytes]], newChannel: F[ServerChannel])(implicit cs: ContextShift[F]) + extends RabbitMQStreamingConsumer[F, A] + with StrictLogging { + + private lazy val F: Sync[F] = Sync[F] + + private lazy val streamFailureMeter = monitor.meter("streamFailures") + + private lazy val recoveringMutex: Semaphore[F] = newMutexSemaphore() + + private lazy val consumer = Ref.unsafe[F, Option[StreamingConsumer]](None) + private lazy val isClosed = Ref.unsafe[F, Boolean](false) + private lazy val isOk = Ref.unsafe[F, Boolean](false) + private lazy val tasks = Ref.unsafe[IO, Set[CancelToken[IO]]](Set.empty) + + lazy val deliveryStream: fs2.Stream[F, StreamedDelivery[F, A]] = { + Stream + .eval { checkNotClosed >> recoverIfNeeded } + .flatMap { queue => + queue.dequeue + .map { + case (del, deff) => + new StreamedDelivery[F, A] { + override val delivery: Delivery[A] = DefaultRabbitMQClientFactory.convertDelivery(del) + override def handle(result: DeliveryResult): F[StreamedResult] = deff.complete(result).map(_ => StreamedResult) + } + } + } + .onFinalizeCase(handleStreamFinalize) + } + + private lazy val recoverIfNeeded: F[DeliveryQueue[F, Bytes]] = { + recoveringMutex.withPermit { + isOk.get.flatMap { + case true => getCurrentQueue + case false => recover + } + } + } + + private lazy val getCurrentQueue: F[DeliveryQueue[F, Bytes]] = { + consumer.get.map { cons => + cons + .getOrElse(throw new IllegalStateException("Consumer has to be initialized at this stage! It's probably a BUG")) + .queue + } + } + + private lazy val recover: F[DeliveryQueue[F, Bytes]] = { + createQueue.flatTap { newQueue => + newChannel.flatMap { newChannel => + val newConsumer = new StreamingConsumer(newChannel, newQueue) + + consumer + .getAndSet(Some(newConsumer)) + .flatMap { + case Some(oldConsumer) => + blocker + .delay { + val consumerTag = oldConsumer.getConsumerTag + logger.debug(s"[$name] Cancelling consumer with consumer tag $consumerTag") + + try { + oldConsumer.channel.close() + } catch { + case NonFatal(e) => logger.debug(s"Could not close channel", e) + } + + consumerTag + } + .flatTap(_ => tryCancelRunningTasks) + + case None => + F.delay { + logger.debug("No old consumer to be cancelled!") + initialConsumerTag + } + } + .flatMap { consumerTag => + blocker.delay { + logger.debug("Starting consuming") + DefaultRabbitMQClientFactory.startConsumingQueue(newChannel, queueName, consumerTag, newConsumer) + () + } + } >> isOk.set(true) + } + } + } + + private lazy val close: F[Unit] = recoveringMutex.withPermit { + isOk.get.flatMap { isOk => + if (isOk) consumer.get.flatMap { + case Some(streamingConsumer) => + blocker.delay { + streamingConsumer.stopConsuming() + streamingConsumer.channel.close() + } + + case None => F.unit + } else F.unit + } >> isOk.set(false) >> isClosed.set(true) + } + + private lazy val tryCancelRunningTasks: F[Unit] = { + tasks + .update { tasks => + tasks.foreach(_.unsafeRunSync()) + Set.empty + } + .to[F] + } + + private lazy val stopConsuming: F[Unit] = { + recoveringMutex.withPermit { + isOk.set(false) >> consumer.get.flatMap(_.fold(F.unit)(_.stopConsuming())) // stop consumer, if there is some + } + } + + private lazy val checkNotClosed: F[Unit] = { + isClosed.get.flatMap(cl => if (cl) F.raiseError[Unit](new IllegalStateException("This consumer is already closed")) else F.unit) + } + + private def handleStreamFinalize(e: ExitCase[Throwable]): F[Unit] = e match { + case ExitCase.Completed => + stopConsuming + .flatTap(_ => F.delay(logger.debug(s"[$name] Delivery stream was completed"))) + + case ExitCase.Canceled => + stopConsuming + .flatTap(_ => F.delay(logger.debug(s"[$name] Delivery stream was cancelled"))) + + case ExitCase.Error(e: ShutdownSignalException) => + stopConsuming + .flatTap { _ => + F.delay { + streamFailureMeter.mark() + logger.error(s"[$name] Delivery stream was terminated because of channel shutdown. It might be a BUG int the client", e) + } + } + + case ExitCase.Error(e) => + stopConsuming + .flatTap(_ => + F.delay { + streamFailureMeter.mark() + logger.debug(s"[$name] Delivery stream was terminated", e) + }) + } + + private def deliveryCallback(delivery: Delivery[Bytes]): F[DeliveryResult] = { + for { + deferred <- Deferred[F, DeliveryResult] + consumerOpt <- this.consumer.get + consumer = consumerOpt.getOrElse(throw new IllegalStateException("Consumer has to be initialized at this stage! It's probably a BUG")) + _ <- consumer.queue.enqueue1((delivery, deferred)) + res <- deferred.get + } yield { + res + } + } + + private def newMutexSemaphore(): Semaphore[F] = { + ConcurrentEffect[F].toIO(Semaphore[F](1)).unsafeRunSync() // this doesn't block + } + + private class StreamingConsumer(override val channel: ServerChannel, val queue: DeliveryQueue[F, Bytes]) + extends ConsumerWithCallbackBase(channel, DeliveryResult.Retry, consumerListener) { + private val receivingEnabled = Ref.unsafe[F, Boolean](true) + + override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit = { + processingCount.incrementAndGet() + + val deliveryTag = envelope.getDeliveryTag + val messageId = properties.getMessageId + val routingKey = Option(properties.getHeaders).flatMap(p => Option(p.get(RepublishOriginalRoutingKeyHeaderName))) match { + case Some(originalRoutingKey) => originalRoutingKey.toString + case None => envelope.getRoutingKey + } + + val task: F[Unit] = receivingEnabled.get.flatMap { + case false => + F.delay { + logger.trace(s"Delivery $messageId (tag $deliveryTag) was ignored because consumer is not OK - it will be redelivered later") + processingCount.decrementAndGet() + () + } + + case true => + val action = handleDelivery(messageId, deliveryTag, properties, routingKey, body)(deliveryCallback) + .flatTap(_ => F.delay(logger.debug(s"Delivery $messageId processed successfully (tag $deliveryTag)"))) + .recoverWith { + case e => + F.delay { + processingFailedMeter.mark() + logger.debug("Could not process delivery", e) + } >> F.raiseError(e) + } + + recoveringMutex.withPermit(F.suspend { + lazy val ct: CancelToken[IO] = Effect[F] + .toIO(action) + .runCancelable(_ => { + processingCount.decrementAndGet() + tasks.update(_ - ct) + }) + .unsafeRunSync() + + tasks.update(_ + ct).to[F] + }) + } + + Effect[F].toIO(task).unsafeToFuture() + + () + } + + def stopConsuming(): F[Unit] = { + receivingEnabled.set(false) >> blocker.delay { + logger.debug(s"Stopping consummation for $getConsumerTag") + channel.basicCancel(getConsumerTag) + channel.setDefaultConsumer(null) + } + } + + override protected implicit val cs: ContextShift[F] = DefaultRabbitMQStreamingConsumer.this.cs + override protected def name: String = DefaultRabbitMQStreamingConsumer.this.name + override protected def queueName: String = DefaultRabbitMQStreamingConsumer.this.queueName + override protected def blocker: Blocker = DefaultRabbitMQStreamingConsumer.this.blocker + override protected def connectionInfo: RabbitMQConnectionInfo = DefaultRabbitMQStreamingConsumer.this.connectionInfo + override protected def monitor: Monitor = DefaultRabbitMQStreamingConsumer.this.monitor + } +} + +object DefaultRabbitMQStreamingConsumer extends StrictLogging { + + private type DeliveryQueue[F[_], A] = Queue[F, (Delivery[A], Deferred[F, DeliveryResult])] + + def apply[F[_]: ConcurrentEffect, A: DeliveryConverter]( + name: String, + newChannel: F[ServerChannel], + initialConsumerTag: String, + queueName: String, + connectionInfo: RabbitMQConnectionInfo, + consumerListener: ConsumerListener, + queueBufferSize: Int, + monitor: Monitor, + blocker: Blocker)(implicit cs: ContextShift[F]): Resource[F, DefaultRabbitMQStreamingConsumer[F, A]] = { + val newQueue: F[DeliveryQueue[F, Bytes]] = createQueue(queueBufferSize) + + Resource.make(Sync[F].delay { + new DefaultRabbitMQStreamingConsumer(name, queueName, initialConsumerTag, connectionInfo, consumerListener, monitor, blocker)( + newQueue, + newChannel) + })(_.close) + } + + private def createQueue[F[_]: ConcurrentEffect](queueBufferSize: Int): F[DeliveryQueue[F, Bytes]] = { + fs2.concurrent.Queue.bounded[F, (Delivery[Bytes], Deferred[F, DeliveryResult])](queueBufferSize) + } +} diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/RabbitMQConnection.scala b/core/src/main/scala/com/avast/clients/rabbitmq/RabbitMQConnection.scala index 51a89631..6ca071dd 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/RabbitMQConnection.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/RabbitMQConnection.scala @@ -26,7 +26,7 @@ trait RabbitMQConnection[F[_]] { * * @param consumerConfig Configuration of the consumer. * @param monitor Monitor for metrics. - * @param readAction Action executed for each delivered message. You should never return a failed future. + * @param readAction Action executed for each delivered message. You should never return a failed F. */ def newConsumer[A: DeliveryConverter](consumerConfig: ConsumerConfig, monitor: Monitor)( readAction: DeliveryReadAction[F, A]): Resource[F, RabbitMQConsumer[F]] @@ -46,6 +46,14 @@ trait RabbitMQConnection[F[_]] { def newPullConsumer[A: DeliveryConverter](pullConsumerConfig: PullConsumerConfig, monitor: Monitor): Resource[F, RabbitMQPullConsumer[F, A]] + /** Creates new instance of streaming consumer, using the passed configuration. + * + * @param consumerConfig Configuration of the consumer. + * @param monitor Monitor for metrics. + */ + def newStreamingConsumer[A: DeliveryConverter](consumerConfig: StreamingConsumerConfig, + monitor: Monitor): Resource[F, RabbitMQStreamingConsumer[F, A]] + def declareExchange(config: DeclareExchangeConfig): F[Unit] def declareQueue(config: DeclareQueueConfig): F[Unit] def bindExchange(config: BindExchangeConfig): F[Unit] diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala b/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala index 2d39d4f1..d5f71818 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala @@ -32,6 +32,14 @@ final case class ConsumerConfig(name: String, declare: Option[AutoDeclareQueueConfig] = None, consumerTag: String = "Default") +final case class StreamingConsumerConfig(name: String, + queueName: String, + bindings: immutable.Seq[AutoBindQueueConfig], + prefetchCount: Int = 100, + queueBufferSize: Int = 100, + declare: Option[AutoDeclareQueueConfig] = None, + consumerTag: String = "Default") + final case class PullConsumerConfig(name: String, queueName: String, bindings: immutable.Seq[AutoBindQueueConfig], diff --git a/core/src/test/resources/application.conf b/core/src/test/resources/application.conf index e185908f..01976e3a 100644 --- a/core/src/test/resources/application.conf +++ b/core/src/test/resources/application.conf @@ -89,6 +89,46 @@ myConfig { } ] } + + testingStreaming { + name = "Testing" + + queueName = "QUEUE1" + + declare { + enabled = true + } + + prefetchCount = 500 + + bindings = [ + { + routingKeys = ["test"] + + exchange { + name = "EXCHANGE1" + + declare { + enabled = true + + type = "direct" + } + } + }, { + routingKeys = ["test2"] + + exchange { + name = "EXCHANGE2" + + declare { + enabled = true + + type = "direct" + } + } + } + ] + } } producers { diff --git a/core/src/test/resources/logback.xml b/core/src/test/resources/logback.xml index 38808f22..3ceea264 100644 --- a/core/src/test/resources/logback.xml +++ b/core/src/test/resources/logback.xml @@ -4,16 +4,15 @@ System.out - %d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%-10mdc{traceId}] [%thread] %-35logger{35}: %msg\(%file:%line\)%n%xThrowable{full} - + %d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%thread] %-35logger{35}: %msg\(%file:%line\)%n%xThrowable{full} - + - \ No newline at end of file + diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumerTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumerTest.scala index 0103a590..d9c0c4ca 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumerTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumerTest.scala @@ -36,6 +36,7 @@ class DefaultRabbitMQConsumerTest extends TestBase { when(properties.getMessageId).thenReturn(messageId) val channel = mock[AutorecoveringChannel] + when(channel.isOpen).thenReturn(true) val consumer = new DefaultRabbitMQConsumer[Task]( "test", @@ -75,6 +76,7 @@ class DefaultRabbitMQConsumerTest extends TestBase { when(properties.getMessageId).thenReturn(messageId) val channel = mock[AutorecoveringChannel] + when(channel.isOpen).thenReturn(true) val consumer = new DefaultRabbitMQConsumer[Task]( "test", @@ -114,6 +116,7 @@ class DefaultRabbitMQConsumerTest extends TestBase { when(properties.getMessageId).thenReturn(messageId) val channel = mock[AutorecoveringChannel] + when(channel.isOpen).thenReturn(true) val consumer = new DefaultRabbitMQConsumer[Task]( "test", @@ -153,6 +156,7 @@ class DefaultRabbitMQConsumerTest extends TestBase { val properties = new BasicProperties.Builder().messageId(messageId).userId(originalUserId).build() val channel = mock[AutorecoveringChannel] + when(channel.isOpen).thenReturn(true) val consumer = new DefaultRabbitMQConsumer[Task]( "test", @@ -194,6 +198,7 @@ class DefaultRabbitMQConsumerTest extends TestBase { when(properties.getMessageId).thenReturn(messageId) val channel = mock[AutorecoveringChannel] + when(channel.isOpen).thenReturn(true) val consumer = new DefaultRabbitMQConsumer[Task]( "test", @@ -230,6 +235,7 @@ class DefaultRabbitMQConsumerTest extends TestBase { when(properties.getMessageId).thenReturn(messageId) val channel = mock[AutorecoveringChannel] + when(channel.isOpen).thenReturn(true) val consumer = new DefaultRabbitMQConsumer[Task]( "test", @@ -266,6 +272,7 @@ class DefaultRabbitMQConsumerTest extends TestBase { when(properties.getMessageId).thenReturn(messageId) val channel = mock[AutorecoveringChannel] + when(channel.isOpen).thenReturn(true) val monitor = mock[Monitor] when(monitor.meter(Matchers.anyString())).thenReturn(Monitor.noOp.meter("")) @@ -337,7 +344,7 @@ class DefaultRabbitMQConsumerTest extends TestBase { assertResult(Seq.empty)(failuresLengths) val Seq(taskLength) = successLengths.result() - assert(taskLength > 2000) + assert(taskLength > 1990) // 2000 minus some tolerance } } } @@ -354,6 +361,7 @@ class DefaultRabbitMQConsumerTest extends TestBase { when(properties.getMessageId).thenReturn(messageId) val channel = mock[AutorecoveringChannel] + when(channel.isOpen).thenReturn(true) val monitor = mock[Monitor] when(monitor.meter(Matchers.anyString())).thenReturn(Monitor.noOp.meter("")) diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumerTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumerTest.scala index 3c32d836..e9ee2ecf 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumerTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumerTest.scala @@ -34,6 +34,7 @@ class DefaultRabbitMQPullConsumerTest extends TestBase { when(properties.getMessageId).thenReturn(messageId) val channel = mock[AutorecoveringChannel] + when(channel.isOpen).thenReturn(true) val body = Random.nextString(5).getBytes @@ -77,6 +78,7 @@ class DefaultRabbitMQPullConsumerTest extends TestBase { when(properties.getMessageId).thenReturn(messageId) val channel = mock[AutorecoveringChannel] + when(channel.isOpen).thenReturn(true) val body = Random.nextString(5).getBytes @@ -120,6 +122,7 @@ class DefaultRabbitMQPullConsumerTest extends TestBase { when(properties.getMessageId).thenReturn(messageId) val channel = mock[AutorecoveringChannel] + when(channel.isOpen).thenReturn(true) val body = Random.nextString(5).getBytes @@ -163,6 +166,7 @@ class DefaultRabbitMQPullConsumerTest extends TestBase { val properties = new BasicProperties.Builder().messageId(messageId).userId(originalUserId).build() val channel = mock[AutorecoveringChannel] + when(channel.isOpen).thenReturn(true) val body = Random.nextString(5).getBytes @@ -208,6 +212,7 @@ class DefaultRabbitMQPullConsumerTest extends TestBase { when(properties.getMessageId).thenReturn(messageId) val channel = mock[AutorecoveringChannel] + when(channel.isOpen).thenReturn(true) val body = Random.nextString(5).getBytes @@ -253,6 +258,7 @@ class DefaultRabbitMQPullConsumerTest extends TestBase { when(properties.getMessageId).thenReturn(messageId) val channel = mock[AutorecoveringChannel] + when(channel.isOpen).thenReturn(true) val body = Random.nextString(5).getBytes diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/LiveTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/LiveTest.scala index fbde372c..2c57eb04 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/LiveTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/LiveTest.scala @@ -26,6 +26,9 @@ class LiveTest extends TestBase with ScalaFutures { private implicit val p: PatienceConfig = PatienceConfig(timeout = Span(5, Seconds)) + private lazy val testHelper = new TestHelper(System.getProperty("rabbit.host", System.getenv("rabbit.host")), + System.getProperty("rabbit.tcp.15672", System.getenv("rabbit.tcp.15672")).toInt) + private def createConfig() = new { val queueName1: String = randomString(4) + "_QU1" @@ -35,6 +38,9 @@ class LiveTest extends TestBase with ScalaFutures { val exchange3: String = randomString(4) + "_EX3" val exchange4: String = randomString(4) + "_EX4" + testHelper.deleteQueue(queueName1) + testHelper.deleteQueue(queueName2) + private val original = ConfigFactory.load().getConfig("myConfig") val bindConfigs: Array[Config] = original.getObjectList("consumers.testing.bindings").asScala.map(_.toConfig).toArray @@ -47,6 +53,9 @@ class LiveTest extends TestBase with ScalaFutures { .withValue("consumers.testing.bindings", ConfigValueFactory.fromIterable(bindConfigs.toSeq.map(_.root()).asJava)) .withValue("consumers.testingPull.queueName", ConfigValueFactory.fromAnyRef(queueName1)) .withValue("consumers.testingPull.bindings", ConfigValueFactory.fromIterable(bindConfigs.toSeq.map(_.root()).asJava)) + .withValue("consumers.testingStreaming.queueName", ConfigValueFactory.fromAnyRef(queueName1)) + .withValue("consumers.testingStreaming.queueBufferSize", ConfigValueFactory.fromAnyRef(200)) + .withValue("consumers.testingStreaming.bindings", ConfigValueFactory.fromIterable(bindConfigs.toSeq.map(_.root()).asJava)) .withValue("producers.testing.exchange", ConfigValueFactory.fromAnyRef(exchange1)) .withValue("producers.testing2.exchange", ConfigValueFactory.fromAnyRef(exchange2)) .withValue("producers.testing3.exchange", ConfigValueFactory.fromAnyRef(exchange4)) @@ -68,9 +77,6 @@ class LiveTest extends TestBase with ScalaFutures { implicit val sched: Scheduler = Scheduler(Executors.newCachedThreadPool()) } - val testHelper = new TestHelper(System.getProperty("rabbit.host", System.getenv("rabbit.host")), - System.getProperty("rabbit.tcp.15672", System.getenv("rabbit.tcp.15672")).toInt) - test("basic") { val c = createConfig() import c._ @@ -107,22 +113,27 @@ class LiveTest extends TestBase with ScalaFutures { import c._ RabbitMQConnection.fromConfig[Task](config, ex).withResource { rabbitConnection => - val cnt = Random.nextInt(100) - val count = cnt + 100 // random 100 - 300 messages + val count = Random.nextInt(500) + 500 // random 500 - 1000 messages - val latch = new CountDownLatch(count + 5) + logger.info(s"Sending $count messages") + + val latch = new CountDownLatch(count + 100) // explanation below val d = new AtomicInteger(0) val cons = rabbitConnection.newConsumer("testing", Monitor.noOp()) { _: Delivery[Bytes] => Task { - Thread.sleep(if (d.get() % 2 == 0) 300 else 0) + val n = d.incrementAndGet() + + Thread.sleep(if (n % 2 == 0) 300 else 0) latch.countDown() - if (d.incrementAndGet() < (cnt - 50)) Ack + if (n < (count - 100) || n > count) Ack else { - if (d.incrementAndGet() < (cnt - 10)) Retry else Republish() + if (n < (count - 50)) Retry else Republish() } + + // ^ example: 750 messages in total => 650 * Ack, 50 * Retry, 50 * Republish => processing 850 (== +100) messages in total } } @@ -132,6 +143,11 @@ class LiveTest extends TestBase with ScalaFutures { sender.send("test", Bytes.copyFromUtf8(Random.nextString(10))).await } + // it takes some time before the stats appear... :-| + eventually(timeout(Span(3, Seconds)), interval(Span(0.1, Seconds))) { + assertResult(count)(testHelper.getPublishedCount(queueName1)) + } + eventually(timeout(Span(3, Seconds)), interval(Span(0.1, Seconds))) { assertResult(true)(latch.await(1000, TimeUnit.MILLISECONDS)) assertResult(0)(testHelper.getMessagesCount(queueName1)) @@ -363,6 +379,178 @@ class LiveTest extends TestBase with ScalaFutures { } } + test("streaming consumer") { + val c = createConfig() + import c._ + + RabbitMQConnection.fromConfig[Task](config, ex).withResource { rabbitConnection => + val count = Random.nextInt(50000) + 50000 // random 50 - 100k messages + + logger.info(s"Sending $count messages") + + val latch = new CountDownLatch(count + 10000) // explanation below + + val d = new AtomicInteger(0) + + rabbitConnection.newStreamingConsumer[Bytes]("testingStreaming", Monitor.noOp()).withResource { cons => + val stream = cons.deliveryStream + .mapAsyncUnordered(20) { del => + Task.delay(d.incrementAndGet()).flatMap { n => + Task.sleep((if (n % 500 == 0) Random.nextInt(100) else 0).millis) >> // random slowdown 0-100 ms for every 500th message + del.handle { + latch.countDown() + + if (n <= (count - 10000) || n > count) Ack + else { + if (n <= (count - 5000)) Retry else Republish() + } + + // ^ example: 100000 messages in total => 6500 * Ack, 5000 * Retry, 5000 * Republish => processing 110000 (== +10000) messages in total + } + } + } + + rabbitConnection.newProducer[Bytes]("testing", Monitor.noOp()).withResource { sender => + for (_ <- 1 to count) { + sender.send("test", Bytes.copyFromUtf8(Random.nextString(10))).await + } + + // it takes some time before the stats appear... :-| + eventually(timeout(Span(50, Seconds)), interval(Span(1, Seconds))) { + assertResult(count)(testHelper.getPublishedCount(queueName1)) + } + + stream.compile.drain.runToFuture // run the stream + + eventually(timeout(Span(4, Minutes)), interval(Span(1, Seconds))) { + println("D: " + d.get()) + assertResult(count + 10000)(d.get()) + println("LATCH: " + latch.getCount) + assertResult(true)(latch.await(1000, TimeUnit.MILLISECONDS)) + assertResult(0)(testHelper.getMessagesCount(queueName1)) + } + } + } + } + } + + test("streaming consumers to single queue") { + val c = createConfig() + import c._ + + RabbitMQConnection.fromConfig[Task](config, ex).withResource { rabbitConnection => + val count = Random.nextInt(10000) + 10000 // random 10k - 20k messages + + logger.info(s"Sending $count messages") + + val latch = new CountDownLatch(count) + + def toStream(cons1: RabbitMQStreamingConsumer[Task, Bytes], count: Int, d: AtomicInteger): fs2.Stream[Task, StreamedResult] = { + cons1.deliveryStream + .mapAsyncUnordered(20) { del => + Task.delay(d.incrementAndGet()).flatMap { n => + Task.sleep((if (n % 500 == 0) Random.nextInt(100) else 0).millis) >> // random slowdown 0-100 ms for every 500th message + del.handle { + latch.countDown() + + Ack + } + } + } + } + + rabbitConnection.newStreamingConsumer[Bytes]("testingStreaming", Monitor.noOp()).withResource { cons1 => + rabbitConnection.newStreamingConsumer[Bytes]("testingStreaming", Monitor.noOp()).withResource { cons2 => + val d1 = new AtomicInteger(0) + val d2 = new AtomicInteger(0) + + val stream1 = toStream(cons1, count, d1) + val stream2 = toStream(cons2, count, d2) + + rabbitConnection.newProducer[Bytes]("testing", Monitor.noOp()).withResource { sender => + for (_ <- 1 to count) { + sender.send("test", Bytes.copyFromUtf8(Random.nextString(10))).await + } + + // it takes some time before the stats appear... :-| + eventually(timeout(Span(50, Seconds)), interval(Span(1, Seconds))) { + assertResult(count)(testHelper.getPublishedCount(queueName1)) + } + + stream1.compile.drain.runToFuture // run the stream 1 + stream2.compile.drain.runToFuture // run the stream 2 + + eventually(timeout(Span(5, Minutes)), interval(Span(1, Seconds))) { + println(s"D: ${d1.get}/${d2.get()}") + assertResult(count)(d1.get() + d2.get()) + assert(d1.get() > 0) + assert(d2.get() > 0) + println("LATCH: " + latch.getCount) + assertResult(true)(latch.await(1000, TimeUnit.MILLISECONDS)) + assertResult(0)(testHelper.getMessagesCount(queueName1)) + } + } + } + } + } + } + + test("streaming consumer stream can be manually restarted") { + for (_ <- 1 to 5) { + val c = createConfig() + import c._ + + RabbitMQConnection.fromConfig[Task](config, ex).withResource { rabbitConnection => + val count = Random.nextInt(50000) + 50000 // random 50k - 100k messages + + val nth = 150 + + logger.info(s"Sending $count messages") + + val d = new AtomicInteger(0) + + rabbitConnection.newStreamingConsumer[Bytes]("testingStreaming", Monitor.noOp()).withResource { cons => + def stream: fs2.Stream[Task, StreamedResult] = + cons.deliveryStream + .evalMap { del => + Task + .delay(d.incrementAndGet()) + .flatMap { n => + if (n % nth != 0) del.handle(Ack) + else { + Task.raiseError(new RuntimeException(s"My failure $n")) + } + // ^^ cause failure for every nth message + } + } + .handleErrorWith { e => + logger.info(s"Stream has failed: ${e.getMessage}") + stream + } + + rabbitConnection.newProducer[Bytes]("testing", Monitor.noOp()).withResource { sender => + for (_ <- 1 to count) { + sender.send("test", Bytes.copyFromUtf8(Random.nextString(10))).await + } + + // it takes some time before the stats appear... :-| + eventually(timeout(Span(50, Seconds)), interval(Span(0.5, Seconds))) { + assertResult(count)(testHelper.getPublishedCount(queueName1)) + } + + stream.compile.drain.runToFuture // run the consumer stream + + eventually(timeout(Span(5, Minutes)), interval(Span(1, Seconds))) { + println("D: " + d.get()) + assert(d.get() > count) // can't say exact number, number of redeliveries is unpredictable + assertResult(0)(testHelper.getMessagesCount(queueName1)) + } + } + } + } + } + } + test("consumer parsing failure") { val c = createConfig() import c._ diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/TestHelper.scala b/core/src/test/scala/com/avast/clients/rabbitmq/TestHelper.scala index b25cc231..1abc0383 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/TestHelper.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/TestHelper.scala @@ -5,7 +5,6 @@ import java.nio.charset.StandardCharsets import io.circe.generic.auto._ import io.circe.parser._ - import scalaj.http.Http class TestHelper(host: String, port: Int) { @@ -17,12 +16,50 @@ class TestHelper(host: String, port: Int) { val resp = Http(s"$RootUri/queues/%2f/$encoded").auth("guest", "guest").asString.body + println("MESSAGES COUNT:") + println(resp) + decode[QueueProperties](resp) match { case Right(p) => p.messages case r => throw new IllegalStateException(s"Wrong response $r") } } - private case class QueueProperties(messages: Int) + def getPublishedCount(queueName: String): Int = { + val encoded = URLEncoder.encode(queueName, StandardCharsets.UTF_8.toString) + + val resp = Http(s"$RootUri/queues/%2f/$encoded").auth("guest", "guest").asString.body + + println("PUBLISHED COUNT:") + println(resp) + + decode[QueueProperties](resp) match { + case Right(p) => + p.message_stats.map(_.publish).getOrElse { + Console.err.println(s"Could not extract published_count for $queueName!") + 0 + } + case r => throw new IllegalStateException(s"Wrong response $r") + } + } + + def deleteQueue(queueName: String, ifEmpty: Boolean = false, ifUnused: Boolean = false): Unit = { + println(s"Deleting queue: $queueName") + val encoded = URLEncoder.encode(queueName, StandardCharsets.UTF_8.toString) + + val resp = Http(s"$RootUri/queues/%2f/$encoded?if-empty=$ifEmpty&if-unused=$ifUnused").method("DELETE").auth("guest", "guest").asString + + val content = resp.body + + val message = s"Delete queue response ${resp.statusLine}: '$content'" + println(message) + + if (!resp.isSuccess && resp.code != 404) { + throw new IllegalStateException(message) + } + } + + private case class QueueProperties(messages: Int, message_stats: Option[MessagesStats]) + private case class MessagesStats(publish: Int, ack: Option[Int]) } diff --git a/pureconfig/src/main/scala/com/avast/clients/rabbitmq/pureconfig/ConfigRabbitMQConnection.scala b/pureconfig/src/main/scala/com/avast/clients/rabbitmq/pureconfig/ConfigRabbitMQConnection.scala index 0ba7178a..3a67cb37 100644 --- a/pureconfig/src/main/scala/com/avast/clients/rabbitmq/pureconfig/ConfigRabbitMQConnection.scala +++ b/pureconfig/src/main/scala/com/avast/clients/rabbitmq/pureconfig/ConfigRabbitMQConnection.scala @@ -1,7 +1,7 @@ package com.avast.clients.rabbitmq.pureconfig +import _root_.pureconfig._ import _root_.pureconfig.error.ConfigReaderException -import _root_.pureconfig.{ConfigCursor, ConfigReader, PathSegment} import cats.effect.{ConcurrentEffect, Resource} import com.avast.clients.rabbitmq.api._ import com.avast.clients.rabbitmq.{ @@ -19,7 +19,8 @@ import com.avast.clients.rabbitmq.{ ProductConverter, PullConsumerConfig, RabbitMQConnection, - ServerChannel + ServerChannel, + StreamingConsumerConfig } import com.avast.metrics.scalaapi.Monitor @@ -34,7 +35,7 @@ trait ConfigRabbitMQConnection[F[_]] { * * @param configName Name of configuration of the consumer. * @param monitor Monitor for metrics. - * @param readAction Action executed for each delivered message. You should never return a failed future. + * @param readAction Action executed for each delivered message. You should never return a failed F. */ def newConsumer[A: DeliveryConverter](configName: String, monitor: Monitor)( readAction: DeliveryReadAction[F, A]): Resource[F, RabbitMQConsumer[F]] @@ -53,6 +54,13 @@ trait ConfigRabbitMQConnection[F[_]] { */ def newPullConsumer[A: DeliveryConverter](configName: String, monitor: Monitor): Resource[F, RabbitMQPullConsumer[F, A]] + /** Creates new instance of streaming consumer, using the TypeSafe configuration passed to the factory and consumer name. + * + * @param configName Name of configuration of the consumer. + * @param monitor Monitor for metrics. + */ + def newStreamingConsumer[A: DeliveryConverter](configName: String, monitor: Monitor): Resource[F, RabbitMQStreamingConsumer[F, A]] + /** * Declares and additional exchange, using the TypeSafe configuration passed to the factory and config name. */ @@ -92,6 +100,7 @@ class DefaultConfigRabbitMQConnection[F[_]](config: ConfigCursor, wrapped: Rabbi consumerConfigReader: ConfigReader[ConsumerConfig], producerConfigReader: ConfigReader[ProducerConfig], pullConsumerConfigReader: ConfigReader[PullConsumerConfig], + streamingConsumerConfigReader: ConfigReader[StreamingConsumerConfig], declareExchangeConfigReader: ConfigReader[DeclareExchangeConfig], declareQueueConfigReader: ConfigReader[DeclareQueueConfig], bindQueueConfigReader: ConfigReader[BindQueueConfig], @@ -122,6 +131,11 @@ class DefaultConfigRabbitMQConnection[F[_]](config: ConfigCursor, wrapped: Rabbi Resource.liftF(loadConfig[PullConsumerConfig](ConsumersRootName, configName)) >>= (wrapped.newPullConsumer(_, monitor)) } + override def newStreamingConsumer[A: DeliveryConverter](configName: String, + monitor: Monitor): Resource[F, RabbitMQStreamingConsumer[F, A]] = { + Resource.liftF(loadConfig[StreamingConsumerConfig](ConsumersRootName, configName)) >>= (wrapped.newStreamingConsumer(_, monitor)) + } + override def declareExchange(configName: String): F[Unit] = { loadConfig[DeclareExchangeConfig](DeclarationsRootName, configName) >>= wrapped.declareExchange } diff --git a/pureconfig/src/main/scala/com/avast/clients/rabbitmq/pureconfig/PureconfigImplicits.scala b/pureconfig/src/main/scala/com/avast/clients/rabbitmq/pureconfig/PureconfigImplicits.scala index ec93bb30..59b8bd6a 100644 --- a/pureconfig/src/main/scala/com/avast/clients/rabbitmq/pureconfig/PureconfigImplicits.scala +++ b/pureconfig/src/main/scala/com/avast/clients/rabbitmq/pureconfig/PureconfigImplicits.scala @@ -65,6 +65,7 @@ class PureconfigImplicits(implicit namingConvention: NamingConvention = CamelCas } implicit val consumerConfigReader: ConfigReader[ConsumerConfig] = deriveReader implicit val pullConsumerConfigReader: ConfigReader[PullConsumerConfig] = deriveReader + implicit val streamingConsumerConfigReader: ConfigReader[StreamingConsumerConfig] = deriveReader implicit val producerConfigReader: ConfigReader[ProducerConfig] = deriveReader // additional declarations: diff --git a/pureconfig/src/main/scala/com/avast/clients/rabbitmq/pureconfig/pureconfig.scala b/pureconfig/src/main/scala/com/avast/clients/rabbitmq/pureconfig/pureconfig.scala index cfa38a66..6372e5ba 100644 --- a/pureconfig/src/main/scala/com/avast/clients/rabbitmq/pureconfig/pureconfig.scala +++ b/pureconfig/src/main/scala/com/avast/clients/rabbitmq/pureconfig/pureconfig.scala @@ -29,6 +29,7 @@ package object pureconfig { consumerConfigReader: ConfigReader[ConsumerConfig] = implicits.CamelCase.consumerConfigReader, producerConfigReader: ConfigReader[ProducerConfig] = implicits.CamelCase.producerConfigReader, pullConsumerConfigReader: ConfigReader[PullConsumerConfig] = implicits.CamelCase.pullConsumerConfigReader, + streamingConsumerConfigReader: ConfigReader[StreamingConsumerConfig] = implicits.CamelCase.streamingConsumerConfigReader, declareExchangeConfigReader: ConfigReader[DeclareExchangeConfig] = implicits.CamelCase.declareExchangeConfigReader, declareQueueConfigReader: ConfigReader[DeclareQueueConfig] = implicits.CamelCase.declareQueueConfigReader, bindQueueConfigReader: ConfigReader[BindQueueConfig] = implicits.CamelCase.bindQueueConfigReader, From b70a95526ca8216b1116fec9f0098b3e2e316433 Mon Sep 17 00:00:00 2001 From: Kolena Jan Date: Thu, 30 Jan 2020 17:56:35 +0100 Subject: [PATCH 2/2] Streaming support Readme --- README.md | 85 ++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 72 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index d0c91ed5..af72dd8c 100644 --- a/README.md +++ b/README.md @@ -38,12 +38,13 @@ There is a [migration guide](Migration-6_1-8.md) between versions 6.1.x and 8.0. ## Usage -The Scala API is _finally tagless_ (read more e.g. [here](https://www.beyondthelines.net/programming/introduction-to-tagless-final/)) with +The API is _finally tagless_ (read more e.g. [here](https://www.beyondthelines.net/programming/introduction-to-tagless-final/)) with [`cats.effect.Resource`](https://typelevel.org/cats-effect/datatypes/resource.html) which is convenient way how to -[manage resources in your app](https://typelevel.org/cats-effect/tutorial/tutorial.html#acquiring-and-releasing-resources). +[manage resources in your app](https://typelevel.org/cats-effect/tutorial/tutorial.html#acquiring-and-releasing-resources). In addition, +there is a support for [streaming](#streaming-support) with [`fs2.Stream`](https://fs2.io/). -The Scala API uses types-conversions for both consumer and producer, that means you don't have to work directly with `Bytes` (however you -still can, if you want) and you touch only your business class which is then (de)serialized using provided converter. +The API uses conversions for both consumer and producer, that means you don't have to work directly with `Bytes` (however you +still can if you want to) and you touch only your business model class which is then (de)serialized using provided converter. The library uses two types of executors - one is for blocking (IO) operations and the second for callbacks. You _have to_ provide both of them: 1. Blocking executor as `ExecutorService` @@ -53,6 +54,7 @@ The default way is to configure the client with manually provided case classes; HOCON (Lightbend Config). This is somewhat minimal setup, using [Monix](https://monix.io/) `Task`: + ```scala import java.util.concurrent.ExecutorService @@ -65,6 +67,11 @@ import javax.net.ssl.SSLContext import monix.eval._ import monix.execution.Scheduler +implicit val sch: Scheduler = ??? +val monitor: Monitor = ??? + +val blockingExecutor: ExecutorService = ??? + val sslContext = SSLContext.getDefault val connectionConfig = RabbitMQConnectionConfig( @@ -87,21 +94,16 @@ val producerConfig = ProducerConfig( exchange = "MyGreatApp" ) -implicit val sch: Scheduler = ??? -val monitor: Monitor = ??? - -val blockingExecutor: ExecutorService = ??? - // see https://typelevel.org/cats-effect/tutorial/tutorial.html#acquiring-and-releasing-resources val rabbitMQProducer: Resource[Task, RabbitMQProducer[Task, Bytes]] = { for { connection <- RabbitMQConnection.make[Task](connectionConfig, blockingExecutor, Some(sslContext)) /* - Here you have created the connection; it's shared for all producers/consumers amongst one RabbitMQ server - they will share a single - TCP connection but have separated channels. - If you expect very high load, you can use separate connections for each producer/consumer, however it's usually not needed. - */ + Here you have created the connection; it's shared for all producers/consumers amongst one RabbitMQ server - they will share a single + TCP connection but have separated channels. + If you expect very high load, you can use separate connections for each producer/consumer, however it's usually not needed. + */ consumer <- connection.newConsumer[Bytes](consumerConfig, monitor) { case delivery: Delivery.Ok[Bytes] => @@ -118,6 +120,63 @@ val rabbitMQProducer: Resource[Task, RabbitMQProducer[Task, Bytes]] = { } ``` +#### Streaming support + +It seems quite natural to process RabbitMQ queue with a streaming app. +[`StreamingRabbitMQConsumer`](core/src/main/scala/com/avast/clients/rabbitmq/StreamingRabbitMQConsumer.scala) provides you an +[`fs2.Stream`](https://fs2.io/) through which you can easily process incoming messages in a streaming way. + +Notice: Using this functionality requires you to know some basics of [FS2](https://fs2.io/guide.html#overview) library. Please see it's official +guide if you're not familiar with it first. + +```scala +// skipping imports and common things, they are the same as in general example above + +val consumerConfig = StreamingConsumerConfig( // notice: StreamingConsumerConfig vs. ConsumerConfig + name = "MyConsumer", + queueName = "QueueWithMyEvents", + bindings = List( + AutoBindQueueConfig(exchange = AutoBindExchangeConfig(name = "OtherAppExchange"), routingKeys = List("TheEvent")) + ) + ) + +val processMyStream: fs2.Pipe[Task, StreamedDelivery[Task, Bytes], StreamedResult] = { in => + in.evalMap(delivery => delivery.handle(DeliveryResult.Ack)) // TODO you probably want to do some real stuff here + } + +val deliveryStream: Resource[Task, fs2.Stream[Task, StreamedResult]] = for { + connection <- RabbitMQConnection.make[Task](connectionConfig, blockingExecutor, Some(sslContext)) + streamingConsumer <- connection.newStreamingConsumer[Bytes](consumerConfig, monitor) + } yield { + val stream: fs2.Stream[Task, StreamedResult] = streamingConsumer.deliveryStream.through(processMyStream) + + // create resilient (self-restarting) stream; see more information below + val resilientStream: fs2.Stream[Task, StreamedResult] = stream.handleErrorWith { _ => + // TODO log the error - something is going wrong! + stream + } + + resilientStream + } +``` + +While you should never ever let the stream fail (handle all your possible errors; see [Error handling](https://fs2.io/guide.html#error-handling) +section in official docs how the stream can be failed), it's important you're able to recover the stream when it accidentally happens. +You can do that by simply _requesting_ a new stream from the client: + +```scala +val stream = streamingConsumer + .deliveryStream // get stream from client + .through(processMyStream) // "run" the stream through your processing logic + +val resilientStream = stream.handleErrorWith { _ => // handle the error in stream: recover by calling itself + // TODO don't forget to add some logging/metrics here! + stream +} +``` + +Please refer to the [official guide](https://fs2.io/guide.html#overview) for understanding more deeply how the recovery of `fs2.Stream` works. + #### Providing converters for producer/consumer Both the producer and consumer require type argument when creating from _connection_: