diff --git a/00_notebooks/00_index.ipynb b/00_notebooks/00_index.ipynb index 5c1371398..5c6a3f47d 100644 --- a/00_notebooks/00_index.ipynb +++ b/00_notebooks/00_index.ipynb @@ -75,6 +75,8 @@ " * Isolating Airflow job run and atomic promotion to production\n", " * Integration of lakeFS with Airflow via Hooks\n", " * Troubleshooting production issues\n", + " * Integration of lakeFS with Airflow and Databricks\n", + " * Integration of lakeFS with Airflow and Iceberg\n", "* [**Airflow** (2)](https://github.com/treeverse/lakeFS-samples/blob/main/01_standalone_examples/airflow-02/) - lakeFS + Airflow\n", "* [Azure **Databricks**](https://github.com/treeverse/lakeFS-samples/blob/main/01_standalone_examples/azure-databricks/)\n", "* [AWS **Databricks**](https://github.com/treeverse/lakeFS-samples/blob/main/01_standalone_examples/aws-databricks/)\n", diff --git a/01_standalone_examples/airflow-01/Iceberg.ipynb b/01_standalone_examples/airflow-01/Iceberg.ipynb new file mode 100644 index 000000000..5bc8e032b --- /dev/null +++ b/01_standalone_examples/airflow-01/Iceberg.ipynb @@ -0,0 +1,232 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "b1674ec1-9227-4159-a9bc-5000b31f6e12", + "metadata": {}, + "source": [ + "# [Integration of lakeFS with Airflow and Iceberg](https://docs.lakefs.io/integrations/airflow.html)\n", + "\n", + "## Use Case: Isolating Airflow job run and atomic promotion to production" + ] + }, + { + "cell_type": "markdown", + "id": "cc68e27f-99e2-46d9-8558-717351708c7f", + "metadata": {}, + "source": [ + "## Prerequisites\n", + "\n", + "###### This Notebook requires connecting to a lakeFS Server.\n", + "###### To spin up lakeFS quickly - use the Playground (https://demo.lakefs.io) which provides lakeFS server on-demand with a single click;\n", + "###### Or, alternatively, refer to lakeFS Quickstart doc (https://docs.lakefs.io/quickstart/installing.html)." + ] + }, + { + "cell_type": "markdown", + "id": "d960cc24", + "metadata": {}, + "source": [ + "In this demo, you'll learn how to integrate lakeFS with Apache Airflow to perform isolated job runs with atomic promotions to production. Here we can use an existing Airflow DAG to demonstrate lakeFS for your ETL pipelines. The notebook will guide you through creating a lakeFS repository and visualizing your workflow in the Airflow UI. 
" + ] + }, + { + "cell_type": "markdown", + "id": "16ddc884-bdf5-4fc5-97b1-38662358268c", + "metadata": {}, + "source": [ + "## Setup Task: Change your lakeFS credentials" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b9a905b0-ab02-426d-8049-7138de6efc31", + "metadata": {}, + "outputs": [], + "source": [ + "lakefsEndPoint = 'http://host.docker.internal:8000'\n", + "lakefsAccessKey = 'AKIAIOSFOLKFSSAMPLES'\n", + "lakefsSecretKey = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'" + ] + }, + { + "cell_type": "markdown", + "id": "85b5ae63-dc9b-43b3-a883-0a3865ad5dc6", + "metadata": {}, + "source": [ + "## Setup Task: You can change lakeFS repo name (it can be an existing repo or provide another repo name)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a4a1f393-3346-440f-8083-99aeb6013443", + "metadata": {}, + "outputs": [], + "source": [ + "repo = \"airflow-iceberg-repo\"" + ] + }, + { + "cell_type": "markdown", + "id": "b0f793f5-22f2-43f7-8e5f-f39149703314", + "metadata": {}, + "source": [ + "## Setup Task: Versioning Information" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c6f3e3d3-df16-4899-a52e-4cbb2892e409", + "metadata": {}, + "outputs": [], + "source": [ + "sourceBranch = \"main\"\n", + "newBranch = \"airflow_demo_iceberg_dag\"" + ] + }, + { + "cell_type": "markdown", + "id": "adcdeffb-d15f-4d84-87ae-bd2af291758a", + "metadata": {}, + "source": [ + "## Setup Task: Storage Information\n", + "#### Change the Storage Namespace to a location in the bucket you’ve configured. The storage namespace is a location in the underlying storage where data for this repository will be stored." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "de2bea94-b287-4515-afba-51c2de0df3fe", + "metadata": {}, + "outputs": [], + "source": [ + "storageNamespace = 's3://example/' + repo # e.g. \"s3://bucket\"" + ] + }, + { + "cell_type": "markdown", + "id": "fbc69394-47d2-464e-b9ec-ebbd0383422b", + "metadata": {}, + "source": [ + "## Setup Task: Run additional [Setup](./airflow/Iceberg/IcebergDAGSetup.ipynb) tasks here" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "43315a02-5e0d-4480-bd35-1dcb82e0e473", + "metadata": {}, + "outputs": [], + "source": [ + "%run ./airflow/Iceberg/IcebergDAGSetup.ipynb" + ] + }, + { + "cell_type": "markdown", + "id": "4e4ac852-ac82-45bf-b49a-1323aa673f2d", + "metadata": {}, + "source": [ + "## Create Repository" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "996ed676-16e2-4948-8b56-c3f774036764", + "metadata": {}, + "outputs": [], + "source": [ + "print(lakefs.Repository(repo).create(storage_namespace=storageNamespace, default_branch=sourceBranch, exist_ok=True))" + ] + }, + { + "cell_type": "markdown", + "id": "cba46998-3a3d-4b52-ba59-6ee5c1893634", + "metadata": {}, + "source": [ + "## You can review [lakeFS Iceberg DAG](./Airflow/Iceberg/lakefs_iceberg_dag.py) program." + ] + }, + { + "cell_type": "markdown", + "id": "57bf42ac-9e4a-46a6-a615-16825da83598", + "metadata": {}, + "source": [ + "## Visualize [lakeFS Iceberg DAG Graph](http://127.0.0.1:8080/dags/lakefs_iceberg_dag/graph) in Airflow UI. Login by using username \"airflow\" and password \"airflow\"." 
+ ] }, { "cell_type": "markdown", "id": "b230ad3d-420b-491a-9129-51f99c5e95b2", "metadata": {}, "source": [ "## Trigger lakeFS Iceberg DAG" ] }, { "cell_type": "code", "execution_count": null, "id": "ca0c89a0-d8e7-4689-80a4-9dee7aa0fac1", "metadata": { "tags": [] }, "outputs": [], "source": [ "! airflow dags unpause lakefs_iceberg_dag\n", "! airflow dags trigger lakefs_iceberg_dag" ] }, { "cell_type": "markdown", "id": "897d0e77-8cb9-47fb-81b2-74c96fae10b9", "metadata": {}, "source": [ "## Visualize the [lakeFS Iceberg DAG Graph](http://127.0.0.1:8080/dags/lakefs_iceberg_dag/graph).\n", "### Toggle the Auto Refresh switch in the DAG Graph to see the continuous progress of the workflow.\n", "#### Click on any lakeFS commit or merge task box, then click the \"lakeFS\" button (this URL will take you to the applicable commit/merge in lakeFS). You will also find this URL in the Airflow log if you click the Log button and search for \"lakeFS URL\"." ] }, { "cell_type": "markdown", "id": "15dc2bcb-5dc7-4fa4-8e38-78c40acced8c", "metadata": {}, "source": [ "## More Questions?\n", "\n", "###### Join the lakeFS Slack group - https://lakefs.io/slack" ] }, { "cell_type": "code", "execution_count": null, "id": "ecb6473d-a5c0-41f2-b5d5-6f6dd9d3f337", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.4" } }, "nbformat": 4, "nbformat_minor": 5 +} diff --git a/01_standalone_examples/airflow-01/README.md b/01_standalone_examples/airflow-01/README.md index 8628b2922..ffd8d37b9 100644 --- a/01_standalone_examples/airflow-01/README.md +++ b/01_standalone_examples/airflow-01/README.md @@ -24,6 +24,10 @@ This repository includes following Jupyter Notebooks which you can run on your l * Integration of lakeFS with Airflow and Databricks * Use Case: Run Databricks notebook via Airflow DAG +6. Iceberg: +* Integration of lakeFS with Airflow and Iceberg +* Use Case: Isolating Airflow job run and atomic promotion to production + ## Prerequisites * Docker installed on your local machine * lakeFS installed and running on your local machine or on a server or in the cloud. If you don't have lakeFS already running then either use [lakeFS Playground](https://demo.lakefs.io/) which provides lakeFS server on-demand with a single click or refer to [lakeFS Quickstart](https://docs.lakefs.io/quickstart/) doc.
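The notebook and DAG above drive the isolate-then-promote flow entirely through Airflow operators (`LakeFSCreateBranchOperator`, `LakeFSCommitOperator`, `LakeFSMergeOperator`). For readers who want to see the same pattern outside Airflow, here is a minimal sketch using the high-level `lakefs` Python SDK that the notebook already imports for `lakefs.Repository(...)`. The branch name and commit message are illustrative, the method names assume the current high-level SDK, and it is assumed the SDK can already reach your lakeFS endpoint with valid credentials (e.g., via environment variables or lakectl config):

```python
# Illustrative sketch of the isolate-then-promote pattern with the high-level lakefs SDK.
# Branch name and messages are placeholders; credentials/endpoint are assumed configured.
import lakefs

repo = lakefs.Repository("airflow-iceberg-repo")

# 1. Isolate: create an ephemeral branch for this run from main.
etl_branch = repo.branch("airflow_demo_iceberg_dag_20240101T000000")
etl_branch.create(source_reference="main", exist_ok=True)

# 2. Run the job against the ephemeral branch
#    (the DAG does this by addressing Iceberg tables as lakefs.<branch>.lakefs_demo.<table>).

# 3. Commit the results on the ephemeral branch.
etl_branch.commit(
    message="commit ETL results",
    metadata={"committed_from": "sdk-sketch"},
)

# 4. Promote atomically: merge the ephemeral branch back into main.
etl_branch.merge_into(repo.branch("main"))
```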
diff --git a/01_standalone_examples/airflow-01/airflow/Iceberg/IcebergDAGSetup.ipynb b/01_standalone_examples/airflow-01/airflow/Iceberg/IcebergDAGSetup.ipynb new file mode 100644 index 000000000..0ead74156 --- /dev/null +++ b/01_standalone_examples/airflow-01/airflow/Iceberg/IcebergDAGSetup.ipynb @@ -0,0 +1,74 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "9b2c8fa0-1702-411a-b11c-3190679bf31c", + "metadata": {}, + "source": [ + "# [Integration of lakeFS with Airflow](https://docs.lakefs.io/integrations/airflow.html)\n", + "\n", + "## Use Case: Isolating Airflow job run and atomic promotion to production" + ] + }, + { + "cell_type": "markdown", + "id": "7a129e87-56c4-401f-9b4b-cc4e7587cf1e", + "metadata": {}, + "source": [ + "## Run [Common Setup](../Common/CommonSetup.ipynb) tasks here" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b8bfe84c-fce2-4be0-8314-073c6b9aa1d6", + "metadata": {}, + "outputs": [], + "source": [ + "%run ./airflow/Common/CommonSetup.ipynb" + ] + }, + { + "cell_type": "markdown", + "id": "a6b6792e-b1be-4c26-90cd-e9d54924354f", + "metadata": {}, + "source": [ + "## Copy DAG programs to Airflow DAGs directory and sync to Airflow database" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4f31409a-f75c-4b2d-97ad-f82eb18dd776", + "metadata": {}, + "outputs": [], + "source": [ + "! cp ./airflow/Iceberg/lakefs_iceberg_dag.py ./airflow/dags\n", + "\n", + "dagbag = DagBag(include_examples=False)\n", + "dagbag.sync_to_db()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.4" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/01_standalone_examples/airflow-01/airflow/Iceberg/create_authors_table.py b/01_standalone_examples/airflow-01/airflow/Iceberg/create_authors_table.py new file mode 100644 index 000000000..717a5108f --- /dev/null +++ b/01_standalone_examples/airflow-01/airflow/Iceberg/create_authors_table.py @@ -0,0 +1,16 @@ +from pyspark.context import SparkContext +from pyspark.sql.session import SparkSession +import sys + +lakeFSBranch = sys.argv[1] + +sc = SparkContext.getOrCreate() +spark = SparkSession(sc) + +spark.sql(f"CREATE OR REPLACE TABLE lakefs.{lakeFSBranch}.lakefs_demo.authors(id int, name string) USING iceberg") + +spark.sql(f' \ +INSERT INTO lakefs.{lakeFSBranch}.lakefs_demo.authors (id, name) \ +VALUES (1, "J.R.R. Tolkien"), (2, "George R.R. 
Martin"), \ + (3, "Agatha Christie"), (4, "Isaac Asimov"), (5, "Stephen King") \ +') diff --git a/01_standalone_examples/airflow-01/airflow/Iceberg/create_book_sales_table.py b/01_standalone_examples/airflow-01/airflow/Iceberg/create_book_sales_table.py new file mode 100644 index 000000000..bc6157c24 --- /dev/null +++ b/01_standalone_examples/airflow-01/airflow/Iceberg/create_book_sales_table.py @@ -0,0 +1,34 @@ +from pyspark.context import SparkContext +from pyspark.sql.session import SparkSession +import sys + +lakeFSBranch = sys.argv[1] + +sc = SparkContext.getOrCreate() +spark = SparkSession(sc) + +spark.sql(f"CREATE OR REPLACE TABLE lakefs.{lakeFSBranch}.lakefs_demo.book_sales(id int, sale_date date, book_id int, price double) USING iceberg") + +spark.sql(f" \ +INSERT INTO lakefs.{lakeFSBranch}.lakefs_demo.book_sales (id, sale_date, book_id, price) \ +VALUES (1, DATE '2024-04-12', 1, 25.50), \ + (2, DATE '2024-04-11', 2, 17.99), \ + (3, DATE '2024-04-10', 3, 12.95), \ + (4, DATE '2024-04-13', 4, 32.00), \ + (5, DATE '2024-04-12', 5, 29.99), \ + (6, DATE '2024-03-15', 1, 23.99), \ + (7, DATE '2024-02-22', 2, 19.50), \ + (8, DATE '2024-01-10', 3, 14.95), \ + (9, DATE '2023-12-05', 4, 28.00), \ + (10, DATE '2023-11-18', 5, 27.99), \ + (11, DATE '2023-10-26', 2, 18.99), \ + (12, DATE '2023-10-12', 1, 22.50), \ + (13, DATE '2024-04-09', 3, 11.95), \ + (14, DATE '2024-03-28', 4, 35.00), \ + (15, DATE '2024-04-05', 5, 31.99), \ + (16, DATE '2024-03-01', 1, 27.50), \ + (17, DATE '2024-02-14', 2, 21.99), \ + (18, DATE '2024-01-07', 3, 13.95), \ + (19, DATE '2023-12-20', 4, 29.00), \ + (20, DATE '2023-11-03', 5, 28.99) \ +") diff --git a/01_standalone_examples/airflow-01/airflow/Iceberg/create_books_table.py b/01_standalone_examples/airflow-01/airflow/Iceberg/create_books_table.py new file mode 100644 index 000000000..46f6a2bba --- /dev/null +++ b/01_standalone_examples/airflow-01/airflow/Iceberg/create_books_table.py @@ -0,0 +1,19 @@ +from pyspark.context import SparkContext +from pyspark.sql.session import SparkSession +import sys + +lakeFSBranch = sys.argv[1] + +sc = SparkContext.getOrCreate() +spark = SparkSession(sc) + +spark.sql(f"CREATE OR REPLACE TABLE lakefs.{lakeFSBranch}.lakefs_demo.books(id int, title string, author_id int) USING iceberg") + +spark.sql(f' \ +INSERT INTO lakefs.{lakeFSBranch}.lakefs_demo.books (id, title, author_id) \ +VALUES (1, "The Lord of the Rings", 1), (2, "The Hobbit", 1), \ + (3, "A Song of Ice and Fire", 2), (4, "A Clash of Kings", 2), \ + (5, "And Then There Were None", 3), (6, "Murder on the Orient Express", 3), \ + (7, "Foundation", 4), (8, "I, Robot", 4), \ + (9, "The Shining", 5), (10, "It", 5) \ +') diff --git a/01_standalone_examples/airflow-01/airflow/Iceberg/delete_cancelled_sales.py b/01_standalone_examples/airflow-01/airflow/Iceberg/delete_cancelled_sales.py new file mode 100644 index 000000000..5b92c6552 --- /dev/null +++ b/01_standalone_examples/airflow-01/airflow/Iceberg/delete_cancelled_sales.py @@ -0,0 +1,13 @@ +from pyspark.context import SparkContext +from pyspark.sql.session import SparkSession +import sys + +lakeFSBranch = sys.argv[1] + +sc = SparkContext.getOrCreate() +spark = SparkSession(sc) + +spark.sql(f" \ +DELETE FROM lakefs.{lakeFSBranch}.lakefs_demo.book_sales \ +WHERE id IN (10, 15, 2, 1, 6); \ +") diff --git a/01_standalone_examples/airflow-01/airflow/Iceberg/lakefs_iceberg_dag.py b/01_standalone_examples/airflow-01/airflow/Iceberg/lakefs_iceberg_dag.py new file mode 100644 index 000000000..9b1d265a8 --- /dev/null +++ 
b/01_standalone_examples/airflow-01/airflow/Iceberg/lakefs_iceberg_dag.py @@ -0,0 +1,271 @@ +from airflow.decorators import dag +#from airflow.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.utils.dates import days_ago +from lakefs_provider.operators.create_branch_operator import LakeFSCreateBranchOperator +from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator +from lakefs_provider.operators.commit_operator import LakeFSCommitOperator +from lakefs_provider.operators.merge_operator import LakeFSMergeOperator +import lakefs +from airflow.models import Variable +from airflow.models.dagrun import DagRun +import time +from functools import partial +from airflow.utils.log.logging_mixin import LoggingMixin + +import sys +sys.path.insert(0, '/home/jovyan') +from lakefs_demo import print_commit_result, post_execute_commit + +repo = Variable.get("repo") +lakefsAccessKey = Variable.get("lakefsAccessKey") +lakefsSecretKey = Variable.get("lakefsSecretKey") +lakefsEndPoint = Variable.get("lakefsEndPoint") + +# These args will get passed on to each operator +# You can override them on a per-task basis during operator initialization +default_args = { + "owner": "lakeFS", + "branch": Variable.get("newBranch") + '_{{ ts_nodash }}', + "repo": Variable.get("repo"), + #"path": Variable.get("fileName"), + "default-branch": Variable.get("sourceBranch"), + "lakefs_conn_id": Variable.get("conn_lakefs") +} + +@dag(default_args=default_args, + render_template_as_native_obj=True, + max_active_runs=1, + start_date=days_ago(2), + schedule_interval=None, + tags=['testing']) +def lakefs_iceberg_dag(): + jars_partition_data = Variable.get("spark_home") + '/jars/hadoop-aws-*.jar,' + Variable.get("spark_home") + '/jars/aws-java-sdk-bundle-*.jar' + sparkConfig ={ + "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", + "spark.hadoop.fs.s3a.endpoint": lakefsEndPoint, + "spark.hadoop.fs.s3a.path.style.access": "true", + "spark.hadoop.fs.s3a.access.key": lakefsAccessKey, + "spark.hadoop.fs.s3a.secret.key": lakefsSecretKey, + "spark.sql.catalog.lakefs": "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.lakefs.catalog-impl": "io.lakefs.iceberg.LakeFSCatalog", + "spark.sql.catalog.lakefs.warehouse": f"lakefs://{repo}", + "spark.sql.catalog.lakefs.uri": lakefsEndPoint, + "spark.sql.catalog.lakefs.cache-enabled": "false", + "spark.sql.defaultCatalog": "lakefs", + "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" + } + sparkPackages = "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.3.0,io.lakefs:lakefs-iceberg:0.1.1" + applicationPath="/home/jovyan/airflow/Iceberg/" + + # Create the branch to run on + task_create_etl_branch = LakeFSCreateBranchOperator( + task_id='create_etl_branch', + branch=default_args.get('branch'), + source_branch=default_args.get('default-branch') + ) + + task_create_etl_branch.post_execute = partial(print_commit_result, message='lakeFS commit id is: ') + + task_create_tables_branch = LakeFSCreateBranchOperator( + task_id='create_tables_branch', + branch=default_args.get('branch') + '_create_tables', + source_branch=default_args.get('branch') + ) + + task_create_tables_branch.post_execute = partial(print_commit_result, message='lakeFS commit id is: ') + + task_create_authors_table_branch = LakeFSCreateBranchOperator( + task_id='create_authors_table_branch', + branch=default_args.get('branch') + '_create_authors_table', + source_branch=default_args.get('branch') + '_create_tables' + ) + 
+ + task_create_authors_table_branch.post_execute = partial(print_commit_result, message='lakeFS commit id is: ') + + task_create_books_table_branch = LakeFSCreateBranchOperator( + task_id='create_books_table_branch', + branch=default_args.get('branch') + '_create_books_table', + source_branch=default_args.get('branch') + '_create_tables' + ) + + task_create_books_table_branch.post_execute = partial(print_commit_result, message='lakeFS commit id is: ') + + task_create_book_sales_table_branch = LakeFSCreateBranchOperator( + task_id='create_book_sales_table_branch', + branch=default_args.get('branch') + '_create_book_sales_table', + source_branch=default_args.get('branch') + '_create_tables' + ) + + task_create_book_sales_table_branch.post_execute = partial(print_commit_result, message='lakeFS commit id is: ') + + task_create_authors_table = SparkSubmitOperator( + task_id='create_authors_table', + conn_id='conn_spark', + application=applicationPath+"create_authors_table.py", + application_args=[default_args.get('branch') + '_create_authors_table'], + jars=jars_partition_data, + packages=sparkPackages, + conf=sparkConfig + ) + + task_create_authors_table.pre_execute = lambda context: LoggingMixin().log.info( + 'Branch name is: ' + Variable.get("newBranch") + '_' \ + + context['ts_nodash'] + '_create_authors_table' \ + + ' and lakeFS URL is: ' + Variable.get("lakefsUIEndPoint") \ + + '/repositories/' + Variable.get("repo") + '/objects?ref=' \ + + Variable.get("newBranch") + '_' + context['ts_nodash'] + '_create_authors_table' ) + + task_commit_authors_table = LakeFSCommitOperator( + task_id='commit_authors_table', + branch=default_args.get('branch') + '_create_authors_table', + msg='commit Authors table to lakeFS using airflow!', + metadata={"committed_from": "airflow-operator", "source": "Sales System"} + ) + + task_commit_authors_table.post_execute = partial(post_execute_commit, message='lakeFS commit id is: ') + + task_create_books_table = SparkSubmitOperator( + task_id='create_books_table', + conn_id='conn_spark', + application=applicationPath+"create_books_table.py", + application_args=[default_args.get('branch') + '_create_books_table'], + jars=jars_partition_data, + packages=sparkPackages, + conf=sparkConfig + ) + + task_create_books_table.pre_execute = lambda context: LoggingMixin().log.info( + 'Branch name is: ' + Variable.get("newBranch") + '_' \ + + context['ts_nodash'] + '_create_books_table' \ + + ' and lakeFS URL is: ' + Variable.get("lakefsUIEndPoint") \ + + '/repositories/' + Variable.get("repo") + '/objects?ref=' \ + + Variable.get("newBranch") + '_' + context['ts_nodash'] + '_create_books_table' ) + + task_commit_books_table = LakeFSCommitOperator( + task_id='commit_books_table', + branch=default_args.get('branch') + '_create_books_table', + msg='commit Books table to lakeFS using airflow!', + metadata={"committed_from": "airflow-operator", "source": "Sales System"} + ) + + task_commit_books_table.post_execute = partial(post_execute_commit, message='lakeFS commit id is: ') + + task_create_book_sales_table = SparkSubmitOperator( + task_id='create_book_sales_table', + conn_id='conn_spark', + application=applicationPath+"create_book_sales_table.py", + application_args=[default_args.get('branch') + '_create_book_sales_table'], + jars=jars_partition_data, + packages=sparkPackages, + conf=sparkConfig + ) + + task_create_book_sales_table.pre_execute = lambda context: LoggingMixin().log.info( + 'Branch name is: ' + Variable.get("newBranch") + '_' \ + + context['ts_nodash'] +
'_create_book_sales_table' \ + + ' and lakeFS URL is: ' + Variable.get("lakefsUIEndPoint") \ + + '/repositories/' + Variable.get("repo") + '/objects?ref=' \ + + Variable.get("newBranch") + '_' + context['ts_nodash'] + '_create_book_sales_table' ) + + task_commit_book_sales_table = LakeFSCommitOperator( + task_id='commit_book_sales_table', + branch=default_args.get('branch') + '_create_book_sales_table', + msg='commit Book Sales table to lakeFS using airflow!', + metadata={"committed_from": "airflow-operator", "source": "Sales System"} + ) + + task_commit_book_sales_table.post_execute = partial(post_execute_commit, message='lakeFS commit id is: ') + + task_merge_create_authors_table_branch = LakeFSMergeOperator( + task_id='merge_create_authors_table_branch', + do_xcom_push=True, + source_ref=default_args.get('branch') + '_create_authors_table', + destination_branch=default_args.get('branch') + '_create_tables', + msg='merging ' + default_args.get('branch') + '_create_authors_table' + ' to the ' + default_args.get('branch') + '_create_tables' + ' branch', + metadata={"committer": "airflow-operator"} + ) + + task_merge_create_authors_table_branch.post_execute = partial(post_execute_commit, message='lakeFS commit id is: ') + + task_merge_create_books_table_branch = LakeFSMergeOperator( + task_id='merge_create_books_table_branch', + do_xcom_push=True, + source_ref=default_args.get('branch') + '_create_books_table', + destination_branch=default_args.get('branch') + '_create_tables', + msg='merging ' + default_args.get('branch') + '_create_books_table' + ' to the ' + default_args.get('branch') + '_create_tables' + ' branch', + metadata={"committer": "airflow-operator"} + ) + + task_merge_create_books_table_branch.post_execute = partial(post_execute_commit, message='lakeFS commit id is: ') + + task_merge_create_book_sales_table_branch = LakeFSMergeOperator( + task_id='merge_create_book_sales_table_branch', + do_xcom_push=True, + source_ref=default_args.get('branch') + '_create_book_sales_table', + destination_branch=default_args.get('branch') + '_create_tables', + msg='merging ' + default_args.get('branch') + '_create_book_sales_table' + ' to the ' + default_args.get('branch') + '_create_tables' + ' branch', + metadata={"committer": "airflow-operator"} + ) + + task_merge_create_book_sales_table_branch.post_execute = partial(post_execute_commit, message='lakeFS commit id is: ') + + task_merge_create_tables_branch = LakeFSMergeOperator( + task_id='merge_create_tables_branch', + do_xcom_push=True, + source_ref=default_args.get('branch') + '_create_tables', + destination_branch=default_args.get('branch'), + msg='merging ' + default_args.get('branch') + '_create_tables' + ' to the ' + default_args.get('branch') + ' branch', + metadata={"committer": "airflow-operator"} + ) + + task_merge_create_tables_branch.post_execute = partial(post_execute_commit, message='lakeFS commit id is: ') + + task_delete_cancelled_sales = SparkSubmitOperator( + task_id='delete_cancelled_sales', + conn_id='conn_spark', + application=applicationPath+"delete_cancelled_sales.py", + application_args=[default_args.get('branch')], + jars=jars_partition_data, + packages=sparkPackages, + conf=sparkConfig + ) + + task_delete_cancelled_sales.pre_execute = lambda context: LoggingMixin().log.info( + 'Branch name is: ' + Variable.get("newBranch") + '_' \ + + context['ts_nodash'] \ + + ' and lakeFS URL is: ' + Variable.get("lakefsUIEndPoint") \ + + '/repositories/' + Variable.get("repo") + '/objects?ref=' \ + + Variable.get("newBranch") +
'_' + context['ts_nodash'] ) + + task_commit_cancelled_sales = LakeFSCommitOperator( + task_id='commit_cancelled_sales', + branch=default_args.get('branch'), + msg='commit Cancelled Sales to lakeFS using airflow!', + metadata={"committed_from": "airflow-operator", "source": "Data Cleansing Process"} + ) + + task_commit_cancelled_sales.post_execute = partial(post_execute_commit, message='lakeFS commit id is: ') + + # Merge the changes back to the main branch. + task_merge_etl_branch = LakeFSMergeOperator( + task_id='merge_etl_branch', + do_xcom_push=True, + source_ref=default_args.get('branch'), + destination_branch=default_args.get('default-branch'), + msg='merging ' + default_args.get('branch') + ' to the ' + default_args.get('default-branch') + ' branch', + metadata={"committer": "airflow-operator"} + ) + + task_merge_etl_branch.post_execute = partial(post_execute_commit, message='lakeFS commit id is: ') + + + task_create_etl_branch >> task_create_tables_branch >> [task_create_authors_table_branch, task_create_books_table_branch, task_create_book_sales_table_branch] + + task_create_authors_table_branch >> task_create_authors_table >> task_commit_authors_table >> task_merge_create_authors_table_branch >> task_merge_create_tables_branch + task_create_books_table_branch >> task_create_books_table >> task_commit_books_table >> task_merge_create_books_table_branch >> task_merge_create_tables_branch + task_create_book_sales_table_branch >> task_create_book_sales_table >> task_commit_book_sales_table >> task_merge_create_book_sales_table_branch >> task_merge_create_tables_branch + + task_merge_create_tables_branch >> task_delete_cancelled_sales >> task_commit_cancelled_sales >> task_merge_etl_branch + + +sample_workflow_dag = lakefs_iceberg_dag() \ No newline at end of file diff --git a/README.md b/README.md index e4dbe4a37..868eb8d0f 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,8 @@ Under the [standalone_examples](./01_standalone_examples/) folder are a set of e * Isolating Airflow job run and atomic promotion to production * Integration of lakeFS with Airflow via Hooks * Troubleshooting production issues + * Integration of lakeFS with Airflow and Databricks + * Integration of lakeFS with Airflow and Iceberg * [Airflow (2)](./01_standalone_examples/airflow-02/) - lakeFS + Airflow * [Azure Databricks](./01_standalone_examples/azure-databricks/) * [AWS Databricks](./01_standalone_examples/aws-databricks/)
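A note on the Spark wiring in `lakefs_iceberg_dag.py`: every `SparkSubmitOperator` receives the same `sparkConfig`, which is what lets the job scripts address Iceberg tables as `lakefs.<branch>.lakefs_demo.<table>`. The sketch below shows an equivalent standalone SparkSession built from those same settings, which can be handy for inspecting the tables outside Airflow; the endpoint, keys, and repo name are placeholders for your own environment, and the package coordinates are the ones the DAG already pins:

```python
# Illustrative sketch: a standalone SparkSession configured like the DAG's sparkConfig,
# so branch-scoped Iceberg identifiers such as lakefs.<branch>.lakefs_demo.books resolve
# against the lakeFS Iceberg catalog. Endpoint, keys, and repo name are placeholders.
from pyspark.sql import SparkSession

lakefs_endpoint = "http://host.docker.internal:8000"  # placeholder
repo = "airflow-iceberg-repo"                         # placeholder

spark = (
    SparkSession.builder.appName("lakefs-iceberg-sketch")
    # Same packages the DAG passes via SparkSubmitOperator(packages=...)
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.3.0,io.lakefs:lakefs-iceberg:0.1.1")
    # S3A access to the lakeFS S3 gateway
    .config("spark.hadoop.fs.s3a.endpoint", lakefs_endpoint)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.access.key", "<lakefsAccessKey>")
    .config("spark.hadoop.fs.s3a.secret.key", "<lakefsSecretKey>")
    # lakeFS Iceberg catalog: tables are addressed as lakefs.<branch>.<namespace>.<table>
    .config("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.LakeFSCatalog")
    .config("spark.sql.catalog.lakefs.warehouse", f"lakefs://{repo}")
    .config("spark.sql.catalog.lakefs.uri", lakefs_endpoint)
    .config("spark.sql.catalog.lakefs.cache-enabled", "false")
    .config("spark.sql.defaultCatalog", "lakefs")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .getOrCreate()
)

# Query the table created by create_books_table.py on whichever branch you want to
# inspect (e.g., main after the DAG's final merge).
spark.sql("SELECT * FROM lakefs.main.lakefs_demo.books").show()
```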