PySpark provides you with access to Python language bindings to the Apache Spark big data engine.
This document outlines the best practices you should follow when writing PySpark code.
Automatic Python code formatting tools already exist, so this document focuses on PySpark-specific best practices and how to structure PySpark code, not on general Python code formatting.
Import the PySpark SQL functions under the F alias to avoid polluting the global namespace:
from pyspark.sql import functions as F
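Once imported, reference all Spark SQL functions through the F namespace. Here's a minimal sketch, assuming a DataFrame df with a name column:
# hypothetical df with a "name" column; F.upper and F.col are standard Spark SQL functions
df.withColumn("name_upper", F.upper(F.col("name")))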
You can structure libraries so that the public interface is exposed in a single namespace, making it easy to identify the source of subsequent function invocations. Here's an example with the quinn library:
import quinn
quinn.validate_absence_of_columns(df, ["age", "cool"])
This import style makes it easy to identify where the validate_absence_of_columns function was defined.
You can also use this import style:
from quinn import validate_absence_of_columns
Don't use this import style, which makes it hard to determine where validate_absence_of_columns comes from:
from quinn import *
Here's an example of a column function that returns child when the age is less than 13, teenager when the age is between 13 and 19, and adult when the age is above 19.
def life_stage(col):
return (
F.when(col < 13, "child")
.when(col.between(13, 19), "teenager")
.when(col > 19, "adult")
)
The life_stage() function returns null when col is null. All built-in Spark functions gracefully handle the null case, so we don't need to write explicit null logic in the life_stage() function.
Column functions can also be optimized by the Spark compiler, so this is a good way to write code.
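Here's a quick sketch (assuming a SparkSession named spark) that shows life_stage propagating null instead of erroring:
# the schema string and values are illustrative
df = spark.createDataFrame([(3,), (16,), (45,), (None,)], "age INT")
df.withColumn("life_stage", life_stage(F.col("age"))).show()
The row with a null age gets a null life_stage because none of the when conditions match.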
Custom DataFrame transformations are functions that take a DataFrame as an argument and return a DataFrame. Custom DataFrame transformations are easy to test and reuse, so they're a good way to structure Spark code.
Let's wrap the life_stage column function that we previously defined in a schema dependent custom transformation.
def with_life_stage(df):
return df.withColumn("life_stage", life_stage(F.col("age")))
You can invoke this schema dependent custom DataFrame transformation with the transform method:
df.transform(with_life_stage)
with_life_stage is an example of a schema dependent custom DataFrame transformation because it must be run on a DataFrame that contains an age column. Schema dependent custom DataFrame transformations make assumptions about the schema of the DataFrames they're run on.
Let's refactor the with_life_stage function so that it takes the column name as a parameter and does not depend on the underlying DataFrame schema (this syntax works as of PySpark 3.3.0).
def with_life_stage2(df, col_name):
return df.withColumn("life_stage", life_stage(F.col(col_name)))
There are two ways to invoke this schema independent custom DataFrame transformation:
# invoke with a positional argument
df.transform(with_life_stage2, "age")
# invoke with a keyword argument
df.transform(with_life_stage2, col_name="age")
Read this blog post for a full discussion on building custom transformation functions pre-PySpark 3.3.0.
Schema dependent transformations should be used for functions that rely on a large number of columns or functions that are only expected to be run on a certain schema (e.g. a data table with a schema that doesn't change).
Schema independent transformations should be used for functions that will be run on DataFrames with different schemas.
null should be used in DataFrames for values that are unknown, missing, or irrelevant. Spark core functions frequently return null, and your code can also add null to DataFrames (by returning None or by relying on Spark functions that return null).
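For example, here's a sketch (the age_str column is hypothetical) where a failed cast yields null rather than raising an error:
# with the default (non-ANSI) settings, casting a non-numeric string to an integer produces null
df.withColumn("age_int", F.col("age_str").cast("int"))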
Let's take a look at a with_full_name custom DataFrame transformation:
def with_full_name(df):
return df.withColumn(
"full_name", F.concat_ws(" ", F.col("first_name"), F.col("last_name"))
)
with_full_name never returns null, even when first_name or last_name is null, because concat_ws skips null inputs. Let's create a DataFrame to demonstrate the functionality of with_full_name:
df = spark.createDataFrame(
[("Marilyn", "Monroe"), ("Abraham", None), (None, None)]
).toDF("first_name", "last_name")
Here are the DataFrame contents:
+----------+---------+
|first_name|last_name|
+----------+---------+
| Marilyn| Monroe|
| Abraham| null|
| null| null|
+----------+---------+
Here's how to invoke the custom DataFrame transformation using the transform method:
df.transform(with_full_name).show()
+----------+---------+--------------+
|first_name|last_name| full_name|
+----------+---------+--------------+
| Marilyn| Monroe|Marilyn Monroe|
| Abraham| null| Abraham|
| null| null| |
+----------+---------+--------------+
The nullable property of a column should be set to false if the column should not take null values. Look at the nullable properties in the resulting DataFrame.
df.transform(with_full_name).printSchema()
root
|-- first_name: string (nullable = true)
|-- last_name: string (nullable = true)
|-- full_name: string (nullable = false)
first_name and last_name have nullable set to true because they can take null values. full_name has nullable set to false because every value must be a non-null string.
See the section on User Defined Functions for more information about properly handling None / null values in UDFs.
You can use the chispa library to unit test your PySpark column functions and custom DataFrame transformations.
Let's look at how to unit test the life_stage column function:
def life_stage(col):
return (
F.when(col < 13, "child")
.when(col.between(13, 19), "teenager")
.when(col > 19, "adult")
)
Create a test DataFrame with the expected return value of the function for each row:
df = spark.createDataFrame(
[
("karen", 56, "adult"),
("jodie", 16, "teenager"),
("jason", 3, "child"),
(None, None, None),
]
).toDF("first_name", "age", "expected")
Now invoke the function and create the result DataFrame:
res = df.withColumn("actual", life_stage(F.col("age")))
Assert that the actual return value equals the expected value:
import chispa
chispa.assert_column_equality(res, "expected", "actual")
You should always test the None / null case to make sure that your code behaves as expected.
Suppose you have the following custom transformation:
def with_full_name(df):
return df.withColumn(
"full_name", F.concat_ws(" ", F.col("first_name"), F.col("last_name"))
)
Create a minimalistic input DataFrame with some test data:
input_df = spark.createDataFrame(
[("Marilyn", "Monroe"), ("Abraham", None), (None, None)]
).toDF("first_name", "last_name")
Create a DataFrame with the expected data:
expected_df = spark.createDataFrame(
[
("Marilyn", "Monroe", "Marilyn Monroe"),
("Abraham", None, "Abraham"),
(None, None, ""),
]
).toDF("first_name", "last_name", "full_name")
Make sure the expected DataFrame equals the input DataFrame with the custom DataFrame transformation applied:
chispa.assert_df_equality(
expected_df, input_df.transform(with_full_name), ignore_nullable=True
)
TODO: Determine if Black or Ruff should be recommended.
You should use Black to automatically format your code in a PEP 8 compliant manner.
You should use automatic code formatting for both your projects and your notebooks.
TODO: Add guidance
Variables should use snake_case, as recommended by PEP 8.
Variables that point to DataFrames and RDDs should be suffixed to make your code readable:
- Variables pointing to DataFrames should be suffixed with df:
people_df.createOrReplaceTempView("people")
- Variables pointing to RDDs should be suffixed with rdd:
people_rdd = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
Use the variable col for Column arguments.
def min(col)
Use col1 and col2 for functions that take two Column arguments.
def corr(col1, col2)
Use cols for functions that take an arbitrary number of Column arguments.
def array(cols)
Use col_name for functions that take a String argument that refers to the name of a Column.
def sqrt(col_name)
Use col_name1 and col_name2 for methods that take multiple column name arguments.
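For example, here's a hypothetical helper (not a built-in Spark function) that follows this convention:
def columns_equal(col_name1, col_name2):
    # returns a Column that's true when the two named columns are equal
    return F.col(col_name1) == F.col(col_name2)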
Collections should use plural variable names.
animals = ["dog", "cat", "goose"]
# DON'T DO THIS
animal_list = ["dog", "cat", "goose"]
Singular nouns should be used for single objects.
my_car_color = "red"
Columns have name, type, nullable, and metadata properties.
Columns that contain boolean values should use predicate names like is_nice_person or has_red_hair. Use snake_case for column names, so it's easier to write SQL code.
You can write (col("is_summer") & col("is_europe")) instead of ((col("is_summer") == True) & (col("is_europe") == True)). The predicate column names make the concise syntax readable.
Columns should be typed properly. Don't overuse StringType columns.
Columns should only be nullable if null values are allowed. Functions invoked on nullable columns should always handle null values gracefully.
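Here's a sketch of explicitly setting nullable when defining a schema (the column names are illustrative):
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
    StructField("first_name", StringType(), nullable=True),
    StructField("country_code", StringType(), nullable=False),
])
df = spark.createDataFrame([("Marilyn", "US")], schema)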
Use acronyms when needed to keep column names short. Define any acronyms used at the top of the data file, so other programmers can follow along.
Use the following shorthand notation for columns that perform comparisons:
- gt: greater than
- lt: less than
- leq: less than or equal to
- geq: greater than or equal to
- eq: equal to
- between: between two values
Here are some example column names:
player_age_gt_20
player_age_gt_15_leq_30
player_age_between_13_19
player_age_eq_45
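Here's a sketch of how one of these columns might be computed, assuming a DataFrame with a player_age column:
# the player_age column is hypothetical
df.withColumn("player_age_gt_20", F.col("player_age") > 20)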
You can write User Defined Functions (UDFs) when you need to write code that leverages Python programming features / Python libraries that aren't accessible in Spark.
Here's an example of a UDF that appends " is fun!" to a string:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
@udf(returnType=StringType())
def bad_funify(s):
return s + " is fun!"
The bad_funify function is poorly structured because it errors out when run on a column with null values.
Here's how to refactor the UDF to handle null input without erroring out:
@udf(returnType=StringType())
def good_funify(s):
return None if s is None else s + " is fun!"
In this case, a UDF isn't even necessary. You can just define a regular column function to get the same functionality:
def best_funify(col):
return F.concat(col, F.lit(" is fun!"))
UDFs are a black box from the Spark compiler's perspective and should be avoided whenever possible.
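Here's a quick sketch (assuming a SparkSession named spark) comparing the null-safe UDF and the native column function on data that contains a null value:
# the word column and its values are illustrative
df = spark.createDataFrame([("PySpark",), (None,)], "word STRING")
df.withColumn("fun_udf", good_funify(F.col("word"))).withColumn(
    "fun_native", best_funify(F.col("word"))
).show()
Both approaches return null for the null input, but best_funify keeps everything in native Spark functions so the optimizer can see it.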
- with precedes transformations that add columns:
  - with_cool_cat() adds the column cool_cat to a DataFrame
  - with_is_nice_person() adds the column is_nice_person to a DataFrame
- filter precedes transformations that remove rows:
  - filter_negative_growth_rate() removes the rows where the growth_rate column is negative
  - filter_invalid_zip_codes() removes the rows with a malformed zip_code
- enrich precedes transformations that clobber columns. Clobbering columns should be avoided when possible, so enrich transformations should only be used in rare circumstances.
- explode precedes transformations that add rows to a DataFrame by "exploding" a row into multiple rows.
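Here are hedged sketches of what transformations following these conventions might look like (the growth_rate and phone_numbers columns are hypothetical):
def filter_negative_growth_rate(df):
    # keep only the rows where growth_rate is non-negative
    return df.filter(F.col("growth_rate") >= 0)

def explode_phone_numbers(df):
    # produce one output row per element of the phone_numbers array column
    return df.withColumn("phone_number", F.explode(F.col("phone_numbers")))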
TODO
TODO
TODO
TODO
TODO: Add guidance
Libraries should generally support multiple Spark versions (and multiple Python versions, for that matter). For example, the chispa library should work with many PySpark versions, and chispa should intentionally avoid depending on new PySpark features that would prevent pip install chispa from working properly on older PySpark versions.
It's OK for applications to only support a single Spark/Python version.
TODO: Add guidance on properly supporting multiple Spark/Delta Lake versions.
TODO: Look at how PySpark is documented and follow its examples
You should write generic open source code whenever possible. Open source code is easily reusable (especially when it's uploaded to PyPI) and forces you to abstract reusable chunks of code away from business logic.
TODO: Add quinn and mack examples
- Support many PySpark versions
- Expose the public interface with a clean import interface
- Only surface a minimal public interface in your libraries
- Limit project dependencies and inspect transitive dependencies closely. Try to avoid depending on projects with lots of transitive dependencies.
- Write code that's easy to copy and paste in notebooks
- Organize code into column functions and custom transformations whenever possible
- Write code in version controlled projects, so you can take advantage of text editor features (and not pay for an expensive cluster when developing)
TODO: Discuss why version control is important
TODO
TODO
TODO