From ec71652f8f5d204d67508786ad399caac51b3679 Mon Sep 17 00:00:00 2001 From: Harel Shein Date: Fri, 10 Nov 2023 13:19:44 -0500 Subject: [PATCH] remove gx --- dags/etl_customers.py | 164 +++++++++++++++++------------------ dags/etl_orders.py | 196 +++++++++++++++++++++--------------------- 2 files changed, 180 insertions(+), 180 deletions(-) diff --git a/dags/etl_customers.py b/dags/etl_customers.py index f9aedba9..0faf5973 100644 --- a/dags/etl_customers.py +++ b/dags/etl_customers.py @@ -1,90 +1,90 @@ -from airflow.contrib.operators.bigquery_operator import BigQueryOperator -from airflow.utils.dates import days_ago -from great_expectations.data_context import BaseDataContext -from great_expectations.data_context.types.base import DataContextConfig, \ - GCSStoreBackendDefaults -from great_expectations_provider.operators.great_expectations import \ - GreatExpectationsOperator -from airflow import DAG +# from airflow.contrib.operators.bigquery_operator import BigQueryOperator +# from airflow.utils.dates import days_ago +# from great_expectations.data_context import BaseDataContext +# from great_expectations.data_context.types.base import DataContextConfig, \ +# GCSStoreBackendDefaults +# from great_expectations_provider.operators.great_expectations import \ +# GreatExpectationsOperator +# from airflow import DAG -project_config = DataContextConfig( - datasources={ - "food_delivery_db": { - "data_asset_type": { - "module_name": "great_expectations.dataset", - "class_name": "SqlAlchemyDataset" - }, - "class_name": "SqlAlchemyDatasource", - "module_name": "great_expectations.datasource", - "credentials": { - "url": "bigquery://openlineage-404715/food_delivery" - }, - } - }, - store_backend_defaults=GCSStoreBackendDefaults( - default_bucket_name="us-central1-datakin-staging-135fd46b-bucket", - default_project_name="openlineage-404715", - expectations_store_prefix="great_expectations/expectations", - ) -) +# project_config = DataContextConfig( +# datasources={ +# "food_delivery_db": { +# "data_asset_type": { +# "module_name": "great_expectations.dataset", +# "class_name": "SqlAlchemyDataset" +# }, +# "class_name": "SqlAlchemyDatasource", +# "module_name": "great_expectations.datasource", +# "credentials": { +# "url": "bigquery://openlineage-404715/food_delivery" +# }, +# } +# }, +# store_backend_defaults=GCSStoreBackendDefaults( +# default_bucket_name="us-central1-datakin-staging-135fd46b-bucket", +# default_project_name="openlineage-404715", +# expectations_store_prefix="great_expectations/expectations", +# ) +# ) -context = BaseDataContext(project_config=project_config) +# context = BaseDataContext(project_config=project_config) -default_args = { - 'owner': 'datascience', - 'depends_on_past': False, - 'start_date': days_ago(1), - 'email_on_failure': False, - 'email_on_retry': False, - 'email': ['datascience@example.com'] -} +# default_args = { +# 'owner': 'datascience', +# 'depends_on_past': False, +# 'start_date': days_ago(1), +# 'email_on_failure': False, +# 'email_on_retry': False, +# 'email': ['datascience@example.com'] +# } -dag = DAG( - 'etl_customers', - schedule_interval='@hourly', - catchup=False, - default_args=default_args, - description='Loads newly registered customers daily.' -) +# dag = DAG( +# 'etl_customers', +# schedule_interval='@hourly', +# catchup=False, +# default_args=default_args, +# description='Loads newly registered customers daily.' +# ) -t1 = BigQueryOperator( - task_id='if_not_exists', - sql=''' - CREATE TABLE IF NOT EXISTS food_delivery.customers ( - id INT64, - created_at TIME, - updated_at TIME, - name STRING, - email STRING, - address STRING, - phone STRING, - city_id INT64 - ) - ''', - use_legacy_sql=False, - dag=dag -) +# t1 = BigQueryOperator( +# task_id='if_not_exists', +# sql=''' +# CREATE TABLE IF NOT EXISTS food_delivery.customers ( +# id INT64, +# created_at TIME, +# updated_at TIME, +# name STRING, +# email STRING, +# address STRING, +# phone STRING, +# city_id INT64 +# ) +# ''', +# use_legacy_sql=False, +# dag=dag +# ) -t2 = BigQueryOperator( - task_id='etl', - sql=''' - SELECT id, created_at, updated_at, name, email, address, phone, city_id - FROM food_delivery.tmp_customers - ''', - destination_dataset_table='openlineage-404715.food_delivery.customers', - use_legacy_sql=False, - dag=dag -) +# t2 = BigQueryOperator( +# task_id='etl', +# sql=''' +# SELECT id, created_at, updated_at, name, email, address, phone, city_id +# FROM food_delivery.tmp_customers +# ''', +# destination_dataset_table='openlineage-404715.food_delivery.customers', +# use_legacy_sql=False, +# dag=dag +# ) -t3 = GreatExpectationsOperator( - expectation_suite_name='customers_suite', - batch_kwargs={ - 'table': 'customers', - 'datasource': 'food_delivery_db' - }, - data_context=context, - dag=dag, - task_id='customers_expectation', -) +# t3 = GreatExpectationsOperator( +# expectation_suite_name='customers_suite', +# batch_kwargs={ +# 'table': 'customers', +# 'datasource': 'food_delivery_db' +# }, +# data_context=context, +# dag=dag, +# task_id='customers_expectation', +# ) -t1 >> t2 >> t3 +# t1 >> t2 >> t3 diff --git a/dags/etl_orders.py b/dags/etl_orders.py index b96bc146..f0e7b32b 100644 --- a/dags/etl_orders.py +++ b/dags/etl_orders.py @@ -1,106 +1,106 @@ -from airflow.contrib.operators.bigquery_operator import BigQueryOperator -from airflow.utils.dates import days_ago -from great_expectations.data_context import BaseDataContext -from great_expectations.data_context.types.base import DataContextConfig, \ - GCSStoreBackendDefaults -from great_expectations_provider.operators.great_expectations import \ - GreatExpectationsOperator -from airflow import DAG -import os +# from airflow.contrib.operators.bigquery_operator import BigQueryOperator +# from airflow.utils.dates import days_ago +# from great_expectations.data_context import BaseDataContext +# from great_expectations.data_context.types.base import DataContextConfig, \ +# GCSStoreBackendDefaults +# from great_expectations_provider.operators.great_expectations import \ +# GreatExpectationsOperator +# from airflow import DAG +# import os -project_config = DataContextConfig( - datasources={ - "food_delivery_db": { - "data_asset_type": { - "module_name": "great_expectations.dataset", - "class_name": "SqlAlchemyDataset" - }, - "class_name": "SqlAlchemyDatasource", - "module_name": "great_expectations.datasource", - "credentials": { - "url": "bigquery://openlineage-404715/food_delivery" - }, - } - }, - validation_operators={ - 'action_list_operator': { - 'class_name': "ActionListValidationOperator", - 'action_list': [{ - "name": "openlineage", - "action": { - "class_name": "OpenLineageValidationAction", - "module_name": "openlineage.common.provider.great_expectations", - "openlineage_host": os.getenv('OPENLINEAGE_URL'), - "openlineage_apiKey": os.getenv('OPENLINEAGE_API_KEY'), - "openlineage_namespace": os.getenv('OPENLINEAGE_NAMESPACE', 'default'), - "job_name": "validate_orders" - } - }] - } - }, - anonymous_usage_statistics={'enabled': False}, - store_backend_defaults=GCSStoreBackendDefaults( - default_bucket_name="us-central1-datakin-staging-135fd46b-bucket", - default_project_name="openlineage-404715", - expectations_store_prefix="great_expectations/expectations", - ) -) +# project_config = DataContextConfig( +# datasources={ +# "food_delivery_db": { +# "data_asset_type": { +# "module_name": "great_expectations.dataset", +# "class_name": "SqlAlchemyDataset" +# }, +# "class_name": "SqlAlchemyDatasource", +# "module_name": "great_expectations.datasource", +# "credentials": { +# "url": "bigquery://openlineage-404715/food_delivery" +# }, +# } +# }, +# validation_operators={ +# 'action_list_operator': { +# 'class_name': "ActionListValidationOperator", +# 'action_list': [{ +# "name": "openlineage", +# "action": { +# "class_name": "OpenLineageValidationAction", +# "module_name": "openlineage.common.provider.great_expectations", +# "openlineage_host": os.getenv('OPENLINEAGE_URL'), +# "openlineage_apiKey": os.getenv('OPENLINEAGE_API_KEY'), +# "openlineage_namespace": os.getenv('OPENLINEAGE_NAMESPACE', 'default'), +# "job_name": "validate_orders" +# } +# }] +# } +# }, +# anonymous_usage_statistics={'enabled': False}, +# store_backend_defaults=GCSStoreBackendDefaults( +# default_bucket_name="us-central1-datakin-staging-135fd46b-bucket", +# default_project_name="openlineage-404715", +# expectations_store_prefix="great_expectations/expectations", +# ) +# ) -context = BaseDataContext(project_config=project_config) +# context = BaseDataContext(project_config=project_config) -default_args = { - 'owner': 'datascience', - 'depends_on_past': False, - 'start_date': days_ago(1), - 'email_on_failure': False, - 'email_on_retry': False, - 'email': ['datascience@example.com'] -} +# default_args = { +# 'owner': 'datascience', +# 'depends_on_past': False, +# 'start_date': days_ago(1), +# 'email_on_failure': False, +# 'email_on_retry': False, +# 'email': ['datascience@example.com'] +# } -dag = DAG( - 'etl_orders', - schedule_interval='@hourly', - catchup=False, - default_args=default_args, - description='Loads newly placed orders daily.' -) +# dag = DAG( +# 'etl_orders', +# schedule_interval='@hourly', +# catchup=False, +# default_args=default_args, +# description='Loads newly placed orders daily.' +# ) -t1 = BigQueryOperator( - task_id='if_not_exists', - sql=''' - CREATE TABLE IF NOT EXISTS food_delivery.orders ( - id INT64, - placed_on TIME, - menu_item_id INT64, - quantity INT64, - discount_id INT64, - comment STRING - ) - ''', - use_legacy_sql=False, - dag=dag -) +# t1 = BigQueryOperator( +# task_id='if_not_exists', +# sql=''' +# CREATE TABLE IF NOT EXISTS food_delivery.orders ( +# id INT64, +# placed_on TIME, +# menu_item_id INT64, +# quantity INT64, +# discount_id INT64, +# comment STRING +# ) +# ''', +# use_legacy_sql=False, +# dag=dag +# ) -t2 = BigQueryOperator( - task_id='insert', - sql=''' - SELECT id, placed_on, menu_item_id, quantity, discount_id, comment - FROM food_delivery.tmp_orders; - ''', - destination_dataset_table='openlineage-404715.food_delivery.orders', - use_legacy_sql=False, - dag=dag -) +# t2 = BigQueryOperator( +# task_id='insert', +# sql=''' +# SELECT id, placed_on, menu_item_id, quantity, discount_id, comment +# FROM food_delivery.tmp_orders; +# ''', +# destination_dataset_table='openlineage-404715.food_delivery.orders', +# use_legacy_sql=False, +# dag=dag +# ) -t3 = GreatExpectationsOperator( - expectation_suite_name='orders_suite', - batch_kwargs={ - 'table': 'orders', - 'datasource': 'food_delivery_db' - }, - data_context=context, - dag=dag, - task_id='customers_expectation', -) +# t3 = GreatExpectationsOperator( +# expectation_suite_name='orders_suite', +# batch_kwargs={ +# 'table': 'orders', +# 'datasource': 'food_delivery_db' +# }, +# data_context=context, +# dag=dag, +# task_id='customers_expectation', +# ) -t1 >> t2 >> t3 +# t1 >> t2 >> t3