Added demo for Integration of lakeFS with Airflow and Iceberg #218

Merged: 1 commit, Jul 22, 2024
2 changes: 2 additions & 0 deletions 00_notebooks/00_index.ipynb
@@ -75,6 +75,8 @@
" * Isolating Airflow job run and atomic promotion to production\n",
" * Integration of lakeFS with Airflow via Hooks\n",
" * Troubleshooting production issues\n",
" * Integration of lakeFS with Airflow and Databricks\n",
" * Integration of lakeFS with Airflow and Iceberg\n",
"* [**Airflow** (2)](https://github.com/treeverse/lakeFS-samples/blob/main/01_standalone_examples/airflow-02/) - lakeFS + Airflow\n",
"* [Azure **Databricks**](https://github.com/treeverse/lakeFS-samples/blob/main/01_standalone_examples/azure-databricks/)\n",
"* [AWS **Databricks**](https://github.com/treeverse/lakeFS-samples/blob/main/01_standalone_examples/aws-databricks/)\n",
232 changes: 232 additions & 0 deletions 01_standalone_examples/airflow-01/Iceberg.ipynb
@@ -0,0 +1,232 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "b1674ec1-9227-4159-a9bc-5000b31f6e12",
"metadata": {},
"source": [
"# [Integration of lakeFS with Airflow and Iceberg](https://docs.lakefs.io/integrations/airflow.html)\n",
"\n",
"## Use Case: Isolating Airflow job run and atomic promotion to production"
]
},
{
"cell_type": "markdown",
"id": "cc68e27f-99e2-46d9-8558-717351708c7f",
"metadata": {},
"source": [
"## Prerequisites\n",
"\n",
"###### This Notebook requires connecting to a lakeFS Server.\n",
"###### To spin up lakeFS quickly - use the Playground (https://demo.lakefs.io) which provides lakeFS server on-demand with a single click;\n",
"###### Or, alternatively, refer to lakeFS Quickstart doc (https://docs.lakefs.io/quickstart/installing.html)."
]
},
{
"cell_type": "markdown",
"id": "d960cc24",
"metadata": {},
"source": [
"In this demo, you'll learn how to integrate lakeFS with Apache Airflow to perform isolated job runs with atomic promotions to production. Here we can use an existing Airflow DAG to demonstrate lakeFS for your ETL pipelines. The notebook will guide you through creating a lakeFS repository and visualizing your workflow in the Airflow UI. "
]
},
{
"cell_type": "markdown",
"id": "16ddc884-bdf5-4fc5-97b1-38662358268c",
"metadata": {},
"source": [
"## Setup Task: Change your lakeFS credentials"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b9a905b0-ab02-426d-8049-7138de6efc31",
"metadata": {},
"outputs": [],
"source": [
"lakefsEndPoint = 'http://host.docker.internal:8000'\n",
"lakefsAccessKey = 'AKIAIOSFOLKFSSAMPLES'\n",
"lakefsSecretKey = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'"
]
},
{
"cell_type": "markdown",
"id": "85b5ae63-dc9b-43b3-a883-0a3865ad5dc6",
"metadata": {},
"source": [
"## Setup Task: You can change lakeFS repo name (it can be an existing repo or provide another repo name)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a4a1f393-3346-440f-8083-99aeb6013443",
"metadata": {},
"outputs": [],
"source": [
"repo = \"airflow-iceberg-repo\""
]
},
{
"cell_type": "markdown",
"id": "b0f793f5-22f2-43f7-8e5f-f39149703314",
"metadata": {},
"source": [
"## Setup Task: Versioning Information"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c6f3e3d3-df16-4899-a52e-4cbb2892e409",
"metadata": {},
"outputs": [],
"source": [
"sourceBranch = \"main\"\n",
"newBranch = \"airflow_demo_iceberg_dag\""
]
},
{
"cell_type": "markdown",
"id": "adcdeffb-d15f-4d84-87ae-bd2af291758a",
"metadata": {},
"source": [
"## Setup Task: Storage Information\n",
"#### Change the Storage Namespace to a location in the bucket you’ve configured. The storage namespace is a location in the underlying storage where data for this repository will be stored."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "de2bea94-b287-4515-afba-51c2de0df3fe",
"metadata": {},
"outputs": [],
"source": [
"storageNamespace = 's3://example/' + repo # e.g. \"s3://bucket\""
]
},
{
"cell_type": "markdown",
"id": "fbc69394-47d2-464e-b9ec-ebbd0383422b",
"metadata": {},
"source": [
"## Setup Task: Run additional [Setup](./airflow/Iceberg/IcebergDAGSetup.ipynb) tasks here"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "43315a02-5e0d-4480-bd35-1dcb82e0e473",
"metadata": {},
"outputs": [],
"source": [
"%run ./airflow/Iceberg/IcebergDAGSetup.ipynb"
]
},
{
"cell_type": "markdown",
"id": "4e4ac852-ac82-45bf-b49a-1323aa673f2d",
"metadata": {},
"source": [
"## Create Repository"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "996ed676-16e2-4948-8b56-c3f774036764",
"metadata": {},
"outputs": [],
"source": [
"print(lakefs.Repository(repo).create(storage_namespace=storageNamespace, default_branch=sourceBranch, exist_ok=True))"
]
},
{
"cell_type": "markdown",
"id": "cba46998-3a3d-4b52-ba59-6ee5c1893634",
"metadata": {},
"source": [
"## You can review [lakeFS Iceberg DAG](./Airflow/Iceberg/lakefs_iceberg_dag.py) program."
]
},
{
"cell_type": "markdown",
"id": "57bf42ac-9e4a-46a6-a615-16825da83598",
"metadata": {},
"source": [
"## Visualize [lakeFS Iceberg DAG Graph](http://127.0.0.1:8080/dags/lakefs_iceberg_dag/graph) in Airflow UI. Login by using username \"airflow\" and password \"airflow\"."
]
},
{
"cell_type": "markdown",
"id": "b230ad3d-420b-491a-9129-51f99c5e95b2",
"metadata": {},
"source": [
"## Trigger lakeFS Iceberg DAG"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ca0c89a0-d8e7-4689-80a4-9dee7aa0fac1",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"! airflow dags unpause lakefs_iceberg_dag\n",
"! airflow dags trigger lakefs_iceberg_dag"
]
},
{
"cell_type": "markdown",
"id": "897d0e77-8cb9-47fb-81b2-74c96fae10b9",
"metadata": {},
"source": [
"## Visualize [lakeFS Iceberg DAG Graph](http://127.0.0.1:8080/dags/lakefs_iceberg_dag/graph).\n",
"### Toggle Auto Refresh switch in DAG Graph to see the continuous progress of the workflow.\n",
"#### Click on any lakeFS commit or merge task box, then click on \"lakeFS\" button (this URL will take you to applicable commit/merge in lakeFS). You will also find this URL in the Airflow log if you click on Log button and search for \"lakeFS URL\"."
]
},
{
"cell_type": "markdown",
"id": "15dc2bcb-5dc7-4fa4-8e38-78c40acced8c",
"metadata": {},
"source": [
"## More Questions?\n",
"\n",
"###### Join the lakeFS Slack group - https://lakefs.io/slack"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ecb6473d-a5c0-41f2-b5d5-6f6dd9d3f337",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.4"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
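The notebook above triggers `lakefs_iceberg_dag.py`, whose source is not shown in this diff. As a rough, non-authoritative illustration of the pattern it demonstrates (run the job on an isolated branch, then promote the result to production atomically), here is a minimal sketch using the high-level `lakefs` Python SDK that the notebook already calls via `lakefs.Repository(...)`. The exact method names (`branch().create()`, `commit()`, `merge_into()`) and the placeholder job step are assumptions, not the contents of the real DAG.

```python
# Hedged sketch: isolated run on a lakeFS branch with atomic promotion to main.
# Assumes the high-level `lakefs` SDK is already configured with the endpoint and
# credentials set earlier in the notebook, and reuses the notebook's variable names.
import lakefs

repo = "airflow-iceberg-repo"
sourceBranch = "main"
newBranch = "airflow_demo_iceberg_dag"

repository = lakefs.Repository(repo)

# 1. Branch out: an isolated, zero-copy view of production for this run.
branch = repository.branch(newBranch).create(source_reference=sourceBranch, exist_ok=True)

# 2. Run the Spark/Iceberg jobs against the new branch here, e.g. writing to tables
#    under the lakefs.<branch>.lakefs_demo namespace (placeholder for the DAG tasks).

# 3. Commit the results on the isolated branch.
branch.commit(message="Create and populate Iceberg demo tables")

# 4. Atomic promotion: merge the isolated branch back into production.
branch.merge_into(repository.branch(sourceBranch))
```

If anything fails before the final merge, production (`main`) never sees partial results, which is the point of the isolation pattern.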
4 changes: 4 additions & 0 deletions 01_standalone_examples/airflow-01/README.md
@@ -24,6 +24,10 @@ This repository includes the following Jupyter Notebooks, which you can run on your local machine:
* Integration of lakeFS with Airflow and Databricks
* Use Case: Run Databricks notebook via Airflow DAG

6. Iceberg:
* Integration of lakeFS with Airflow and Iceberg
* Use Case: Isolating Airflow job run and atomic promotion to production

## Prerequisites
* Docker installed on your local machine
* lakeFS installed and running on your local machine, on a server, or in the cloud. If you don't already have lakeFS running, either use [lakeFS Playground](https://demo.lakefs.io/), which provides a lakeFS server on demand with a single click, or refer to the [lakeFS Quickstart](https://docs.lakefs.io/quickstart/) doc.
@@ -0,0 +1,74 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "9b2c8fa0-1702-411a-b11c-3190679bf31c",
"metadata": {},
"source": [
"# [Integration of lakeFS with Airflow](https://docs.lakefs.io/integrations/airflow.html)\n",
"\n",
"## Use Case: Isolating Airflow job run and atomic promotion to production"
]
},
{
"cell_type": "markdown",
"id": "7a129e87-56c4-401f-9b4b-cc4e7587cf1e",
"metadata": {},
"source": [
"## Run [Common Setup](../Common/CommonSetup.ipynb) tasks here"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b8bfe84c-fce2-4be0-8314-073c6b9aa1d6",
"metadata": {},
"outputs": [],
"source": [
"%run ./airflow/Common/CommonSetup.ipynb"
]
},
{
"cell_type": "markdown",
"id": "a6b6792e-b1be-4c26-90cd-e9d54924354f",
"metadata": {},
"source": [
"## Copy DAG programs to Airflow DAGs directory and sync to Airflow database"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4f31409a-f75c-4b2d-97ad-f82eb18dd776",
"metadata": {},
"outputs": [],
"source": [
"! cp ./airflow/Iceberg/lakefs_iceberg_dag.py ./airflow/dags\n",
"\n",
"dagbag = DagBag(include_examples=False)\n",
"dagbag.sync_to_db()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.4"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
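The setup notebook above copies `lakefs_iceberg_dag.py` into the Airflow DAGs folder, but the DAG source itself is not included in this diff. Purely for orientation, here is a hedged sketch of how such a DAG could chain branch creation, the Spark scripts added below, a commit, and a merge, using operators from the `airflow-provider-lakefs` package described in the linked Airflow integration docs. The DAG id, connection id, script path, and task layout are assumptions rather than the real DAG's contents, and the operator argument names should be checked against the provider's documentation.

```python
# Hedged sketch of a lakeFS-aware Airflow DAG (not the actual lakefs_iceberg_dag.py).
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from lakefs_provider.operators.create_branch_operator import LakeFSCreateBranchOperator
from lakefs_provider.operators.commit_operator import LakeFSCommitOperator
from lakefs_provider.operators.merge_operator import LakeFSMergeOperator

REPO = "airflow-iceberg-repo"
BRANCH = "airflow_demo_iceberg_dag"

with DAG(
    dag_id="lakefs_iceberg_dag_sketch",  # hypothetical id, distinct from the real DAG
    start_date=datetime(2024, 1, 1),
    schedule=None,                       # Airflow 2.4+; older versions use schedule_interval
    catchup=False,
) as dag:
    # Create an isolated branch off main for this run.
    create_branch = LakeFSCreateBranchOperator(
        task_id="create_branch",
        lakefs_conn_id="lakefs",         # assumed Airflow connection id
        repo=REPO,
        branch=BRANCH,
        source_branch="main",
    )

    # Run one of the Spark scripts from this PR against the isolated branch;
    # the path is a placeholder, and the branch name is passed as argv[1].
    create_tables = BashOperator(
        task_id="create_tables",
        bash_command=f"spark-submit /path/to/iceberg_scripts/create_tables.py {BRANCH}",
    )

    # Commit the new tables on the isolated branch.
    commit = LakeFSCommitOperator(
        task_id="commit_changes",
        lakefs_conn_id="lakefs",
        repo=REPO,
        branch=BRANCH,
        msg="Create and populate Iceberg demo tables",
    )

    # Atomically promote to production by merging the branch into main.
    merge = LakeFSMergeOperator(
        task_id="merge_to_main",
        lakefs_conn_id="lakefs",
        repo=REPO,
        source_ref=BRANCH,
        destination_branch="main",
        msg="Promote Iceberg demo tables to production",
    )

    create_branch >> create_tables >> commit >> merge
```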
@@ -0,0 +1,16 @@
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import sys

lakeFSBranch = sys.argv[1]

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

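# Create (or replace) the authors table in the lakefs_demo namespace on the given lakeFS branch, then load sample rows.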
spark.sql(f"CREATE OR REPLACE TABLE lakefs.{lakeFSBranch}.lakefs_demo.authors(id int, name string) USING iceberg")

spark.sql(f' \
INSERT INTO lakefs.{lakeFSBranch}.lakefs_demo.authors (id, name) \
VALUES (1, "J.R.R. Tolkien"), (2, "George R.R. Martin"), \
(3, "Agatha Christie"), (4, "Isaac Asimov"), (5, "Stephen King") \
')
@@ -0,0 +1,34 @@
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import sys

lakeFSBranch = sys.argv[1]

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

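# Create (or replace) the book_sales table in the lakefs_demo namespace on the given lakeFS branch, then load sample rows.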
spark.sql(f"CREATE OR REPLACE TABLE lakefs.{lakeFSBranch}.lakefs_demo.book_sales(id int, sale_date date, book_id int, price double) USING iceberg")

spark.sql(f" \
INSERT INTO lakefs.{lakeFSBranch}.lakefs_demo.book_sales (id, sale_date, book_id, price) \
VALUES (1, DATE '2024-04-12', 1, 25.50), \
(2, DATE '2024-04-11', 2, 17.99), \
(3, DATE '2024-04-10', 3, 12.95), \
(4, DATE '2024-04-13', 4, 32.00), \
(5, DATE '2024-04-12', 5, 29.99), \
(6, DATE '2024-03-15', 1, 23.99), \
(7, DATE '2024-02-22', 2, 19.50), \
(8, DATE '2024-01-10', 3, 14.95), \
(9, DATE '2023-12-05', 4, 28.00), \
(10, DATE '2023-11-18', 5, 27.99), \
(11, DATE '2023-10-26', 2, 18.99), \
(12, DATE '2023-10-12', 1, 22.50), \
(13, DATE '2024-04-09', 3, 11.95), \
(14, DATE '2024-03-28', 4, 35.00), \
(15, DATE '2024-04-05', 5, 31.99), \
(16, DATE '2024-03-01', 1, 27.50), \
(17, DATE '2024-02-14', 2, 21.99), \
(18, DATE '2024-01-07', 3, 13.95), \
(19, DATE '2023-12-20', 4, 29.00), \
(20, DATE '2023-11-03', 5, 28.99) \
")
@@ -0,0 +1,19 @@
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import sys

lakeFSBranch = sys.argv[1]

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

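# Create (or replace) the books table in the lakefs_demo namespace on the given lakeFS branch, then load sample rows.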
spark.sql(f"CREATE OR REPLACE TABLE lakefs.{lakeFSBranch}.lakefs_demo.books(id int, title string, author_id int) USING iceberg")

spark.sql(f' \
INSERT INTO lakefs.{lakeFSBranch}.lakefs_demo.books (id, title, author_id) \
VALUES (1, "The Lord of the Rings", 1), (2, "The Hobbit", 1), \
(3, "A Song of Ice and Fire", 2), (4, "A Clash of Kings", 2), \
(5, "And Then There Were None", 3), (6, "Murder on the Orient Express", 3), \
(7, "Foundation", 4), (8, "I, Robot", 4), \
(9, "The Shining", 5), (10, "It", 5) \
')