From a6bce33baec0935745735325e1fe7f021352a4cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 12:41:16 -0800 Subject: [PATCH 01/25] Remove line --- .github/workflows/push-to-main.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/push-to-main.yml b/.github/workflows/push-to-main.yml index 8267fb1..23a1120 100644 --- a/.github/workflows/push-to-main.yml +++ b/.github/workflows/push-to-main.yml @@ -160,4 +160,3 @@ jobs: - name: Drop PR database run: "dbt --no-write-json run-operation drop_recreate_db --args '{db_name: ${{env.DATACOVES__MAIN__DATABASE}}, recreate: False}'" # yamllint disable-line rule:line-length - From e2e2031e9bf64ae5c54e466b1a16ed1ca525e1a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 12:52:21 -0800 Subject: [PATCH 02/25] Add stg_mayrapena1324_personal_loans model --- .pre-commit-config.yaml | 4 ++-- transform/models/L2_core/mayrapena1324_avg_by_grade.yml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ef6f3d0..0134682 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -11,8 +11,8 @@ repos: - id: check-script-ref-and-source - id: check-model-has-description - id: check-model-has-properties-file - # - id: check-model-has-all-columns - # - id: check-database-casing-consistency + - id: check-model-has-all-columns + always_run: true - repo: https://github.com/sqlfluff/sqlfluff diff --git a/transform/models/L2_core/mayrapena1324_avg_by_grade.yml b/transform/models/L2_core/mayrapena1324_avg_by_grade.yml index dfc2b70..143e30c 100644 --- a/transform/models/L2_core/mayrapena1324_avg_by_grade.yml +++ b/transform/models/L2_core/mayrapena1324_avg_by_grade.yml @@ -2,7 +2,7 @@ version: 2 models: - name: mayrapena1324_avg_by_grade - description: 'An average loan amount by grade' + description: '' columns: - name: grade - name: avg_loan_amount From 7433a30d75a8211cc663ce1f7c96d68916e9314d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 12:55:36 -0800 Subject: [PATCH 03/25] Remove description - testing --- .../L1_staging/loans/stg_mayrapena1324_personal_loans.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transform/models/L1_staging/loans/stg_mayrapena1324_personal_loans.yml b/transform/models/L1_staging/loans/stg_mayrapena1324_personal_loans.yml index 3e2ffe1..d603f67 100644 --- a/transform/models/L1_staging/loans/stg_mayrapena1324_personal_loans.yml +++ b/transform/models/L1_staging/loans/stg_mayrapena1324_personal_loans.yml @@ -2,7 +2,7 @@ version: 2 models: - name: stg_mayrapena1324_personal_loans - description: 'A staging model for personal loans' + description: '' columns: - name: airbyte_raw_id - name: airbyte_extracted_at From 3df98456fa6edb5f9771eea122b7244655bf3c60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 12:58:15 -0800 Subject: [PATCH 04/25] Remove col test --- transform/models/L2_core/mayrapena1324_avg_by_grade.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/transform/models/L2_core/mayrapena1324_avg_by_grade.yml b/transform/models/L2_core/mayrapena1324_avg_by_grade.yml index 143e30c..c1df51a 100644 --- a/transform/models/L2_core/mayrapena1324_avg_by_grade.yml +++ b/transform/models/L2_core/mayrapena1324_avg_by_grade.yml @@ -4,6 +4,5 @@ models: - name: mayrapena1324_avg_by_grade description: '' columns: - - name: grade - name: avg_loan_amount - name: total_loans From 328706ae27c3db0acb7b0144cd034616b2247a26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 13:05:39 -0800 Subject: [PATCH 05/25] Add column --- .pre-commit-config.yaml | 1 - transform/models/L2_core/mayrapena1324_avg_by_grade.yml | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0134682..94e6f43 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -12,7 +12,6 @@ repos: - id: check-model-has-description - id: check-model-has-properties-file - id: check-model-has-all-columns - always_run: true - repo: https://github.com/sqlfluff/sqlfluff diff --git a/transform/models/L2_core/mayrapena1324_avg_by_grade.yml b/transform/models/L2_core/mayrapena1324_avg_by_grade.yml index c1df51a..143e30c 100644 --- a/transform/models/L2_core/mayrapena1324_avg_by_grade.yml +++ b/transform/models/L2_core/mayrapena1324_avg_by_grade.yml @@ -4,5 +4,6 @@ models: - name: mayrapena1324_avg_by_grade description: '' columns: + - name: grade - name: avg_loan_amount - name: total_loans From f50ff0279092c1e74f923e41196e373814612a06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 13:36:52 -0800 Subject: [PATCH 06/25] Remove desc --- transform/models/L1_staging/loans/_loans.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transform/models/L1_staging/loans/_loans.yml b/transform/models/L1_staging/loans/_loans.yml index 26b9fd6..9f4f208 100644 --- a/transform/models/L1_staging/loans/_loans.yml +++ b/transform/models/L1_staging/loans/_loans.yml @@ -5,4 +5,4 @@ sources: database: RAW tables: - name: PERSONAL_LOANS - description: 'A personal loans source table' + description: '' From 7feb71321a41a15488f96d2a3d25f707f787981f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 13:43:01 -0800 Subject: [PATCH 07/25] Testing --- transform/models/L1_staging/loans/_loans.yml | 2 +- .../L1_staging/loans/stg_mayrapena1324_personal_loans.yml | 2 +- transform/models/L2_core/mayrapena1324_avg_by_grade.yml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/transform/models/L1_staging/loans/_loans.yml b/transform/models/L1_staging/loans/_loans.yml index 9f4f208..10207ce 100644 --- a/transform/models/L1_staging/loans/_loans.yml +++ b/transform/models/L1_staging/loans/_loans.yml @@ -5,4 +5,4 @@ sources: database: RAW tables: - name: PERSONAL_LOANS - description: '' + description: 'A personal Loans table' diff --git a/transform/models/L1_staging/loans/stg_mayrapena1324_personal_loans.yml b/transform/models/L1_staging/loans/stg_mayrapena1324_personal_loans.yml index d603f67..3a94274 100644 --- a/transform/models/L1_staging/loans/stg_mayrapena1324_personal_loans.yml +++ b/transform/models/L1_staging/loans/stg_mayrapena1324_personal_loans.yml @@ -2,7 +2,7 @@ version: 2 models: - name: stg_mayrapena1324_personal_loans - description: '' + description: 'A personal Loans table' columns: - name: airbyte_raw_id - name: airbyte_extracted_at diff --git a/transform/models/L2_core/mayrapena1324_avg_by_grade.yml b/transform/models/L2_core/mayrapena1324_avg_by_grade.yml index 143e30c..bdbe427 100644 --- a/transform/models/L2_core/mayrapena1324_avg_by_grade.yml +++ b/transform/models/L2_core/mayrapena1324_avg_by_grade.yml @@ -2,7 +2,7 @@ version: 2 models: - name: mayrapena1324_avg_by_grade - description: '' + description: columns: - name: grade - name: avg_loan_amount From c73d3a3c522ebafc254873f391eece2fcca2107b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 14:37:20 -0800 Subject: [PATCH 08/25] Add empty desc --- transform/models/L2_core/mayrapena1324_avg_by_grade.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transform/models/L2_core/mayrapena1324_avg_by_grade.yml b/transform/models/L2_core/mayrapena1324_avg_by_grade.yml index bdbe427..143e30c 100644 --- a/transform/models/L2_core/mayrapena1324_avg_by_grade.yml +++ b/transform/models/L2_core/mayrapena1324_avg_by_grade.yml @@ -2,7 +2,7 @@ version: 2 models: - name: mayrapena1324_avg_by_grade - description: + description: '' columns: - name: grade - name: avg_loan_amount From 869c1b12e45ece3c6d541b4edc745150db2c1e2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 14:58:02 -0800 Subject: [PATCH 09/25] Change DAG name --- orchestrate/dags/sample_dag.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/orchestrate/dags/sample_dag.py b/orchestrate/dags/sample_dag.py index 7000a85..1579be9 100644 --- a/orchestrate/dags/sample_dag.py +++ b/orchestrate/dags/sample_dag.py @@ -7,8 +7,8 @@ @dag( default_args={ "start_date": datetime.datetime(2023, 1, 1, 0, 0), - "owner": "John Doe", - "email": "john@example.com", + "owner": "Mayra Peña", + "email": "mayra@example.com", "email_on_failure": True, }, description="Daily dbt run", @@ -16,10 +16,10 @@ tags=["version_1"], catchup=False, ) -def daily_run(): +def sample_dag(): run_dbt = DatacovesDbtOperator( task_id="run_dbt", bash_command="dbt source freshness && dbt build" ) -dag = daily_run() +dag = sample_dag() From f708fa8fd2da74118e98d78f0ff92ddc38a8dc5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 15:31:39 -0800 Subject: [PATCH 10/25] Add extract and load --- orchestrate/dags/mayrapena1324_dag.py | 40 +++++++++++++++++++ orchestrate/dags/yml_dag.py | 22 ++++++++++ .../sample_dag.yml | 0 orchestrate/dags_yml_definitions/yml_dag.yml | 22 ++++++++++ transform/.dbt_coves/config.yml | 36 ++++++++--------- 5 files changed, 102 insertions(+), 18 deletions(-) create mode 100644 orchestrate/dags/mayrapena1324_dag.py create mode 100644 orchestrate/dags/yml_dag.py rename orchestrate/{dag_yml_definitions => dags_yml_definitions}/sample_dag.yml (100%) create mode 100644 orchestrate/dags_yml_definitions/yml_dag.yml diff --git a/orchestrate/dags/mayrapena1324_dag.py b/orchestrate/dags/mayrapena1324_dag.py new file mode 100644 index 0000000..6d35fd0 --- /dev/null +++ b/orchestrate/dags/mayrapena1324_dag.py @@ -0,0 +1,40 @@ +from airflow.decorators import dag +from operators.datacoves.dbt import DatacovesDbtOperator +from pendulum import datetime + +@dag( + default_args={ + "start_date": datetime(2023, 10, 10), + "owner": "airflow", + "email": "mayrapena1324@example.com", + "email_on_failure": True, + }, + + catchup=False, + tags = ["version_1"], + description = "My first Airflow DAG", + + # This is a regular CRON schedule. Helpful resources + # https://cron-ai.vercel.app/ + # https://crontab.guru/ + schedule_interval = "0 0 1 */12 *" +) +def mayrapena1324_sample_dag(): + + # Calling dbt commands + dbt_debug = DatacovesDbtOperator( + task_id = "dbt_debug_task", + bash_command = "dbt debug", + ) + + dbt_run = DatacovesDbtOperator( + task_id = "dbt_run_task", + bash_command = "dbt run --select +mayrapena1324_avg_by_grade", + ) + + # tells Airflow what order to run tasks in + dbt_debug >> dbt_run + + +# Invoke Dag +dag = mayrapena1324_sample_dag() diff --git a/orchestrate/dags/yml_dag.py b/orchestrate/dags/yml_dag.py new file mode 100644 index 0000000..de1fccb --- /dev/null +++ b/orchestrate/dags/yml_dag.py @@ -0,0 +1,22 @@ +from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator +from airflow.decorators import dag +import datetime +from operators.datacoves.dbt import DatacovesDbtOperator +from airflow.decorators import task_group +@dag( + default_args={'start_date': '2023-01'}, + description="Personal Loan Average", + schedule_interval="0 0 1 */12 *", + tags=['version_1'], + catchup=False +) +def yml_dag(): + @task_group(group_id='extract_and_load_airbyte', tooltip='Airbyte Extract and Load') + def extract_and_load_airbyte(): + tg_extract_and_load_airbyte = extract_and_load_airbyte() + transform = DatacovesDbtOperator( + task_id='transform', + bash_command="dbt build -s 'tag:daily_run_airbyte+ -t prd'" + ) + transform.set_upstream([tg_extract_and_load_airbyte]) +dag = yml_dag() diff --git a/orchestrate/dag_yml_definitions/sample_dag.yml b/orchestrate/dags_yml_definitions/sample_dag.yml similarity index 100% rename from orchestrate/dag_yml_definitions/sample_dag.yml rename to orchestrate/dags_yml_definitions/sample_dag.yml diff --git a/orchestrate/dags_yml_definitions/yml_dag.yml b/orchestrate/dags_yml_definitions/yml_dag.yml new file mode 100644 index 0000000..450d4fb --- /dev/null +++ b/orchestrate/dags_yml_definitions/yml_dag.yml @@ -0,0 +1,22 @@ +description: "Personal Loan Average" +schedule_interval: "0 0 1 */12 *" +tags: + - version_1 +default_args: + start_date: 2023-01 +catchup: false + +nodes: + extract_and_load_airbyte: + generator: AirbyteDbtGenerator + type: task_group + + tooltip: "Airbyte Extract and Load" + dbt_list_args: "--select tag:daily_run_airbyte" + + transform: + operator: operators.datacoves.dbt.DatacovesDbtOperator + type: task + + bash_command: "dbt build -s 'tag:daily_run_airbyte+ -t prd'" + dependencies: ["extract_and_load_airbyte"] diff --git a/transform/.dbt_coves/config.yml b/transform/.dbt_coves/config.yml index eb3c19b..4b08579 100644 --- a/transform/.dbt_coves/config.yml +++ b/transform/.dbt_coves/config.yml @@ -24,20 +24,20 @@ generate: # UNCOMMENT THE FOLLOWING LINES TO ENABLE AIRFLOW DAGS GENERATION # BASED ON AIRBYTE AND FIVETRAN CONNECTIONS - # generators_params: - # AirbyteDbtGenerator: - # host: "{{ env_var('DATACOVES__AIRBYTE_HOST_NAME') }}" - # port: "{{ env_var('DATACOVES__AIRBYTE_PORT') }}" - # airbyte_conn_id: airbyte_connection + generators_params: + AirbyteDbtGenerator: + host: "{{ env_var('DATACOVES__AIRBYTE_HOST_NAME') }}" + port: "{{ env_var('DATACOVES__AIRBYTE_PORT') }}" + airbyte_conn_id: airbyte_connection - # dbt_project_path: "{{ env_var('DATACOVES__DBT_HOME') }}" - # run_dbt_compile: false - # run_dbt_deps: false + dbt_project_path: "{{ env_var('DATACOVES__DBT_HOME') }}" + run_dbt_compile: false + run_dbt_deps: false - # AirbyteGenerator: - # host: "{{ env_var('DATACOVES__AIRBYTE_HOST_NAME') }}" - # port: "{{ env_var('DATACOVES__AIRBYTE_PORT') }}" - # airbyte_conn_id: airbyte_connection + AirbyteGenerator: + host: "{{ env_var('DATACOVES__AIRBYTE_HOST_NAME') }}" + port: "{{ env_var('DATACOVES__AIRBYTE_PORT') }}" + airbyte_conn_id: airbyte_connection # FivetranDbtGenerator: # api_key: "{{ env_var('DATACOVES__FIVETRAN_API_KEY') }}" @@ -55,11 +55,11 @@ generate: # - fivetran-connection-id-2 # UNCOMMENT THE FOLLOWING LINES TO ENABLE AIRBYTE EXTRACTION -# extract: -# airbyte: -# path: /config/workspace/load/airbyte -# host: "{{ env_var('DATACOVES__AIRBYTE_HOST_NAME') }}" -# port: "{{ env_var('DATACOVES__AIRBYTE_PORT') }}" +extract: + airbyte: + path: /config/workspace/load/airbyte + host: "{{ env_var('DATACOVES__AIRBYTE_HOST_NAME') }}" + port: "{{ env_var('DATACOVES__AIRBYTE_PORT') }}" # fivetran: # path: /config/workspace/extract/fivetran @@ -72,4 +72,4 @@ generate: # fivetran: # path: /config/workspace/load/fivetran -# run_connection_tests: true \ No newline at end of file +# run_connection_tests: true From 4e66f46fc1dba67d15983de4db0761e220ac7028 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 15:33:19 -0800 Subject: [PATCH 11/25] dbt debug sample dag --- orchestrate/dags/sample_dag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestrate/dags/sample_dag.py b/orchestrate/dags/sample_dag.py index 1579be9..fd24fc6 100644 --- a/orchestrate/dags/sample_dag.py +++ b/orchestrate/dags/sample_dag.py @@ -18,7 +18,7 @@ ) def sample_dag(): run_dbt = DatacovesDbtOperator( - task_id="run_dbt", bash_command="dbt source freshness && dbt build" + task_id="run_dbt", bash_command="dbt debug" ) From 2cdc6f0c9dc271ca3c1785e345f660b03e7bb21a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 15:38:33 -0800 Subject: [PATCH 12/25] Add daily run tag --- transform/models/L1_staging/country_data/_country_data.yml | 2 ++ transform/models/L1_staging/loans/_loans.yml | 2 ++ 2 files changed, 4 insertions(+) diff --git a/transform/models/L1_staging/country_data/_country_data.yml b/transform/models/L1_staging/country_data/_country_data.yml index aa4cdca..e0f3613 100644 --- a/transform/models/L1_staging/country_data/_country_data.yml +++ b/transform/models/L1_staging/country_data/_country_data.yml @@ -3,6 +3,8 @@ version: 2 sources: - name: COUNTRY_DATA database: RAW + tags: + - daily_run_airbyte tables: - name: COUNTRY_POPULATIONS description: 'Raw population information from Github Datasets repository' diff --git a/transform/models/L1_staging/loans/_loans.yml b/transform/models/L1_staging/loans/_loans.yml index 10207ce..f2e4f1f 100644 --- a/transform/models/L1_staging/loans/_loans.yml +++ b/transform/models/L1_staging/loans/_loans.yml @@ -3,6 +3,8 @@ version: 2 sources: - name: MAYRAPENA1324 database: RAW + tags: + - daily_run_airbyte tables: - name: PERSONAL_LOANS description: 'A personal Loans table' From 19bc4fff981a05a90b48a441e77142ad374c2d30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 15:39:45 -0800 Subject: [PATCH 13/25] Fix yml dag --- orchestrate/dags/yml_dag.py | 37 ++++++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/orchestrate/dags/yml_dag.py b/orchestrate/dags/yml_dag.py index de1fccb..93a0e50 100644 --- a/orchestrate/dags/yml_dag.py +++ b/orchestrate/dags/yml_dag.py @@ -1,22 +1,37 @@ -from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator -from airflow.decorators import dag import datetime + +from airflow.decorators import dag, task_group +from airflow.providers.airbyte.operators.airbyte import \ + AirbyteTriggerSyncOperator from operators.datacoves.dbt import DatacovesDbtOperator -from airflow.decorators import task_group + + @dag( - default_args={'start_date': '2023-01'}, - description="Personal Loan Average", - schedule_interval="0 0 1 */12 *", - tags=['version_1'], - catchup=False + default_args={"start_date": "2023-01"}, + description="Personal Loan Average", + schedule_interval="0 0 1 */12 *", + tags=["version_1"], + catchup=False, ) def yml_dag(): - @task_group(group_id='extract_and_load_airbyte', tooltip='Airbyte Extract and Load') + @task_group(group_id="extract_and_load_airbyte", tooltip="Airbyte Extract and Load") def extract_and_load_airbyte(): + country_populations_datacoves_train = AirbyteTriggerSyncOperator( + task_id="country_populations_datacoves_train", + connection_id="676575f7-22d7-41f4-ab78-52099d8cbccb", + airbyte_conn_id="airbyte_connection", + ) + personal_loans_datacoves_train = AirbyteTriggerSyncOperator( + task_id="personal_loans_datacoves_train", + connection_id="04b4fc09-bc22-4d19-b457-2d1fe84fbd40", + airbyte_conn_id="airbyte_connection", + ) + tg_extract_and_load_airbyte = extract_and_load_airbyte() transform = DatacovesDbtOperator( - task_id='transform', - bash_command="dbt build -s 'tag:daily_run_airbyte+ -t prd'" + task_id="transform", bash_command="dbt build -s 'tag:daily_run_airbyte+ -t prd'" ) transform.set_upstream([tg_extract_and_load_airbyte]) + + dag = yml_dag() From b953be62b2e42bbb28368bad3665c0070d594aa3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 15:59:52 -0800 Subject: [PATCH 14/25] Add yml dag --- orchestrate/dags/mayrapena1324_dag.py | 60 ++++++++++--------- .../{yml_dag.yml => mayrapena1324_dag.yml} | 6 +- 2 files changed, 36 insertions(+), 30 deletions(-) rename orchestrate/dags_yml_definitions/{yml_dag.yml => mayrapena1324_dag.yml} (79%) diff --git a/orchestrate/dags/mayrapena1324_dag.py b/orchestrate/dags/mayrapena1324_dag.py index 6d35fd0..037956c 100644 --- a/orchestrate/dags/mayrapena1324_dag.py +++ b/orchestrate/dags/mayrapena1324_dag.py @@ -1,40 +1,42 @@ -from airflow.decorators import dag +import datetime + +from airflow.decorators import dag, task_group +from airflow.providers.airbyte.operators.airbyte import \ + AirbyteTriggerSyncOperator from operators.datacoves.dbt import DatacovesDbtOperator -from pendulum import datetime + @dag( default_args={ - "start_date": datetime(2023, 10, 10), - "owner": "airflow", - "email": "mayrapena1324@example.com", + "start_date": datetime.datetime(2023, 1, 1, 0, 0), + "owner": "Mayra Pena", + "email": "mayra@datacoves.com", "email_on_failure": True, }, - + description="Personal Loan Average", + schedule_interval="0 0 1 */12 *", + tags=["version_1"], catchup=False, - tags = ["version_1"], - description = "My first Airflow DAG", - - # This is a regular CRON schedule. Helpful resources - # https://cron-ai.vercel.app/ - # https://crontab.guru/ - schedule_interval = "0 0 1 */12 *" ) -def mayrapena1324_sample_dag(): - - # Calling dbt commands - dbt_debug = DatacovesDbtOperator( - task_id = "dbt_debug_task", - bash_command = "dbt debug", - ) - - dbt_run = DatacovesDbtOperator( - task_id = "dbt_run_task", - bash_command = "dbt run --select +mayrapena1324_avg_by_grade", +def mayrapena1324_dag(): + @task_group(group_id="extract_and_load_airbyte", tooltip="Airbyte Extract and Load") + def extract_and_load_airbyte(): + country_populations_datacoves_train = AirbyteTriggerSyncOperator( + task_id="country_populations_datacoves_train", + connection_id="676575f7-22d7-41f4-ab78-52099d8cbccb", + airbyte_conn_id="airbyte_connection", + ) + personal_loans_datacoves_train = AirbyteTriggerSyncOperator( + task_id="personal_loans_datacoves_train", + connection_id="04b4fc09-bc22-4d19-b457-2d1fe84fbd40", + airbyte_conn_id="airbyte_connection", + ) + + tg_extract_and_load_airbyte = extract_and_load_airbyte() + transform = DatacovesDbtOperator( + task_id="transform", bash_command="dbt build -s 'tag:daily_run_airbyte+ -t prd'" ) - - # tells Airflow what order to run tasks in - dbt_debug >> dbt_run + transform.set_upstream([tg_extract_and_load_airbyte]) -# Invoke Dag -dag = mayrapena1324_sample_dag() +dag = mayrapena1324_dag() diff --git a/orchestrate/dags_yml_definitions/yml_dag.yml b/orchestrate/dags_yml_definitions/mayrapena1324_dag.yml similarity index 79% rename from orchestrate/dags_yml_definitions/yml_dag.yml rename to orchestrate/dags_yml_definitions/mayrapena1324_dag.yml index 450d4fb..775cd3e 100644 --- a/orchestrate/dags_yml_definitions/yml_dag.yml +++ b/orchestrate/dags_yml_definitions/mayrapena1324_dag.yml @@ -3,7 +3,11 @@ schedule_interval: "0 0 1 */12 *" tags: - version_1 default_args: - start_date: 2023-01 + start_date: 2023-01-01 + # Replace name and email + owner: Mayra Pena + email: mayra@datacoves.com + email_on_failure: true catchup: false nodes: From 89b170bfa1cbc852fa153523365d3dd90e0b47c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 16:01:21 -0800 Subject: [PATCH 15/25] Add a line --- transform/models/L2_core/mayrapena1324_avg_by_grade.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/transform/models/L2_core/mayrapena1324_avg_by_grade.yml b/transform/models/L2_core/mayrapena1324_avg_by_grade.yml index 143e30c..3bf10d4 100644 --- a/transform/models/L2_core/mayrapena1324_avg_by_grade.yml +++ b/transform/models/L2_core/mayrapena1324_avg_by_grade.yml @@ -7,3 +7,4 @@ models: - name: grade - name: avg_loan_amount - name: total_loans + From 79458337b29e724e8ae2ba418de845eaf92b8e4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 16:10:55 -0800 Subject: [PATCH 16/25] Edit governance check --- .github/workflows/pull_request_build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pull_request_build.yml b/.github/workflows/pull_request_build.yml index fad51e9..d9c0fbb 100644 --- a/.github/workflows/pull_request_build.yml +++ b/.github/workflows/pull_request_build.yml @@ -91,7 +91,7 @@ jobs: run: "dbt-coves generate docs --merge-deferred --state logs" - name: Run governance checks - run: "pre-commit run --from-ref origin/${{ github.event.pull_request.base.ref }} --to-ref HEAD" + run: "dbt parse && pre-commit run --from-ref origin/${{ github.event.pull_request.base.ref }} --to-ref HEAD" ##### Real dbt run given that we passed governance checks - name: Run dbt build slim mode From 45f30598553b29ff59df1972db7ffdd1ee12206c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 16:14:37 -0800 Subject: [PATCH 17/25] Testing --- .github/workflows/pull_request_build.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/pull_request_build.yml b/.github/workflows/pull_request_build.yml index d9c0fbb..f1f9b51 100644 --- a/.github/workflows/pull_request_build.yml +++ b/.github/workflows/pull_request_build.yml @@ -78,12 +78,12 @@ jobs: ##### Governance Checks # this first runs dbt but creates empty tables, this is enough to then run the hooks and fail fast - - name: Governance run of dbt with EMPTY models using slim mode - if: ${{ steps.prod_manifest.outputs.manifest_found == 'true' && contains(github.event.pull_request.labels.*.name, 'full-refresh') != true }} - run: "dbt build --fail-fast --defer --state logs --select state:modified+ --empty" + # - name: Governance run of dbt with EMPTY models using slim mode + # if: ${{ steps.prod_manifest.outputs.manifest_found == 'true' && contains(github.event.pull_request.labels.*.name, 'full-refresh') != true }} + # run: "dbt build --fail-fast --defer --state logs --select state:modified+ --empty" - name: Governance run of dbt with EMPTY models using full run - if: ${{ steps.prod_manifest.outputs.manifest_found == 'false' || contains(github.event.pull_request.labels.*.name, 'full-refresh') }} + # if: ${{ steps.prod_manifest.outputs.manifest_found == 'false' || contains(github.event.pull_request.labels.*.name, 'full-refresh') }} run: "dbt build --fail-fast --empty" - name: Generate Docs Combining Prod and branch catalog.json From c181462e600dea6ef908ae409e62b719becb22e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 16:17:56 -0800 Subject: [PATCH 18/25] Remove extra line --- .github/workflows/pull_request_build.yml | 4 +++- transform/models/L2_core/mayrapena1324_avg_by_grade.yml | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pull_request_build.yml b/.github/workflows/pull_request_build.yml index f1f9b51..3a93016 100644 --- a/.github/workflows/pull_request_build.yml +++ b/.github/workflows/pull_request_build.yml @@ -91,7 +91,9 @@ jobs: run: "dbt-coves generate docs --merge-deferred --state logs" - name: Run governance checks - run: "dbt parse && pre-commit run --from-ref origin/${{ github.event.pull_request.base.ref }} --to-ref HEAD" + # run: "pre-commit run --from-ref origin/${{ github.event.pull_request.base.ref }} --to-ref HEAD" + run: "pre-commit run --all" + ##### Real dbt run given that we passed governance checks - name: Run dbt build slim mode diff --git a/transform/models/L2_core/mayrapena1324_avg_by_grade.yml b/transform/models/L2_core/mayrapena1324_avg_by_grade.yml index 3bf10d4..143e30c 100644 --- a/transform/models/L2_core/mayrapena1324_avg_by_grade.yml +++ b/transform/models/L2_core/mayrapena1324_avg_by_grade.yml @@ -7,4 +7,3 @@ models: - name: grade - name: avg_loan_amount - name: total_loans - From 264958be7e3c403945ab28498d034ccec7db9cca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 16:22:17 -0800 Subject: [PATCH 19/25] Testing --- .github/workflows/pull_request_build.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/pull_request_build.yml b/.github/workflows/pull_request_build.yml index 3a93016..73a9e51 100644 --- a/.github/workflows/pull_request_build.yml +++ b/.github/workflows/pull_request_build.yml @@ -78,12 +78,12 @@ jobs: ##### Governance Checks # this first runs dbt but creates empty tables, this is enough to then run the hooks and fail fast - # - name: Governance run of dbt with EMPTY models using slim mode - # if: ${{ steps.prod_manifest.outputs.manifest_found == 'true' && contains(github.event.pull_request.labels.*.name, 'full-refresh') != true }} - # run: "dbt build --fail-fast --defer --state logs --select state:modified+ --empty" + - name: Governance run of dbt with EMPTY models using slim mode + if: ${{ steps.prod_manifest.outputs.manifest_found == 'true' && contains(github.event.pull_request.labels.*.name, 'full-refresh') != true }} + run: "dbt build --fail-fast --defer --state logs --select state:modified+ --empty" - name: Governance run of dbt with EMPTY models using full run - # if: ${{ steps.prod_manifest.outputs.manifest_found == 'false' || contains(github.event.pull_request.labels.*.name, 'full-refresh') }} + if: ${{ steps.prod_manifest.outputs.manifest_found == 'false' || contains(github.event.pull_request.labels.*.name, 'full-refresh') }} run: "dbt build --fail-fast --empty" - name: Generate Docs Combining Prod and branch catalog.json From f0efec1a1501c6cf31d919b05d3bec11d8c7f63a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 16:38:32 -0800 Subject: [PATCH 20/25] Testing --- transform/models/L1_staging/loans/_loans.yml | 2 +- transform/models/L2_core/mayrapena1324_avg_by_grade.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/transform/models/L1_staging/loans/_loans.yml b/transform/models/L1_staging/loans/_loans.yml index 10207ce..9f4f208 100644 --- a/transform/models/L1_staging/loans/_loans.yml +++ b/transform/models/L1_staging/loans/_loans.yml @@ -5,4 +5,4 @@ sources: database: RAW tables: - name: PERSONAL_LOANS - description: 'A personal Loans table' + description: '' diff --git a/transform/models/L2_core/mayrapena1324_avg_by_grade.yml b/transform/models/L2_core/mayrapena1324_avg_by_grade.yml index 143e30c..085063a 100644 --- a/transform/models/L2_core/mayrapena1324_avg_by_grade.yml +++ b/transform/models/L2_core/mayrapena1324_avg_by_grade.yml @@ -2,7 +2,7 @@ version: 2 models: - name: mayrapena1324_avg_by_grade - description: '' + description: 'Average personal loan by grade' columns: - name: grade - name: avg_loan_amount From cd836c85c6b79d9b2c2b91908f003f7cf825445d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 16:47:35 -0800 Subject: [PATCH 21/25] fix dags_yml_definitions folder --- .../{dag_yml_definitions => dags_yml_definitions}/sample_dag.yml | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename orchestrate/{dag_yml_definitions => dags_yml_definitions}/sample_dag.yml (100%) diff --git a/orchestrate/dag_yml_definitions/sample_dag.yml b/orchestrate/dags_yml_definitions/sample_dag.yml similarity index 100% rename from orchestrate/dag_yml_definitions/sample_dag.yml rename to orchestrate/dags_yml_definitions/sample_dag.yml From 8e9268820d7f2b787db31b15a90b4f4b474c2a52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 16:58:27 -0800 Subject: [PATCH 22/25] Add my first dag --- orchestrate/dags/mayrapena1324_dag.py | 42 +++++++++++++++++++ .../mayrapena1324_dag.yml | 26 ++++++++++++ 2 files changed, 68 insertions(+) create mode 100644 orchestrate/dags/mayrapena1324_dag.py create mode 100644 orchestrate/dags_yml_definitions/mayrapena1324_dag.yml diff --git a/orchestrate/dags/mayrapena1324_dag.py b/orchestrate/dags/mayrapena1324_dag.py new file mode 100644 index 0000000..037956c --- /dev/null +++ b/orchestrate/dags/mayrapena1324_dag.py @@ -0,0 +1,42 @@ +import datetime + +from airflow.decorators import dag, task_group +from airflow.providers.airbyte.operators.airbyte import \ + AirbyteTriggerSyncOperator +from operators.datacoves.dbt import DatacovesDbtOperator + + +@dag( + default_args={ + "start_date": datetime.datetime(2023, 1, 1, 0, 0), + "owner": "Mayra Pena", + "email": "mayra@datacoves.com", + "email_on_failure": True, + }, + description="Personal Loan Average", + schedule_interval="0 0 1 */12 *", + tags=["version_1"], + catchup=False, +) +def mayrapena1324_dag(): + @task_group(group_id="extract_and_load_airbyte", tooltip="Airbyte Extract and Load") + def extract_and_load_airbyte(): + country_populations_datacoves_train = AirbyteTriggerSyncOperator( + task_id="country_populations_datacoves_train", + connection_id="676575f7-22d7-41f4-ab78-52099d8cbccb", + airbyte_conn_id="airbyte_connection", + ) + personal_loans_datacoves_train = AirbyteTriggerSyncOperator( + task_id="personal_loans_datacoves_train", + connection_id="04b4fc09-bc22-4d19-b457-2d1fe84fbd40", + airbyte_conn_id="airbyte_connection", + ) + + tg_extract_and_load_airbyte = extract_and_load_airbyte() + transform = DatacovesDbtOperator( + task_id="transform", bash_command="dbt build -s 'tag:daily_run_airbyte+ -t prd'" + ) + transform.set_upstream([tg_extract_and_load_airbyte]) + + +dag = mayrapena1324_dag() diff --git a/orchestrate/dags_yml_definitions/mayrapena1324_dag.yml b/orchestrate/dags_yml_definitions/mayrapena1324_dag.yml new file mode 100644 index 0000000..775cd3e --- /dev/null +++ b/orchestrate/dags_yml_definitions/mayrapena1324_dag.yml @@ -0,0 +1,26 @@ +description: "Personal Loan Average" +schedule_interval: "0 0 1 */12 *" +tags: + - version_1 +default_args: + start_date: 2023-01-01 + # Replace name and email + owner: Mayra Pena + email: mayra@datacoves.com + email_on_failure: true +catchup: false + +nodes: + extract_and_load_airbyte: + generator: AirbyteDbtGenerator + type: task_group + + tooltip: "Airbyte Extract and Load" + dbt_list_args: "--select tag:daily_run_airbyte" + + transform: + operator: operators.datacoves.dbt.DatacovesDbtOperator + type: task + + bash_command: "dbt build -s 'tag:daily_run_airbyte+ -t prd'" + dependencies: ["extract_and_load_airbyte"] From c9d73c8cfcbdab9a9851bdf320cacded5f9d405e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Mon, 11 Nov 2024 18:44:05 -0800 Subject: [PATCH 23/25] Add description to personal loans source --- transform/models/L1_staging/loans/_loans.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transform/models/L1_staging/loans/_loans.yml b/transform/models/L1_staging/loans/_loans.yml index e9c4b89..d0a58b8 100644 --- a/transform/models/L1_staging/loans/_loans.yml +++ b/transform/models/L1_staging/loans/_loans.yml @@ -7,4 +7,4 @@ sources: - daily_run_airbyte tables: - name: PERSONAL_LOANS - description: '' + description: 'A personal loans table' From 4f3b29c75648e8dac162379c7b0809c1aef3eb59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Tue, 12 Nov 2024 15:06:31 -0800 Subject: [PATCH 24/25] Delete airflow dags --- orchestrate/dags/mayrapena1324_dag.py | 42 ------------------- .../mayrapena1324_dag.yml | 26 ------------ 2 files changed, 68 deletions(-) delete mode 100644 orchestrate/dags/mayrapena1324_dag.py delete mode 100644 orchestrate/dags_yml_definitions/mayrapena1324_dag.yml diff --git a/orchestrate/dags/mayrapena1324_dag.py b/orchestrate/dags/mayrapena1324_dag.py deleted file mode 100644 index 037956c..0000000 --- a/orchestrate/dags/mayrapena1324_dag.py +++ /dev/null @@ -1,42 +0,0 @@ -import datetime - -from airflow.decorators import dag, task_group -from airflow.providers.airbyte.operators.airbyte import \ - AirbyteTriggerSyncOperator -from operators.datacoves.dbt import DatacovesDbtOperator - - -@dag( - default_args={ - "start_date": datetime.datetime(2023, 1, 1, 0, 0), - "owner": "Mayra Pena", - "email": "mayra@datacoves.com", - "email_on_failure": True, - }, - description="Personal Loan Average", - schedule_interval="0 0 1 */12 *", - tags=["version_1"], - catchup=False, -) -def mayrapena1324_dag(): - @task_group(group_id="extract_and_load_airbyte", tooltip="Airbyte Extract and Load") - def extract_and_load_airbyte(): - country_populations_datacoves_train = AirbyteTriggerSyncOperator( - task_id="country_populations_datacoves_train", - connection_id="676575f7-22d7-41f4-ab78-52099d8cbccb", - airbyte_conn_id="airbyte_connection", - ) - personal_loans_datacoves_train = AirbyteTriggerSyncOperator( - task_id="personal_loans_datacoves_train", - connection_id="04b4fc09-bc22-4d19-b457-2d1fe84fbd40", - airbyte_conn_id="airbyte_connection", - ) - - tg_extract_and_load_airbyte = extract_and_load_airbyte() - transform = DatacovesDbtOperator( - task_id="transform", bash_command="dbt build -s 'tag:daily_run_airbyte+ -t prd'" - ) - transform.set_upstream([tg_extract_and_load_airbyte]) - - -dag = mayrapena1324_dag() diff --git a/orchestrate/dags_yml_definitions/mayrapena1324_dag.yml b/orchestrate/dags_yml_definitions/mayrapena1324_dag.yml deleted file mode 100644 index 775cd3e..0000000 --- a/orchestrate/dags_yml_definitions/mayrapena1324_dag.yml +++ /dev/null @@ -1,26 +0,0 @@ -description: "Personal Loan Average" -schedule_interval: "0 0 1 */12 *" -tags: - - version_1 -default_args: - start_date: 2023-01-01 - # Replace name and email - owner: Mayra Pena - email: mayra@datacoves.com - email_on_failure: true -catchup: false - -nodes: - extract_and_load_airbyte: - generator: AirbyteDbtGenerator - type: task_group - - tooltip: "Airbyte Extract and Load" - dbt_list_args: "--select tag:daily_run_airbyte" - - transform: - operator: operators.datacoves.dbt.DatacovesDbtOperator - type: task - - bash_command: "dbt build -s 'tag:daily_run_airbyte+ -t prd'" - dependencies: ["extract_and_load_airbyte"] From e14cac41efb06208693590403035d8ce44e89bd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mayra=20Pe=C3=B1a?= Date: Tue, 12 Nov 2024 15:07:23 -0800 Subject: [PATCH 25/25] Change command for airflow dag --- orchestrate/dags_yml_definitions/sample_dag.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestrate/dags_yml_definitions/sample_dag.yml b/orchestrate/dags_yml_definitions/sample_dag.yml index d8afd80..3b2afe9 100644 --- a/orchestrate/dags_yml_definitions/sample_dag.yml +++ b/orchestrate/dags_yml_definitions/sample_dag.yml @@ -14,4 +14,4 @@ nodes: run_dbt: type: task operator: operators.datacoves.dbt.DatacovesDbtOperator - bash_command: "dbt source freshness && dbt build" + bash_command: "dbt debug"