Skip to content

Latest commit

 

History

History
440 lines (318 loc) · 13.8 KB

spark-sql-sqlcontext.adoc

File metadata and controls

440 lines (318 loc) · 13.8 KB

SQLContext

Caution

As of Spark 2.0.0 SQLContext is only for backward compatibility and is a mere wrapper of SparkSession.

In the older Spark 1.x, SQLContext was the entry point for Spark SQL. Whatever you do in Spark SQL it has to start from creating an instance of SQLContext.

A SQLContext object requires a SparkContext, a CacheManager, and a SQLListener. They are all transient and do not participate in serializing a SQLContext.

You should use SQLContext for the following:

Creating SQLContext Instance

You can create a SQLContext using the following constructors:

  • SQLContext(sc: SparkContext)

  • SQLContext.getOrCreate(sc: SparkContext)

  • SQLContext.newSession() allows for creating a new instance of SQLContext with a separate SQL configuration (through a shared SparkContext).

Setting Configuration Properties

You can set Spark SQL configuration properties using:

  • setConf(props: Properties): Unit

  • setConf(key: String, value: String): Unit

You can get the current value of a configuration property by key using:

  • getConf(key: String): String

  • getConf(key: String, defaultValue: String): String

  • getAllConfs: immutable.Map[String, String]

Note
Properties that start with spark.sql are reserved for Spark SQL.

Creating DataFrames

emptyDataFrame

emptyDataFrame: DataFrame

emptyDataFrame creates an empty DataFrame. It calls createDataFrame with an empty RDD[Row] and an empty schema StructType(Nil).

createDataFrame for RDD and Seq

createDataFrame[A <: Product](rdd: RDD[A]): DataFrame
createDataFrame[A <: Product](data: Seq[A]): DataFrame

createDataFrame family of methods can create a DataFrame from an RDD of Scala’s Product types like case classes or tuples or Seq thereof.

createDataFrame for RDD of Row with Explicit Schema

createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

This variant of createDataFrame creates a DataFrame from RDD of Row and explicit schema.

Registering User-Defined Functions (UDF)

udf: UDFRegistration

udf method gives you access to UDFRegistration to manipulate user-defined functions. Functions registered using udf are available for Hive queries only.

Tip
Read up on UDFs in UDFs — User-Defined Functions document.
// Create a DataFrame
val df = Seq("hello", "world!").zip(0 to 1).toDF("text", "id")

// Register the DataFrame as a temporary table in Hive
df.registerTempTable("texts")

scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|    texts|       true|
+---------+-----------+

scala> sql("SELECT * FROM texts").show
+------+---+
|  text| id|
+------+---+
| hello|  0|
|world!|  1|
+------+---+

// Just a Scala function
val my_upper: String => String = _.toUpperCase

// Register the function as UDF
spark.udf.register("my_upper", my_upper)

scala> sql("SELECT *, my_upper(text) AS MY_UPPER FROM texts").show
+------+---+--------+
|  text| id|MY_UPPER|
+------+---+--------+
| hello|  0|   HELLO|
|world!|  1|  WORLD!|
+------+---+--------+

Caching DataFrames in In-Memory Cache

isCached(tableName: String): Boolean

isCached method asks CacheManager whether tableName table is cached in memory or not. It simply requests CacheManager for CachedData and when exists, it assumes the table is cached.

cacheTable(tableName: String): Unit

You can cache a table in memory using cacheTable.

Caution
Why would I want to cache a table?
uncacheTable(tableName: String)
clearCache(): Unit

uncacheTable and clearCache remove one or all in-memory cached tables.

Implicits — SQLContext.implicits

The implicits object is a helper class with methods to convert objects into Datasets and DataFrames, and also comes with many Encoders for "primitive" types as well as the collections thereof.

Note

Import the implicits by import spark.implicits._ as follows:

val spark = new SQLContext(sc)
import spark.implicits._

It holds Encoders for Scala "primitive" types like Int, Double, String, and their collections.

It offers support for creating Dataset from RDD of any types (for which an encoder exists in scope), or case classes or tuples, and Seq.

It also offers conversions from Scala’s Symbol or $ to Column.

It also offers conversions from RDD or Seq of Product types (e.g. case classes or tuples) to DataFrame. It has direct conversions from RDD of Int, Long and String to DataFrame with a single column name _1.

Note
It is not possible to call toDF methods on RDD objects of other "primitive" types except Int, Long, and String.

Creating Datasets

createDataset[T: Encoder](data: Seq[T]): Dataset[T]
createDataset[T: Encoder](data: RDD[T]): Dataset[T]

createDataset family of methods creates a Dataset from a collection of elements of type T, be it a regular Scala Seq or Spark’s RDD.

It requires that there is an encoder in scope.

Note
Importing SQLContext.implicits brings many encoders available in scope.

Accessing DataFrameReader (read method)

read: DataFrameReader

The experimental read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.

Creating External Tables

