ZIO Kafka provides a purely functional, streams-based interface to the Kafka client. It integrates effortlessly with ZIO and ZIO Streams.
Add the following dependencies to your build.sbt
file:
libraryDependencies ++= Seq(
"dev.zio" %% "zio-streams" % "1.0.0-RC17",
"dev.zio" %% "zio-kafka" % "<version>"
)
Somewhere in your application, configure the zio.kafka.ConsumerSettings
data type:
import zio._, zio.duration._
import zio.kafka.client._
val settings: ConsumerSettings =
ConsumerSettings(
bootstrapServers = List("localhost:9092"),
groupId = "group",
clientId = "client",
closeTimeout = 30.seconds,
extraDriverSettings = Map(),
pollInterval = 250.millis,
pollTimeout = 50.millis,
perPartitionChunkPrefetch = 2
)
For a lot of use cases where you just want to do something with all messages on a Kafka topic, ZIO Kafka provides the convenience method Consumer.consumeWith
. This method lets you execute a ZIO effect for each message. Topic partitions will be processed in parallel and offsets are committed after running the effect automatically.
import zio._
import zio.console._
import zio.kafka.client._
import zio.kafka.client.serde._
val subscription = Subscription.topics("topic")
Consumer.consumeWith(settings, subscription, Serde.string, Serde.string) { case (key, value) =>
putStrLn(s"Received message ${key}: ${value}")
// Perform an effect with the received message
}
If you require more control over the consumption process, read on!
First, create a consumer using the ConsumerSettings instance:
import zio.ZManaged, zio.blocking.Blocking, zio.clock.Clock
import zio.kafka.client.{ Consumer, ConsumerSettings }
val consumerSettings: ConsumerSettings = ???
val consumer: ZManaged[Clock with Blocking, Throwable, Consumer] =
Consumer.make(consumerSettings)
The consumer returned from Consumer.make
is wrapped in a ZManaged
to ensure its proper release. To get access to it, you can use the ZManaged#use
method
or compose other ZManaged
instances with it using a for-comprehension. Here's an example
using ZManaged#use
:
import zio._, zio.blocking.Blocking, zio.clock.Clock
import zio.kafka.client.Consumer
val consumer: ZManaged[Clock with Blocking, Throwable, Consumer] = ???
consumer.use { c =>
// Consumer now available as `c`
ZIO.unit
}
You may stream data from Kafka using the subscribe
and plainStream
methods. plainStream
takes as parameters deserializers for the key and values of the Kafka messages. Serializers and deserializers (Serdes) for common data types are available in the Deserializer
, Serializer
, and Serde
companion objects:
import zio.ZManaged, zio.blocking.Blocking, zio.clock.Clock, zio.console.putStrLn
import zio.stream._
import zio.kafka.client._
import zio.kafka.client.serde._
val consumer: ZManaged[Clock with Blocking, Throwable, Consumer] = ???
consumer.use { c =>
c.subscribeAnd(Subscription.topics("topic150"))
.plainStream(Serde.string, Serde.string)
.flattenChunks
.tap(cr => putStrLn(s"key: ${cr.record.key}, value: ${cr.record.value}"))
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.mapM(_.commit)
.runDrain
}
If you need to distinguish between the different partitions assigned
to the consumer, you may use the Consumer#partitionedStream
method,
which creates a nested stream of partitions:
import zio.ZManaged, zio.blocking.Blocking, zio.clock.Clock, zio.console.putStrLn
import zio.stream._
import zio.kafka.client._
import zio.kafka.client.serde._
val consumer: ZManaged[Clock with Blocking, Throwable, Consumer] = ???
consumer.use { c =>
c.subscribeAnd(Subscription.topics("topic150"))
.partitionedStream(Serde.string, Serde.string)
.tap(tpAndStr => putStrLn(s"topic: ${tpAndStr._1.topic}, partition: ${tpAndStr._1.partition}"))
.flatMap(_._2.flattenChunks)
.tap(cr => putStrLn(s"key: ${cr.record.key}, value: ${cr.record.value}"))
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.mapM(_.commit)
.runDrain
}
This example shows how to consume messages from topic topic_a
and produce transformed messages to topic_b
, after which consumer offsets are committed. Processing is done in chunks using ZStreamChunk
for more efficiency.
import zio.kafka.client._
import zio.kafka.client.serde._
import org.apache.kafka.clients.producer.ProducerRecord
val consumerSettings: ConsumerSettings = ???
val producerSettings: ProducerSettings = ???
(Consumer.make(consumerSettings) zip Producer.make(producerSettings, Serde.int, Serde.string)).use {
case (consumer, producer) =>
consumer
.subscribeAnd(Subscription.topics("my-input-topic"))
.plainStream(Serde.int, Serde.long)
.map { record =>
val key: Int = record.record.key()
val value: Long = record.record.value()
val newValue: String = value.toString
val producerRecord: ProducerRecord[Int, String] = new ProducerRecord("my-output-topic", key, newValue)
(producerRecord, record.offset)
}
.chunks
.mapM { chunk =>
val records = chunk.map(_._1)
val offsetBatch = OffsetBatch(chunk.map(_._2).toSeq)
producer.produceChunk(records) *> offsetBatch.commit
}
.runDrain
}
Serializers and deserializers (serdes) for custom data types can be constructed from scratch or by converting existing serdes. For example, to create a serde for an Instant
:
import java.time.Instant
import zio.kafka.client.serde._
val instantSerde: Serde[Any, Instant] = Serde.long.inmap(java.time.Instant.ofEpochMilli)(_.toEpochMilli)
The default behavior for a consumer stream when encountering a deserialization failure is to fail the stream. In many cases you may want to handle this situation differently, eg by skipping the message that failed to deserialize or by executing an alternative effect. For this purpose, any Deserializer[T]
for some type T
can be easily converted into a Deserializer[Try[T]]
where deserialization failures are converted to a Failure
using the asTry
method.
Below is an example of skipping messages that fail to deserialize. The offset is passed downstream to be committed.
import zio.ZManaged, zio.blocking.Blocking, zio.clock.Clock, zio.console.putStrLn
import zio.stream._
import zio.kafka.client._
import zio.kafka.client.serde._
import scala.util.{Success, Failure}
import zio._
val consumer: ZManaged[Clock with Blocking, Throwable, Consumer] = ???
consumer.use { c =>
val stream = c.subscribeAnd(Subscription.topics("topic150"))
.plainStream(Serde.string, Serde.string.asTry)
stream
.mapM { record =>
val tryValue: Try[String] = record.record.value()
val offset: Offset = record.offset
tryValue match {
case Success(value) =>
// Action for successful deserialization
someEffect(value).as(offset)
case Failure(exception) =>
// Possibly log the exception or take alternative action
ZIO.succeed(offset)
}
}
.flattenChunks
.aggregateAsync(Consumer.offsetBatches)
.mapM(_.commit)
.runDrain
}
Join us on the ZIO Discord server at the #zio-kafka
channel.
This library is heavily inspired and made possible by the research and implementation done in Alpakka Kafka, a library maintained by the Akka team and originally written as Reactive Kafka by SoftwareMill.
Copyright 2019 Itamar Ravid and the zio-kafka contributors. All rights reserved.