Ingesting Data from Apache Kafka

Spark Streaming comes with two ways of ingesting data from Apache Kafka:

  • the receiver-based approach that uses Kafka's high-level consumer API behind a receiver

  • the direct, receiver-less approach (see Data Ingestion with no Receivers below)

There is yet another "middle-ground" approach (so-called unofficial since it is not available in Spark Streaming by default):

  • …

Data Ingestion with no Receivers

In this receiver-less approach, there are two modes of ingesting data from Kafka:

  • Streaming mode using KafkaUtils.createDirectStream that creates an input stream that directly pulls messages from Kafka brokers (with no receivers). See the Streaming mode section below.

  • Non-streaming mode using KafkaUtils.createRDD that just creates a KafkaRDD of key-value pairs, i.e. RDD[(K, V)], as sketched right after this list.
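
As a quick illustration of the non-streaming mode, the following sketch builds a KafkaRDD for an explicit range of offsets (the broker address, topic name and offset boundaries are placeholders to replace with your own):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
import _root_.kafka.serializer.StringDecoder

val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("KafkaRDD Example"))

val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

// offsets 0 (inclusive) to 100 (exclusive) of partition 0 of spark-topic
val offsetRanges = Array(OffsetRange("spark-topic", 0, 0, 100))

// an RDD[(K, V)] of the messages in the given offset ranges
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, offsetRanges)
rdd.take(10).foreach(println)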

Streaming mode

You create DirectKafkaInputDStream using KafkaUtils.createDirectStream.

Note
Define the types of keys and values in KafkaUtils.createDirectStream, e.g. KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder], so proper decoders are used to decode messages from Kafka.

You have to specify metadata.broker.list or bootstrap.servers (in that order of precedence) for your Kafka environment. metadata.broker.list is a comma-separated list of Kafka’s (seed) brokers in the format of <host>:<port>.

Note
Kafka brokers have to be up and running before you can create a direct stream.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[*]").setAppName("Ingesting Data from Kafka")
conf.set("spark.streaming.ui.retainedBatches", "5")

// Enable Back Pressure
conf.set("spark.streaming.backpressure.enabled", "true")

val ssc = new StreamingContext(conf, batchDuration = Seconds(5))

// Enable checkpointing
ssc.checkpoint("_checkpoint")

// You may or may not want to enable some additional DEBUG logging
import org.apache.log4j._
Logger.getLogger("org.apache.spark.streaming.dstream.DStream").setLevel(Level.DEBUG)
Logger.getLogger("org.apache.spark.streaming.dstream.WindowedDStream").setLevel(Level.DEBUG)
Logger.getLogger("org.apache.spark.streaming.DStreamGraph").setLevel(Level.DEBUG)
Logger.getLogger("org.apache.spark.streaming.scheduler.JobGenerator").setLevel(Level.DEBUG)

// Connect to Kafka
import org.apache.spark.streaming.kafka.KafkaUtils
import _root_.kafka.serializer.StringDecoder
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val kafkaTopics = Set("spark-topic")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, kafkaTopics)

// print the first 10 messages of every batch
messages.print()

// start streaming computation
ssc.start

If the zookeeper.connect or group.id parameters are not set, they are added with empty strings as their values.

In this mode, you will only see jobs submitted (in the Jobs tab of the web UI) when a message comes in.

Figure 1. Completed Jobs in web UI for batch time 22:17:15

This corresponds to an Input Size greater than 0 in the Streaming tab of the web UI.

Figure 2. Completed Batch in web UI for batch time 22:17:15

Click the link for a batch in Completed Jobs to see its details.

Figure 3. Details of batch in web UI for batch time 22:17:15

spark-streaming-kafka Library Dependency

The new API for both Kafka RDD and DStream is in the spark-streaming-kafka artifact. Add the following dependency to your sbt project to use the streaming integration:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "2.0.0-SNAPSHOT"

Note
Replace "2.0.0-SNAPSHOT" with an available version as found at The Central Repository’s search.

DirectKafkaInputDStream

DirectKafkaInputDStream is an input stream of KafkaRDD batches.

As an input stream, it implements the five mandatory abstract methods - three from DStream and two from InputDStream (see the simplified skeleton after this list):

  • dependencies: List[DStream[_]] returns an empty collection, i.e. it has no dependencies on other streams (other than Kafka brokers to read data from).

  • slideDuration: Duration passes all calls on to DStreamGraph.batchDuration.

  • compute(validTime: Time): Option[RDD[T]] - consult Computing RDDs (using compute Method) section.

  • start() does nothing.

  • stop() does nothing.
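
A hypothetical, simplified skeleton (not the actual Spark sources; the class and parameter names are illustrative) shows where these methods fit in the InputDStream contract:

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

// a simplified sketch of a direct-style input stream
class DirectStreamSketch[K: ClassTag, V: ClassTag](streamingContext: StreamingContext)
  extends InputDStream[(K, V)](streamingContext) {

  // no parent streams -- records come straight from Kafka brokers
  override def dependencies: List[DStream[_]] = List.empty

  // slideDuration is not overridden in this sketch; InputDStream forwards it
  // to DStreamGraph.batchDuration, i.e. one batch per batch interval

  // produces the RDD for a batch; the real implementation always returns a KafkaRDD
  override def compute(validTime: Time): Option[RDD[(K, V)]] = None

  // nothing to start or stop -- there is no receiver to manage
  override def start(): Unit = ()
  override def stop(): Unit = ()
}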

The name of the input stream is Kafka direct stream [id]. You can find the name in the Streaming tab in web UI (in the details of a batch in Input Metadata section).

It uses the spark.streaming.kafka.maxRetries setting while computing latestLeaderOffsets (i.e. a mapping of kafka.common.TopicAndPartition and LeaderOffset).
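
The setting can be adjusted on the SparkConf used earlier (the value 3 below is just an illustration):

// allow a few more attempts when the latest leader offsets cannot be fetched
conf.set("spark.streaming.kafka.maxRetries", "3")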

Computing RDDs (using compute Method)

DirectKafkaInputDStream.compute always computes a KafkaRDD instance (despite the DStream contract that says it may or may not generate one).

Every time the method is called, latestLeaderOffsets calculates the latest offsets (as Map[TopicAndPartition, LeaderOffset]).

Note
Every call to compute does call Kafka brokers for the offsets.

The only moving parts of the generated KafkaRDD instances are the offsets; the other parameters are taken directly from DirectKafkaInputDStream (given at the time of instantiation).

It then filters out empty offset ranges to build StreamInputInfo for InputInfoTracker.reportInfo.

It sets the just-calculated offsets as current (using currentOffsets) and returns a new KafkaRDD instance.
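
Since every batch is backed by a KafkaRDD, you can inspect the offset ranges it was computed from with the HasOffsetRanges interface from the same artifact. Note that the cast only succeeds in the first operation applied to the direct stream, e.g. foreachRDD on the messages stream from the example above:

import org.apache.spark.streaming.kafka.HasOffsetRanges

messages.foreachRDD { rdd =>
  // every KafkaRDD knows the offset ranges it was built from
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} / partition ${o.partition}: offsets ${o.fromOffset} to ${o.untilOffset}")
  }
}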

Back Pressure

Caution
FIXME

Back pressure for the direct Kafka input stream can be configured using the spark.streaming.backpressure.enabled setting.

Note
Back pressure is disabled by default.
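
Turning it on for the direct stream is the same single setting used in the example at the top of this page:

// enable the back-pressure rate controller (off by default)
conf.set("spark.streaming.backpressure.enabled", "true")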

Kafka Concepts

  • broker

  • leader

  • topic

  • partition

  • offset

  • exactly-once semantics

  • Kafka high-level consumer

LeaderOffset

LeaderOffset is an internal class that represents an offset of a topic partition on its leader broker, identified by host and port.
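
Its shape is roughly the following (a sketch only; the actual class is private to Spark's Kafka integration):

// a host:port pair of the partition leader plus the offset on it
case class LeaderOffset(host: String, port: Int, offset: Long)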