KeyValueGroupedDataset
represents a grouped dataset as a result of Dataset.groupByKey operator (that aggregates records by a grouping function).
// Dataset[T]
groupByKey(func: T => K): KeyValueGroupedDataset[K, T]
import java.sql.Timestamp
val numGroups = spark.
readStream.
format("rate").
load.
as[(Timestamp, Long)].
groupByKey { case (time, value) => value % 2 }
scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]
KeyValueGroupedDataset
is also created for KeyValueGroupedDataset.keyAs and KeyValueGroupedDataset.mapValues operators.
scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]
scala> :type numGroups.keyAs[String]
org.apache.spark.sql.KeyValueGroupedDataset[String,(java.sql.Timestamp, Long)]
scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]
val mapped = numGroups.mapValues { case (ts, n) => s"($ts, $n)" }
scala> :type mapped
org.apache.spark.sql.KeyValueGroupedDataset[Long,String]
KeyValueGroupedDataset
works for batch and streaming aggregations, but shines the most when used for streaming aggregation (with streaming Datasets).
scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
numGroups.
mapGroups { case(group, values) => values.size }.
writeStream.
format("console").
trigger(Trigger.ProcessingTime(10.seconds)).
start
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+-----+
|value|
+-----+
| 3|
| 2|
+-----+
-------------------------------------------
Batch: 2
-------------------------------------------
+-----+
|value|
+-----+
| 5|
| 5|
+-----+
// Eventually...
spark.streams.active.foreach(_.stop)
The most prestigious use case of KeyValueGroupedDataset
however is stateful streaming aggregation that allows for accumulating streaming state (by means of GroupState) using mapGroupsWithState and the more advanced flatMapGroupsWithState operators.
Operator | Description | ||
---|---|---|---|
|
agg[U1](col1: TypedColumn[V, U1]): Dataset[(K, U1)]
agg[U1, U2](
col1: TypedColumn[V, U1],
col2: TypedColumn[V, U2]): Dataset[(K, U1, U2)]
agg[U1, U2, U3](
col1: TypedColumn[V, U1],
col2: TypedColumn[V, U2],
col3: TypedColumn[V, U3]): Dataset[(K, U1, U2, U3)]
agg[U1, U2, U3, U4](
col1: TypedColumn[V, U1],
col2: TypedColumn[V, U2],
col3: TypedColumn[V, U3],
col4: TypedColumn[V, U4]): Dataset[(K, U1, U2, U3, U4)] |
||
|
cogroup[U, R : Encoder](
other: KeyValueGroupedDataset[K, U])(
f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R] |
||
|
count(): Dataset[(K, Long)] |
||
|
flatMapGroups[U : Encoder](f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U] |
||
flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout)(
func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U] Creates a new
|
|||
|
keys: Dataset[K]
keyAs[L : Encoder]: KeyValueGroupedDataset[L, V] |
||
|
mapGroups[U : Encoder](f: (K, Iterator[V]) => U): Dataset[U] |
||
mapGroupsWithState[S: Encoder, U: Encoder](
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout)(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] Creates a new
|
|||
|
mapValues[W : Encoder](func: V => W): KeyValueGroupedDataset[K, W] |
||
|
reduceGroups(f: (V, V) => V): Dataset[(K, V)] |