createExternalTable(tableName: String, path: String): DataFrame
createExternalTable(tableName: String, path: String, source: String): DataFrame
createExternalTable(tableName: String, source: String, options: Map[String, String]): DataFrame
createExternalTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame

The experimental createExternalTable family of methods is used to create an external table tableName and return a corresponding DataFrame.

Caution
FIXME What is an external table?

It assumes parquet as the default data source format that you can change using spark.sql.sources.default setting.

Dropping Temporary Tables

dropTempTable(tableName: String): Unit

dropTempTable method drops a temporary table tableName.

Caution
FIXME What is a temporary table?

Creating Dataset[Long] (range method)

range(end: Long): Dataset[Long]
range(start: Long, end: Long): Dataset[Long]
range(start: Long, end: Long, step: Long): Dataset[Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[Long]

The range family of methods creates a Dataset[Long] with the sole id column of LongType for given start, end, and step.

Note
The three first variants use SparkContext.defaultParallelism for the number of partitions numPartitions.
scala> spark.range(5)
res0: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> .show
+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+

Creating DataFrames for Table

table(tableName: String): DataFrame

table method creates a tableName table and returns a corresponding DataFrame.

Listing Existing Tables

tables(): DataFrame
tables(databaseName: String): DataFrame

table methods return a DataFrame that holds names of existing tables in a database.

scala> spark.tables.show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|        t|       true|
|       t2|       true|
+---------+-----------+

The schema consists of two columns - tableName of StringType and isTemporary of BooleanType.

Note
tables is a result of SHOW TABLES [IN databaseName].
tableNames(): Array[String]
tableNames(databaseName: String): Array[String]

tableNames are similar to tables with the only difference that they return Array[String] which is a collection of table names.

Accessing StreamingQueryManager

streams: StreamingQueryManager

The streams method returns a StreamingQueryManager that is used to…​TK

Caution
FIXME

Managing Active SQLContext for JVM

SQLContext.getOrCreate(sparkContext: SparkContext): SQLContext

SQLContext.getOrCreate method returns an active SQLContext object for the JVM or creates a new one using a given sparkContext.

Note
It is a factory-like method that works on SQLContext class.

Interestingly, there are two helper methods to set and clear the active SQLContext object - setActive and clearActive respectively.

setActive(spark: SQLContext): Unit
clearActive(): Unit

Executing SQL Queries

sql(sqlText: String): DataFrame

sql executes the sqlText SQL query.

Note
It supports Hive statements through HiveContext.
scala> sql("set spark.sql.hive.version").show(false)
16/04/10 15:19:36 INFO HiveSqlParser: Parsing command: set spark.sql.hive.version
+----------------------+-----+
|key                   |value|
+----------------------+-----+
|spark.sql.hive.version|1.2.1|
+----------------------+-----+

scala> sql("describe database extended default").show(false)
16/04/10 15:21:14 INFO HiveSqlParser: Parsing command: describe database extended default
+-------------------------+--------------------------+
|database_description_item|database_description_value|
+-------------------------+--------------------------+
|Database Name            |default                   |
|Description              |Default Hive database     |
|Location                 |file:/user/hive/warehouse |
|Properties               |                          |
+-------------------------+--------------------------+

// Create temporary table
scala> spark.range(10).registerTempTable("t")
16/04/14 23:34:31 INFO HiveSqlParser: Parsing command: t

scala> sql("CREATE temporary table t2 USING PARQUET OPTIONS (PATH 'hello') AS SELECT * FROM t")
16/04/14 23:34:38 INFO HiveSqlParser: Parsing command: CREATE temporary table t2 USING PARQUET OPTIONS (PATH 'hello') AS SELECT * FROM t

scala> spark.tables.show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|        t|       true|
|       t2|       true|
+---------+-----------+

sql parses sqlText using a dialect that can be set up using spark.sql.dialect setting.

Note

sql is imported in spark-shell so you can execute Hive statements without spark prefix.

scala> println(s"This is Spark ${sc.version}")
This is Spark 2.0.0-SNAPSHOT

scala> :imports
 1) import spark.implicits._  (52 terms, 31 are implicit)
 2) import spark.sql          (1 terms)
Tip
You may also use spark-sql shell script to interact with Hive.

Internally, it uses SessionState.sqlParser.parsePlan(sql) method to create a LogicalPlan.

Caution
FIXME Review
scala> sql("show tables").show(false)
16/04/09 13:05:32 INFO HiveSqlParser: Parsing command: show tables
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|dafa     |false      |
+---------+-----------+
Tip

Enable INFO logging level for the loggers that correspond to the implementations of AbstractSqlParser to see what happens inside sql.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.hive.execution.HiveSqlParser=INFO

Refer to Logging.

Creating New Session

newSession(): SQLContext

You can use newSession method to create a new session without a cost of instantiating a new SqlContext from scratch.

newSession returns a new SqlContext that shares SparkContext, CacheManager, SQLListener, and ExternalCatalog.

Caution
FIXME Why would I need that?