Standard Functions (functions object)

org.apache.spark.sql.functions object comes with many functions for column manipulation in DataFrames.

Note
The functions object is an experimental feature of Spark since version 1.3.0.

You can access the functions using the following import statement:

import org.apache.spark.sql.functions._

There are 50 or more functions in the functions object. Some functions transform Column objects (or column names) into other Column objects, while others transform a DataFrame into another DataFrame.

The functions are grouped by functional areas:

window

Caution
FIXME
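A minimal sketch of the window function (available since Spark 2.0), which buckets rows into fixed-length time windows based on a timestamp column. The DataFrame and column names below are made up for this example.

import org.apache.spark.sql.functions._
import java.sql.Timestamp

// sample time-stamped measurements
val levels = Seq(
  (Timestamp.valueOf("2017-01-01 00:00:01"), 1),
  (Timestamp.valueOf("2017-01-01 00:00:12"), 2),
  (Timestamp.valueOf("2017-01-01 00:00:25"), 3)).toDF("time", "level")

// bucket the rows into 10-second tumbling windows and aggregate per window;
// the result has a window struct column (with start and end) and a sum(level) column
val perWindow = levels.groupBy(window($"time", "10 seconds")).sum("level")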

Defining UDFs (udf factories)

udf(f: FunctionN[...]): UserDefinedFunction

The udf family of functions allows you to create user-defined functions (UDFs) based on a function in Scala. It accepts a function f of 0 to 10 arguments, and the input and output types are automatically inferred (from the types of the arguments and the result of the function f).

import org.apache.spark.sql.functions._
val _length: String => Int = _.length
val _lengthUDF = udf(_length)

// create a DataFrame with a single "num" column
val df = sc.parallelize(0 to 3).toDF("num")

// apply the user-defined function to "num" column
scala> df.withColumn("len", _lengthUDF($"num")).show
+---+---+
|num|len|
+---+---+
|  0|  1|
|  1|  1|
|  2|  1|
|  3|  1|
+---+---+

Since Spark 2.0.0, there is another variant of udf function:

udf(f: AnyRef, dataType: DataType): UserDefinedFunction

udf(f: AnyRef, dataType: DataType) allows you to use a Scala closure for the function argument (as f) and explicitly declare the output data type (as dataType).

// given the dataframe above

import org.apache.spark.sql.types.IntegerType
val byTwo = udf((n: Int) => n * 2, IntegerType)

scala> df.withColumn("len", byTwo($"num")).show
+---+---+
|num|len|
+---+---+
|  0|  0|
|  1|  2|
|  2|  4|
|  3|  6|
+---+---+

String functions

split function

split(str: Column, pattern: String): Column

split function splits the str column using the pattern regular expression. It returns a new Column.

val df = Seq((0, "hello|world"), (1, "witaj|swiecie")).toDF("num", "input")
val withSplit = df.withColumn("split", split($"input", "[|]"))

scala> withSplit.show
+---+-------------+----------------+
|num|        input|           split|
+---+-------------+----------------+
|  0|  hello|world|  [hello, world]|
|  1|witaj|swiecie|[witaj, swiecie]|
+---+-------------+----------------+
Note
.$|()[{^?*+\ are regular expression metacharacters and are considered special (escape them to match them literally).
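For example, splitting on a literal dot requires escaping it (or wrapping it in a character class as above), since pattern is treated as a regular expression. A quick sketch with a made-up versions DataFrame:

val versions = Seq((0, "2.0.0"), (1, "2.1.1")).toDF("id", "version")

// "." alone matches any character, so escape it to split on the literal dot
val withParts = versions.withColumn("parts", split($"version", "\\."))

scala> withParts.show
+---+-------+---------+
| id|version|    parts|
+---+-------+---------+
|  0|  2.0.0|[2, 0, 0]|
|  1|  2.1.1|[2, 1, 1]|
+---+-------+---------+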

upper function

upper(e: Column): Column

upper function converts a string column to upper case. It returns a new Column.

Note
The following example uses two functions that accept a Column and return another to showcase how to chain them.
val df = Seq((0,1,"hello"), (2,3,"world"), (2,4, "ala")).toDF("id", "val", "name")
val withUpperReversed = df.withColumn("upper", reverse(upper($"name")))

scala> withUpperReversed.show
+---+---+-----+-----+
| id|val| name|upper|
+---+---+-----+-----+
|  0|  1|hello|OLLEH|
|  2|  3|world|DLROW|
|  2|  4|  ala|  ALA|
+---+---+-----+-----+

Non-aggregate functions

They are also called normal functions.

struct functions

struct(cols: Column*): Column
struct(colName: String, colNames: String*): Column

struct family of functions allows you to create a new struct column based on a collection of Column objects or their names.

Note
The difference between struct and the similar array function is that struct allows the columns to be of different types.
scala> df.withColumn("struct", struct($"name", $"val")).show
+---+---+-----+---------+
| id|val| name|   struct|
+---+---+-----+---------+
|  0|  1|hello|[hello,1]|
|  2|  3|world|[world,3]|
|  2|  4|  ala|  [ala,4]|
+---+---+-----+---------+
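Once the struct column is in place, you can read individual fields back out, e.g. with the dot syntax or Column.getField. A quick sketch (withStruct is just a local name for this example):

val withStruct = df.withColumn("struct", struct($"name", $"val"))

// both select the name field from inside the struct column
withStruct.select($"struct.name").show
withStruct.select($"struct".getField("name")).show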

broadcast function

broadcast[T](df: Dataset[T]): Dataset[T]

broadcast function marks the input Dataset as small enough to be used in a broadcast join.

Tip
Consult Broadcast Join document.
val left = Seq((0, "aa"), (0, "bb")).toDF("id", "token").as[(Int, String)]
val right = Seq(("aa", 0.99), ("bb", 0.57)).toDF("token", "prob").as[(String, Double)]

scala> left.join(broadcast(right), "token").explain(extended = true)
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List('token))
:- Project [_1#42 AS id#45, _2#43 AS token#46]
:  +- LocalRelation [_1#42, _2#43]
+- BroadcastHint
   +- Project [_1#55 AS token#58, _2#56 AS prob#59]
      +- LocalRelation [_1#55, _2#56]

== Analyzed Logical Plan ==
token: string, id: int, prob: double
Project [token#46, id#45, prob#59]
+- Join Inner, (token#46 = token#58)
   :- Project [_1#42 AS id#45, _2#43 AS token#46]
   :  +- LocalRelation [_1#42, _2#43]
   +- BroadcastHint
      +- Project [_1#55 AS token#58, _2#56 AS prob#59]
         +- LocalRelation [_1#55, _2#56]

== Optimized Logical Plan ==
Project [token#46, id#45, prob#59]
+- Join Inner, (token#46 = token#58)
   :- Project [_1#42 AS id#45, _2#43 AS token#46]
   :  +- Filter isnotnull(_2#43)
   :     +- LocalRelation [_1#42, _2#43]
   +- BroadcastHint
      +- Project [_1#55 AS token#58, _2#56 AS prob#59]
         +- Filter isnotnull(_1#55)
            +- LocalRelation [_1#55, _2#56]

== Physical Plan ==
*Project [token#46, id#45, prob#59]
+- *BroadcastHashJoin [token#46], [token#58], Inner, BuildRight
   :- *Project [_1#42 AS id#45, _2#43 AS token#46]
   :  +- *Filter isnotnull(_2#43)
   :     +- LocalTableScan [_1#42, _2#43]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *Project [_1#55 AS token#58, _2#56 AS prob#59]
         +- *Filter isnotnull(_1#55)
            +- LocalTableScan [_1#55, _2#56]

expr function

expr(expr: String): Column

expr function parses the input expr SQL string into the Column it represents.

val ds = Seq((0, "hello"), (1, "world"))
  .toDF("id", "token")
  .as[(Long, String)]

scala> ds.show
+---+-----+
| id|token|
+---+-----+
|  0|hello|
|  1|world|
+---+-----+

val filterExpr = expr("token = 'hello'")

scala> ds.filter(filterExpr).show
+---+-----+
| id|token|
+---+-----+
|  0|hello|
+---+-----+
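expr also works wherever a Column is expected, e.g. inside select. A quick sketch using the same ds:

scala> ds.select($"id", expr("length(token)") as "len").show
+---+---+
| id|len|
+---+---+
|  0|  5|
|  1|  5|
+---+---+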

Internally, expr uses the active session's sqlParser (or creates a new SparkSqlParser) and calls its parseExpression method.