
EventTimeWatermark Unary Logical Operator

EventTimeWatermark is a unary logical operator (UnaryNode) that is created as the result of the Dataset.withWatermark operator.

val rates = spark
  .readStream
  .format("rate")
  .load
  .withWatermark(eventTime = "timestamp", delayThreshold = "10 seconds") // <-- creates EventTimeWatermark logical operator
scala> rates.explain(extended = true)
== Parsed Logical Plan ==
'EventTimeWatermark 'timestamp, interval 10 seconds
+- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2219ca9f, rate, [timestamp#693, value#694L]

== Analyzed Logical Plan ==
timestamp: timestamp, value: bigint
EventTimeWatermark timestamp#693: timestamp, interval 10 seconds
+- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2219ca9f, rate, [timestamp#693, value#694L]

== Optimized Logical Plan ==
EventTimeWatermark timestamp#693: timestamp, interval 10 seconds
+- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2219ca9f, rate, [timestamp#693, value#694L]

== Physical Plan ==
EventTimeWatermark timestamp#693: timestamp, interval 10 seconds
+- StreamingRelation rate, [timestamp#693, value#694L]

EventTimeWatermark uses the spark.watermarkDelayMs key in the Metadata of its output attributes to hold the event-time watermark delay. The attribute that carries the key is the so-called watermark attribute (eventTime watermark).
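To make the role of the key concrete, here is a minimal plain-Scala sketch with no Spark dependency. WatermarkKeyModel and ModelAttribute are hypothetical stand-ins for Spark's Attribute and Metadata (metadata is modeled as a Map[String, Long]); only the spark.watermarkDelayMs key name comes from the text above. The watermark attribute is looked up as the output attribute whose metadata contains the key.

```scala
// Plain-Scala model (no Spark dependency). ModelAttribute is a hypothetical
// stand-in for Spark's Attribute with its Metadata modeled as a Map.
object WatermarkKeyModel {
  // The metadata key named in the text
  val DelayKey: String = "spark.watermarkDelayMs"

  final case class ModelAttribute(name: String, metadata: Map[String, Long])

  // The watermark attribute is the output attribute whose metadata has the key
  def watermarkAttribute(output: Seq[ModelAttribute]): Option[ModelAttribute] =
    output.find(_.metadata.contains(DelayKey))

  // The delay (in milliseconds) stored under the key, if any attribute has it
  def watermarkDelayMs(output: Seq[ModelAttribute]): Option[Long] =
    watermarkAttribute(output).flatMap(_.metadata.get(DelayKey))

  def main(args: Array[String]): Unit = {
    val output = Seq(
      ModelAttribute("timestamp", Map(DelayKey -> 10000L)),
      ModelAttribute("value", Map.empty))
    println(watermarkAttribute(output).map(_.name)) // Some(timestamp)
    println(watermarkDelayMs(output))               // Some(10000)
  }
}
```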

Note
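The event-time watermark delay is how far the watermark trails the maximum event time observed so far. In other words, it bounds how late (how far in the past) the event time of an event, modeled as a row in the Dataset of a streaming batch, can be while the event is still guaranteed to be processed.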
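A simplified model of the watermark arithmetic, assuming the watermark is the maximum event time observed so far minus the delay, and that an event at or before the watermark counts as late. Times are plain epoch milliseconds and WatermarkDelayModel is hypothetical; the exact boundary comparison is an assumption of this sketch, and how Spark tracks the maximum across micro-batches is not modeled.

```scala
// Simplified model (assumption): watermark = max event time seen - delay.
// All times are epoch milliseconds; nothing here is a Spark API.
object WatermarkDelayModel {
  // Watermark derived from the event times observed so far (None if none yet)
  def watermark(eventTimesMs: Seq[Long], delayMs: Long): Option[Long] =
    if (eventTimesMs.isEmpty) None else Some(eventTimesMs.max - delayMs)

  // In this model an event at or before the watermark is late
  def isLate(eventTimeMs: Long, watermarkMs: Long): Boolean =
    eventTimeMs <= watermarkMs

  def main(args: Array[String]): Unit = {
    val seen = Seq(1000L, 25000L, 18000L)
    val wm = watermark(seen, delayMs = 10000L)
    println(wm)                                // Some(15000)
    println(wm.exists(w => isLate(12000L, w))) // true (12000 <= 15000)
    println(wm.exists(w => isLate(16000L, w))) // false (16000 > 15000)
  }
}
```

With a 10-second delay, the latest event at 25000 ms moves the watermark to 15000 ms, so an event stamped 12000 ms is already behind it.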
Note

The EliminateEventTimeWatermark logical optimization rule (a Rule[LogicalPlan]) removes the EventTimeWatermark logical operator from a logical plan if its child logical operator is not streaming, i.e. when the Dataset.withWatermark operator is used in a batch query.

val logs = spark.
  read. // <-- batch non-streaming query that makes `EliminateEventTimeWatermark` rule applicable
  format("text").
  load("logs")

// logs is a batch Dataset
assert(!logs.isStreaming)

val q = logs.
  withWatermark(eventTime = "timestamp", delayThreshold = "30 seconds") // <-- creates EventTimeWatermark
scala> println(q.queryExecution.logical.numberedTreeString) // <-- no EventTimeWatermark as it was removed immediately
00 Relation[value#0] text
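The idea behind EliminateEventTimeWatermark can be sketched with a toy plan ADT. Plan, Relation and the EventTimeWatermark case class below are hypothetical stand-ins (not Catalyst's classes), and the delay is kept as plain milliseconds. The rewrite drops a watermark node whenever its child is not streaming, which is what happens to q above.

```scala
// Toy logical plans (hypothetical; not Catalyst's classes).
object EliminateWatermarkModel {
  sealed trait Plan { def isStreaming: Boolean }

  final case class Relation(name: String, isStreaming: Boolean) extends Plan

  final case class EventTimeWatermark(eventTime: String, delayMs: Long, child: Plan)
      extends Plan {
    // A watermark node is streaming exactly when its child is
    def isStreaming: Boolean = child.isStreaming
  }

  // Removes watermark nodes over non-streaming children, keeps the rest
  def eliminate(plan: Plan): Plan = plan match {
    case EventTimeWatermark(_, _, child) if !child.isStreaming => eliminate(child)
    case EventTimeWatermark(e, d, child) => EventTimeWatermark(e, d, eliminate(child))
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val batch = EventTimeWatermark("timestamp", 30000L, Relation("logs", isStreaming = false))
    val stream = EventTimeWatermark("timestamp", 10000L, Relation("rate", isStreaming = true))
    println(eliminate(batch))  // Relation(logs,false)
    println(eliminate(stream)) // EventTimeWatermark(timestamp,10000,Relation(rate,true))
  }
}
```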

EventTimeWatermark is converted (aka planned) to the EventTimeWatermarkExec physical operator by the StatefulAggregationStrategy execution planning strategy.

EventTimeWatermark takes the following to be created:

  • Event-time column (Attribute)

  • Delay (CalendarInterval)

  • Child logical operator (LogicalPlan)
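The delay is a CalendarInterval, while the metadata holds milliseconds, so a conversion is needed. The sketch below is a hedged approximation: DelayMsModel.Interval is a hypothetical stand-in for CalendarInterval with months, days and microseconds (the field layout of recent Spark versions), and it assumes a month counts as 31 days, which to our knowledge is how Spark turns a watermark delay into a fixed duration. Treat that constant as an assumption.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical model of turning a calendar-style delay into milliseconds.
object DelayMsModel {
  // Stand-in for CalendarInterval (months, days, microseconds); not Spark's class
  final case class Interval(months: Int, days: Int, microseconds: Long)

  val MicrosPerDay: Long = TimeUnit.DAYS.toMicros(1)
  // Assumption: one month is treated as 31 days
  val DaysPerMonth: Long = 31L

  def delayMs(delay: Interval): Long = {
    // Exact arithmetic so that huge intervals fail loudly instead of overflowing
    val monthsMicros = Math.multiplyExact(delay.months.toLong, DaysPerMonth * MicrosPerDay)
    val daysMicros = Math.multiplyExact(delay.days.toLong, MicrosPerDay)
    val totalMicros = Math.addExact(delay.microseconds, Math.addExact(daysMicros, monthsMicros))
    TimeUnit.MICROSECONDS.toMillis(totalMicros)
  }

  def main(args: Array[String]): Unit = {
    println(delayMs(Interval(0, 0, 10000000L))) // 10 seconds -> 10000
    println(delayMs(Interval(0, 1, 0L)))        // 1 day -> 86400000
    println(delayMs(Interval(1, 0, 0L)))        // 1 month -> 2678400000
  }
}
```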

Output Schema — output Property

output: Seq[Attribute]
Note
output is part of the QueryPlan Contract to describe the attributes of the output (i.e. the output schema).

output finds the eventTime column in the output schema of the child logical operator and updates the Metadata of that column with the spark.watermarkDelayMs key set to the delay in milliseconds.

output removes the spark.watermarkDelayMs key from all the other columns (if present).

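// See rates created above (a streaming Dataset, so EventTimeWatermark stays in the plan)
// Use the analyzed plan so that the eventTime column is resolved and gets the delay metadata
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
val etw = rates.queryExecution.analyzed.asInstanceOf[EventTimeWatermark]
scala> etw.output.toStructType.printTreeString
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)

// printTreeString does not show column metadata, so print the Metadata of every output attribute
scala> etw.output.foreach(a => println(s"${a.name}: ${a.metadata.json}"))
timestamp: {"spark.watermarkDelayMs":10000}
value: {}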
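The output logic described above can be modeled in plain Scala. WatermarkOutputModel and ModelAttribute are hypothetical; metadata is a Map[String, Long], and the event-time column is matched by name for simplicity, whereas Spark compares attributes semantically (by expression ID). The example re-applies a watermark to a child whose value column still carries an older delay, so the key moves from value to timestamp.

```scala
// Plain-Scala model of EventTimeWatermark.output (hypothetical names; no Spark).
object WatermarkOutputModel {
  val DelayKey: String = "spark.watermarkDelayMs"

  final case class ModelAttribute(name: String, metadata: Map[String, Long])

  // Sets the delay key on the event-time column and strips it from every other column
  def output(childOutput: Seq[ModelAttribute], eventTime: String, delayMs: Long): Seq[ModelAttribute] =
    childOutput.map { a =>
      if (a.name == eventTime) a.copy(metadata = a.metadata.updated(DelayKey, delayMs))
      else if (a.metadata.contains(DelayKey)) a.copy(metadata = a.metadata - DelayKey)
      else a
    }

  def main(args: Array[String]): Unit = {
    val child = Seq(
      ModelAttribute("timestamp", Map.empty),
      ModelAttribute("value", Map(DelayKey -> 5000L))) // leftover delay from an earlier watermark
    output(child, eventTime = "timestamp", delayMs = 10000L).foreach(println)
    // ModelAttribute(timestamp,Map(spark.watermarkDelayMs -> 10000))
    // ModelAttribute(value,Map())
  }
}
```

Stripping the key from the other columns keeps a single watermark attribute per operator, even when withWatermark is applied more than once.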