Dataset

Dataset is the Spark SQL API for working with structured data, i.e. records with a known schema.

Datasets are "lazy" and structured query expressions are only triggered when an action is invoked. Internally, a Dataset represents a logical plan that describes the computation query required to produce the data (for a given Spark SQL session).

Figure 1. Dataset’s Internals

A Dataset is a result of executing a query expression against data storage like files or databases. The structured query expression can be described by a SQL query or a Scala/Java lambda function.

If LogicalPlan is used to create a Dataset, it is executed (using the current SessionState) to create a corresponding QueryExecution.

The Dataset API comes with declarative and type-safe operators that improve on the earlier experience of data processing with DataFrames (which were a set of Rows).

Note

Dataset was first introduced in Apache Spark 1.6.0 as an experimental feature and has since become a fully supported API.

As of Spark 2.0.0, DataFrame - the flagship data abstraction of previous versions of Spark SQL - is a mere type alias for Dataset[Row]:

type DataFrame = Dataset[Row]

Dataset offers the convenience of RDDs, the performance optimizations of DataFrames, and the strong static type-safety of Scala. It is this strong type-safety, brought to DataFrames, that makes Dataset so appealing. Together, these features give you a more functional programming interface for working with structured data.

scala> spark.range(1).filter('id === 0).explain(true)
== Parsed Logical Plan ==
'Filter ('id = 0)
+- Range (0, 1, splits=8)

== Analyzed Logical Plan ==
id: bigint
Filter (id#51L = cast(0 as bigint))
+- Range (0, 1, splits=8)

== Optimized Logical Plan ==
Filter (id#51L = 0)
+- Range (0, 1, splits=8)

== Physical Plan ==
*Filter (id#51L = 0)
+- *Range (0, 1, splits=8)

scala> spark.range(1).filter(_ == 0).explain(true)
== Parsed Logical Plan ==
'TypedFilter <function1>, class java.lang.Long, [StructField(value,LongType,true)], unresolveddeserializer(newInstance(class java.lang.Long))
+- Range (0, 1, splits=8)

== Analyzed Logical Plan ==
id: bigint
TypedFilter <function1>, class java.lang.Long, [StructField(value,LongType,true)], newInstance(class java.lang.Long)
+- Range (0, 1, splits=8)

== Optimized Logical Plan ==
TypedFilter <function1>, class java.lang.Long, [StructField(value,LongType,true)], newInstance(class java.lang.Long)
+- Range (0, 1, splits=8)

== Physical Plan ==
*Filter <function1>.apply
+- *Range (0, 1, splits=8)

It is only with Datasets that you get syntax and analysis checks at compile time (which was not possible using DataFrames, regular SQL queries or even RDDs).

Using Datasets turns DataFrames of Row instances into collections of case classes with proper names and types (following their equivalents in the case classes). Instead of accessing fields by index and casting them to the expected types, as with a DataFrame, all of this is handled by Datasets and checked by the Scala compiler.

Datasets use Catalyst Query Optimizer and Tungsten to optimize query performance.

A Dataset object requires a SQLContext, a QueryExecution, and an Encoder. In some cases, a Dataset can also be seen as a pair of a LogicalPlan and the SQLContext it was created in.

Note
SQLContext and QueryExecution are transient and hence do not participate in Dataset serialization. The only firmly-tied feature of a Dataset is the Encoder.

A Dataset is Queryable and Serializable, i.e. it can be saved to persistent storage.

It also has a schema.

You can convert a type-safe Dataset to an "untyped" DataFrame (see Type Conversions to Dataset[T]) or access the RDD that sits underneath (see Converting Datasets into RDDs (using rdd method)). This is supposed to give you a more pleasant experience while transitioning from the legacy RDD-based or DataFrame-based APIs.

The default storage level for Datasets is MEMORY_AND_DISK because recomputing the in-memory columnar representation of the underlying table is expensive. See Persisting Dataset (persist method) in this document.

Spark 2.0 introduced a new query model called Structured Streaming for continuous, incremental execution of structured queries. It made it possible to treat a Dataset as static and bounded data as well as streaming and unbounded data with one single API.

Creating Datasets

If LogicalPlan is used to create a Dataset, it is executed (using the current SessionState) to create a corresponding QueryExecution.
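
For instance, in a Spark shell session (where spark: SparkSession and spark.implicits._ are already in scope) a Dataset can be created from a range of numbers or from a local Scala collection. This is only a minimal sketch; the Type Conversions to Dataset[T] section below covers toDS and toDF in more detail.

// Assumes a spark-shell session with spark.implicits._ in scope
val numbers = spark.range(5)              // Dataset[java.lang.Long]
val words = Seq("hello", "spark").toDS    // Dataset[String]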

Display Records (show methods)

Caution
FIXME

Internally, show relays to a private showString to do the formatting. It turns the Dataset into a DataFrame (by calling toDF()) and takes the first n records.
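
A minimal usage sketch (the exact console formatting may differ slightly between Spark versions):

scala> spark.range(3).show(2)
+---+
| id|
+---+
|  0|
|  1|
+---+
only showing top 2 rows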

Taking First n Records (take method)

take(n: Int): Array[T]

take is an action on a Dataset that returns a collection of n records.

Warning
take moves the requested records into the memory of the Spark application’s driver process, so a large n could result in OutOfMemoryError.

Internally, take creates a new Dataset with a Limit logical plan (with n as a Literal expression) over the current LogicalPlan. It then runs the SparkPlan, which produces an Array[InternalRow] that is in turn decoded to Array[T] using a bound encoder.
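
A minimal usage sketch:

// Assumes a spark-shell session
val first3 = spark.range(10).take(3)   // Array(0, 1, 2), collected on the driver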

join

Caution
FIXME

where

Caution
FIXME

groupBy

Caution
FIXME

foreachPartition method

foreachPartition(f: Iterator[T] => Unit): Unit

foreachPartition applies the f function to each partition of the Dataset.

case class Record(id: Int, city: String)
val ds = Seq(Record(0, "Warsaw"), Record(1, "London")).toDS

ds.foreachPartition { iter: Iterator[Record] => iter.foreach(println) }
Note
foreachPartition is used to save a DataFrame to a JDBC table (indirectly through JdbcUtils.saveTable) and in ForeachSink.

mapPartitions method

mapPartitions[U: Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]

mapPartitions returns a new Dataset (of type U) with the function func applied to each partition.

Caution
FIXME Example
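
Until a proper example lands here, a minimal sketch that reuses the Record case class from the foreachPartition example above and counts records per partition without collecting them to the driver:

val ds = Seq(Record(0, "Warsaw"), Record(1, "London")).toDS

// One Int per partition: the number of records in that partition
val countsPerPartition = ds.mapPartitions(iter => Iterator(iter.size))
countsPerPartition.show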

Creating Zero or More Records (flatMap method)

flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U]

flatMap returns a new Dataset (of type U) with all records (of type T) mapped over using the function func and then flattening the results.

Note
flatMap can create new records. It deprecated explode.
final case class Sentence(id: Long, text: String)
val sentences = Seq(Sentence(0, "hello world"), Sentence(1, "witaj swiecie")).toDS

scala> sentences.flatMap(s => s.text.split("\\s+")).show
+-------+
|  value|
+-------+
|  hello|
|  world|
|  witaj|
|swiecie|
+-------+

Internally, flatMap calls mapPartitions with the partitions flatMap(ped).

Caching Dataset (cache method)

cache(): this.type

cache merely passes the call to the no-argument persist method.

Persisting Dataset (persist method)

persist(): this.type
persist(newLevel: StorageLevel): this.type

persist caches the Dataset using the default storage level MEMORY_AND_DISK or newLevel.

Internally, it requests the CacheManager to cache the query.
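
A minimal sketch with an explicit storage level (StorageLevel comes from org.apache.spark.storage):

import org.apache.spark.storage.StorageLevel

val ds = spark.range(10)
ds.persist(StorageLevel.MEMORY_ONLY)   // mark the Dataset for caching
ds.count                               // an action materializes the cache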

Unpersisting Dataset (unpersist method)

unpersist(blocking: Boolean): this.type

unpersist uncaches the Dataset, optionally blocking until all its blocks are removed.

Internally, it requests the CacheManager to uncache the query.

Repartitioning Dataset (repartition method)

repartition(numPartitions: Int): Dataset[T]

repartition repartitions the Dataset into exactly numPartitions partitions.
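
A minimal sketch:

// Assumes a spark-shell session
val ds = spark.range(10).repartition(4)
ds.rdd.getNumPartitions   // 4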

Features of Dataset API

The features of the Dataset API in Spark SQL:

  • Type-safety, as Datasets are Scala domain objects and operations operate on their attributes. All of this is checked by the Scala compiler at compile time.

Type Conversions to Dataset[T] (and DataFrame) (toDS and toDF methods)

The DatasetHolder case class offers three methods that do the conversions from Seq[T] or RDD[T] to Dataset[T]:

  • toDS(): Dataset[T]

  • toDF(): DataFrame

  • toDF(colNames: String*): DataFrame

Note
DataFrame is a mere type alias for Dataset[Row] since Spark 2.0.0.

DatasetHolder is used by SQLImplicits, which becomes available after importing the implicits object of SQLContext.

scala> val ds = Seq("I am a shiny Dataset!").toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val df = Seq("I am an old grumpy DataFrame!").toDF
df: org.apache.spark.sql.DataFrame = [value: string]

scala> val df = Seq("I am an old grumpy DataFrame!").toDF("text")
df: org.apache.spark.sql.DataFrame = [text: string]

scala> val ds = sc.parallelize(Seq("hello")).toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]
Note

This import is automatically executed in Spark Shell.

scala> sc.version
res11: String = 2.0.0-SNAPSHOT

scala> :imports
 1) import spark.implicits._  (59 terms, 38 are implicit)
 2) import spark.sql          (1 terms)
import spark.implicits._

case class Token(name: String, productId: Int, score: Double)
val data = Seq(
  Token("aaa", 100, 0.12),
  Token("aaa", 200, 0.29),
  Token("bbb", 200, 0.53),
  Token("bbb", 300, 0.42))

// Transform data to a Dataset[Token]
// It doesn't work with type annotation yet
// https://issues.apache.org/jira/browse/SPARK-13456
val ds: Dataset[Token] = data.toDS

// Transform data into a DataFrame with no explicit schema
val df = data.toDF

// Transform DataFrame into a Dataset
val ds = df.as[Token]

scala> ds.show
+----+---------+-----+
|name|productId|score|
+----+---------+-----+
| aaa|      100| 0.12|
| aaa|      200| 0.29|
| bbb|      200| 0.53|
| bbb|      300| 0.42|
+----+---------+-----+

scala> ds.printSchema
root
 |-- name: string (nullable = true)
 |-- productId: integer (nullable = false)
 |-- score: double (nullable = false)

// In DataFrames we work with Row instances
scala> df.map(_.getClass.getName).show(false)
+--------------------------------------------------------------+
|value                                                         |
+--------------------------------------------------------------+
|org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema|
|org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema|
|org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema|
|org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema|
+--------------------------------------------------------------+

// In Datasets we work with case class instances
scala> ds.map(_.getClass.getName).show(false)
+---------------------------+
|value                      |
+---------------------------+
|$line40.$read$$iw$$iw$Token|
|$line40.$read$$iw$$iw$Token|
|$line40.$read$$iw$$iw$Token|
|$line40.$read$$iw$$iw$Token|
+---------------------------+

scala> ds.map(_.name).show
+-----+
|value|
+-----+
|  aaa|
|  aaa|
|  bbb|
|  bbb|
+-----+

Converting Datasets into RDDs (using rdd method)

Whenever you need to convert a Dataset into an RDD, the rdd method gives you an RDD of the proper input object type (not Row as in DataFrames).

scala> val rdd = ds.rdd
rdd: org.apache.spark.rdd.RDD[Token] = MapPartitionsRDD[11] at rdd at <console>:30

Schema

A Dataset has a schema.

schema: StructType
Tip

You may also use the following methods to learn about the schema:

  • printSchema(): Unit

  • explain
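
A minimal sketch, reusing the Token Dataset from the Type Conversions section above:

ds.schema        // the StructType with the name, productId and score fields
ds.printSchema   // pretty-prints the same structure to the console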

Supported Types

Caution
FIXME What types are supported by Encoders

toJSON

toJSON maps the content of Dataset to a Dataset of JSON strings.

Note
A new feature in Spark 2.0.0.
scala> val ds = Seq("hello", "world", "foo bar").toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]

scala> ds.toJSON.show
+-------------------+
|              value|
+-------------------+
|  {"value":"hello"}|
|  {"value":"world"}|
|{"value":"foo bar"}|
+-------------------+

explain

explain(): Unit
explain(extended: Boolean): Unit

explain prints the logical and physical plans to the console. You can use it for debugging.

Tip
If you are serious about query debugging you could also use the Debugging Query Execution facility.

Internally, explain executes an ExplainCommand logical plan on the logical plan of the QueryExecution of the Dataset.

scala> spark.range(10).explain(extended = true)
== Parsed Logical Plan ==
Range (0, 10, splits=8)

== Analyzed Logical Plan ==
id: bigint
Range (0, 10, splits=8)

== Optimized Logical Plan ==
Range (0, 10, splits=8)

== Physical Plan ==
*Range (0, 10, splits=8)

select

select[U1: Encoder](c1: TypedColumn[T, U1]): Dataset[U1]
select[U1, U2](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2]): Dataset[(U1, U2)]
select[U1, U2, U3](
  c1: TypedColumn[T, U1],
  c2: TypedColumn[T, U2],
  c3: TypedColumn[T, U3]): Dataset[(U1, U2, U3)]
select[U1, U2, U3, U4](
  c1: TypedColumn[T, U1],
  c2: TypedColumn[T, U2],
  c3: TypedColumn[T, U3],
  c4: TypedColumn[T, U4]): Dataset[(U1, U2, U3, U4)]
select[U1, U2, U3, U4, U5](
  c1: TypedColumn[T, U1],
  c2: TypedColumn[T, U2],
  c3: TypedColumn[T, U3],
  c4: TypedColumn[T, U4],
  c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)]
Caution
FIXME
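
Until this section is filled in, here is a minimal sketch of the typed select, reusing the Token Dataset from the Type Conversions section above (spark.implicits._ must be in scope for the $ and .as[T] syntax):

// A Dataset[(String, Double)] rather than a DataFrame of Rows
val namesAndScores = ds.select($"name".as[String], $"score".as[Double])
namesAndScores.show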

selectExpr

selectExpr(exprs: String*): DataFrame

selectExpr is like select, but accepts SQL expressions exprs.

val ds = spark.range(5)

scala> ds.selectExpr("rand() as random").show
16/04/14 23:16:06 INFO HiveSqlParser: Parsing command: rand() as random
+-------------------+
|             random|
+-------------------+
|  0.887675894185651|
|0.36766085091074086|
| 0.2700020856675186|
| 0.1489033635529543|
| 0.5862990791950973|
+-------------------+

Internally, it executes select with every expression in exprs mapped to Column (using SparkSqlParser.parseExpression).

scala> ds.select(expr("rand() as random")).show
+------------------+
|            random|
+------------------+
|0.5514319279894851|
|0.2876221510433741|
|0.4599999092045741|
|0.5708558868374893|
|0.6223314406247136|
+------------------+
Note
A new feature in Spark 2.0.0.

isStreaming

isStreaming returns true when a Dataset contains StreamingRelation or StreamingExecutionRelation streaming sources.

Note
Streaming datasets are created using DataFrameReader.stream method (for StreamingRelation) and contain StreamingExecutionRelation after DataFrameWriter.startStream.
val reader = spark.read
val helloStream = reader.stream("hello")

scala> helloStream.isStreaming
res9: Boolean = true
Note
A new feature in Spark 2.0.0.

randomSplit

randomSplit(weights: Array[Double]): Array[Dataset[T]]
randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]

randomSplit randomly splits the Dataset per weights.

weights doubles should sum up to 1 and will be normalized if they do not.

You can specify a seed; if you don’t, a random seed will be used.

Note
It is used in TrainValidationSplit to split dataset into training and validation datasets.
val ds = spark.range(10)
scala> ds.randomSplit(Array[Double](2, 3)).foreach(_.show)
+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+

+---+
| id|
+---+
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+
Note
A new feature in Spark 2.0.0.

Queryable

Caution
FIXME

Tracking Multi-Job SQL Query Executions (withNewExecutionId method)

withNewExecutionId[U](body: => U): U

withNewExecutionId is a private[sql] method that executes the input body action using SQLExecution.withNewExecutionId, which sets the execution id local property.

Note
It is used in foreach, foreachPartition, and (private) collect.

Further reading or watching