EventTimeWatermarkExec Unary Physical Operator — Monitoring Event-Time Watermark

EventTimeWatermarkExec is a unary physical operator that simply updates the EventTimeStatsAccum internal accumulator with the values of the event-time watermark column.

Note

A unary physical operator (UnaryExecNode) is a physical operator with a single child physical operator.

Read up on UnaryExecNode (and physical operators in general) in The Internals of Spark SQL book.

EventTimeWatermarkExec uses the EventTimeStatsAccum internal accumulator to send the statistics (the maximum, minimum, average and count) of the values in the event-time watermark column back to the driver, where they are used to compute the event-time watermark of the streaming query.

EventTimeWatermarkExec is created for every EventTimeWatermark logical operator in a logical query plan when StatefulAggregationStrategy execution planning strategy is requested to plan a logical plan for execution.

Note
EventTimeWatermark logical operator represents Dataset.withWatermark operator in a logical query plan.

EventTimeWatermarkExec takes the following to be created:

  • Event-time watermark column

  • Delay interval (CalendarInterval)

  • Child physical operator (SparkPlan)

When created, EventTimeWatermarkExec registers the EventTimeStatsAccum internal accumulator with the current SparkContext.
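The registration uses the standard mechanism for AccumulatorV2 accumulators in Spark, i.e. SparkContext.register. The following is a minimal sketch of that mechanism using a built-in LongAccumulator (just to show how registration works, not Spark's EventTimeStatsAccum):

import org.apache.spark.util.LongAccumulator
// Every AccumulatorV2 has to be registered with the SparkContext before first use
val acc = new LongAccumulator
spark.sparkContext.register(acc, "demo accumulator")
acc.add(1)
assert(acc.sum == 1)

The demo below shows EventTimeWatermarkExec in action in a streaming query.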

val rates = spark
  .readStream
  .format("rate")
  .load
  .withWatermark(eventTime = "timestamp", delayThreshold = "10 seconds") // <-- creates EventTimeWatermark logical operator

// EventTimeWatermark logical operator is planned as EventTimeWatermarkExec physical operator
// Note that as a physical operator EventTimeWatermarkExec shows itself without the Exec suffix
scala> rates.explain
== Physical Plan ==
EventTimeWatermark timestamp#0: timestamp, interval 10 seconds
+- StreamingRelation rate, [timestamp#0, value#1L]

// Let's build a complete streaming pipeline
// Connecting the input (source) to the output (sink)
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val queryName = "rates-to-console"
val sq = rates
  .writeStream
  .format("console")
  .option("numRows", 1) // just the last row with the timestamp
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(15.seconds))
  .queryName(queryName)
  .start

// Let's access the underlying stream execution engine
// It should be MicroBatchExecution since we use ProcessingTime, shouldn't it?
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val se = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery
scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution

import org.apache.spark.sql.execution.streaming.MicroBatchExecution
val engine = se.asInstanceOf[MicroBatchExecution]
scala> :type engine
org.apache.spark.sql.execution.streaming.MicroBatchExecution

val lastMicroBatch = engine.lastExecution
scala> :type lastMicroBatch
org.apache.spark.sql.execution.streaming.IncrementalExecution

// Access executedPlan that is the optimized physical query plan ready for execution
// All streaming optimizations have been applied at this point
// We just need the EventTimeWatermarkExec physical operator
val plan = lastMicroBatch.executedPlan
scala> :type plan
org.apache.spark.sql.execution.SparkPlan

// Let's find the one and only EventTimeWatermarkExec operator in the plan
import org.apache.spark.sql.execution.streaming.EventTimeWatermarkExec
val watermarkOp = plan.collect { case op: EventTimeWatermarkExec => op }.head

// Among the properties are delay interval (in millis)
// We used withWatermark(..., delayThreshold = "10 seconds")
assert(watermarkOp.delayMs == 10000)

// Let's check out the event-time watermark stats
// They correspond to the concrete EventTimeWatermarkExec operator for a micro-batch
val stats = watermarkOp.eventTimeStats.value
scala> :type stats
org.apache.spark.sql.execution.streaming.EventTimeStats

scala> println(stats)
EventTimeStats(1560159493350,1560159479350,1.56015948635E12,15)

// Let's compare it to the latest lastExecution
val lastMicroBatch = engine.lastExecution
val plan = lastMicroBatch.executedPlan
import org.apache.spark.sql.execution.streaming.EventTimeWatermarkExec
val watermarkOp = plan.collect { case op: EventTimeWatermarkExec => op }.head
val stats = watermarkOp.eventTimeStats.value
scala> println(stats)
EventTimeStats(1560160063350,1560160049350,1.5601600563500002E12,15)

// The stats are indeed different (the max, min and average values have changed, while the count stays the same)
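
// The same statistics should also be reported (as max, min, avg and watermark)
// in the eventTime section of the streaming query progress
scala> println(sq.lastProgress.eventTime)

// Stop the query once done experimenting
sq.stop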

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

doExecute(): RDD[InternalRow]
Note
doExecute is part of the SparkPlan Contract to generate the runtime representation of a physical operator as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

Internally, doExecute executes the child physical operator and maps over the partitions (using RDD.mapPartitions) to do the following:

  1. Creates an unsafe projection for the eventTime column in the output schema of the child physical operator.

  2. For every row (as InternalRow), extracts the event-time value using the projection, adds it to the eventTimeStats accumulator, and passes the row along unchanged (see the sketch below).
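
A simplified sketch of that traversal (illustrative, not necessarily the exact Spark source; it assumes the event-time values are stored internally in microseconds and are converted to milliseconds before being accumulated):

override protected def doExecute(): RDD[InternalRow] = {
  child.execute().mapPartitions { iter =>
    // One unsafe projection per partition to extract the event-time column
    val getEventTime = UnsafeProjection.create(eventTime :: Nil, child.output)
    iter.map { row =>
      // Record the event-time value (in milliseconds) in the accumulator
      eventTimeStats.add(getEventTime(row).getLong(0) / 1000)
      row // the row itself flows through unchanged
    }
  }
}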

Output Schema — output Property

output: Seq[Attribute]
Note
output is part of the QueryPlan Contract to describe the attributes of the output (aka schema).

output…FIXME

Internal Properties

  • delayMs: the delay interval in milliseconds

  • eventTimeStats: EventTimeStatsAccum accumulator to accumulate eventTime values from every row in a streaming batch (when EventTimeWatermarkExec is executed)

Note
EventTimeStatsAccum is a Spark accumulator of EventTimeStats from Longs (i.e. AccumulatorV2[Long, EventTimeStats]).
Note
Every Spark accumulator has to be registered before use, and eventTimeStats is registered when EventTimeWatermarkExec is created.
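
To give a feel for the shape of such an accumulator, here is a minimal, illustrative sketch of an AccumulatorV2[Long, EventTimeStats]-like accumulator (the class and field names below are simplified assumptions, not Spark's actual implementation):

import org.apache.spark.util.AccumulatorV2

// Illustrative stats holder: maximum, minimum, running average and count of event times (in millis)
case class EventTimeStatsSketch(
    var max: Long = Long.MinValue,
    var min: Long = Long.MaxValue,
    var avg: Double = 0.0,
    var count: Long = 0L) {
  def add(eventTime: Long): Unit = {
    max = math.max(max, eventTime)
    min = math.min(min, eventTime)
    count += 1
    avg += (eventTime - avg) / count
  }
  def merge(that: EventTimeStatsSketch): Unit = {
    max = math.max(max, that.max)
    min = math.min(min, that.min)
    val total = count + that.count
    if (total > 0) avg = (avg * count + that.avg * that.count) / total
    count = total
  }
}

// Illustrative accumulator that adds Long event-time values and exposes the stats as its value
class EventTimeStatsAccumSketch(
    private var stats: EventTimeStatsSketch = EventTimeStatsSketch())
  extends AccumulatorV2[Long, EventTimeStatsSketch] {

  override def isZero: Boolean = stats.count == 0
  override def copy(): EventTimeStatsAccumSketch = new EventTimeStatsAccumSketch(stats.copy())
  override def reset(): Unit = stats = EventTimeStatsSketch()
  override def add(v: Long): Unit = stats.add(v)
  override def merge(other: AccumulatorV2[Long, EventTimeStatsSketch]): Unit = other match {
    case o: EventTimeStatsAccumSketch => stats.merge(o.stats)
    case _ => throw new UnsupportedOperationException("Cannot merge with " + other.getClass.getName)
  }
  override def value: EventTimeStatsSketch = stats
}

Like any AccumulatorV2, such an accumulator has to be registered with the current SparkContext before use, which is exactly what EventTimeWatermarkExec does when it is created.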