
Commit

Add YARN as a resource / job manager
Uses YARN by default, but can optionally use Spark standalone for now.

Tested by importing and querying the Fast Genomics Gene table.
MrCreosote committed Jul 24, 2024
1 parent 8b09cda commit 302645a
Showing 8 changed files with 133 additions and 38 deletions.
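
For orientation, a minimal usage sketch of the new toggle in src/spark/utils.py as seen from a notebook (the app name is illustrative, and the environment variables mentioned below are assumed to be set as in docker-compose.yaml):

```python
from spark.utils import get_spark_session

# Default behaviour after this commit: submit work through YARN. Requires
# YARN_RESOURCE_MANAGER_URL and S3_YARN_BUCKET in the environment, as wired
# up for the notebook containers in docker-compose.yaml.
spark = get_spark_session("gene_import")

# Alternatively, opt out of YARN and use the Spark standalone cluster for now.
spark = get_spark_session("gene_import", yarn=False)
```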
2 changes: 2 additions & 0 deletions .gitignore
@@ -10,3 +10,5 @@ cdr/cdm/jupyter/
build
/.project
/.pydevproject*
/.pytest_cache/
/.settings/
4 changes: 4 additions & 0 deletions Dockerfile
@@ -31,6 +31,10 @@ RUN cp -r /gradle/${GRADLE_JARS_DIR}/* /opt/bitnami/spark/jars/

RUN chown -R spark_user:spark /opt/bitnami

# make an empty yarn conf dir to prevent spark from complaining
RUN mkdir -p /opt/yarn/conf && chown -R spark_user:spark /opt/yarn
ENV YARN_CONF_DIR=/opt/yarn/conf

# install pipenv
RUN pip3 install pipenv

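Context for the empty conf dir above: when the master is yarn, Spark requires HADOOP_CONF_DIR or YARN_CONF_DIR to point at an existing directory, even though the resource manager address is injected via spark.hadoop.* properties in src/spark/utils.py. A tiny illustrative sanity check, not part of the commit:

```python
import os

# Spark aborts at submit time if neither HADOOP_CONF_DIR nor YARN_CONF_DIR is
# set when the master is "yarn", so the image exports YARN_CONF_DIR pointing
# at the (empty) directory created in the Dockerfile above.
conf_dir = os.environ.get("YARN_CONF_DIR") or os.environ.get("HADOOP_CONF_DIR")
if not conf_dir or not os.path.isdir(conf_dir):
    raise RuntimeError("YARN_CONF_DIR (or HADOOP_CONF_DIR) must point at an existing directory")
```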
2 changes: 1 addition & 1 deletion README.md
@@ -59,7 +59,7 @@ Python 3.11 must be installed on the system.
```
pipenv sync --dev # only the first time or when Pipfile.lock changes
pipenv shell
PYTHONPATH=. pytest test
PYTHONPATH=src pytest test
```

## Rancher Deployment
28 changes: 28 additions & 0 deletions config/yarn-write-policy.json
@@ -0,0 +1,28 @@
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:*"
            ],
            "Resource": [
                "arn:aws:s3:::yarn",
                "arn:aws:s3:::yarn/*"
            ]
        },
        {
            "Effect": "Deny",
            "Action": [
                "s3:CreateBucket",
                "s3:DeleteBucket",
                "s3:ForceDeleteBucket",
                "s3:ListAllMyBuckets"
            ],
            "Resource": [
                "arn:aws:s3:::yarn",
                "arn:aws:s3:::yarn/*"
            ]
        }
    ]
}
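
An illustrative way to exercise this policy once the stack is up, using boto3 with the yarnuser credentials created in scripts/minio_create_bucket_entrypoint.sh. This is a sketch only; the localhost endpoint assumes MinIO's API port 9002 is reachable from the host:

```python
import boto3
from botocore.exceptions import ClientError

# Credentials and endpoint as wired up in docker-compose.yaml and the MinIO
# entrypoint script added in this commit.
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9002",
    aws_access_key_id="yarnuser",
    aws_secret_access_key="yarnpass",
)

# Allowed: read/write objects in the yarn bucket (Spark's YARN staging dir).
s3.put_object(Bucket="yarn", Key="staging/smoke-test.txt", Body=b"ok")

# Explicitly denied by the policy: deleting the yarn bucket itself.
try:
    s3.delete_bucket(Bucket="yarn")
except ClientError as err:
    print("delete_bucket denied as expected:", err.response["Error"]["Code"])
```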
43 changes: 30 additions & 13 deletions docker-compose.yaml
@@ -4,6 +4,29 @@ version: '3'

services:

  yarn-resourcemanager:
    image: ghcr.io/kbase/cdm-prototype-yarn:pr-6
    container_name: yarn-resourcemanager
    ports:
      - 8088:8088 # web ui
    environment:
      - YARN_MODE=resourcemanager
      - MINIO_URL=http://minio:9002
      - MINIO_ACCESS_KEY=yarnuser
      - MINIO_SECRET_KEY=yarnpass

  yarn-nodemanager:
    image: ghcr.io/kbase/cdm-prototype-yarn:pr-6
    container_name: yarn-nodemanager
    ports:
      - 8042:8042 # web ui
    environment:
      - YARN_MODE=nodemanager
      - YARN_RESOURCEMANAGER_HOSTNAME=yarn-resourcemanager
      - MINIO_URL=http://minio:9002
      - MINIO_ACCESS_KEY=yarnuser
      - MINIO_SECRET_KEY=yarnpass

  spark-master:
    build:
      context: .
@@ -90,21 +113,11 @@ services:
    depends_on:
      minio:
        condition: service_healthy
    entrypoint: >
      bash -c "
      mc alias set minio http://minio:9002 minio minio123 &&
      if ! mc ls minio/cdm-lake 2>/dev/null; then
        mc mb minio/cdm-lake && echo 'Bucket cdm-lake created'
      else
        echo 'bucket cdm-lake already exists'
      fi &&
      mc admin user add minio minio-readonly minio123 &&
      mc admin policy create minio cdm-lake-read-only-policy /config/cdm-lake-read-only-policy.json &&
      mc admin policy attach minio cdm-lake-read-only-policy --user=minio-readonly &&
      echo 'CDM Read-only user and policy set'
      "
    entrypoint: /scripts/minio_create_bucket_entrypoint.sh
    volumes:
      - ./config/cdm-lake-read-only-policy.json:/config/cdm-lake-read-only-policy.json
      - ./config/yarn-write-policy.json:/config/yarn-write-policy.json
      - ./scripts/minio_create_bucket_entrypoint.sh:/scripts/minio_create_bucket_entrypoint.sh

  dev_notebook:
    build:
@@ -118,11 +131,13 @@ services:
      - minio-create-bucket
    environment:
      - NOTEBOOK_PORT=4041
      - YARN_RESOURCE_MANAGER_URL=http://yarn-resourcemanager:8032
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_DRIVER_HOST=spark-dev-notebook
      - MINIO_URL=http://minio:9002
      - MINIO_ACCESS_KEY=minio
      - MINIO_SECRET_KEY=minio123
      - S3_YARN_BUCKET=yarn
      - SPARK_MODE=notebook
      - MAX_EXECUTORS=4
      - POSTGRES_USER=hive
@@ -145,11 +160,13 @@ services:
      - minio-create-bucket
    environment:
      - NOTEBOOK_PORT=4042
      - YARN_RESOURCE_MANAGER_URL=http://yarn-resourcemanager:8032
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_DRIVER_HOST=spark-user-notebook
      - MINIO_URL=http://minio:9002
      - MINIO_ACCESS_KEY=minio-readonly
      - MINIO_SECRET_KEY=minio123
      - S3_YARN_BUCKET=yarn
      - SPARK_MODE=notebook
      - MAX_EXECUTORS=4
      # TODO: create postgres user w/ only write access to the hive tables
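
A quick liveness check for the two new YARN containers (assumes the 8088/8042 port mappings above are reachable on localhost; /ws/v1/cluster/info is the stock ResourceManager REST endpoint):

```python
import json
import urllib.request

# The ResourceManager web UI / REST API is published on host port 8088 above.
with urllib.request.urlopen("http://localhost:8088/ws/v1/cluster/info", timeout=5) as resp:
    info = json.load(resp)

# Expect "STARTED" once the resource manager is healthy.
print(info["clusterInfo"]["state"])
```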
32 changes: 32 additions & 0 deletions scripts/minio_create_bucket_entrypoint.sh
@@ -0,0 +1,32 @@
#!/bin/bash

mc alias set minio http://minio:9002 minio minio123

# make deltalake bucket
if ! mc ls minio/cdm-lake 2>/dev/null; then
    mc mb minio/cdm-lake && echo 'Bucket cdm-lake created'
else
    echo 'bucket cdm-lake already exists'
fi

# make yarn bucket
if ! mc ls minio/yarn 2>/dev/null; then
    mc mb minio/yarn && echo 'Bucket yarn created'
else
    echo 'bucket yarn already exists'
fi

# create policies
mc admin policy create minio yarn-write-policy /config/yarn-write-policy.json
mc admin policy create minio cdm-lake-read-only-policy /config/cdm-lake-read-only-policy.json

# make read only user for user notebook w/ yarn write privs
mc admin user add minio minio-readonly minio123
mc admin policy attach minio cdm-lake-read-only-policy --user=minio-readonly
mc admin policy attach minio yarn-write-policy --user=minio-readonly
echo 'CDM Read-only user and policy set'

# make yarn user
mc admin user add minio yarnuser yarnpass
mc admin policy attach minio yarn-write-policy --user=yarnuser
echo 'YARN user and policy set'
54 changes: 33 additions & 21 deletions src/spark/utils.py
@@ -3,6 +3,7 @@
import site
from datetime import datetime
from threading import Timer
from urllib.parse import urlparse

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, DataFrame
@@ -36,14 +37,19 @@ def _get_jars(jar_names: list) -> str:
    return ", ".join(jars)


def _get_delta_lake_conf(
        jars_str: str,
) -> dict:
def _get_s3_conf() -> dict:
    return {
        "spark.hadoop.fs.s3a.endpoint": os.environ.get("MINIO_URL"),
        "spark.hadoop.fs.s3a.access.key": os.environ.get("MINIO_ACCESS_KEY"),
        "spark.hadoop.fs.s3a.secret.key": os.environ.get("MINIO_SECRET_KEY"),
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    }

def _get_delta_lake_conf() -> dict:
    """
    Helper function to get Delta Lake specific Spark configuration.
    :param jars_str: A comma-separated string of JAR file paths
    :return: A dictionary of Delta Lake specific Spark configuration
    reference: https://blog.min.io/delta-lake-minio-multi-cloud/
@@ -52,15 +58,9 @@ def _get_delta_lake_conf(
    site_packages_path = site.getsitepackages()[0]

    return {
        "spark.jars": jars_str,
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        "spark.databricks.delta.retentionDurationCheck.enabled": "false",
        "spark.hadoop.fs.s3a.endpoint": os.environ.get("MINIO_URL"),
        "spark.hadoop.fs.s3a.access.key": os.environ.get("MINIO_ACCESS_KEY"),
        "spark.hadoop.fs.s3a.secret.key": os.environ.get("MINIO_SECRET_KEY"),
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.sql.catalogImplementation": "hive",
        # SparkMonitor extension configuration
        # https://github.com/swan-cern/sparkmonitor?tab=readme-ov-file#setting-up-the-extension
@@ -77,6 +77,7 @@ def _stop_spark_session(spark):
def _get_base_spark_conf(
        app_name: str,
        executor_cores: int,
        yarn: bool
) -> SparkConf:
    """
    Helper function to get the base Spark configuration.
@@ -86,16 +87,21 @@ def _get_base_spark_conf(
    :return: A SparkConf object with the base configuration
    """
    return SparkConf().setAll([
        ("spark.master", os.environ.get("SPARK_MASTER_URL", "spark://spark-master:7077")),
        ("spark.app.name", app_name),
        ("spark.executor.cores", executor_cores),
    ])
    sc = SparkConf().set("spark.app.name", app_name).set("spark.executor.cores", executor_cores)
    if yarn:
        yarnparse = urlparse(os.environ.get("YARN_RESOURCE_MANAGER_URL"))
        sc.setMaster("yarn"

        ).set("spark.hadoop.yarn.resourcemanager.hostname", yarnparse.hostname
        ).set("spark.hadoop.yarn.resourcemanager.address", yarnparse.netloc)
    else:
        sc.set("spark.master", os.environ.get("SPARK_MASTER_URL", "spark://spark-master:7077"))
    return sc


def get_spark_session(
        app_name: str = None,
        local: bool = False,
        yarn: bool = True,
        delta_lake: bool = True,
        timeout_sec: int = 4 * 60 * 60,
        executor_cores: int = DEFAULT_EXECUTOR_CORES) -> SparkSession:
@@ -116,18 +122,24 @@
    if local:
        return SparkSession.builder.appName(app_name).getOrCreate()

    spark_conf = _get_base_spark_conf(app_name, executor_cores)
    spark_conf = _get_base_spark_conf(app_name, executor_cores, yarn)
    sc = {}
    if delta_lake or yarn:
        sc.update(_get_s3_conf())

    if yarn:
        sc["spark.yarn.stagingDir"] = "s3a://" + os.environ["S3_YARN_BUCKET"]

    if delta_lake:

        # Just to include the necessary jars for Delta Lake
        jar_names = [f"delta-spark_{SCALA_VER}-{DELTA_SPARK_VER}.jar",
                     f"hadoop-aws-{HADOOP_AWS_VER}.jar"]
        jars_str = _get_jars(jar_names)
        delta_conf = _get_delta_lake_conf(jars_str)
        for key, value in delta_conf.items():
            spark_conf.set(key, value)
        if not yarn:
            sc["spark.jars"] = _get_jars(jar_names)
        sc.update(_get_delta_lake_conf())

    for key, value in sc.items():
        spark_conf.set(key, value)

    spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
    timeout_sec = os.getenv('SPARK_TIMEOUT_SECONDS', timeout_sec)
    Timer(int(timeout_sec), _stop_spark_session, [spark]).start()
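For clarity on the _get_base_spark_conf change: urlparse splits YARN_RESOURCE_MANAGER_URL into the hostname and host:port pieces that feed the two yarn.resourcemanager properties. A small illustration with the value used in docker-compose.yaml:

```python
from urllib.parse import urlparse

yarnparse = urlparse("http://yarn-resourcemanager:8032")
print(yarnparse.hostname)  # "yarn-resourcemanager"      -> spark.hadoop.yarn.resourcemanager.hostname
print(yarnparse.netloc)    # "yarn-resourcemanager:8032" -> spark.hadoop.yarn.resourcemanager.address
```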
6 changes: 3 additions & 3 deletions test/src/spark/utils_test.py
@@ -46,7 +46,7 @@ def spark_session_non_local(mock_spark_master):

    with mock.patch.dict('os.environ', {"SPARK_MASTER_URL": spark_master_url,
                                        "SPARK_TIMEOUT_SECONDS": "2"}):
        spark_session = get_spark_session("TestApp", local=False, delta_lake=False)
        spark_session = get_spark_session("TestApp", local=False, delta_lake=False, yarn=False)
        print("Created non-local Spark session.")
        try:
            yield spark_session, port
@@ -98,7 +98,7 @@ def test_get_base_spark_conf():
    executor_cores = 3

    with mock.patch.dict('os.environ', {}):
        result = _get_base_spark_conf(app_name, executor_cores)
        result = _get_base_spark_conf(app_name, executor_cores, False)
        assert isinstance(result, SparkConf)
        assert result.get("spark.master") == expected_master_url
        assert result.get("spark.app.name") == expected_app_name
@@ -111,7 +111,7 @@ def test_get_base_spark_conf_with_env():
    executor_cores = 3

    with mock.patch.dict('os.environ', {"SPARK_MASTER_URL": custom_master_url}):
        result = _get_base_spark_conf(app_name, executor_cores)
        result = _get_base_spark_conf(app_name, executor_cores, False)
        assert isinstance(result, SparkConf)
        assert result.get("spark.master") == custom_master_url
        assert result.get("spark.app.name") == app_name
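The YARN branch of _get_base_spark_conf is not exercised by the updated tests. A hypothetical test in the same style as the two above could look like this (illustrative only, not part of the commit; the imports mirror what the test module already uses):

```python
from unittest import mock

from pyspark.conf import SparkConf

from spark.utils import _get_base_spark_conf


def test_get_base_spark_conf_yarn():
    # Resource manager URL as configured for the notebook containers in docker-compose.yaml.
    with mock.patch.dict('os.environ', {"YARN_RESOURCE_MANAGER_URL": "http://yarn-resourcemanager:8032"}):
        result = _get_base_spark_conf("TestApp", 3, True)
        assert isinstance(result, SparkConf)
        assert result.get("spark.master") == "yarn"
        assert result.get("spark.hadoop.yarn.resourcemanager.hostname") == "yarn-resourcemanager"
        assert result.get("spark.hadoop.yarn.resourcemanager.address") == "yarn-resourcemanager:8032"
```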
