Encoders — Internal Row Format Converters

An Encoder (for type T) is used to convert (encode and decode) a JVM object of type T (e.g. your domain object) or a primitive to and from the internal Spark SQL binary row format, using Catalyst expressions and code generation.

Note
Encoder is also called "a container of serde expressions in Dataset".

The Encoder concept is represented by trait Encoder[T].

trait Encoder[T] extends Serializable {
  def schema: StructType
  def clsTag: ClassTag[T]
}

The one and only implementation of the Encoder trait in Spark 2.0 is ExpressionEncoder.

case class Person(id: Long, name: String)

import org.apache.spark.sql.Encoders

scala> val personEncoder = Encoders.product[Person]
personEncoder: org.apache.spark.sql.Encoder[Person] = class[id[0]: bigint, name[0]: string]

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

scala> personEncoder.asInstanceOf[ExpressionEncoder[Person]]
res4: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[Person] = class[id[0]: bigint, name[0]: string]

Encoders allow for significantly faster serialization and deserialization (compared to the default Java or Kryo serializers).

Note
Encoders are part of Catalyst Optimizer.

A Dataset owns an Encoder that serializes and deserializes the type of the Dataset.

You can create custom encoders using the Encoders factory object. Encoders for many Scala types are, however, available through the implicits object of a SparkSession, so in most cases you don’t need to worry about them at all and simply import the implicits object.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._
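
With the implicits in scope, encoders for common types are resolved behind the scenes whenever you create a Dataset. A minimal sketch that continues the snippet above:

// Encoder[Long] is resolved implicitly from spark.implicits
val numbers = Seq(1L, 2L, 3L).toDS

// the same mechanism resolves Encoder[(Int, String)] for the tuple type
val pairs = Seq((1, "one"), (2, "two")).toDS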

Encoders map columns (of your dataset) to fields (of your JVM object) by name. It is through encoders that you can bridge JVM objects to data sources (CSV, JDBC, Parquet, Avro, JSON, Cassandra, Elasticsearch, MemSQL) and vice versa, as the sketch below illustrates.
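
A minimal sketch of the bridging, assuming the session and implicits above (people.json is a hypothetical input file); the id and name columns are matched to the fields of Person by name:

case class Person(id: Long, name: String)

// as[Person] attaches the implicitly-resolved Encoder[Person] to the Dataset
val people = spark.read.json("people.json").as[Person]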

import org.apache.spark.sql.Encoders

case class Person(id: Int, name: String, speaksPolish: Boolean)

scala> val personEncoder = Encoders.product[Person]
personEncoder: org.apache.spark.sql.Encoder[Person] = class[id[0]: int, name[0]: string, speaksPolish[0]: boolean]

scala> personEncoder.schema
res11: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false), StructField(name,StringType,true), StructField(speaksPolish,BooleanType,false))

scala> personEncoder.clsTag
res12: scala.reflect.ClassTag[Person] = Person

ExpressionEncoder

case class ExpressionEncoder[T](
    schema: StructType,
    flat: Boolean,
    serializer: Seq[Expression],
    deserializer: Expression,
    clsTag: ClassTag[T])
  extends Encoder[T]

ExpressionEncoder is the one and only implementation of the Encoder trait in Spark 2.0, with additional properties: flat, one or more serializer expressions, and a deserializer expression.

An ExpressionEncoder can be flat, in which case there is only one Catalyst expression for the serializer.

Serializer expressions are used to encode an object of type T as an InternalRow. It is assumed that all serializer expressions contain at least one (and the same) BoundReference.

Caution
FIXME What’s BoundReference?

The deserializer expression is used to decode an InternalRow back to an object of type T.

Internally, an ExpressionEncoder creates a UnsafeProjection (for the input serializer), an InternalRow (of size 1), and a safe Projection (for the input deserializer). They are all internal lazy attributes of the encoder.
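
A minimal sketch of the round trip through the internal row format (Spark 2.0 API; note that fromRow requires the deserializer to be resolved and bound to the schema first, hence resolveAndBind):

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class Person(id: Long, name: String)

val personExprEncoder = ExpressionEncoder[Person]()

// the serializer expressions encode a Person to an InternalRow
val row = personExprEncoder.toRow(Person(0, "Jacek"))

// the deserializer expression decodes the InternalRow back to a Person
val person = personExprEncoder.resolveAndBind().fromRow(row)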

Creating Custom Encoders (Encoders object)

Encoders factory object defines methods to create Encoder instances.

Import the Encoders object from the org.apache.spark.sql package to have access to the factory methods.

import org.apache.spark.sql.Encoders

scala> Encoders.LONG
res1: org.apache.spark.sql.Encoder[Long] = class[value[0]: bigint]

You can find methods to create encoders for Java’s object types, e.g. Boolean, Integer, Long, Double, String, java.sql.Timestamp or byte arrays, that can be composed to create more advanced encoders for Java bean classes (using the bean method).
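
For instance, the bean method derives an encoder from a bean class. A sketch with a hypothetical PersonBean written in Scala (@BeanProperty stands in for Java-style getters and setters):

import org.apache.spark.sql.Encoders
import scala.beans.BeanProperty

// a hypothetical Java-style bean: no-arg constructor plus getters and setters
class PersonBean extends Serializable {
  @BeanProperty var id: Int = 0
  @BeanProperty var name: String = ""
}

val personBeanEncoder = Encoders.bean(classOf[PersonBean])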

import org.apache.spark.sql.Encoders

scala> Encoders.STRING
res2: org.apache.spark.sql.Encoder[String] = class[value[0]: string]

You can also create encoders based on Kryo or Java serializers.

import org.apache.spark.sql.Encoders

case class Person(id: Int, name: String, speaksPolish: Boolean)

scala> Encoders.kryo[Person]
res3: org.apache.spark.sql.Encoder[Person] = class[value[0]: binary]

scala> Encoders.javaSerialization[Person]
res5: org.apache.spark.sql.Encoder[Person] = class[value[0]: binary]
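
Note that a Kryo- or Java-serialized object is stored as a single binary column (class[value[0]: binary] above), so Spark SQL has no insight into individual fields. A sketch of plugging such an encoder in explicitly (assuming a running SparkSession named spark and the Person class above):

// pass the Kryo-based encoder explicitly instead of relying on implicits
val people = spark.createDataset(Seq(Person(0, "Jacek", true)))(Encoders.kryo[Person])

// the whole Person ends up in a single binary column
people.printSchema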

You can create encoders for Scala’s tuples and case classes, Int, Long, Double, etc.

import org.apache.spark.sql.Encoders

scala> Encoders.tuple(Encoders.scalaLong, Encoders.STRING, Encoders.scalaBoolean)
res9: org.apache.spark.sql.Encoder[(Long, String, Boolean)] = class[_1[0]: bigint, _2[0]: string, _3[0]: boolean]
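
A composed encoder can then be passed explicitly wherever an Encoder is expected, e.g. to Dataset.map. A sketch reusing the people dataset from the Kryo example above:

// the tuple encoder fills in map's otherwise-implicit Encoder parameter
val idsAndNames = people.map(p => (p.id, p.name))(Encoders.tuple(Encoders.scalaInt, Encoders.STRING))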