Ingesting Data from Apache Kafka

Spark Streaming comes with two ways of ingesting data from Apache Kafka:

  • Using receivers

  • With no receivers

There is yet another "middle-ground" approach (so-called unofficial since it is not available by default in Spark Streaming):

  • …​

Data Ingestion with no Receivers

In this approach, with no receivers, you find two modes of ingesting data from Kafka:

  • Streaming mode using KafkaUtils.createDirectStream that creates an input stream that directly pulls messages from Kafka brokers (with no receivers). See Streaming mode section.

  • Non-streaming mode using KafkaUtils.createRDD that just creates a KafkaRDD of key-value pairs, i.e. RDD[(K, V)].
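As a rough illustration of the non-streaming mode, the sketch below reads a fixed range of offsets into an RDD. It assumes the Kafka 0.8 integration (spark-streaming-kafka) is on the classpath, a broker listens on localhost:9092, and partition 0 of a topic called spark-topic holds at least 100 messages. The object name KafkaBatchRead and the offsets are made up for the example.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

object KafkaBatchRead {
  // Assumption: partition 0 of "spark-topic" holds offsets 0 (inclusive) to 100 (exclusive)
  val offsetRanges: Array[OffsetRange] = Array(OffsetRange("spark-topic", 0, 0L, 100L))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Kafka createRDD sketch")
    val sc = new SparkContext(conf)
    try {
      // Assumption: a single local broker
      val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
      // One RDD partition is created per offset range
      val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
        sc, kafkaParams, offsetRanges)
      // Print up to 10 message values
      rdd.map { case (_, value) => value }.take(10).foreach(println)
    } finally sc.stop()
  }
}
```

Unlike the streaming mode, the offsets to read are fixed up front, so the job runs once and finishes.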

Streaming mode

You create DirectKafkaInputDStream using KafkaUtils.createDirectStream.

Define the types of keys and values in KafkaUtils.createDirectStream, e.g. KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder], so proper decoders are used to decode messages from Kafka.
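To make the role of the decoder type parameters more concrete, here is a small Spark-free model of what a decoder does. BytesDecoder is a hypothetical stand-in for kafka.serializer.Decoder (whose single method is fromBytes), and Utf8StringDecoder only approximates StringDecoder with its default encoding.

```scala
import java.nio.charset.StandardCharsets

object DecoderSketch {
  // Hypothetical stand-in for kafka.serializer.Decoder[T]
  trait BytesDecoder[T] {
    def fromBytes(bytes: Array[Byte]): T
  }

  // Approximates StringDecoder with its default (UTF-8) encoding
  object Utf8StringDecoder extends BytesDecoder[String] {
    def fromBytes(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)
  }

  // Decode a raw (key, value) record into the (K, V) pair a stream would expose
  def decodeRecord[K, V](key: Array[Byte], value: Array[Byte],
                         kd: BytesDecoder[K], vd: BytesDecoder[V]): (K, V) =
    (kd.fromBytes(key), vd.fromBytes(value))
}
```

The two decoder type parameters of createDirectStream play the part of kd and vd here: they decide how the raw bytes of keys and values become K and V.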

You have to specify metadata.broker.list or bootstrap.servers (in that order of precedence) for your Kafka environment. metadata.broker.list is a comma-separated list of Kafka’s (seed) brokers in the format of <host>:<port>.
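The lookup order and the broker list format can be sketched in plain Scala. This is a simplified model, not Spark's own code: it assumes the preferred parameter is metadata.broker.list (as in the Kafka 0.8 integration), and the names BrokerListSketch, brokerList and parseBrokers are invented for the example.

```scala
object BrokerListSketch {
  // Pick the broker list, preferring metadata.broker.list over bootstrap.servers
  def brokerList(kafkaParams: Map[String, String]): Option[String] =
    kafkaParams.get("metadata.broker.list").orElse(kafkaParams.get("bootstrap.servers"))

  // Split "host1:9092,host2:9093" into (host, port) pairs
  def parseBrokers(brokers: String): Seq[(String, Int)] =
    brokers.split(",").toSeq.map(_.trim).map { hostPort =>
      hostPort.split(":") match {
        case Array(host, port) => (host, port.toInt)
        case _ =>
          throw new IllegalArgumentException(s"Expected <host>:<port>, got '$hostPort'")
      }
    }
}
```

For instance, BrokerListSketch.parseBrokers("localhost:9092") yields a single (host, port) pair, while a map that sets neither parameter gives None from brokerList.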

Kafka brokers have to be up and running before you can create a direct stream.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[*]").setAppName("Ingesting Data from Kafka")
conf.set("spark.streaming.ui.retainedBatches", "5")

// Enable Back Pressure
conf.set("spark.streaming.backpressure.enabled", "true")

val ssc = new StreamingContext(conf, batchDuration = Seconds(5))

// Enable checkpointing (the directory name is only an example)
ssc.checkpoint("_checkpoint")

// You may or may not want to enable some additional DEBUG logging
// (the logger name below is only an example)
import org.apache.log4j._
Logger.getLogger("org.apache.spark.streaming.kafka").setLevel(Level.DEBUG)

// Connect to Kafka
import org.apache.spark.streaming.kafka.KafkaUtils
import _root_.kafka.serializer.StringDecoder
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val kafkaTopics = Set("spark-topic")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, kafkaTopics)

// print the first 10 messages of every batch
messages.print()

// start streaming computation
ssc.start()

If the zookeeper.connect or group.id parameters are not set, they are added with empty strings as their values.
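The defaulting described above can be modeled in a few lines. This is a sketch under the assumption that the two defaulted parameters are zookeeper.connect and group.id (as in the Kafka 0.8 integration). ConsumerDefaultsSketch and withDefaults are hypothetical names.

```scala
object ConsumerDefaultsSketch {
  // Assumption: these are the two parameters that get defaulted
  val defaulted: Seq[String] = Seq("zookeeper.connect", "group.id")

  // Add an empty-string value for every defaulted key the caller did not set
  def withDefaults(kafkaParams: Map[String, String]): Map[String, String] =
    defaulted.foldLeft(kafkaParams) { (params, key) =>
      if (params.contains(key)) params else params + (key -> "")
    }
}
```

Values the caller did set are left alone, so an explicit group.id survives the defaulting.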

In this mode, you will only see jobs submitted (in the Jobs tab in web UI) when a message comes in.

Figure 1. Completed Jobs in web UI for batch time 22:17:15

It corresponds to Input size larger than 0 in the Streaming tab in the web UI.

Figure 2. Completed Batch in web UI for batch time 22:17:15

Click the link in Completed Jobs for a batch and you see the details.

Figure 3. Details of batch in web UI for batch time 22:17:15

spark-streaming-kafka Library Dependency

The new API for both Kafka RDD and DStream is in the spark-streaming-kafka artifact. Add the following dependency to your sbt project to use the streaming integration:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "2.0.0-SNAPSHOT"

Replace "2.0.0-SNAPSHOT" with an available version, which you can look up using The Central Repository’s search.


DirectKafkaInputDStream

DirectKafkaInputDStream is an input stream of KafkaRDD batches.

As an input stream, it implements the five mandatory abstract methods - three from DStream and two from InputDStream:

  • dependencies: List[DStream[_]] returns an empty collection, i.e. it has no dependencies on other streams (other than Kafka brokers to read data from).

  • slideDuration: Duration passes all calls on to DStreamGraph.batchDuration.

  • compute(validTime: Time): Option[RDD[T]] - consult Computing RDDs (using compute Method) section.

  • start() does nothing.

  • stop() does nothing.
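The five methods listed above can be pictured with a deliberately simplified, Spark-free model. MiniDStream, MiniInputDStream and MiniDirectStream are hypothetical stand-ins. Durations and times are plain milliseconds, and a Seq stands in for an RDD.

```scala
object InputStreamContractSketch {
  // Hypothetical, heavily simplified stand-in for DStream
  trait MiniDStream[T] {
    def dependencies: List[MiniDStream[_]]
    def slideDuration: Long                      // milliseconds instead of Duration
    def compute(validTime: Long): Option[Seq[T]] // Seq[T] instead of RDD[T]
  }

  // Hypothetical stand-in for InputDStream, which adds the lifecycle methods
  trait MiniInputDStream[T] extends MiniDStream[T] {
    def start(): Unit
    def stop(): Unit
  }

  // A direct-stream-like implementation: no parent streams, no receiver lifecycle
  class MiniDirectStream(batchDurationMs: Long, fetch: Long => Seq[String])
      extends MiniInputDStream[String] {
    def dependencies: List[MiniDStream[_]] = List.empty
    def slideDuration: Long = batchDurationMs
    def compute(validTime: Long): Option[Seq[String]] = Some(fetch(validTime))
    def start(): Unit = () // nothing to start: there is no receiver
    def stop(): Unit = ()  // nothing to stop
  }
}
```

With no receiver to manage, start and stop have nothing to do, and all the work happens in compute.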

The name of the input stream is Kafka direct stream [id]. You can find the name in the Streaming tab in web UI (in the details of a batch in Input Metadata section).

It uses the spark.streaming.kafka.maxRetries setting while computing latestLeaderOffsets (i.e. a mapping from kafka.common.TopicAndPartition to LeaderOffset).
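How a retry limit can bound the offset lookup is sketched below in plain Scala. This is an illustration of the idea only, not Spark's implementation: RetrySketch, withRetries and the backoffMs parameter are assumptions made for the example, and the lookup is modeled as a function returning Either.

```scala
import scala.annotation.tailrec

object RetrySketch {
  // Call `fetch` until it succeeds or the retries run out.
  // With retries = n, `fetch` is called at most n + 1 times.
  @tailrec
  def withRetries[A](retries: Int, backoffMs: Long)(fetch: () => Either[String, A]): A =
    fetch() match {
      case Right(offsets) => offsets
      case Left(err) if retries <= 0 => throw new IllegalStateException(err)
      case Left(_) =>
        Thread.sleep(backoffMs) // wait before asking the brokers again
        withRetries(retries - 1, backoffMs)(fetch)
    }
}
```

In this model, the value of spark.streaming.kafka.maxRetries would be passed as retries.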

Computing RDDs (using compute Method)

DirectKafkaInputDStream.compute always computes a KafkaRDD instance (despite the DStream contract that says it may or may not generate one).

Every time the method is called, latestLeaderOffsets calculates the latest offsets (as Map[TopicAndPartition, LeaderOffset]).

Every call to compute does call Kafka brokers for the offsets.

The only moving parts of the generated KafkaRDD instances are the offsets. Everything else is taken directly from DirectKafkaInputDStream (as given at the time of instantiation).

It then filters out empty offset ranges to build StreamInputInfo for InputInfoTracker.reportInfo.

It sets the just-calculated offsets as current (using currentOffsets) and returns a new KafkaRDD instance.
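The steps of compute described in this section can be traced with a small Spark-free model. All types below (TopicPartition, LeaderOffset, OffsetRange, Batch, DirectStreamModel) are hypothetical stand-ins written for this sketch. A Batch plays the role of the returned KafkaRDD together with the information reported to the tracker.

```scala
object ComputeSketch {
  // Hypothetical stand-ins for TopicAndPartition, LeaderOffset and OffsetRange
  final case class TopicPartition(topic: String, partition: Int)
  final case class LeaderOffset(host: String, port: Int, offset: Long)
  final case class OffsetRange(tp: TopicPartition, fromOffset: Long, untilOffset: Long) {
    def count: Long = untilOffset - fromOffset
  }

  // `ranges` back the batch itself, `reported` is what the tracker is told about
  final case class Batch(ranges: Seq[OffsetRange], reported: Seq[OffsetRange])

  class DirectStreamModel(initial: Map[TopicPartition, Long],
                          latestLeaderOffsets: () => Map[TopicPartition, LeaderOffset]) {
    private var currentOffsets: Map[TopicPartition, Long] = initial

    def compute(): Batch = {
      // 1. Ask for the latest offsets on every call
      val untilOffsets = latestLeaderOffsets()
      // 2. Each partition contributes the range [current, latest)
      val ranges = currentOffsets.toSeq.map { case (tp, from) =>
        OffsetRange(tp, from, untilOffsets(tp).offset)
      }
      // 3. Empty ranges are filtered out of the reported input info
      val reported = ranges.filter(_.count > 0)
      // 4. The just-calculated offsets become the current ones
      currentOffsets = untilOffsets.map { case (tp, lo) => tp -> lo.offset }
      // 5. A batch is always returned, even when it is empty
      Batch(ranges, reported)
    }
  }
}
```

Calling compute twice without new messages gives a second batch whose ranges are all empty, which mirrors the point that a KafkaRDD is produced on every call.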

Back Pressure


Back pressure for the direct Kafka input dstream can be configured using the spark.streaming.backpressure.enabled setting.

Back pressure is disabled by default.

Kafka Concepts

  • broker

  • leader

  • topic

  • partition

  • offset

  • exactly-once semantics

  • Kafka high-level consumer


LeaderOffset

LeaderOffset is an internal class to represent an offset on the topic partition on the broker that works on a host and a port.