Skip to content

Commit

Permalink
Merge pull request #40 from avast/StreamingSupport
Browse files Browse the repository at this point in the history
Streaming support
  • Loading branch information
jendakol authored Feb 11, 2020
2 parents 6a80cd1 + b70a955 commit 0f3d0c2
Show file tree
Hide file tree
Showing 21 changed files with 935 additions and 172 deletions.
85 changes: 72 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ There is a [migration guide](Migration-6_1-8.md) between versions 6.1.x and 8.0.

## Usage

The Scala API is _finally tagless_ (read more e.g. [here](https://www.beyondthelines.net/programming/introduction-to-tagless-final/)) with
The API is _finally tagless_ (read more e.g. [here](https://www.beyondthelines.net/programming/introduction-to-tagless-final/)) with
[`cats.effect.Resource`](https://typelevel.org/cats-effect/datatypes/resource.html) which is convenient way how to
[manage resources in your app](https://typelevel.org/cats-effect/tutorial/tutorial.html#acquiring-and-releasing-resources).
[manage resources in your app](https://typelevel.org/cats-effect/tutorial/tutorial.html#acquiring-and-releasing-resources). In addition,
there is a support for [streaming](#streaming-support) with [`fs2.Stream`](https://fs2.io/).

The Scala API uses types-conversions for both consumer and producer, that means you don't have to work directly with `Bytes` (however you
still can, if you want) and you touch only your business class which is then (de)serialized using provided converter.
The API uses conversions for both consumer and producer, that means you don't have to work directly with `Bytes` (however you
still can if you want to) and you touch only your business model class which is then (de)serialized using provided converter.

The library uses two types of executors - one is for blocking (IO) operations and the second for callbacks. You _have to_ provide both of them:
1. Blocking executor as `ExecutorService`
Expand All @@ -53,6 +54,7 @@ The default way is to configure the client with manually provided case classes;
HOCON (Lightbend Config).

This is somewhat minimal setup, using [Monix](https://monix.io/) `Task`:

```scala
import java.util.concurrent.ExecutorService

Expand All @@ -65,6 +67,11 @@ import javax.net.ssl.SSLContext
import monix.eval._
import monix.execution.Scheduler

implicit val sch: Scheduler = ???
val monitor: Monitor = ???

val blockingExecutor: ExecutorService = ???

val sslContext = SSLContext.getDefault

val connectionConfig = RabbitMQConnectionConfig(
Expand All @@ -87,21 +94,16 @@ val producerConfig = ProducerConfig(
exchange = "MyGreatApp"
)

implicit val sch: Scheduler = ???
val monitor: Monitor = ???

val blockingExecutor: ExecutorService = ???

// see https://typelevel.org/cats-effect/tutorial/tutorial.html#acquiring-and-releasing-resources

val rabbitMQProducer: Resource[Task, RabbitMQProducer[Task, Bytes]] = {
for {
connection <- RabbitMQConnection.make[Task](connectionConfig, blockingExecutor, Some(sslContext))
/*
Here you have created the connection; it's shared for all producers/consumers amongst one RabbitMQ server - they will share a single
TCP connection but have separated channels.
If you expect very high load, you can use separate connections for each producer/consumer, however it's usually not needed.
*/
Here you have created the connection; it's shared for all producers/consumers amongst one RabbitMQ server - they will share a single
TCP connection but have separated channels.
If you expect very high load, you can use separate connections for each producer/consumer, however it's usually not needed.
*/

consumer <- connection.newConsumer[Bytes](consumerConfig, monitor) {
case delivery: Delivery.Ok[Bytes] =>
Expand All @@ -118,6 +120,63 @@ val rabbitMQProducer: Resource[Task, RabbitMQProducer[Task, Bytes]] = {
}
```

#### Streaming support

It seems quite natural to process RabbitMQ queue with a streaming app.
[`StreamingRabbitMQConsumer`](core/src/main/scala/com/avast/clients/rabbitmq/StreamingRabbitMQConsumer.scala) provides you an
[`fs2.Stream`](https://fs2.io/) through which you can easily process incoming messages in a streaming way.

Notice: Using this functionality requires you to know some basics of [FS2](https://fs2.io/guide.html#overview) library. Please see it's official
guide if you're not familiar with it first.

```scala
// skipping imports and common things, they are the same as in general example above

val consumerConfig = StreamingConsumerConfig( // notice: StreamingConsumerConfig vs. ConsumerConfig
name = "MyConsumer",
queueName = "QueueWithMyEvents",
bindings = List(
AutoBindQueueConfig(exchange = AutoBindExchangeConfig(name = "OtherAppExchange"), routingKeys = List("TheEvent"))
)
)

val processMyStream: fs2.Pipe[Task, StreamedDelivery[Task, Bytes], StreamedResult] = { in =>
in.evalMap(delivery => delivery.handle(DeliveryResult.Ack)) // TODO you probably want to do some real stuff here
}

val deliveryStream: Resource[Task, fs2.Stream[Task, StreamedResult]] = for {
connection <- RabbitMQConnection.make[Task](connectionConfig, blockingExecutor, Some(sslContext))
streamingConsumer <- connection.newStreamingConsumer[Bytes](consumerConfig, monitor)
} yield {
val stream: fs2.Stream[Task, StreamedResult] = streamingConsumer.deliveryStream.through(processMyStream)

// create resilient (self-restarting) stream; see more information below
val resilientStream: fs2.Stream[Task, StreamedResult] = stream.handleErrorWith { _ =>
// TODO log the error - something is going wrong!
stream
}

resilientStream
}
```

While you should never ever let the stream fail (handle all your possible errors; see [Error handling](https://fs2.io/guide.html#error-handling)
section in official docs how the stream can be failed), it's important you're able to recover the stream when it accidentally happens.
You can do that by simply _requesting_ a new stream from the client:

```scala
val stream = streamingConsumer
.deliveryStream // get stream from client
.through(processMyStream) // "run" the stream through your processing logic

val resilientStream = stream.handleErrorWith { _ => // handle the error in stream: recover by calling itself
// TODO don't forget to add some logging/metrics here!
stream
}
```

Please refer to the [official guide](https://fs2.io/guide.html#overview) for understanding more deeply how the recovery of `fs2.Stream` works.

#### Providing converters for producer/consumer

Both the producer and consumer require type argument when creating from _connection_:
Expand Down
1 change: 1 addition & 0 deletions api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ archivesBaseName = "rabbitmq-client-api_$scalaVersion"

dependencies {
compile "com.avast.bytes:bytes-core:${bytesVersion}"
compile "co.fs2:fs2-core_$scalaVersion:$fs2Version"

compile "com.kailuowang:mainecoon-core_$scalaVersion:0.6.2"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.avast.clients.rabbitmq.api

import scala.language.higherKinds

trait RabbitMQStreamingConsumer[F[_], A] {
def deliveryStream: fs2.Stream[F, StreamedDelivery[F, A]]
}

trait StreamedDelivery[+F[_], +A] {
def delivery: Delivery[A]

def handle(result: DeliveryResult): F[StreamedResult]
}

sealed trait StreamedResult
object StreamedResult extends StreamedResult
9 changes: 8 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ subprojects {
scalaCompileOptions.additionalParameters = ['-target:jvm-1.8']
}

test {
testLogging {
showStandardStreams = true
}
}

sourceCompatibility = '1.8'
targetCompatibility = '1.8'

Expand All @@ -100,6 +106,7 @@ subprojects {
cactusVersion = "0.16.12"
catsVersion = "2.0.0"
catsEffectVersion = "2.0.0"
fs2Version = "2.2.1"
monixVersion = "3.0.0" // just for tests!
}

Expand All @@ -124,7 +131,7 @@ subprojects {
}

tasks.withType(ScalaCompile) {
List plugins = configurations.scalaCompilerPlugin.files.collect{ "-Xplugin:${it.getAbsolutePath()}".toString() }
List plugins = configurations.scalaCompilerPlugin.files.collect { "-Xplugin:${it.getAbsolutePath()}".toString() }
plugins.add("-Ypartial-unification")
scalaCompileOptions.additionalParameters = plugins
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package com.avast.clients.rabbitmq

import cats.effect.{Blocker, ContextShift, Effect, Sync}
import cats.effect.{Blocker, ContextShift, Sync}
import com.avast.clients.rabbitmq.DefaultRabbitMQConsumer._
import com.avast.clients.rabbitmq.api.DeliveryResult
import com.avast.metrics.scalaapi.Monitor
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client.ShutdownSignalException
import com.typesafe.scalalogging.StrictLogging

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -45,6 +46,7 @@ private[rabbitmq] trait ConsumerBase[F[_]] extends StrictLogging {
blocker.delay {
try {
logger.debug(s"[$name] ACK delivery ID $messageId, deliveryTag $deliveryTag")
if (!channel.isOpen) throw new IllegalStateException("Cannot ack delivery on closed channel")
channel.basicAck(deliveryTag, false)
resultAckMeter.mark()
} catch {
Expand All @@ -56,6 +58,7 @@ private[rabbitmq] trait ConsumerBase[F[_]] extends StrictLogging {
blocker.delay {
try {
logger.debug(s"[$name] REJECT delivery ID $messageId, deliveryTag $deliveryTag")
if (!channel.isOpen) throw new IllegalStateException("Cannot reject delivery on closed channel")
channel.basicReject(deliveryTag, false)
resultRejectMeter.mark()
} catch {
Expand All @@ -67,6 +70,7 @@ private[rabbitmq] trait ConsumerBase[F[_]] extends StrictLogging {
blocker.delay {
try {
logger.debug(s"[$name] REJECT (with requeue) delivery ID $messageId, deliveryTag $deliveryTag")
if (!channel.isOpen) throw new IllegalStateException("Cannot retry delivery on closed channel")
channel.basicReject(deliveryTag, true)
resultRetryMeter.mark()
} catch {
Expand All @@ -78,6 +82,7 @@ private[rabbitmq] trait ConsumerBase[F[_]] extends StrictLogging {
blocker.delay {
try {
logger.debug(s"[$name] Republishing delivery (ID $messageId, deliveryTag $deliveryTag) to end of queue '$queueName'")
if (!channel.isOpen) throw new IllegalStateException("Cannot republish delivery on closed channel")
channel.basicPublish("", queueName, properties, body)
channel.basicAck(deliveryTag, false)
resultRepublishMeter.mark()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package com.avast.clients.rabbitmq

import java.time.{Duration, Instant}
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.atomic.AtomicInteger

import cats.effect.{Effect, Sync}
import cats.syntax.all._
import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.JavaConverters._
import com.avast.clients.rabbitmq.api.{Delivery, DeliveryResult}
import com.avast.metrics.scalaapi._
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client.{DefaultConsumer, ShutdownSignalException}

import scala.collection.JavaConverters._
import scala.language.higherKinds
import scala.util.control.NonFatal

abstract class ConsumerWithCallbackBase[F[_]: Effect](channel: ServerChannel,
failureAction: DeliveryResult,
consumerListener: ConsumerListener)
extends DefaultConsumer(channel)
with ConsumerBase[F] {

override protected implicit val F: Sync[F] = Effect[F]

protected val readMeter: Meter = monitor.meter("read")

protected val processingFailedMeter: Meter = resultsMonitor.meter("processingFailed")

protected val tasksMonitor: Monitor = monitor.named("tasks")

protected val processingCount: AtomicInteger = new AtomicInteger(0)

tasksMonitor.gauge("processing")(() => processingCount.get())

protected val processedTimer: TimerPair = tasksMonitor.timerPair("processed")

override def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException): Unit =
consumerListener.onShutdown(this, channel, name, consumerTag, sig)

protected def handleDelivery(messageId: String, deliveryTag: Long, properties: BasicProperties, routingKey: String, body: Array[Byte])(
readAction: DeliveryReadAction[F, Bytes]): F[Unit] =
F.delay {
try {
readMeter.mark()

logger.debug(s"[$name] Read delivery with ID $messageId, deliveryTag $deliveryTag")

val delivery = Delivery(Bytes.copyFrom(body), properties.asScala, Option(routingKey).getOrElse(""))

logger.trace(s"[$name] Received delivery: $delivery")

val st = Instant.now()

@inline
def taskDuration: Duration = Duration.between(st, Instant.now())

readAction(delivery)
.flatMap { handleResult(messageId, deliveryTag, properties, routingKey, body) }
.flatTap(_ =>
F.delay {
val duration = taskDuration
logger.debug(s"[$name] Delivery ID $messageId handling succeeded in $duration")
processedTimer.update(duration)
})
.recoverWith {
case NonFatal(t) =>
F.delay {
val duration = taskDuration
logger.debug(s"[$name] Delivery ID $messageId handling failed in $duration", t)
processedTimer.updateFailure(duration)
} >>
handleCallbackFailure(messageId, deliveryTag, properties, routingKey, body)(t)
}
} catch {
// we catch this specific exception, handling of others is up to Lyra
case e: RejectedExecutionException =>
logger.debug(s"[$name] Executor was unable to plan the handling task", e)
handleFailure(messageId, deliveryTag, properties, routingKey, body, e)

case NonFatal(e) => handleCallbackFailure(messageId, deliveryTag, properties, routingKey, body)(e)
}
}.flatten

private def handleCallbackFailure(messageId: String,
deliveryTag: Long,
properties: BasicProperties,
routingKey: String,
body: Array[Byte])(t: Throwable): F[Unit] = {
F.delay {
logger.error(s"[$name] Error while executing callback, it's probably a BUG", t)
} >>
handleFailure(messageId, deliveryTag, properties, routingKey, body, t)
}

private def handleFailure(messageId: String,
deliveryTag: Long,
properties: BasicProperties,
routingKey: String,
body: Array[Byte],
t: Throwable): F[Unit] = {
F.delay {
processingCount.decrementAndGet()
processingFailedMeter.mark()
consumerListener.onError(this, name, channel, t)
} >>
executeFailureAction(messageId, deliveryTag, properties, routingKey, body)
}

private def executeFailureAction(messageId: String,
deliveryTag: Long,
properties: BasicProperties,
routingKey: String,
body: Array[Byte]): F[Unit] = {
handleResult(messageId, deliveryTag, properties, routingKey, body)(failureAction)
}
}
Loading

0 comments on commit 0f3d0c2

Please sign in to comment.