The org.apache.spark.sql.functions object comes with many functions for column manipulation in DataFrames.
Note
|
The functions object has been an experimental feature of Spark since version 1.3.0.
|
You can access the functions using the following import statement:
import org.apache.spark.sql.functions._
There are around 50 functions in the functions object. Most of them transform Column objects (or column names) into other Column objects; some transform a DataFrame into another DataFrame.
The functions are grouped by functional areas:
- Aggregate functions (a small sketch follows this list)
- Non-aggregate functions (aka normal functions)
- Date time functions
- …and others
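For instance, here is a quick sketch of one of the aggregate functions (sum) combined with groupBy; the sales DataFrame and its column names are made up just for illustration:
// a made-up dataset for this sketch
val sales = Seq(("a", 1), ("a", 2), ("b", 5)).toDF("key", "amount")
// sum is one of the aggregate functions in the functions object
scala> sales.groupBy("key").agg(sum("amount") as "total").orderBy("key").show
+---+-----+
|key|total|
+---+-----+
|  a|    3|
|  b|    5|
+---+-----+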
Tip
|
You should read the official documentation of the functions object. |
udf(f: FunctionN[...]): UserDefinedFunction
The udf family of functions lets you create user-defined functions (UDFs) from Scala functions. It accepts a function f of 0 to 10 arguments, and the input and output types are inferred automatically from the signature of f.
import org.apache.spark.sql.functions._
val _length: String => Int = _.length
val _lengthUDF = udf(_length)
// create a DataFrame with a single numeric column
val df = sc.parallelize(0 to 3).toDF("num")
// apply the user-defined function to the "num" column
// (num is cast to a string, so every value has length 1)
scala> df.withColumn("len", _lengthUDF($"num")).show
+---+---+
|num|len|
+---+---+
|  0|  1|
|  1|  1|
|  2|  1|
|  3|  1|
+---+---+
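The same type inference works for functions of more than one argument. A minimal sketch (the add function is made up), reusing df from above:
val add = udf((a: Int, b: Int) => a + b)
scala> df.withColumn("sum", add($"num", lit(10))).show
+---+---+
|num|sum|
+---+---+
|  0| 10|
|  1| 11|
|  2| 12|
|  3| 13|
+---+---+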
Since Spark 2.0.0, there is another variant of udf
function:
udf(f: AnyRef, dataType: DataType): UserDefinedFunction
udf(f: AnyRef, dataType: DataType) allows you to use a Scala closure as the function argument (f) and explicitly declare the output data type (dataType).
// given the dataframe above
import org.apache.spark.sql.types.IntegerType
val byTwo = udf((n: Int) => n * 2, IntegerType)
scala> df.withColumn("len", byTwo($"num")).show
+---+---+
|num|len|
+---+---+
| 0| 0|
| 1| 2|
| 2| 4|
| 3| 6|
+---+---+
split(str: Column, pattern: String): Column
The split function splits the str column around matches of the pattern regular expression and returns a new Column (of array type).
Note
|
Internally, split uses the java.lang.String.split(String regex, int limit) method.
|
val df = Seq((0, "hello|world"), (1, "witaj|swiecie")).toDF("num", "input")
val withSplit = df.withColumn("split", split($"input", "[|]"))
scala> withSplit.show
+---+-------------+----------------+
|num|        input|           split|
+---+-------------+----------------+
|  0|  hello|world|  [hello, world]|
|  1|witaj|swiecie|[witaj, swiecie]|
+---+-------------+----------------+
Note
|
.$|()[{^?*+\ are regular expression metacharacters and are treated specially in pattern; escape them to match literally (see the sketch below).
|
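For example, splitting on a literal dot requires escaping it in pattern. A small sketch (the versions DataFrame is made up for illustration):
val versions = Seq((0, "2.0.0"), (1, "1.6.3")).toDF("id", "version")
scala> versions.withColumn("parts", split($"version", "\\.")).show
+---+-------+---------+
| id|version|    parts|
+---+-------+---------+
|  0|  2.0.0|[2, 0, 0]|
|  1|  1.6.3|[1, 6, 3]|
+---+-------+---------+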
upper(e: Column): Column
The upper function converts a string column to upper case and returns a new Column.
Note
|
The following example uses two functions that accept a Column and return another to showcase how to chain them.
|
val df = Seq((0,1,"hello"), (2,3,"world"), (2,4, "ala")).toDF("id", "val", "name")
val withUpperReversed = df.withColumn("upper", reverse(upper($"name")))
scala> withUpperReversed.show
+---+---+-----+-----+
| id|val| name|upper|
+---+---+-----+-----+
|  0|  1|hello|OLLEH|
|  2|  3|world|DLROW|
|  2|  4|  ala|  ALA|
+---+---+-----+-----+
The functions that follow are non-aggregate functions (also called normal functions).
struct(cols: Column*): Column
struct(colName: String, colNames: String*): Column
The struct family of functions creates a new struct column from a collection of Column objects or column names.
Note
|
The difference between struct and the similar array function is that struct allows the columns to be of different types.
|
scala> df.withColumn("struct", struct($"name", $"val")).show
+---+---+-----+---------+
| id|val| name|   struct|
+---+---+-----+---------+
|  0|  1|hello|[hello,1]|
|  2|  3|world|[world,3]|
|  2|  4|  ala|  [ala,4]|
+---+---+-----+---------+
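A small sketch of the variant that accepts column names, together with accessing a field of the new struct column (reusing df from above):
scala> df.withColumn("struct", struct("name", "val")).select($"id", $"struct.name").show
+---+-----+
| id| name|
+---+-----+
|  0|hello|
|  2|world|
|  2|  ala|
+---+-----+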
broadcast[T](df: Dataset[T]): Dataset[T]
The broadcast function marks the input Dataset as small enough to be used in a broadcast join.
Tip
|
Consult Broadcast Join document. |
val left = Seq((0, "aa"), (0, "bb")).toDF("id", "token").as[(Int, String)]
val right = Seq(("aa", 0.99), ("bb", 0.57)).toDF("token", "prob").as[(String, Double)]
scala> left.join(broadcast(right), "token").explain(extended = true)
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List('token))
:- Project [_1#42 AS id#45, _2#43 AS token#46]
:  +- LocalRelation [_1#42, _2#43]
+- BroadcastHint
   +- Project [_1#55 AS token#58, _2#56 AS prob#59]
      +- LocalRelation [_1#55, _2#56]
== Analyzed Logical Plan ==
token: string, id: int, prob: double
Project [token#46, id#45, prob#59]
+- Join Inner, (token#46 = token#58)
   :- Project [_1#42 AS id#45, _2#43 AS token#46]
   :  +- LocalRelation [_1#42, _2#43]
   +- BroadcastHint
      +- Project [_1#55 AS token#58, _2#56 AS prob#59]
         +- LocalRelation [_1#55, _2#56]
== Optimized Logical Plan ==
Project [token#46, id#45, prob#59]
+- Join Inner, (token#46 = token#58)
   :- Project [_1#42 AS id#45, _2#43 AS token#46]
   :  +- Filter isnotnull(_2#43)
   :     +- LocalRelation [_1#42, _2#43]
   +- BroadcastHint
      +- Project [_1#55 AS token#58, _2#56 AS prob#59]
         +- Filter isnotnull(_1#55)
            +- LocalRelation [_1#55, _2#56]
== Physical Plan ==
*Project [token#46, id#45, prob#59]
+- *BroadcastHashJoin [token#46], [token#58], Inner, BuildRight
   :- *Project [_1#42 AS id#45, _2#43 AS token#46]
   :  +- *Filter isnotnull(_2#43)
   :     +- LocalTableScan [_1#42, _2#43]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *Project [_1#55 AS token#58, _2#56 AS prob#59]
         +- *Filter isnotnull(_1#55)
            +- LocalTableScan [_1#55, _2#56]
expr(expr: String): Column
The expr function parses the input expr SQL string into the Column it represents.
val ds = Seq((0, "hello"), (1, "world"))
.toDF("id", "token")
.as[(Long, String)]
scala> ds.show
+---+-----+
| id|token|
+---+-----+
|  0|hello|
|  1|world|
+---+-----+
val filterExpr = expr("token = 'hello'")
scala> ds.filter(filterExpr).show
+---+-----+
| id|token|
+---+-----+
|  0|hello|
+---+-----+
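The Column that expr returns can be used wherever a Column is expected; a small sketch with withColumn:
scala> ds.withColumn("len", expr("length(token)")).show
+---+-----+---+
| id|token|len|
+---+-----+---+
|  0|hello|  5|
|  1|world|  5|
+---+-----+---+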
Internally, expr uses the active session's sqlParser (or creates a new SparkSqlParser) and calls its parseExpression method.