From 6e5fc3efe62d24bb6915184445ef867753ce07f1 Mon Sep 17 00:00:00 2001 From: Fenimore Love Date: Fri, 22 Oct 2021 14:57:04 -0400 Subject: [PATCH 1/2] Add DbtCleanOperator --- README.md | 9 +++++++-- airflow_dbt/operators/dbt_operator.py | 9 +++++++++ tests/operators/test_dbt_operator.py | 12 +++++++++++- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 9a4368d..542da9d 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,8 @@ from airflow_dbt.operators.dbt_operator import ( DbtSeedOperator, DbtSnapshotOperator, DbtRunOperator, - DbtTestOperator + DbtTestOperator, + DbtCleanOperator, ) from airflow.utils.dates import days_ago @@ -36,7 +37,11 @@ with DAG(dag_id='dbt', default_args=default_args, schedule_interval='@daily') as retries=0, # Failing tests would fail the task, and we don't want Airflow to try again ) - dbt_seed >> dbt_snapshot >> dbt_run >> dbt_test + dbt_clean = DbtCleanOperator( + task_id='dbt_clean, + ) + + dbt_seed >> dbt_snapshot >> dbt_run >> dbt_test >> dbt_clean ``` ## Installation diff --git a/airflow_dbt/operators/dbt_operator.py b/airflow_dbt/operators/dbt_operator.py index 6233d8d..46c9679 100644 --- a/airflow_dbt/operators/dbt_operator.py +++ b/airflow_dbt/operators/dbt_operator.py @@ -142,3 +142,12 @@ def __init__(self, profiles_dir=None, target=None, *args, **kwargs): def execute(self, context): self.create_hook().run_cli('deps') + + +class DbtCleanOperator(DbtBaseOperator): + @apply_defaults + def __init__(self, profiles_dir=None, target=None, *args, **kwargs): + super(DbtCleanOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + + def execute(self, context): + self.create_hook().run_cli('clean') diff --git a/tests/operators/test_dbt_operator.py b/tests/operators/test_dbt_operator.py index 8ce2c5f..78604d1 100644 --- a/tests/operators/test_dbt_operator.py +++ b/tests/operators/test_dbt_operator.py @@ -7,7 +7,8 @@ DbtSnapshotOperator, DbtRunOperator, DbtTestOperator, - DbtDepsOperator + DbtDepsOperator, + DbtCleanOperator, ) @@ -64,3 +65,12 @@ def test_dbt_deps(self, mock_run_cli): ) operator.execute(None) mock_run_cli.assert_called_once_with('deps') + + @mock.patch.object(DbtCliHook, 'run_cli') + def test_dbt_clean(self, mock_run_cli): + operator = DbtCleanOperator( + task_id='clean', + dag=self.dag + ) + operator.execute(None) + mock_run_cli.assert_called_once_with('clean') From 2e3b6dab52d1c3dd276ab6ad06585a8ffdbfb450 Mon Sep 17 00:00:00 2001 From: Fenimore Love Date: Fri, 22 Oct 2021 15:07:24 -0400 Subject: [PATCH 2/2] Update readme --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 542da9d..884b6be 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,8 @@ There are five operators currently implemented: * Calls [`dbt run`](https://docs.getdbt.com/docs/run) * `DbtTestOperator` * Calls [`dbt test`](https://docs.getdbt.com/docs/test) +* `DbtCleanOperator` + * Calls [`dbt clean`](https://docs.getdbt.com/docs/clean) Each of the above operators accept the following arguments: