KeyValueGroupedDataset — Streaming Aggregation

KeyValueGroupedDataset represents a grouped dataset that is the result of the Dataset.groupByKey operator (which groups records by the key that a grouping function computes for every record).

// Dataset[T]
groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]

import java.sql.Timestamp
val numGroups = spark.
  readStream.
  format("rate").
  load.
  as[(Timestamp, Long)].
  groupByKey { case (time, value) => value % 2 }

scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]

KeyValueGroupedDataset is also created by the KeyValueGroupedDataset.keyAs and KeyValueGroupedDataset.mapValues operators.

scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]

scala> :type numGroups.keyAs[String]
org.apache.spark.sql.KeyValueGroupedDataset[String,(java.sql.Timestamp, Long)]

val mapped = numGroups.mapValues { case (ts, n) => s"($ts, $n)" }
scala> :type mapped
org.apache.spark.sql.KeyValueGroupedDataset[Long,String]

KeyValueGroupedDataset supports both batch and streaming aggregations, but is most useful for streaming aggregation (over streaming Datasets).

scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
// Count the records of every group in each micro-batch
numGroups.
  mapGroups { case (group, values) => values.size }.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.seconds)).
  start

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+
|value|
+-----+
|    3|
|    2|
+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+
|value|
+-----+
|    5|
|    5|
+-----+

// Eventually...
spark.streams.active.foreach(_.stop)

The most notable use case of KeyValueGroupedDataset, however, is stateful streaming aggregation, which accumulates streaming state across micro-batches (by means of GroupState) using the mapGroupsWithState and the more advanced flatMapGroupsWithState operators.
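As an illustration of such state accumulation, the following is a minimal sketch (not taken from the original text) of a running count per group with mapGroupsWithState. The state function countSoFar and the names runningCounts and query are hypothetical; the session setup assumes local mode, and in spark-shell getOrCreate simply returns the existing session.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
import scala.concurrent.duration._

// Assumption: local mode; in spark-shell this returns the existing session
val spark = SparkSession.builder.master("local[*]").appName("stateful-count").getOrCreate()
import spark.implicits._

// Hypothetical state function: keeps a running count of the records seen per group
def countSoFar(
    group: Long,
    values: Iterator[(Timestamp, Long)],
    state: GroupState[Long]): (Long, Long) = {
  val total = state.getOption.getOrElse(0L) + values.size
  state.update(total) // keep the new count for the next micro-batch
  (group, total)      // exactly one output row per group
}

val runningCounts = spark.
  readStream.
  format("rate").
  load.
  as[(Timestamp, Long)].
  groupByKey { case (_, value) => value % 2 }.
  mapGroupsWithState[Long, (Long, Long)](GroupStateTimeout.NoTimeout())(countSoFar)

val query = runningCounts.
  writeStream.
  format("console").
  outputMode(OutputMode.Update()). // mapGroupsWithState requires the Update output mode
  trigger(Trigger.ProcessingTime(10.seconds)).
  start()

// Eventually...
query.stop()
```

Unlike the mapGroups example above, which counts the records of a single micro-batch only, the totals printed here should keep growing from batch to batch because the previous count is read back from GroupState.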

Table 1. KeyValueGroupedDataset’s Operators
Operator Description

agg

agg[U1](col1: TypedColumn[V, U1]): Dataset[(K, U1)]
agg[U1, U2](
  col1: TypedColumn[V, U1],
  col2: TypedColumn[V, U2]): Dataset[(K, U1, U2)]
agg[U1, U2, U3](
  col1: TypedColumn[V, U1],
  col2: TypedColumn[V, U2],
  col3: TypedColumn[V, U3]): Dataset[(K, U1, U2, U3)]
agg[U1, U2, U3, U4](
  col1: TypedColumn[V, U1],
  col2: TypedColumn[V, U2],
  col3: TypedColumn[V, U3],
  col4: TypedColumn[V, U4]): Dataset[(K, U1, U2, U3, U4)]

cogroup

cogroup[U, R : Encoder](
  other: KeyValueGroupedDataset[K, U])(
  f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R]

count

count(): Dataset[(K, Long)]

flatMapGroups

flatMapGroups[U : Encoder](f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U]

flatMapGroupsWithState

flatMapGroupsWithState[S: Encoder, U: Encoder](
  outputMode: OutputMode,
  timeoutConf: GroupStateTimeout)(
  func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]

Creates a new Dataset with a FlatMapGroupsWithState logical operator

Note
Unlike mapGroupsWithState, the state function of flatMapGroupsWithState can generate zero or more elements per group (each of which becomes a row in the result streaming Dataset).

keyAs

keyAs[L : Encoder]: KeyValueGroupedDataset[L, V]

keys

keys: Dataset[K]

mapGroups

mapGroups[U : Encoder](f: (K, Iterator[V]) => U): Dataset[U]

mapGroupsWithState

mapGroupsWithState[S: Encoder, U: Encoder](
  func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
mapGroupsWithState[S: Encoder, U: Encoder](
  timeoutConf: GroupStateTimeout)(
  func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

Creates a new Dataset with a FlatMapGroupsWithState logical operator

Note
Unlike flatMapGroupsWithState, the state function of mapGroupsWithState generates exactly one element per group (which becomes a row in the result Dataset).

mapValues

mapValues[W : Encoder](func: V => W): KeyValueGroupedDataset[K, W]

reduceGroups

reduceGroups(f: (V, V) => V): Dataset[(K, V)]
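The stateless operators in the table can be tried on a small batch Dataset. The following is a sketch only: the sample data and the names ds, byKey, counts, sums, reduced, maxima and large are made up for illustration, and the session setup assumes local mode (in spark-shell getOrCreate returns the existing session).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

// Assumption: local mode; in spark-shell this returns the existing session
val spark = SparkSession.builder.master("local[*]").appName("kvgd-batch").getOrCreate()
import spark.implicits._

// Made-up sample data: (key, amount) pairs
val ds = Seq(("a", 10L), ("a", 20L), ("b", 1L), ("b", 2L), ("c", 1L)).toDS()
val byKey = ds.groupByKey(_._1) // KeyValueGroupedDataset[String, (String, Long)]

// count: number of records per key
val counts = byKey.count().collect().toMap
// Map(a -> 2, b -> 2, c -> 1)

// agg: typed aggregation over the _2 column of the values
val sums = byKey.agg(sum("_2").as[Long]).collect().toMap
// Map(a -> 30, b -> 3, c -> 1)

// reduceGroups: pairwise reduction of the values of every group
val reduced = byKey.reduceGroups((l, r) => (l._1, l._2 + r._2)).collect().toMap
// Map(a -> (a,30), b -> (b,3), c -> (c,1))

// mapGroups: exactly one result per group
val maxima = byKey.mapGroups { (key, values) => (key, values.map(_._2).max) }.collect().toMap
// Map(a -> 20, b -> 2, c -> 1)

// flatMapGroups: zero or more results per group
val large = byKey.
  flatMapGroups { (key, values) => values.map(_._2).filter(_ >= 10).map(v => s"$key:$v") }.
  collect().
  sorted
// Array(a:10, a:20)
```

The results are collected into Maps (or sorted) because the order of the groups in the output is not guaranteed.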

Creating KeyValueGroupedDataset Instance

KeyValueGroupedDataset takes the following when created:

  • Encoder for keys

  • Encoder for values

  • QueryExecution

  • Data attributes

  • Grouping attributes