diff --git a/README.md b/README.md index 9a4368d..884b6be 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,8 @@ from airflow_dbt.operators.dbt_operator import ( DbtSeedOperator, DbtSnapshotOperator, DbtRunOperator, - DbtTestOperator + DbtTestOperator, + DbtCleanOperator, ) from airflow.utils.dates import days_ago @@ -36,7 +37,11 @@ with DAG(dag_id='dbt', default_args=default_args, schedule_interval='@daily') as retries=0, # Failing tests would fail the task, and we don't want Airflow to try again ) - dbt_seed >> dbt_snapshot >> dbt_run >> dbt_test + dbt_clean = DbtCleanOperator( + task_id='dbt_clean, + ) + + dbt_seed >> dbt_snapshot >> dbt_run >> dbt_test >> dbt_clean ``` ## Installation @@ -65,6 +70,8 @@ There are five operators currently implemented: * Calls [`dbt run`](https://docs.getdbt.com/docs/run) * `DbtTestOperator` * Calls [`dbt test`](https://docs.getdbt.com/docs/test) +* `DbtCleanOperator` + * Calls [`dbt clean`](https://docs.getdbt.com/docs/clean) Each of the above operators accept the following arguments: diff --git a/airflow_dbt/operators/dbt_operator.py b/airflow_dbt/operators/dbt_operator.py index 6233d8d..46c9679 100644 --- a/airflow_dbt/operators/dbt_operator.py +++ b/airflow_dbt/operators/dbt_operator.py @@ -142,3 +142,12 @@ def __init__(self, profiles_dir=None, target=None, *args, **kwargs): def execute(self, context): self.create_hook().run_cli('deps') + + +class DbtCleanOperator(DbtBaseOperator): + @apply_defaults + def __init__(self, profiles_dir=None, target=None, *args, **kwargs): + super(DbtCleanOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + + def execute(self, context): + self.create_hook().run_cli('clean') diff --git a/tests/operators/test_dbt_operator.py b/tests/operators/test_dbt_operator.py index 8ce2c5f..78604d1 100644 --- a/tests/operators/test_dbt_operator.py +++ b/tests/operators/test_dbt_operator.py @@ -7,7 +7,8 @@ DbtSnapshotOperator, DbtRunOperator, DbtTestOperator, - DbtDepsOperator + DbtDepsOperator, + DbtCleanOperator, ) @@ -64,3 +65,12 @@ def test_dbt_deps(self, mock_run_cli): ) operator.execute(None) mock_run_cli.assert_called_once_with('deps') + + @mock.patch.object(DbtCliHook, 'run_cli') + def test_dbt_clean(self, mock_run_cli): + operator = DbtCleanOperator( + task_id='clean', + dag=self.dag + ) + operator.execute(None) + mock_run_cli.assert_called_once_with('clean')