diff --git a/orchestrate/dags/martyg_dag.py b/orchestrate/dags/martyg_dag.py new file mode 100644 index 0000000..e96a1d5 --- /dev/null +++ b/orchestrate/dags/martyg_dag.py @@ -0,0 +1,43 @@ +import datetime + +from airflow.decorators import dag, task_group +from airflow.providers.airbyte.operators.airbyte import \ + AirbyteTriggerSyncOperator +from operators.datacoves.dbt import DatacovesDbtOperator + + +@dag( + default_args={ + "start_date": datetime.datetime(2023, 1, 1, 0, 0), + "owner": "Marty Goode", + "email": "marty.goode@example.com", + "email_on_failure": True, + "retries": 3, + }, + description="Personal Loan Average", + schedule="0 0 1 */12 *", + tags=["version_1"], + catchup=False, +) +def martyg_dag(): + @task_group(group_id="extract_and_load_airbyte", tooltip="Airbyte Extract and Load") + def extract_and_load_airbyte(): + country_populations_datacoves_train = AirbyteTriggerSyncOperator( + task_id="country_populations_datacoves_train", + connection_id="676575f7-22d7-41f4-ab78-52099d8cbccb", + airbyte_conn_id="airbyte_connection", + ) + martyg_source_datacoves_train = AirbyteTriggerSyncOperator( + task_id="martyg_source_datacoves_train", + connection_id="b330c95c-6a89-476a-84b6-1462fe28fa9e", + airbyte_conn_id="airbyte_connection", + ) + + tg_extract_and_load_airbyte = extract_and_load_airbyte() + transform = DatacovesDbtOperator( + task_id="transform", bash_command="dbt build -s 'tag:daily_run_airbyte+ -t prd'" + ) + transform.set_upstream([tg_extract_and_load_airbyte]) + + +dag = martyg_dag() diff --git a/orchestrate/dags_yml_definitions/martyg_dag.yml b/orchestrate/dags_yml_definitions/martyg_dag.yml new file mode 100644 index 0000000..3eaf417 --- /dev/null +++ b/orchestrate/dags_yml_definitions/martyg_dag.yml @@ -0,0 +1,27 @@ +description: "Personal Loan Average" +schedule: "0 0 1 */12 *" +tags: + - version_1 +default_args: + start_date: 2023-01-01 + # Replace name and email@example.com (not real email) + owner: Marty Goode + email: marty.goode@example.com + email_on_failure: true + retries: 3 +catchup: false + +nodes: + extract_and_load_airbyte: + generator: AirbyteDbtGenerator + type: task_group + + tooltip: "Airbyte Extract and Load" + dbt_list_args: "--select tag:daily_run_airbyte" + + transform: + operator: operators.datacoves.dbt.DatacovesDbtOperator + type: task + + bash_command: "dbt build -s 'tag:daily_run_airbyte+ -t prd'" + dependencies: ["extract_and_load_airbyte"]