KafkaRelation

KafkaRelation represents a collection of rows with a predefined schema (BaseRelation) that supports a full table scan (TableScan). TableScan itself does not prune columns; that is what PrunedScan is for.

Tip
Read up on BaseRelation and TableScan in The Internals of Spark SQL book.

KafkaRelation is created exclusively when KafkaSourceProvider is requested to create a BaseRelation.

Table 1. KafkaRelation’s Options
Name Description

kafkaConsumer.pollTimeoutMs

Default: the value of the spark.network.timeout configuration property if set, or 120s otherwise
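
A minimal sketch of how the default above might be resolved, written in plain Scala so it runs without Spark. The helper name `resolvePollTimeoutMs` and its parameters are hypothetical, and converting the spark.network.timeout value from seconds to milliseconds is an assumption based on the `Ms` suffix of the option name.

```scala
// Hypothetical helper (not part of Spark): picks the poll timeout in milliseconds.
//   sourceOptions         - options passed to the Kafka data source
//   networkTimeoutSeconds - spark.network.timeout in seconds, if it is set
def resolvePollTimeoutMs(
    sourceOptions: Map[String, String],
    networkTimeoutSeconds: Option[Long]): Long =
  sourceOptions.get("kafkaConsumer.pollTimeoutMs") match {
    // An explicit option always wins.
    case Some(explicit) => explicit.toLong
    // Otherwise fall back to spark.network.timeout, or 120s (assumed to be converted to ms).
    case None => networkTimeoutSeconds.getOrElse(120L) * 1000L
  }

val explicitTimeout = resolvePollTimeoutMs(Map("kafkaConsumer.pollTimeoutMs" -> "5000"), Some(300L))
val fromNetworkTimeout = resolvePollTimeoutMs(Map.empty, Some(300L))
val fallbackTimeout = resolvePollTimeoutMs(Map.empty, None)
```

In a batch query the option itself would be given with `.option("kafkaConsumer.pollTimeoutMs", "5000")` on `spark.read.format("kafka")`.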

Tip

Enable INFO or DEBUG logging levels for org.apache.spark.sql.kafka010.KafkaRelation to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.kafka010.KafkaRelation=DEBUG

Refer to Logging.

Creating KafkaRelation Instance

KafkaRelation takes the following when created:

getPartitionOffsets Internal Method

getPartitionOffsets(
  kafkaReader: KafkaOffsetReader,
  kafkaOffsets: KafkaOffsetRangeLimit): Map[TopicPartition, Long]
Caution
FIXME
Note
getPartitionOffsets is used exclusively when KafkaRelation is requested to build an RDD of rows (buildScan).

Building Distributed Data Scan: buildScan Method

buildScan(): RDD[Row]
Note
buildScan is part of the TableScan contract to build a distributed scan over all rows and columns of the relation.

buildScan generates a unique group ID of the format spark-kafka-relation-[randomUUID] (so that every query gets its own Kafka consumer group).
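
The group ID format can be illustrated with a short sketch. The function name `uniqueGroupId` is hypothetical; only the spark-kafka-relation-[randomUUID] format comes from the text above.

```scala
import java.util.UUID

// Hypothetical helper: builds a group ID in the spark-kafka-relation-[randomUUID] format.
// A fresh random UUID per call means two calls should not share a consumer group.
def uniqueGroupId(): String = s"spark-kafka-relation-${UUID.randomUUID}"

val firstGroupId = uniqueGroupId()
val secondGroupId = uniqueGroupId()
```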

buildScan creates a KafkaOffsetReader with the following:

buildScan uses the KafkaOffsetReader to getPartitionOffsets for the starting and ending offsets (based on the given starting and ending KafkaOffsetRangeLimit, respectively). buildScan requests the KafkaOffsetReader to close afterwards.
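
The resolve-then-close step could look roughly like the sketch below. All types here (`TopicPartition`, `OffsetLimit`, `FakeOffsetReader`) are hypothetical in-memory stand-ins rather than the real Kafka or Spark classes, and the use of try/finally to guarantee the close is an assumption about how "afterwards" is enforced.

```scala
// Hypothetical stand-ins (not the real Kafka or Spark classes).
final case class TopicPartition(topic: String, partition: Int)

sealed trait OffsetLimit
case object Earliest extends OffsetLimit
case object Latest extends OffsetLimit

// In-memory substitute for an offset reader that records whether it was closed.
final class FakeOffsetReader(
    earliest: Map[TopicPartition, Long],
    latest: Map[TopicPartition, Long]) {
  var closed: Boolean = false
  def fetch(limit: OffsetLimit): Map[TopicPartition, Long] = limit match {
    case Earliest => earliest
    case Latest   => latest
  }
  def close(): Unit = { closed = true }
}

// Resolve the starting and ending offsets, then close the reader
// whether or not the lookups succeeded.
def resolveOffsets(
    reader: FakeOffsetReader,
    starting: OffsetLimit,
    ending: OffsetLimit): (Map[TopicPartition, Long], Map[TopicPartition, Long]) =
  try {
    (reader.fetch(starting), reader.fetch(ending))
  } finally {
    reader.close()
  }

val tp0 = TopicPartition("events", 0)
val reader = new FakeOffsetReader(Map(tp0 -> 0L), Map(tp0 -> 42L))
val resolved = resolveOffsets(reader, Earliest, Latest)
```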

buildScan creates offset ranges: one KafkaSourceRDDOffsetRange per Kafka TopicPartition, each with the beginning and ending offsets and no preferred location.
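
As a rough illustration of this step, the sketch below pairs starting and ending offsets per partition. `TopicPartition` and `OffsetRange` are simplified, hypothetical stand-ins for Kafka's TopicPartition and Spark's KafkaSourceRDDOffsetRange, and the sketch assumes both maps cover the same partitions.

```scala
// Hypothetical stand-ins (not the real Kafka or Spark classes).
final case class TopicPartition(topic: String, partition: Int)
final case class OffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String])

// One range per partition; the preferred location is left undefined (None).
// Sorted by topic and partition only to make the result deterministic.
def toOffsetRanges(
    from: Map[TopicPartition, Long],
    until: Map[TopicPartition, Long]): Seq[OffsetRange] =
  until.keys.toSeq
    .sortBy(tp => (tp.topic, tp.partition))
    .map(tp => OffsetRange(tp, from(tp), until(tp), preferredLoc = None))

val p0 = TopicPartition("events", 0)
val p1 = TopicPartition("events", 1)
val ranges = toOffsetRanges(
  from = Map(p0 -> 5L, p1 -> 0L),
  until = Map(p0 -> 10L, p1 -> 7L))
```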

buildScan prints out the following INFO message to the logs:

Generating RDD of offset ranges: [offsetRanges]

buildScan creates a KafkaSourceRDD with the following:

buildScan requests the KafkaSourceRDD to map Kafka ConsumerRecords to InternalRows.

In the end, buildScan requests the SQLContext to create a DataFrame (with the name kafka and the predefined schema) that is immediately converted to an RDD[Row].

buildScan throws an IllegalStateException when the starting and ending offsets were resolved for different sets of topic partitions:

different topic partitions for starting offsets topics[[fromTopics]] and ending offsets topics[[untilTopics]]

buildScan throws an IllegalStateException when a topic partition has no starting offset:

[tp] doesn't have a from offset
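
The two error conditions can be sketched in plain Scala as follows. `TopicPartition` is a hypothetical stand-in for Kafka's class, the helper names are made up for illustration, and the message layout assumes that [fromTopics], [untilTopics] and [tp] in the messages above are placeholders for comma-separated partitions.

```scala
import scala.util.Try

// Hypothetical stand-in that prints as topic-partition.
final case class TopicPartition(topic: String, partition: Int) {
  override def toString: String = s"$topic-$partition"
}

// Fails when the two offset maps do not cover the same topic partitions.
def checkSamePartitions(
    from: Map[TopicPartition, Long],
    until: Map[TopicPartition, Long]): Unit =
  if (from.keySet != until.keySet) {
    val fromTopics = from.keySet.toList.map(_.toString).sorted.mkString(",")
    val untilTopics = until.keySet.toList.map(_.toString).sorted.mkString(",")
    throw new IllegalStateException(
      s"different topic partitions for starting offsets topics[$fromTopics] and " +
        s"ending offsets topics[$untilTopics]")
  }

// Fails when a partition has no starting offset.
def fromOffsetOf(tp: TopicPartition, from: Map[TopicPartition, Long]): Long =
  from.getOrElse(tp, throw new IllegalStateException(s"$tp doesn't have a from offset"))

val a = TopicPartition("events", 0)
val b = TopicPartition("events", 1)
val mismatch = Try(checkSamePartitions(Map(a -> 0L), Map(a -> 1L, b -> 1L)))
val missingFrom = Try(fromOffsetOf(b, Map(a -> 0L)))
```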