The org.apache.spark.sql.functions object comes with many functions for column manipulation in DataFrames.
The functions object is an experimental feature of Spark since version 1.3.0.
You can access the functions using the following import statement:
import org.apache.spark.sql.functions._
There are 50 or more functions in the functions object. Some functions transform Column objects (or column names) into other Column objects; others transform a DataFrame into a DataFrame.
The functions are grouped by functional areas:
Aggregate functions
Non-aggregate functions (aka normal functions)
Date time functions
…and others
You should read the official documentation of the functions object.
udf(f: FunctionN[...]): UserDefinedFunction
The udf family of functions allows you to create user-defined functions (UDFs) based on a user-defined function in Scala. It accepts a function f of 0 to 10 arguments, and the input and output types are automatically inferred from the argument and result types of f.
import org.apache.spark.sql.functions._
val _length: String => Int = _.length
val _lengthUDF = udf(_length)
// define a dataframe
val df = sc.parallelize(0 to 3).toDF("num")
// apply the user-defined function to "num" column
scala> df.withColumn("len", _lengthUDF($"num")).show
+---+---+
|num|len|
+---+---+
|  0|  1|
|  1|  1|
|  2|  1|
|  3|  1|
+---+---+
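Since udf accepts Scala functions of up to 10 arguments, the same pattern extends beyond one-argument functions. The following is a minimal sketch, assuming Spark is on the classpath and a local SparkSession is acceptable. The object name UdfTwoArgs, the repeat function, and the token and times columns are hypothetical names introduced only for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf

object UdfTwoArgs {
  // Plain Scala function of two arguments (hypothetical example logic):
  // repeats the string s, n times
  val repeat: (String, Int) => String = (s, n) => s * n

  // Wrap it as a UDF; input and output types are inferred from the function type
  val repeatUDF = udf(repeat)

  def withRepeated(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val df = Seq(("ab", 2), ("c", 3)).toDF("token", "times")
    // Pass one Column per function argument
    df.withColumn("repeated", repeatUDF($"token", $"times"))
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode is good enough for a demonstration
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("udf-two-args")
      .getOrCreate()
    withRepeated(spark).show()
    spark.stop()
  }
}
```

Keeping the plain Scala function (repeat) separate from its UDF wrapper makes the logic easy to check without a Spark session.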
Since Spark 2.0.0, there is another variant of the udf function:
udf(f: AnyRef, dataType: DataType): UserDefinedFunction
udf(f: AnyRef, dataType: DataType) allows you to use a Scala closure for the function argument (as f) and to explicitly declare the output data type (as dataType).
// given the dataframe above
import org.apache.spark.sql.types.IntegerType
val byTwo = udf((n: Int) => n * 2, IntegerType)
scala> df.withColumn("len", byTwo($"num")).show
+---+---+
|num|len|
+---+---+
|  0|  0|
|  1|  2|
|  2|  4|
|  3|  6|
+---+---+
split(str: Column, pattern: String): Column
The split function splits the str column using pattern. It returns a new Column.
The split function uses the java.lang.String.split(String regex, int limit) method, so pattern is treated as a regular expression.
val df = Seq((0, "hello|world"), (1, "witaj|swiecie")).toDF("num", "input")
val withSplit = df.withColumn("split", split($"input", "[|]"))
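scala> withSplit.show
+---+-------------+----------------+
|num|        input|           split|
+---+-------------+----------------+
|  0|  hello|world|  [hello, world]|
|  1|witaj|swiecie|[witaj, swiecie]|
+---+-------------+----------------+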
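The characters .$|()[{^?*+\ are regular expression metacharacters and are treated as special in pattern. To split on one of them literally, escape it, as "[|]" does for the pipe in the example above.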
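The effect of regular expression metacharacters can be checked in plain Scala, without Spark, because the text above says split relies on java.lang.String.split. The sketch below compares an unescaped pipe with three common ways of matching it literally. The object name SplitMetaChars and its value names are hypothetical.

```scala
import java.util.regex.Pattern

object SplitMetaChars {
  val input = "hello|world"

  // An unescaped "|" is regex alternation between two empty patterns.
  // It matches the empty string, so the input is not split on the pipe alone.
  val unescaped: Array[String] = input.split("|")

  // Three ways to match a literal pipe
  val viaCharClass: Array[String] = input.split("[|]")              // character class
  val viaBackslash: Array[String] = input.split("\\|")              // backslash escape
  val viaQuote: Array[String] = input.split(Pattern.quote("|"))     // quoted literal

  def main(args: Array[String]): Unit = {
    println(unescaped.mkString("[", ", ", "]"))
    println(viaCharClass.mkString("[", ", ", "]"))
    println(viaBackslash.mkString("[", ", ", "]"))
    println(viaQuote.mkString("[", ", ", "]"))
  }
}
```

The same pattern strings should be usable as the pattern argument of the Spark split function, for example split($"input", "\\|"), assuming it delegates to the Java method as described.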
upper(e: Column): Column
The upper function converts a string column into one with all letters in upper case. It returns a new Column.
The following example uses two functions that accept a Column and return another to showcase how to chain them.
val df = Seq((0,1,"hello"), (2,3,"world"), (2,4, "ala")).toDF("id", "val", "name")
val withUpperReversed = df.withColumn("upper", reverse(upper($"name")))
scala> withUpperReversed.show
+---+---+-----+-----+
| id|val| name|upper|
+---+---+-----+-----+
|  0|  1|hello|OLLEH|
|  2|  3|world|DLROW|
|  2|  4|  ala|  ALA|
+---+---+-----+-----+
Non-aggregate functions are also called normal functions.
struct(cols: Column*): Column
struct(colName: String, colNames: String*): Column
The struct family of functions allows you to create a new struct column from a collection of Column objects or their names.
The difference between struct and the similar array function is that, in struct, the columns can have different types.
scala> df.withColumn("struct", struct($"name", $"val")).show
+---+---+-----+---------+
| id|val| name|   struct|
+---+---+-----+---------+
|  0|  1|hello|[hello,1]|
|  2|  3|world|[world,3]|
|  2|  4|  ala|  [ala,4]|
+---+---+-----+---------+
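To make the contrast between struct and array concrete, the sketch below builds both on the same data: struct over columns of different types, and array over two columns of the same type. It is a minimal illustration, assuming Spark is on the classpath and a local SparkSession is acceptable. The object name StructVsArray and the array and structName columns are hypothetical additions.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array, struct}

object StructVsArray {
  def build(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val df = Seq((0, 1, "hello"), (2, 3, "world"), (2, 4, "ala"))
      .toDF("id", "val", "name")
    df
      // struct keeps each field's own type: name is a string, val is an int
      .withColumn("struct", struct($"name", $"val"))
      // array is used here with two columns of the same type (both int)
      .withColumn("array", array($"id", $"val"))
      // fields of a struct column can be addressed by name with dot notation
      .withColumn("structName", $"struct.name")
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode is good enough for a demonstration
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("struct-vs-array")
      .getOrCreate()
    val result = build(spark)
    result.printSchema()
    result.show()
    spark.stop()
  }
}
```

Calling printSchema is a convenient way to see the per-field types inside the struct next to the single element type of the array.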
broadcast[T](df: Dataset[T]): Dataset[T]
The broadcast function marks the input Dataset as small enough to be used in a broadcast join.
Consult the Broadcast Join document.
val left = Seq((0, "aa"), (0, "bb")).toDF("id", "token").as[(Int, String)]
val right = Seq(("aa", 0.99), ("bb", 0.57)).toDF("token", "prob").as[(String, Double)]
scala> left.join(broadcast(right), "token").explain(extended = true)
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List('token))
:- Project [_1#42 AS id#45, _2#43 AS token#46]
:  +- LocalRelation [_1#42, _2#43]
+- BroadcastHint
   +- Project [_1#55 AS token#58, _2#56 AS prob#59]
      +- LocalRelation [_1#55, _2#56]

== Analyzed Logical Plan ==
token: string, id: int, prob: double
Project [token#46, id#45, prob#59]
+- Join Inner, (token#46 = token#58)
   :- Project [_1#42 AS id#45, _2#43 AS token#46]
   :  +- LocalRelation [_1#42, _2#43]
   +- BroadcastHint
      +- Project [_1#55 AS token#58, _2#56 AS prob#59]
         +- LocalRelation [_1#55, _2#56]

== Optimized Logical Plan ==
Project [token#46, id#45, prob#59]
+- Join Inner, (token#46 = token#58)
   :- Project [_1#42 AS id#45, _2#43 AS token#46]
   :  +- Filter isnotnull(_2#43)
   :     +- LocalRelation [_1#42, _2#43]
   +- BroadcastHint
      +- Project [_1#55 AS token#58, _2#56 AS prob#59]
         +- Filter isnotnull(_1#55)
            +- LocalRelation [_1#55, _2#56]

== Physical Plan ==
*Project [token#46, id#45, prob#59]
+- *BroadcastHashJoin [token#46], [token#58], Inner, BuildRight
   :- *Project [_1#42 AS id#45, _2#43 AS token#46]
   :  +- *Filter isnotnull(_2#43)
   :     +- LocalTableScan [_1#42, _2#43]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *Project [_1#55 AS token#58, _2#56 AS prob#59]
         +- *Filter isnotnull(_1#55)
            +- LocalTableScan [_1#55, _2#56]
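With inputs this small, Spark may choose a broadcast join on its own, which hides the effect of the hint. The sketch below isolates the hint by first disabling size-based automatic broadcasting through the spark.sql.autoBroadcastJoinThreshold setting and then comparing the physical plans with and without broadcast. It assumes Spark is on the classpath and a local SparkSession; the object name BroadcastHintDemo and the plans helper are hypothetical, and the exact plan text varies between Spark versions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object BroadcastHintDemo {
  // Returns the physical plans (as text) of the join without and with the hint
  def plans(spark: SparkSession): (String, String) = {
    import spark.implicits._

    // Disable size-based automatic broadcasting so that only the
    // explicit hint can request a broadcast join
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    val left = Seq((0, "aa"), (0, "bb")).toDF("id", "token")
    val right = Seq(("aa", 0.99), ("bb", 0.57)).toDF("token", "prob")

    val plain = left.join(right, "token")
    val hinted = left.join(broadcast(right), "token")

    (plain.queryExecution.executedPlan.toString,
     hinted.queryExecution.executedPlan.toString)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode is good enough for a demonstration
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("broadcast-hint")
      .getOrCreate()
    val (plain, hinted) = plans(spark)
    println("Without hint:\n" + plain)
    println("With hint:\n" + hinted)
    spark.stop()
  }
}
```

The hinted plan is expected to contain a BroadcastHashJoin node, as in the physical plan above, while the plan without the hint should fall back to another join strategy once the threshold is disabled.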
expr(expr: String): Column
The expr function parses the input expr SQL string into the Column it represents.
val ds = Seq((0, "hello"), (1, "world"))
.toDF("id", "token")
.as[(Long, String)]
scala> ds.show
+---+-----+
| id|token|
+---+-----+
|  0|hello|
|  1|world|
+---+-----+
val filterExpr = expr("token = 'hello'")
scala> ds.filter(filterExpr).show
+---+-----+
| id|token|
+---+-----+
|  0|hello|
+---+-----+
Internally, expr uses the active session’s sqlParser or creates a new SparkSqlParser to call the parseExpression method.
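Because expr yields an ordinary Column, it can be used anywhere a Column is accepted, not only in filter. The sketch below places a Column parsed from SQL text next to the equivalent Column built with the functions API. It assumes Spark is on the classpath and a local SparkSession; the object name ExprDemo and the viaExpr and viaApi column names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{expr, upper}

object ExprDemo {
  def build(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val ds = Seq((0, "hello"), (1, "world")).toDF("id", "token")
    ds
      // Column parsed from a SQL expression string
      .withColumn("viaExpr", expr("upper(token)"))
      // the same computation expressed with the functions API
      .withColumn("viaApi", upper($"token"))
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode is good enough for a demonstration
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("expr-demo")
      .getOrCreate()
    build(spark).show()
    spark.stop()
  }
}
```

The string form is handy when expressions come from configuration or user input, while the API form is checked by the Scala compiler.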