An Encoder object (for type T) is used to convert (encode and decode) a JVM object of type T (e.g. your domain object) and primitives to and from the Spark SQL internal binary row format representation using Catalyst expressions and code generation.
Note: Encoder is also called "a container of serde expressions in Dataset".
The Encoder concept is represented by the trait Encoder[T].
trait Encoder[T] extends Serializable {
def schema: StructType
def clsTag: ClassTag[T]
}
The one and only implementation of the Encoder trait in Spark 2.0 is ExpressionEncoder.
case class Person(id: Long, name: String)
import org.apache.spark.sql.Encoders
scala> val personEncoder = Encoders.product[Person]
personEncoder: org.apache.spark.sql.Encoder[Person] = class[id[0]: bigint, name[0]: string]
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
scala> personEncoder.asInstanceOf[ExpressionEncoder[Person]]
res4: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[Person] = class[id[0]: bigint, name[0]: string]
Encoders allow for significantly faster serialization and deserialization (compared to the default Java or Kryo serializers).
Note: Encoders are part of the Catalyst Optimizer.
Dataset owns an Encoder that serializes and deserializes the type of the Dataset.
You can create custom encoders using the Encoders object. Encoders for many Scala types are however available through the SparkSession.implicits object, so in most cases you don't need to worry about them at all and can simply import the implicits object.
val spark = SparkSession.builder.getOrCreate()
import spark.implicits._
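With the implicits in scope, an encoder for a case class is found implicitly, e.g. when you call toDS on a local collection. A minimal sketch, assuming the Person case class defined earlier and a running spark-shell session:

val people = Seq(Person(0, "Jacek"), Person(1, "Agata")).toDS  // Dataset[Person]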
Encoders map columns (of your dataset) to fields (of your JVM object) by name. It is through encoders that you can bridge JVM objects to data sources (CSV, JDBC, Parquet, Avro, JSON, Cassandra, Elasticsearch, MemSQL) and vice versa.
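As a quick illustration of the bridging (a sketch, assuming the Person(id: Long, name: String) case class from above and a hypothetical people.json input file):

import spark.implicits._
// columns of the data source are mapped to fields of Person by name
val people = spark.read.json("people.json").as[Person]  // Dataset[Person]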
import org.apache.spark.sql.Encoders
case class Person(id: Int, name: String, speaksPolish: Boolean)
scala> val personEncoder = Encoders.product[Person]
personEncoder: org.apache.spark.sql.Encoder[Person] = class[id[0]: int, name[0]: string, speaksPolish[0]: boolean]
scala> personEncoder.schema
res11: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false), StructField(name,StringType,true), StructField(speaksPolish,BooleanType,false))
scala> personEncoder.clsTag
res12: scala.reflect.ClassTag[Person] = Person
case class ExpressionEncoder[T](
schema: StructType,
flat: Boolean,
serializer: Seq[Expression],
deserializer: Expression,
clsTag: ClassTag[T])
extends Encoder[T]
ExpressionEncoder is the one and only implementation of the Encoder trait in Spark 2.0, with additional properties, i.e. flat, one or many serializer expressions and a deserializer expression.
An ExpressionEncoder can be flat, in which case there is only one Catalyst expression for the serializer.
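You can inspect the property directly (a sketch using ExpressionEncoder.apply, assuming the Spark 2.0 API):

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

ExpressionEncoder[Long]().flat    // true  -- a primitive-like type with a single serializer expression
ExpressionEncoder[Person]().flat  // false -- one serializer expression per field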
Serializer expressions are used to encode an object of type T to an InternalRow. It is assumed that all serializer expressions contain at least one and the same BoundReference.
Caution: FIXME What's BoundReference?
The deserializer expression is used to decode an InternalRow to an object of type T.
Internally, an ExpressionEncoder creates an UnsafeProjection (for the input serializer), an InternalRow (of size 1), and a safe Projection (for the input deserializer). They are all internal lazy attributes of the encoder.
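Putting serializer and deserializer together, the following sketch does a full round trip through the internal row format (assuming Spark 2.0's toRow, resolveAndBind and fromRow on ExpressionEncoder):

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

val personExprEncoder = ExpressionEncoder[Person]()

// encode a Person to the internal binary row format...
val row = personExprEncoder.toRow(Person(0, "Jacek"))

// ...and decode it back (the deserializer has to be resolved and bound to the schema first)
val person = personExprEncoder.resolveAndBind().fromRow(row)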
The Encoders factory object defines methods to create Encoder instances.
Import the org.apache.spark.sql package to have access to the Encoders factory object.
import org.apache.spark.sql.Encoders
scala> Encoders.LONG
res1: org.apache.spark.sql.Encoder[Long] = class[value[0]: bigint]
You can find methods to create encoders for Java's object types, e.g. Boolean, Integer, Long, Double, String, java.sql.Timestamp or Byte array, that can be composed to create more advanced encoders for Java bean classes (using the bean method).
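For example (a sketch; PersonBean is a hypothetical Java-style bean, i.e. a class with a no-argument constructor and getters/setters):

import org.apache.spark.sql.Encoders

class PersonBean extends Serializable {
  @scala.beans.BeanProperty var id: Long = _
  @scala.beans.BeanProperty var name: String = _
}

val beanEncoder = Encoders.bean(classOf[PersonBean])  // Encoder[PersonBean]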
import org.apache.spark.sql.Encoders
scala> Encoders.STRING
res2: org.apache.spark.sql.Encoder[String] = class[value[0]: string]
You can also create encoders based on Kryo or Java serializers.
import org.apache.spark.sql.Encoders
case class Person(id: Int, name: String, speaksPolish: Boolean)
scala> Encoders.kryo[Person]
res3: org.apache.spark.sql.Encoder[Person] = class[value[0]: binary]
scala> Encoders.javaSerialization[Person]
res5: org.apache.spark.sql.Encoder[Person] = class[value[0]: binary]
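Note that a Kryo- or Java-serialized Dataset keeps objects in a single binary column, so you lose the per-field columns (a sketch, assuming the Person case class above and a running SparkSession):

val ds = spark.createDataset(Seq(Person(0, "Jacek", true)))(Encoders.kryo[Person])
ds.printSchema()  // a single "value: binary" column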
You can create encoders for Scala's tuples and case classes, Int, Long, Double, etc.
import org.apache.spark.sql.Encoders
scala> Encoders.tuple(Encoders.scalaLong, Encoders.STRING, Encoders.scalaBoolean)
res9: org.apache.spark.sql.Encoder[(Long, String, Boolean)] = class[_1[0]: bigint, _2[0]: string, _3[0]: boolean]
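Such an encoder can then be passed explicitly wherever an implicit Encoder is expected (a sketch):

val tripleEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.STRING, Encoders.scalaBoolean)
val triples = spark.createDataset(Seq((1L, "one", true), (2L, "two", false)))(tripleEncoder)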