EventTimeWatermarkExec
EventTimeWatermarkExec is a unary physical operator that simply updates the EventTimeStatsAccum internal accumulator with the values of the event-time watermark column.
Note
|
A unary physical operator (UnaryExecNode) is a physical operator with exactly one child physical operator. Read up on UnaryExecNode (and physical operators in general) in The Internals of Spark SQL book. |
EventTimeWatermarkExec uses the EventTimeStatsAccum internal accumulator to send the statistics (the maximum, minimum, average and update count) of the values in the event-time watermark column, which are used in:

- ProgressReporter for creating execution statistics for the most recent query execution (for you to monitor the max, min, avg, and watermark event-time statistics)

- StreamExecution to observe and possibly update the event-time watermark when constructing the next streaming batch
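For example, once a streaming query over a watermarked Dataset is running (such as the rates-to-console query started in the demo below), the accumulated event-time statistics surface in the query progress under eventTime. A minimal snippet, assuming sq is that running StreamingQuery:

```scala
// Assumes sq is a running StreamingQuery over a watermarked Dataset
// (e.g. the rates-to-console query started later on this page).
val progress = sq.lastProgress
if (progress != null) {
  // ProgressReporter publishes the event-time stats as max, min, avg and watermark
  println(progress.eventTime) // e.g. {avg=..., max=..., min=..., watermark=...}
}
```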
EventTimeWatermarkExec is created for every EventTimeWatermark logical operator in a logical query plan when the StatefulAggregationStrategy execution planning strategy is requested to plan a logical plan for execution.
Note
|
EventTimeWatermark logical operator represents Dataset.withWatermark operator in a logical query plan. |
EventTimeWatermarkExec takes the following to be created:

- Event-time watermark column
- Delay interval
- Child physical operator

When created, EventTimeWatermarkExec registers the EventTimeStatsAccum internal accumulator (with the current SparkContext).
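Conceptually (this is a sketch, not the actual EventTimeStats or EventTimeStatsAccum classes), the accumulator maintains a running (max, min, avg, count) aggregate over the event-time values of a micro-batch:

```scala
// Conceptual sketch only -- the real classes are
// org.apache.spark.sql.execution.streaming.{EventTimeStats, EventTimeStatsAccum}.
case class EventTimeStatsSketch(
    max: Long = Long.MinValue,
    min: Long = Long.MaxValue,
    avg: Double = 0.0,
    count: Long = 0L) {
  // Incorporates one event-time value (in millis) into the running statistics
  def add(eventTime: Long): EventTimeStatsSketch =
    EventTimeStatsSketch(
      max = math.max(max, eventTime),
      min = math.min(min, eventTime),
      avg = avg + (eventTime - avg) / (count + 1), // incremental average
      count = count + 1)
}

// Three event-time values (in millis) give max=3000, min=1000, avg=2000.0, count=3
val stats = Seq(1000L, 2000L, 3000L).foldLeft(EventTimeStatsSketch())(_ add _)
```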
val rates = spark
.readStream
.format("rate")
.load
.withWatermark(eventTime = "timestamp", delayThreshold = "10 seconds") // <-- creates EventTimeWatermark logical operator
// EventTimeWatermark logical operator is planned as EventTimeWatermarkExec physical operator
// Note that as a physical operator EventTimeWatermarkExec shows itself without the Exec suffix
scala> rates.explain
== Physical Plan ==
EventTimeWatermark timestamp#0: timestamp, interval 10 seconds
+- StreamingRelation rate, [timestamp#0, value#1L]
// Let's build a complete streaming pipeline
// Connecting the input (source) to the output (sink)
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val queryName = "rates-to-console"
val sq = rates
.writeStream
.format("console")
.option("numRows", 1) // just the last row with the timestamp
.option("truncate", false)
.trigger(Trigger.ProcessingTime(15.seconds))
.queryName(queryName)
.start
// Let's access the underlying stream execution engine
// It should be MicroBatchExecution since we use ProcessingTime, shouldn't it?
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val se = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery
scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.MicroBatchExecution
val engine = se.asInstanceOf[MicroBatchExecution]
scala> :type engine
org.apache.spark.sql.execution.streaming.MicroBatchExecution
val lastMicroBatch = engine.lastExecution
scala> :type lastMicroBatch
org.apache.spark.sql.execution.streaming.IncrementalExecution
// Access executedPlan that is the optimized physical query plan ready for execution
// All streaming optimizations have been applied at this point
// We just need the EventTimeWatermarkExec physical operator
val plan = lastMicroBatch.executedPlan
scala> :type plan
org.apache.spark.sql.execution.SparkPlan
// Let's find the one and only EventTimeWatermarkExec operator in the plan
import org.apache.spark.sql.execution.streaming.EventTimeWatermarkExec
val watermarkOp = plan.collect { case op: EventTimeWatermarkExec => op }.head
// Among the properties are delay interval (in millis)
// We used withWatermark(..., delayThreshold = "10 seconds")
assert(watermarkOp.delayMs == 10000)
// Let's check out the event-time watermark stats
// They correspond to the concrete EventTimeWatermarkExec operator for a micro-batch
val stats = watermarkOp.eventTimeStats.value
scala> :type stats
org.apache.spark.sql.execution.streaming.EventTimeStats
scala> println(stats)
EventTimeStats(1560159493350,1560159479350,1.56015948635E12,15)
// Let's compare it to the latest lastExecution
val lastMicroBatch = engine.lastExecution
val plan = lastMicroBatch.executedPlan
import org.apache.spark.sql.execution.streaming.EventTimeWatermarkExec
val watermarkOp = plan.collect { case op: EventTimeWatermarkExec => op }.head
val stats = watermarkOp.eventTimeStats.value
scala> println(stats)
EventTimeStats(1560160063350,1560160049350,1.5601600563500002E12,15)
// The stats are indeed different (max, min and avg all changed, while the count stayed the same)
doExecute(): RDD[InternalRow]
Note
|
doExecute is part of the SparkPlan Contract to generate the runtime representation of a physical operator as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow] ).
|
Internally, doExecute executes the child physical operator and maps over the partitions (using RDD.mapPartitions) to do the following (as sketched below):

- Creates an unsafe projection for eventTime in the output schema of the child physical operator

- For every row (as InternalRow), adds eventTime to the eventTimeStats accumulator
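A minimal sketch of that per-partition processing, assuming the operator's own fields child, eventTime and eventTimeStats (illustrative only, not the exact Spark source):

```scala
// Illustrative sketch of the processing described above.
// Assumes the operator's fields: child (the child SparkPlan), eventTime (Attribute)
// and eventTimeStats (the EventTimeStatsAccum accumulator).
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection

child.execute().mapPartitions { rows =>
  // Unsafe projection that extracts the event-time column from every input row
  val getEventTime = UnsafeProjection.create(eventTime :: Nil, child.output)
  rows.map { row =>
    // Internal timestamps are in microseconds; dividing by 1000 gives milliseconds (assumption)
    eventTimeStats.add(getEventTime(row).getLong(0) / 1000)
    row // rows flow through unchanged
  }
}
```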
output: Seq[Attribute]
Note
|
output is part of the QueryPlan Contract to describe the attributes of the output (aka schema).
|
output
…FIXME
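You can peek at the output attributes (and their metadata) of the operator from the demo session above, where watermarkOp is the EventTimeWatermarkExec operator found earlier:

```scala
// Continues the REPL session above; watermarkOp is the EventTimeWatermarkExec operator.
// Prints every output attribute with its data type and metadata (the event-time
// column is expected to carry the watermark delay in its metadata).
watermarkOp.output.foreach { a =>
  println(s"${a.name}: ${a.dataType.simpleString} ${a.metadata.json}")
}
```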
EventTimeWatermarkExec uses the following internal properties:

Name | Description
---|---
delayMs | Delay interval in milliseconds
eventTimeStats | EventTimeStatsAccum accumulator to accumulate eventTime values from every row in a streaming batch (when EventTimeWatermarkExec is executed)