Dataset is the Spark SQL API for working with structured data, i.e. records with a known schema.
Datasets are "lazy" and structured query expressions are only triggered when an action is invoked. Internally, a Dataset
represents a logical plan that describes the computation query required to produce the data (for a given Spark SQL session).
A Dataset is a result of executing a query expression against data storage like files or databases. The structured query expression can be described by a SQL query or a Scala/Java lambda function.
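As a minimal sketch of this laziness (assuming a spark-shell session with spark.implicits._ in scope), the query below only builds a plan when declared and is executed only when the count action is invoked:
// Only a logical plan is built here -- no Spark job runs yet
val evens = spark.range(5).filter('id % 2 === 0)
// count is an action, so it triggers planning and execution of the query
scala> evens.count
res0: Long = 3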
If a LogicalPlan is used to create a Dataset, it is executed (using the current SessionState) to create a corresponding QueryExecution.
The Dataset API comes with declarative and type-safe operators that improve on the earlier experience of data processing with DataFrames, which were merely collections of Rows.
Note: As of Spark 2.0.0, DataFrame - the flagship data abstraction of previous versions of Spark SQL - is a mere type alias: type DataFrame = Dataset[Row] (see package object sql).
Dataset offers the convenience of RDDs with the performance optimizations of DataFrames and the strong static type-safety of Scala. Bringing strong type-safety to DataFrames is what makes Datasets so appealing. All these features together give you a more functional programming interface to work with structured data.
scala> spark.range(1).filter('id === 0).explain(true)
== Parsed Logical Plan ==
'Filter ('id = 0)
+- Range (0, 1, splits=8)
== Analyzed Logical Plan ==
id: bigint
Filter (id#51L = cast(0 as bigint))
+- Range (0, 1, splits=8)
== Optimized Logical Plan ==
Filter (id#51L = 0)
+- Range (0, 1, splits=8)
== Physical Plan ==
*Filter (id#51L = 0)
+- *Range (0, 1, splits=8)
scala> spark.range(1).filter(_ == 0).explain(true)
== Parsed Logical Plan ==
'TypedFilter <function1>, class java.lang.Long, [StructField(value,LongType,true)], unresolveddeserializer(newInstance(class java.lang.Long))
+- Range (0, 1, splits=8)
== Analyzed Logical Plan ==
id: bigint
TypedFilter <function1>, class java.lang.Long, [StructField(value,LongType,true)], newInstance(class java.lang.Long)
+- Range (0, 1, splits=8)
== Optimized Logical Plan ==
TypedFilter <function1>, class java.lang.Long, [StructField(value,LongType,true)], newInstance(class java.lang.Long)
+- Range (0, 1, splits=8)
== Physical Plan ==
*Filter <function1>.apply
+- *Range (0, 1, splits=8)
Only Datasets offer syntax and analysis checks at compile time (which was not possible using DataFrames, regular SQL queries or even RDDs).
Using Dataset objects turns DataFrames of Row instances into typed collections of case classes with proper names and types (following their equivalents in the case classes). Instead of using indices to access the fields of a DataFrame and casting them to a type, all this is handled automatically by Datasets and checked by the Scala compiler.
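For example (a sketch only, with a hypothetical Person case class and spark.implicits._ in scope): a field is read positionally and cast at runtime on a DataFrame, but read by name and compile-checked on a Dataset.
case class Person(name: String, age: Int)
val people = Seq(Person("Alice", 42), Person("Bob", 41))
// DataFrame: access by index and rely on a runtime cast
val df = people.toDF
val namesFromRows = df.map(_.getString(0))   // a wrong index or type fails only at runtime
// Dataset: access by name, checked by the Scala compiler
val ds = people.toDS
val namesFromPeople = ds.map(_.name)         // _.nmae would not even compile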
Datasets use Catalyst Query Optimizer and Tungsten to optimize query performance.
A Dataset object requires a SQLContext, a QueryExecution, and an Encoder. In some cases, a Dataset can also be seen as a LogicalPlan in a given SQLContext.
Note: SQLContext and QueryExecution are transient and hence do not participate in Dataset serialization. The only firmly-tied feature of a Dataset is the Encoder.
A Dataset is Queryable and Serializable, i.e. it can be saved to persistent storage. It also has a schema.
You can convert a type-safe Dataset to an "untyped" DataFrame (see Type Conversions to Dataset[T]) or access the RDD that sits underneath (see Converting Datasets into RDDs (using rdd method)). This is meant to give you a more pleasant experience while transitioning from the legacy RDD-based or DataFrame-based APIs.
The default storage level for Datasets is MEMORY_AND_DISK because recomputing the in-memory columnar representation of the underlying table is expensive. See Persisting Dataset (persist method) in this document.
Spark 2.0 has introduced a new query model called Structured Streaming for continuous incremental execution of structured queries. That made it possible to treat Datasets as both static, bounded data and streaming, unbounded data with one single API.
Caution: FIXME
Internally, show relays to a private showString to do the formatting. It turns the Dataset into a DataFrame (by calling toDF()) and takes the first n records.
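For example (a small sketch using a range Dataset), show(3) prints only the first 3 records:
val ds = spark.range(100)
scala> ds.show(3)
+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+
only showing top 3 rows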
take(n: Int): Array[T]
take is an action on a Dataset that returns a collection of n records.
Warning: take loads all the data into the memory of the Spark application's driver process and for a large n it could result in OutOfMemoryError.
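For example (a small sketch):
val ds = Seq("a", "b", "c", "d").toDS
scala> ds.take(2)
res0: Array[String] = Array(a, b)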
foreachPartition(f: Iterator[T] => Unit): Unit
foreachPartition applies the f function to each partition of the Dataset.
case class Record(id: Int, city: String)
val ds = Seq(Record(0, "Warsaw"), Record(1, "London")).toDS
ds.foreachPartition { iter: Iterator[Record] => iter.foreach(println) }
Note: foreachPartition is used to save a DataFrame to a JDBC table (indirectly through JdbcUtils.saveTable) and in ForeachSink.
mapPartitions[U: Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]
mapPartitions returns a new Dataset (of type U) with the function func applied to each partition.
Caution: FIXME Example
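A possible example (just a sketch, assuming spark-shell with implicits in scope): mapPartitions lets you do per-partition setup, e.g. create an expensive resource once per partition and reuse it for all records in that partition.
val ds = Seq("hello", "world", "ola", "czesc").toDS
// the function is called once per partition with an iterator over its records,
// so any setup done here (e.g. opening a connection) happens once per partition
val upper = ds.mapPartitions { records =>
  records.map(_.toUpperCase)
}
scala> upper.show
+-----+
|value|
+-----+
|HELLO|
|WORLD|
|  OLA|
|CZESC|
+-----+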
flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U]
flatMap returns a new Dataset (of type U) with all records (of type T) mapped over using the function func and then flattening the results.
Note: flatMap can create new records. It deprecated explode.
final case class Sentence(id: Long, text: String)
val sentences = Seq(Sentence(0, "hello world"), Sentence(1, "witaj swiecie")).toDS
scala> sentences.flatMap(s => s.text.split("\\s+")).show
+-------+
| value|
+-------+
| hello|
| world|
| witaj|
|swiecie|
+-------+
Internally, flatMap calls mapPartitions with the partitions flatMap(ped).
cache(): this.type
cache merely passes the call on to the no-argument persist method.
persist(): this.type
persist(newLevel: StorageLevel): this.type
persist caches the Dataset using the default storage level MEMORY_AND_DISK or newLevel.
Internally, it requests the CacheManager to cache the query.
unpersist(blocking: Boolean): this.type
unpersist uncaches the Dataset, blocking the call if blocking is enabled.
Internally, it requests the CacheManager to uncache the query.
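A short sketch of the caching lifecycle (StorageLevel comes from org.apache.spark.storage):
import org.apache.spark.storage.StorageLevel
val ds = spark.range(10)
// cache persists with the default MEMORY_AND_DISK storage level
ds.cache
// running an action materializes the Dataset in the cache
ds.count
// remove the cached data; blocking = true waits until all blocks are dropped
ds.unpersist(blocking = true)
// persist accepts an explicit storage level
ds.persist(StorageLevel.MEMORY_ONLY)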
repartition(numPartitions: Int): Dataset[T]
repartition repartitions the Dataset to exactly numPartitions partitions.
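For example (the initial number of partitions depends on your environment; it is 8 in the sessions shown in this document):
val ds = spark.range(10)
scala> ds.rdd.getNumPartitions
res0: Int = 8
scala> ds.repartition(4).rdd.getNumPartitions
res1: Int = 4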
The features of the Dataset API in Spark SQL:
- Type-safety: Datasets are Scala domain objects and operations operate on their attributes. All is checked by the Scala compiler at compile time.
The DatasetHolder case class offers three methods that do the conversions from Seq[T] or RDD[T] to Dataset[T]:
- toDS(): Dataset[T]
- toDF(): DataFrame
- toDF(colNames: String*): DataFrame
Note: DataFrame is a mere type alias for Dataset[Row] since Spark 2.0.0.
DatasetHolder is used by SQLImplicits, which becomes available after importing the implicits object of SQLContext.
scala> val ds = Seq("I am a shiny Dataset!").toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]
scala> val df = Seq("I am an old grumpy DataFrame!").toDF
df: org.apache.spark.sql.DataFrame = [value: string]
scala> val df = Seq("I am an old grumpy DataFrame!").toDF("text")
df: org.apache.spark.sql.DataFrame = [text: string]
scala> val ds = sc.parallelize(Seq("hello")).toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]
Note: This import is automatically executed in the Spark shell.
import spark.implicits._
case class Token(name: String, productId: Int, score: Double)
val data = Seq(
Token("aaa", 100, 0.12),
Token("aaa", 200, 0.29),
Token("bbb", 200, 0.53),
Token("bbb", 300, 0.42))
// Transform data to a Dataset[Token]
// It doesn't work with type annotation yet
// https://issues.apache.org/jira/browse/SPARK-13456
val ds: Dataset[Token] = data.toDS
// Transform data into a DataFrame with no explicit schema
val df = data.toDF
// Transform DataFrame into a Dataset
val ds = df.as[Token]
scala> ds.show
+----+---------+-----+
|name|productId|score|
+----+---------+-----+
| aaa| 100| 0.12|
| aaa| 200| 0.29|
| bbb| 200| 0.53|
| bbb| 300| 0.42|
+----+---------+-----+
scala> ds.printSchema
root
|-- name: string (nullable = true)
|-- productId: integer (nullable = false)
|-- score: double (nullable = false)
// In DataFrames we work with Row instances
scala> df.map(_.getClass.getName).show(false)
+--------------------------------------------------------------+
|value |
+--------------------------------------------------------------+
|org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema|
|org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema|
|org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema|
|org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema|
+--------------------------------------------------------------+
// In Datasets we work with case class instances
scala> ds.map(_.getClass.getName).show(false)
+---------------------------+
|value |
+---------------------------+
|$line40.$read$$iw$$iw$Token|
|$line40.$read$$iw$$iw$Token|
|$line40.$read$$iw$$iw$Token|
|$line40.$read$$iw$$iw$Token|
+---------------------------+
scala> ds.map(_.name).show
+-----+
|value|
+-----+
| aaa|
| aaa|
| bbb|
| bbb|
+-----+
Whenever you need to convert a Dataset into an RDD, calling the rdd method gives you an RDD of the proper input object type (not Row as in DataFrames).
scala> val rdd = ds.rdd
rdd: org.apache.spark.rdd.RDD[Token] = MapPartitionsRDD[11] at rdd at <console>:30
A Dataset has a schema.
schema: StructType
Tip: You may also use printSchema and explain to learn about (and review) the schema.
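A quick sketch:
val ds = Seq("I am a shiny Dataset!").toDS
// schema gives you the StructType of the records
val schema = ds.schema
scala> ds.printSchema
root
 |-- value: string (nullable = true)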
toJSON maps the content of a Dataset to a Dataset of JSON strings.
Note: A new feature in Spark 2.0.0.
scala> val ds = Seq("hello", "world", "foo bar").toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]
scala> ds.toJSON.show
+-------------------+
| value|
+-------------------+
| {"value":"hello"}|
| {"value":"world"}|
|{"value":"foo bar"}|
+-------------------+
explain(): Unit
explain(extended: Boolean): Unit
explain prints the logical and physical plans to the console. You can use it for debugging.
Tip: If you are serious about query debugging you could also use the Debugging Query Execution facility.
Internally, explain executes an ExplainCommand logical plan on the logical plan of the QueryExecution of the Dataset.
scala> spark.range(10).explain(extended = true)
== Parsed Logical Plan ==
Range (0, 10, splits=8)
== Analyzed Logical Plan ==
id: bigint
Range (0, 10, splits=8)
== Optimized Logical Plan ==
Range (0, 10, splits=8)
== Physical Plan ==
*Range (0, 10, splits=8)
select[U1: Encoder](c1: TypedColumn[T, U1]): Dataset[U1]
select[U1, U2](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2]): Dataset[(U1, U2)]
select[U1, U2, U3](
c1: TypedColumn[T, U1],
c2: TypedColumn[T, U2],
c3: TypedColumn[T, U3]): Dataset[(U1, U2, U3)]
select[U1, U2, U3, U4](
c1: TypedColumn[T, U1],
c2: TypedColumn[T, U2],
c3: TypedColumn[T, U3],
c4: TypedColumn[T, U4]): Dataset[(U1, U2, U3, U4)]
select[U1, U2, U3, U4, U5](
c1: TypedColumn[T, U1],
c2: TypedColumn[T, U2],
c3: TypedColumn[T, U3],
c4: TypedColumn[T, U4],
c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)]
Caution: FIXME
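A possible example (a sketch only, assuming spark-shell with implicits in scope, reusing the Token case class from the example later in this page): selecting TypedColumns (built with as[T]) gives a Dataset of tuples rather than an untyped DataFrame.
import org.apache.spark.sql.Dataset
case class Token(name: String, productId: Int, score: Double)
val ds = Seq(Token("aaa", 100, 0.12), Token("bbb", 300, 0.42)).toDS
// the typed select picks the columns and keeps their types, so the result
// is Dataset[(String, Double)], not DataFrame
val namesAndScores: Dataset[(String, Double)] =
  ds.select($"name".as[String], $"score".as[Double])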
selectExpr(exprs: String*): DataFrame
selectExpr is like select, but accepts SQL expressions exprs.
val ds = spark.range(5)
scala> ds.selectExpr("rand() as random").show
16/04/14 23:16:06 INFO HiveSqlParser: Parsing command: rand() as random
+-------------------+
| random|
+-------------------+
| 0.887675894185651|
|0.36766085091074086|
| 0.2700020856675186|
| 0.1489033635529543|
| 0.5862990791950973|
+-------------------+
Internally, it executes select with every expression in exprs mapped to a Column (using SparkSqlParser.parseExpression).
scala> ds.select(expr("rand() as random")).show
+------------------+
| random|
+------------------+
|0.5514319279894851|
|0.2876221510433741|
|0.4599999092045741|
|0.5708558868374893|
|0.6223314406247136|
+------------------+
Note: A new feature in Spark 2.0.0.
isStreaming returns true when the Dataset contains StreamingRelation or StreamingExecutionRelation streaming sources.
Note: Streaming Datasets are created using the DataFrameReader.stream method (for StreamingRelation) and contain StreamingExecutionRelation after DataFrameWriter.startStream.
val reader = spark.read
val helloStream = reader.stream("hello")
scala> helloStream.isStreaming
res9: Boolean = true
Note: A new feature in Spark 2.0.0.
randomSplit(weights: Array[Double]): Array[Dataset[T]]
randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]
randomSplit randomly splits the Dataset per weights.
The weights doubles should sum up to 1 and will be normalized if they do not. You can define a seed and, if you don't, a random seed will be used.
Note: It is used in TrainValidationSplit to split a dataset into training and validation datasets.
val ds = spark.range(10)
scala> ds.randomSplit(Array[Double](2, 3)).foreach(_.show)
+---+
| id|
+---+
| 0|
| 1|
| 2|
+---+
+---+
| id|
+---+
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+---+
Note: A new feature in Spark 2.0.0.
withNewExecutionId[U](body: => U): U
withNewExecutionId is a private[sql] method that executes the input body action using SQLExecution.withNewExecutionId, which sets the execution id local property.
Note: It is used in foreach, foreachPartition, and (private) collect.