Merge pull request #9 from kbase/dev_add_delta_spark_packages
add delta spark packages to support delta lake
Tianhao-Gu authored May 23, 2024
2 parents bcc0e28 + 796a8d4 commit 2120545
Showing 9 changed files with 324 additions and 56 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,4 +1,5 @@
.DS_Store
.idea
.coverage
*_pycache__
cdr/cdm/jupyter/
23 changes: 19 additions & 4 deletions Dockerfile
@@ -6,9 +6,24 @@ USER root

RUN apt-get update && apt-get install -y \
# GCC required to resolve error during JupyterLab installation: psutil could not be installed from sources because gcc is not installed.
gcc \
gcc curl \
&& rm -rf /var/lib/apt/lists/*

# TODO: use Gradle to build the jar
# Install jars to support delta lake spark operations
ENV HADOOP_AWS_VER=3.3.4
RUN curl -O https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_AWS_VER}/hadoop-aws-${HADOOP_AWS_VER}.jar \
&& mv hadoop-aws-${HADOOP_AWS_VER}.jar /opt/bitnami/spark/jars

# NOTE: ensure the Delta Spark jars match the pip delta-spark version specified in the Pipfile
ENV DELTA_SPARK_VER=3.2.0
ENV SCALA_VER=2.12
RUN curl -O https://repo1.maven.org/maven2/io/delta/delta-spark_${SCALA_VER}/${DELTA_SPARK_VER}/delta-spark_${SCALA_VER}-${DELTA_SPARK_VER}.jar \
&& mv delta-spark_${SCALA_VER}-${DELTA_SPARK_VER}.jar /opt/bitnami/spark/jars

RUN curl -O https://repo1.maven.org/maven2/io/delta/delta-storage/${DELTA_SPARK_VER}/delta-storage-${DELTA_SPARK_VER}.jar \
&& mv delta-storage-${DELTA_SPARK_VER}.jar /opt/bitnami/spark/jars

# install pipenv
RUN pip3 install pipenv

@@ -19,10 +34,10 @@ RUN pipenv sync --system
COPY ./src/ /src
ENV PYTHONPATH "${PYTHONPATH}:/src"

COPY scripts/entrypoint.sh /opt/
RUN chmod a+x /opt/entrypoint.sh
COPY ./scripts/ /opt/scripts/
RUN chmod a+x /opt/scripts/*.sh

# Switch back to the original user
USER ${ORI_USER}

ENTRYPOINT ["/opt/entrypoint.sh"]
ENTRYPOINT ["/opt/scripts/entrypoint.sh"]
3 changes: 3 additions & 0 deletions Pipfile
@@ -7,6 +7,9 @@ name = "pypi"
jupyterlab= "==4.2.0"
pyspark= "==3.5.1"
boto3 = "==1.34.109"
minio = "==7.2.7"
delta-spark = "==3.2.0" # should match JAR version (DELTA_SPARK_VER) specified in the Dockerfile
pandas = "==2.2.2"

[dev-packages]
pytest = "==8.2.1"
188 changes: 175 additions & 13 deletions Pipfile.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion README.md
@@ -79,7 +79,10 @@ In addition, the environment variable `SPARK_MASTER_URL` should also be configured
```python
from spark.utils import get_spark_session

spark = get_spark_session('TestApp')
spark = get_spark_session(app_name)

# To build a Spark session for Delta Lake operations, set the delta_lake parameter to True
spark = get_spark_session(app_name, delta_lake=True)
```
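
As a quick illustration of what such a session enables, here is a hedged sketch of writing and reading a Delta table against the MinIO bucket created by docker-compose; the bucket name `delta-lake` comes from the compose file, while the table path and DataFrame contents are hypothetical.

```python
from spark.utils import get_spark_session

spark = get_spark_session("DeltaDemo", delta_lake=True)

# Hypothetical path inside the 'delta-lake' bucket created by docker-compose.
table_path = "s3a://delta-lake/demo/users"

df = spark.createDataFrame([(1, "alice"), (2, "bob")], ["id", "name"])
df.write.format("delta").mode("overwrite").save(table_path)

spark.read.format("delta").load(table_path).show()
```
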

#### Manually Configuring SparkSession/SparkContext
51 changes: 49 additions & 2 deletions docker-compose.yaml
@@ -5,7 +5,9 @@ version: '3'
services:

spark-master:
image: bitnami/spark:3.5.1
build:
context: .
dockerfile: Dockerfile
container_name: spark-master
ports:
- "8080:8080"
@@ -51,6 +53,44 @@ services:
environment:
- SPARK_MASTER_URL=spark://spark-master:7077

minio:
image: minio/minio
container_name: spark-minio
expose:
- "9000"
ports:
- "9000:9000"
# MinIO Console is available at http://localhost:9001
- "9001:9001"
environment:
MINIO_ROOT_USER: minio
MINIO_ROOT_PASSWORD: minio123
MINIO_ACCESS_KEY: minio
MINIO_SECRET_KEY: minio123
healthcheck:
# reference: https://github.com/rodrigobdz/docker-compose-healthchecks?tab=readme-ov-file#minio-release2023-11-01t18-37-25z-and-older
test: timeout 5s bash -c ':> /dev/tcp/127.0.0.1/9000' || exit 1
interval: 1s
timeout: 10s
retries: 5
# Note there is no bucket by default
command: server /data --console-address ":9001"

minio-create-bucket:
image: minio/mc
depends_on:
minio:
condition: service_healthy
entrypoint: >
bash -c "
mc alias set minio http://minio:9000 minio minio123 &&
if ! mc ls minio/delta-lake 2>/dev/null; then
mc mb minio/delta-lake && echo 'Bucket delta-lake created'
else
echo 'bucket delta-lake already exists'
fi
"
notebook:
build:
context: .
@@ -60,7 +100,14 @@ services:
- "4041:4041"
depends_on:
- spark-master
- minio-create-bucket
environment:
- NOTEBOOK_PORT=4041
- SPARK_MASTER_URL=spark://spark-master:7077
- SPARK_DRIVER_HOST=spark-notebook
- MINIO_URL=http://minio:9000
- MINIO_ACCESS_KEY=minio
- MINIO_SECRET_KEY=minio123
- SPARK_MODE=notebook
volumes:
- ./cdr/cdm/jupyter:/cdm_shared_workspace
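
To confirm the MinIO setup from inside the notebook container, a small sketch using the `minio` client pinned in the Pipfile could look like the following. It reuses the `MINIO_*` environment variables defined in the compose file above; the check is illustrative and not part of this PR.

```python
import os
from urllib.parse import urlparse

from minio import Minio

# MINIO_URL is e.g. "http://minio:9000"; the client expects host:port without a scheme.
endpoint = urlparse(os.environ["MINIO_URL"]).netloc

client = Minio(
    endpoint,
    access_key=os.environ["MINIO_ACCESS_KEY"],
    secret_key=os.environ["MINIO_SECRET_KEY"],
    secure=False,  # the compose setup serves plain HTTP
)

# The 'delta-lake' bucket is created by the minio-create-bucket service above.
assert client.bucket_exists("delta-lake"), "delta-lake bucket is missing"
```
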
32 changes: 8 additions & 24 deletions scripts/entrypoint.sh
@@ -1,26 +1,10 @@
#!/bin/bash

echo "starting jupyter notebook"

if [ -n "$SPARK_DRIVER_HOST" ]; then
echo "Setting spark.driver.host to $SPARK_DRIVER_HOST"
source /opt/bitnami/scripts/spark-env.sh
if [ -z "$SPARK_CONF_FILE" ]; then
echo "Error: unable to find SPARK_CONF_FILE path"
exit 1
fi
echo "spark.driver.host $SPARK_DRIVER_HOST" >> $SPARK_CONF_FILE
fi

WORKSPACE_DIR="/cdm_shared_workspace"
mkdir -p "$WORKSPACE_DIR"
cd "$WORKSPACE_DIR"

# Start Jupyter Lab
jupyter lab --ip=0.0.0.0 \
--port=$NOTEBOOK_PORT \
--no-browser \
--allow-root \
--notebook-dir="$WORKSPACE_DIR" \
--ServerApp.token='' \
--ServerApp.password=''
if [ "$SPARK_MODE" = "notebook" ]; then
exec /opt/scripts/notebook_entrypoint.sh "$@"
else
# In bitnami/spark Dockerfile, the entrypoint is set to /opt/bitnami/scripts/spark/entrypoint.sh and followed
# by CMD ["/opt/bitnami/scripts/spark/run.sh"], meaning the entrypoint expects the run.sh script as an argument.
# reference: https://github.com/bitnami/containers/blob/main/bitnami/spark/3.5/debian-12/Dockerfile#L69
exec /opt/bitnami/scripts/spark/entrypoint.sh "$@" /opt/bitnami/scripts/spark/run.sh
fi
26 changes: 26 additions & 0 deletions scripts/notebook_entrypoint.sh
@@ -0,0 +1,26 @@
#!/bin/bash

echo "starting jupyter notebook"

if [ -n "$SPARK_DRIVER_HOST" ]; then
echo "Setting spark.driver.host to $SPARK_DRIVER_HOST"
source /opt/bitnami/scripts/spark-env.sh
if [ -z "$SPARK_CONF_FILE" ]; then
echo "Error: unable to find SPARK_CONF_FILE path"
exit 1
fi
echo "spark.driver.host $SPARK_DRIVER_HOST" >> $SPARK_CONF_FILE
fi

WORKSPACE_DIR="/cdm_shared_workspace"
mkdir -p "$WORKSPACE_DIR"
cd "$WORKSPACE_DIR"

# Start Jupyter Lab
jupyter lab --ip=0.0.0.0 \
--port=$NOTEBOOK_PORT \
--no-browser \
--allow-root \
--notebook-dir="$WORKSPACE_DIR" \
--ServerApp.token='' \
--ServerApp.password=''
49 changes: 38 additions & 11 deletions src/spark/utils.py
@@ -4,12 +4,15 @@
from pyspark.sql import SparkSession


def get_spark_session(app_name: str, local: bool = False) -> SparkSession:
def get_spark_session(app_name: str,
local: bool = False,
delta_lake: bool = False) -> SparkSession:
"""
Helper to get and manage the `SparkSession` and keep all of our spark configuration params in one place.
:param app_name: The name of the application
:param local: Whether to run the spark session locally or not
:param delta_lake: Whether to build the spark session with Delta Lake support
:return: A `SparkSession` object
"""
@@ -19,14 +22,38 @@ def get_spark_session(app_name: str, local: bool = False) -> SparkSession:

spark_conf = SparkConf()

spark_conf.setAll(
[
(
"spark.master",
os.environ.get("SPARK_MASTER_URL", "spark://spark-master:7077"),
),
("spark.app.name", app_name),
]
)
if delta_lake:

jars_dir = "/opt/bitnami/spark/jars/"
jar_files = [os.path.join(jars_dir, f) for f in os.listdir(jars_dir) if f.endswith(".jar")]
jars = ",".join(jar_files)

spark_conf.setAll(
[
(
"spark.master",
os.environ.get("SPARK_MASTER_URL", "spark://spark-master:7077"),
),
("spark.app.name", app_name),
("spark.hadoop.fs.s3a.endpoint", os.environ.get("MINIO_URL")),
("spark.hadoop.fs.s3a.access.key", os.environ.get("MINIO_ACCESS_KEY")),
("spark.hadoop.fs.s3a.secret.key", os.environ.get("MINIO_SECRET_KEY")),
("spark.jars", jars),
("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"),
("spark.hadoop.fs.s3a.path.style.access", "true"),
("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
]
)
else:
spark_conf.setAll(
[
(
"spark.master",
os.environ.get("SPARK_MASTER_URL", "spark://spark-master:7077"),
),
("spark.app.name", app_name),
]
)

return SparkSession.builder.config(conf=spark_conf).getOrCreate()
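
For reference against the README's "Manually Configuring SparkSession/SparkContext" section, a hand-built session mirroring the configuration above might look like this sketch. The app name is a placeholder, and the Delta and hadoop-aws jars installed by the Dockerfile are assumed to already be on Spark's classpath under /opt/bitnami/spark/jars, so spark.jars is omitted here.

```python
import os

from pyspark.sql import SparkSession

# Rough manual equivalent of get_spark_session(app_name, delta_lake=True).
spark = (
    SparkSession.builder
    .appName("ManualDeltaApp")  # placeholder app name
    .master(os.environ.get("SPARK_MASTER_URL", "spark://spark-master:7077"))
    .config("spark.hadoop.fs.s3a.endpoint", os.environ.get("MINIO_URL"))
    .config("spark.hadoop.fs.s3a.access.key", os.environ.get("MINIO_ACCESS_KEY"))
    .config("spark.hadoop.fs.s3a.secret.key", os.environ.get("MINIO_SECRET_KEY"))
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)
```
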
