Prepare Ecommerce codebase for Spark (Iceberg) Support (#54)
ilias1111 authored Oct 8, 2024
1 parent 07a61b3 commit b63685c
Showing 29 changed files with 595 additions and 94 deletions.
79 changes: 57 additions & 22 deletions .github/workflows/pr_tests.yml
@@ -2,9 +2,6 @@ name: pr_tests

on:
pull_request:
branches:
- main
- 'release/**'

concurrency: dbt_integration_tests

@@ -42,13 +39,12 @@ env:
SNOWFLAKE_TEST_WAREHOUSE: ${{ secrets.SNOWFLAKE_TEST_WAREHOUSE }}

# Postgres Connection
POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }}
POSTGRES_TEST_USER: ${{ secrets.POSTGRES_TEST_USER }}
POSTGRES_TEST_PASS: ${{ secrets.POSTGRES_TEST_PASS }}
POSTGRES_TEST_PORT: ${{ secrets.POSTGRES_TEST_PORT }}
POSTGRES_TEST_DBNAME: ${{ secrets.POSTGRES_TEST_DBNAME }}
POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }}
POSTGRES_TEST_PORT: ${{ secrets.POSTGRES_TEST_PORT }}

# Databricks Connection
DATABRICKS_TEST_HOST: ${{ secrets.DATABRICKS_TEST_HOST }}
DATABRICKS_TEST_HTTP_PATH: ${{ secrets.DATABRICKS_TEST_HTTP_PATH }}
DATABRICKS_TEST_TOKEN: ${{ secrets.DATABRICKS_TEST_TOKEN }}
@@ -63,10 +59,10 @@ jobs:
# Run tests from integration_tests sub dir
working-directory: ./integration_tests
strategy:
fail-fast: false
matrix:
dbt_version: ["1.*"]
warehouse: ["bigquery", "snowflake", "databricks", "postgres"] # TODO: Add RS self-hosted runner

warehouse: ["postgres", "bigquery", "snowflake", "databricks", "spark_iceberg"] # TODO: Add RS self-hosted runner
services:
postgres:
image: postgres:latest
@@ -90,51 +86,90 @@ jobs:

# Remove '*' and replace '.' with '_' in DBT_VERSION & set as SCHEMA_SUFFIX.
# SCHEMA_SUFFIX allows us to run multiple versions of dbt in parallel without overwriting the output tables
- name: Set SCHEMA_SUFFIX env
run: echo "SCHEMA_SUFFIX=$(echo ${DBT_VERSION%.*} | tr . _)" >> $GITHUB_ENV
env:
DBT_VERSION: ${{ matrix.dbt_version }}
- name: Configure Docker credentials
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_USERNAME }}
password: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_PASSWORD }}
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: eu-west-1
- name: Set warehouse variables
id: set_warehouse
run: |
WAREHOUSE_PLATFORM=$(echo ${{ matrix.warehouse }} | cut -d'_' -f1)
WAREHOUSE_SPECIFIC=$(echo ${{ matrix.warehouse }} | cut -s -d'_' -f2)
echo "WAREHOUSE_PLATFORM=${WAREHOUSE_PLATFORM}" >> $GITHUB_ENV
echo "WAREHOUSE_SPECIFIC=${WAREHOUSE_SPECIFIC}" >> $GITHUB_ENV
echo "warehouse_platform=${WAREHOUSE_PLATFORM}" >> $GITHUB_OUTPUT
echo "warehouse_specific=${WAREHOUSE_SPECIFIC}" >> $GITHUB_OUTPUT
# Remove '*' and replace '.' with '_' in DBT_VERSION & set as SCHEMA_SUFFIX.
# SCHEMA_SUFFIX allows us to run multiple versions of dbt in parallel without overwriting the output tables
- name: Set SCHEMA_SUFFIX env
run: echo "SCHEMA_SUFFIX=$(echo ${DBT_VERSION%.*} | tr . _)" >> $GITHUB_ENV
env:
DBT_VERSION: ${{ matrix.dbt_version }}

- name: Set DEFAULT_TARGET env
run: |
echo "DEFAULT_TARGET=${{ matrix.warehouse }}" >> $GITHUB_ENV
echo "DEFAULT_TARGET=${{matrix.warehouse}}" >> $GITHUB_ENV
- name: Python setup
uses: actions/setup-python@v5
uses: actions/setup-python@v4
with:
python-version: "3.8.x"

- name: Pip cache
uses: actions/cache@v4
uses: actions/cache@v3
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }}
key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}}
restore-keys: |
${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }}
${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}}
# Install latest patch version. Upgrade if cache contains old patch version.
- name: Install dependencies
run: |
pip install --upgrade pip wheel setuptools
pip install -Iv dbt-${{ matrix.warehouse }}==${{ matrix.dbt_version }} --upgrade
pip install wheel setuptools
pip install -Iv dbt-${{env.WAREHOUSE_PLATFORM}}==${{ matrix.dbt_version }} --upgrade
dbt deps
if: ${{matrix.warehouse != 'spark'}}
if: ${{env.WAREHOUSE_PLATFORM != 'spark'}}

- name: Install spark dependencies
run: |
pip install --upgrade pip wheel setuptools
pip install -Iv "dbt-${{ matrix.warehouse }}[ODBC]"==${{ matrix.dbt_version }} --upgrade
pip install -Iv "dbt-${{ env.WAREHOUSE_PLATFORM }}[PyHive]"==${{ matrix.dbt_version }} --upgrade
dbt deps
if: ${{matrix.warehouse == 'spark'}}
if: ${{env.WAREHOUSE_PLATFORM == 'spark'}}

- name: Install Docker Compose
run: |
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
- name: Build and start Spark cluster
working-directory: .github/workflows/spark_deployment
run: |
docker-compose up -d
echo "Waiting for Spark services to start..."
sleep 90
if: ${{env.WAREHOUSE_PLATFORM == 'spark'}}

- name: "Pre-test: Drop ci schemas"
run: |
dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }}
dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}}
- name: Run tests
run: ./.scripts/integration_test.sh -d ${{ matrix.warehouse }}
run: ./.scripts/integration_test.sh -d ${{matrix.warehouse}}

# post_ci_cleanup sits in utils package
- name: "Post-test: Drop ci schemas"
run: |
dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }}
dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}}
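The new "Set warehouse variables" step splits each matrix entry on the first underscore, so spark_iceberg installs dbt-spark while still selecting the spark_iceberg profile target. A minimal sketch of that splitting logic, assuming the same cut semantics as the workflow step above (illustration only, not part of the commit):

#!/bin/bash
# Mirrors the cut-based splitting in the "Set warehouse variables" step.
for warehouse in postgres bigquery snowflake databricks spark_iceberg; do
  WAREHOUSE_PLATFORM=$(echo "$warehouse" | cut -d'_' -f1)      # text before the first underscore
  WAREHOUSE_SPECIFIC=$(echo "$warehouse" | cut -s -d'_' -f2)   # empty when there is no underscore (-s)
  echo "$warehouse -> platform=$WAREHOUSE_PLATFORM specific=${WAREHOUSE_SPECIFIC:-none}"
done
# e.g. spark_iceberg -> platform=spark specific=iceberg; postgres -> platform=postgres specific=none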
34 changes: 34 additions & 0 deletions .github/workflows/spark_deployment/Dockerfile
@@ -0,0 +1,34 @@
FROM openjdk:11-jre-slim

# Set environment variables
ENV SPARK_VERSION=3.5.1
ENV HADOOP_VERSION=3.3.4
ENV ICEBERG_VERSION=1.4.2
ENV AWS_SDK_VERSION=1.12.581

# Install necessary tools
RUN apt-get update && apt-get install -y curl wget procps rsync ssh

# Download and install Spark
RUN wget https://downloads.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz && \
tar -xvzf spark-${SPARK_VERSION}-bin-hadoop3.tgz && \
mv spark-${SPARK_VERSION}-bin-hadoop3 /spark && \
rm spark-${SPARK_VERSION}-bin-hadoop3.tgz

# Set Spark environment variables
ENV SPARK_HOME=/spark
ENV PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

# Download necessary JARs
RUN mkdir -p /spark/jars && \
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/${ICEBERG_VERSION}/iceberg-spark-runtime-3.5_2.12-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-spark-runtime.jar && \
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-aws-bundle.jar && \
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -O /spark/jars/hadoop-aws.jar && \
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar -O /spark/jars/aws-java-sdk-bundle.jar

# Create directory for Spark events
RUN mkdir -p /tmp/spark-events

WORKDIR /spark

CMD ["bash"]
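The image bakes the Iceberg runtime, iceberg-aws-bundle, hadoop-aws, and AWS SDK bundle JARs into /spark/jars, so a quick local check can confirm the build before it is pushed. A hedged sketch, assuming Docker is available and this spark_deployment directory is the build context (the local tag below is illustrative):

docker build -t spark-s3-iceberg:local .
docker run --rm spark-s3-iceberg:local ls -lh /spark/jars
docker run --rm spark-s3-iceberg:local spark-submit --version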
20 changes: 20 additions & 0 deletions .github/workflows/spark_deployment/build_and_push.sh
@@ -0,0 +1,20 @@
#!/bin/bash

# Set variables
DOCKER_HUB_ORG="snowplow"
IMAGE_NAME="spark-s3-iceberg"
TAG="latest"

# Build the image
echo "Building Docker image..."
docker build --platform linux/amd64 -t $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG .

# Log in to Docker Hub
echo "Logging in to Docker Hub..."
docker login

# Push the image to Docker Hub
echo "Pushing image to Docker Hub..."
docker push $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG

echo "Image successfully built and pushed to Docker Hub"
66 changes: 66 additions & 0 deletions .github/workflows/spark_deployment/docker-compose.yml
@@ -0,0 +1,66 @@
version: '3'

networks:
spark-network:
driver: bridge

services:
spark-master:
image: snowplow/spark-s3-iceberg:latest
command: ["/bin/bash", "-c", "/spark/sbin/start-master.sh -h spark-master --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.master.Master-1-*.out"]
hostname: spark-master
ports:
- '8080:8080'
- '7077:7077'
environment:
- SPARK_LOCAL_IP=spark-master
- SPARK_MASTER_HOST=spark-master
- SPARK_MASTER_PORT=7077
- SPARK_MASTER_OPTS="-Dspark.driver.memory=2g"
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_REGION=eu-west-1
- AWS_DEFAULT_REGION=eu-west-1
volumes:
- ./spark-defaults.conf:/spark/conf/spark-defaults.conf
networks:
- spark-network

spark-worker:
image: snowplow/spark-s3-iceberg:latest
command: ["/bin/bash", "-c", "sleep 10 && /spark/sbin/start-worker.sh spark://spark-master:7077 --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.worker.Worker-*.out"]
depends_on:
- spark-master
environment:
- SPARK_WORKER_CORES=2
- SPARK_WORKER_MEMORY=4G
- SPARK_EXECUTOR_MEMORY=3G
- SPARK_LOCAL_IP=spark-worker
- SPARK_MASTER=spark://spark-master:7077
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_REGION=eu-west-1
- AWS_DEFAULT_REGION=eu-west-1
volumes:
- ./spark-defaults.conf:/spark/conf/spark-defaults.conf
networks:
- spark-network

thrift-server:
image: snowplow/spark-s3-iceberg:latest
command: ["/bin/bash", "-c", "sleep 30 && /spark/sbin/start-thriftserver.sh --master spark://spark-master:7077 --driver-memory 2g --executor-memory 3g --hiveconf hive.server2.thrift.port=10000 --hiveconf hive.server2.thrift.bind.host=0.0.0.0 --conf spark.sql.hive.thriftServer.async=true --conf spark.sql.hive.thriftServer.workerQueue.size=2000 --conf spark.sql.hive.thriftServer.maxWorkerThreads=100 --conf spark.sql.hive.thriftServer.minWorkerThreads=50 && tail -f /spark/logs/spark--org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-*.out"]
ports:
- '10000:10000'
depends_on:
- spark-master
- spark-worker
environment:
- SPARK_LOCAL_IP=thrift-server
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_REGION=eu-west-1
- AWS_DEFAULT_REGION=eu-west-1
volumes:
- ./spark-defaults.conf:/spark/conf/spark-defaults.conf
networks:
- spark-network
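The workflow waits a fixed 90 seconds after docker-compose up; when standing the cluster up locally it can be more reliable to poll the Thrift port instead. A rough sketch, assuming AWS credentials are already exported in the shell and the compose file above is used (bash-only /dev/tcp check):

docker-compose up -d
for i in $(seq 1 30); do
  if (echo > /dev/tcp/localhost/10000) 2>/dev/null; then
    echo "Thrift server is accepting connections"
    break
  fi
  echo "Waiting for Thrift server ($i/30)..."
  sleep 10
done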
44 changes: 44 additions & 0 deletions .github/workflows/spark_deployment/spark-defaults.conf
@@ -0,0 +1,44 @@
spark.master spark://spark-master:7077

spark.sql.warehouse.dir s3a://dbt-spark-iceberg/github-integration-testing
spark.sql.catalog.glue org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.glue.catalog-impl org.apache.iceberg.aws.glue.GlueCatalog
spark.sql.catalog.glue.warehouse s3a://dbt-spark-iceberg/github-integration-testing
spark.sql.catalog.glue.io-impl org.apache.iceberg.aws.s3.S3FileIO
spark.sql.defaultCatalog glue
spark.sql.catalog.glue.database dbt-spark-iceberg

spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.access.key <AWS_ACCESS_KEY_ID>
spark.hadoop.fs.s3a.secret.key <AWS_SECRET_ACCESS_KEY>
spark.hadoop.fs.s3a.endpoint s3.eu-west-1.amazonaws.com
spark.hadoop.fs.s3a.path.style.access true
spark.hadoop.fs.s3a.region eu-west-1
spark.hadoop.fs.s3a.aws.region eu-west-1

# Enabling AWS SDK V4 signing (required for regions launched after January 2014)
spark.hadoop.com.amazonaws.services.s3.enableV4 true
spark.hadoop.fs.s3a.aws.credentials.provider org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider

# Hive Metastore Configuration (using AWS Glue)
spark.hadoop.hive.metastore.client.factory.class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory

# Thrift Server Configuration for better performance in concurrent environments
spark.sql.hive.thriftServer.singleSession false
spark.sql.hive.thriftServer.async true
# spark.sql.hive.thriftServer.maxWorkerThreads 100
# spark.sql.hive.thriftServer.minWorkerThreads 50
# spark.sql.hive.thriftServer.workerQueue.size 2000

# Memory and Performance Tuning
# spark.driver.memory 2g
# spark.executor.memory 3g
# spark.worker.memory 4g
spark.network.timeout 600s
spark.sql.broadcastTimeout 600s
spark.sql.adaptive.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer

# Logging and Debugging
spark.eventLog.enabled true
spark.eventLog.dir /tmp/spark-events
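With the glue catalog registered as Spark's default catalog, a basic smoke test can be run through the Thrift server using the beeline client shipped with Spark. A hedged example, assuming the compose services above are running and the thrift-server container is healthy:

docker-compose exec thrift-server /spark/bin/beeline \
  -u jdbc:hive2://localhost:10000 \
  -e "SHOW DATABASES;" \
  -e "SELECT 1;"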
6 changes: 5 additions & 1 deletion dbt_project.yml
@@ -108,6 +108,8 @@ on-run-end:
models:
snowplow_ecommerce:
+materialized: table
+file_format: "{{ 'delta' if target.type not in ['spark'] else 'iceberg'}}"
+incremental_strategy: "{{ none if target.type not in ['spark'] else 'merge' }}"
base:
manifest:
+schema: "snowplow_manifest"
@@ -135,7 +137,9 @@ models:
bigquery:
+enabled: "{{ target.type == 'bigquery' | as_bool() }}"
databricks:
+enabled: "{{ target.type in ['databricks', 'spark'] | as_bool() }}"
+enabled: "{{ target.type in ['databricks'] | as_bool() }}"
spark:
+enabled: "{{ target.type in ['spark'] | as_bool() }}"
default:
+enabled: "{{ target.type in ['redshift', 'postgres'] | as_bool() }}"
snowflake:
4 changes: 2 additions & 2 deletions integration_tests/.scripts/integration_test.sh
@@ -10,7 +10,7 @@ do
esac
done

declare -a SUPPORTED_DATABASES=("bigquery" "postgres" "databricks" "snowflake")
declare -a SUPPORTED_DATABASES=("bigquery" "postgres" "databricks" "snowflake" "spark_iceberg")

# set to lower case
DATABASE="$(echo $DATABASE | tr '[:upper:]' '[:lower:]')"
@@ -27,7 +27,7 @@ for db in ${DATABASES[@]}; do

eval "dbt seed --full-refresh --target $db" || exit 1;

echo "Snowplow e-commerce integration tests: Execute models (no mobile) - run 0/4"
echo "Snowplow e-commerce integration tests: Execute models (no mobile) - run 0/4"

eval "dbt run --full-refresh --vars '{snowplow__allow_refresh: true, snowplow__backfill_limit_days: 30, snowplow__enable_mobile_events: false}' --target $db" || exit 1;

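With spark_iceberg added to SUPPORTED_DATABASES, the integration suite can be exercised against the local Spark cluster the same way the pr_tests workflow does for the other warehouses (assuming the Thrift server is up and the CI profile is in use):

cd integration_tests
./.scripts/integration_test.sh -d spark_iceberg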
21 changes: 21 additions & 0 deletions integration_tests/ci/profiles.yml
@@ -56,3 +56,24 @@ integration_tests:
dbname: "{{ env_var('POSTGRES_TEST_DBNAME') }}"
schema: "gh_sp_ecom_dbt_{{ env_var('SCHEMA_SUFFIX') }}"
threads: 4

redshift:
type: redshift
host: "{{ env_var('REDSHIFT_TEST_HOST') }}"
user: "{{ env_var('REDSHIFT_TEST_USER') }}"
pass: "{{ env_var('REDSHIFT_TEST_PASS') }}"
dbname: "{{ env_var('REDSHIFT_TEST_DBNAME') }}"
port: "{{ env_var('REDSHIFT_TEST_PORT') | as_number }}"
schema: "gh_sp_ecom_dbt_{{ env_var('SCHEMA_SUFFIX') }}"
threads: 4

spark_iceberg:
type: spark
method: thrift
host: "{{ env_var('SPARK_MASTER_HOST', 'localhost') }}"
port: 10000
user: "{{ env_var('SPARK_USER', 'spark') }}"
schema: "{{ env_var('SPARK_SCHEMA', 'default') }}"
connect_retries: 5
connect_timeout: 60
threads: 1
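The spark_iceberg target connects over Thrift, so connectivity can be verified with dbt debug before running the full suite. A hedged example, assuming dbt-spark[PyHive] is installed and the Thrift server is listening on localhost:10000 (the SPARK_* env vars fall back to the defaults shown above):

cd integration_tests
dbt debug --target spark_iceberg --profiles-dir ./ci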
5 changes: 4 additions & 1 deletion integration_tests/dbt_project.yml
@@ -27,11 +27,14 @@ models:
snowplow_ecommerce_integration_tests:
bind: false
+schema: "snplw_ecommerce_int_tests"
+materialized: table
source:
bigquery:
+enabled: "{{ target.type == 'bigquery' | as_bool() }}"
databricks:
+enabled: "{{ target.type in ['databricks', 'spark'] | as_bool() }}"
+enabled: "{{ target.type in ['databricks'] | as_bool() }}"
spark:
+enabled: "{{ target.type in ['spark'] | as_bool() }}"
default:
+enabled: "{{ target.type in ['redshift', 'postgres'] | as_bool() }}"
snowflake: