The aim of this project is to compare the time it takes to extract, transform and load large datasets using three different methods. Please read the Medium post for more information.
The data used in this project is the publicly available Stack Exchange Data Dump. `Users.xml` and `Posts.xml` were converted to `users.csv.gz` and `posts.csv.gz` and used as the source files for this project. For the ISO 3166-1 country codes, the CSV from DataHub was used (as `country_codes.csv`).
For Google Cloud:
- Create a project
- Create a Cloud Storage bucket and upload the `posts.csv.gz`, `users.csv.gz` and `country_codes.csv` files
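A minimal sketch of this step using the `gsutil` CLI, assuming `${BUCKET_NAME}` and `${REGION}` are set and that the converted files are in the current directory:

```sh
# create the bucket (names must be globally unique)
gsutil mb -l ${REGION} gs://${BUCKET_NAME}

# upload the source files
gsutil cp posts.csv.gz users.csv.gz country_codes.csv gs://${BUCKET_NAME}/
```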
For Spark:
- Install sbt
- Install SDKMAN!
- In the `spark` folder, use SDKMAN! to install a JDK (8 or 11 are currently supported) and set the project's JDK version using `sdk env init`
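A sketch of that flow, assuming a hypothetical JDK identifier (pick a real one from `sdk list java`):

```sh
cd spark

# install a supported JDK (the identifier below is hypothetical)
sdk install java 11.0.21-tem

# pin the project's JDK in .sdkmanrc, then activate it
sdk env init
sdk env
```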
For Dask:
- Python 3.x installed
- Install packages from `requirements.txt` -> `pip install -r ./dask/requirements.txt` (for running locally)
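As a quick local smoke test (hypothetical, not part of the repo), assuming `users.csv.gz` sits in the working directory and has a header row:

```sh
# read the gzipped CSV as a single Dask partition and print the first rows
python -c "import dask.dataframe as dd; print(dd.read_csv('users.csv.gz', compression='gzip', blocksize=None).head())"
```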
For dbt:
- Python 3.x installed
- Install packages from `requirements.txt` -> `pip install -r ./dbt/requirements.txt`
- Copy the ISO 3166-1 country codes CSV into `./bigquery_dbt/seeds/country_codes.csv`
- Set up a dbt profile in `~/.dbt/profiles.yml` called `bigquery_dbt` for BigQuery (an example is sketched below)
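A minimal sketch of such a profile, assuming OAuth authentication and placeholder project/dataset values (note this overwrites any existing `profiles.yml`):

```sh
mkdir -p ~/.dbt
cat > ~/.dbt/profiles.yml <<'EOF'
bigquery_dbt:
  target: dev
  outputs:
    dev:
      type: bigquery
      method: oauth
      project: your-gcp-project-id  # placeholder: your Google Cloud project ID
      dataset: your_dataset         # placeholder: the BigQuery dataset used below
      threads: 4
      location: US                  # assumption: match your dataset's location
EOF
```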
- Make the BigQuery dataset:
bq mk --dataset ${PROJECT_ID}:${DATASET}
- Load files into BigQuery as tables (can be done concurrently):
bq load \
--autodetect \
--source_format=CSV \
${DATASET}.posts \
gs://${BUCKET_NAME}/posts.csv.gz
bq load \
--autodetect \
--source_format=CSV \
${DATASET}.users \
gs://${BUCKET_NAME}/users.csv.gz
- Ensure the Google Cloud project ID is specified in the `database` field in `schema.yml`
- Run dbt:
cd ./bigquery_dbt
dbt build # Load CSV as reference table (via seeds), run tests etc.
dbt run
- Export the created table from BigQuery to GCS:
bq extract \
--destination_format CSV \
--compression GZIP \
--field_delimiter ',' \
${PROJECT_ID}:${DATASET}.aggregated_users \
gs://${BUCKET_NAME}/dbt_bigquery/agg_users.csv.gz
For the Spark job on Dataproc:
- Ensure that you change the `gcsBucket` value in `aggregate-users.scala`
- Run the following (in the `spark` folder) to compile and package the project into a `.jar` for Dataproc:
sbt
Then within the sbt console:
package
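Equivalently, the packaging step can be run non-interactively (standard sbt behaviour, not specific to this repo):

```sh
sbt package
```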
- Copy the `.jar` from local to GCS (optional):
gsutil cp spark/target/scala-2.12/${JAR_FILENAME}.jar gs://${BUCKET_NAME}/spark/aggregateusers.jar
- Create Dataproc cluster:
gcloud dataproc clusters create ${SPARK_CLUSTER_NAME} \
--project=${PROJECT_ID} \
--region=${REGION} \
--image-version=2.0 \
--master-machine-type n1-standard-8 \
--worker-machine-type n1-standard-8 \
--num-workers 6
- Submit the Spark job on the Dataproc cluster:
gcloud dataproc jobs submit spark \
--cluster=${SPARK_CLUSTER_NAME} \
--class=stackoverflow.AggregateUsers \
--jars=gs://${BUCKET_NAME}/spark/aggregateusers.jar \
--region=${REGION}
- Delete cluster when finished
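For reference, a standard delete command (same variables as above):

```sh
gcloud dataproc clusters delete ${SPARK_CLUSTER_NAME} --region=${REGION}
```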
For the Dask job on Dataproc:
- Copy initialisation actions to local bucket (optional):
gsutil cp gs://goog-dataproc-initialization-actions-${REGION}/dask/dask.sh gs://${BUCKET_NAME}/dask/
- Create the Dask cluster:
gcloud dataproc clusters create ${DASK_CLUSTER_NAME} \
--project=${PROJECT_ID} \
--region=${REGION} \
--master-machine-type n1-standard-8 \
--worker-machine-type n1-standard-8 \
--num-workers 6 \
--image-version preview-ubuntu \
--initialization-actions gs://${BUCKET_NAME}/dask/dask.sh \
--metadata dask-runtime=yarn \
--enable-component-gateway
- Copy the `dask` folder to the cluster's master node:
gcloud compute scp \
--project=${PROJECT_ID} \
--zone=${ZONE} \
--recurse ./dask/ ${DASK_CLUSTER_NAME}-m:~/
- Install package requirements & run:
gcloud compute ssh ${DASK_CLUSTER_NAME}-m --zone ${ZONE}
/opt/conda/default/bin/python -m pip install python-dotenv
/opt/conda/default/bin/python ./dask/transform.py
- Delete cluster when finished
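For reference, as with the Spark cluster, a standard delete command:

```sh
gcloud dataproc clusters delete ${DASK_CLUSTER_NAME} --region=${REGION}
```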