Spark Streaming comes with two ways of ingesting data from Apache Kafka:

- Using receivers
- Using no receivers (the direct approach)

There is yet another "middle-ground" approach (so-called unofficial since it is not available by default in Spark Streaming):

- …
In this approach, with no receivers, you find two modes of ingesting data from Kafka:

- Streaming mode using KafkaUtils.createDirectStream that creates an input stream that directly pulls messages from Kafka brokers (with no receivers). See the Streaming mode section.
- Non-streaming mode using KafkaUtils.createRDD that simply creates a KafkaRDD of key-value pairs, i.e. RDD[(K, V)] (see the sketch after this list).
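For the non-streaming mode, a minimal sketch could look as follows (the broker address, topic name, partition and offset range are assumptions for illustration):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
import _root_.kafka.serializer.StringDecoder
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Kafka RDD"))
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
// read offsets 0 (inclusive) until 100 (exclusive) from partition 0 of spark-topic
val offsetRanges = Array(OffsetRange("spark-topic", 0, 0, 100))
// createRDD returns an RDD[(K, V)] with the messages in the given offset ranges
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, offsetRanges)
rdd.take(5).foreach(println)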
You create a DirectKafkaInputDStream using KafkaUtils.createDirectStream.
Note | Define the types of keys and values in KafkaUtils.createDirectStream, e.g. KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder], so proper decoders are used to decode messages from Kafka. |
You have to specify metadata.broker.list or bootstrap.servers (in that order of precedence) for your Kafka environment. metadata.broker.list is a comma-separated list of Kafka’s (seed) brokers in the format of <host>:<port>.
Note | Kafka brokers have to be up and running before you can create a direct stream. |
val conf = new SparkConf().setMaster("local[*]").setAppName("Ingesting Data from Kafka")
conf.set("spark.streaming.ui.retainedBatches", "5")
// Enable Back Pressure
conf.set("spark.streaming.backpressure.enabled", "true")
val ssc = new StreamingContext(conf, batchDuration = Seconds(5))
// Enable checkpointing
ssc.checkpoint("_checkpoint")
// You may or may not want to enable some additional DEBUG logging
import org.apache.log4j._
Logger.getLogger("org.apache.spark.streaming.dstream.DStream").setLevel(Level.DEBUG)
Logger.getLogger("org.apache.spark.streaming.dstream.WindowedDStream").setLevel(Level.DEBUG)
Logger.getLogger("org.apache.spark.streaming.DStreamGraph").setLevel(Level.DEBUG)
Logger.getLogger("org.apache.spark.streaming.scheduler.JobGenerator").setLevel(Level.DEBUG)
// Connect to Kafka
import org.apache.spark.streaming.kafka.KafkaUtils
import _root_.kafka.serializer.StringDecoder
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val kafkaTopics = Set("spark-topic")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, kafkaTopics)
// print the first 10 elements of each batch
messages.print()
// start the streaming computation
ssc.start()
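If you run this as a standalone application (rather than interactively in spark-shell), you would typically also block the main thread until the streaming computation terminates:

ssc.awaitTermination()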
If the zookeeper.connect or group.id parameters are not set, they are added with empty strings as their values.
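To set them explicitly, you could include them in the Kafka parameters yourself (the values below are assumptions for a local setup):

val kafkaParams = Map(
  "metadata.broker.list" -> "localhost:9092",
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "spark-topic-consumer-group")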
In this mode, you will only see jobs submitted (in the Jobs tab of web UI) when a message comes in. That corresponds to an Input Size larger than 0 in the Streaming tab of the web UI. Click the link in Completed Jobs for a batch to see its details.
The new API for both Kafka RDD and DStream is in the spark-streaming-kafka artifact. Add the following dependency to your sbt project to use the streaming integration:
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "2.0.0-SNAPSHOT"
Note | Replace "2.0.0-SNAPSHOT" with an available version as found at The Central Repository’s search. |
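A minimal build.sbt for a standalone application could then look as follows (the project name and the Scala and Spark versions are assumptions; adjust them to your environment):

name := "kafka-direct-stream-example"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.0.0-SNAPSHOT",
  "org.apache.spark" %% "spark-streaming-kafka" % "2.0.0-SNAPSHOT")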
DirectKafkaInputDStream is an input stream of KafkaRDD batches.

As an input stream, it implements the five mandatory abstract methods - three from DStream and two from InputDStream:
- dependencies: List[DStream[_]] returns an empty collection, i.e. it has no dependencies on other streams (other than Kafka brokers to read data from).
- slideDuration: Duration passes all calls on to DStreamGraph.batchDuration.
- compute(validTime: Time): Option[RDD[T]] - consult the Computing RDDs (using compute Method) section.
- start() does nothing.
- stop() does nothing.
The name of the input stream is Kafka direct stream [id]. You can find the name in the Streaming tab in web UI (in the details of a batch in the Input Metadata section).
It uses the spark.streaming.kafka.maxRetries setting while computing latestLeaderOffsets (i.e. a mapping of kafka.common.TopicAndPartition and LeaderOffset).
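The number of retries can be set on SparkConf before the StreamingContext is created, e.g.:

// e.g. allow up to 3 retries when computing the latest offsets
conf.set("spark.streaming.kafka.maxRetries", "3")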
DirectKafkaInputDStream.compute always computes a KafkaRDD instance (despite the DStream contract that says it may or may not generate one). Every time the method is called, latestLeaderOffsets calculates the latest offsets (as Map[TopicAndPartition, LeaderOffset]).
Note | Every call to compute queries the Kafka brokers for the latest offsets. |
The only moving parts of the generated KafkaRDD instances are the offsets; the other parameters are taken directly from DirectKafkaInputDStream (given at the time of instantiation).
It then filters out empty offset ranges to build StreamInputInfo for InputInfoTracker.reportInfo. It sets the just-calculated offsets as current (using currentOffsets) and returns a new KafkaRDD instance.
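Since the offsets are the moving parts, you can inspect the offset ranges of each generated KafkaRDD on the driver, e.g. using HasOffsetRanges (a sketch that reuses the messages stream from the example above):

import org.apache.spark.streaming.kafka.HasOffsetRanges
messages.foreachRDD { rdd =>
  // RDDs produced by the direct stream carry their Kafka offset ranges
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
  }
}

Note that output operations like foreachRDD have to be registered before the streaming context is started.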
Caution | FIXME |
Back pressure for the direct Kafka input stream can be configured using the spark.streaming.backpressure.enabled setting.
Note | Back pressure is disabled by default. |
- broker
- leader
- topic
- partition
- offset
- exactly-once semantics
- Kafka high-level consumer
LeaderOffset is an internal class that represents an offset on a topic partition of a broker that runs on a given host and port.
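Conceptually, LeaderOffset can be thought of as the following case class (a sketch inferred from the description above, not necessarily the exact definition in the Spark sources):

// a broker's host and port plus an offset on a topic partition of that broker
case class LeaderOffset(host: String, port: Int, offset: Long)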