DataFrameReader is an interface for loading a DataFrame from external storage systems (e.g. databases or files) in many formats, as well as from streams.
Use SparkSession.read to create an instance of DataFrameReader.
import org.apache.spark.sql.DataFrameReader
val reader: DataFrameReader = spark.read
It has direct support for many file formats and an interface to plug in new ones. It assumes parquet as the default data source format, which you can change using the spark.sql.sources.default setting.
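As a quick sanity check, you can inspect (and change) this setting at runtime through spark.conf. A minimal sketch in spark-shell (the switch to json is just an example value):

scala> spark.conf.get("spark.sql.sources.default")
res0: String = parquet

// Make json the default format for plain load/save calls
scala> spark.conf.set("spark.sql.sources.default", "json")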
format(source: String): DataFrameReader
You use format to configure DataFrameReader to use the appropriate source format.
Supported data formats:

- json
- csv (since 2.0.0)
- parquet (see Parquet)
- orc
- text
- jdbc
- libsvm (using spark.read.format("libsvm"))
Note | You can improve your understanding of format("jdbc") with the exercise Creating DataFrames from Tables using JDBC and PostgreSQL. |
schema(schema: StructType): DataFrameReader
You can apply a schema to the data source.
Tip | Refer to Schema. |
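A minimal sketch of applying an explicit schema before loading (the column names, types and the tokens.csv path are assumptions made up for this example):

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical schema for a two-column CSV file
val tokensSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("token", StringType, nullable = true)))

val tokens = spark.read
  .schema(tokensSchema)   // apply the schema instead of inferring it
  .format("csv")
  .load("tokens.csv")     // the path is an assumption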
option(key: String, value: String): DataFrameReader
option(key: String, value: Boolean): DataFrameReader // (1)
option(key: String, value: Long): DataFrameReader // (1)
option(key: String, value: Double): DataFrameReader // (1)
(1) Available since Spark 2.0.0
You can also use the options method to specify several options in a single Map.
options(options: scala.collection.Map[String, String]): DataFrameReader
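For instance, the two reads below are equivalent ways to configure a CSV source, using repeated option calls and a single options Map respectively (header and inferSchema are standard CSV options; the people.csv path is an assumption):

// Repeated option calls (note the Boolean overload available since 2.0.0)
val csvViaOption = spark.read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
  .load("people.csv")

// The same options described in a single Map
val csvViaOptions = spark.read
  .format("csv")
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .load("people.csv")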
load(): DataFrame
load(path: String): DataFrame
load
loads input data as a DataFrame
.
val csv = spark.read
.format("csv")
.option("header", "true")
.load("*.csv")
stream(): DataFrame
stream(path: String): DataFrame
Caution | FIXME Review 915a75398ecbccdbf9a1e07333104c857ae1ce5e |
stream loads an input data stream as a DataFrame.
Seq("hello", "world").zipWithIndex.toDF("text", "id").write.format("csv").save("text-id.csv")
val csvStream = spark.read.format("csv").stream("text-id.csv")
DataFrameReader comes with direct support for multiple file formats:
json(path: String): DataFrame
json(paths: String*): DataFrame
json(jsonRDD: RDD[String]): DataFrame
New in 2.0.0: prefersDecimal
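A minimal sketch of the json variants, including the RDD[String] overload (the file path and the sample documents are made up):

// JSON file with one JSON document per line (the path is an assumption)
val people = spark.read.json("people.json")

// JSON documents already available as an RDD[String]
val docs = spark.sparkContext.parallelize(Seq(
  """{"id": 0, "text": "hello"}""",
  """{"id": 1, "text": "world"}"""))
val fromRDD = spark.read.json(docs)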
parquet(paths: String*): DataFrame
The supported options:

- compression (default: snappy) - the supported codecs:
  - none or uncompressed
  - snappy - the default codec in Spark 2.0.0
  - gzip - the default codec in Spark before 2.0.0
  - lzo

New in 2.0.0: snappy is the default Parquet codec. See [SPARK-14482][SQL] Change default Parquet codec from gzip to snappy.
val tokens = Seq("hello", "henry", "and", "harry")
.zipWithIndex
.map(_.swap)
.toDF("id", "token")
val parquetWriter = tokens.write
parquetWriter.option("compression", "none").save("hello-none")
// The exception is mostly for my learning purposes
// so I know where and how to find the trace to the compressions
// Sorry...
scala> parquetWriter.option("compression", "unsupported").save("hello-unsupported")
java.lang.IllegalArgumentException: Codec [unsupported] is not available. Available codecs are uncompressed, gzip, lzo, snappy, none.
at org.apache.spark.sql.execution.datasources.parquet.ParquetOptions.<init>(ParquetOptions.scala:43)
at org.apache.spark.sql.execution.datasources.parquet.DefaultSource.prepareWrite(ParquetRelation.scala:77)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$4.apply(InsertIntoHadoopFsRelation.scala:122)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$4.apply(InsertIntoHadoopFsRelation.scala:122)
at org.apache.spark.sql.execution.datasources.BaseWriterContainer.driverSideSetup(WriterContainer.scala:103)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:141)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:116)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:116)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:116)
at org.apache.spark.sql.execution.command.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:61)
at org.apache.spark.sql.execution.command.ExecutedCommand.sideEffectResult(commands.scala:59)
at org.apache.spark.sql.execution.command.ExecutedCommand.doExecute(commands.scala:73)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:137)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:134)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:117)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:65)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:65)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:390)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
... 48 elided
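Reading the Parquet files back in is then just a matter of pointing parquet at the directory written above:

// Load the Parquet files saved to hello-none earlier
val helloNone = spark.read.parquet("hello-none")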
orc(path: String): DataFrame
The Optimized Row Columnar (ORC) file format is a highly efficient columnar format to store Hive data, even with more than 1,000 columns, and improve performance. The ORC format was introduced in Hive version 0.11 to use and retain the type information from the table definition.
Tip | Read the ORC Files document to learn about the ORC file format. |
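A short sketch of writing and then reading ORC files (it assumes a SparkSession with Hive support enabled, since the ORC data source ships with the Hive module in 2.0; the tokens-orc directory name is made up):

// Reuse the tokens DataFrame pattern from the Parquet example above
val tokens = Seq("hello", "henry", "and", "harry")
  .zipWithIndex
  .map(_.swap)
  .toDF("id", "token")

tokens.write.format("orc").save("tokens-orc")

val orcTokens = spark.read.orc("tokens-orc")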
text
method loads a text file.
text(paths: String*): DataFrame
val lines: Dataset[String] = spark.read.text("README.md").as[String]
scala> lines.show
+--------------------+
| value|
+--------------------+
| # Apache Spark|
| |
|Spark is a fast a...|
|high-level APIs i...|
|supports general ...|
|rich set of highe...|
|MLlib for machine...|
|and Spark Streami...|
| |
|<http://spark.apa...|
| |
| |
|## Online Documen...|
| |
|You can find the ...|
|guide, on the [pr...|
|and [project wiki...|
|This README file ...|
| |
| ## Building Spark|
+--------------------+
only showing top 20 rows
table(tableName: String): DataFrame
table
method returns the tableName
table as a DataFrame
.
scala> spark.sql("SHOW TABLES").show(false)
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|dafa |false |
+---------+-----------+
scala> spark.read.table("dafa").show(false)
+---+-------+
|id |text |
+---+-------+
|1 |swiecie|
|0 |hello |
+---+-------+
Caution | FIXME The method uses spark.sessionState.sqlParser.parseTableIdentifier(tableName) and spark.sessionState.catalog.lookupRelation. Would be nice to learn a bit more on their internals, huh? |
Note | jdbc method uses java.util.Properties (and appears so Java-centric). Use format("jdbc") instead. |
jdbc(url: String, table: String, properties: Properties): DataFrame
jdbc(url: String, table: String,
parts: Array[Partition],
connectionProperties: Properties): DataFrame
jdbc(url: String, table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame
jdbc(url: String, table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
connectionProperties: Properties): DataFrame
jdbc allows you to create a DataFrame that represents the table in the database available at url.
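Following the Note above, here is a hedged sketch that uses format("jdbc") with options rather than the Properties-based overloads (the URL, table name and credentials are placeholders, and the appropriate JDBC driver has to be on the classpath):

// Placeholder connection settings for a PostgreSQL database
val projects = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/sparkdb")
  .option("dbtable", "projects")
  .option("user", "dbuser")
  .option("password", "secret")
  .load()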