add delta spark packages to support delta lake #9

Merged: 8 commits, May 23, 2024
Changes from 3 commits
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,4 +1,5 @@
.DS_Store
.idea
.coverage
*_pycache__
cdm_shared_workspace/
22 changes: 17 additions & 5 deletions Dockerfile
@@ -6,9 +6,23 @@ USER root

RUN apt-get update && apt-get install -y \
# GCC required to resolve error during JupyterLab installation: psutil could not be installed from sources because gcc is not installed.
gcc \
gcc curl \
&& rm -rf /var/lib/apt/lists/*

# Install jars to support delta lake spark operations
ENV HADOOP_AWS_VER=3.3.4
RUN curl -O https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_AWS_VER}/hadoop-aws-${HADOOP_AWS_VER}.jar \
&& mv hadoop-aws-${HADOOP_AWS_VER}.jar /opt/bitnami/spark/jars

# NOTE: ensure the Delta Spark jar version matches the python pip delta-spark version specified in the Pipfile
ENV DELTA_SPARK_VER=3.2.0
ENV SCALA_VER=2.12
RUN curl -O https://repo1.maven.org/maven2/io/delta/delta-spark_${SCALA_VER}/${DELTA_SPARK_VER}/delta-spark_${SCALA_VER}-${DELTA_SPARK_VER}.jar \
&& mv delta-spark_${SCALA_VER}-${DELTA_SPARK_VER}.jar /opt/bitnami/spark/jars

RUN curl -O https://repo1.maven.org/maven2/io/delta/delta-storage/${DELTA_SPARK_VER}/delta-storage-${DELTA_SPARK_VER}.jar \
&& mv delta-storage-${DELTA_SPARK_VER}.jar /opt/bitnami/spark/jars

# install pipenv
RUN pip3 install pipenv

@@ -19,10 +33,8 @@ RUN pipenv sync --system
COPY ./src/ /src
ENV PYTHONPATH "${PYTHONPATH}:/src"

COPY scripts/entrypoint.sh /opt/
RUN chmod a+x /opt/entrypoint.sh
COPY ./scripts/ /opt/scripts/
RUN chmod a+x /opt/scripts/*.sh

# Switch back to the original user
USER ${ORI_USER}

ENTRYPOINT ["/opt/entrypoint.sh"]
3 changes: 3 additions & 0 deletions Pipfile
@@ -7,6 +7,9 @@ name = "pypi"
jupyterlab= "==4.2.0"
pyspark= "==3.5.1"
boto3 = "==1.34.109"
minio = "==7.2.7"
delta-spark = "==3.2.0" # should match JAR version (DELTA_SPARK_VER) specified in the Dockerfile
pandas = "==2.2.2"

[dev-packages]
pytest = "==8.2.1"
188 changes: 175 additions & 13 deletions Pipfile.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion README.md
@@ -79,7 +79,10 @@ In addition, the environment variable `SPARK_MASTER_URL` should also be configur
```python
from spark.utils import get_spark_session

spark = get_spark_session('TestApp')
spark = get_spark_session(app_name)

# To build a Spark session for Delta Lake operations, set the delta_lake parameter to True
spark = get_spark_session(app_name, delta_lake=True)
```
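
For illustration, a minimal sketch (not part of this PR) of how the Delta-enabled session might be exercised against the `delta-lake` bucket created by docker-compose; the table path and column names are assumptions:

```python
from spark.utils import get_spark_session

# Assumes the docker-compose stack is running and the 'delta-lake' bucket exists.
spark = get_spark_session("DeltaLakeDemo", delta_lake=True)

# Hypothetical table path inside the MinIO bucket.
path = "s3a://delta-lake/demo/users"

df = spark.createDataFrame([(1, "alice"), (2, "bob")], ["id", "name"])
df.write.format("delta").mode("overwrite").save(path)

spark.read.format("delta").load(path).show()
```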

#### Manually Configuring SparkSession/SparkContext
49 changes: 47 additions & 2 deletions docker-compose.yaml
@@ -5,7 +5,9 @@ version: '3'
services:

  spark-master:
    image: bitnami/spark:3.5.1
    build:
      context: .
      dockerfile: Dockerfile
    container_name: spark-master
    ports:
      - "8080:8080"
@@ -51,10 +53,48 @@ services:
    environment:
      - SPARK_MASTER_URL=spark://spark-master:7077

  minio:
    image: minio/minio
    container_name: spark-minio
    expose:
      - "9000"
    ports:
      - "9000:9000"
      # MinIO Console is available at http://localhost:9001
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: minio
      MINIO_ROOT_PASSWORD: minio123
      MINIO_ACCESS_KEY: minio
      MINIO_SECRET_KEY: minio123
    healthcheck:
      test: timeout 5s bash -c ':> /dev/tcp/127.0.0.1/9000' || exit 1
      interval: 1s
      timeout: 10s
      retries: 5
    # Note there is no bucket by default
    command: server /data --console-address ":9001"

  minio-create-bucket:
    image: minio/mc
    depends_on:
      minio:
        condition: service_healthy
    entrypoint: >
      bash -c "
      mc alias set minio http://minio:9000 minio minio123 &&
      if ! mc ls minio/delta-lake 2>/dev/null; then
      mc mb minio/delta-lake && echo 'Bucket delta-lake created'
      else
      echo 'Bucket delta-lake already exists'
      fi
      "

  notebook:
    build:
      context: .
      dockerfile: Dockerfile
    entrypoint: /opt/scripts/notebook_entrypoint.sh
    container_name: spark-notebook
    ports:
      - "4041:4041"
@@ -63,4 +103,9 @@ services:
    environment:
      - NOTEBOOK_PORT=4041
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_DRIVER_HOST=spark-notebook
      - MINIO_URL=http://minio:9000
      - MINIO_ACCESS_KEY=minio
      - MINIO_SECRET_KEY=minio123
    volumes:
      - ./cdm_shared_workspace:/cdm_shared_workspace
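
For illustration, a small sketch (not part of this PR) showing how the notebook container could confirm the MinIO setup with the `minio` client pinned in the Pipfile; it assumes only the environment variables defined above are set:

```python
import os
from urllib.parse import urlparse

from minio import Minio

# Read the connection details injected by docker-compose (assumed to be set).
endpoint = urlparse(os.environ["MINIO_URL"]).netloc  # e.g. "minio:9000"
client = Minio(
    endpoint,
    access_key=os.environ["MINIO_ACCESS_KEY"],
    secret_key=os.environ["MINIO_SECRET_KEY"],
    secure=False,  # the compose setup serves MinIO over plain HTTP
)

# The 'delta-lake' bucket is created by the minio-create-bucket service.
print("delta-lake bucket exists:", client.bucket_exists("delta-lake"))
```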
File renamed without changes.
50 changes: 39 additions & 11 deletions src/spark/utils.py
@@ -4,12 +4,15 @@
from pyspark.sql import SparkSession


def get_spark_session(app_name: str, local: bool = False) -> SparkSession:
def get_spark_session(app_name: str,
                      local: bool = False,
                      delta_lake: bool = False) -> SparkSession:
    """
    Helper to get and manage the `SparkSession` and keep all of our spark configuration params in one place.

    :param app_name: The name of the application
    :param local: Whether to run the Spark session locally or not
    :param delta_lake: Whether to build the Spark session with Delta Lake support

    :return: A `SparkSession` object
    """
@@ -19,14 +22,39 @@

    spark_conf = SparkConf()

    spark_conf.setAll(
        [
            (
                "spark.master",
                os.environ.get("SPARK_MASTER_URL", "spark://spark-master:7077"),
            ),
            ("spark.app.name", app_name),
        ]
    )
    if delta_lake:

        jars_dir = "/opt/bitnami/spark/jars/"
        jar_files = [os.path.join(jars_dir, f) for f in os.listdir(jars_dir) if f.endswith(".jar")]
        jars = ",".join(jar_files)

        spark_conf.setAll(
            [
                (
                    "spark.master",
                    os.environ.get("SPARK_MASTER_URL", "spark://spark-master:7077"),
                ),
                ("spark.app.name", app_name),
                ("spark.hadoop.fs.s3a.endpoint", os.environ.get("MINIO_URL")),
                ("spark.hadoop.fs.s3a.access.key", os.environ.get("MINIO_ACCESS_KEY")),
                ("spark.hadoop.fs.s3a.secret.key", os.environ.get("MINIO_SECRET_KEY")),
                ("spark.jars", jars),
                ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
                ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"),
                ("spark.hadoop.fs.s3a.path.style.access", "true"),
                ("spark.hadoop.fs.s3a.connection.ssl.enabled", "false"),
                ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
            ]
        )
    else:
        spark_conf.setAll(
            [
                (
                    "spark.master",
                    os.environ.get("SPARK_MASTER_URL", "spark://spark-master:7077"),
                ),
                ("spark.app.name", app_name),
            ]
        )

    return SparkSession.builder.config(conf=spark_conf).getOrCreate()
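
For reference, a rough sketch of what the `delta_lake=True` branch amounts to when a session is configured by hand (see the README's manual-configuration section); it assumes the three jars downloaded in the Dockerfile are the only ones that need to be listed explicitly and uses the docker-compose MinIO defaults as fallbacks:

```python
import os

from pyspark.sql import SparkSession

# Only the jars added by the Dockerfile are listed here; get_spark_session()
# instead passes every jar found under /opt/bitnami/spark/jars/.
jars = ",".join(
    [
        "/opt/bitnami/spark/jars/delta-spark_2.12-3.2.0.jar",
        "/opt/bitnami/spark/jars/delta-storage-3.2.0.jar",
        "/opt/bitnami/spark/jars/hadoop-aws-3.3.4.jar",
    ]
)

spark = (
    SparkSession.builder.master(os.environ.get("SPARK_MASTER_URL", "spark://spark-master:7077"))
    .appName("manual-delta-app")  # hypothetical app name
    .config("spark.jars", jars)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.hadoop.fs.s3a.endpoint", os.environ.get("MINIO_URL", "http://minio:9000"))
    .config("spark.hadoop.fs.s3a.access.key", os.environ.get("MINIO_ACCESS_KEY", "minio"))
    .config("spark.hadoop.fs.s3a.secret.key", os.environ.get("MINIO_SECRET_KEY", "minio123"))
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)
```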